"""Plan graph: topological sort, next step, checkpoints.""" from fusionagi.schemas.plan import Plan, PlanStep def topological_order(plan: Plan) -> list[str]: """ Return step ids in dependency order (topological sort). Steps with cycles or missing dependencies are still appended at the end so the result is a full list of step ids. """ steps_by_id = {s.id: s for s in plan.steps} in_degree = { s.id: len([d for d in s.dependencies if d in steps_by_id]) for s in plan.steps } order: list[str] = [] remaining = set(plan.step_ids()) while remaining: ready = [sid for sid in remaining if in_degree[sid] == 0] if not ready: break # cycle or missing dep for sid in ready: order.append(sid) remaining.discard(sid) for s in plan.steps: if sid in s.dependencies: in_degree[s.id] = max(0, in_degree[s.id] - 1) for sid in plan.step_ids(): if sid not in order: order.append(sid) return order def next_step(plan: Plan, completed_step_ids: set[str]) -> str | None: """Return the next step id that has all dependencies satisfied, or None.""" order = topological_order(plan) for sid in order: if sid in completed_step_ids: continue step = next((s for s in plan.steps if s.id == sid), None) if step and set(step.dependencies).issubset(completed_step_ids): return sid return None def ready_steps(plan: Plan, completed_step_ids: set[str]) -> list[str]: """ Return all step ids that have dependencies satisfied and can run in parallel. For multi-agent acceleration: steps with no mutual dependencies can be dispatched to different agents concurrently. Returns: List of step ids ready for parallel execution. """ ready: list[str] = [] steps_by_id = {s.id: s for s in plan.steps} for sid, step in steps_by_id.items(): if sid in completed_step_ids: continue if set(step.dependencies).issubset(completed_step_ids): ready.append(sid) return ready def get_step(plan: Plan, step_id: str) -> PlanStep | None: """Return the step by id or None.""" return next((s for s in plan.steps if s.id == step_id), None)