"""Witness agent: meta-controller that arbitrates head outputs and produces final response."""
|
|
|
|
from typing import Any

from fusionagi.agents.base_agent import BaseAgent
from fusionagi.adapters.base import LLMAdapter
from fusionagi.schemas.messages import AgentMessage, AgentMessageEnvelope
from fusionagi.schemas.head import HeadId, HeadOutput
from fusionagi.schemas.witness import (
    AgreementMap,
    TransparencyReport,
    FinalResponse,
)
from fusionagi.multi_agent.consensus_engine import run_consensus
from fusionagi._logger import logger

# Approx 4 chars/token; limit context to ~6k tokens (~24k chars) to avoid overflow
DEFAULT_MAX_CONTEXT_CHARS = 24_000
|
|
|
|
|
|
WITNESS_COMPOSE_SYSTEM = """You are the Witness meta-controller in a 12-headed multi-agent system.
|
|
You receive structured outputs from specialist heads (Logic, Research, Strategy, Security, etc.).
|
|
Your job: produce a clear, coherent final answer that synthesizes the head contributions.
|
|
Use the agreed claims. Acknowledge disputes if any. Be concise and actionable.
|
|
Output only the final narrative text, no JSON or meta-commentary."""
|
|
|
|
|
|
class WitnessAgent(BaseAgent):
    """Meta-controller ("Witness") that arbitrates the content heads.

    Consumes HeadOutput objects from the content heads, runs the consensus
    engine over their claims, assembles a TransparencyReport, and composes
    the FinalResponse narrative — via the LLM adapter when one is configured,
    otherwise via deterministic concatenation of head summaries.
    """
