Files

106 lines
3.3 KiB
Python
Raw Permalink Normal View History

"""GPU-accelerated scoring integration for reasoning pipeline.
Provides drop-in GPU replacements for the CPU scoring functions used in
multi_path.py and consensus_engine.py. Automatically falls back to a
CPU code path when the GPU modules cannot be imported.
"""
from __future__ import annotations
from typing import Callable
from fusionagi._logger import logger
from fusionagi.reasoning.tot import ThoughtNode
from fusionagi.schemas.atomic import AtomicSemanticUnit, AtomicUnitType
def generate_and_score_gpu(
    hypotheses: list[str],
    units: list[AtomicSemanticUnit],
    score_fn: Callable[[ThoughtNode, list[AtomicSemanticUnit]], float] | None = None,
) -> list[tuple[ThoughtNode, float]]:
    """Score hypotheses via the GPU backend, delegating to the CPU path when needed.

    Intended as a drop-in for ``generate_and_score_parallel``. A caller-supplied
    ``score_fn`` always routes to the CPU implementation. Otherwise the GPU
    scorer is tried first, and the CPU implementation is used if an
    ``ImportError`` is raised while importing or running it.

    Args:
        hypotheses: Hypothesis texts to score.
        units: Atomic semantic units providing context.
        score_fn: Optional custom scoring function; when given it overrides
            GPU scoring entirely.

    Returns:
        List of (ThoughtNode, score) tuples exactly as produced by the
        selected backend (documented by the backends as sorted by score
        descending; this wrapper does not re-sort).
    """
    if score_fn is None:
        try:
            # Imported lazily so hosts without the GPU stack can still load this module.
            from fusionagi.gpu.tensor_scoring import gpu_score_hypotheses

            scored = gpu_score_hypotheses(hypotheses, units)
            logger.debug(
                "GPU scoring used for hypotheses",
                extra={"count": len(hypotheses), "backend": "gpu"},
            )
            return scored
        except ImportError:
            from fusionagi.reasoning.multi_path import generate_and_score_parallel

            logger.debug("GPU not available, using CPU scoring")
            return generate_and_score_parallel(hypotheses, units)

    # A custom scorer overrides GPU scoring, so hand off to the CPU version.
    from fusionagi.reasoning.multi_path import generate_and_score_parallel

    return generate_and_score_parallel(hypotheses, units, score_fn)
def score_claims_gpu(
    claims: list[str],
    reference: str,
) -> list[float]:
    """Score claims against a reference using GPU when available.

    The GPU scorer is tried first. If an ``ImportError`` is raised while
    importing or running it, each claim is instead wrapped in a
    ``ThoughtNode`` and scored with ``_score_consistency`` against a single
    FACT unit (confidence 1.0) that holds the reference text.

    Args:
        claims: List of claim texts.
        reference: Reference text.

    Returns:
        List of scores, one per claim, in the same order as ``claims``.
        An empty ``claims`` yields an empty list on the CPU path.
    """
    try:
        # Imported lazily so hosts without the GPU stack can still load this module.
        from fusionagi.gpu.tensor_scoring import gpu_score_claims_against_reference

        return gpu_score_claims_against_reference(claims, reference)
    except ImportError:
        from fusionagi.reasoning.multi_path import _score_consistency

        # Nothing to score: skip building the reference unit at all.
        if not claims:
            return []

        # The reference unit is identical for every claim, so build it once
        # instead of once per loop iteration.
        reference_unit = AtomicSemanticUnit(
            unit_id="ref", content=reference, type=AtomicUnitType.FACT, confidence=1.0
        )
        return [
            _score_consistency(ThoughtNode(thought=claim, trace=[claim]), [reference_unit])
            for claim in claims
        ]
def deduplicate_claims_gpu(
    claims: list[str],
    threshold: float = 0.85,
) -> list[list[int]]:
    """Group near-duplicate claims using the GPU similarity backend.

    When an ``ImportError`` is raised while importing or running the GPU
    backend, no similarity grouping is attempted: every claim index is
    returned in its own single-element group.

    Args:
        claims: List of claim texts.
        threshold: Similarity threshold for grouping (forwarded to the GPU
            backend; unused by the fallback).

    Returns:
        List of groups (each group is a list of indices into ``claims``).
    """
    try:
        # Imported lazily so hosts without the GPU stack can still load this module.
        from fusionagi.gpu.tensor_similarity import deduplicate_claims

        grouped = deduplicate_claims(claims, threshold)
    except ImportError:
        # Fallback: one singleton group per claim, in input order.
        grouped = [[position] for position in range(len(claims))]
    return grouped