276 lines
8.5 KiB
Python
276 lines
8.5 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import ssl
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
import urllib.error
|
|
import urllib.parse
|
|
import urllib.request
|
|
|
|
|
|
def api_context():
|
|
endpoint = os.environ["TF_VAR_proxmox_endpoint"].strip().removesuffix("/api2/json").rstrip("/")
|
|
token_id = os.environ["TF_VAR_proxmox_api_token_id"]
|
|
token_secret = os.environ["TF_VAR_proxmox_api_token_secret"]
|
|
insecure = os.environ.get("TF_VAR_proxmox_insecure", "false").lower() == "true"
|
|
context = ssl._create_unverified_context() if insecure else None
|
|
headers = {"Authorization": f"PVEAPIToken={token_id}={token_secret}"}
|
|
return endpoint, context, headers
|
|
|
|
|
|
ENDPOINT, SSL_CONTEXT, HEADERS = api_context()
|
|
|
|
|
|
def request(method, path, data=None, timeout=60):
|
|
body = None
|
|
headers = dict(HEADERS)
|
|
if data is not None:
|
|
encoded = urllib.parse.urlencode(data)
|
|
if method == "DELETE":
|
|
path = f"{path}?{encoded}"
|
|
else:
|
|
body = encoded.encode()
|
|
headers["Content-Type"] = "application/x-www-form-urlencoded"
|
|
|
|
req = urllib.request.Request(
|
|
f"{ENDPOINT}/api2/json{path}",
|
|
method=method,
|
|
headers=headers,
|
|
data=body,
|
|
)
|
|
with urllib.request.urlopen(req, context=SSL_CONTEXT, timeout=timeout) as resp:
|
|
return resp.read()
|
|
|
|
|
|
def is_missing_vm_error(err):
|
|
return err.code == 404 or (err.code == 500 and "conf' does not exist" in err.reason)
|
|
|
|
|
|
def vm_exists(target):
|
|
try:
|
|
request("GET", f"/nodes/{target['node_name']}/qemu/{target['vm_id']}/status/current")
|
|
return True
|
|
except urllib.error.HTTPError as err:
|
|
if is_missing_vm_error(err):
|
|
return False
|
|
raise
|
|
|
|
|
|
def vm_config(target):
|
|
try:
|
|
raw = request("GET", f"/nodes/{target['node_name']}/qemu/{target['vm_id']}/config")
|
|
except urllib.error.HTTPError as err:
|
|
if is_missing_vm_error(err):
|
|
return {}
|
|
raise
|
|
return json.loads(raw).get("data", {})
|
|
|
|
|
|
def wait_absent(target):
|
|
for _ in range(60):
|
|
if not vm_exists(target):
|
|
return
|
|
time.sleep(5)
|
|
raise RuntimeError(f"VM {target['vm_id']} still exists after delete")
|
|
|
|
|
|
def normalize_target(raw, address=None):
|
|
initialization = raw.get("initialization") or []
|
|
cloud_init_storage = raw.get("cloud_init_storage")
|
|
if not cloud_init_storage and initialization and isinstance(initialization, list):
|
|
cloud_init_storage = (initialization[0] or {}).get("datastore_id")
|
|
|
|
return {
|
|
"address": address or raw.get("address"),
|
|
"name": raw["name"],
|
|
"vm_id": int(raw["vm_id"]),
|
|
"node_name": raw.get("node_name") or os.environ.get("TF_VAR_proxmox_node_name", "flex"),
|
|
"cloud_init_storage": cloud_init_storage
|
|
or os.environ.get("TF_VAR_proxmox_cloud_init_storage_pool", "Flash"),
|
|
"tags": raw.get("tags") or [],
|
|
"description": raw.get("description") or "",
|
|
}
|
|
|
|
|
|
def targets_from_plan(terraform_dir, plan_path):
|
|
result = subprocess.run(
|
|
["terraform", "-chdir=" + terraform_dir, "show", "-json", plan_path],
|
|
check=True,
|
|
text=True,
|
|
stdout=subprocess.PIPE,
|
|
)
|
|
plan = json.loads(result.stdout)
|
|
targets = []
|
|
for change in plan.get("resource_changes", []):
|
|
if change.get("type") != "proxmox_virtual_environment_vm":
|
|
continue
|
|
after = (change.get("change") or {}).get("after") or {}
|
|
if not after.get("name") or after.get("vm_id") is None:
|
|
continue
|
|
targets.append(normalize_target(after, change.get("address")))
|
|
return targets
|
|
|
|
|
|
def targets_from_output(terraform_dir):
|
|
result = subprocess.run(
|
|
["terraform", "-chdir=" + terraform_dir, "output", "-json", "proxmox_target_vms"],
|
|
check=True,
|
|
text=True,
|
|
stdout=subprocess.PIPE,
|
|
)
|
|
return [normalize_target(target) for target in json.loads(result.stdout)]
|
|
|
|
|
|
def targets_from_file(path):
|
|
with open(path, encoding="utf-8") as handle:
|
|
data = json.load(handle)
|
|
if isinstance(data, dict) and "proxmox_target_vms" in data:
|
|
data = data["proxmox_target_vms"]["value"]
|
|
return [normalize_target(target) for target in data]
|
|
|
|
|
|
def load_targets(args):
|
|
if args.targets_file:
|
|
return targets_from_file(args.targets_file)
|
|
if args.plan:
|
|
return targets_from_plan(args.terraform_dir, args.plan)
|
|
return targets_from_output(args.terraform_dir)
|
|
|
|
|
|
def terraform_state(terraform_dir):
|
|
result = subprocess.run(
|
|
["terraform", "-chdir=" + terraform_dir, "state", "list"],
|
|
check=False,
|
|
text=True,
|
|
stdout=subprocess.PIPE,
|
|
)
|
|
return set(result.stdout.splitlines())
|
|
|
|
|
|
def tags_from_config(config):
|
|
raw = config.get("tags") or ""
|
|
if isinstance(raw, list):
|
|
return set(raw)
|
|
return {tag for tag in raw.split(";") if tag}
|
|
|
|
|
|
def assert_owned(target, config):
|
|
actual_name = config.get("name")
|
|
if actual_name != target["name"]:
|
|
raise RuntimeError(
|
|
f"Refusing to delete VM {target['vm_id']}: expected name {target['name']!r}, got {actual_name!r}"
|
|
)
|
|
|
|
tags = tags_from_config(config)
|
|
expected_tags = set(target.get("tags") or [])
|
|
description = config.get("description") or ""
|
|
expected_description = target.get("description") or ""
|
|
has_expected_tags = bool(expected_tags) and expected_tags.issubset(tags)
|
|
has_expected_description = bool(expected_description) and description == expected_description
|
|
|
|
if not has_expected_tags and not has_expected_description:
|
|
raise RuntimeError(
|
|
f"Refusing to delete VM {target['vm_id']} ({target['name']}): ownership tags/description do not match"
|
|
)
|
|
|
|
|
|
def delete_cloud_init(target):
|
|
volume = urllib.parse.quote(
|
|
f"{target['cloud_init_storage']}:vm-{target['vm_id']}-cloudinit",
|
|
safe="",
|
|
)
|
|
try:
|
|
request(
|
|
"DELETE",
|
|
f"/nodes/{target['node_name']}/storage/{target['cloud_init_storage']}/content/{volume}",
|
|
)
|
|
print(f"Deleted orphan cloud-init volume for VM {target['vm_id']}")
|
|
except urllib.error.HTTPError as err:
|
|
if err.code == 404:
|
|
print(f"No orphan cloud-init volume for VM {target['vm_id']}")
|
|
return
|
|
raise
|
|
|
|
|
|
def delete_vm(target):
|
|
config = vm_config(target)
|
|
assert_owned(target, config)
|
|
print(f"Deleting Terraform-owned VM {target['vm_id']} ({target['name']})")
|
|
try:
|
|
request("POST", f"/nodes/{target['node_name']}/qemu/{target['vm_id']}/status/stop")
|
|
time.sleep(10)
|
|
except urllib.error.HTTPError as err:
|
|
if err.code not in (400, 500):
|
|
raise
|
|
|
|
request(
|
|
"DELETE",
|
|
f"/nodes/{target['node_name']}/qemu/{target['vm_id']}",
|
|
{"purge": "1", "destroy-unreferenced-disks": "1"},
|
|
)
|
|
wait_absent(target)
|
|
delete_cloud_init(target)
|
|
|
|
|
|
def cleanup_orphan_cloud_init(targets):
|
|
for target in targets:
|
|
if vm_exists(target):
|
|
print(f"VM {target['vm_id']} exists; keeping cloud-init volume")
|
|
continue
|
|
delete_cloud_init(target)
|
|
|
|
|
|
def cleanup_untracked_vms(targets, terraform_dir):
|
|
state = terraform_state(terraform_dir)
|
|
for target in targets:
|
|
if target.get("address") and target["address"] in state:
|
|
continue
|
|
if not vm_exists(target):
|
|
continue
|
|
delete_vm(target)
|
|
|
|
|
|
def cleanup_post_destroy(targets):
|
|
remaining = []
|
|
for target in targets:
|
|
if vm_exists(target):
|
|
delete_vm(target)
|
|
if vm_exists(target):
|
|
remaining.append(f"{target['vm_id']} ({target['name']})")
|
|
|
|
if remaining:
|
|
raise RuntimeError("Target VMs still exist after cleanup: " + ", ".join(remaining))
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument("--mode", choices=("orphan-cloudinit", "untracked-vms", "post-destroy"), required=True)
|
|
parser.add_argument("--terraform-dir", default="terraform")
|
|
parser.add_argument("--plan")
|
|
parser.add_argument("--targets-file")
|
|
args = parser.parse_args()
|
|
|
|
targets = load_targets(args)
|
|
if not targets:
|
|
print("No Proxmox target VMs found")
|
|
return
|
|
|
|
if args.mode == "orphan-cloudinit":
|
|
cleanup_orphan_cloud_init(targets)
|
|
elif args.mode == "untracked-vms":
|
|
cleanup_untracked_vms(targets, args.terraform_dir)
|
|
else:
|
|
cleanup_post_destroy(targets)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
try:
|
|
main()
|
|
except Exception as err:
|
|
print(f"ERROR: {err}", file=sys.stderr)
|
|
sys.exit(1)
|