# Files
# FusionAGI/tests/test_phase2_phase3.py
#
# 147 lines
# 5.3 KiB
# Python
# Raw Permalink Normal View History

"""Phase 2/3: end-to-end flow with stub adapter, tools, executor, critic, reflection, governance."""
from fusionagi.core import EventBus, StateManager, Orchestrator
from fusionagi.agents import PlannerAgent, ReasonerAgent, ExecutorAgent, CriticAgent
from fusionagi.adapters import StubAdapter
from fusionagi.tools import ToolRegistry, ToolDef
from fusionagi.memory import WorkingMemory, EpisodicMemory, ReflectiveMemory
from fusionagi.reflection import run_reflection
from fusionagi.governance import Guardrails, RateLimiter, OverrideHooks, AccessControl, PolicyEngine
from fusionagi.schemas import TaskState, AgentMessage, AgentMessageEnvelope
from fusionagi.schemas.policy import PolicyRule, PolicyEffect
def test_planner_with_stub_adapter() -> None:
    """PlannerAgent replies to a plan_request with a one-step plan.

    The planner is backed by a StubAdapter constructed from a canned JSON
    plan; the test checks that the reply is a ``plan_ready`` message whose
    plan carries exactly one step with id ``s1``.
    """
    canned_plan = '{"steps":[{"id":"s1","description":"Step 1","dependencies":[]}],"fallback_paths":[]}'
    planner = PlannerAgent(adapter=StubAdapter(canned_plan))
    request = AgentMessage(
        sender="o",
        recipient="planner",
        intent="plan_request",
        payload={"goal": "Test"},
    )
    envelope = AgentMessageEnvelope(message=request, task_id="t1")

    reply = planner.handle_message(envelope)

    assert reply is not None
    assert reply.message.intent == "plan_ready"
    plan_steps = reply.message.payload["plan"]["steps"]
    assert len(plan_steps) == 1
    assert plan_steps[0]["id"] == "s1"
def test_executor_runs_tool_and_appends_trace() -> None:
    """ExecutorAgent runs the ``noop`` tool and the task trace records it.

    A single tool returning ``"ok"`` is registered, an ``execute_step``
    message is sent for task ``task-1``, and the test checks both the
    ``step_done`` reply and the one trace entry stored in the StateManager.
    """
    state_manager = StateManager()
    registry = ToolRegistry()
    noop_tool = ToolDef(
        name="noop",
        description="No-op",
        fn=lambda: "ok",
        permission_scope=["*"],
    )
    registry.register(noop_tool)
    executor = ExecutorAgent(registry=registry, state_manager=state_manager)

    # The payload names the tool both inside the plan step and at top level.
    plan_step = {
        "id": "s1",
        "description": "No-op",
        "dependencies": [],
        "tool_name": "noop",
        "tool_args": {},
    }
    payload = {
        "step_id": "s1",
        "plan": {"steps": [plan_step], "fallback_paths": []},
        "tool_name": "noop",
        "tool_args": {},
    }
    request = AgentMessage(
        sender="o",
        recipient="executor",
        intent="execute_step",
        payload=payload,
    )
    envelope = AgentMessageEnvelope(message=request, task_id="task-1")

    reply = executor.handle_message(envelope)

    assert reply is not None
    assert reply.message.intent == "step_done"
    recorded = state_manager.get_trace("task-1")
    assert len(recorded) == 1
    assert recorded[0].get("tool") == "noop"
    assert recorded[0].get("result") == "ok"
def test_critic_returns_evaluation() -> None:
    """CriticAgent without an adapter evaluates a completed outcome.

    Sends an ``evaluate_request`` with an empty trace and no plan, then checks
    that the ``evaluation_ready`` reply contains a ``score`` key and reports
    ``success`` as True.
    """
    critic = CriticAgent(adapter=None)
    request = AgentMessage(
        sender="o",
        recipient="critic",
        intent="evaluate_request",
        payload={"outcome": "completed", "trace": [], "plan": None},
    )
    envelope = AgentMessageEnvelope(message=request, task_id="t1")

    reply = critic.handle_message(envelope)

    assert reply is not None
    assert reply.message.intent == "evaluation_ready"
    evaluation = reply.message.payload["evaluation"]
    assert "score" in evaluation
    assert evaluation["success"] is True
