- Update dbis_core, cross-chain-pmm-lps, explorer-monorepo, metamask-integration, pr-workspace/chains - Omit embedded publish git dirs and empty placeholders from index Made-with: Cursor
337 lines
12 KiB
Python
Executable File
337 lines
12 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Read-only HTTP API for IT inventory JSON (Phase 0 BFF stub).
|
|
|
|
Serves latest reports/status/live_inventory.json and drift.json from the repo tree.
|
|
Optional IT_READ_API_KEY: when set, /v1/* requires header X-API-Key (GET and POST). POST /v1/inventory/refresh is rejected with 401 when no key is configured.
|
|
|
|
Usage (from repo root):
|
|
IT_READ_API_KEY=secret python3 services/sankofa-it-read-api/server.py
|
|
# or
|
|
python3 services/sankofa-it-read-api/server.py # open /v1 without key
|
|
|
|
Env:
|
|
IT_READ_API_HOST (default 127.0.0.1)
|
|
IT_READ_API_PORT (default 8787)
|
|
IT_READ_API_KEY (optional)
|
|
IT_READ_API_CORS_ORIGINS (optional, comma-separated; enables CORS for browser direct calls)
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
from datetime import datetime, timezone
|
|
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
|
|
from pathlib import Path
|
|
|
|
|
|
def _project_root() -> Path:
    """Locate the repository root this service should read from.

    The directory three levels above this file is tried first, followed by
    every ancestor of the file from nearest to farthest. The first candidate
    that contains ``config/ip-addresses.conf`` is returned; if none does, the
    current working directory is used instead.
    """
    script = Path(__file__).resolve()
    candidates = [script.parent.parent.parent]
    candidates.extend(script.parents)
    match = next(
        (
            candidate
            for candidate in candidates
            if (candidate / "config" / "ip-addresses.conf").is_file()
        ),
        None,
    )
    if match is None:
        # Resolved lazily so the working directory is only consulted as a fallback.
        return Path.cwd()
    return match
|
|
|
|
|
|
# Repository root and the paths beneath it that this service reads or runs.
ROOT = _project_root()
REPORTS = ROOT.joinpath("reports", "status")
EXPORT_SCRIPT = ROOT.joinpath("scripts", "it-ops", "export-live-inventory-and-drift.sh")
COLLECTOR_CONTRACT = ROOT.joinpath(
    "config", "it-operations", "live-collectors-contract.json"
)

# Runtime settings, read once from the environment when the module loads.
API_KEY = os.getenv("IT_READ_API_KEY", "").strip()
HOST = os.getenv("IT_READ_API_HOST", "127.0.0.1")
PORT = int(os.getenv("IT_READ_API_PORT", "8787"))

# Comma-separated origins for Access-Control-Allow-Origin (optional; portal should proxy via Next.js).
_CORS_RAW = os.getenv("IT_READ_API_CORS_ORIGINS", "").strip()
# Blank entries (e.g. from a trailing comma) are dropped.
CORS_ORIGINS = set(filter(None, (piece.strip() for piece in _CORS_RAW.split(","))))
|
|
|
|
|
|
def _file_meta(path: Path) -> dict:
    """Describe a file on disk for inclusion in an API response.

    Args:
        path: Location of the file to inspect.

    Returns:
        ``{"path": ..., "exists": False}`` when *path* cannot be stat'ed or
        is not a regular file. Otherwise a dict with ``path``, ``exists``
        (``True``), ``size_bytes`` and ``mtime_utc``, the modification time
        formatted as ``YYYY-MM-DDTHH:MM:SSZ`` in UTC.
    """
    from stat import S_ISREG  # function-scope import keeps this block self-contained

    absent = {"path": str(path), "exists": False}
    try:
        # A single stat() answers both "is it a regular file?" and "what are
        # its size and mtime?", so the file cannot disappear between an
        # existence check and the metadata read.
        info = path.stat()
    except (OSError, ValueError):
        return absent
    if not S_ISREG(info.st_mode):
        return absent
    modified = datetime.fromtimestamp(info.st_mtime, tz=timezone.utc)
    return {
        "path": str(path),
        "exists": True,
        "size_bytes": info.st_size,
        "mtime_utc": modified.strftime("%Y-%m-%dT%H:%M:%SZ"),
    }
|
|
|
|
|
|
def _load_json_file(path: Path) -> tuple[dict | list | None, str | None]:
    """Read and parse a UTF-8 JSON document from *path*.

    Args:
        path: File to load.

    Returns:
        ``(data, None)`` on success. ``(None, "file_missing")`` when *path*
        is not a regular file. ``(None, detail)`` with the exception text
        when the file cannot be read, is not valid UTF-8, or is not valid
        JSON. The function does not raise for these conditions, so request
        handlers can always turn the result into an HTTP response.
    """
    if not path.is_file():
        return None, "file_missing"
    try:
        raw = path.read_text(encoding="utf-8")
    except FileNotFoundError:
        # The file was removed between the check above and the read.
        return None, "file_missing"
    except (OSError, UnicodeDecodeError) as exc:
        return None, str(exc)
    try:
        return json.loads(raw), None
    except json.JSONDecodeError as exc:
        return None, str(exc)
|
|
|
|
|
|
def _declared_git_head(repo: Path) -> str | None:
    """Return the abbreviated ``HEAD`` commit of the checkout at *repo*.

    Best effort only. ``None`` is returned when ``IT_SKIP_GIT_HEAD`` is set
    to ``1``, ``yes`` or ``true`` (case-insensitive), when git cannot be
    launched, when it runs longer than five seconds, when it exits non-zero,
    or when it prints nothing.
    """
    skip_flag = os.environ.get("IT_SKIP_GIT_HEAD", "").strip().lower()
    if skip_flag in {"1", "yes", "true"}:
        return None

    command = ["git", "-C", str(repo), "rev-parse", "--short", "HEAD"]
    try:
        result = subprocess.run(
            command,
            capture_output=True,
            text=True,
            timeout=5,
            check=False,
        )
    except (OSError, subprocess.TimeoutExpired):
        return None

    revision = result.stdout.strip()
    if result.returncode != 0 or not revision:
        return None
    return revision
|
|
|
|
|
|
class Handler(BaseHTTPRequestHandler):
    """Request handler for the IT inventory read API.

    ``GET`` serves ``/health`` (never authenticated) and, under ``/v1``,
    ``summary``, ``collector-contract``, ``portmap/joined``,
    ``inventory/live`` and ``inventory/drift``. ``POST
    /v1/inventory/refresh`` runs the export script. ``OPTIONS`` answers CORS
    preflight requests when ``CORS_ORIGINS`` is non-empty.
    """

    server_version = "SankofaITReadAPI/0.1"

    def log_message(self, fmt: str, *args) -> None:
        """Write one log line to stderr, prefixed with the client address."""
        sys.stderr.write("%s - %s\n" % (self.address_string(), fmt % args))

    def _maybe_cors(self) -> None:
        """Add CORS headers when the request's ``Origin`` is allow-listed.

        Must be called after ``send_response`` and before ``end_headers``.
        Does nothing when no origins are configured or the origin is not in
        ``CORS_ORIGINS``.
        """
        if not CORS_ORIGINS:
            return
        origin = (self.headers.get("Origin") or "").strip()
        if origin in CORS_ORIGINS:
            self.send_header("Access-Control-Allow-Origin", origin)
            self.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
            self.send_header(
                "Access-Control-Allow-Headers",
                "Content-Type, X-API-Key",
            )
            self.send_header("Vary", "Origin")

    def do_OPTIONS(self) -> None:
        """Answer a preflight with 204 when CORS is configured, else 404."""
        if CORS_ORIGINS:
            self.send_response(204)
            self._maybe_cors()
            self.end_headers()
            return
        self._text(404, "Not found\n")

    def _send_body(self, code: int, content_type: str, body: bytes) -> None:
        """Send status line, headers (including CORS) and *body*."""
        self.send_response(code)
        self.send_header("Content-Type", content_type)
        self._maybe_cors()
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def _json(self, code: int, obj: object) -> None:
        """Send *obj* as an indented JSON response with status *code*."""
        self._send_body(code, "application/json", json.dumps(obj, indent=2).encode("utf-8"))

    def _text(self, code: int, text: str, ctype: str = "text/plain") -> None:
        """Send *text* as a UTF-8 response of media type *ctype*."""
        self._send_body(code, f"{ctype}; charset=utf-8", text.encode("utf-8"))

    def _auth_ok(self) -> bool:
        """Return whether the request may access key-protected routes.

        Always ``True`` when no ``API_KEY`` is configured. Otherwise the
        ``X-API-Key`` header must equal the key.
        """
        if not API_KEY:
            return True
        import hmac  # function-scope import keeps this block self-contained

        supplied = self.headers.get("X-API-Key")
        if supplied is None:
            return False
        # Constant-time comparison so response timing does not reveal how
        # much of a guessed key was correct. Compared as bytes because
        # compare_digest() rejects non-ASCII str arguments.
        return hmac.compare_digest(supplied.encode("utf-8"), API_KEY.encode("utf-8"))

    @staticmethod
    def _summary_fields(live_d: object, drift_d: object) -> dict:
        """Derive the inventory/drift portion of the ``/v1/summary`` body.

        Args:
            live_d: Parsed ``live_inventory.json`` or ``None``.
            drift_d: Parsed ``drift.json`` or ``None``.

        Returns:
            A dict of summary fields. Any input or nested value that does
            not have the expected type is treated as absent (``None``,
            ``0``, ``False``, ``[]`` or ``{}``) instead of raising.
        """
        live = live_d if isinstance(live_d, dict) else {}
        drift = drift_d if isinstance(drift_d, dict) else {}

        duplicates = drift.get("duplicate_ips")
        raw_notes = drift.get("notes")
        # Normalize before iterating: a null or scalar "notes" must not crash.
        notes = raw_notes if isinstance(raw_notes, list) else []
        seed_unreachable = (
            any("seed_unreachable" in str(note) for note in notes)
            or live.get("error") == "seed_unreachable"
        )
        raw_missing = drift.get("vmids_in_all_vmids_doc_not_on_cluster")
        raw_extra = drift.get("vmids_on_cluster_not_in_all_vmids_table")
        return {
            "live_collected_at": live.get("collected_at"),
            "drift_collected_at": drift.get("collected_at"),
            "guest_count": drift.get("guest_count"),
            "duplicate_ip_bucket_count": (
                len(duplicates) if isinstance(duplicates, (dict, list)) else 0
            ),
            "seed_unreachable": seed_unreachable,
            "drift_notes": notes,
            "vmids_in_all_vmids_doc_not_on_cluster": (
                [str(item) for item in raw_missing] if isinstance(raw_missing, list) else []
            ),
            "vmids_on_cluster_not_in_all_vmids_table": (
                raw_extra if isinstance(raw_extra, dict) else {}
            ),
        }

    def _send_health(self) -> None:
        """Send the unauthenticated ``/health`` status document."""
        oidc_issuer = os.environ.get("IT_BFF_OIDC_ISSUER", "").strip()
        self._json(
            200,
            {
                "ok": True,
                "service": "sankofa-it-read-api",
                "project_root": str(ROOT),
                "auth_required_for_v1": bool(API_KEY),
                "oidc_issuer_configured": bool(oidc_issuer),
            },
        )

    def _send_portmap_stub(self) -> None:
        """Send the placeholder ``/v1/portmap/joined`` response (no rows)."""
        now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
        self._json(
            200,
            {
                "implementation": "stub",
                "collected_at": now,
                "stale": True,
                "rows": [],
                "note": "UniFi/NPM live collectors not wired; see docs/02-architecture/IT_PORT_MAP_LAYERS_SPEC.md",
            },
        )

    def _send_summary(self) -> None:
        """Send ``/v1/summary``; 404 when both report files are missing."""
        live_p = REPORTS / "live_inventory.json"
        drift_p = REPORTS / "drift.json"
        live_d, le = _load_json_file(live_p)
        drift_d, de = _load_json_file(drift_p)
        if le == "file_missing" and de == "file_missing":
            self._json(404, {"error": "no_inventory_artifacts"})
            return
        envelope = {
            "service": "sankofa-it-read-api",
            "envelope_at": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
            "declared_git_head": _declared_git_head(ROOT),
            "artifacts": {
                "live_inventory": _file_meta(live_p),
                "drift": _file_meta(drift_p),
            },
        }
        envelope.update(self._summary_fields(live_d, drift_d))
        self._json(200, envelope)

    def _send_json_file(self, source) -> None:
        """Send the JSON document stored at *source*.

        Responds 404 when the file is missing and 500 when it cannot be
        parsed or its top-level value is not an object or array.
        """
        data, err = _load_json_file(source)
        if err == "file_missing":
            self._json(404, {"error": "file_missing", "path": str(source)})
            return
        if err is not None:
            self._json(500, {"error": "invalid_json", "detail": err})
            return
        if not isinstance(data, (dict, list)):
            # Explicit check rather than assert: asserts vanish under -O and
            # would otherwise abort the request with no response.
            self._json(
                500,
                {
                    "error": "invalid_json",
                    "detail": "top-level JSON value must be an object or array",
                },
            )
            return
        self._json(200, data)

    def do_GET(self) -> None:
        """Route a GET request; the query string and trailing slashes are ignored."""
        path = self.path.split("?", 1)[0].rstrip("/") or "/"
        if path == "/health":
            self._send_health()
            return
        if not path.startswith("/v1"):
            self._text(
                404,
                "Not found. GET /health, /v1/summary, /v1/collector-contract, /v1/inventory/*\n",
            )
            return
        if not self._auth_ok():
            self._json(401, {"error": "unauthorized"})
            return

        if path == "/v1/collector-contract":
            self._send_json_file(COLLECTOR_CONTRACT)
        elif path == "/v1/portmap/joined":
            self._send_portmap_stub()
        elif path == "/v1/summary":
            self._send_summary()
        elif path == "/v1/inventory/live":
            self._send_json_file(REPORTS / "live_inventory.json")
        elif path == "/v1/inventory/drift":
            self._send_json_file(REPORTS / "drift.json")
        else:
            self._json(404, {"error": "not_found"})

    def do_POST(self) -> None:
        """Handle ``POST /v1/inventory/refresh`` by running the export script.

        Requires a configured ``API_KEY`` and a matching ``X-API-Key``
        header. Script exit codes 0 and 2 are reported as success; 2 is
        surfaced as ``duplicate_guest_ip_conflict``.
        """
        path = self.path.split("?", 1)[0].rstrip("/") or "/"
        if path != "/v1/inventory/refresh":
            self._json(404, {"error": "not_found"})
            return
        # Refresh is never open: it is refused when no key is configured.
        if not API_KEY or not self._auth_ok():
            self._json(401, {"error": "unauthorized"})
            return
        if not EXPORT_SCRIPT.is_file():
            self._json(500, {"error": "export_script_missing"})
            return
        try:
            proc = subprocess.run(
                ["bash", str(EXPORT_SCRIPT)],
                cwd=str(ROOT),
                check=False,
                timeout=600,
                capture_output=True,
                text=True,
                errors="replace",  # undecodable script output must not abort the request
            )
        except subprocess.TimeoutExpired:
            self._json(500, {"error": "export_timeout"})
            return
        except OSError as exc:
            # e.g. bash is not installed or the working directory is gone.
            self._json(
                500,
                {
                    "error": "export_failed",
                    "returncode": None,
                    "stderr": str(exc)[-4000:],
                },
            )
            return
        if proc.returncode not in (0, 2):
            self._json(
                500,
                {
                    "error": "export_failed",
                    "returncode": proc.returncode,
                    "stderr": (proc.stderr or "")[-4000:],
                },
            )
            return
        self._json(
            200,
            {
                "ok": True,
                "refreshed": True,
                "drift_exit_code": proc.returncode,
                "duplicate_guest_ip_conflict": proc.returncode == 2,
            },
        )
|
|
|
|
|
|
def main() -> None:
    """Run the read API until interrupted.

    Binds a threaded HTTP server to ``HOST``:``PORT``, prints a startup
    banner to stderr, and serves requests until Ctrl-C. The server is used
    as a context manager so its listening socket is closed on every exit
    path, including an unexpected exception.
    """
    with ThreadingHTTPServer((HOST, PORT), Handler) as httpd:
        print(f"sankofa-it-read-api listening on http://{HOST}:{PORT}", file=sys.stderr)
        print(f"  project_root={ROOT}", file=sys.stderr)
        print(
            "  GET /health  /v1/summary  /v1/collector-contract  /v1/portmap/joined",
            file=sys.stderr,
        )
        print("  GET /v1/inventory/live  /v1/inventory/drift  POST /v1/inventory/refresh", file=sys.stderr)
        if API_KEY:
            print("  X-API-Key required for /v1/*", file=sys.stderr)
        if CORS_ORIGINS:
            print(f"  CORS origins: {sorted(CORS_ORIGINS)}", file=sys.stderr)
        try:
            httpd.serve_forever()
        except KeyboardInterrupt:
            print("\nshutdown", file=sys.stderr)
|
|
|
|
|
|
# Start the server only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|