168 lines
6.1 KiB
Python
168 lines
6.1 KiB
Python
"""Auto training: suggest and apply heuristic updates from reflection and failures."""
|
|
|
|
from typing import Any, Protocol
|
|
|
|
from fusionagi.schemas.recommendation import TrainingSuggestion, TrainingSuggestionKind
|
|
from fusionagi._logger import logger
|
|
|
|
|
|
class ReflectiveMemoryLike(Protocol):
    """Structural protocol for a reflective memory backend.

    Implementers must provide ``set_heuristic``, ``get_lessons`` and
    ``get_all_heuristics``; any object with these methods satisfies the
    protocol (duck typing), no inheritance required.
    """

    # Store (or overwrite) a single heuristic value under *key*.
    def set_heuristic(self, key: str, value: Any) -> None: ...

    # Return up to *limit* recorded lesson records (dicts).
    def get_lessons(self, limit: int = 50) -> list[dict[str, Any]]: ...

    # Return every stored heuristic, keyed by heuristic name.
    def get_all_heuristics(self) -> dict[str, Any]: ...
|
|
|
|
|
|
class AutoTrainer:
    """
    Suggests training actions (heuristic updates, prompt tuning, fine-tune datasets)
    from lessons and evaluations, and applies heuristic updates to reflective memory.
    """

    # Tunable limits, hoisted from inline magic numbers for clarity.
    _MAX_EVAL_SUGGESTIONS = 5  # heuristic suggestions taken per evaluation
    _MAX_RECENT_LESSONS = 10  # only the most recent lessons are mined
    _MAX_LESSON_SUGGESTIONS = 2  # suggestions taken per lesson
    _LOW_SCORE_THRESHOLD = 0.5  # below this, a task becomes a fine-tune candidate

    def __init__(self, reflective_memory: ReflectiveMemoryLike | None = None) -> None:
        """
        Initialize the auto-trainer.

        Args:
            reflective_memory: Optional reflective memory for applying heuristics.
        """
        self._memory = reflective_memory

    def suggest_from_evaluation(
        self,
        task_id: str,
        evaluation: dict[str, Any],
    ) -> list[TrainingSuggestion]:
        """
        From a single Critic evaluation, produce training suggestions
        (heuristic_update from suggestions, fine_tune_dataset on failure).

        Args:
            task_id: Identifier of the evaluated task.
            evaluation: Critic output; the keys "suggestions" (list),
                "success" (bool) and "score" (float) are read, each with
                a safe fallback when missing or explicitly None.

        Returns:
            Training suggestions derived from the evaluation.
        """
        suggestions: list[TrainingSuggestion] = []
        # `or []` guards against an explicit None value, which .get()'s
        # default alone would not catch (it only covers a missing key).
        ev_suggestions = evaluation.get("suggestions") or []
        success = evaluation.get("success", False)
        # Default 0.5 deliberately does NOT trigger the low-score branch below
        # (the comparison is strict `<`), so a missing score is neutral.
        score = evaluation.get("score", 0.5)

        for i, s in enumerate(ev_suggestions[: self._MAX_EVAL_SUGGESTIONS]):
            suggestions.append(
                TrainingSuggestion(
                    kind=TrainingSuggestionKind.HEURISTIC_UPDATE,
                    key=f"heuristic_from_task_{task_id}_{i}",
                    value=s,
                    source_task_id=task_id,
                    reason="From Critic evaluation suggestion",
                )
            )
        if not success or score < self._LOW_SCORE_THRESHOLD:
            suggestions.append(
                TrainingSuggestion(
                    kind=TrainingSuggestionKind.FINE_TUNE_DATASET,
                    key=f"training_target_{task_id}",
                    value={
                        "task_id": task_id,
                        "outcome": "failed" if not success else "low_score",
                        "score": score,
                        "suggestions": ev_suggestions,
                    },
                    source_task_id=task_id,
                    reason="Task failed or low score; add to training dataset",
                )
            )
        return suggestions

    def suggest_from_lessons(self, limit_lessons: int = 20) -> list[TrainingSuggestion]:
        """
        Aggregate lessons into training suggestions (e.g. strategy_param
        or heuristic updates from repeated patterns).

        Args:
            limit_lessons: How many lessons to fetch from reflective memory;
                only the most recent ``_MAX_RECENT_LESSONS`` of those are mined.

        Returns:
            Heuristic-update suggestions; [] when no memory is configured.
        """
        # Identity check: a present-but-falsy memory object is still usable.
        if self._memory is None:
            return []
        lessons = self._memory.get_lessons(limit=limit_lessons)
        suggestions: list[TrainingSuggestion] = []
        for lesson in lessons[-self._MAX_RECENT_LESSONS :]:
            # `or {}` / `or []` tolerate explicit None values in lesson dicts.
            ev = lesson.get("evaluation") or {}
            tid = lesson.get("task_id", "unknown")
            lesson_suggestions = (ev.get("suggestions") or [])[: self._MAX_LESSON_SUGGESTIONS]
            for i, s in enumerate(lesson_suggestions):
                suggestions.append(
                    TrainingSuggestion(
                        kind=TrainingSuggestionKind.HEURISTIC_UPDATE,
                        key=f"lesson_heuristic_{tid}_{i}",
                        value=s,
                        source_task_id=tid,
                        reason="From reflective lesson",
                    )
                )
        return suggestions

    def suggest_training(
        self,
        task_id: str | None = None,
        evaluation: dict[str, Any] | None = None,
        include_lessons: bool = True,
    ) -> list[TrainingSuggestion]:
        """
        Produce all training suggestions from optional evaluation and
        optionally from lessons.

        Args:
            task_id: Task whose evaluation to mine (used only with evaluation).
            evaluation: Optional Critic evaluation dict for that task.
            include_lessons: When True, also mine recent reflective lessons.

        Returns:
            Combined list of suggestions from both sources.
        """
        out: list[TrainingSuggestion] = []
        if task_id and evaluation:
            out.extend(self.suggest_from_evaluation(task_id, evaluation))
        if include_lessons:
            out.extend(self.suggest_from_lessons())
        logger.debug(
            "AutoTrainer.suggest_training",
            extra={"count": len(out), "task_id": task_id},
        )
        return out

    def apply_heuristic_updates(
        self,
        suggestions: list[TrainingSuggestion],
        reflective_memory: ReflectiveMemoryLike | None = None,
    ) -> int:
        """
        Apply heuristic-update suggestions to reflective memory.

        Args:
            suggestions: Suggestions to process; only HEURISTIC_UPDATE kinds
                are applied, others are logged for external pipelines.
            reflective_memory: Overrides the instance memory when given.

        Returns:
            Number of heuristics applied (0 when no memory is available).
        """
        # Explicit None checks so a falsy memory object is not skipped.
        memory = reflective_memory if reflective_memory is not None else self._memory
        if memory is None:
            logger.warning("AutoTrainer.apply_heuristic_updates: no reflective memory")
            return 0
        applied = 0
        for s in suggestions:
            if s.kind == TrainingSuggestionKind.HEURISTIC_UPDATE:
                memory.set_heuristic(s.key, s.value)
                applied += 1
                logger.info(
                    "AutoTrainer: applied heuristic",
                    extra={"key": s.key, "source_task_id": s.source_task_id},
                )
            else:
                # e.g. fine_tune_dataset: handled by an external pipeline.
                logger.info(
                    "AutoTrainer: suggestion not applied (use external pipeline)",
                    extra={"kind": s.kind.value, "key": s.key},
                )
        return applied

    def run_auto_training(
        self,
        task_id: str | None = None,
        evaluation: dict[str, Any] | None = None,
        apply_heuristics: bool = True,
    ) -> list[TrainingSuggestion]:
        """
        Suggest training from evaluation/lessons and optionally apply
        heuristic updates. Returns all suggestions (for logging or external use).

        Args:
            task_id: Task whose evaluation to mine (used only with evaluation).
            evaluation: Optional Critic evaluation dict for that task.
            apply_heuristics: When True, heuristic updates are written to memory.

        Returns:
            Every suggestion produced, applied or not.
        """
        suggestions = self.suggest_training(
            task_id=task_id,
            evaluation=evaluation,
            include_lessons=True,
        )
        if apply_heuristics:
            self.apply_heuristic_updates(suggestions)
        return suggestions
|