"""Critic / Evaluator agent: evaluates task outcome, error analysis, suggested improvements."""
|
|
|
|
import json
|
|
from typing import Any
|
|
|
|
from fusionagi.agents.base_agent import BaseAgent
|
|
from fusionagi.adapters.base import LLMAdapter
|
|
from fusionagi.schemas.messages import AgentMessage, AgentMessageEnvelope
|
|
from fusionagi._logger import logger
|
|
|
|
|
|
class CriticAgent(BaseAgent):
    """Critic / Evaluator agent.

    Consumes ``evaluate_request`` messages carrying a task outcome, an
    execution trace, and optionally the executed plan, and replies with an
    ``evaluation_ready`` message containing a success flag, a score, an
    error analysis, and improvement suggestions.  When no LLM adapter is
    configured, a cheap outcome-based heuristic evaluation is produced.
    """

    def __init__(
        self,
        identity: str = "critic",
        adapter: LLMAdapter | None = None,
    ) -> None:
        """Create a critic agent.

        Args:
            identity: Agent identity used for message routing and logging.
            adapter: Optional LLM adapter; when ``None``, evaluations fall
                back to a simple heuristic keyed on the outcome string.
        """
        super().__init__(
            identity=identity,
            role="Critic",
            objective="Evaluate outcomes and suggest improvements",
            memory_access=True,
            tool_permissions=[],
        )
        self._adapter = adapter

    def handle_message(self, envelope: AgentMessageEnvelope) -> AgentMessageEnvelope | None:
        """On evaluate_request, return evaluation_ready with score, analysis, suggestions.

        Any other intent is ignored by returning ``None`` so the message
        router can skip this agent.
        """
        if envelope.message.intent != "evaluate_request":
            return None
        logger.info(
            "Critic handle_message",
            extra={"recipient": self.identity, "intent": envelope.message.intent},
        )
        payload = envelope.message.payload
        task_id = envelope.task_id
        outcome = payload.get("outcome", "unknown")
        trace = payload.get("trace", [])
        plan = payload.get("plan")
        if self._adapter:
            evaluation = self._evaluate_with_llm(outcome, trace, plan)
        else:
            # No LLM available: binary success/score keyed on the outcome.
            evaluation = {
                "success": outcome == "completed",
                "score": 1.0 if outcome == "completed" else 0.0,
                "error_analysis": [],
                "suggestions": ["Enable LLM for detailed evaluation"],
            }
        logger.info(
            "Critic response",
            extra={"recipient": self.identity, "response_intent": "evaluation_ready"},
        )
        return AgentMessageEnvelope(
            message=AgentMessage(
                sender=self.identity,
                recipient=envelope.message.sender,
                intent="evaluation_ready",
                payload={"evaluation": evaluation},
            ),
            task_id=task_id,
            correlation_id=envelope.correlation_id,
        )

    def _evaluate_with_llm(
        self,
        outcome: str,
        trace: list[dict[str, Any]],
        plan: dict[str, Any] | None,
    ) -> dict[str, Any]:
        """Use adapter to produce evaluation (score, error_analysis, suggestions).

        Falls back to a neutral mid-score evaluation whenever the model
        reply cannot be parsed into a JSON object.
        """
        # Only the last 5 trace entries / plan steps are sent to keep the
        # prompt bounded; default=str makes arbitrary values serializable.
        context = f"Outcome: {outcome}\nTrace (last 5): {json.dumps(trace[-5:], default=str)}\n"
        # BUGFIX: a malformed payload could carry a non-dict "plan" (e.g. a
        # string); previously plan.get(...) raised an uncaught AttributeError
        # here, outside the try block, crashing handle_message entirely.
        if isinstance(plan, dict):
            context += f"Plan: {json.dumps(plan.get('steps', [])[:5], default=str)}"
        messages = [
            {"role": "system", "content": "You evaluate task execution. Output JSON: {\"success\": bool, \"score\": 0-1, \"error_analysis\": [], \"suggestions\": []}. Output only JSON."},
            {"role": "user", "content": context},
        ]
        try:
            raw = self._adapter.complete(messages)
            parsed = json.loads(self._strip_code_fence(raw))
            # BUGFIX: json.loads may yield a list/str/number; only a dict
            # satisfies this method's declared return contract. Anything else
            # is treated as a parse failure and handled by the fallback below.
            if not isinstance(parsed, dict):
                raise ValueError("LLM evaluation is not a JSON object")
            return parsed
        except Exception:
            # Best-effort by design: a bad model reply must not crash the
            # agent, so log with traceback and return a neutral evaluation.
            logger.exception("Critic evaluation parse failed, using fallback")
            return {
                "success": outcome == "completed",
                "score": 0.5,
                "error_analysis": ["Evaluation parse failed"],
                "suggestions": [],
            }

    @staticmethod
    def _strip_code_fence(raw: str) -> str:
        """Remove a surrounding markdown code fence (``` or ```json) if present."""
        text = raw.strip()
        # Check the longer fence first so "```json" is not half-stripped.
        for fence in ("```json", "```"):
            if text.startswith(fence):
                text = text[len(fence):].strip()
                break  # at most one opening fence can be present
        if text.endswith("```"):
            text = text[:-3].strip()
        return text
|