Files
FusionAGI/fusionagi/agents/witness_agent.py
defiQUG c052b07662
Some checks failed
Tests / test (3.10) (push) Has been cancelled
Tests / test (3.11) (push) Has been cancelled
Tests / test (3.12) (push) Has been cancelled
Tests / lint (push) Has been cancelled
Tests / docker (push) Has been cancelled
Initial commit: add .gitignore and README
2026-02-09 21:51:42 -08:00

220 lines
8.4 KiB
Python

"""Witness agent: meta-controller that arbitrates head outputs and produces final response."""
from typing import Any
from fusionagi.agents.base_agent import BaseAgent
# Approx 4 chars/token; limit context to ~6k tokens (~24k chars) to avoid overflow
DEFAULT_MAX_CONTEXT_CHARS = 24_000
from fusionagi.adapters.base import LLMAdapter
from fusionagi.schemas.messages import AgentMessage, AgentMessageEnvelope
from fusionagi.schemas.head import HeadId, HeadOutput
from fusionagi.schemas.witness import (
AgreementMap,
TransparencyReport,
FinalResponse,
)
from fusionagi.multi_agent.consensus_engine import run_consensus
from fusionagi._logger import logger
WITNESS_COMPOSE_SYSTEM = """You are the Witness meta-controller in a 12-headed multi-agent system.
You receive structured outputs from specialist heads (Logic, Research, Strategy, Security, etc.).
Your job: produce a clear, coherent final answer that synthesizes the head contributions.
Use the agreed claims. Acknowledge disputes if any. Be concise and actionable.
Output only the final narrative text, no JSON or meta-commentary."""
class WitnessAgent(BaseAgent):
"""
Witness: consumes HeadOutput from content heads, runs consensus, composes FinalResponse.
"""
def __init__(
self,
adapter: LLMAdapter | None = None,
max_context_chars: int = DEFAULT_MAX_CONTEXT_CHARS,
) -> None:
super().__init__(
identity=HeadId.WITNESS.value,
role="Witness",
objective="Arbitrate head outputs, resolve conflicts, produce final narrative",
memory_access=True,
tool_permissions=[],
)
self._adapter = adapter
self._max_context_chars = max_context_chars
def handle_message(self, envelope: AgentMessageEnvelope) -> AgentMessageEnvelope | None:
"""On witness_request, produce FinalResponse from head outputs."""
if envelope.message.intent != "witness_request":
return None
payload = envelope.message.payload or {}
head_outputs_data = payload.get("head_outputs", [])
user_prompt = payload.get("prompt", "")
head_outputs: list[HeadOutput] = []
for h in head_outputs_data:
if isinstance(h, dict):
try:
head_outputs.append(HeadOutput.model_validate(h))
except Exception as e:
logger.warning("Witness: skip invalid HeadOutput", extra={"error": str(e)})
elif isinstance(h, HeadOutput):
head_outputs.append(h)
logger.info(
"Witness handle_message",
extra={"head_count": len(head_outputs), "intent": envelope.message.intent},
)
response = self._produce_response(head_outputs, user_prompt)
if response is None:
return envelope.create_response(
"witness_failed",
payload={"error": "Failed to produce final response"},
)
return AgentMessageEnvelope(
message=AgentMessage(
sender=self.identity,
recipient=envelope.message.sender,
intent="witness_output",
payload={"final_response": response.model_dump()},
),
task_id=envelope.task_id,
correlation_id=envelope.correlation_id,
)
def _produce_response(
self,
head_outputs: list[HeadOutput],
user_prompt: str,
) -> FinalResponse | None:
"""Run consensus and compose final answer."""
agreement_map = run_consensus(head_outputs)
head_contributions: list[dict[str, Any]] = []
for out in head_outputs:
key_claims = [c.claim_text[:80] + "..." if len(c.claim_text) > 80 else c.claim_text for c in out.claims[:3]]
head_contributions.append({
"head_id": out.head_id.value,
"summary": out.summary,
"key_claims": key_claims,
})
safety_report = self._build_safety_report(head_outputs)
transparency = TransparencyReport(
head_contributions=head_contributions,
agreement_map=agreement_map,
safety_report=safety_report,
confidence_score=agreement_map.confidence_score,
)
final_answer = self._compose_final_answer(
head_outputs=head_outputs,
agreement_map=agreement_map,
user_prompt=user_prompt,
)
return FinalResponse(
final_answer=final_answer,
transparency_report=transparency,
head_contributions=head_contributions,
confidence_score=agreement_map.confidence_score,
)
def _build_safety_report(self, head_outputs: list[HeadOutput]) -> str:
"""Summarize safety-relevant findings from Safety head and risks."""
safety_summaries = []
all_risks: list[str] = []
for out in head_outputs:
if out.head_id == HeadId.SAFETY and out.summary:
safety_summaries.append(out.summary)
for r in out.risks:
if r.severity in ("high", "critical"):
all_risks.append(f"[{out.head_id.value}] {r.description}")
if safety_summaries:
return " ".join(safety_summaries)
if all_risks:
return "Risks identified: " + "; ".join(all_risks[:5])
return "No significant safety concerns raised."
def _truncate(self, text: str, max_len: int) -> str:
"""Truncate text with ellipsis if over max_len."""
if len(text) <= max_len:
return text
return text[: max_len - 3] + "..."
def _build_compose_context(
self,
head_outputs: list[HeadOutput],
agreement_map: AgreementMap,
user_prompt: str,
) -> str:
"""Build truncated context for LLM to avoid token overflow."""
max_chars = self._max_context_chars
prompt_limit = min(800, max_chars // 4)
summary_limit = min(300, max_chars // (len(head_outputs) * 2) if head_outputs else 300)
claim_limit = min(150, max_chars // 20)
user_trunc = self._truncate(user_prompt, prompt_limit)
context = f"User asked: {user_trunc}\n\n"
context += "Head summaries:\n"
for out in head_outputs:
s = self._truncate(out.summary or "", summary_limit)
context += f"- {out.head_id.value}: {s}\n"
context += "\nAgreed claims:\n"
for c in agreement_map.agreed_claims[:10]:
claim = self._truncate(c.get("claim_text", ""), claim_limit)
context += f"- {claim} (confidence: {c.get('confidence', 0)})\n"
if agreement_map.disputed_claims:
context += "\nDisputed:\n"
for c in agreement_map.disputed_claims[:5]:
claim = self._truncate(c.get("claim_text", ""), claim_limit)
context += f"- {claim}\n"
if len(context) > max_chars:
context = context[: max_chars - 20] + "\n...[truncated]"
return context
def _compose_final_answer(
self,
head_outputs: list[HeadOutput],
agreement_map: AgreementMap,
user_prompt: str,
) -> str:
"""Compose narrative from head outputs and consensus."""
if not self._adapter:
return self._fallback_compose(head_outputs, agreement_map)
context = self._build_compose_context(head_outputs, agreement_map, user_prompt)
messages = [
{"role": "system", "content": WITNESS_COMPOSE_SYSTEM},
{"role": "user", "content": context},
]
try:
result = self._adapter.complete(messages, temperature=0.3)
return result.strip() if result else self._fallback_compose(head_outputs, agreement_map)
except Exception as e:
logger.exception("Witness compose failed", extra={"error": str(e)})
return self._fallback_compose(head_outputs, agreement_map)
def _fallback_compose(
self,
head_outputs: list[HeadOutput],
agreement_map: AgreementMap,
) -> str:
"""Simple concatenation when no adapter."""
parts = []
for out in head_outputs[:5]:
parts.append(f"[{out.head_id.value}] {out.summary}")
if agreement_map.agreed_claims:
parts.append("Key points: " + "; ".join(
c.get("claim_text", "")[:60] for c in agreement_map.agreed_claims[:5]
))
return "\n\n".join(parts) if parts else "No head outputs available."