#!/usr/bin/env python3
"""
Read-only HTTP API for IT inventory JSON (Phase 0 BFF stub).

Serves latest reports/status/live_inventory.json and drift.json from the repo tree.
Optional IT_READ_API_KEY: when set, /v1/* requires header X-API-Key (GET and POST).
POST /v1/inventory/refresh is always refused (401) unless IT_READ_API_KEY is set.

Usage (from repo root):
  IT_READ_API_KEY=secret python3 services/sankofa-it-read-api/server.py
  # or
  python3 services/sankofa-it-read-api/server.py   # open /v1 without key

Env:
  IT_READ_API_HOST (default 127.0.0.1)
  IT_READ_API_PORT (default 8787)
  IT_READ_API_KEY (optional)
  IT_READ_API_CORS_ORIGINS (optional, comma-separated; enables CORS for browser direct calls)
  IT_SKIP_GIT_HEAD (optional; 1/yes/true omits the git revision from /v1/summary)
  IT_BFF_OIDC_ISSUER (optional; only reported as configured/not configured by /health)
"""

from __future__ import annotations

import hmac
import json
import os
import subprocess
import sys
from datetime import datetime, timezone
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path

# Timestamp layout used for every UTC time this service emits.
_TS_FORMAT = "%Y-%m-%dT%H:%M:%SZ"


def _project_root() -> Path:
    """Locate the repository root by looking for config/ip-addresses.conf.

    The directory three levels above this file is tried first, then every
    ancestor of this file. Falls back to the current working directory when
    no candidate contains the marker file.
    """
    here = Path(__file__).resolve()
    for candidate in [here.parent.parent.parent, *here.parents]:
        if (candidate / "config" / "ip-addresses.conf").is_file():
            return candidate
    return Path.cwd()


ROOT = _project_root()
REPORTS = ROOT / "reports" / "status"
EXPORT_SCRIPT = ROOT / "scripts" / "it-ops" / "export-live-inventory-and-drift.sh"
COLLECTOR_CONTRACT = (
    ROOT / "config" / "it-operations" / "live-collectors-contract.json"
)
API_KEY = os.environ.get("IT_READ_API_KEY", "").strip()
HOST = os.environ.get("IT_READ_API_HOST", "127.0.0.1")
PORT = int(os.environ.get("IT_READ_API_PORT", "8787"))
# Comma-separated origins for Access-Control-Allow-Origin (optional; portal should proxy via Next.js).
_CORS_RAW = os.environ.get("IT_READ_API_CORS_ORIGINS", "").strip()
CORS_ORIGINS = {o.strip() for o in _CORS_RAW.split(",") if o.strip()}


def _utc_now() -> str:
    """Return the current UTC time formatted with ``_TS_FORMAT``."""
    return datetime.now(timezone.utc).strftime(_TS_FORMAT)


def _file_meta(path: Path) -> dict:
    """Describe *path* for the summary envelope.

    Returns ``{"path", "exists": False}`` when *path* is not a regular file,
    otherwise adds ``size_bytes`` and ``mtime_utc``.
    """
    missing = {"path": str(path), "exists": False}
    if not path.is_file():
        return missing
    try:
        st = path.stat()
    except OSError:
        # The file disappeared (or became unreadable) after the is_file() check.
        return missing
    return {
        "path": str(path),
        "exists": True,
        "size_bytes": st.st_size,
        "mtime_utc": datetime.fromtimestamp(st.st_mtime, tz=timezone.utc).strftime(
            _TS_FORMAT
        ),
    }


