"""Auto training: suggest and apply heuristic updates from reflection and failures.""" from typing import Any, Protocol from fusionagi.schemas.recommendation import TrainingSuggestion, TrainingSuggestionKind from fusionagi._logger import logger class ReflectiveMemoryLike(Protocol): """Protocol for reflective memory: set_heuristic, get_lessons.""" def set_heuristic(self, key: str, value: Any) -> None: ... def get_lessons(self, limit: int = 50) -> list[dict[str, Any]]: ... def get_all_heuristics(self) -> dict[str, Any]: ... class AutoTrainer: """ Suggests training actions (heuristic updates, prompt tuning, fine-tune datasets) from lessons and evaluations, and applies heuristic updates to reflective memory. """ def __init__(self, reflective_memory: ReflectiveMemoryLike | None = None) -> None: """ Initialize the auto-trainer. Args: reflective_memory: Optional reflective memory for applying heuristics. """ self._memory = reflective_memory def suggest_from_evaluation( self, task_id: str, evaluation: dict[str, Any], ) -> list[TrainingSuggestion]: """ From a single Critic evaluation, produce training suggestions (heuristic_update from suggestions, fine_tune_dataset on failure). """ suggestions: list[TrainingSuggestion] = [] ev_suggestions = evaluation.get("suggestions", []) success = evaluation.get("success", False) score = evaluation.get("score", 0.5) for i, s in enumerate(ev_suggestions[:5]): key = f"heuristic_from_task_{task_id}_{i}" suggestions.append( TrainingSuggestion( kind=TrainingSuggestionKind.HEURISTIC_UPDATE, key=key, value=s, source_task_id=task_id, reason="From Critic evaluation suggestion", ) ) if not success or score < 0.5: suggestions.append( TrainingSuggestion( kind=TrainingSuggestionKind.FINE_TUNE_DATASET, key=f"training_target_{task_id}", value={ "task_id": task_id, "outcome": "failed" if not success else "low_score", "score": score, "suggestions": ev_suggestions, }, source_task_id=task_id, reason="Task failed or low score; add to training dataset", ) ) return suggestions def suggest_from_lessons(self, limit_lessons: int = 20) -> list[TrainingSuggestion]: """ Aggregate lessons into training suggestions (e.g. strategy_param or heuristic updates from repeated patterns). """ if not self._memory: return [] lessons = self._memory.get_lessons(limit=limit_lessons) suggestions: list[TrainingSuggestion] = [] for lesson in lessons[-10:]: ev = lesson.get("evaluation", {}) tid = lesson.get("task_id", "unknown") for i, s in enumerate(ev.get("suggestions", [])[:2]): key = f"lesson_heuristic_{tid}_{i}" suggestions.append( TrainingSuggestion( kind=TrainingSuggestionKind.HEURISTIC_UPDATE, key=key, value=s, source_task_id=tid, reason="From reflective lesson", ) ) return suggestions def suggest_training( self, task_id: str | None = None, evaluation: dict[str, Any] | None = None, include_lessons: bool = True, ) -> list[TrainingSuggestion]: """ Produce all training suggestions from optional evaluation and optionally from lessons. """ out: list[TrainingSuggestion] = [] if task_id and evaluation: out.extend(self.suggest_from_evaluation(task_id, evaluation)) if include_lessons: out.extend(self.suggest_from_lessons()) logger.debug( "AutoTrainer.suggest_training", extra={"count": len(out), "task_id": task_id}, ) return out def apply_heuristic_updates( self, suggestions: list[TrainingSuggestion], reflective_memory: ReflectiveMemoryLike | None = None, ) -> int: """ Apply heuristic-update suggestions to reflective memory. Returns number of heuristics applied. Other suggestion kinds are logged but not applied (e.g. fine_tune_dataset for external pipelines). """ memory = reflective_memory or self._memory if not memory: logger.warning("AutoTrainer.apply_heuristic_updates: no reflective memory") return 0 applied = 0 for s in suggestions: if s.kind == TrainingSuggestionKind.HEURISTIC_UPDATE: memory.set_heuristic(s.key, s.value) applied += 1 logger.info( "AutoTrainer: applied heuristic", extra={"key": s.key, "source_task_id": s.source_task_id}, ) else: logger.info( "AutoTrainer: suggestion not applied (use external pipeline)", extra={"kind": s.kind.value, "key": s.key}, ) return applied def run_auto_training( self, task_id: str | None = None, evaluation: dict[str, Any] | None = None, apply_heuristics: bool = True, ) -> list[TrainingSuggestion]: """ Suggest training from evaluation/lessons and optionally apply heuristic updates. Returns all suggestions (for logging or external use). """ suggestions = self.suggest_training( task_id=task_id, evaluation=evaluation, include_lessons=True, ) if apply_heuristics: self.apply_heuristic_updates(suggestions) return suggestions