"""Phase 1 success: orchestrator + stub agents + task + message flow (no LLM)."""

from fusionagi.core import EventBus, StateManager, Orchestrator
from fusionagi.agents import PlannerAgent
from fusionagi.schemas import TaskState, AgentMessage, AgentMessageEnvelope


def test_orchestrator_register_submit_get_state() -> None:
    """A submitted task starts out PENDING and can be fetched back with its goal."""
    bus = EventBus()
    state = StateManager()
    orch = Orchestrator(event_bus=bus, state_manager=state)
    planner = PlannerAgent(identity="planner")
    orch.register_agent("planner", planner)

    task_id = orch.submit_task(goal="Test goal", constraints=[])

    assert task_id
    assert orch.get_task_state(task_id) == TaskState.PENDING
    task = orch.get_task(task_id)
    assert task is not None
    assert task.goal == "Test goal"


def test_planner_handle_message_returns_plan_ready() -> None:
    """The planner answers a ``plan_request`` with a ``plan_ready`` message holding steps."""
    planner = PlannerAgent(identity="planner")
    envelope = AgentMessageEnvelope(
        message=AgentMessage(
            sender="orchestrator",
            recipient="planner",
            intent="plan_request",
            payload={"goal": "Do something"},
        ),
        task_id="task-1",
    )

    out = planner.handle_message(envelope)

    assert out is not None
    assert out.message.intent == "plan_ready"
    assert "plan" in out.message.payload
    steps = out.message.payload["plan"]["steps"]
    assert len(steps) == 3
    assert steps[0]["id"] == "step_1"


def test_event_bus_publish_subscribe() -> None:
    """A subscriber receives the event type and payload of a published event."""
    bus = EventBus()
    seen: list[tuple[str, dict]] = []
    bus.subscribe("task_created", lambda t, p: seen.append((t, p)))

    bus.publish("task_created", {"task_id": "t1", "goal": "g1"})

    assert len(seen) == 1
    assert seen[0][0] == "task_created"
    assert seen[0][1]["task_id"] == "t1"


def test_event_bus_handler_failure() -> None:
    """A raising handler neither escapes ``publish`` nor hides the other handler's delivery."""
    bus = EventBus()
    seen: list[tuple[str, dict]] = []

    def good_handler(t: str, p: dict) -> None:
        seen.append((t, p))

    def bad_handler(t: str, p: dict) -> None:
        raise RuntimeError("handler failed")

    bus.subscribe("ev", good_handler)
    bus.subscribe("ev", bad_handler)

    # If the bus let the RuntimeError propagate, this call would fail the test.
    bus.publish("ev", {"task_id": "t1"})

    assert len(seen) == 1
    assert seen[0][0] == "ev"
    assert seen[0][1]["task_id"] == "t1"


def test_event_bus_get_recent_events() -> None:
    """EventBus(history_size=N) records events; get_recent_events returns them."""
    bus = EventBus(history_size=10)
    bus.publish("task_created", {"task_id": "t1", "goal": "g1"})
    bus.publish("task_state_changed", {"task_id": "t1", "to_state": "active"})

    events = bus.get_recent_events(limit=5)

    # Events come back in publication order.
    assert len(events) == 2
    assert events[0]["event_type"] == "task_created"
    assert events[0]["payload"]["task_id"] == "t1"
    assert "timestamp" in events[0]
    assert events[1]["event_type"] == "task_state_changed"

    # Default history_size=0 returns []
    bus_default = EventBus()
    assert bus_default.get_recent_events(5) == []


def test_state_manager_task_and_trace() -> None:
    """StateManager stores a task's state and appends entries to its trace."""
    from fusionagi.schemas.task import Task, TaskState

    state = StateManager()
    task = Task(task_id="t1", goal="g1", state=TaskState.ACTIVE)
    state.set_task(task)
    assert state.get_task_state("t1") == TaskState.ACTIVE

    state.append_trace("t1", {"step": "s1", "result": "ok"})
    trace = state.get_trace("t1")
    assert len(trace) == 1
    assert trace[0]["step"] == "s1"


def test_orchestrator_set_task_state() -> None:
    """``set_task_state`` moves a task from PENDING to ACTIVE."""
    bus = EventBus()
    state = StateManager()
    orch = Orchestrator(event_bus=bus, state_manager=state)

    task_id = orch.submit_task(goal="G", constraints=[])
    assert orch.get_task_state(task_id) == TaskState.PENDING

    orch.set_task_state(task_id, TaskState.ACTIVE)
    assert orch.get_task_state(task_id) == TaskState.ACTIVE


def test_orchestrator_route_message_return() -> None:
    """``route_message_return`` hands back the registered planner's ``plan_ready`` reply."""
    bus = EventBus()
    state = StateManager()
    orch = Orchestrator(event_bus=bus, state_manager=state)
    planner = PlannerAgent(identity="planner")
    orch.register_agent("planner", planner)
    envelope = AgentMessageEnvelope(
        message=AgentMessage(
            sender="orch",
            recipient="planner",
            intent="plan_request",
            payload={"goal": "Do something"},
        ),
        task_id="t1",
    )

    response = orch.route_message_return(envelope)

    assert response is not None
    assert response.message.intent == "plan_ready"
    assert "plan" in response.message.payload


def test_orchestrator_unregister_removes_from_parent() -> None:
    """Unregistering a sub-agent removes it from its parent's list and from lookup."""
    bus = EventBus()
    state = StateManager()
    orch = Orchestrator(event_bus=bus, state_manager=state)
    planner = PlannerAgent(identity="planner")
    child = PlannerAgent(identity="child")
    orch.register_agent("planner", planner)
    orch.register_sub_agent("planner", "child", child)
    assert orch.get_sub_agents("planner") == ["child"]

    orch.unregister_agent("child")

    assert orch.get_sub_agents("planner") == []
    assert orch.get_agent("child") is None


def test_tot_multi_branch() -> None:
    """Test that Tree-of-Thought works with multiple branches."""
    from fusionagi.adapters import StubAdapter
    from fusionagi.reasoning import run_tree_of_thought

    # Create adapter that returns JSON for evaluation
    adapter = StubAdapter('{"score": 0.8, "reason": "good approach"}')

    # Should not raise NotImplementedError anymore
    response, trace = run_tree_of_thought(adapter, "What is 2+2?", max_branches=2)

    # Should return a response
    assert response is not None
    assert len(trace) > 0


if __name__ == "__main__":
    # Script-mode runner: calls every test in this module, in definition order.
    test_orchestrator_register_submit_get_state()
    test_planner_handle_message_returns_plan_ready()
    test_event_bus_publish_subscribe()
    test_event_bus_handler_failure()
    test_event_bus_get_recent_events()
    test_state_manager_task_and_trace()
    test_orchestrator_set_task_state()
    test_orchestrator_route_message_return()
    test_orchestrator_unregister_removes_from_parent()
    test_tot_multi_branch()
    print("Phase 1 tests OK")