# File: FusionAGI/fusionagi/gpu/tensor_scoring.py
# (136 lines, 3.9 KiB, Python)

"""GPU-accelerated hypothesis scoring for reasoning pipelines.
Provides batched scoring of hypotheses against atomic semantic units
using GPU-accelerated tensor operations. Replaces the CPU-bound
ThreadPoolExecutor-based scoring in multi_path.py.
"""
from __future__ import annotations
from fusionagi._logger import logger
from fusionagi.gpu.backend import TensorBackend, get_backend
from fusionagi.reasoning.tot import ThoughtNode
from fusionagi.schemas.atomic import AtomicSemanticUnit
def gpu_score_hypotheses(
    hypotheses: list[str],
    units: list[AtomicSemanticUnit],
    backend: TensorBackend | None = None,
) -> list[tuple[ThoughtNode, float]]:
    """Score hypotheses against atomic units using backend similarity ops.

    Each hypothesis is embedded and compared with every unit that has
    non-empty ``content``. Its score is the equal-weight blend of the mean
    similarity ("coherence") and the maximum similarity ("consistency")
    across those units, clipped to ``[0.0, 1.0]``.

    If no unit carries any content there is nothing to compare against, so
    every hypothesis receives a neutral score of ``0.5`` and the result is
    returned in input order without touching the backend.

    Args:
        hypotheses: List of hypothesis text strings.
        units: List of atomic semantic units for reference. Only the ids of
            the first ten units are recorded in each node's ``unit_refs``.
        backend: TensorBackend to use; when ``None`` the result of
            ``get_backend()`` is used.

    Returns:
        List of ``(ThoughtNode, score)`` tuples. On the scored path the list
        is sorted by score descending and each node's metadata contains
        ``gpu_scored`` and the raw ``coherence`` value.
    """
    if not hypotheses:
        return []

    # Loop-invariant: compute once, hand every node its own copy so nodes
    # never alias each other's list.
    unit_refs = [u.unit_id for u in units[:10]]
    unit_texts = [u.content for u in units if u.content]

    if not unit_texts:
        # Checked *before* resolving the backend or embedding anything, so
        # the fallback does not pay for work whose result would be discarded.
        neutral = 0.5
        return [
            (
                ThoughtNode(
                    thought=h,
                    trace=[h],
                    unit_refs=list(unit_refs),
                    score=neutral,
                ),
                neutral,
            )
            for h in hypotheses
        ]

    # Function-scope import, as in the original: numpy is only needed on the
    # scored path.
    import numpy as np

    # Explicit None check so a valid backend object that happens to be falsy
    # is not silently replaced by the default.
    be = backend if backend is not None else get_backend()

    hyp_embeddings = be.embed_texts(hypotheses)
    unit_embeddings = be.embed_texts(unit_texts)

    # Assumed to be shaped (len(hypotheses), len(unit_texts)); the axis=1
    # reductions below rely on one row per hypothesis -- TODO confirm against
    # TensorBackend.cosine_similarity_matrix.
    sim_matrix = be.to_numpy(
        be.cosine_similarity_matrix(hyp_embeddings, unit_embeddings)
    )

    coherence_scores = np.mean(sim_matrix, axis=1)
    consistency_scores = np.max(sim_matrix, axis=1)
    combined_scores = np.clip(
        0.5 * coherence_scores + 0.5 * consistency_scores, 0.0, 1.0
    )

    results: list[tuple[ThoughtNode, float]] = []
    for i, h in enumerate(hypotheses):
        score = float(combined_scores[i])
        node = ThoughtNode(
            thought=h,
            trace=[h],
            unit_refs=list(unit_refs),
            score=score,
            metadata={
                "gpu_scored": True,
                "coherence": float(coherence_scores[i]),
            },
        )
        results.append((node, score))

    results.sort(key=lambda pair: pair[1], reverse=True)

    # `results` has one entry per hypothesis and `hypotheses` is non-empty
    # here, so results[0] always exists.
    logger.debug(
        "GPU hypothesis scoring complete",
        extra={
            "hypotheses": len(hypotheses),
            "units": len(units),
            "best_score": results[0][1],
            "backend": be.name,
        },
    )
    return results
def gpu_score_claims_against_reference(
    claims: list[str],
    reference: str,
    weights: list[float] | None = None,
    backend: TensorBackend | None = None,
) -> list[float]:
    """Score a batch of claims against a single reference via ``batch_score``.

    Args:
        claims: List of claim texts.
        reference: Reference text to score against.
        weights: Optional per-dimension weights. If fewer weights than
            embedding dimensions are given, the remaining dimensions keep a
            weight of ``1.0``; surplus weights beyond the embedding
            dimension are ignored.
        backend: TensorBackend to use; when ``None`` the result of
            ``get_backend()`` is used.

    Returns:
        List of scores, one per claim. Empty if ``claims`` is empty.
    """
    if not claims:
        return []

    # Function-scope import, as in the original; imported once instead of
    # separately in two places.
    import numpy as np

    # Explicit None check so a valid backend object that happens to be falsy
    # is not silently replaced by the default.
    be = backend if backend is not None else get_backend()

    claim_emb = be.embed_texts(claims)
    # Pull the reference embedding to the host once; both the dimension
    # lookup and the squeeze below reuse this array.
    ref_np = be.to_numpy(be.embed_texts([reference]))

    weight_tensor = None
    if weights is not None:
        dim = ref_np.shape[-1]
        w = np.ones(dim, dtype=np.float32)
        provided = weights[:dim]
        # Slice assignment replaces the element-by-element copy loop.
        w[: len(provided)] = provided
        weight_tensor = be.from_numpy(w)

    # embed_texts was given a single text; take row 0 so batch_score gets the
    # reference as one vector rather than a batch of one.
    ref_vec = be.from_numpy(ref_np[0])
    scores = be.to_numpy(be.batch_score(claim_emb, ref_vec, weight_tensor))

    # atleast_1d guards against a backend returning a 0-d result for a
    # single claim; tolist() already yields a plain Python list.
    return np.atleast_1d(scores).tolist()