"""Tests for self-improvement: schemas, correction, recommender, training, FusionAGILoop.""" import pytest from fusionagi.schemas.recommendation import ( Recommendation, RecommendationKind, TrainingSuggestion, TrainingSuggestionKind, ) from fusionagi.schemas.task import TaskState from fusionagi.core import EventBus, Orchestrator, StateManager from fusionagi.memory import ReflectiveMemory from fusionagi.agents import CriticAgent from fusionagi.self_improvement import ( SelfCorrectionLoop, AutoRecommender, AutoTrainer, FusionAGILoop, ) class TestRecommendationSchemas: """Test Recommendation and TrainingSuggestion schemas.""" def test_recommendation_minimal(self): r = Recommendation(title="Fix X", description="Do Y") assert r.kind == RecommendationKind.OTHER assert r.title == "Fix X" assert r.priority == 0 assert r.source_task_id is None assert r.created_at is not None def test_recommendation_full(self): r = Recommendation( kind=RecommendationKind.STRATEGY_CHANGE, title="Change strategy", description="Use dependency order", source_task_id="t1", priority=8, ) assert r.kind == RecommendationKind.STRATEGY_CHANGE assert r.priority == 8 assert r.source_task_id == "t1" def test_recommendation_title_whitespace_invalid(self): with pytest.raises(ValueError, match="title"): Recommendation(title=" ", description="x") def test_training_suggestion_minimal(self): s = TrainingSuggestion(key="heuristic_1", value="prefer linear") assert s.kind == TrainingSuggestionKind.OTHER assert s.key == "heuristic_1" assert s.reason == "" assert s.created_at is not None def test_training_suggestion_full(self): s = TrainingSuggestion( kind=TrainingSuggestionKind.HEURISTIC_UPDATE, key="h1", value={"hint": "retry on timeout"}, source_task_id="t1", reason="From failure", ) assert s.kind == TrainingSuggestionKind.HEURISTIC_UPDATE assert s.source_task_id == "t1" def test_training_suggestion_key_whitespace_invalid(self): with pytest.raises(ValueError, match="key"): TrainingSuggestion(key=" ", value="x") 
class TestAutoRecommender:
    """Exercise ``AutoRecommender`` with evaluation payloads and stored lessons."""

    def test_recommend_from_evaluation_empty(self):
        """An empty evaluation dict is expected to yield an empty list."""
        recommender = AutoRecommender()
        produced = recommender.recommend_from_evaluation("t1", {})
        assert produced == []

    def test_recommend_from_evaluation_suggestions(self):
        """Two suggestions on a failed evaluation: checks count, text and kind."""
        evaluation = {
            "suggestions": ["Retry", "Use tool X"],
            "success": False,
            "score": 0.3,
        }
        recommender = AutoRecommender()
        produced = recommender.recommend_from_evaluation("t1", evaluation)

        assert len(produced) >= 2
        assert any("Retry" in item.description for item in produced)
        assert any(
            item.kind == RecommendationKind.TRAINING_TARGET for item in produced
        )

    def test_recommend_from_evaluation_error_analysis_only(self):
        """Error analysis without suggestions: expects a STRATEGY_CHANGE entry."""
        evaluation = {
            "error_analysis": ["Timeout"],
            "suggestions": [],
            "success": False,
        }
        recommender = AutoRecommender()
        produced = recommender.recommend_from_evaluation("t1", evaluation)

        assert len(produced) >= 1
        assert any(
            item.kind == RecommendationKind.STRATEGY_CHANGE for item in produced
        )

    def test_recommend_from_lessons_no_memory(self):
        """With no reflective memory supplied, lessons are expected to give nothing."""
        recommender = AutoRecommender()
        assert recommender.recommend_from_lessons() == []

    def test_recommend_from_lessons_with_memory(self):
        """One stored failed lesson is expected to give at least one recommendation."""
        lesson = {
            "task_id": "t1",
            "outcome": "failed",
            "evaluation": {"suggestions": ["Retry"]},
        }
        memory = ReflectiveMemory()
        memory.add_lesson(lesson)

        recommender = AutoRecommender(reflective_memory=memory)
        produced = recommender.recommend_from_lessons(limit_lessons=5)
        assert len(produced) >= 1

    def test_recommend_dedupe_and_sort(self):
        """Duplicate suggestions are expected to collapse to a single entry."""
        evaluation = {"suggestions": ["A", "A"], "success": True, "score": 0.9}
        recommender = AutoRecommender()
        produced = recommender.recommend(
            task_id="t1",
            evaluation=evaluation,
            include_lessons=False,
        )

        assert len(produced) == 1
        assert produced[0].priority <= 10


