Files
HetznerTerra/scripts/proxmox-rebuild-cleanup.py
T
micqdf a33a993867
Deploy Grafana Content / Grafana Content (push) Failing after 1m14s
Deploy Cluster / Terraform (push) Failing after 4m59s
Deploy Cluster / Ansible (push) Has been skipped
fix: harden cluster rebuild determinism
2026-04-30 07:36:27 +00:00

276 lines
8.5 KiB
Python

#!/usr/bin/env python3
import argparse
import json
import os
import ssl
import subprocess
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
def api_context():
    """Build (endpoint, ssl_context, headers) for the Proxmox API from TF_VAR_* env vars.

    Returns:
        tuple: (endpoint base URL without /api2/json, ssl.SSLContext or None,
        headers dict carrying the PVEAPIToken authorization).

    Raises:
        KeyError: if the required endpoint/token environment variables are unset.
    """
    # Accept endpoints given with or without the /api2/json suffix or a trailing slash.
    endpoint = os.environ["TF_VAR_proxmox_endpoint"].strip().removesuffix("/api2/json").rstrip("/")
    token_id = os.environ["TF_VAR_proxmox_api_token_id"]
    token_secret = os.environ["TF_VAR_proxmox_api_token_secret"]
    insecure = os.environ.get("TF_VAR_proxmox_insecure", "false").lower() == "true"
    if insecure:
        # Use the public ssl API instead of the private ssl._create_unverified_context():
        # explicitly disable hostname checking and certificate verification.
        context = ssl.create_default_context()
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE
    else:
        context = None
    headers = {"Authorization": f"PVEAPIToken={token_id}={token_secret}"}
    return endpoint, context, headers
# Resolved once at import time: the script fails fast (KeyError) here if the
# required TF_VAR_proxmox_* environment variables are missing.
ENDPOINT, SSL_CONTEXT, HEADERS = api_context()
def request(method, path, data=None, timeout=60):
    """Perform a Proxmox API call and return the raw response body (bytes).

    DELETE payloads are appended as query-string parameters; all other methods
    send a form-encoded body. Raises urllib.error.HTTPError on non-2xx status.
    """
    payload = None
    hdrs = dict(HEADERS)
    if data is not None:
        query = urllib.parse.urlencode(data)
        if method == "DELETE":
            # Proxmox expects DELETE parameters in the query string, not the body.
            path = f"{path}?{query}"
        else:
            payload = query.encode()
            hdrs["Content-Type"] = "application/x-www-form-urlencoded"
    req = urllib.request.Request(
        f"{ENDPOINT}/api2/json{path}",
        data=payload,
        headers=hdrs,
        method=method,
    )
    with urllib.request.urlopen(req, context=SSL_CONTEXT, timeout=timeout) as resp:
        return resp.read()
def is_missing_vm_error(err):
    """Return True if the HTTPError indicates the VM/config is absent on the node."""
    if err.code == 404:
        return True
    # Proxmox reports a deleted VM's missing config file as a 500 error whose
    # message contains "...conf' does not exist".
    return err.code == 500 and "conf' does not exist" in err.reason
def vm_exists(target):
    """Return True if the target VM is present on its node, False if it is gone."""
    try:
        request("GET", f"/nodes/{target['node_name']}/qemu/{target['vm_id']}/status/current")
    except urllib.error.HTTPError as err:
        if not is_missing_vm_error(err):
            raise
        return False
    return True
def vm_config(target):
    """Fetch the VM's current config dict, or {} if the VM no longer exists."""
    path = f"/nodes/{target['node_name']}/qemu/{target['vm_id']}/config"
    try:
        raw = request("GET", path)
    except urllib.error.HTTPError as err:
        if is_missing_vm_error(err):
            return {}
        raise
    return json.loads(raw).get("data", {})
def wait_absent(target):
    """Poll until the VM disappears (60 tries x 5s), else raise RuntimeError."""
    attempts_left = 60
    while attempts_left > 0:
        if not vm_exists(target):
            return
        time.sleep(5)
        attempts_left -= 1
    raise RuntimeError(f"VM {target['vm_id']} still exists after delete")
