#!/usr/bin/env python3
"""Clean up Proxmox VMs and cloud-init volumes that belong to Terraform targets.

Targets are read from a JSON file, a saved Terraform plan, or the
``proxmox_target_vms`` Terraform output. The ``--mode`` flag selects one of:

* ``orphan-cloudinit`` -- delete the cloud-init volume of every target whose
  VM does not exist.
* ``untracked-vms`` -- delete existing target VMs whose address is not listed
  by ``terraform state list``.
* ``post-destroy`` -- delete every target VM that still exists and fail if
  any remain afterwards.

A VM is only deleted after ``assert_owned`` has matched its name and its
tags or description against the target.

API credentials come from the ``TF_VAR_proxmox_*`` environment variables and
are read on first use, so ``--help`` and argument validation work without
them.
"""
import argparse
import functools
import json
import os
import ssl
import subprocess
import sys
import time
import urllib.error
import urllib.parse
import urllib.request


def normalize_endpoint(raw):
    """Return the API base URL without a trailing ``/api2/json`` or slashes.

    Trailing slashes are stripped *before* the suffix is removed so that
    ``https://host:8006/api2/json/`` is reduced to ``https://host:8006``
    instead of keeping the suffix (``request`` appends it again).
    """
    return raw.strip().rstrip("/").removesuffix("/api2/json").rstrip("/")


def api_context():
    """Build the API connection settings from the environment.

    Returns:
        A ``(endpoint, ssl_context, headers)`` tuple. ``ssl_context`` is
        ``None`` unless ``TF_VAR_proxmox_insecure`` is ``true`` (any case),
        in which case it is a context with certificate and hostname
        verification disabled.

    Raises:
        KeyError: if ``TF_VAR_proxmox_endpoint``,
            ``TF_VAR_proxmox_api_token_id`` or
            ``TF_VAR_proxmox_api_token_secret`` is unset.
    """
    endpoint = normalize_endpoint(os.environ["TF_VAR_proxmox_endpoint"])
    token_id = os.environ["TF_VAR_proxmox_api_token_id"]
    token_secret = os.environ["TF_VAR_proxmox_api_token_secret"]
    insecure = os.environ.get("TF_VAR_proxmox_insecure", "false").lower() == "true"

    context = None
    if insecure:
        # Public-API equivalent of the private ssl._create_unverified_context().
        # check_hostname must be cleared before verify_mode can be CERT_NONE.
        context = ssl.create_default_context()
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE

    headers = {"Authorization": f"PVEAPIToken={token_id}={token_secret}"}
    return endpoint, context, headers


@functools.cache
def _api():
    """Return the cached ``api_context()`` result, computing it on first use."""
    return api_context()


# Position of each legacy module attribute inside the api_context() tuple.
_LAZY_API_ATTRS = {"ENDPOINT": 0, "SSL_CONTEXT": 1, "HEADERS": 2}


def __getattr__(name):
    """Resolve ``ENDPOINT``, ``SSL_CONTEXT`` and ``HEADERS`` lazily.

    These used to be computed at import time; keeping them reachable as module
    attributes preserves compatibility without reading the environment until
    they are actually needed.
    """
    index = _LAZY_API_ATTRS.get(name)
    if index is None:
        raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
    return _api()[index]


def request(method, path, data=None, timeout=60):
    """Send one API request and return the raw response body.

    Args:
        method: HTTP method name.
        path: Path below ``/api2/json``, starting with ``/``.
        data: Optional mapping of parameters. For ``DELETE`` they are sent in
            the query string; for every other method they are sent as a
            form-encoded body.
        timeout: Socket timeout in seconds.

    Returns:
        The response body as ``bytes``.

    Raises:
        urllib.error.HTTPError: propagated from ``urlopen`` for error statuses.
    """
    endpoint, ssl_context, base_headers = _api()
    headers = dict(base_headers)
    body = None
    if data is not None:
        encoded = urllib.parse.urlencode(data)
        if method == "DELETE":
            path = f"{path}?{encoded}"
        else:
            body = encoded.encode()
            headers["Content-Type"] = "application/x-www-form-urlencoded"
    req = urllib.request.Request(
        f"{endpoint}/api2/json{path}",
        method=method,
        headers=headers,
        data=body,
    )
    with urllib.request.urlopen(req, context=ssl_context, timeout=timeout) as resp:
        return resp.read()