|
|
|
|
def __init__(
|
|
self,
|
|
adapter: LLMAdapter | None = None,
|
|
max_context_chars: int = DEFAULT_MAX_CONTEXT_CHARS,
|
|
) -> None:
|
|
super().__init__(
|
|
identity=HeadId.WITNESS.value,
|
|
role="Witness",
|
|
objective="Arbitrate head outputs, resolve conflicts, produce final narrative",
|
|
memory_access=True,
|
|
tool_permissions=[],
|
|
)
|
|
self._adapter = adapter
|
|
self._max_context_chars = max_context_chars
|
|
|
|
def handle_message(self, envelope: AgentMessageEnvelope) -> AgentMessageEnvelope | None:
|
|
"""On witness_request, produce FinalResponse from head outputs."""
|
|
if envelope.message.intent != "witness_request":
|
|
return None
|
|
|
|
payload = envelope.message.payload or {}
|
|
head_outputs_data = payload.get("head_outputs", [])
|
|
user_prompt = payload.get("prompt", "")
|
|
|
|
head_outputs: list[HeadOutput] = []
|
|
for h in head_outputs_data:
|
|
if isinstance(h, dict):
|
|
try:
|
|
head_outputs.append(HeadOutput.model_validate(h))
|
|
except Exception as e:
|
|
logger.warning("Witness: skip invalid HeadOutput", extra={"error": str(e)})
|
|
elif isinstance(h, HeadOutput):
|
|
head_outputs.append(h)
|
|
|
|
logger.info(
|
|
"Witness handle_message",
|
|
extra={"head_count": len(head_outputs), "intent": envelope.message.intent},
|
|
)
|
|
|
|
response = self._produce_response(head_outputs, user_prompt)
|
|
if response is None:
|
|
return envelope.create_response(
|
|
"witness_failed",
|
|
payload={"error": "Failed to produce final response"},
|
|
)
|
|
|
|
return AgentMessageEnvelope(
|
|
message=AgentMessage(
|
|
sender=self.identity,
|
|
recipient=envelope.message.sender,
|
|
intent="witness_output",
|
|
payload={"final_response": response.model_dump()},
|
|
),
|
|
task_id=envelope.task_id,
|
|
correlation_id=envelope.correlation_id,
|
|
)
|
|
|
|
def _produce_response(
|
|
self,
|
|
head_outputs: list[HeadOutput],
|
|
user_prompt: str,
|
|
) -> FinalResponse | None:
|
|
"""Run consensus and compose final answer."""
|
|
agreement_map = run_consensus(head_outputs)
|
|
|
|
head_contributions: list[dict[str, Any]] = []
|
|
for out in head_outputs:
|
|
key_claims = [c.claim_text[:80] + "..." if len(c.claim_text) > 80 else c.claim_text for c in out.claims[:3]]
|
|
head_contributions.append({
|
|
"head_id": out.head_id.value,
|
|
"summary": out.summary,
|
|
"key_claims": key_claims,
|
|
})
|
|
|
|
safety_report = self._build_safety_report(head_outputs)
|
|
|
|
transparency = TransparencyReport(
|
|
head_contributions=head_contributions,
|
|
agreement_map=agreement_map,
|
|
safety_report=safety_report,
|
|
confidence_score=agreement_map.confidence_score,
|
|
)
|
|
|
|
final_answer = self._compose_final_answer(
|
|
head_outputs=head_outputs,
|
|
agreement_map=agreement_map,
|
|
user_prompt=user_prompt,
|
|
)
|
|
|
|
return FinalResponse(
|
|
final_answer=final_answer,
|
|
transparency_report=transparency,
|
|
head_contributions=head_contributions,
|
|
confidence_score=agreement_map.confidence_score,
|
|
)
|
|
|
|
def _build_safety_report(self, head_outputs: list[HeadOutput]) -> str:
|
|
"""Summarize safety-relevant findings from Safety head and risks."""
|
|
safety_summaries = []
|
|
all_risks: list[str] = []
|
|
for out in head_outputs:
|
|
if out.head_id == HeadId.SAFETY and out.summary:
|
|
safety_summaries.append(out.summary)
|
|
for r in out.risks:
|
|
if r.severity in ("high", "critical"):
|
|
all_risks.append(f"[{out.head_id.value}] {r.description}")
|
|
if safety_summaries:
|
|
return " ".join(safety_summaries)
|
|
if all_risks:
|
|
return "Risks identified: " + "; ".join(all_risks[:5])
|
|
return "No significant safety concerns raised."
|
|
|
|
def _truncate(self, text: str, max_len: int) -> str:
|
|
"""Truncate text with ellipsis if over max_len."""
|
|
if len(text) <= max_len:
|
|
return text
|
|
return text[: max_len - 3] + "..."
|
|
|
|
def _build_compose_context(
|
|
self,
|
|
head_outputs: list[HeadOutput],
|
|
agreement_map: AgreementMap,
|
|
user_prompt: str,
|
|
) -> str:
|
|
"""Build truncated context for LLM to avoid token overflow."""
|
|
max_chars = self._max_context_chars
|
|
prompt_limit = min(800, max_chars // 4)
|
|
summary_limit = min(300, max_chars // (len(head_outputs) * 2) if head_outputs else 300)
|
|
claim_limit = min(150, max_chars // 20)
|
|
|
|
user_trunc = self._truncate(user_prompt, prompt_limit)
|
|
context = f"User asked: {user_trunc}\n\n"
|
|
context += "Head summaries:\n"
|
|
for out in head_outputs:
|
|
s = self._truncate(out.summary or "", summary_limit)
|
|
context += f"- {out.head_id.value}: {s}\n"
|
|
context += "\nAgreed claims:\n"
|
|
for c in agreement_map.agreed_claims[:10]:
|
|
claim = self._truncate(c.get("claim_text", ""), claim_limit)
|
|
context += f"- {claim} (confidence: {c.get('confidence', 0)})\n"
|
|
if agreement_map.disputed_claims:
|
|
context += "\nDisputed:\n"
|
|
for c in agreement_map.disputed_claims[:5]:
|
|
claim = self._truncate(c.get("claim_text", ""), claim_limit)
|
|
context += f"- {claim}\n"
|
|
|
|
if len(context) > max_chars:
|
|
context = context[: max_chars - 20] + "\n...[truncated]"
|
|
return context
|
|
|
|
def _compose_final_answer(
|
|
self,
|
|
head_outputs: list[HeadOutput],
|
|
agreement_map: AgreementMap,
|
|
user_prompt: str,
|
|
) -> str:
|
|
"""Compose narrative from head outputs and consensus."""
|
|
if not self._adapter:
|
|
return self._fallback_compose(head_outputs, agreement_map)
|
|
|
|
context = self._build_compose_context(head_outputs, agreement_map, user_prompt)
|
|
|
|
messages = [
|
|
{"role": "system", "content": WITNESS_COMPOSE_SYSTEM},
|
|
{"role": "user", "content": context},
|
|
]
|
|
try:
|
|
result = self._adapter.complete(messages, temperature=0.3)
|
|
return result.strip() if result else self._fallback_compose(head_outputs, agreement_map)
|
|
except Exception as e:
|
|
logger.exception("Witness compose failed", extra={"error": str(e)})
|
|
return self._fallback_compose(head_outputs, agreement_map)
|
|
|
|
def _fallback_compose(
|
|
self,
|
|
head_outputs: list[HeadOutput],
|
|
agreement_map: AgreementMap,
|
|
) -> str:
|
|
"""Simple concatenation when no adapter."""
|
|
parts = []
|
|
for out in head_outputs[:5]:
|
|
parts.append(f"[{out.head_id.value}] {out.summary}")
|
|
if agreement_map.agreed_claims:
|
|
parts.append("Key points: " + "; ".join(
|
|
c.get("claim_text", "")[:60] for c in agreement_map.agreed_claims[:5]
|
|
))
|
|
return "\n\n".join(parts) if parts else "No head outputs available."
|