"""Tests for self-improvement: schemas, correction, recommender, training, FusionAGILoop."""
|
|
|
|
import pytest

from fusionagi.schemas.recommendation import (
    Recommendation,
    RecommendationKind,
    TrainingSuggestion,
    TrainingSuggestionKind,
)
from fusionagi.schemas.task import TaskState
from fusionagi.core import EventBus, Orchestrator, StateManager
from fusionagi.memory import ReflectiveMemory
from fusionagi.agents import CriticAgent
from fusionagi.self_improvement import (
    SelfCorrectionLoop,
    AutoRecommender,
    AutoTrainer,
    FusionAGILoop,
)
|
|
class TestRecommendationSchemas:
    """Schema-level tests for Recommendation and TrainingSuggestion models."""

    def test_recommendation_minimal(self):
        """A recommendation built with only required fields gets sane defaults."""
        rec = Recommendation(title="Fix X", description="Do Y")
        assert rec.kind == RecommendationKind.OTHER
        assert rec.title == "Fix X"
        assert rec.priority == 0
        assert rec.source_task_id is None
        assert rec.created_at is not None

    def test_recommendation_full(self):
        """All optional fields round-trip when supplied explicitly."""
        rec = Recommendation(
            kind=RecommendationKind.STRATEGY_CHANGE,
            title="Change strategy",
            description="Use dependency order",
            source_task_id="t1",
            priority=8,
        )
        assert rec.kind == RecommendationKind.STRATEGY_CHANGE
        assert rec.priority == 8
        assert rec.source_task_id == "t1"

    def test_recommendation_title_whitespace_invalid(self):
        """A whitespace-only title must be rejected by validation."""
        with pytest.raises(ValueError, match="title"):
            Recommendation(title=" ", description="x")

    def test_training_suggestion_minimal(self):
        """A suggestion built with only required fields gets sane defaults."""
        sug = TrainingSuggestion(key="heuristic_1", value="prefer linear")
        assert sug.kind == TrainingSuggestionKind.OTHER
        assert sug.key == "heuristic_1"
        assert sug.reason == ""
        assert sug.created_at is not None

    def test_training_suggestion_full(self):
        """All optional fields round-trip when supplied explicitly."""
        sug = TrainingSuggestion(
            kind=TrainingSuggestionKind.HEURISTIC_UPDATE,
            key="h1",
            value={"hint": "retry on timeout"},
            source_task_id="t1",
            reason="From failure",
        )
        assert sug.kind == TrainingSuggestionKind.HEURISTIC_UPDATE
        assert sug.source_task_id == "t1"

    def test_training_suggestion_key_whitespace_invalid(self):
        """A whitespace-only key must be rejected by validation."""
        with pytest.raises(ValueError, match="key"):
            TrainingSuggestion(key=" ", value="x")
|
|
|
|
|
|
class TestAutoRecommender:
    """Behavioral tests for AutoRecommender recommendation generation."""

    def test_recommend_from_evaluation_empty(self):
        """An empty evaluation dict yields no recommendations."""
        recommender = AutoRecommender()
        results = recommender.recommend_from_evaluation("t1", {})
        assert results == []

    def test_recommend_from_evaluation_suggestions(self):
        """Suggestions plus a failed evaluation produce multiple recommendations."""
        recommender = AutoRecommender()
        results = recommender.recommend_from_evaluation(
            "t1",
            {"suggestions": ["Retry", "Use tool X"], "success": False, "score": 0.3},
        )
        assert len(results) >= 2
        assert any("Retry" in r.description for r in results)
        assert any(r.kind == RecommendationKind.TRAINING_TARGET for r in results)

    def test_recommend_from_evaluation_error_analysis_only(self):
        """Error analysis alone still yields a strategy-change recommendation."""
        recommender = AutoRecommender()
        results = recommender.recommend_from_evaluation(
            "t1",
            {"error_analysis": ["Timeout"], "suggestions": [], "success": False},
        )
        assert len(results) >= 1
        assert any(r.kind == RecommendationKind.STRATEGY_CHANGE for r in results)

    def test_recommend_from_lessons_no_memory(self):
        """Without reflective memory attached there are no lesson-based recs."""
        recommender = AutoRecommender()
        assert recommender.recommend_from_lessons() == []

    def test_recommend_from_lessons_with_memory(self):
        """A stored lesson with suggestions surfaces as at least one rec."""
        memory = ReflectiveMemory()
        memory.add_lesson({"task_id": "t1", "outcome": "failed", "evaluation": {"suggestions": ["Retry"]}})
        recommender = AutoRecommender(reflective_memory=memory)
        results = recommender.recommend_from_lessons(limit_lessons=5)
        assert len(results) >= 1

    def test_recommend_dedupe_and_sort(self):
        """Duplicate suggestions collapse to one rec with a bounded priority."""
        recommender = AutoRecommender()
        results = recommender.recommend(
            task_id="t1",
            evaluation={"suggestions": ["A", "A"], "success": True, "score": 0.9},
            include_lessons=False,
        )
        assert len(results) == 1
        assert results[0].priority <= 10
