Merge pull request 'refactor: add Python bootstrap controller with resumable state' (#99) from stage into master
Some checks failed
Terraform Apply / Terraform Apply (push) Failing after 11m46s
Reviewed-on: #99
This commit was merged in pull request #99.
@@ -13,11 +13,19 @@ This folder defines role-based NixOS configs for a kubeadm cluster.
- Shared cluster defaults in `modules/k8s-cluster-settings.nix`
- Role-specific settings for control planes and workers
- Generated per-node host configs from `flake.nix` (no duplicated host files)
- Bootstrap helper commands on each node (see the example below):
  - `th-kubeadm-init`
  - `th-kubeadm-join-control-plane`
  - `th-kubeadm-join-worker`
  - `th-kubeadm-status`
- A Python bootstrap controller for orchestration:
  - `bootstrap/controller.py`
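
For example, once a node is up, a helper can be run directly over SSH (the node address is illustrative; `micqdf` is the default `SSH_USER` in the bootstrap scripts):

```bash
# Hypothetical node IP: prints kubeadm/kubelet status for that node.
ssh micqdf@192.0.2.10 sudo th-kubeadm-status
```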
## Layered architecture

- `terraform/`: VM lifecycle only
- `nixos/kubeadm/modules/`: declarative node OS config only
- `nixos/kubeadm/bootstrap/controller.py`: imperative cluster reconciliation state machine (invocation example below)
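
The controller takes a stage name as its command; `reconcile` runs every stage in order. A minimal sketch of an invocation from the repo root (the inventory path shown is the controller's built-in default):

```bash
python3 nixos/kubeadm/bootstrap/controller.py reconcile \
  --inventory nixos/kubeadm/scripts/inventory.env
```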

## Hardware config files

@@ -114,6 +122,15 @@ FAST_MODE=1 WORKER_PARALLELISM=3 REBUILD_TIMEOUT=45m REBUILD_RETRIES=2 ./scripts
- `FAST_MODE=1` skips pre-rebuild remote GC cleanup to reduce wall-clock time.
- Set `FAST_MODE=0` for a slower but more aggressive space cleanup pass (see the sketch below).
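
The controller reads the same knobs from the environment, so (as a sketch, assuming the default inventory location and that the inventory file does not itself set `FAST_MODE`) a one-off full cleanup pass looks like:

```bash
# FAST_MODE=0 re-enables the pre-rebuild GC pass; only the rebuild stage runs.
FAST_MODE=0 python3 nixos/kubeadm/bootstrap/controller.py rebuild
```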

### Bootstrap controller state

The controller stores checkpoints in two places:

- Remote (source of truth): `/var/lib/terrahome/bootstrap-state.json` on `cp-1`
- Local copy (workflow/debug artifact): `nixos/kubeadm/bootstrap/bootstrap-state-last.json`

This makes retries resumable and keeps failure context visible from CI.
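
The checkpoint file is a flat JSON object: each completed stage is recorded as `true` (key names match the controller's `mark_done` calls), plus an `updated_at` epoch timestamp. Illustratively, a run interrupted after primary init would look roughly like:

```json
{
  "nodes_rebuilt": true,
  "preflight_done": true,
  "primary_initialized": true,
  "updated_at": 1718000000
}
```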

3. If you only want to reset Kubernetes state on existing VMs:

```bash
431 nixos/kubeadm/bootstrap/controller.py Executable file
@@ -0,0 +1,431 @@
#!/usr/bin/env python3
import argparse
import base64
import json
import os
import shlex
import subprocess
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path


REMOTE_STATE_PATH = "/var/lib/terrahome/bootstrap-state.json"


def run_local(cmd, check=True, capture=False):
    # String commands go through the shell; argv lists run directly.
    shell = isinstance(cmd, str)
    return subprocess.run(
        cmd,
        shell=shell,
        check=check,
        text=True,
        capture_output=capture,
    )


def load_inventory(inventory_file):
    inventory_file = Path(inventory_file).resolve()
    if not inventory_file.exists():
        raise RuntimeError(f"Missing inventory file: {inventory_file}")
    # Source the inventory env file in a subshell, then dump the resulting
    # environment as JSON so it can be parsed here.
    cmd = (
        "set -a; "
        f"source {shlex.quote(str(inventory_file))}; "
        "python3 - <<'PY'\n"
        "import json, os\n"
        "print(json.dumps(dict(os.environ)))\n"
        "PY"
    )
    proc = run_local(["bash", "-lc", cmd], capture=True)
    env = json.loads(proc.stdout)

    node_ips = {}
    cp_names = []
    wk_names = []

    control_planes = env.get("CONTROL_PLANES", "").strip()
    workers = env.get("WORKERS", "").strip()

    if control_planes:
        for pair in control_planes.split():
            name, ip = pair.split("=", 1)
            node_ips[name] = ip
            cp_names.append(name)
    else:
        # Fall back to CP_<n> variables; sort numerically so CP_10 follows CP_9
        # (mirrors the shell script's `sort -V`).
        for key in sorted(
            (k for k in env if k.startswith("CP_") and k[3:].isdigit()),
            key=lambda k: int(k[3:]),
        ):
            idx = key.split("_", 1)[1]
            name = f"cp-{idx}"
            node_ips[name] = env[key]
            cp_names.append(name)

    if workers:
        for pair in workers.split():
            name, ip = pair.split("=", 1)
            node_ips[name] = ip
            wk_names.append(name)
    else:
        for key in sorted(
            (k for k in env if k.startswith("WK_") and k[3:].isdigit()),
            key=lambda k: int(k[3:]),
        ):
            idx = key.split("_", 1)[1]
            name = f"wk-{idx}"
            node_ips[name] = env[key]
            wk_names.append(name)

    if not cp_names or not wk_names:
        raise RuntimeError("Inventory must include control planes and workers")

    primary_cp = env.get("PRIMARY_CONTROL_PLANE", "cp-1")
    if primary_cp not in node_ips:
        primary_cp = cp_names[0]

    return {
        "env": env,
        "node_ips": node_ips,
        "cp_names": cp_names,
        "wk_names": wk_names,
        "primary_cp": primary_cp,
        "inventory_file": str(inventory_file),
    }


class Controller:
    def __init__(self, cfg):
        self.env = cfg["env"]
        self.node_ips = cfg["node_ips"]
        self.cp_names = cfg["cp_names"]
        self.wk_names = cfg["wk_names"]
        self.primary_cp = cfg["primary_cp"]
        self.primary_ip = self.node_ips[self.primary_cp]

        self.script_dir = Path(__file__).resolve().parent
        self.flake_dir = Path(self.env.get("FLAKE_DIR") or self.script_dir.parent).resolve()
        self.local_state_path = self.script_dir / "bootstrap-state-last.json"

        self.ssh_user = self.env.get("SSH_USER", "micqdf")
        self.ssh_candidates = self.env.get("SSH_USER_CANDIDATES", f"root {self.ssh_user}").split()
        self.active_ssh_user = self.ssh_user
        self.ssh_key = self.env.get("SSH_KEY_PATH", str(Path.home() / ".ssh" / "id_ed25519"))
        self.ssh_opts = [
            "-o", "BatchMode=yes",
            "-o", "IdentitiesOnly=yes",
            "-o", "StrictHostKeyChecking=accept-new",
            "-i", self.ssh_key,
        ]

        self.rebuild_timeout = self.env.get("REBUILD_TIMEOUT", "45m")
        self.rebuild_retries = int(self.env.get("REBUILD_RETRIES", "2"))
        self.worker_parallelism = int(self.env.get("WORKER_PARALLELISM", "3"))
        self.fast_mode = self.env.get("FAST_MODE", "1")

    def log(self, msg):
        print(f"==> {msg}")

    def _ssh(self, user, ip, cmd, check=True):
        full = ["ssh", *self.ssh_opts, f"{user}@{ip}", f"bash -lc {shlex.quote(cmd)}"]
        return run_local(full, check=check, capture=True)

    def detect_user(self, ip):
        for user in self.ssh_candidates:
            proc = self._ssh(user, ip, "true", check=False)
            if proc.returncode == 0:
                self.active_ssh_user = user
                self.log(f"Using SSH user '{user}' for {ip}")
                return
        raise RuntimeError(f"Unable to authenticate to {ip} with users: {', '.join(self.ssh_candidates)}")

    def remote(self, ip, cmd, check=True):
        # Try the last known-good user first, then the remaining candidates.
        ordered = [self.active_ssh_user] + [u for u in self.ssh_candidates if u != self.active_ssh_user]
        last = None
        for user in ordered:
            proc = self._ssh(user, ip, cmd, check=False)
            if proc.returncode == 0:
                self.active_ssh_user = user
                return proc
            if proc.returncode != 255:
                # Exit 255 means ssh itself failed; anything else means the
                # command ran and failed, so switching users will not help.
                last = proc
                break
            last = proc
        if check:
            stdout = (last.stdout or "").strip()
            stderr = (last.stderr or "").strip()
            raise RuntimeError(f"Remote command failed on {ip}: {cmd}\n{stdout}\n{stderr}")
        return last

    def prepare_known_hosts(self):
        ssh_dir = Path.home() / ".ssh"
        ssh_dir.mkdir(parents=True, exist_ok=True)
        (ssh_dir / "known_hosts").touch()
        run_local(["chmod", "700", str(ssh_dir)])
        run_local(["chmod", "600", str(ssh_dir / "known_hosts")])
        for ip in self.node_ips.values():
            run_local(["ssh-keygen", "-R", ip], check=False)
            run_local(f"ssh-keyscan -H {shlex.quote(ip)} >> {shlex.quote(str(ssh_dir / 'known_hosts'))}", check=False)

    def get_state(self):
        proc = self.remote(
            self.primary_ip,
            f"sudo test -f {REMOTE_STATE_PATH} && sudo cat {REMOTE_STATE_PATH} || echo '{{}}'",
        )
        try:
            state = json.loads(proc.stdout.strip() or "{}")
        except Exception:
            state = {}
        return state

    def set_state(self, state):
        # Ship the JSON payload base64-encoded to avoid shell quoting issues,
        # and keep a local copy for CI artifacts and debugging.
        payload = json.dumps(state, sort_keys=True)
        b64 = base64.b64encode(payload.encode()).decode()
        self.remote(
            self.primary_ip,
            (
                "sudo mkdir -p /var/lib/terrahome && "
                f"echo {shlex.quote(b64)} | base64 -d | sudo tee {REMOTE_STATE_PATH} >/dev/null"
            ),
        )
        self.local_state_path.write_text(payload + "\n", encoding="utf-8")

    def mark_done(self, key):
        state = self.get_state()
        state[key] = True
        state["updated_at"] = int(time.time())
        self.set_state(state)

    def stage_done(self, key):
        return bool(self.get_state().get(key))

    def prepare_remote_nix(self, ip):
        self.remote(ip, "sudo mkdir -p /etc/nix")
        self.remote(ip, "if [ -f /etc/nix/nix.conf ]; then sudo sed -i '/^trusted-users[[:space:]]*=/d' /etc/nix/nix.conf; fi")
        self.remote(ip, "echo 'trusted-users = root micqdf' | sudo tee -a /etc/nix/nix.conf >/dev/null")
        self.remote(ip, "sudo systemctl restart nix-daemon 2>/dev/null || true")

    def prepare_remote_kubelet(self, ip):
        # Quiesce kubelet so nixos-rebuild can swap units without interference.
        self.remote(ip, "sudo systemctl stop kubelet >/dev/null 2>&1 || true")
        self.remote(ip, "sudo systemctl disable kubelet >/dev/null 2>&1 || true")
        self.remote(ip, "sudo systemctl mask kubelet >/dev/null 2>&1 || true")
        self.remote(ip, "sudo systemctl reset-failed kubelet >/dev/null 2>&1 || true")

    def prepare_remote_space(self, ip):
        self.remote(ip, "sudo nix-collect-garbage -d || true")
        self.remote(ip, "sudo nix --extra-experimental-features nix-command store gc || true")
        self.remote(ip, "sudo rm -rf /tmp/nix* /tmp/nixos-rebuild* || true")

    def rebuild_node_once(self, name, ip):
        self.detect_user(ip)
        cmd = [
            "timeout", self.rebuild_timeout,
            "nixos-rebuild", "switch",
            "--flake", f"{self.flake_dir}#{name}",
            "--target-host", f"{self.active_ssh_user}@{ip}",
            "--use-remote-sudo",
        ]
        env = os.environ.copy()
        env["NIX_SSHOPTS"] = " ".join(self.ssh_opts)
        proc = subprocess.run(cmd, text=True, env=env)
        return proc.returncode == 0

    def rebuild_with_retry(self, name, ip):
        max_attempts = self.rebuild_retries + 1
        for attempt in range(1, max_attempts + 1):
            self.log(f"Rebuild attempt {attempt}/{max_attempts} for {name}")
            if self.rebuild_node_once(name, ip):
                return
            if attempt < max_attempts:
                self.log(f"Rebuild failed for {name}, retrying in 20s")
                time.sleep(20)
        raise RuntimeError(f"Rebuild failed permanently for {name}")

    def stage_preflight(self):
        if self.stage_done("preflight_done"):
            self.log("Preflight already complete")
            return
        self.prepare_known_hosts()
        self.detect_user(self.primary_ip)
        self.mark_done("preflight_done")

    def stage_rebuild(self):
        if self.stage_done("nodes_rebuilt"):
            self.log("Node rebuild already complete")
            return

        self.detect_user(self.primary_ip)
        # Control planes are rebuilt sequentially.
        for name in self.cp_names:
            ip = self.node_ips[name]
            self.log(f"Preparing and rebuilding {name} ({ip})")
            self.prepare_remote_nix(ip)
            self.prepare_remote_kubelet(ip)
            if self.fast_mode != "1":
                self.prepare_remote_space(ip)
            self.rebuild_with_retry(name, ip)

        for name in self.wk_names:
            ip = self.node_ips[name]
            self.log(f"Preparing {name} ({ip})")
            self.prepare_remote_nix(ip)
            self.prepare_remote_kubelet(ip)
            if self.fast_mode != "1":
                self.prepare_remote_space(ip)

        # Workers are rebuilt in parallel, bounded by WORKER_PARALLELISM.
        failures = []
        with ThreadPoolExecutor(max_workers=self.worker_parallelism) as pool:
            futures = {pool.submit(self.rebuild_with_retry, name, self.node_ips[name]): name for name in self.wk_names}
            for fut in as_completed(futures):
                name = futures[fut]
                try:
                    fut.result()
                except Exception as exc:
                    failures.append((name, str(exc)))
        if failures:
            raise RuntimeError(f"Worker rebuild failures: {failures}")

        self.mark_done("nodes_rebuilt")

    def has_admin_conf(self):
        return self.remote(self.primary_ip, "sudo test -f /etc/kubernetes/admin.conf", check=False).returncode == 0

    def cluster_ready(self):
        cmd = "sudo test -f /etc/kubernetes/admin.conf && sudo kubectl --kubeconfig /etc/kubernetes/admin.conf get --raw=/readyz >/dev/null 2>&1"
        return self.remote(self.primary_ip, cmd, check=False).returncode == 0

    def stage_init_primary(self):
        if self.stage_done("primary_initialized"):
            self.log("Primary control plane init already complete")
            return
        if self.has_admin_conf() and self.cluster_ready():
            self.log("Existing cluster detected on primary control plane")
        else:
            self.log(f"Initializing primary control plane on {self.primary_cp}")
            self.remote(self.primary_ip, "sudo th-kubeadm-init")
        self.mark_done("primary_initialized")

    def stage_install_cni(self):
        if self.stage_done("cni_installed"):
            self.log("CNI install already complete")
            return
        self.log("Installing or upgrading Cilium")
        self.remote(self.primary_ip, "helm repo add cilium https://helm.cilium.io >/dev/null 2>&1 || true")
        self.remote(self.primary_ip, "helm repo update >/dev/null")
        self.remote(self.primary_ip, "kubectl create namespace kube-system >/dev/null 2>&1 || true")
        self.remote(
            self.primary_ip,
            "helm upgrade --install cilium cilium/cilium --namespace kube-system --set kubeProxyReplacement=true",
        )
        self.mark_done("cni_installed")

    def cluster_has_node(self, name):
        cmd = f"sudo kubectl --kubeconfig /etc/kubernetes/admin.conf get node {shlex.quote(name)} >/dev/null 2>&1"
        return self.remote(self.primary_ip, cmd, check=False).returncode == 0

    def build_join_cmds(self):
        if not self.has_admin_conf():
            self.remote(self.primary_ip, "sudo th-kubeadm-init")
        join_cmd = self.remote(
            self.primary_ip,
            "sudo KUBECONFIG=/etc/kubernetes/admin.conf kubeadm token create --print-join-command",
        ).stdout.strip()
        cert_key = self.remote(
            self.primary_ip,
            "sudo KUBECONFIG=/etc/kubernetes/admin.conf kubeadm init phase upload-certs --upload-certs | tail -n 1",
        ).stdout.strip()
        cp_join = f"{join_cmd} --control-plane --certificate-key {cert_key}"
        return join_cmd, cp_join

    def stage_join_control_planes(self):
        if self.stage_done("control_planes_joined"):
            self.log("Control-plane join already complete")
            return
        _, cp_join = self.build_join_cmds()
        # Base64-encode the join command so it survives the nested shell quoting.
        encoded = base64.b64encode(cp_join.encode()).decode()
        for node in self.cp_names:
            if node == self.primary_cp:
                continue
            if self.cluster_has_node(node):
                self.log(f"{node} already joined")
                continue
            self.log(f"Joining control plane {node}")
            ip = self.node_ips[node]
            self.remote(ip, f"sudo th-kubeadm-join-control-plane \"$(echo {encoded} | base64 -d)\"")
        self.mark_done("control_planes_joined")

    def stage_join_workers(self):
        if self.stage_done("workers_joined"):
            self.log("Worker join already complete")
            return
        join_cmd, _ = self.build_join_cmds()
        encoded = base64.b64encode(join_cmd.encode()).decode()
        for node in self.wk_names:
            if self.cluster_has_node(node):
                self.log(f"{node} already joined")
                continue
            self.log(f"Joining worker {node}")
            ip = self.node_ips[node]
            self.remote(ip, f"sudo th-kubeadm-join-worker \"$(echo {encoded} | base64 -d)\"")
        self.mark_done("workers_joined")

    def stage_verify(self):
        if self.stage_done("verified"):
            self.log("Verification already complete")
            return
        self.log("Final node verification")
        proc = self.remote(self.primary_ip, "sudo kubectl --kubeconfig /etc/kubernetes/admin.conf get nodes -o wide")
        print(proc.stdout)
        self.mark_done("verified")

    def reconcile(self):
        # Run every stage in order; completed stages short-circuit via the
        # checkpoint file, which makes the whole sequence safely resumable.
        self.stage_preflight()
        self.stage_rebuild()
        self.stage_init_primary()
        self.stage_install_cni()
        self.stage_join_control_planes()
        self.stage_join_workers()
        self.stage_verify()


def main():
    parser = argparse.ArgumentParser(description="TerraHome kubeadm bootstrap controller")
    parser.add_argument("command", choices=[
        "reconcile",
        "preflight",
        "rebuild",
        "init-primary",
        "install-cni",
        "join-control-planes",
        "join-workers",
        "verify",
    ])
    parser.add_argument("--inventory", default=str(Path(__file__).resolve().parent.parent / "scripts" / "inventory.env"))
    args = parser.parse_args()

    cfg = load_inventory(args.inventory)
    ctl = Controller(cfg)

    dispatch = {
        "reconcile": ctl.reconcile,
        "preflight": ctl.stage_preflight,
        "rebuild": ctl.stage_rebuild,
        "init-primary": ctl.stage_init_primary,
        "install-cni": ctl.stage_install_cni,
        "join-control-planes": ctl.stage_join_control_planes,
        "join-workers": ctl.stage_join_workers,
        "verify": ctl.stage_verify,
    }
    try:
        dispatch[args.command]()
    except Exception as exc:
        print(f"ERROR: {exc}", file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main()
@@ -2,8 +2,8 @@
set -euo pipefail

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
FLAKE_DIR="${FLAKE_DIR:-$(cd "$SCRIPT_DIR/.." && pwd)}"
INVENTORY_FILE="${1:-$SCRIPT_DIR/inventory.env}"
CONTROLLER="$SCRIPT_DIR/../bootstrap/controller.py"

if [ ! -f "$INVENTORY_FILE" ]; then
  echo "Missing inventory file: $INVENTORY_FILE"
@@ -11,340 +11,4 @@ if [ ! -f "$INVENTORY_FILE" ]; then
  exit 1
fi

# shellcheck disable=SC1090
source "$INVENTORY_FILE"

SSH_USER="${SSH_USER:-micqdf}"
SSH_KEY_PATH="${SSH_KEY_PATH:-$HOME/.ssh/id_ed25519}"
SSH_OPTS="${SSH_OPTS:--o BatchMode=yes -o IdentitiesOnly=yes -o StrictHostKeyChecking=accept-new -i $SSH_KEY_PATH}"
SSH_USER_CANDIDATES="${SSH_USER_CANDIDATES:-root $SSH_USER}"
REBUILD_TIMEOUT="${REBUILD_TIMEOUT:-45m}"
REBUILD_RETRIES="${REBUILD_RETRIES:-2}"
WORKER_PARALLELISM="${WORKER_PARALLELISM:-3}"
FAST_MODE="${FAST_MODE:-1}"

declare -A NODE_IPS=()
declare -a CP_NAMES=()
declare -a WK_NAMES=()

add_node_pair() {
  local role="$1"
  local pair="$2"
  local name="${pair%%=*}"
  local ip="${pair#*=}"

  if [ -z "$name" ] || [ -z "$ip" ] || [ "$name" = "$ip" ]; then
    echo "Invalid node pair '$pair' (expected name=ip)."
    exit 1
  fi

  NODE_IPS["$name"]="$ip"
  if [ "$role" = "cp" ]; then
    CP_NAMES+=("$name")
  else
    WK_NAMES+=("$name")
  fi
}

populate_nodes() {
  if [ -n "${CONTROL_PLANES:-}" ]; then
    for pair in $CONTROL_PLANES; do
      add_node_pair "cp" "$pair"
    done
  else
    while IFS= read -r var_name; do
      idx="${var_name#CP_}"
      add_node_pair "cp" "cp-$idx=${!var_name}"
    done < <(compgen -A variable | grep -E '^CP_[0-9]+$' | sort -V)
  fi

  if [ -n "${WORKERS:-}" ]; then
    for pair in $WORKERS; do
      add_node_pair "wk" "$pair"
    done
  else
    while IFS= read -r var_name; do
      idx="${var_name#WK_}"
      add_node_pair "wk" "wk-$idx=${!var_name}"
    done < <(compgen -A variable | grep -E '^WK_[0-9]+$' | sort -V)
  fi

  if [ "${#CP_NAMES[@]}" -eq 0 ]; then
    echo "No control planes found in inventory."
    exit 1
  fi

  if [ "${#WK_NAMES[@]}" -eq 0 ]; then
    echo "No workers found in inventory."
    exit 1
  fi
}

remote() {
  local host_ip="$1"
  local cmd="$2"
  local quoted_cmd
  local candidate
  local candidates=()
  local rc=0

  candidates+=("$ACTIVE_SSH_USER")
  for candidate in $SSH_USER_CANDIDATES; do
    if [ "$candidate" != "$ACTIVE_SSH_USER" ]; then
      candidates+=("$candidate")
    fi
  done

  quoted_cmd="$(printf '%q' "$cmd")"
  for candidate in "${candidates[@]}"; do
    ssh $SSH_OPTS "$candidate@$host_ip" "bash -lc $quoted_cmd"
    rc=$?

    if [ "$rc" -eq 0 ]; then
      ACTIVE_SSH_USER="$candidate"
      return 0
    fi

    # Exit 255 is an ssh-level failure; anything else means the command
    # itself failed, so trying another user will not help.
    if [ "$rc" -ne 255 ]; then
      return "$rc"
    fi
  done

  echo "Remote command failed for all SSH users on $host_ip"
  return 1
}

detect_ssh_user() {
  local probe_ip="$1"
  local candidate

  for candidate in $SSH_USER_CANDIDATES; do
    if ssh $SSH_OPTS "$candidate@$probe_ip" "true" >/dev/null 2>&1; then
      ACTIVE_SSH_USER="$candidate"
      echo "==> Using SSH user '$ACTIVE_SSH_USER'"
      return 0
    fi
  done

  echo "Unable to authenticate to $probe_ip with candidates: $SSH_USER_CANDIDATES"
  return 1
}

prepare_known_hosts() {
  mkdir -p "$HOME/.ssh"
  chmod 700 "$HOME/.ssh"
  touch "$HOME/.ssh/known_hosts"
  chmod 600 "$HOME/.ssh/known_hosts"

  for node in "${!NODE_IPS[@]}"; do
    ssh-keygen -R "${NODE_IPS[$node]}" >/dev/null 2>&1 || true
    ssh-keyscan -H "${NODE_IPS[$node]}" >> "$HOME/.ssh/known_hosts" 2>/dev/null || true
  done
}

cluster_has_node() {
  local node_name="$1"
  remote "$PRIMARY_CP_IP" "sudo kubectl --kubeconfig /etc/kubernetes/admin.conf get node $node_name >/dev/null 2>&1"
}

has_admin_conf() {
  remote "$PRIMARY_CP_IP" "test -f /etc/kubernetes/admin.conf"
}

cluster_ready() {
  remote "$PRIMARY_CP_IP" "test -f /etc/kubernetes/admin.conf && sudo kubectl --kubeconfig /etc/kubernetes/admin.conf get --raw=/readyz >/dev/null 2>&1"
}

rebuild_node() {
  local node_name="$1"
  local node_ip="$2"

  echo "==> Rebuilding $node_name on $node_ip"
  detect_ssh_user "$node_ip"
  timeout "$REBUILD_TIMEOUT" nixos-rebuild switch \
    --flake "$FLAKE_DIR#$node_name" \
    --target-host "$ACTIVE_SSH_USER@$node_ip" \
    --use-remote-sudo
}

rebuild_node_with_retry() {
  local node_name="$1"
  local node_ip="$2"
  local attempt=1
  local max_attempts=$((REBUILD_RETRIES + 1))

  while [ "$attempt" -le "$max_attempts" ]; do
    echo "==> Rebuild attempt $attempt/$max_attempts for $node_name"
    if rebuild_node "$node_name" "$node_ip"; then
      return 0
    fi

    if [ "$attempt" -lt "$max_attempts" ]; then
      echo "==> Rebuild failed for $node_name, retrying after 20s"
      sleep 20
    fi

    attempt=$((attempt + 1))
  done

  echo "==> Rebuild failed permanently for $node_name"
  return 1
}

prepare_remote_nix_trust() {
  local node_ip="$1"
  echo "==> Ensuring nix trusted-users on $node_ip"
  remote "$node_ip" "sudo mkdir -p /etc/nix"
  remote "$node_ip" "if [ -f /etc/nix/nix.conf ]; then sudo sed -i '/^trusted-users[[:space:]]*=/d' /etc/nix/nix.conf; fi"
  remote "$node_ip" "echo 'trusted-users = root micqdf' | sudo tee -a /etc/nix/nix.conf >/dev/null"
  remote "$node_ip" "sudo systemctl restart nix-daemon 2>/dev/null || true"
}

prepare_remote_space() {
  local node_ip="$1"
  echo "==> Reclaiming disk space on $node_ip"
  remote "$node_ip" "sudo nix-collect-garbage -d || true"
  remote "$node_ip" "sudo nix --extra-experimental-features nix-command store gc || true"
  remote "$node_ip" "sudo rm -rf /tmp/nix* /tmp/nixos-rebuild* || true"
}

prepare_remote_kubelet() {
  local node_ip="$1"
  echo "==> Quiescing kubelet on $node_ip"
  remote "$node_ip" "sudo systemctl stop kubelet >/dev/null 2>&1 || true"
  remote "$node_ip" "sudo systemctl disable kubelet >/dev/null 2>&1 || true"
  remote "$node_ip" "sudo systemctl mask kubelet >/dev/null 2>&1 || true"
  remote "$node_ip" "sudo systemctl reset-failed kubelet >/dev/null 2>&1 || true"
}

populate_nodes
prepare_known_hosts
export NIX_SSHOPTS="$SSH_OPTS"

PRIMARY_CONTROL_PLANE="${PRIMARY_CONTROL_PLANE:-cp-1}"
if [ -z "${NODE_IPS[$PRIMARY_CONTROL_PLANE]:-}" ]; then
  PRIMARY_CONTROL_PLANE="${CP_NAMES[0]}"
fi
PRIMARY_CP_IP="${NODE_IPS[$PRIMARY_CONTROL_PLANE]}"
ACTIVE_SSH_USER="$SSH_USER"
detect_ssh_user "$PRIMARY_CP_IP"

for node in "${CP_NAMES[@]}"; do
  prepare_remote_nix_trust "${NODE_IPS[$node]}"
  prepare_remote_kubelet "${NODE_IPS[$node]}"
  if [ "$FAST_MODE" != "1" ]; then
    prepare_remote_space "${NODE_IPS[$node]}"
  fi
  rebuild_node_with_retry "$node" "${NODE_IPS[$node]}"
done

worker_failures=0
for node in "${WK_NAMES[@]}"; do
  prepare_remote_nix_trust "${NODE_IPS[$node]}"
  prepare_remote_kubelet "${NODE_IPS[$node]}"
  if [ "$FAST_MODE" != "1" ]; then
    prepare_remote_space "${NODE_IPS[$node]}"
  fi
done

active_jobs=0
for node in "${WK_NAMES[@]}"; do
  (
    rebuild_node_with_retry "$node" "${NODE_IPS[$node]}"
  ) &

  active_jobs=$((active_jobs + 1))
  if [ "$active_jobs" -ge "$WORKER_PARALLELISM" ]; then
    if ! wait -n; then
      worker_failures=$((worker_failures + 1))
    fi
    active_jobs=$((active_jobs - 1))
  fi
done

while [ "$active_jobs" -gt 0 ]; do
  if ! wait -n; then
    worker_failures=$((worker_failures + 1))
  fi
  active_jobs=$((active_jobs - 1))
done

if [ "$worker_failures" -gt 0 ]; then
  echo "==> $worker_failures worker rebuild job(s) failed"
  exit 1
fi

echo "==> Initializing control plane on $PRIMARY_CONTROL_PLANE"
|
||||
detect_ssh_user "$PRIMARY_CP_IP"
|
||||
if has_admin_conf && cluster_ready; then
|
||||
echo "==> Existing cluster detected on $PRIMARY_CONTROL_PLANE; skipping kubeadm init"
|
||||
else
|
||||
echo "==> Control plane not fully initialized; running kubeadm init on $PRIMARY_CONTROL_PLANE"
|
||||
remote "$PRIMARY_CP_IP" "sudo th-kubeadm-init"
|
||||
|
||||
echo "==> Installing Cilium on $PRIMARY_CONTROL_PLANE"
|
||||
remote "$PRIMARY_CP_IP" "helm repo add cilium https://helm.cilium.io >/dev/null 2>&1 || true"
|
||||
remote "$PRIMARY_CP_IP" "helm repo update >/dev/null"
|
||||
remote "$PRIMARY_CP_IP" "kubectl create namespace kube-system >/dev/null 2>&1 || true"
|
||||
remote "$PRIMARY_CP_IP" "helm upgrade --install cilium cilium/cilium --namespace kube-system --set kubeProxyReplacement=true"
|
||||
fi
|
||||
|
||||
echo "==> Building kubeadm join commands"
|
||||
if ! has_admin_conf; then
|
||||
echo "==> admin.conf missing on $PRIMARY_CONTROL_PLANE; running kubeadm init"
|
||||
remote "$PRIMARY_CP_IP" "sudo th-kubeadm-init"
|
||||
fi
|
||||
|
||||
if ! has_admin_conf; then
|
||||
echo "==> admin.conf still missing after init attempt; aborting"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
JOIN_CMD="$(remote "$PRIMARY_CP_IP" "sudo KUBECONFIG=/etc/kubernetes/admin.conf kubeadm token create --print-join-command")"
|
||||
CERT_KEY="$(remote "$PRIMARY_CP_IP" "sudo KUBECONFIG=/etc/kubernetes/admin.conf kubeadm init phase upload-certs --upload-certs | tail -n 1")"
|
||||
CP_JOIN_CMD="$JOIN_CMD --control-plane --certificate-key $CERT_KEY"
|
||||
|
||||
join_control_plane() {
|
||||
local node_ip="$1"
|
||||
local encoded
|
||||
encoded="$(printf '%s' "$CP_JOIN_CMD" | base64 -w0)"
|
||||
remote "$node_ip" "sudo th-kubeadm-join-control-plane \"\$(echo $encoded | base64 -d)\""
|
||||
}
|
||||
|
||||
join_worker() {
|
||||
local node_ip="$1"
|
||||
local encoded
|
||||
encoded="$(printf '%s' "$JOIN_CMD" | base64 -w0)"
|
||||
remote "$node_ip" "sudo th-kubeadm-join-worker \"\$(echo $encoded | base64 -d)\""
|
||||
}
|
||||
|
||||
echo "==> Joining remaining control planes"
|
||||
for node in "${CP_NAMES[@]}"; do
|
||||
if [ "$node" = "$PRIMARY_CONTROL_PLANE" ]; then
|
||||
continue
|
||||
fi
|
||||
|
||||
if cluster_has_node "$node"; then
|
||||
echo "$node already joined; skipping"
|
||||
else
|
||||
join_control_plane "${NODE_IPS[$node]}"
|
||||
fi
|
||||
done
|
||||
|
||||
echo "==> Joining workers"
|
||||
for node in "${WK_NAMES[@]}"; do
|
||||
if cluster_has_node "$node"; then
|
||||
echo "$node already joined; skipping"
|
||||
else
|
||||
join_worker "${NODE_IPS[$node]}"
|
||||
fi
|
||||
done
|
||||
|
||||
echo "==> Final node list"
|
||||
if remote "$PRIMARY_CP_IP" "test -f /etc/kubernetes/admin.conf"; then
|
||||
remote "$PRIMARY_CP_IP" "sudo kubectl --kubeconfig /etc/kubernetes/admin.conf get nodes -o wide"
|
||||
else
|
||||
echo "==> /etc/kubernetes/admin.conf missing on $PRIMARY_CONTROL_PLANE; attempting late init"
|
||||
remote "$PRIMARY_CP_IP" "sudo th-kubeadm-init"
|
||||
remote "$PRIMARY_CP_IP" "sudo kubectl --kubeconfig /etc/kubernetes/admin.conf get nodes -o wide"
|
||||
fi
|
||||
python3 "$CONTROLLER" reconcile --inventory "$INVENTORY_FILE"