Files
defiQUG c052b07662
Some checks failed
Tests / test (3.10) (push) Has been cancelled
Tests / test (3.11) (push) Has been cancelled
Tests / test (3.12) (push) Has been cancelled
Tests / lint (push) Has been cancelled
Tests / docker (push) Has been cancelled
Initial commit: add .gitignore and README
2026-02-09 21:51:42 -08:00

137 lines
5.3 KiB
Python

"""Auto recommend/suggest: produce actionable recommendations from lessons and evaluations."""
from typing import Any, Protocol
from fusionagi.schemas.recommendation import Recommendation, RecommendationKind
from fusionagi._logger import logger
class ReflectiveMemoryLike(Protocol):
"""Protocol for reflective memory: get lessons and heuristics."""
def get_lessons(self, limit: int = 50) -> list[dict[str, Any]]: ...
def get_all_heuristics(self) -> dict[str, Any]: ...
class AutoRecommender:
"""
Produces actionable recommendations from reflective memory lessons and
from post-task evaluations (suggestions, error_analysis).
"""
def __init__(self, reflective_memory: ReflectiveMemoryLike | None = None) -> None:
"""
Initialize the auto-recommender.
Args:
reflective_memory: Optional reflective memory for lessons/heuristics.
"""
self._memory = reflective_memory
def recommend_from_evaluation(
self,
task_id: str,
evaluation: dict[str, Any],
) -> list[Recommendation]:
"""Turn a single evaluation (from Critic) into recommendations."""
recs: list[Recommendation] = []
suggestions = evaluation.get("suggestions", [])
error_analysis = evaluation.get("error_analysis", [])
score = evaluation.get("score", 0.5)
success = evaluation.get("success", False)
for i, s in enumerate(suggestions[:10]):
recs.append(
Recommendation(
kind=RecommendationKind.NEXT_ACTION,
title="Suggestion from evaluation",
description=s if isinstance(s, str) else str(s),
payload={"raw": s, "error_analysis": error_analysis},
source_task_id=task_id,
priority=8 if not success else 5,
)
)
if error_analysis and not recs:
recs.append(
Recommendation(
kind=RecommendationKind.STRATEGY_CHANGE,
title="Address error analysis",
description="; ".join(str(e) for e in error_analysis[:3]),
payload={"error_analysis": error_analysis},
source_task_id=task_id,
priority=7,
)
)
if score < 0.5 and not success:
recs.append(
Recommendation(
kind=RecommendationKind.TRAINING_TARGET,
title="Low score: consider training or prompt tuning",
description=f"Task {task_id} scored {score}; add as training target.",
payload={"score": score, "task_id": task_id},
source_task_id=task_id,
priority=6,
)
)
return recs
def recommend_from_lessons(self, limit_lessons: int = 20) -> list[Recommendation]:
"""Aggregate lessons from reflective memory into recommendations."""
if not self._memory:
return []
lessons = self._memory.get_lessons(limit=limit_lessons)
recs: list[Recommendation] = []
failed = [l for l in lessons if l.get("outcome") == "failed"]
if len(failed) >= 3:
recs.append(
Recommendation(
kind=RecommendationKind.STRATEGY_CHANGE,
title="Multiple failures in recent lessons",
description=f"{len(failed)} failed tasks in last {limit_lessons} lessons.",
payload={"failed_count": len(failed), "lesson_sample": failed[-3:]},
source_task_id=None,
priority=6,
)
)
for lesson in lessons[-5:]:
ev = lesson.get("evaluation", {})
suggestions = ev.get("suggestions", [])
for s in suggestions[:2]:
recs.append(
Recommendation(
kind=RecommendationKind.NEXT_ACTION,
title="From lesson",
description=s if isinstance(s, str) else str(s),
payload={"lesson": lesson},
source_task_id=lesson.get("task_id"),
priority=4,
)
)
return recs
def recommend(
self,
task_id: str | None = None,
evaluation: dict[str, Any] | None = None,
include_lessons: bool = True,
) -> list[Recommendation]:
"""
Produce all recommendations: from optional evaluation and optionally
from reflective memory lessons. Deduplicated by title/description.
"""
recs: list[Recommendation] = []
if task_id and evaluation:
recs.extend(self.recommend_from_evaluation(task_id, evaluation))
if include_lessons:
recs.extend(self.recommend_from_lessons())
seen: set[tuple[str, str]] = set()
unique: list[Recommendation] = []
for r in recs:
key = (r.title, r.description)
if key not in seen:
seen.add(key)
unique.append(r)
unique.sort(key=lambda x: (-x.priority, x.title))
logger.debug("AutoRecommender.recommend", extra={"count": len(unique), "task_id": task_id})
return unique