def _load_json_file(path: Path) -> tuple[object, str | None]:
    """Read and parse a UTF-8 JSON file.

    Returns ``(value, None)`` on success. On failure returns ``(None, error)``
    where *error* is ``"file_missing"`` when *path* is not a regular file, or a
    human-readable message when the file cannot be read or parsed.
    """
    if not path.is_file():
        return None, "file_missing"
    try:
        text = path.read_text(encoding="utf-8")
    except FileNotFoundError:
        # Removed between the is_file() check and the read.
        return None, "file_missing"
    except (OSError, UnicodeDecodeError) as exc:
        return None, f"unreadable: {exc}"
    try:
        return json.loads(text), None
    except json.JSONDecodeError as exc:
        return None, str(exc)


def _declared_git_head(repo: Path) -> str | None:
    """Best-effort declared config revision (repo checkout on read API host).

    Returns the short HEAD hash of *repo*, or ``None`` when IT_SKIP_GIT_HEAD is
    1/yes/true, git cannot be run, times out after 5 seconds, or fails.
    """
    if os.environ.get("IT_SKIP_GIT_HEAD", "").strip().lower() in ("1", "yes", "true"):
        return None
    try:
        proc = subprocess.run(
            ["git", "-C", str(repo), "rev-parse", "--short", "HEAD"],
            capture_output=True,
            text=True,
            timeout=5,
            check=False,
        )
    except (OSError, subprocess.TimeoutExpired):
        return None
    head = proc.stdout.strip()
    if proc.returncode == 0 and head:
        return head
    return None


def _build_summary(
    live_path: Path,
    drift_path: Path,
    live_doc: object,
    drift_doc: object,
) -> dict:
    """Assemble the /v1/summary payload from the parsed inventory artifacts.

    Either document may be ``None`` or a non-object JSON value; such documents
    contribute only defaults. Fields inside the drift document that have an
    unexpected type are ignored instead of raising.
    """
    live = live_doc if isinstance(live_doc, dict) else {}
    drift = drift_doc if isinstance(drift_doc, dict) else {}

    # A null or scalar "duplicate_ips" has no length; count it as zero buckets.
    try:
        dup_count = len(drift.get("duplicate_ips", {}))
    except TypeError:
        dup_count = 0

    # A null or scalar "notes" is not iterable; treat it as having no notes.
    raw_notes = drift.get("notes", [])
    try:
        note_items = list(raw_notes)
    except TypeError:
        note_items = []
    seed_unreachable = (
        any("seed_unreachable" in str(n) for n in note_items)
        or live.get("error") == "seed_unreachable"
    )

    raw_missing = drift.get("vmids_in_all_vmids_doc_not_on_cluster")
    raw_extra = drift.get("vmids_on_cluster_not_in_all_vmids_table")

    return {
        "service": "sankofa-it-read-api",
        "envelope_at": _utc_now(),
        "declared_git_head": _declared_git_head(ROOT),
        "artifacts": {
            "live_inventory": _file_meta(live_path),
            "drift": _file_meta(drift_path),
        },
        "live_collected_at": live.get("collected_at"),
        "drift_collected_at": drift.get("collected_at"),
        "guest_count": drift.get("guest_count"),
        "duplicate_ip_bucket_count": dup_count,
        "seed_unreachable": bool(seed_unreachable),
        "drift_notes": raw_notes if isinstance(raw_notes, list) else [],
        "vmids_in_all_vmids_doc_not_on_cluster": (
            [str(x) for x in raw_missing] if isinstance(raw_missing, list) else []
        ),
        "vmids_on_cluster_not_in_all_vmids_table": (
            raw_extra if isinstance(raw_extra, dict) else {}
        ),
    }


