Files
defiQUG c052b07662
Some checks failed
Tests / test (3.10) (push) Has been cancelled
Tests / test (3.11) (push) Has been cancelled
Tests / test (3.12) (push) Has been cancelled
Tests / lint (push) Has been cancelled
Tests / docker (push) Has been cancelled
Initial commit: add .gitignore and README
2026-02-09 21:51:42 -08:00

168 lines
6.1 KiB
Python

"""Auto training: suggest and apply heuristic updates from reflection and failures."""
from typing import Any, Protocol
from fusionagi.schemas.recommendation import TrainingSuggestion, TrainingSuggestionKind
from fusionagi._logger import logger
class ReflectiveMemoryLike(Protocol):
    """Structural (duck-typed) interface for a reflective-memory backend.

    Any object with these three methods can back an AutoTrainer:
    heuristics are written with set_heuristic and read back in bulk with
    get_all_heuristics; recorded lessons are fetched with get_lessons.
    """
    # Store (or overwrite) a single heuristic under the given key.
    def set_heuristic(self, key: str, value: Any) -> None: ...
    # Return up to `limit` lesson records; each is a dict (schema defined
    # by the backend — presumably includes "task_id" and "evaluation",
    # as read by AutoTrainer.suggest_from_lessons).
    def get_lessons(self, limit: int = 50) -> list[dict[str, Any]]: ...
    # Return all stored heuristics as a key -> value mapping.
    def get_all_heuristics(self) -> dict[str, Any]: ...
class AutoTrainer:
    """
    Turns Critic evaluations and reflective-memory lessons into training
    suggestions, and can push heuristic-update suggestions back into
    reflective memory. Non-heuristic suggestions (e.g. fine-tune datasets)
    are only surfaced, never applied here.
    """

    def __init__(self, reflective_memory: ReflectiveMemoryLike | None = None) -> None:
        """
        Create the trainer.

        Args:
            reflective_memory: Optional backend used both as a lesson source
                and as the default target for applied heuristics.
        """
        self._memory = reflective_memory

    def suggest_from_evaluation(
        self,
        task_id: str,
        evaluation: dict[str, Any],
    ) -> list[TrainingSuggestion]:
        """
        Derive suggestions from a single Critic evaluation.

        Up to five of the evaluation's own suggestions become heuristic
        updates; if the task failed or scored below 0.5, one extra
        fine-tune-dataset entry is appended.
        """
        critic_hints = evaluation.get("suggestions", [])
        succeeded = evaluation.get("success", False)
        score = evaluation.get("score", 0.5)

        # One heuristic-update suggestion per Critic hint (capped at 5).
        results = [
            TrainingSuggestion(
                kind=TrainingSuggestionKind.HEURISTIC_UPDATE,
                key=f"heuristic_from_task_{task_id}_{idx}",
                value=hint,
                source_task_id=task_id,
                reason="From Critic evaluation suggestion",
            )
            for idx, hint in enumerate(critic_hints[:5])
        ]

        # Failed or weak tasks also feed an external fine-tuning pipeline.
        if not succeeded or score < 0.5:
            results.append(
                TrainingSuggestion(
                    kind=TrainingSuggestionKind.FINE_TUNE_DATASET,
                    key=f"training_target_{task_id}",
                    value={
                        "task_id": task_id,
                        "outcome": "failed" if not succeeded else "low_score",
                        "score": score,
                        "suggestions": critic_hints,
                    },
                    source_task_id=task_id,
                    reason="Task failed or low score; add to training dataset",
                )
            )
        return results

    def suggest_from_lessons(self, limit_lessons: int = 20) -> list[TrainingSuggestion]:
        """
        Derive heuristic-update suggestions from stored lessons.

        Fetches up to `limit_lessons` lessons, keeps only the most recent
        ten, and emits at most two suggestions per lesson. Returns an empty
        list when no reflective memory is configured.
        """
        if not self._memory:
            return []

        # Only the tail of the fetched window is mined for heuristics.
        recent = self._memory.get_lessons(limit=limit_lessons)[-10:]
        collected: list[TrainingSuggestion] = []
        for entry in recent:
            origin = entry.get("task_id", "unknown")
            hints = entry.get("evaluation", {}).get("suggestions", [])[:2]
            collected.extend(
                TrainingSuggestion(
                    kind=TrainingSuggestionKind.HEURISTIC_UPDATE,
                    key=f"lesson_heuristic_{origin}_{n}",
                    value=hint,
                    source_task_id=origin,
                    reason="From reflective lesson",
                )
                for n, hint in enumerate(hints)
            )
        return collected

    def suggest_training(
        self,
        task_id: str | None = None,
        evaluation: dict[str, Any] | None = None,
        include_lessons: bool = True,
    ) -> list[TrainingSuggestion]:
        """
        Collect every available suggestion: from the given evaluation
        (when both task_id and evaluation are supplied) and, optionally,
        from recent lessons.
        """
        combined: list[TrainingSuggestion] = []
        if task_id and evaluation:
            combined.extend(self.suggest_from_evaluation(task_id, evaluation))
        if include_lessons:
            combined.extend(self.suggest_from_lessons())
        logger.debug(
            "AutoTrainer.suggest_training",
            extra={"count": len(combined), "task_id": task_id},
        )
        return combined

    def apply_heuristic_updates(
        self,
        suggestions: list[TrainingSuggestion],
        reflective_memory: ReflectiveMemoryLike | None = None,
    ) -> int:
        """
        Write heuristic-update suggestions into reflective memory.

        Args:
            suggestions: Suggestions of any kind; only HEURISTIC_UPDATE
                entries are applied, the rest are logged and skipped.
            reflective_memory: Overrides the instance's memory when given.

        Returns:
            The number of heuristics actually stored (0 if no memory).
        """
        target = reflective_memory or self._memory
        if not target:
            logger.warning("AutoTrainer.apply_heuristic_updates: no reflective memory")
            return 0

        applied = 0
        for suggestion in suggestions:
            if suggestion.kind != TrainingSuggestionKind.HEURISTIC_UPDATE:
                # e.g. fine_tune_dataset — handled by an external pipeline.
                logger.info(
                    "AutoTrainer: suggestion not applied (use external pipeline)",
                    extra={"kind": suggestion.kind.value, "key": suggestion.key},
                )
                continue
            target.set_heuristic(suggestion.key, suggestion.value)
            applied += 1
            logger.info(
                "AutoTrainer: applied heuristic",
                extra={"key": suggestion.key, "source_task_id": suggestion.source_task_id},
            )
        return applied

    def run_auto_training(
        self,
        task_id: str | None = None,
        evaluation: dict[str, Any] | None = None,
        apply_heuristics: bool = True,
    ) -> list[TrainingSuggestion]:
        """
        End-to-end pass: gather suggestions (evaluation + lessons) and,
        unless disabled, apply the heuristic updates. Always returns the
        full suggestion list for logging or external pipelines.
        """
        gathered = self.suggest_training(
            task_id=task_id,
            evaluation=evaluation,
            include_lessons=True,
        )
        if apply_heuristics:
            self.apply_heuristic_updates(gathered)
        return gathered