def is_missing_vm_error(err):
    """Return True when *err* is treated as "the VM does not exist".

    That is a 404, or a 500 whose reason contains ``conf' does not exist``.
    """
    if err.code == 404:
        return True
    # str() guards against a missing or non-string reason.
    return err.code == 500 and "conf' does not exist" in str(err.reason or "")


def vm_exists(target):
    """Return whether the target VM answers a ``status/current`` query.

    Raises:
        urllib.error.HTTPError: for any error that is not a missing-VM error.
    """
    try:
        request("GET", f"/nodes/{target['node_name']}/qemu/{target['vm_id']}/status/current")
    except urllib.error.HTTPError as err:
        if is_missing_vm_error(err):
            return False
        raise
    return True


def vm_config(target):
    """Return the VM's ``data`` config mapping, or ``{}`` if the VM is missing.

    Raises:
        urllib.error.HTTPError: for any error that is not a missing-VM error.
    """
    try:
        raw = request("GET", f"/nodes/{target['node_name']}/qemu/{target['vm_id']}/config")
    except urllib.error.HTTPError as err:
        if is_missing_vm_error(err):
            return {}
        raise
    # A JSON null payload would otherwise surface as None and break callers.
    return json.loads(raw).get("data") or {}


def wait_absent(target):
    """Poll up to 60 times, 5 seconds apart, until the VM no longer exists.

    Raises:
        RuntimeError: if the VM still exists after the last poll.
    """
    for _ in range(60):
        if not vm_exists(target):
            return
        time.sleep(5)
    raise RuntimeError(f"VM {target['vm_id']} still exists after delete")


def normalize_target(raw, address=None):
    """Convert a raw target mapping into the dict used by the cleanup steps.

    Args:
        raw: Mapping with at least ``name`` and ``vm_id``. Optional keys are
            ``address``, ``node_name``, ``cloud_init_storage``,
            ``initialization``, ``tags`` and ``description``.
        address: Terraform resource address; overrides ``raw["address"]``.

    Returns:
        A dict with the keys ``address``, ``name``, ``vm_id`` (int),
        ``node_name``, ``cloud_init_storage``, ``tags`` and ``description``.
        A missing cloud-init storage falls back to the first
        ``initialization`` entry's ``datastore_id`` and then to
        ``TF_VAR_proxmox_cloud_init_storage_pool`` (default ``Flash``). A
        missing node name falls back to ``TF_VAR_proxmox_node_name``
        (default ``flex``).

    Raises:
        KeyError: if ``name`` or ``vm_id`` is absent.
    """
    initialization = raw.get("initialization") or []
    cloud_init_storage = raw.get("cloud_init_storage")
    if not cloud_init_storage and initialization and isinstance(initialization, list):
        cloud_init_storage = (initialization[0] or {}).get("datastore_id")
    return {
        "address": address or raw.get("address"),
        "name": raw["name"],
        "vm_id": int(raw["vm_id"]),
        "node_name": raw.get("node_name") or os.environ.get("TF_VAR_proxmox_node_name", "flex"),
        "cloud_init_storage": cloud_init_storage
        or os.environ.get("TF_VAR_proxmox_cloud_init_storage_pool", "Flash"),
        "tags": raw.get("tags") or [],
        "description": raw.get("description") or "",
    }