def normalize_target(raw, address=None):
    """Canonicalize a Terraform VM object into the dict shape used by this script.

    Missing node/storage fields fall back to TF_VAR_* environment defaults
    ("flex" node, "Flash" storage pool).
    """
    storage = raw.get("cloud_init_storage")
    if not storage:
        # Terraform plans expose the datastore via the initialization block list.
        init_blocks = raw.get("initialization") or []
        if isinstance(init_blocks, list) and init_blocks:
            storage = (init_blocks[0] or {}).get("datastore_id")
    return {
        "address": address or raw.get("address"),
        "name": raw["name"],
        "vm_id": int(raw["vm_id"]),
        "node_name": raw.get("node_name") or os.environ.get("TF_VAR_proxmox_node_name", "flex"),
        "cloud_init_storage": storage
        or os.environ.get("TF_VAR_proxmox_cloud_init_storage_pool", "Flash"),
        "tags": raw.get("tags") or [],
        "description": raw.get("description") or "",
    }
def targets_from_plan(terraform_dir, plan_path):
    """Extract VM targets from a saved Terraform plan via `terraform show -json`."""
    proc = subprocess.run(
        ["terraform", f"-chdir={terraform_dir}", "show", "-json", plan_path],
        check=True,
        text=True,
        stdout=subprocess.PIPE,
    )
    plan = json.loads(proc.stdout)
    targets = []
    for change in plan.get("resource_changes", []):
        if change.get("type") != "proxmox_virtual_environment_vm":
            continue
        after = (change.get("change") or {}).get("after") or {}
        # Skip entries whose name or vm_id is still unknown/computed.
        if after.get("name") and after.get("vm_id") is not None:
            targets.append(normalize_target(after, change.get("address")))
    return targets
def targets_from_output(terraform_dir):
    """Read VM targets from the `proxmox_target_vms` Terraform output."""
    proc = subprocess.run(
        ["terraform", f"-chdir={terraform_dir}", "output", "-json", "proxmox_target_vms"],
        check=True,
        text=True,
        stdout=subprocess.PIPE,
    )
    return [normalize_target(entry) for entry in json.loads(proc.stdout)]
def targets_from_file(path):
    """Load VM targets from a JSON file.

    Accepts either a plain list of target objects or a full
    `terraform output -json` dump containing a `proxmox_target_vms` entry.
    """
    with open(path, encoding="utf-8") as handle:
        data = json.load(handle)
    if isinstance(data, dict) and "proxmox_target_vms" in data:
        data = data["proxmox_target_vms"]["value"]
    return [normalize_target(entry) for entry in data]
def load_targets(args):
    """Resolve targets from --targets-file, --plan, or Terraform output, in that order."""
    if args.targets_file:
        return targets_from_file(args.targets_file)
    if args.plan:
        return targets_from_plan(args.terraform_dir, args.plan)
    # Fall back to reading the live `proxmox_target_vms` output.
    return targets_from_output(args.terraform_dir)
def terraform_state(terraform_dir):
    """Return the set of resource addresses tracked in Terraform state.

    Runs with check=False so an empty or uninitialized state simply yields
    an empty set instead of raising.
    """
    proc = subprocess.run(
        ["terraform", f"-chdir={terraform_dir}", "state", "list"],
        check=False,
        text=True,
        stdout=subprocess.PIPE,
    )
    return set(proc.stdout.splitlines())
def tags_from_config(config):
    """Return the VM's tags as a set; Proxmox encodes them as a ';'-separated string."""
    raw = config.get("tags")
    if not raw:
        return set()
    if isinstance(raw, list):
        return set(raw)
    # Drop empty fragments produced by leading/trailing/doubled separators.
    return set(filter(None, raw.split(";")))
def assert_owned(target, config):
    """Verify the live VM matches its Terraform definition before deletion.

    The name must match exactly, plus at least one ownership marker must hold:
    all expected tags are present on the VM, or the description matches.
    Raises RuntimeError when ownership cannot be confirmed.
    """
    live_name = config.get("name")
    if live_name != target["name"]:
        raise RuntimeError(
            f"Refusing to delete VM {target['vm_id']}: expected name {target['name']!r}, got {live_name!r}"
        )
    # Proxmox encodes tags as a ';'-separated string; normalize to a set.
    raw_tags = config.get("tags") or ""
    live_tags = set(raw_tags) if isinstance(raw_tags, list) else {t for t in raw_tags.split(";") if t}
    expected_tags = set(target.get("tags") or [])
    tags_ok = bool(expected_tags) and expected_tags.issubset(live_tags)
    expected_desc = target.get("description") or ""
    desc_ok = bool(expected_desc) and (config.get("description") or "") == expected_desc
    if not (tags_ok or desc_ok):
        raise RuntimeError(
            f"Refusing to delete VM {target['vm_id']} ({target['name']}): ownership tags/description do not match"
        )
