Files
CurrenciCombo/orchestrator/src/services/execution.ts

203 lines
5.4 KiB
TypeScript
Raw Normal View History

import { EventEmitter } from "events";
import { getPlanById, updatePlanStatus } from "../db/plans";
import { prepareDLTExecution, commitDLTExecution, abortDLTExecution } from "./dlt";
import { prepareBankInstruction, commitBankInstruction, abortBankInstruction } from "./bank";
import { registerPlan, finalizePlan } from "./notary";
import type { PlanStatusEvent } from "../types/execution";
export class ExecutionCoordinator extends EventEmitter {
private executions: Map<string, {
planId: string;
status: string;
phase: string;
startedAt: Date;
error?: string;
}> = new Map();
/**
* Execute a plan using 2PC (two-phase commit) pattern
*/
async executePlan(planId: string): Promise<{ executionId: string }> {
const executionId = `exec-${Date.now()}`;
this.executions.set(executionId, {
planId,
status: "pending",
phase: "prepare",
startedAt: new Date(),
});
this.emitStatus(executionId, {
phase: "prepare",
status: "in_progress",
timestamp: new Date().toISOString(),
});
try {
// Get plan
const plan = await getPlanById(planId);
if (!plan) {
throw new Error("Plan not found");
}
// PHASE 1: PREPARE
await this.preparePhase(executionId, plan);
// PHASE 2: EXECUTE DLT
await this.executeDLTPhase(executionId, plan);
// PHASE 3: BANK INSTRUCTION
await this.bankInstructionPhase(executionId, plan);
// PHASE 4: COMMIT
await this.commitPhase(executionId, plan);
this.emitStatus(executionId, {
phase: "complete",
status: "complete",
timestamp: new Date().toISOString(),
});
await updatePlanStatus(planId, "complete");
return { executionId };
} catch (error: any) {
// Rollback on error
await this.abortExecution(executionId, planId, error.message);
throw error;
}
}
private async preparePhase(executionId: string, plan: any) {
this.emitStatus(executionId, {
phase: "prepare",
status: "in_progress",
timestamp: new Date().toISOString(),
});
// Prepare DLT execution
const dltPrepared = await prepareDLTExecution(plan);
if (!dltPrepared) {
throw new Error("DLT preparation failed");
}
// Prepare bank instruction (provisional)
const bankPrepared = await prepareBankInstruction(plan);
if (!bankPrepared) {
await abortDLTExecution(plan.plan_id);
throw new Error("Bank preparation failed");
}
// Register plan with notary
await registerPlan(plan);
this.emitStatus(executionId, {
phase: "prepare",
status: "complete",
timestamp: new Date().toISOString(),
});
}
private async executeDLTPhase(executionId: string, plan: any) {
this.emitStatus(executionId, {
phase: "execute_dlt",
status: "in_progress",
timestamp: new Date().toISOString(),
});
const result = await commitDLTExecution(plan);
if (!result.success) {
await abortDLTExecution(plan.plan_id);
await abortBankInstruction(plan.plan_id);
throw new Error("DLT execution failed: " + result.error);
}
this.emitStatus(executionId, {
phase: "execute_dlt",
status: "complete",
dltTxHash: result.txHash,
timestamp: new Date().toISOString(),
});
}
private async bankInstructionPhase(executionId: string, plan: any) {
this.emitStatus(executionId, {
phase: "bank_instruction",
status: "in_progress",
timestamp: new Date().toISOString(),
});
const result = await commitBankInstruction(plan);
if (!result.success) {
// DLT already committed, need to handle rollback
throw new Error("Bank instruction failed: " + result.error);
}
this.emitStatus(executionId, {
phase: "bank_instruction",
status: "complete",
isoMessageId: result.isoMessageId,
timestamp: new Date().toISOString(),
});
}
private async commitPhase(executionId: string, plan: any) {
this.emitStatus(executionId, {
phase: "commit",
status: "in_progress",
timestamp: new Date().toISOString(),
});
// Finalize with notary
await finalizePlan(plan.plan_id, {
dltTxHash: "mock-tx-hash",
isoMessageId: "mock-iso-id",
});
this.emitStatus(executionId, {
phase: "commit",
status: "complete",
timestamp: new Date().toISOString(),
});
}
async abortExecution(executionId: string, planId: string, error: string) {
const execution = this.executions.get(executionId);
if (!execution) return;
try {
// Abort DLT
await abortDLTExecution(planId);
// Abort bank
await abortBankInstruction(planId);
await updatePlanStatus(planId, "aborted");
this.emitStatus(executionId, {
phase: "aborted",
status: "failed",
error,
timestamp: new Date().toISOString(),
});
} catch (abortError: any) {
console.error("Abort failed:", abortError);
}
}
async getExecutionStatus(executionId: string) {
return this.executions.get(executionId);
}
private emitStatus(executionId: string, event: PlanStatusEvent) {
this.emit("status", executionId, event);
}
onStatus(callback: (executionId: string, event: PlanStatusEvent) => void) {
this.on("status", callback);
}
}
export const executionCoordinator = new ExecutionCoordinator();