def targets_from_plan(terraform_dir, plan_path):
    """Return targets for the VM resources in a saved Terraform plan.

    Runs ``terraform show -json`` and keeps every
    ``proxmox_virtual_environment_vm`` change whose ``after`` state has both a
    name and a ``vm_id``.

    Raises:
        subprocess.CalledProcessError: if ``terraform show`` fails.
    """
    result = subprocess.run(
        ["terraform", "-chdir=" + terraform_dir, "show", "-json", plan_path],
        check=True,
        text=True,
        stdout=subprocess.PIPE,
    )
    plan = json.loads(result.stdout)
    targets = []
    for change in plan.get("resource_changes", []):
        if change.get("type") != "proxmox_virtual_environment_vm":
            continue
        after = (change.get("change") or {}).get("after") or {}
        if not after.get("name") or after.get("vm_id") is None:
            continue
        targets.append(normalize_target(after, change.get("address")))
    return targets


def targets_from_output(terraform_dir):
    """Return targets from the ``proxmox_target_vms`` Terraform output.

    Raises:
        subprocess.CalledProcessError: if ``terraform output`` fails.
    """
    result = subprocess.run(
        ["terraform", "-chdir=" + terraform_dir, "output", "-json", "proxmox_target_vms"],
        check=True,
        text=True,
        stdout=subprocess.PIPE,
    )
    return [normalize_target(target) for target in json.loads(result.stdout)]


def targets_from_file(path):
    """Return targets from a JSON file.

    The file holds either the list of targets itself or an object whose
    ``proxmox_target_vms`` entry carries that list under ``value``.
    """
    with open(path, encoding="utf-8") as handle:
        data = json.load(handle)
    if isinstance(data, dict) and "proxmox_target_vms" in data:
        data = data["proxmox_target_vms"]["value"]
    return [normalize_target(target) for target in data]


def load_targets(args):
    """Pick the target source: ``--targets-file``, then ``--plan``, then output."""
    if args.targets_file:
        return targets_from_file(args.targets_file)
    if args.plan:
        return targets_from_plan(args.terraform_dir, args.plan)
    return targets_from_output(args.terraform_dir)


def terraform_state(terraform_dir):
    """Return the set of addresses printed by ``terraform state list``.

    A failing command is tolerated on purpose (``check=False``): whatever was
    printed to stdout is still returned. Because callers treat addresses that
    are absent from this set as untracked, the failure is reported on stderr
    rather than passing silently.
    """
    result = subprocess.run(
        ["terraform", "-chdir=" + terraform_dir, "state", "list"],
        check=False,
        text=True,
        stdout=subprocess.PIPE,
    )
    if result.returncode != 0:
        print(
            f"WARNING: terraform state list exited with status {result.returncode}; "
            "continuing with the addresses it printed (possibly none)",
            file=sys.stderr,
        )
    return set(result.stdout.splitlines())


def tags_from_config(config):
    """Return the VM's tags as a set.

    ``config["tags"]`` may be a list or a ``;``-separated string; empty
    entries are dropped.
    """
    raw = config.get("tags") or ""
    if isinstance(raw, list):
        return set(raw)
    return {tag for tag in raw.split(";") if tag}


def assert_owned(target, config):
    """Refuse to continue unless *config* matches *target*.

    The configured name must equal the target name, and additionally either
    the target's non-empty tag set must be a subset of the VM's tags or the
    target's non-empty description must equal the VM's description.

    Raises:
        RuntimeError: if the name differs or neither tags nor description match.
    """
    actual_name = config.get("name")
    if actual_name != target["name"]:
        raise RuntimeError(
            f"Refusing to delete VM {target['vm_id']}: expected name {target['name']!r}, got {actual_name!r}"
        )
    tags = tags_from_config(config)
    expected_tags = set(target.get("tags") or [])
    description = config.get("description") or ""
    expected_description = target.get("description") or ""
    # An empty expectation never counts as a match.
    has_expected_tags = bool(expected_tags) and expected_tags.issubset(tags)
    has_expected_description = bool(expected_description) and description == expected_description
    if not has_expected_tags and not has_expected_description:
        raise RuntimeError(
            f"Refusing to delete VM {target['vm_id']} ({target['name']}): ownership tags/description do not match"
        )


