Files
FusionAGI/tests/test_self_improvement.py
defiQUG c052b07662
Some checks failed
Tests / test (3.10) (push) Has been cancelled
Tests / test (3.11) (push) Has been cancelled
Tests / test (3.12) (push) Has been cancelled
Tests / lint (push) Has been cancelled
Tests / docker (push) Has been cancelled
Initial commit: add .gitignore and README
2026-02-09 21:51:42 -08:00

300 lines
10 KiB
Python

"""Tests for self-improvement: schemas, correction, recommender, training, FusionAGILoop."""
import pytest
from fusionagi.schemas.recommendation import (
Recommendation,
RecommendationKind,
TrainingSuggestion,
TrainingSuggestionKind,
)
from fusionagi.schemas.task import TaskState
from fusionagi.core import EventBus, Orchestrator, StateManager
from fusionagi.memory import ReflectiveMemory
from fusionagi.agents import CriticAgent
from fusionagi.self_improvement import (
SelfCorrectionLoop,
AutoRecommender,
AutoTrainer,
FusionAGILoop,
)
class TestRecommendationSchemas:
"""Test Recommendation and TrainingSuggestion schemas."""
def test_recommendation_minimal(self):
r = Recommendation(title="Fix X", description="Do Y")
assert r.kind == RecommendationKind.OTHER
assert r.title == "Fix X"
assert r.priority == 0
assert r.source_task_id is None
assert r.created_at is not None
def test_recommendation_full(self):
r = Recommendation(
kind=RecommendationKind.STRATEGY_CHANGE,
title="Change strategy",
description="Use dependency order",
source_task_id="t1",
priority=8,
)
assert r.kind == RecommendationKind.STRATEGY_CHANGE
assert r.priority == 8
assert r.source_task_id == "t1"
def test_recommendation_title_whitespace_invalid(self):
with pytest.raises(ValueError, match="title"):
Recommendation(title=" ", description="x")
def test_training_suggestion_minimal(self):
s = TrainingSuggestion(key="heuristic_1", value="prefer linear")
assert s.kind == TrainingSuggestionKind.OTHER
assert s.key == "heuristic_1"
assert s.reason == ""
assert s.created_at is not None
def test_training_suggestion_full(self):
s = TrainingSuggestion(
kind=TrainingSuggestionKind.HEURISTIC_UPDATE,
key="h1",
value={"hint": "retry on timeout"},
source_task_id="t1",
reason="From failure",
)
assert s.kind == TrainingSuggestionKind.HEURISTIC_UPDATE
assert s.source_task_id == "t1"
def test_training_suggestion_key_whitespace_invalid(self):
with pytest.raises(ValueError, match="key"):
TrainingSuggestion(key=" ", value="x")
class TestAutoRecommender:
"""Test AutoRecommender."""
def test_recommend_from_evaluation_empty(self):
rec = AutoRecommender()
out = rec.recommend_from_evaluation("t1", {})
assert out == []
def test_recommend_from_evaluation_suggestions(self):
rec = AutoRecommender()
out = rec.recommend_from_evaluation(
"t1",
{"suggestions": ["Retry", "Use tool X"], "success": False, "score": 0.3},
)
assert len(out) >= 2
assert any("Retry" in r.description for r in out)
assert any(r.kind == RecommendationKind.TRAINING_TARGET for r in out)
def test_recommend_from_evaluation_error_analysis_only(self):
rec = AutoRecommender()
out = rec.recommend_from_evaluation(
"t1",
{"error_analysis": ["Timeout"], "suggestions": [], "success": False},
)
assert len(out) >= 1
assert any(r.kind == RecommendationKind.STRATEGY_CHANGE for r in out)
def test_recommend_from_lessons_no_memory(self):
rec = AutoRecommender()
assert rec.recommend_from_lessons() == []
def test_recommend_from_lessons_with_memory(self):
mem = ReflectiveMemory()
mem.add_lesson({"task_id": "t1", "outcome": "failed", "evaluation": {"suggestions": ["Retry"]}})
rec = AutoRecommender(reflective_memory=mem)
out = rec.recommend_from_lessons(limit_lessons=5)
assert len(out) >= 1
def test_recommend_dedupe_and_sort(self):
rec = AutoRecommender()
out = rec.recommend(
task_id="t1",
evaluation={"suggestions": ["A", "A"], "success": True, "score": 0.9},
include_lessons=False,
)
assert len(out) == 1
assert out[0].priority <= 10
class TestAutoTrainer:
"""Test AutoTrainer."""
def test_suggest_from_evaluation_empty(self):
tr = AutoTrainer()
out = tr.suggest_from_evaluation(
"t1",
{"suggestions": [], "success": True, "score": 1.0},
)
assert out == []
def test_suggest_from_evaluation_suggestions(self):
tr = AutoTrainer()
out = tr.suggest_from_evaluation(
"t1",
{"suggestions": ["Heuristic A"], "success": True, "score": 0.8},
)
assert len(out) >= 1
assert any(s.kind == TrainingSuggestionKind.HEURISTIC_UPDATE for s in out)
def test_suggest_from_evaluation_failure_adds_fine_tune(self):
tr = AutoTrainer()
out = tr.suggest_from_evaluation(
"t1",
{"suggestions": [], "success": False, "score": 0.2},
)
assert any(s.kind == TrainingSuggestionKind.FINE_TUNE_DATASET for s in out)
def test_apply_heuristic_updates_no_memory(self):
tr = AutoTrainer()
sugs = [
TrainingSuggestion(
kind=TrainingSuggestionKind.HEURISTIC_UPDATE,
key="k1",
value="v1",
),
]
n = tr.apply_heuristic_updates(sugs)
assert n == 0
def test_apply_heuristic_updates_with_memory(self):
mem = ReflectiveMemory()
tr = AutoTrainer(reflective_memory=mem)
sugs = [
TrainingSuggestion(
kind=TrainingSuggestionKind.HEURISTIC_UPDATE,
key="k1",
value="v1",
),
]
n = tr.apply_heuristic_updates(sugs)
assert n == 1
assert mem.get_heuristic("k1") == "v1"
def test_run_auto_training_returns_suggestions(self):
mem = ReflectiveMemory()
tr = AutoTrainer(reflective_memory=mem)
out = tr.run_auto_training(
task_id="t1",
evaluation={"suggestions": ["h1"], "success": True, "score": 0.7},
apply_heuristics=True,
)
assert len(out) >= 1
assert mem.get_heuristic("heuristic_from_task_t1_0") == "h1"
class TestSelfCorrectionLoop:
"""Test SelfCorrectionLoop (with stub critic)."""
def test_suggest_retry_non_failed_task(self):
bus = EventBus()
state = StateManager()
orch = Orchestrator(event_bus=bus, state_manager=state)
task_id = orch.submit_task(goal="x")
critic = CriticAgent(identity="critic")
loop = SelfCorrectionLoop(
state_manager=state,
orchestrator=orch,
critic_agent=critic,
max_retries_per_task=2,
)
ok, ctx = loop.suggest_retry(task_id)
assert ok is False
assert ctx == {}
def test_suggest_retry_failed_task_runs_reflection(self):
bus = EventBus()
state = StateManager()
orch = Orchestrator(event_bus=bus, state_manager=state)
task_id = orch.submit_task(goal="x")
orch.set_task_state(task_id, TaskState.ACTIVE, force=True)
state.append_trace(task_id, {"step": "s1"})
orch.set_task_state(task_id, TaskState.FAILED)
orch.register_agent("critic", CriticAgent(identity="critic"))
critic = orch.get_agent("critic")
loop = SelfCorrectionLoop(
state_manager=state,
orchestrator=orch,
critic_agent=critic,
max_retries_per_task=2,
)
ok, ctx = loop.suggest_retry(task_id)
assert isinstance(ok, bool)
if ok:
assert "evaluation" in ctx
def test_prepare_retry_non_failed_no_op(self):
bus = EventBus()
state = StateManager()
orch = Orchestrator(event_bus=bus, state_manager=state)
task_id = orch.submit_task(goal="x")
critic = CriticAgent(identity="critic")
loop = SelfCorrectionLoop(
state_manager=state,
orchestrator=orch,
critic_agent=critic,
)
loop.prepare_retry(task_id)
assert orch.get_task_state(task_id) == TaskState.PENDING
def test_correction_recommendations_failed_task(self):
bus = EventBus()
state = StateManager()
orch = Orchestrator(event_bus=bus, state_manager=state)
task_id = orch.submit_task(goal="x")
orch.set_task_state(task_id, TaskState.ACTIVE, force=True)
orch.set_task_state(task_id, TaskState.FAILED)
orch.register_agent("critic", CriticAgent(identity="critic"))
critic = orch.get_agent("critic")
loop = SelfCorrectionLoop(
state_manager=state,
orchestrator=orch,
critic_agent=critic,
)
recs = loop.correction_recommendations(task_id)
assert isinstance(recs, list)
assert all(isinstance(r, Recommendation) for r in recs)
class TestFusionAGILoop:
"""Test FusionAGILoop wiring."""
def test_loop_subscribe_and_unsubscribe(self):
bus = EventBus()
state = StateManager()
orch = Orchestrator(event_bus=bus, state_manager=state)
critic = CriticAgent(identity="critic")
orch.register_agent("critic", critic)
loop = FusionAGILoop(
event_bus=bus,
state_manager=state,
orchestrator=orch,
critic_agent=critic,
reflective_memory=None,
)
loop.unsubscribe()
bus.publish("task_state_changed", {"task_id": "x", "to_state": "failed"})
bus.publish("reflection_done", {"task_id": "y", "evaluation": {}})
assert True
def test_run_after_reflection(self):
bus = EventBus()
state = StateManager()
orch = Orchestrator(event_bus=bus, state_manager=state)
critic = CriticAgent(identity="critic")
mem = ReflectiveMemory()
loop = FusionAGILoop(
event_bus=bus,
state_manager=state,
orchestrator=orch,
critic_agent=critic,
reflective_memory=mem,
)
recs, sugs = loop.run_after_reflection(
task_id="t1",
evaluation={"suggestions": ["Improve plan"], "success": True, "score": 0.8},
)
assert isinstance(recs, list)
assert isinstance(sugs, list)
loop.unsubscribe()