|
|
|
|
|
|
class TestAutoTrainer:
    """Behavioral tests for AutoTrainer suggestion generation and application."""

    def test_suggest_from_evaluation_empty(self):
        """A clean, successful evaluation produces no training suggestions."""
        trainer = AutoTrainer()
        results = trainer.suggest_from_evaluation(
            "t1",
            {"suggestions": [], "success": True, "score": 1.0},
        )
        assert results == []

    def test_suggest_from_evaluation_suggestions(self):
        """Evaluation suggestions become heuristic-update training suggestions."""
        trainer = AutoTrainer()
        results = trainer.suggest_from_evaluation(
            "t1",
            {"suggestions": ["Heuristic A"], "success": True, "score": 0.8},
        )
        assert len(results) >= 1
        assert any(s.kind == TrainingSuggestionKind.HEURISTIC_UPDATE for s in results)

    def test_suggest_from_evaluation_failure_adds_fine_tune(self):
        """A failed evaluation contributes a fine-tune-dataset suggestion."""
        trainer = AutoTrainer()
        results = trainer.suggest_from_evaluation(
            "t1",
            {"suggestions": [], "success": False, "score": 0.2},
        )
        assert any(s.kind == TrainingSuggestionKind.FINE_TUNE_DATASET for s in results)

    def test_apply_heuristic_updates_no_memory(self):
        """Applying heuristics without reflective memory is a zero-count no-op."""
        trainer = AutoTrainer()
        suggestions = [
            TrainingSuggestion(
                kind=TrainingSuggestionKind.HEURISTIC_UPDATE,
                key="k1",
                value="v1",
            ),
        ]
        applied = trainer.apply_heuristic_updates(suggestions)
        assert applied == 0

    def test_apply_heuristic_updates_with_memory(self):
        """With memory attached, heuristic updates are persisted and counted."""
        memory = ReflectiveMemory()
        trainer = AutoTrainer(reflective_memory=memory)
        suggestions = [
            TrainingSuggestion(
                kind=TrainingSuggestionKind.HEURISTIC_UPDATE,
                key="k1",
                value="v1",
            ),
        ]
        applied = trainer.apply_heuristic_updates(suggestions)
        assert applied == 1
        assert memory.get_heuristic("k1") == "v1"

    def test_run_auto_training_returns_suggestions(self):
        """run_auto_training yields suggestions and writes heuristics to memory."""
        memory = ReflectiveMemory()
        trainer = AutoTrainer(reflective_memory=memory)
        results = trainer.run_auto_training(
            task_id="t1",
            evaluation={"suggestions": ["h1"], "success": True, "score": 0.7},
            apply_heuristics=True,
        )
        assert len(results) >= 1
        assert memory.get_heuristic("heuristic_from_task_t1_0") == "h1"
