"""Telemetry tracer: per-head latency, costs, event bus subscription.""" from collections import deque from dataclasses import dataclass, field from typing import Any import time from fusionagi._logger import logger _tracer: "TelemetryTracer | None" = None @dataclass class TraceEntry: """Single trace entry.""" event_type: str task_id: str | None head_id: str | None latency_ms: float | None payload: dict[str, Any] timestamp: float = field(default_factory=time.monotonic) class TelemetryTracer: """In-memory ring buffer for traces; subscribes to event bus.""" def __init__(self, max_entries: int = 10000) -> None: self._entries: deque[TraceEntry] = deque(maxlen=max_entries) self._subscription: Any = None self._starts: dict[str, float] = {} def subscribe(self, event_bus: Any) -> None: """Subscribe to event bus for message_received, dvadasa_complete.""" def on_message(_event_type: str, payload: dict[str, Any]) -> None: task_id = payload.get("task_id", "") recipient = payload.get("recipient", "") self._starts[f"{task_id}:{recipient}"] = time.monotonic() def on_dvadasa(_event_type: str, payload: dict[str, Any]) -> None: task_id = payload.get("task_id", "") head_count = payload.get("head_count", 0) self._entries.append( TraceEntry( event_type="dvadasa_complete", task_id=task_id, head_id=None, latency_ms=None, payload={"head_count": head_count}, ) ) try: event_bus.subscribe("message_received", on_message) event_bus.subscribe("dvadasa_complete", on_dvadasa) except Exception as e: logger.warning("Telemetry subscribe failed", extra={"error": str(e)}) def record_head_output(self, task_id: str, head_id: str, start: float | None = None) -> None: """Record head completion with optional latency.""" key = f"{task_id}:{head_id}" end = time.monotonic() latency_ms = (end - self._starts.pop(key, end)) * 1000 if start is None else (end - start) * 1000 self._entries.append( TraceEntry( event_type="head_output", task_id=task_id, head_id=head_id, latency_ms=latency_ms, payload={}, ) ) def get_traces(self, task_id: str | None = None, limit: int = 100) -> list[dict[str, Any]]: """Return traces, optionally filtered by task_id.""" out = [] for e in reversed(self._entries): if task_id and e.task_id != task_id: continue out.append({ "event_type": e.event_type, "task_id": e.task_id, "head_id": e.head_id, "latency_ms": e.latency_ms, "payload": e.payload, }) if len(out) >= limit: break return out def get_tracer() -> TelemetryTracer | None: return _tracer def set_tracer(t: TelemetryTracer | None) -> None: global _tracer _tracer = t