def delete_cloud_init(target):
    """Delete the target's ``vm-<id>-cloudinit`` volume; a 404 is not an error.

    Raises:
        urllib.error.HTTPError: for any status other than 404.
    """
    volume = urllib.parse.quote(
        f"{target['cloud_init_storage']}:vm-{target['vm_id']}-cloudinit",
        safe="",
    )
    try:
        request(
            "DELETE",
            f"/nodes/{target['node_name']}/storage/{target['cloud_init_storage']}/content/{volume}",
        )
        print(f"Deleted orphan cloud-init volume for VM {target['vm_id']}")
    except urllib.error.HTTPError as err:
        if err.code == 404:
            print(f"No orphan cloud-init volume for VM {target['vm_id']}")
            return
        raise


def delete_vm(target):
    """Verify ownership, stop and delete the VM, then remove its cloud-init volume.

    Raises:
        RuntimeError: if ownership cannot be confirmed or the VM outlives the
            delete.
        urllib.error.HTTPError: for unexpected API errors.
    """
    config = vm_config(target)
    assert_owned(target, config)
    print(f"Deleting Terraform-owned VM {target['vm_id']} ({target['name']})")
    try:
        request("POST", f"/nodes/{target['node_name']}/qemu/{target['vm_id']}/status/stop")
        # Fixed grace period after the stop request; its completion is not polled.
        time.sleep(10)
    except urllib.error.HTTPError as err:
        # Best effort: a 400/500 from the stop call does not block the delete.
        if err.code not in (400, 500):
            raise
    request(
        "DELETE",
        f"/nodes/{target['node_name']}/qemu/{target['vm_id']}",
        {"purge": "1", "destroy-unreferenced-disks": "1"},
    )
    wait_absent(target)
    delete_cloud_init(target)


def cleanup_orphan_cloud_init(targets):
    """Delete the cloud-init volume of every target whose VM does not exist."""
    for target in targets:
        if vm_exists(target):
            print(f"VM {target['vm_id']} exists; keeping cloud-init volume")
            continue
        delete_cloud_init(target)


def cleanup_untracked_vms(targets, terraform_dir):
    """Delete existing target VMs whose address is not in the Terraform state."""
    state = terraform_state(terraform_dir)
    for target in targets:
        if target.get("address") and target["address"] in state:
            continue
        if not vm_exists(target):
            continue
        delete_vm(target)


def cleanup_post_destroy(targets):
    """Delete every target VM that still exists.

    Raises:
        RuntimeError: listing the VMs that remain after the cleanup.
    """
    remaining = []
    for target in targets:
        if vm_exists(target):
            delete_vm(target)
        if vm_exists(target):
            remaining.append(f"{target['vm_id']} ({target['name']})")
    if remaining:
        raise RuntimeError("Target VMs still exist after cleanup: " + ", ".join(remaining))


def main():
    """Parse the command line, load the targets and run the selected mode."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--mode", choices=("orphan-cloudinit", "untracked-vms", "post-destroy"), required=True)
    parser.add_argument("--terraform-dir", default="terraform")
    parser.add_argument("--plan")
    parser.add_argument("--targets-file")
    args = parser.parse_args()

    targets = load_targets(args)
    if not targets:
        print("No Proxmox target VMs found")
        return

    if args.mode == "orphan-cloudinit":
        cleanup_orphan_cloud_init(targets)
    elif args.mode == "untracked-vms":
        cleanup_untracked_vms(targets, args.terraform_dir)
    else:
        cleanup_post_destroy(targets)


if __name__ == "__main__":
    try:
        main()
    except KeyError as err:
        # str(KeyError) is only the quoted key, so say what kind of thing is missing.
        print(f"ERROR: missing required key or environment variable {err}", file=sys.stderr)
        sys.exit(1)
    except Exception as err:
        print(f"ERROR: {err}", file=sys.stderr)
        sys.exit(1)