
FusionAGI API and Middleware Interface Specification

1. Dvādaśa HTTP/WebSocket API

The main programmatic entry point is the Dvādaśa API, a FastAPI application exposing session-based prompts and streaming responses.

Request Flow (Prompt → Response)

sequenceDiagram
    participant Client
    participant API as Dvādaśa API
    participant Safety as SafetyPipeline
    participant Orch as Orchestrator
    participant Heads as Dvādaśa Heads
    participant Witness as Witness

    Client->>API: POST /v1/sessions/{id}/prompt
    API->>Safety: pre_check(prompt)
    Safety-->>API: ModerationResult
    API->>Orch: submit_task(goal)
    Orch->>Heads: run_heads_parallel
    Heads->>Witness: head outputs
    Witness->>Orch: final_answer
    Orch->>API: FinalResponse
    API->>Safety: post_check(final_answer)
    Safety-->>API: OutputScanResult
    API->>Client: 200 + FinalResponse

1.1 Application Factory

  • Location: fusionagi/api/app.py
  • Factory: create_app(adapter: Any = None, cors_origins: list[str] | None = None) -> FastAPI
    • Creates FastAPI app with title "FusionAGI Dvādaśa API"
    • Accepts optional LLMAdapter for head/Witness LLM calls
    • Accepts optional cors_origins to enable CORS middleware (e.g. ["*"] or ["https://example.com"])
    • Mounts router at prefix /v1 with tag dvadasa
    • Uses startup event to initialize Orchestrator, EventBus, SessionStore, SafetyPipeline

1.2 Base URL and Router Structure

| Prefix | Router | Tag |
|---|---|---|
| /v1 | api_router | dvadasa |
| /v1/sessions | sessions_router | sessions |

1.3 HTTP Endpoints

| Method | Path | Description |
|---|---|---|
| POST | /v1/sessions | Create a new session |
| POST | /v1/sessions/{session_id}/prompt | Submit prompt and receive sync FinalResponse |
| WebSocket | /v1/sessions/{session_id}/stream | Streaming Dvādaśa response |

POST /v1/sessions

Query params:

  • user_id (optional, str): User identifier

Response (200):

{
  "session_id": "uuid-string",
  "user_id": "optional-user-id"
}

POST /v1/sessions/{session_id}/prompt

Path params:

  • session_id (required): Session UUID

Body:

{
  "prompt": "string (required)",
  "use_all_heads": false
}
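
As a client-side illustration of the body above, the following minimal sketch builds the POST request with the standard library only. The host, port, and session id are hypothetical placeholders, and the request is constructed but not sent.

```python
import json
import urllib.request


def build_prompt_request(
    base_url: str,
    session_id: str,
    prompt: str,
    use_all_heads: bool = False,
) -> urllib.request.Request:
    """Build (but do not send) a POST request for the prompt endpoint."""
    body = json.dumps({"prompt": prompt, "use_all_heads": use_all_heads})
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/v1/sessions/{session_id}/prompt",
        data=body.encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Hypothetical host and session id, for illustration only.
req = build_prompt_request("http://localhost:8000", "1234", "Summarize the design")
# To send it against a running server: urllib.request.urlopen(req)
```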

Pipeline:

  1. SafetyPipeline pre_check(prompt) - input moderation
  2. parse_user_input(prompt) - detect UserIntent (normal, head_strategy, show_dissent, etc.)
  3. Submit task via orchestrator.submit_task(goal=prompt[:200]); the goal is the prompt truncated to its first 200 characters
  4. select_heads_for_complexity(prompt) or explicit head from /head <id> command
  5. run_dvadasa() - parallel heads + Witness synthesis
  6. SafetyPipeline post_check(final_answer) - output scan (PII, blocked content)
  7. Append to session history

Response (200):

{
  "task_id": "string",
  "final_answer": "string",
  "transparency_report": { "head_contributions": [...] },
  "head_contributions": [...],
  "confidence_score": 0.0
}

Errors:

  • 400: missing prompt, pre_check failed, post_check failed
  • 404: session not found
  • 500: Dvādaśa failed
  • 503: service not initialized
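
The pipeline order and the error codes above can be sketched together as one function. This is a simplified illustration, not the handler in fusionagi/api/routes/sessions.py: ApiError, handle_prompt, the callable parameters, the order in which the checks run, and the use of the transformed prompt are all assumptions, and intent parsing and head selection are left out.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class ModerationResult:
    allowed: bool
    transformed: Optional[str] = None
    reason: Optional[str] = None


@dataclass
class OutputScanResult:
    passed: bool
    flags: List[str] = field(default_factory=list)
    sanitized: Optional[str] = None


class ApiError(Exception):
    """Hypothetical error type that carries an HTTP status code."""

    def __init__(self, status: int, detail: str) -> None:
        super().__init__(detail)
        self.status = status
        self.detail = detail


def handle_prompt(session, prompt, pre_check, run_heads, post_check) -> dict:
    """Run pre_check, the heads, and post_check, mapping failures to status codes."""
    if run_heads is None:
        raise ApiError(503, "service not initialized")
    if session is None:
        raise ApiError(404, "session not found")
    if not prompt:
        raise ApiError(400, "missing prompt")
    moderation = pre_check(prompt)
    if not moderation.allowed:
        raise ApiError(400, f"pre_check failed: {moderation.reason}")
    try:
        # Assumption: a transformed prompt, when present, replaces the original.
        answer = run_heads(moderation.transformed or prompt)
    except Exception as exc:
        raise ApiError(500, f"Dvādaśa failed: {exc}") from exc
    scan = post_check(answer)
    if not scan.passed:
        raise ApiError(400, f"post_check failed: {scan.flags}")
    session["history"].append({"prompt": prompt, "final_answer": answer})
    return {"final_answer": answer}


def allow(prompt: str) -> ModerationResult:
    return ModerationResult(allowed=True)


def clean(answer: str) -> OutputScanResult:
    return OutputScanResult(passed=True)


def failing_heads(prompt: str) -> str:
    raise RuntimeError("head crashed")


def status_of(call) -> int:
    """Return 200 if the call succeeds, else the status of the raised ApiError."""
    try:
        call()
        return 200
    except ApiError as err:
        return err.status


session = {"history": []}
result = handle_prompt(session, "hello", allow, str.upper, clean)
```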

WebSocket /v1/sessions/{session_id}/stream

Flow:

  1. Client connects; the server accepts the connection
  2. Client sends {"prompt": "..."} JSON
  3. Server streams events via send_json:
    • heads_running
    • head_complete (per head: head_id, summary)
    • witness_running
    • complete (final_answer, transparency_report, head_contributions, confidence_score)
    • error (message)
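
On the client side, the event sequence above could be folded into a single result as in the sketch below. It assumes each JSON event carries its name under a "type" key, which this specification does not state; collect_stream and the sample events are hypothetical.

```python
from typing import Dict, Iterable


def collect_stream(events: Iterable[dict]) -> dict:
    """Fold streamed events into one result, stopping at complete or error."""
    summaries: Dict[str, str] = {}
    for event in events:
        kind = event.get("type")  # assumption: the event name lives under "type"
        if kind == "head_complete":
            summaries[event["head_id"]] = event["summary"]
        elif kind == "complete":
            return {
                "final_answer": event["final_answer"],
                "confidence_score": event["confidence_score"],
                "head_summaries": summaries,
            }
        elif kind == "error":
            raise RuntimeError(event["message"])
        # heads_running and witness_running are progress markers; nothing to keep
    raise RuntimeError("stream ended before a complete event")


# Hand-written sample events, not captured from a real server.
events = [
    {"type": "heads_running"},
    {"type": "head_complete", "head_id": "logic", "summary": "consistent"},
    {"type": "witness_running"},
    {"type": "complete", "final_answer": "42", "transparency_report": {},
     "head_contributions": [], "confidence_score": 0.9},
]
streamed = collect_stream(events)
```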

1.4 Dependencies and App State

fusionagi/api/dependencies.py provides:

| Function | Returns | Purpose |
|---|---|---|
| default_orchestrator(adapter) | (Orchestrator, EventBus) | Build orchestrator with Dvādaśa heads + Witness |
| get_orchestrator() | Orchestrator | From app state |
| get_event_bus() | EventBus | From app state |
| get_session_store() | SessionStore | In-memory session store |
| get_safety_pipeline() | SafetyPipeline | Pre/post checks |
| set_app_state(orchestrator, event_bus, session_store) | - | Populate app state |
| ensure_initialized(adapter) | - | Lazy init for tests |

SessionStore interface:

  • create(session_id, user_id) -> dict
  • get(session_id) -> dict | None
  • append_history(session_id, entry) -> None
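
A store satisfying this three-method interface can be very small. The sketch below is one possible in-memory version; the class name, the shape of the session dict, and the "history" key are assumptions rather than the actual FusionAGI implementation.

```python
from typing import Dict, Optional


class InMemorySessionStore:
    """Hypothetical dict-backed store with the three methods listed above."""

    def __init__(self) -> None:
        self._sessions: Dict[str, dict] = {}

    def create(self, session_id: str, user_id: Optional[str] = None) -> dict:
        # Assumption: a session is a plain dict that carries its own history list.
        session = {"session_id": session_id, "user_id": user_id, "history": []}
        self._sessions[session_id] = session
        return session

    def get(self, session_id: str) -> Optional[dict]:
        return self._sessions.get(session_id)

    def append_history(self, session_id: str, entry: dict) -> None:
        session = self._sessions.get(session_id)
        if session is None:
            raise KeyError(session_id)
        session["history"].append(entry)


store = InMemorySessionStore()
store.create("s1", user_id="alice")
store.append_history("s1", {"prompt": "hi", "final_answer": "hello"})
```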

2. Request Pipeline (SafetyPipeline - Inline "Middleware")

FusionAGI registers no HTTP middleware (CORS, auth, etc.) by default. Instead, the SafetyPipeline acts as an inline request/response pipeline that route handlers invoke directly. CORS can be enabled via create_app(cors_origins=[...]).

2.1 Components

fusionagi/governance/safety_pipeline.py

| Class | Role |
|---|---|
| InputModerator | Pre-check: block/transform user input |
| OutputScanner | Post-check: PII, blocked patterns in final answer |
| SafetyPipeline | Combines moderator + scanner + optional audit log |

2.2 InputModerator Interface

def add_blocked_pattern(self, pattern: str) -> None   # Regex
def add_blocked_phrase(self, phrase: str) -> None     # Exact phrase
def moderate(self, text: str) -> ModerationResult

ModerationResult: allowed: bool, transformed: str | None, reason: str | None
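
To make the interface concrete, here is a minimal sketch of a moderator with the same three methods. The class name is hypothetical, and the matching rules (case-insensitive phrase and regex matching, whitespace stripping as the only transformation) are assumptions, not the behaviour of the real InputModerator.

```python
import re
from dataclasses import dataclass
from typing import Optional


@dataclass
class ModerationResult:
    allowed: bool
    transformed: Optional[str] = None
    reason: Optional[str] = None


class SketchInputModerator:
    """Hypothetical moderator that blocks on phrases and regex patterns."""

    def __init__(self) -> None:
        self._patterns = []
        self._phrases = []

    def add_blocked_pattern(self, pattern: str) -> None:
        self._patterns.append(re.compile(pattern, re.IGNORECASE))

    def add_blocked_phrase(self, phrase: str) -> None:
        self._phrases.append(phrase.lower())

    def moderate(self, text: str) -> ModerationResult:
        lowered = text.lower()
        for phrase in self._phrases:
            if phrase in lowered:
                return ModerationResult(False, reason=f"blocked phrase: {phrase}")
        for pattern in self._patterns:
            if pattern.search(text):
                return ModerationResult(False, reason=f"blocked pattern: {pattern.pattern}")
        # Assumption: the only transformation applied is whitespace stripping.
        return ModerationResult(True, transformed=text.strip())


moderator = SketchInputModerator()
moderator.add_blocked_phrase("drop table")
moderator.add_blocked_pattern(r"\bpassword\s*=")
blocked = moderator.moderate("please DROP TABLE users")
passed = moderator.moderate("  hello  ")
```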

2.3 OutputScanner Interface

def add_pii_pattern(self, name: str, pattern: str) -> None
def add_blocked_pattern(self, pattern: str) -> None
def scan(self, text: str) -> OutputScanResult

OutputScanResult: passed: bool, flags: list[str], sanitized: str | None

Default PII patterns: SSN and credit card numbers.
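
The scanner interface can be sketched in the same way. The regular expressions, the flag naming ("pii:<name>", "blocked:<pattern>"), and the "[REDACTED]" replacement below are illustrative assumptions; the patterns and result conventions shipped with FusionAGI may differ.

```python
import re
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class OutputScanResult:
    passed: bool
    flags: List[str] = field(default_factory=list)
    sanitized: Optional[str] = None


class SketchOutputScanner:
    """Hypothetical scanner that flags PII and blocked patterns."""

    def __init__(self) -> None:
        # Illustrative defaults only, not the patterns used by the real scanner.
        self._pii = {
            "ssn": re.compile(r"\b\d{3}-\d{2}-\d{4}\b"),
            "credit_card": re.compile(r"\b(?:\d{4}[ -]?){3}\d{4}\b"),
        }
        self._blocked = []

    def add_pii_pattern(self, name: str, pattern: str) -> None:
        self._pii[name] = re.compile(pattern)

    def add_blocked_pattern(self, pattern: str) -> None:
        self._blocked.append(re.compile(pattern))

    def scan(self, text: str) -> OutputScanResult:
        flags = []
        sanitized = text
        for name, pattern in self._pii.items():
            if pattern.search(sanitized):
                flags.append(f"pii:{name}")
                sanitized = pattern.sub("[REDACTED]", sanitized)
        for pattern in self._blocked:
            if pattern.search(text):
                flags.append(f"blocked:{pattern.pattern}")
        # Assumption: sanitized is only populated when something was flagged.
        return OutputScanResult(
            passed=not flags,
            flags=flags,
            sanitized=sanitized if flags else None,
        )


scanner = SketchOutputScanner()
flagged = scanner.scan("SSN 123-45-6789")
clean_result = scanner.scan("nothing sensitive here")
```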

2.4 SafetyPipeline Interface

def pre_check(self, user_input: str) -> ModerationResult
def post_check(self, final_answer: str) -> OutputScanResult

Used in fusionagi/api/routes/sessions.py around prompt submission and final answer delivery.


3. Multi-Modal Interface Layer

The Interface layer provides abstractions for admin control and user interaction across modalities. It is not HTTP middleware; it is a conceptual middle layer between end users and the core.

3.1 Base Abstractions

fusionagi/interfaces/base.py

| Type | Description |
|---|---|
| ModalityType | Enum: TEXT, VOICE, VISUAL, HAPTIC, GESTURE, BIOMETRIC |
| InterfaceMessage | Pydantic: id, modality, content, metadata, timestamp, user_id, session_id |
| InterfaceCapabilities | supported_modalities, supports_streaming, supports_interruption, supports_multimodal, latency_ms, max_concurrent_sessions |

InterfaceAdapter (ABC):

def capabilities(self) -> InterfaceCapabilities
async def send(self, message: InterfaceMessage) -> None
async def receive(self, timeout_seconds: float | None = None) -> InterfaceMessage | None
async def stream_send(self, messages: AsyncIterator[InterfaceMessage]) -> None  # default impl
async def initialize(self) -> None
async def shutdown(self) -> None
def validate_message(self, message: InterfaceMessage) -> bool
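
The sketch below shows what an adapter with these methods might look like, using an asyncio queue as a loopback transport. It is self-contained and therefore does not subclass the real InterfaceAdapter; Message is a two-field stand-in for InterfaceMessage, LoopbackAdapter is a hypothetical name, and capabilities() is omitted.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, Optional


@dataclass
class Message:
    """Stand-in for InterfaceMessage with only the fields this sketch needs."""
    modality: str
    content: str


class LoopbackAdapter:
    """Hypothetical text adapter: whatever is sent can be received back."""

    def __init__(self) -> None:
        self._queue = None

    async def initialize(self) -> None:
        # Created here so the queue belongs to the running event loop.
        self._queue = asyncio.Queue()

    async def shutdown(self) -> None:
        self._queue = None

    def validate_message(self, message: Message) -> bool:
        return message.modality == "text" and bool(message.content)

    async def send(self, message: Message) -> None:
        if not self.validate_message(message):
            raise ValueError("unsupported or empty message")
        await self._queue.put(message)

    async def receive(self, timeout_seconds: Optional[float] = None) -> Optional[Message]:
        try:
            return await asyncio.wait_for(self._queue.get(), timeout=timeout_seconds)
        except asyncio.TimeoutError:
            return None  # nothing arrived within the timeout

    async def stream_send(self, messages: AsyncIterator[Message]) -> None:
        async for message in messages:
            await self.send(message)


async def demo():
    adapter = LoopbackAdapter()
    await adapter.initialize()
    await adapter.send(Message("text", "hello"))
    first = await adapter.receive(timeout_seconds=0.1)
    second = await adapter.receive(timeout_seconds=0.05)  # queue is empty now
    await adapter.shutdown()
    return first, second


first, second = asyncio.run(demo())
```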

3.2 AdminControlPanel

fusionagi/interfaces/admin_panel.py

Constructor: AdminControlPanel(orchestrator, event_bus, state_manager, voice_library?, conversation_tuner?, policy_engine?, audit_log?, session_count_callback?)

| Area | Methods |
|---|---|
| Voice | add_voice_profile, list_voices, update_voice_profile, remove_voice_profile, set_default_voice |
| Conversation | register_conversation_style, list_conversation_styles, get_conversation_style, set_default_conversation_style |
| Agents | configure_agent, get_agent_config, list_agents, enable_agent, disable_agent |
| Monitoring | get_system_status, get_task_statistics, get_recent_events |
| Governance | get_audit_entries, update_policy |
| Config | export_configuration, import_configuration |

Models: SystemStatus, AgentConfig

3.3 MultiModalUI

fusionagi/interfaces/multimodal_ui.py

Constructor: MultiModalUI(orchestrator, conversation_manager, voice_interface?, llm_process_callback?)

| Area | Methods |
|---|---|
| Sessions | create_session, get_session, active_session_count, end_session |
| Modalities | register_interface, enable_modality, disable_modality, get_available_modalities |
| I/O | send_to_user, receive_from_user |
| Tasks | submit_task_interactive |
| Conversation | converse |
| Stats | get_session_statistics |

UserSession: session_id, user_id, conversation_session_id, active_modalities, preferences, accessibility_settings, started_at, last_activity_at

3.4 Voice Layer

fusionagi/interfaces/voice.py

Protocols:

  • TTSAdapter: async def synthesize(text, voice_id?, **kwargs) -> bytes | None
  • STTAdapter: async def transcribe(audio_data?, timeout_seconds?, **kwargs) -> str | None
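
Assuming these protocols are structural (typing.Protocol) types, any class with a matching synthesize method would satisfy TTSAdapter without inheriting from it. The sketch below redeclares the protocol locally for self-containment; SilentTTS is a hypothetical adapter that returns placeholder bytes instead of calling a speech engine.

```python
import asyncio
from typing import Any, Optional, Protocol, runtime_checkable


@runtime_checkable
class TTSAdapter(Protocol):
    """Local restatement of the TTSAdapter signature listed above."""

    async def synthesize(
        self, text: str, voice_id: Optional[str] = None, **kwargs: Any
    ) -> Optional[bytes]:
        ...


class SilentTTS:
    """Hypothetical adapter: one zero byte per character, no real audio."""

    async def synthesize(
        self, text: str, voice_id: Optional[str] = None, **kwargs: Any
    ) -> Optional[bytes]:
        if not text:
            return None  # nothing to synthesize
        return b"\x00" * len(text)


tts = SilentTTS()
audio = asyncio.run(tts.synthesize("abc", voice_id="default"))
nothing = asyncio.run(tts.synthesize(""))
```

The runtime isinstance check only verifies that a synthesize attribute exists, so signature compatibility is still best left to a static type checker.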

VoiceLibrary: add_voice, remove_voice, get_voice, list_voices, set_default_voice, get_default_voice, update_voice

VoiceProfile: id, name, language, gender, age_range, style, pitch, speed, provider, provider_voice_id, metadata

VoiceInterface(InterfaceAdapter): implements send (TTS), receive (STT), set_active_voice, capabilities

3.5 Conversation Layer

fusionagi/interfaces/conversation.py

ConversationStyle: formality, verbosity, personality_traits, empathy_level, proactivity, humor_level, technical_depth

ConversationTuner: register_style, get_style, list_styles, set_default_style, get_default_style, tune_for_context

ConversationManager: create_session, get_session, add_turn, get_history, get_style_for_session, update_style, end_session, get_context_summary

ConversationTurn: turn_id, session_id, speaker (user|agent|system), content, intent, sentiment, confidence, timestamp


4. Architecture Summary

flowchart TB
    subgraph clients [Clients]
        HTTP[HTTP Client]
        WS[WebSocket Client]
    end

    subgraph api [Dvādaśa API]
        Sessions[POST /v1/sessions]
        Prompt[POST /v1/sessions/id/prompt]
        Stream[WS /v1/sessions/id/stream]
    end

    subgraph pipeline [Request Pipeline]
        PreCheck[SafetyPipeline.pre_check]
        PostCheck[SafetyPipeline.post_check]
    end

    subgraph core [Core]
        Orch[Orchestrator]
        Heads[Dvādaśa Heads]
        Witness[Witness]
    end

    subgraph interfaces [Interface Layer]
        Admin[AdminControlPanel]
        UI[MultiModalUI]
        Adapters[InterfaceAdapters]
    end

    HTTP --> Sessions
    HTTP --> Prompt
    WS --> Stream
    Prompt --> PreCheck
    PreCheck --> Orch
    Orch --> Heads
    Heads --> Witness
    Witness --> PostCheck
    PostCheck --> Prompt
    Admin --> Orch
    UI --> Orch
    UI --> Adapters

5. Adding HTTP Middleware

To enable CORS when creating the app:

from fusionagi.api import create_app

app = create_app(
    cors_origins=["*"],  # or ["https://example.com", "https://app.example.com"]
)

For additional custom middleware (auth, logging, etc.), use app.add_middleware() after creating the app. FusionAGI relies on FastAPI/Starlette defaults (ServerErrorMiddleware, ExceptionMiddleware, AsyncExitStackMiddleware) when no custom middleware is configured.
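
As an example of custom middleware, the following sketch is a pure ASGI middleware that adds a timing header to HTTP responses. TimingMiddleware and the x-process-time header name are hypothetical choices; the small fake application at the end exists only so the class can be exercised without a server.

```python
import asyncio
import time


class TimingMiddleware:
    """Pure ASGI middleware that appends an x-process-time response header."""

    def __init__(self, app) -> None:
        self.app = app

    async def __call__(self, scope, receive, send) -> None:
        if scope["type"] != "http":
            # Pass WebSocket and lifespan traffic through untouched.
            await self.app(scope, receive, send)
            return

        started = time.perf_counter()

        async def send_with_timing(message) -> None:
            if message["type"] == "http.response.start":
                elapsed = f"{time.perf_counter() - started:.6f}".encode("ascii")
                headers = list(message.get("headers", []))
                headers.append((b"x-process-time", elapsed))
                message = {**message, "headers": headers}
            await send(message)

        await self.app(scope, receive, send_with_timing)


# With the real application this would be registered as:
#     app = create_app()
#     app.add_middleware(TimingMiddleware)


async def fake_app(scope, receive, send) -> None:
    """Minimal downstream ASGI app used only to exercise the middleware."""
    await send({"type": "http.response.start", "status": 200, "headers": []})
    await send({"type": "http.response.body", "body": b"ok"})


async def run_once() -> list:
    sent = []

    async def send(message) -> None:
        sent.append(message)

    async def receive() -> dict:
        return {"type": "http.request", "body": b"", "more_body": False}

    await TimingMiddleware(fake_app)({"type": "http"}, receive, send)
    return sent


sent = run_once_result = asyncio.run(run_once())
header_names = [name for name, _ in sent[0]["headers"]]
```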