def delete_cloud_init(target):
    """Best-effort removal of the VM's cloud-init volume; a 404 means nothing to do."""
    volume_id = f"{target['cloud_init_storage']}:vm-{target['vm_id']}-cloudinit"
    encoded = urllib.parse.quote(volume_id, safe="")
    try:
        request(
            "DELETE",
            f"/nodes/{target['node_name']}/storage/{target['cloud_init_storage']}/content/{encoded}",
        )
    except urllib.error.HTTPError as err:
        if err.code != 404:
            raise
        print(f"No orphan cloud-init volume for VM {target['vm_id']}")
    else:
        print(f"Deleted orphan cloud-init volume for VM {target['vm_id']}")
def delete_vm(target):
    """Stop and destroy a Terraform-owned VM, then clean up its cloud-init volume.

    Ownership is verified first (raises RuntimeError on mismatch). Stop errors
    400/500 are tolerated — the VM may already be stopped or mid-shutdown.
    """
    assert_owned(target, vm_config(target))
    print(f"Deleting Terraform-owned VM {target['vm_id']} ({target['name']})")
    base = f"/nodes/{target['node_name']}/qemu/{target['vm_id']}"
    try:
        request("POST", f"{base}/status/stop")
        # Give the guest a moment to shut down before deletion.
        time.sleep(10)
    except urllib.error.HTTPError as err:
        if err.code not in (400, 500):
            raise
    request("DELETE", base, {"purge": "1", "destroy-unreferenced-disks": "1"})
    wait_absent(target)
    delete_cloud_init(target)
def cleanup_orphan_cloud_init(targets):
    """Remove cloud-init volumes left behind by target VMs that no longer exist."""
    for target in targets:
        if vm_exists(target):
            # A live VM still owns its cloud-init volume.
            print(f"VM {target['vm_id']} exists; keeping cloud-init volume")
        else:
            delete_cloud_init(target)
def cleanup_untracked_vms(targets, terraform_dir):
    """Delete target VMs that exist on Proxmox but are absent from Terraform state."""
    tracked = terraform_state(terraform_dir)
    for target in targets:
        address = target.get("address")
        if address and address in tracked:
            # Terraform still manages this VM; leave it alone.
            continue
        if vm_exists(target):
            delete_vm(target)
def cleanup_post_destroy(targets):
    """After `terraform destroy`, force-delete any surviving target VMs.

    Raises RuntimeError listing any VMs that still exist after the cleanup pass.
    """
    survivors = []
    for target in targets:
        if vm_exists(target):
            delete_vm(target)
        # Re-check: record anything that survived the delete attempt.
        if vm_exists(target):
            survivors.append(f"{target['vm_id']} ({target['name']})")
    if survivors:
        raise RuntimeError("Target VMs still exist after cleanup: " + ", ".join(survivors))
def main():
    """CLI entry point: parse arguments, load targets, dispatch to a cleanup mode."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--mode",
        choices=("orphan-cloudinit", "untracked-vms", "post-destroy"),
        required=True,
    )
    parser.add_argument("--terraform-dir", default="terraform")
    parser.add_argument("--plan")
    parser.add_argument("--targets-file")
    args = parser.parse_args()

    targets = load_targets(args)
    if not targets:
        print("No Proxmox target VMs found")
        return

    if args.mode == "untracked-vms":
        cleanup_untracked_vms(targets, args.terraform_dir)
    elif args.mode == "orphan-cloudinit":
        cleanup_orphan_cloud_init(targets)
    else:
        # Remaining choice is "post-destroy" (argparse enforces the choices).
        cleanup_post_destroy(targets)
if __name__ == "__main__":
    try:
        main()
    except Exception as err:  # top-level boundary: report once, exit non-zero for CI
        print(f"ERROR: {err}", file=sys.stderr)
        sys.exit(1)