45 lines
1.8 KiB
Python
45 lines
1.8 KiB
Python
"""Human override hooks: events the orchestrator can fire before high-risk steps."""
|
|
|
|
from typing import Any, Callable
|
|
|
|
from fusionagi._logger import logger
|
|
|
|
# Callback: (event_type, payload) -> proceed: bool
|
|
OverrideCallback = Callable[[str, dict[str, Any]], bool]


class OverrideHooks:
    """Optional callbacks for human override; no UI, just interface and logging.

    Callbacks are registered with ``register`` and consulted by ``fire``.
    Every fired event is appended to an in-memory audit list that can be
    read back with ``get_log``.
    """

    def __init__(self) -> None:
        # Callbacks in registration order; fire() consults them in this order.
        self._hooks: list[OverrideCallback] = []
        # Audit trail: one {"event": ..., "payload": ...} dict per fire() call.
        self._log: list[dict[str, Any]] = []

    def register(self, callback: OverrideCallback) -> None:
        """Register a callback; if any returns False, treat as 'do not proceed'.

        Args:
            callback: Called as ``callback(event_type, payload)`` on each
                ``fire``. A falsy return value (or a raised exception) vetoes
                the step.
        """
        self._hooks.append(callback)

    def fire(self, event_type: str, payload: dict[str, Any]) -> bool:
        """Fire an event (e.g. task_paused_for_approval) and collect the verdict.

        The event is recorded in the audit log before any hook runs, so it is
        logged even when a hook vetoes or raises. Hooks are called in
        registration order and evaluation stops at the first veto.

        Args:
            event_type: Name of the event being fired.
            payload: Event data, passed to every hook and stored in the log
                entry as-is (the same dict object, not a copy).

        Returns:
            True when there are no hooks or every hook returned a truthy
            value; False as soon as a hook returns a falsy value or raises.
        """
        entry: dict[str, Any] = {"event": event_type, "payload": payload}
        self._log.append(entry)
        logger.info("Override fire", extra={"event_type": event_type})
        for hook in self._hooks:
            try:
                if not hook(event_type, payload):
                    logger.info(
                        "Override hook returned do not proceed",
                        extra={"event_type": event_type},
                    )
                    return False
            except Exception:
                # Fail closed: a broken hook must never let a high-risk step
                # through, so an exception counts as "do not proceed".
                logger.exception(
                    "Override hook raised", extra={"event_type": event_type}
                )
                return False
        logger.debug("Override fire proceed", extra={"event_type": event_type})
        return True

    def get_log(self, limit: int = 100) -> list[dict[str, Any]]:
        """Return recent override events (for auditing).

        Args:
            limit: Maximum number of most-recent entries to return. A value
                of zero or less yields an empty list.

        Returns:
            A new list holding up to ``limit`` entries, oldest first. The
            list is a copy, but the entry dicts are the ones stored
            internally.
        """
        # Guard first: ``self._log[-0:]`` is the whole list and a negative
        # limit would turn the slice into "skip the first N entries".
        if limit <= 0:
            return []
        return list(self._log[-limit:])
|