class TestAutoTrainer:
    """Exercise ``AutoTrainer`` suggestion generation and heuristic application."""

    def test_suggest_from_evaluation_empty(self):
        """A perfect evaluation with no suggestions is expected to give nothing."""
        evaluation = {"suggestions": [], "success": True, "score": 1.0}
        trainer = AutoTrainer()
        produced = trainer.suggest_from_evaluation("t1", evaluation)
        assert produced == []

    def test_suggest_from_evaluation_suggestions(self):
        """A suggestion in the evaluation: expects a HEURISTIC_UPDATE entry."""
        evaluation = {"suggestions": ["Heuristic A"], "success": True, "score": 0.8}
        trainer = AutoTrainer()
        produced = trainer.suggest_from_evaluation("t1", evaluation)

        assert len(produced) >= 1
        assert any(
            item.kind == TrainingSuggestionKind.HEURISTIC_UPDATE for item in produced
        )

    def test_suggest_from_evaluation_failure_adds_fine_tune(self):
        """A failed evaluation: expects a FINE_TUNE_DATASET entry."""
        evaluation = {"suggestions": [], "success": False, "score": 0.2}
        trainer = AutoTrainer()
        produced = trainer.suggest_from_evaluation("t1", evaluation)

        assert any(
            item.kind == TrainingSuggestionKind.FINE_TUNE_DATASET for item in produced
        )

    def test_apply_heuristic_updates_no_memory(self):
        """Without reflective memory the applied count is expected to be 0."""
        trainer = AutoTrainer()
        pending = [
            TrainingSuggestion(
                kind=TrainingSuggestionKind.HEURISTIC_UPDATE,
                key="k1",
                value="v1",
            ),
        ]
        applied = trainer.apply_heuristic_updates(pending)
        assert applied == 0

    def test_apply_heuristic_updates_with_memory(self):
        """With reflective memory the heuristic is expected to be stored under its key."""
        memory = ReflectiveMemory()
        trainer = AutoTrainer(reflective_memory=memory)
        pending = [
            TrainingSuggestion(
                kind=TrainingSuggestionKind.HEURISTIC_UPDATE,
                key="k1",
                value="v1",
            ),
        ]
        applied = trainer.apply_heuristic_updates(pending)

        assert applied == 1
        assert memory.get_heuristic("k1") == "v1"

    def test_run_auto_training_returns_suggestions(self):
        """run_auto_training with apply_heuristics=True: checks output and memory."""
        evaluation = {"suggestions": ["h1"], "success": True, "score": 0.7}
        memory = ReflectiveMemory()
        trainer = AutoTrainer(reflective_memory=memory)
        produced = trainer.run_auto_training(
            task_id="t1",
            evaluation=evaluation,
            apply_heuristics=True,
        )

        assert len(produced) >= 1
        assert memory.get_heuristic("heuristic_from_task_t1_0") == "h1"