class Handler(BaseHTTPRequestHandler):
    """HTTP handler for /health, the /v1 read routes and POST /v1/inventory/refresh."""

    server_version = "SankofaITReadAPI/0.1"

    def log_message(self, fmt: str, *args) -> None:
        """Write one access-log line to stderr as ``<client> - <message>``."""
        sys.stderr.write("%s - %s\n" % (self.address_string(), fmt % args))

    def _maybe_cors(self) -> None:
        """Emit CORS headers when an allow-list is configured.

        Must be called between ``send_response`` and ``end_headers``.
        """
        if not CORS_ORIGINS:
            return
        # The headers below depend on the request's Origin, so every response
        # must tell caches to key on it -- including responses to origins that
        # are not allowed.
        self.send_header("Vary", "Origin")
        origin = (self.headers.get("Origin") or "").strip()
        if origin not in CORS_ORIGINS:
            return
        self.send_header("Access-Control-Allow-Origin", origin)
        self.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
        self.send_header(
            "Access-Control-Allow-Headers",
            "Content-Type, X-API-Key",
        )

    def do_OPTIONS(self) -> None:
        """Answer CORS preflight with 204 when CORS is enabled, else 404."""
        if not CORS_ORIGINS:
            self._text(404, "Not found\n")
            return
        self.send_response(204)
        self._maybe_cors()
        self.end_headers()

    def _json(self, code: int, obj: object) -> None:
        """Send *obj* as an indented JSON body with status *code*."""
        body = json.dumps(obj, indent=2).encode("utf-8")
        self.send_response(code)
        self.send_header("Content-Type", "application/json")
        self._maybe_cors()
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def _text(self, code: int, text: str, ctype: str = "text/plain") -> None:
        """Send *text* as a UTF-8 body of content type *ctype* with status *code*."""
        body = text.encode("utf-8")
        self.send_response(code)
        self.send_header("Content-Type", f"{ctype}; charset=utf-8")
        self._maybe_cors()
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def _auth_ok(self) -> bool:
        """Return True when no API key is configured or X-API-Key matches it."""
        if not API_KEY:
            return True
        supplied = self.headers.get("X-API-Key")
        if supplied is None:
            return False
        # Constant-time comparison; bytes are used because compare_digest
        # rejects non-ASCII str arguments.
        return hmac.compare_digest(
            str(supplied).encode("utf-8"), API_KEY.encode("utf-8")
        )

    def _serve_json_file(self, path: Path, *, containers_only: bool = False) -> None:
        """Send the JSON document stored at *path*.

        Responds 404 ``file_missing`` when the file is absent and 500
        ``invalid_json`` when it cannot be read or parsed, when its top-level
        value is ``null``, or -- with *containers_only* -- when the top-level
        value is not an object or array.
        """
        data, err = _load_json_file(path)
        if err == "file_missing":
            self._json(404, {"error": "file_missing", "path": str(path)})
            return
        if err is not None:
            self._json(500, {"error": "invalid_json", "detail": err})
            return
        if data is None or (containers_only and not isinstance(data, (dict, list))):
            detail = f"unexpected top-level JSON value of type {type(data).__name__}"
            self._json(500, {"error": "invalid_json", "detail": detail})
            return
        self._json(200, data)

    def _handle_health(self) -> None:
        """Send the unauthenticated liveness/config report."""
        oidc_issuer = os.environ.get("IT_BFF_OIDC_ISSUER", "").strip()
        self._json(
            200,
            {
                "ok": True,
                "service": "sankofa-it-read-api",
                "project_root": str(ROOT),
                "auth_required_for_v1": bool(API_KEY),
                "oidc_issuer_configured": bool(oidc_issuer),
            },
        )

    def _handle_summary(self) -> None:
        """Send the summary envelope, or 404 when both artifacts are missing."""
        live_p = REPORTS / "live_inventory.json"
        drift_p = REPORTS / "drift.json"
        live_d, live_err = _load_json_file(live_p)
        drift_d, drift_err = _load_json_file(drift_p)
        if live_err == "file_missing" and drift_err == "file_missing":
            self._json(404, {"error": "no_inventory_artifacts"})
            return
        self._json(200, _build_summary(live_p, drift_p, live_d, drift_d))

    def do_GET(self) -> None:
        """Route GET requests; the query string and trailing slashes are ignored."""
        path = self.path.split("?", 1)[0].rstrip("/") or "/"
        if path == "/health":
            self._handle_health()
            return
        if not path.startswith("/v1"):
            self._text(
                404,
                "Not found. GET /health, /v1/summary, /v1/collector-contract, /v1/inventory/*\n",
            )
            return
        # Everything under /v1 is gated, including unknown paths.
        if not self._auth_ok():
            self._json(401, {"error": "unauthorized"})
            return
        if path == "/v1/collector-contract":
            self._serve_json_file(COLLECTOR_CONTRACT, containers_only=True)
        elif path == "/v1/portmap/joined":
            self._json(
                200,
                {
                    "implementation": "stub",
                    "collected_at": _utc_now(),
                    "stale": True,
                    "rows": [],
                    "note": "UniFi/NPM live collectors not wired; see docs/02-architecture/IT_PORT_MAP_LAYERS_SPEC.md",
                },
            )
        elif path == "/v1/summary":
            self._handle_summary()
        elif path == "/v1/inventory/live":
            self._serve_json_file(REPORTS / "live_inventory.json")
        elif path == "/v1/inventory/drift":
            self._serve_json_file(REPORTS / "drift.json")
        else:
            self._json(404, {"error": "not_found"})

    def do_POST(self) -> None:
        """Run the export script for POST /v1/inventory/refresh.

        Requires a configured API key and a matching X-API-Key header. Script
        exit codes 0 and 2 count as success; 2 is reported as a duplicate
        guest IP conflict.
        """
        path = self.path.split("?", 1)[0].rstrip("/") or "/"
        if path != "/v1/inventory/refresh":
            self._json(404, {"error": "not_found"})
            return
        # Refresh executes a script, so it is never available without a key.
        if not API_KEY or not self._auth_ok():
            self._json(401, {"error": "unauthorized"})
            return
        if not EXPORT_SCRIPT.is_file():
            self._json(500, {"error": "export_script_missing"})
            return
        try:
            proc = subprocess.run(
                ["bash", str(EXPORT_SCRIPT)],
                cwd=str(ROOT),
                check=False,
                timeout=600,
                capture_output=True,
                text=True,
            )
        except subprocess.TimeoutExpired:
            self._json(500, {"error": "export_timeout"})
            return
        except OSError as exc:
            # The interpreter itself could not be started (e.g. bash not installed).
            self._json(
                500,
                {"error": "export_failed", "returncode": None, "stderr": str(exc)},
            )
            return
        if proc.returncode not in (0, 2):
            self._json(
                500,
                {
                    "error": "export_failed",
                    "returncode": proc.returncode,
                    "stderr": (proc.stderr or "")[-4000:],
                },
            )
            return
        self._json(
            200,
            {
                "ok": True,
                "refreshed": True,
                "drift_exit_code": proc.returncode,
                "duplicate_guest_ip_conflict": proc.returncode == 2,
            },
        )


def main() -> None:
    """Start the threaded HTTP server and serve until interrupted."""
    # The context manager closes the listening socket on every exit path.
    with ThreadingHTTPServer((HOST, PORT), Handler) as httpd:
        print(f"sankofa-it-read-api listening on http://{HOST}:{PORT}", file=sys.stderr)
        print(f"  project_root={ROOT}", file=sys.stderr)
        print(
            "  GET /health  /v1/summary  /v1/collector-contract  /v1/portmap/joined",
            file=sys.stderr,
        )
        print(
            "  GET /v1/inventory/live  /v1/inventory/drift  POST /v1/inventory/refresh",
            file=sys.stderr,
        )
        if API_KEY:
            print("  X-API-Key required for /v1/*", file=sys.stderr)
        if CORS_ORIGINS:
            print(f"  CORS origins: {sorted(CORS_ORIGINS)}", file=sys.stderr)
        try:
            httpd.serve_forever()
        except KeyboardInterrupt:
            print("\nshutdown", file=sys.stderr)


if __name__ == "__main__":
    main()