|
|
|
|
|
|
class TestSelfCorrectionLoop:
    """Tests for SelfCorrectionLoop wired against a stub CriticAgent."""

    def test_suggest_retry_non_failed_task(self):
        """A freshly submitted (pending) task must not be offered a retry."""
        event_bus = EventBus()
        state_mgr = StateManager()
        orchestrator = Orchestrator(event_bus=event_bus, state_manager=state_mgr)
        task_id = orchestrator.submit_task(goal="x")
        stub_critic = CriticAgent(identity="critic")
        correction = SelfCorrectionLoop(
            state_manager=state_mgr,
            orchestrator=orchestrator,
            critic_agent=stub_critic,
            max_retries_per_task=2,
        )
        should_retry, context = correction.suggest_retry(task_id)
        assert should_retry is False
        assert context == {}

    def test_suggest_retry_failed_task_runs_reflection(self):
        """A failed task with trace triggers reflection; retry context carries it."""
        event_bus = EventBus()
        state_mgr = StateManager()
        orchestrator = Orchestrator(event_bus=event_bus, state_manager=state_mgr)
        task_id = orchestrator.submit_task(goal="x")
        orchestrator.set_task_state(task_id, TaskState.ACTIVE, force=True)
        state_mgr.append_trace(task_id, {"step": "s1"})
        orchestrator.set_task_state(task_id, TaskState.FAILED)
        orchestrator.register_agent("critic", CriticAgent(identity="critic"))
        registered_critic = orchestrator.get_agent("critic")
        correction = SelfCorrectionLoop(
            state_manager=state_mgr,
            orchestrator=orchestrator,
            critic_agent=registered_critic,
            max_retries_per_task=2,
        )
        should_retry, context = correction.suggest_retry(task_id)
        assert isinstance(should_retry, bool)
        # The stub critic may or may not recommend a retry; only when it does
        # is an evaluation guaranteed to be present in the context.
        if should_retry:
            assert "evaluation" in context

    def test_prepare_retry_non_failed_no_op(self):
        """prepare_retry on a pending task leaves its state untouched."""
        event_bus = EventBus()
        state_mgr = StateManager()
        orchestrator = Orchestrator(event_bus=event_bus, state_manager=state_mgr)
        task_id = orchestrator.submit_task(goal="x")
        stub_critic = CriticAgent(identity="critic")
        correction = SelfCorrectionLoop(
            state_manager=state_mgr,
            orchestrator=orchestrator,
            critic_agent=stub_critic,
        )
        correction.prepare_retry(task_id)
        assert orchestrator.get_task_state(task_id) == TaskState.PENDING

    def test_correction_recommendations_failed_task(self):
        """A failed task yields a (possibly empty) list of Recommendation objects."""
        event_bus = EventBus()
        state_mgr = StateManager()
        orchestrator = Orchestrator(event_bus=event_bus, state_manager=state_mgr)
        task_id = orchestrator.submit_task(goal="x")
        orchestrator.set_task_state(task_id, TaskState.ACTIVE, force=True)
        orchestrator.set_task_state(task_id, TaskState.FAILED)
        orchestrator.register_agent("critic", CriticAgent(identity="critic"))
        registered_critic = orchestrator.get_agent("critic")
        correction = SelfCorrectionLoop(
            state_manager=state_mgr,
            orchestrator=orchestrator,
            critic_agent=registered_critic,
        )
        recommendations = correction.correction_recommendations(task_id)
        assert isinstance(recommendations, list)
        assert all(isinstance(r, Recommendation) for r in recommendations)
|
|
|
|
|
|
class TestFusionAGILoop:
    """Test FusionAGILoop wiring (event subscription and reflection hook)."""

    def test_loop_subscribe_and_unsubscribe(self):
        """After unsubscribe(), publishing loop-related events must not raise.

        The success condition here is implicit: reaching the end of the test
        without an exception proves the loop's handlers were detached cleanly.
        """
        bus = EventBus()
        state = StateManager()
        orch = Orchestrator(event_bus=bus, state_manager=state)
        critic = CriticAgent(identity="critic")
        orch.register_agent("critic", critic)
        loop = FusionAGILoop(
            event_bus=bus,
            state_manager=state,
            orchestrator=orch,
            critic_agent=critic,
            reflective_memory=None,
        )
        loop.unsubscribe()
        # These publishes must be no-ops for the detached loop; any lingering
        # handler would surface as an exception and fail the test.
        bus.publish("task_state_changed", {"task_id": "x", "to_state": "failed"})
        bus.publish("reflection_done", {"task_id": "y", "evaluation": {}})

    def test_run_after_reflection(self):
        """run_after_reflection returns (recommendations, training suggestions)."""
        bus = EventBus()
        state = StateManager()
        orch = Orchestrator(event_bus=bus, state_manager=state)
        critic = CriticAgent(identity="critic")
        mem = ReflectiveMemory()
        loop = FusionAGILoop(
            event_bus=bus,
            state_manager=state,
            orchestrator=orch,
            critic_agent=critic,
            reflective_memory=mem,
        )
        recs, sugs = loop.run_after_reflection(
            task_id="t1",
            evaluation={"suggestions": ["Improve plan"], "success": True, "score": 0.8},
        )
        assert isinstance(recs, list)
        assert isinstance(sugs, list)
        # Detach handlers so this test does not leak subscriptions into others.
        loop.unsubscribe()
|