137 lines
5.3 KiB
Python
137 lines
5.3 KiB
Python
"""Auto recommend/suggest: produce actionable recommendations from lessons and evaluations."""
|
|
|
|
from typing import Any, Protocol
|
|
|
|
from fusionagi.schemas.recommendation import Recommendation, RecommendationKind
|
|
from fusionagi._logger import logger
|
|
|
|
|
|
class ReflectiveMemoryLike(Protocol):
    """Structural interface for a reflective memory that exposes lessons and heuristics."""

    def get_lessons(self, limit: int = 50) -> list[dict[str, Any]]:
        """Return lessons as a list of dicts; ``limit`` presumably caps how many are returned."""

    def get_all_heuristics(self) -> dict[str, Any]:
        """Return the stored heuristics as a single mapping."""
|
|
|
|
|
|
class AutoRecommender:
    """
    Produce actionable recommendations from evaluations and lessons.

    Two sources are supported: a single post-task evaluation dict (its
    ``suggestions``, ``error_analysis``, ``score`` and ``success`` keys) and
    the lessons held by an optional reflective memory.
    """

    # Limits and thresholds used when turning raw data into recommendations.
    _MAX_EVAL_SUGGESTIONS = 10  # suggestions taken from one evaluation
    _MAX_ERRORS_IN_DESCRIPTION = 3  # error entries joined into a description
    _DEFAULT_SCORE = 0.5  # used when the evaluation carries no score
    _LOW_SCORE_THRESHOLD = 0.5  # below this (and not successful) -> training target
    _FAILURE_ALERT_THRESHOLD = 3  # failed lessons needed to flag a strategy change
    _FAILED_SAMPLE_SIZE = 3  # failed lessons attached to the payload
    _RECENT_LESSON_WINDOW = 5  # trailing lessons scanned for suggestions
    _MAX_SUGGESTIONS_PER_LESSON = 2  # suggestions taken from each scanned lesson

    def __init__(self, reflective_memory: ReflectiveMemoryLike | None = None) -> None:
        """
        Initialize the auto-recommender.

        Args:
            reflective_memory: Optional reflective memory for lessons/heuristics.
                When omitted, lesson-based recommendations are always empty.
        """
        self._memory = reflective_memory

    def recommend_from_evaluation(
        self,
        task_id: str,
        evaluation: dict[str, Any],
    ) -> list[Recommendation]:
        """
        Turn a single evaluation (e.g. from the Critic) into recommendations.

        Args:
            task_id: Identifier of the evaluated task; stored on every
                recommendation as ``source_task_id``.
            evaluation: Mapping that may contain ``suggestions``,
                ``error_analysis``, ``score`` and ``success``. Missing keys
                and keys explicitly set to ``None`` are treated alike.

        Returns:
            One NEXT_ACTION per suggestion (at most ten); a STRATEGY_CHANGE
            when there is error analysis but no suggestion; plus a
            TRAINING_TARGET when the task failed with a score below 0.5.
        """
        # ``or []`` also covers keys that are present but None, which would
        # otherwise fail on slicing below.
        suggestions = evaluation.get("suggestions") or []
        error_analysis = evaluation.get("error_analysis") or []
        score = evaluation.get("score")
        if score is None:
            score = self._DEFAULT_SCORE
        success = evaluation.get("success", False)

        # Suggestions for a failed task are ranked above those for a success.
        suggestion_priority = 5 if success else 8
        recs: list[Recommendation] = [
            Recommendation(
                kind=RecommendationKind.NEXT_ACTION,
                title="Suggestion from evaluation",
                description=s if isinstance(s, str) else str(s),
                payload={"raw": s, "error_analysis": error_analysis},
                source_task_id=task_id,
                priority=suggestion_priority,
            )
            for s in suggestions[: self._MAX_EVAL_SUGGESTIONS]
        ]

        # Error analysis is only surfaced on its own when no suggestion
        # already carries it in its payload.
        if error_analysis and not recs:
            shown_errors = error_analysis[: self._MAX_ERRORS_IN_DESCRIPTION]
            recs.append(
                Recommendation(
                    kind=RecommendationKind.STRATEGY_CHANGE,
                    title="Address error analysis",
                    description="; ".join(str(e) for e in shown_errors),
                    payload={"error_analysis": error_analysis},
                    source_task_id=task_id,
                    priority=7,
                )
            )

        if score < self._LOW_SCORE_THRESHOLD and not success:
            recs.append(
                Recommendation(
                    kind=RecommendationKind.TRAINING_TARGET,
                    title="Low score: consider training or prompt tuning",
                    description=f"Task {task_id} scored {score}; add as training target.",
                    payload={"score": score, "task_id": task_id},
                    source_task_id=task_id,
                    priority=6,
                )
            )
        return recs

    def recommend_from_lessons(self, limit_lessons: int = 20) -> list[Recommendation]:
        """
        Aggregate lessons from reflective memory into recommendations.

        Args:
            limit_lessons: Passed to the memory's ``get_lessons`` as ``limit``.

        Returns:
            An empty list when no memory is configured. Otherwise a
            STRATEGY_CHANGE when at least three fetched lessons have
            ``outcome == "failed"``, followed by up to two NEXT_ACTION
            entries for each of the last five fetched lessons.
        """
        # Identity check: a memory object that merely evaluates falsy
        # (e.g. defines __len__ and is empty) must still be queried.
        if self._memory is None:
            return []

        lessons = self._memory.get_lessons(limit=limit_lessons) or []
        recs: list[Recommendation] = []

        failed = [lesson for lesson in lessons if lesson.get("outcome") == "failed"]
        if len(failed) >= self._FAILURE_ALERT_THRESHOLD:
            recs.append(
                Recommendation(
                    kind=RecommendationKind.STRATEGY_CHANGE,
                    title="Multiple failures in recent lessons",
                    description=f"{len(failed)} failed tasks in last {limit_lessons} lessons.",
                    # Trailing entries; presumably the most recent ones if
                    # get_lessons returns oldest-first -- TODO confirm.
                    payload={
                        "failed_count": len(failed),
                        "lesson_sample": failed[-self._FAILED_SAMPLE_SIZE:],
                    },
                    source_task_id=None,
                    priority=6,
                )
            )

        for lesson in lessons[-self._RECENT_LESSON_WINDOW:]:
            # A lesson may lack an evaluation or store None for it.
            lesson_eval = lesson.get("evaluation") or {}
            lesson_suggestions = lesson_eval.get("suggestions") or []
            for s in lesson_suggestions[: self._MAX_SUGGESTIONS_PER_LESSON]:
                recs.append(
                    Recommendation(
                        kind=RecommendationKind.NEXT_ACTION,
                        title="From lesson",
                        description=s if isinstance(s, str) else str(s),
                        payload={"lesson": lesson},
                        source_task_id=lesson.get("task_id"),
                        priority=4,
                    )
                )
        return recs

    def recommend(
        self,
        task_id: str | None = None,
        evaluation: dict[str, Any] | None = None,
        include_lessons: bool = True,
    ) -> list[Recommendation]:
        """
        Produce all recommendations from an optional evaluation and lessons.

        Args:
            task_id: Task the evaluation belongs to.
            evaluation: Evaluation dict; only used when both it and
                ``task_id`` are truthy.
            include_lessons: When true, also add recommendations derived
                from reflective memory lessons (default lesson limit).

        Returns:
            Recommendations deduplicated by ``(title, description)`` -- the
            first occurrence wins -- sorted by descending priority, then title.
        """
        recs: list[Recommendation] = []
        if task_id and evaluation:
            recs.extend(self.recommend_from_evaluation(task_id, evaluation))
        if include_lessons:
            recs.extend(self.recommend_from_lessons())

        seen: set[tuple[str, str]] = set()
        unique: list[Recommendation] = []
        for rec in recs:
            key = (rec.title, rec.description)
            if key in seen:
                continue
            seen.add(key)
            unique.append(rec)

        unique.sort(key=lambda rec: (-rec.priority, rec.title))
        logger.debug("AutoRecommender.recommend", extra={"count": len(unique), "task_id": task_id})
        return unique
|