Files

100 lines
3.1 KiB
Python
Raw Permalink Normal View History

"""WebSocket streaming for Dvādaśa responses."""
import asyncio
import json
from concurrent.futures import ThreadPoolExecutor
from typing import Any
from fusionagi.api.dependencies import get_orchestrator, get_session_store, get_event_bus
from fusionagi.core import run_heads_parallel, run_witness, select_heads_for_complexity
from fusionagi.schemas.commands import parse_user_input
from fusionagi.schemas.head import HeadId, HeadOutput
async def handle_stream(
    session_id: str,
    prompt: str,
    send_fn: Any,
) -> None:
    """
    Run the Dvādaśa flow for one prompt and stream progress events.

    Events are sent, in order, as dicts whose ``type`` key is one of:

    * ``heads_running`` - once, before the heads are executed.
    * ``head_complete`` and ``head_speak`` - one pair per head output
      (``head_speak`` always carries ``audio_base64: None`` here).
    * ``witness_running`` - once, before the witness is executed.
    * ``complete`` - the final answer, transparency report, head
      contributions and confidence score.
    * ``error`` - sent instead of continuing when the service is not
      initialized, the session is unknown, the prompt is empty, a head or
      witness run raises, or the witness returns a falsy result.

    On success the prompt, final answer and confidence score are appended
    to the session history.

    Args:
        session_id: Key used to look the session up in the session store.
        prompt: User input; its first 200 characters become the task goal.
        send_fn: Awaitable callable invoked with each event dict.

    Note:
        Only exceptions raised by the head and witness runs are converted
        into ``error`` events; anything raised elsewhere in this function
        propagates to the caller.
    """
    from fusionagi.api.dependencies import ensure_initialized

    ensure_initialized()
    store = get_session_store()
    orch = get_orchestrator()
    if not store or not orch:
        await send_fn({"type": "error", "message": "Service not initialized"})
        return
    if not store.get(session_id):
        await send_fn({"type": "error", "message": "Session not found"})
        return
    if not prompt:
        await send_fn({"type": "error", "message": "prompt is required"})
        return

    parsed = parse_user_input(prompt)
    task_id = orch.submit_task(goal=prompt[:200])
    head_ids = select_heads_for_complexity(prompt)
    # An explicit head-strategy command overrides the complexity-based choice.
    if parsed.intent.value == "head_strategy" and parsed.head_id:
        head_ids = [parsed.head_id]

    # We are inside a coroutine, so a running loop is guaranteed to exist.
    loop = asyncio.get_running_loop()
    # Dedicated single worker: the blocking head/witness calls run off the
    # event loop thread, one after the other.
    executor = ThreadPoolExecutor(max_workers=1)
    try:
        await send_fn({"type": "heads_running", "message": "Heads running…"})

        def run_heads():
            return run_heads_parallel(orch, task_id, prompt, head_ids=head_ids)

        try:
            head_outputs = await loop.run_in_executor(executor, run_heads)
        except Exception as e:
            await send_fn({"type": "error", "message": str(e)})
            return

        for ho in head_outputs:
            await send_fn({
                "type": "head_complete",
                "head_id": ho.head_id.value,
                "summary": ho.summary,
            })
            await send_fn({
                "type": "head_speak",
                "head_id": ho.head_id.value,
                "summary": ho.summary,
                "audio_base64": None,
            })

        await send_fn({"type": "witness_running", "message": "Witness composing…"})

        def run_wit():
            return run_witness(orch, task_id, head_outputs, prompt)

        try:
            final = await loop.run_in_executor(executor, run_wit)
        except Exception as e:
            await send_fn({"type": "error", "message": str(e)})
            return
    finally:
        # Release the worker thread on every exit path (success, error,
        # cancellation). wait=False so the event loop is never blocked.
        executor.shutdown(wait=False)

    if not final:
        await send_fn({"type": "error", "message": "Failed to produce response"})
        return

    await send_fn({
        "type": "complete",
        "final_answer": final.final_answer,
        "transparency_report": final.transparency_report.model_dump(),
        "head_contributions": final.head_contributions,
        "confidence_score": final.confidence_score,
    })
    store.append_history(session_id, {
        "prompt": prompt,
        "final_answer": final.final_answer,
        "confidence_score": final.confidence_score,
    })