def test_reflection_writes_to_reflective_memory() -> None:
    """run_reflection returns an evaluation and stores one lesson.

    After reflecting on task ``t1`` with an adapter-less critic, the
    ReflectiveMemory passed in must hold exactly one lesson tagged with
    that task id.
    """
    memory = ReflectiveMemory()
    critic = CriticAgent(adapter=None)

    # Arguments are passed positionally, exactly as the function is invoked
    # elsewhere in this test module.
    evaluation = run_reflection(critic, "t1", "completed", [], None, memory)

    assert evaluation is not None
    stored_lessons = memory.get_lessons(limit=5)
    assert len(stored_lessons) == 1
    assert stored_lessons[0]["task_id"] == "t1"
def test_guardrails_block_path() -> None:
    """Guardrails reject paths under a blocked prefix and allow others.

    With ``/etc`` blocked, a ``file_read`` of ``/etc/passwd`` must be denied
    with a non-empty error message, while ``/tmp/foo`` must be allowed.
    """
    guardrails = Guardrails()
    guardrails.block_path_prefix("/etc")

    blocked = guardrails.pre_check("file_read", {"path": "/etc/passwd"})
    assert blocked.allowed is False
    assert blocked.error_message

    permitted = guardrails.pre_check("file_read", {"path": "/tmp/foo"})
    assert permitted.allowed is True
def test_rate_limiter() -> None:
    """RateLimiter allows two calls per window and rejects the third.

    Rate limiter is not yet wired to executor/orchestrator; tested in
    isolation here.
    """
    limiter = RateLimiter(max_calls=2, window_seconds=10.0)

    # Only the first element of the value returned by allow() is checked.
    for expected in (True, True, False):
        assert limiter.allow("agent1")[0] is expected
def test_override_hooks() -> None:
    """OverrideHooks.fire calls the registered hook and returns True.

    The hook records every (event, payload) pair it receives and approves
    by returning True; the test checks the result of ``fire`` and that the
    hook saw the event name exactly once.
    """
    hooks = OverrideHooks()
    observed = []

    def record_and_approve(e, p):
        # Parameter names are kept as ``e``/``p`` so the hook stays callable
        # however OverrideHooks passes its two arguments.
        observed.append((e, p))
        return True

    hooks.register(record_and_approve)

    assert hooks.fire("task_paused_for_approval", {"task_id": "t1"}) is True
    assert len(observed) == 1
    assert observed[0][0] == "task_paused_for_approval"
def test_access_control_deny() -> None:
    """AccessControl.deny blocks only the denied (agent, tool) pair.

    After denying ``noop`` for ``executor``, that pair is refused while a
    different tool for the same agent and the same tool for a different
    agent are both still allowed.
    """
    access = AccessControl()
    access.deny("executor", "noop")

    expectations = (
        ("executor", "noop", False),
        ("executor", "other_tool", True),
        ("planner", "noop", True),
    )
    for agent, tool, expected in expectations:
        assert access.allowed(agent, tool) is expected
def test_policy_engine_update_rule() -> None:
    """PolicyEngine supports adding, updating and removing a rule by id.

    Walks one DENY rule through its lifecycle: add, read back, update
    ``reason``, update ``priority``, remove, and a second remove that must
    report False because the rule is already gone.
    """
    engine = PolicyEngine()
    rule = PolicyRule(
        rule_id="r1",
        effect=PolicyEffect.DENY,
        condition={"tool_name": "noop"},
        reason="blocked",
        priority=1,
    )
    engine.add_rule(rule)

    # The rule is re-fetched after every mutation rather than cached, so each
    # assertion observes the engine's current state.
    assert engine.get_rule("r1") is not None
    assert engine.get_rule("r1").reason == "blocked"

    assert engine.update_rule("r1", {"reason": "updated"}) is True
    assert engine.get_rule("r1").reason == "updated"

    assert engine.update_rule("r1", {"priority": 5}) is True
    assert engine.get_rule("r1").priority == 5

    assert engine.remove_rule("r1") is True
    assert engine.get_rule("r1") is None
    assert engine.remove_rule("r1") is False
if __name__ == "__main__":
    # Allow running this module directly, without a test runner: call every
    # test in definition order and print the success line only if none raised.
    for run_test in (
        test_planner_with_stub_adapter,
        test_executor_runs_tool_and_appends_trace,
        test_critic_returns_evaluation,
        test_reflection_writes_to_reflective_memory,
        test_guardrails_block_path,
        test_rate_limiter,
        test_override_hooks,
        test_access_control_deny,
        test_policy_engine_update_rule,
    ):
        run_test()
    print("Phase 2/3 tests OK")