"""Auto recommend/suggest: produce actionable recommendations from lessons and evaluations.""" from typing import Any, Protocol from fusionagi.schemas.recommendation import Recommendation, RecommendationKind from fusionagi._logger import logger class ReflectiveMemoryLike(Protocol): """Protocol for reflective memory: get lessons and heuristics.""" def get_lessons(self, limit: int = 50) -> list[dict[str, Any]]: ... def get_all_heuristics(self) -> dict[str, Any]: ... class AutoRecommender: """ Produces actionable recommendations from reflective memory lessons and from post-task evaluations (suggestions, error_analysis). """ def __init__(self, reflective_memory: ReflectiveMemoryLike | None = None) -> None: """ Initialize the auto-recommender. Args: reflective_memory: Optional reflective memory for lessons/heuristics. """ self._memory = reflective_memory def recommend_from_evaluation( self, task_id: str, evaluation: dict[str, Any], ) -> list[Recommendation]: """Turn a single evaluation (from Critic) into recommendations.""" recs: list[Recommendation] = [] suggestions = evaluation.get("suggestions", []) error_analysis = evaluation.get("error_analysis", []) score = evaluation.get("score", 0.5) success = evaluation.get("success", False) for i, s in enumerate(suggestions[:10]): recs.append( Recommendation( kind=RecommendationKind.NEXT_ACTION, title="Suggestion from evaluation", description=s if isinstance(s, str) else str(s), payload={"raw": s, "error_analysis": error_analysis}, source_task_id=task_id, priority=8 if not success else 5, ) ) if error_analysis and not recs: recs.append( Recommendation( kind=RecommendationKind.STRATEGY_CHANGE, title="Address error analysis", description="; ".join(str(e) for e in error_analysis[:3]), payload={"error_analysis": error_analysis}, source_task_id=task_id, priority=7, ) ) if score < 0.5 and not success: recs.append( Recommendation( kind=RecommendationKind.TRAINING_TARGET, title="Low score: consider training or prompt tuning", description=f"Task {task_id} scored {score}; add as training target.", payload={"score": score, "task_id": task_id}, source_task_id=task_id, priority=6, ) ) return recs def recommend_from_lessons(self, limit_lessons: int = 20) -> list[Recommendation]: """Aggregate lessons from reflective memory into recommendations.""" if not self._memory: return [] lessons = self._memory.get_lessons(limit=limit_lessons) recs: list[Recommendation] = [] failed = [l for l in lessons if l.get("outcome") == "failed"] if len(failed) >= 3: recs.append( Recommendation( kind=RecommendationKind.STRATEGY_CHANGE, title="Multiple failures in recent lessons", description=f"{len(failed)} failed tasks in last {limit_lessons} lessons.", payload={"failed_count": len(failed), "lesson_sample": failed[-3:]}, source_task_id=None, priority=6, ) ) for lesson in lessons[-5:]: ev = lesson.get("evaluation", {}) suggestions = ev.get("suggestions", []) for s in suggestions[:2]: recs.append( Recommendation( kind=RecommendationKind.NEXT_ACTION, title="From lesson", description=s if isinstance(s, str) else str(s), payload={"lesson": lesson}, source_task_id=lesson.get("task_id"), priority=4, ) ) return recs def recommend( self, task_id: str | None = None, evaluation: dict[str, Any] | None = None, include_lessons: bool = True, ) -> list[Recommendation]: """ Produce all recommendations: from optional evaluation and optionally from reflective memory lessons. Deduplicated by title/description. """ recs: list[Recommendation] = [] if task_id and evaluation: recs.extend(self.recommend_from_evaluation(task_id, evaluation)) if include_lessons: recs.extend(self.recommend_from_lessons()) seen: set[tuple[str, str]] = set() unique: list[Recommendation] = [] for r in recs: key = (r.title, r.description) if key not in seen: seen.add(key) unique.append(r) unique.sort(key=lambda x: (-x.priority, x.title)) logger.debug("AutoRecommender.recommend", extra={"count": len(unique), "task_id": task_id}) return unique