class TestSelfCorrectionLoop:
    """Exercise ``SelfCorrectionLoop`` using a plain ``CriticAgent`` as the critic."""

    def test_suggest_retry_non_failed_task(self):
        """A freshly submitted task is expected to get (False, {}) from suggest_retry."""
        event_bus = EventBus()
        state_manager = StateManager()
        orchestrator = Orchestrator(event_bus=event_bus, state_manager=state_manager)
        submitted_id = orchestrator.submit_task(goal="x")
        critic_agent = CriticAgent(identity="critic")
        correction = SelfCorrectionLoop(
            state_manager=state_manager,
            orchestrator=orchestrator,
            critic_agent=critic_agent,
            max_retries_per_task=2,
        )

        should_retry, context = correction.suggest_retry(submitted_id)
        assert should_retry is False
        assert context == {}

    def test_suggest_retry_failed_task_runs_reflection(self):
        """For a FAILED task, suggest_retry returns a bool; if true, ctx has 'evaluation'."""
        event_bus = EventBus()
        state_manager = StateManager()
        orchestrator = Orchestrator(event_bus=event_bus, state_manager=state_manager)
        submitted_id = orchestrator.submit_task(goal="x")

        # Drive the task to FAILED through ACTIVE, recording one trace step on the way.
        orchestrator.set_task_state(submitted_id, TaskState.ACTIVE, force=True)
        state_manager.append_trace(submitted_id, {"step": "s1"})
        orchestrator.set_task_state(submitted_id, TaskState.FAILED)

        # The critic is registered with the orchestrator and then fetched back from it.
        orchestrator.register_agent("critic", CriticAgent(identity="critic"))
        critic_agent = orchestrator.get_agent("critic")
        correction = SelfCorrectionLoop(
            state_manager=state_manager,
            orchestrator=orchestrator,
            critic_agent=critic_agent,
            max_retries_per_task=2,
        )

        should_retry, context = correction.suggest_retry(submitted_id)
        assert isinstance(should_retry, bool)
        # The context is only inspected when a retry is actually suggested.
        assert not should_retry or "evaluation" in context

    def test_prepare_retry_non_failed_no_op(self):
        """prepare_retry on a non-failed task is expected to leave it PENDING."""
        event_bus = EventBus()
        state_manager = StateManager()
        orchestrator = Orchestrator(event_bus=event_bus, state_manager=state_manager)
        submitted_id = orchestrator.submit_task(goal="x")
        critic_agent = CriticAgent(identity="critic")
        correction = SelfCorrectionLoop(
            state_manager=state_manager,
            orchestrator=orchestrator,
            critic_agent=critic_agent,
        )

        correction.prepare_retry(submitted_id)
        assert orchestrator.get_task_state(submitted_id) == TaskState.PENDING

    def test_correction_recommendations_failed_task(self):
        """correction_recommendations on a FAILED task returns a list of Recommendation."""
        event_bus = EventBus()
        state_manager = StateManager()
        orchestrator = Orchestrator(event_bus=event_bus, state_manager=state_manager)
        submitted_id = orchestrator.submit_task(goal="x")
        orchestrator.set_task_state(submitted_id, TaskState.ACTIVE, force=True)
        orchestrator.set_task_state(submitted_id, TaskState.FAILED)

        orchestrator.register_agent("critic", CriticAgent(identity="critic"))
        critic_agent = orchestrator.get_agent("critic")
        correction = SelfCorrectionLoop(
            state_manager=state_manager,
            orchestrator=orchestrator,
            critic_agent=critic_agent,
        )

        recommendations = correction.correction_recommendations(submitted_id)
        assert isinstance(recommendations, list)
        assert all(isinstance(item, Recommendation) for item in recommendations)


class TestFusionAGILoop:
    """Wiring checks for ``FusionAGILoop``."""

    def test_loop_subscribe_and_unsubscribe(self):
        """Publishing events after unsubscribe() is expected not to raise."""
        event_bus = EventBus()
        state_manager = StateManager()
        orchestrator = Orchestrator(event_bus=event_bus, state_manager=state_manager)
        critic_agent = CriticAgent(identity="critic")
        orchestrator.register_agent("critic", critic_agent)
        agi_loop = FusionAGILoop(
            event_bus=event_bus,
            state_manager=state_manager,
            orchestrator=orchestrator,
            critic_agent=critic_agent,
            reflective_memory=None,
        )

        agi_loop.unsubscribe()
        event_bus.publish("task_state_changed", {"task_id": "x", "to_state": "failed"})
        event_bus.publish("reflection_done", {"task_id": "y", "evaluation": {}})
        # Reaching this line without an exception is the passing condition.
        assert True

    def test_run_after_reflection(self):
        """run_after_reflection is expected to return a pair of lists."""
        event_bus = EventBus()
        state_manager = StateManager()
        orchestrator = Orchestrator(event_bus=event_bus, state_manager=state_manager)
        critic_agent = CriticAgent(identity="critic")
        memory = ReflectiveMemory()
        agi_loop = FusionAGILoop(
            event_bus=event_bus,
            state_manager=state_manager,
            orchestrator=orchestrator,
            critic_agent=critic_agent,
            reflective_memory=memory,
        )

        evaluation = {"suggestions": ["Improve plan"], "success": True, "score": 0.8}
        recommendations, suggestions = agi_loop.run_after_reflection(
            task_id="t1",
            evaluation=evaluation,
        )
        assert isinstance(recommendations, list)
        assert isinstance(suggestions, list)
        agi_loop.unsubscribe()