#!/usr/bin/env python3
"""Run ON a Proxmox cluster node (as root). Stdout: JSON live guest inventory."""
from __future__ import annotations

import json
import os
import re
import subprocess
import sys
from datetime import datetime, timezone

# IPv4 address following an ``ip=`` key (a CIDR suffix such as ``/24`` is not captured).
_IP_RE = re.compile(r"ip=([0-9.]+)")

# LXC-style MAC: ``hwaddr=AA:BB:CC:DD:EE:FF``.
_HWADDR_RE = re.compile(r"hwaddr=([0-9A-Fa-f:]+)", re.IGNORECASE)

# Any ``key=<six-octet MAC>``; covers QEMU-style ``virtio=AA:..`` / ``macaddr=AA:..``.
# The negative lookahead rejects longer colon-separated hex runs (e.g. IPv6 text).
_KEYED_MAC_RE = re.compile(
    r"=((?:[0-9A-Fa-f]{2}:){5}[0-9A-Fa-f]{2})(?![0-9A-Fa-f:])"
)


def _run(cmd: list[str], timeout: float | None = None) -> str:
    """Run *cmd* and return its stdout as text; stderr is discarded.

    Args:
        cmd: Argument vector (executed without a shell).
        timeout: Optional limit in seconds; ``None`` waits indefinitely.

    Raises:
        subprocess.CalledProcessError: The command exited non-zero.
        subprocess.TimeoutExpired: *timeout* elapsed before the command ended.
        OSError: The executable could not be started (e.g. not installed).
    """
    return subprocess.check_output(
        cmd, text=True, stderr=subprocess.DEVNULL, timeout=timeout
    )


def _extract_ip_from_net_line(line: str) -> str | None:
    """Return the IPv4 address after ``ip=`` in *line*, or ``None`` if absent.

    Non-numeric values such as ``ip=dhcp`` do not match and yield ``None``.
    """
    m = _IP_RE.search(line)
    return m.group(1) if m else None


def _extract_hwaddr_from_net_line(line: str) -> str | None:
    """Return the MAC address found in a guest ``netN:`` config line, or ``None``.

    ``hwaddr=<mac>`` (the LXC form) is tried first. If it is absent, the first
    ``key=<mac>`` pair whose value is a full six-octet MAC is used, which
    covers QEMU lines such as ``net0: virtio=AA:BB:CC:DD:EE:FF,bridge=vmbr0``.
    """
    m = _HWADDR_RE.search(line) or _KEYED_MAC_RE.search(line)
    return m.group(1) if m else None


def _read_config(path: str) -> str:
    """Return the text of *path*, or ``""`` when it cannot be opened or read.

    Undecodable bytes are replaced rather than raising.
    """
    try:
        with open(path, encoding="utf-8", errors="replace") as f:
            return f.read()
    except OSError:
        return ""


def _guest_addresses(body: str, guest_type: str) -> tuple[str, str]:
    """Derive ``(ip, mac)`` for a guest from its config text.

    Lookup order:
      1. The first ``net0:`` line supplies both IP and MAC when present.
      2. For ``qemu`` guests still without an IP, the first ``ipconfig0:`` line.
      3. For ``qemu`` guests still without an IP, every ``net0:`` line in the
         file is scanned until one yields an IP; a MAC is taken along the way
         only if none was found earlier.

    Missing values are returned as empty strings.
    """
    lines = body.splitlines()
    ip = ""
    mac = ""

    first_net0 = next((ln for ln in lines if ln.startswith("net0:")), None)
    if first_net0 is not None:
        ip = _extract_ip_from_net_line(first_net0) or ""
        mac = _extract_hwaddr_from_net_line(first_net0) or ""
    if ip or guest_type != "qemu":
        return ip, mac

    first_ipconfig0 = next(
        (ln for ln in lines if ln.startswith("ipconfig0:")), None
    )
    if first_ipconfig0 is not None:
        ip = _extract_ip_from_net_line(first_ipconfig0) or ""
    if ip:
        return ip, mac

    # NOTE(review): this pass looks at *all* net0 lines, not just the first;
    # presumably that can include lines from later sections of the config
    # file (e.g. snapshots) -- confirm this is intended.
    for ln in lines:
        if not ln.startswith("net0:"):
            continue
        ip = _extract_ip_from_net_line(ln) or ""
        if not mac:
            mac = _extract_hwaddr_from_net_line(ln) or ""
        if ip:
            break
    return ip, mac


def _ip_neigh_sample(collected_at: str) -> dict:
    """Return up to 500 non-blank lines of ``ip -4 neigh show dev vmbr0``.

    Best-effort: if the command fails, times out (30 s) or cannot be started,
    the sample is simply empty.
    """
    try:
        raw_neigh = _run(["ip", "-4", "neigh", "show", "dev", "vmbr0"], timeout=30)
    except (subprocess.CalledProcessError, subprocess.TimeoutExpired, OSError):
        neigh_lines: list[str] = []
    else:
        neigh_lines = [
            ln.strip() for ln in raw_neigh.splitlines() if ln.strip()
        ][:500]
    return {
        "collected_at": collected_at,
        "line_count": len(neigh_lines),
        "lines": neigh_lines,
    }


def main() -> None:
    """Write a JSON inventory of the cluster's LXC and QEMU guests to stdout.

    Guests come from ``pvesh get /cluster/resources``; each guest's IP and MAC
    are parsed from its config file under ``/etc/pve/nodes/<node>/``. If the
    ``pvesh`` call fails, cannot be started, or returns invalid JSON, a
    document with an ``error`` field and an empty ``guests`` list is written
    instead. When the ``IT_COLLECT_IP_NEIGH`` environment variable is
    ``1``/``yes``/``true`` (case-insensitive), a neighbour-table sample for
    ``vmbr0`` is appended.
    """
    collected_at = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    try:
        raw = _run(
            ["pvesh", "get", "/cluster/resources", "--output-format", "json"]
        )
        resources = json.loads(raw)
    # OSError covers a missing/non-executable pvesh, which would otherwise
    # surface as a traceback instead of the JSON error document.
    except (subprocess.CalledProcessError, OSError, json.JSONDecodeError) as e:
        json.dump(
            {
                "collected_at": collected_at,
                "error": f"pvesh_cluster_resources_failed: {e}",
                "guests": [],
            },
            sys.stdout,
            indent=2,
        )
        return

    guests: list[dict] = []
    for r in resources:
        t = r.get("type")
        if t not in ("lxc", "qemu"):
            continue
        vmid = r.get("vmid")
        node = r.get("node")
        if vmid is None or not node:
            continue
        vmid_s = str(vmid)
        subdir = "lxc" if t == "lxc" else "qemu-server"
        cfg_path = f"/etc/pve/nodes/{node}/{subdir}/{vmid_s}.conf"
        ip, mac = _guest_addresses(_read_config(cfg_path), t)
        guests.append(
            {
                "vmid": vmid_s,
                "type": t,
                "node": str(node),
                "name": r.get("name") or "",
                "status": r.get("status") or "",
                "ip": ip,
                "mac": mac,
                "config_path": cfg_path,
            }
        )

    out: dict = {
        "collected_at": collected_at,
        "guests": sorted(guests, key=lambda g: int(g["vmid"])),
    }

    if os.environ.get("IT_COLLECT_IP_NEIGH", "").strip().lower() in (
        "1",
        "yes",
        "true",
    ):
        out["ip_neigh_vmbr0_sample"] = _ip_neigh_sample(collected_at)

    json.dump(out, sys.stdout, indent=2)


if __name__ == "__main__":
    main()