# Ledger Backfill Strategy

**Version**: 1.0.0

**Last Updated**: 2025-01-20

**Status**: Active Documentation

## Overview

This document outlines the strategy for backfilling historical ledger data into the DBIS Core Banking System ledger. The backfill process ensures data integrity, maintains idempotency, and supports resumable operations.

---

## Backfill Scenarios

### Scenario 1: Initial System Setup (Empty Ledger)

**Use Case**: Setting up a new DBIS instance with an empty ledger, populating it from an external source (e.g., legacy system, CSV export, external API).

**Approach**:

1. Validate source data integrity
2. Transform source data to DBIS ledger format
3. Batch insert with idempotency checks
4. Verify balance consistency
5. Apply constraints after backfill

### Scenario 2: Schema Migration (Existing Ledger Data)

**Use Case**: Migrating existing ledger data to a new schema (e.g., adding new fields, restructuring).

**Approach**:

1. Audit existing data
2. Transform to new schema format
3. Migrate in batches
4. Verify data integrity
5. Update schema constraints

### Scenario 3: Data Reconciliation (Fix Inconsistencies)

**Use Case**: Fixing inconsistent balances or missing entries discovered during an audit.

**Approach**:

1. Identify inconsistencies
2. Generate correction entries
3. Apply corrections via the normal posting function
4. Verify balance consistency
5. Document corrections in the audit log

### Scenario 4: Dual-Ledger Sync (SCB Ledger Backfill)

**Use Case**: Backfilling historical entries from the SCB (Sovereign Central Bank) ledger to DBIS.

**Approach**:

1. Extract entries from the SCB ledger
2. Transform to DBIS format
3. Post to DBIS via the outbox pattern
4. Track sync status
5. Verify dual-ledger consistency

---

## Backfill Architecture

### Component Overview

```
┌─────────────┐     ┌──────────────┐     ┌─────────────┐
│   Source    │────▶│  Transform   │────▶│  Validate   │
│   Reader    │     │   Service    │     │   Service   │
└─────────────┘     └──────────────┘     └─────────────┘
                                                │
                                                ▼
                                        ┌──────────────┐
                                        │    Batch     │
                                        │  Processor   │
                                        └──────────────┘
                                                │
                            ┌───────────────────┴───────┐
                            ▼                           ▼
                    ┌──────────────┐            ┌──────────────┐
                    │    Ledger    │            │  Checkpoint  │
                    │   Posting    │            │   Service    │
                    │    Module    │            └──────────────┘
                    └──────────────┘
                            │
                            ▼
                    ┌──────────────┐
                    │   Audit &    │
                    │ Verification │
                    └──────────────┘
```

### Key Components

1. **Source Reader**: Reads data from the source (CSV, API, database, etc.)
2. **Transform Service**: Transforms source data to DBIS ledger format
3. **Validate Service**: Validates entries before posting
4. **Batch Processor**: Processes entries in batches with checkpointing
5. **Ledger Posting Module**: Uses the atomic posting function for entries
6. **Checkpoint Service**: Tracks progress for resumable backfill
7. **Audit & Verification**: Validates backfill results

---

## Backfill Process

### Step 1: Pre-Backfill Preparation

#### 1.1 Audit Existing Data

Before starting the backfill, audit existing data:

```sql
-- Check for existing ledger entries
SELECT COUNT(*), MIN(timestamp_utc), MAX(timestamp_utc)
FROM ledger_entries;

-- Check for inconsistent balances
SELECT id, balance, available_balance, reserved_balance
FROM bank_accounts
WHERE available_balance < 0
   OR reserved_balance < 0
   OR available_balance > balance
   OR (available_balance + reserved_balance) > balance;

-- Check for duplicate reference IDs
SELECT ledger_id, reference_id, COUNT(*)
FROM ledger_entries
GROUP BY ledger_id, reference_id
HAVING COUNT(*) > 1;
```

#### 1.2 Verify Schema

Ensure all required migrations are applied:

```sql
-- Verify the idempotency constraint exists
SELECT constraint_name
FROM information_schema.table_constraints
WHERE table_name = 'ledger_entries'
  AND constraint_name LIKE '%reference%';

-- Verify the outbox table exists
SELECT COUNT(*) FROM dual_ledger_outbox;

-- Verify the posting function exists
SELECT routine_name
FROM information_schema.routines
WHERE routine_name = 'post_ledger_entry';
```

#### 1.3 Prepare Source Data

- **CSV Export**: Ensure the format matches the expected schema
- **API Extraction**: Configure API endpoints and authentication
- **Database Extraction**: Set up the connection and query filters
- **Legacy System**: Configure the export format and mapping

---
### Step 2: Data Transformation

#### 2.1 Source Data Format

Source data should be transformed to this format:

```typescript
interface LedgerEntrySource {
  ledgerId: string;                    // e.g., "MASTER", "SOVEREIGN"
  debitAccountId: string;              // Account ID
  creditAccountId: string;             // Account ID
  amount: string;                      // Decimal as string (e.g., "1000.00")
  currencyCode: string;                // ISO 4217 (e.g., "USD")
  assetType: string;                   // "fiat", "cbdc", "commodity", "security"
  transactionType: string;             // Transaction type classification
  referenceId: string;                 // Unique reference ID (required for idempotency)
  timestampUtc?: string;               // ISO 8601 timestamp
  fxRate?: string;                     // FX rate if applicable
  metadata?: Record<string, unknown>;  // Additional metadata
}
```
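Entries in this shape should be checked by the Validate Service before posting. The exact rules DBIS enforces are not shown here; the sketch below is one plausible set of pre-posting checks (positive two-decimal amount, three-letter ISO 4217 code, distinct accounts, non-empty reference ID) and `validateEntry` is a hypothetical helper name:

```typescript
// Hypothetical pre-posting validation, matching the LedgerEntrySource fields above.
// Returns a list of problems; an empty list means the entry may be posted.
function validateEntry(entry: {
  amount: string;
  currencyCode: string;
  debitAccountId: string;
  creditAccountId: string;
  referenceId: string;
}): string[] {
  const errors: string[] = [];
  // Amount must be a positive decimal string such as "1000.00"
  if (!/^\d+(\.\d{1,2})?$/.test(entry.amount) || Number(entry.amount) <= 0) {
    errors.push(`invalid amount: ${entry.amount}`);
  }
  // ISO 4217 codes are three uppercase letters
  if (!/^[A-Z]{3}$/.test(entry.currencyCode)) {
    errors.push(`invalid currency code: ${entry.currencyCode}`);
  }
  // An entry must move value between two distinct, non-empty accounts
  if (!entry.debitAccountId || !entry.creditAccountId || entry.debitAccountId === entry.creditAccountId) {
    errors.push('debit and credit accounts must be distinct and non-empty');
  }
  // Reference ID is required for idempotency
  if (!entry.referenceId) {
    errors.push('missing referenceId');
  }
  return errors;
}
```

Rejected entries can be written to an error report rather than aborting the whole batch.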

#### 2.2 Transformation Rules

1. **Account ID Mapping**: Map source account identifiers to DBIS account IDs
2. **Amount Normalization**: Convert amounts to a standard format (decimal string)
3. **Currency Validation**: Validate currency codes against ISO 4217
4. **Timestamp Normalization**: Convert timestamps to UTC ISO 8601 format
5. **Reference ID Generation**: Generate unique reference IDs if not present
6. **Metadata Extraction**: Extract relevant metadata from the source

#### 2.3 Example Transformation Script

```typescript
// Example: Transform CSV data
function transformCSVToLedgerEntry(csvRow: CSVRow): LedgerEntrySource {
  return {
    ledgerId: csvRow.ledger || 'MASTER',
    debitAccountId: mapAccountId(csvRow.debit_account),
    creditAccountId: mapAccountId(csvRow.credit_account),
    amount: normalizeAmount(csvRow.amount),
    currencyCode: csvRow.currency || 'USD',
    assetType: csvRow.asset_type || 'fiat',
    transactionType: mapTransactionType(csvRow.txn_type),
    referenceId: csvRow.reference_id || generateReferenceId(csvRow),
    timestampUtc: csvRow.timestamp || new Date().toISOString(),
    fxRate: csvRow.fx_rate || undefined,
    metadata: extractMetadata(csvRow),
  };
}
```
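The transform above calls helpers (`mapAccountId`, `normalizeAmount`, `generateReferenceId`, `mapTransactionType`, `extractMetadata`) that are not shown. The two below are illustrative sketches, not the DBIS implementations; note that deriving the reference ID deterministically from the row keeps re-runs of the backfill idempotent:

```typescript
import { createHash } from 'crypto';

// Strip currency symbols and thousands separators, then emit a
// two-decimal string (matching the "1000.00" convention above).
function normalizeAmount(raw: string): string {
  const cleaned = raw.replace(/[^0-9.\-]/g, '');
  const value = Number(cleaned);
  if (!Number.isFinite(value)) {
    throw new Error(`Invalid amount: ${raw}`);
  }
  return value.toFixed(2);
}

// Derive a deterministic reference ID from the row's identifying fields,
// so re-running the backfill over the same source produces the same IDs.
function generateReferenceId(row: Record<string, string>): string {
  const key = [row.debit_account, row.credit_account, row.amount, row.timestamp].join('|');
  const digest = createHash('sha256').update(key).digest('hex').slice(0, 16);
  return `BACKFILL-${digest}`;
}
```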

---

### Step 3: Batch Processing

#### 3.1 Batch Configuration

Configure the batch processing parameters:

```typescript
interface BackfillConfig {
  batchSize: number;           // Entries per batch (default: 1000)
  checkpointInterval: number;  // Checkpoint every N batches (default: 10)
  maxRetries: number;          // Max retries per batch (default: 3)
  retryDelay: number;          // Initial retry delay in ms (default: 1000)
  parallelWorkers: number;     // Number of parallel workers (default: 1)
  skipDuplicates: boolean;     // Skip entries with duplicate reference IDs (default: true)
  validateBalances: boolean;   // Validate balances after each batch (default: true)
}
```

#### 3.2 Checkpointing Strategy

Use checkpointing to enable resumable backfill:

```sql
-- Create checkpoint table for ledger backfill
CREATE TABLE IF NOT EXISTS ledger_backfill_checkpoints (
  id SERIAL PRIMARY KEY,
  source_id VARCHAR(255) NOT NULL,
  source_type VARCHAR(50) NOT NULL,          -- 'CSV', 'API', 'DATABASE', 'SCB'
  last_processed_id VARCHAR(255),
  last_processed_timestamp TIMESTAMP,
  total_processed BIGINT DEFAULT 0,
  total_successful BIGINT DEFAULT 0,
  total_failed BIGINT DEFAULT 0,
  status VARCHAR(50) DEFAULT 'IN_PROGRESS',  -- 'IN_PROGRESS', 'COMPLETED', 'FAILED', 'PAUSED'
  started_at TIMESTAMP DEFAULT NOW(),
  last_checkpoint_at TIMESTAMP DEFAULT NOW(),
  completed_at TIMESTAMP,
  error_message TEXT,
  metadata JSONB,
  UNIQUE(source_id, source_type)
);

CREATE INDEX idx_backfill_checkpoints_status
  ON ledger_backfill_checkpoints(status);

CREATE INDEX idx_backfill_checkpoints_source
  ON ledger_backfill_checkpoints(source_id, source_type);
```
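The processing loop below relies on `loadCheckpoint` and `saveCheckpoint` helpers that are not shown in this document. The real helpers would read and upsert the `ledger_backfill_checkpoints` table above; this in-memory sketch only illustrates the intended resume contract, keyed the same way as the table's `UNIQUE(source_id, source_type)` constraint:

```typescript
// In-memory sketch of the checkpoint contract. A real implementation would
// back this with ledger_backfill_checkpoints via INSERT ... ON CONFLICT DO UPDATE.
interface Checkpoint {
  sourceId: string;
  sourceType: string;
  lastProcessedId: string | null;
  totalProcessed: number;
  status: 'IN_PROGRESS' | 'COMPLETED' | 'FAILED' | 'PAUSED';
}

const store = new Map<string, Checkpoint>();
// Key mirrors UNIQUE(source_id, source_type): one checkpoint row per source
const keyOf = (sourceId: string, sourceType: string) => `${sourceId}:${sourceType}`;

function saveCheckpoint(cp: Checkpoint): void {
  store.set(keyOf(cp.sourceId, cp.sourceType), cp); // upsert semantics
}

function loadCheckpoint(sourceId: string, sourceType: string): Checkpoint | undefined {
  return store.get(keyOf(sourceId, sourceType));
}
```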

#### 3.3 Batch Processing Loop

```typescript
async function processBackfill(
  source: DataSource,
  config: BackfillConfig
): Promise<BackfillResult> {
  // Resume counters and position from the checkpoint, if one exists
  const checkpoint = await loadCheckpoint(source.id, source.type);
  let processed = checkpoint?.totalProcessed ?? 0;
  let successful = checkpoint?.totalSuccessful ?? 0;
  let failed = checkpoint?.totalFailed ?? 0;
  let lastProcessedId: string | null = checkpoint?.lastProcessedId ?? null;
  let lastProcessedTimestamp: Date | null = checkpoint?.lastProcessedTimestamp ?? null;

  while (true) {
    // Load batch from source, resuming from the last processed position
    // (not the original checkpoint, which would re-read the same batch)
    const batch = await source.readBatch({
      startId: lastProcessedId ?? undefined,
      startTimestamp: lastProcessedTimestamp ?? undefined,
      limit: config.batchSize,
    });

    if (batch.length === 0) {
      break; // No more data
    }

    // Process batch
    const results = await processBatch(batch, config);

    // Update counters
    processed += batch.length;
    successful += results.successful;
    failed += results.failed;

    // Update checkpoint
    lastProcessedId = batch[batch.length - 1].id;
    lastProcessedTimestamp = batch[batch.length - 1].timestamp;

    await saveCheckpoint({
      sourceId: source.id,
      sourceType: source.type,
      lastProcessedId,
      lastProcessedTimestamp,
      totalProcessed: processed,
      totalSuccessful: successful,
      totalFailed: failed,
      status: 'IN_PROGRESS',
    });

    // Validate balances if configured
    if (config.validateBalances && processed % (config.checkpointInterval * config.batchSize) === 0) {
      await validateBalances();
    }
  }

  // Mark as completed
  await saveCheckpoint({
    sourceId: source.id,
    sourceType: source.type,
    status: 'COMPLETED',
    completedAt: new Date(),
  });

  return {
    totalProcessed: processed,
    totalSuccessful: successful,
    totalFailed: failed,
  };
}
```

---

### Step 4: Entry Posting

#### 4.1 Use Atomic Posting Function

Always use the atomic posting function for backfill entries:

```typescript
async function postBackfillEntry(
  entry: LedgerEntrySource,
  config: BackfillConfig
): Promise<void> {
  try {
    // Use the atomic posting function via SQL
    const result = await prisma.$executeRaw`
      SELECT post_ledger_entry(
        ${entry.ledgerId}::TEXT,
        ${entry.debitAccountId}::TEXT,
        ${entry.creditAccountId}::TEXT,
        ${entry.amount}::NUMERIC,
        ${entry.currencyCode}::TEXT,
        ${entry.assetType}::TEXT,
        ${entry.transactionType}::TEXT,
        ${entry.referenceId}::TEXT,
        ${entry.fxRate || null}::NUMERIC,
        ${entry.metadata ? JSON.stringify(entry.metadata) : null}::JSONB
      )
    `;

    // Verify result
    if (!result) {
      throw new Error('Failed to post ledger entry');
    }
  } catch (error) {
    // Handle idempotency violation (duplicate reference ID)
    if (error.code === '23505' || error.message?.includes('duplicate')) {
      // Skip duplicate entries if configured
      if (config.skipDuplicates) {
        return; // Entry already exists, skip
      }
      throw new Error(`Duplicate reference ID: ${entry.referenceId}`);
    }

    throw error;
  }
}
```

#### 4.2 Batch Posting

Post entries in batches for efficiency:

```typescript
async function processBatch(
  entries: LedgerEntrySource[],
  config: BackfillConfig
): Promise<{ successful: number; failed: number }> {
  let successful = 0;
  let failed = 0;

  // Process in parallel if configured
  if (config.parallelWorkers > 1) {
    const chunks = chunkArray(entries, config.parallelWorkers);
    const results = await Promise.allSettled(
      chunks.map((chunk) => processChunk(chunk, config))
    );

    for (let i = 0; i < results.length; i++) {
      const result = results[i];
      if (result.status === 'fulfilled') {
        successful += result.value.successful;
        failed += result.value.failed;
      } else {
        failed += chunks[i].length; // Count only the rejected chunk, not the whole batch
      }
    }
  } else {
    // Sequential processing
    for (const entry of entries) {
      try {
        await postBackfillEntry(entry, config);
        successful++;
      } catch (error) {
        failed++;
        logError(entry, error);
      }
    }
  }

  return { successful, failed };
}
```
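`processBatch` relies on a `chunkArray` helper that is not shown in this document. A minimal implementation, splitting the entries into at most `n` near-equal chunks (one per parallel worker), might look like this:

```typescript
// Split `items` into at most `n` near-equal chunks (one per parallel worker).
// Assumed shape of the chunkArray helper used above; the actual helper may differ.
function chunkArray<T>(items: T[], n: number): T[][] {
  const chunks: T[][] = [];
  const size = Math.ceil(items.length / Math.max(1, n));
  for (let i = 0; i < items.length; i += size) {
    chunks.push(items.slice(i, i + size));
  }
  return chunks;
}
```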

---

### Step 5: Balance Constraints Application

#### 5.1 Pre-Constraint Validation

Before applying balance constraints, validate all balances:

```sql
-- Validate all balances are consistent
DO $$
DECLARE
  inconsistent_count INTEGER;
BEGIN
  SELECT COUNT(*) INTO inconsistent_count
  FROM bank_accounts
  WHERE available_balance < 0
     OR reserved_balance < 0
     OR available_balance > balance
     OR (available_balance + reserved_balance) > balance;

  IF inconsistent_count > 0 THEN
    RAISE EXCEPTION 'Found % inconsistent balances. Fix before applying constraints.', inconsistent_count;
  END IF;
END $$;
```

#### 5.2 Apply Constraints

After the backfill completes and balances are validated, apply constraints:

```bash
# Apply balance constraints migration
psql $DATABASE_URL -f db/migrations/004_balance_constraints.sql
```

#### 5.3 Post-Constraint Verification

Verify constraints are applied correctly:

```sql
-- Check the constraint exists
SELECT constraint_name, constraint_type
FROM information_schema.table_constraints
WHERE table_name = 'bank_accounts'
  AND constraint_name LIKE '%balance%';

-- Verify the constraint is enforced
-- This should fail if constraints are working
UPDATE bank_accounts
SET available_balance = -1
WHERE id = (SELECT id FROM bank_accounts LIMIT 1);
```

---

### Step 6: Verification and Reconciliation

#### 6.1 Entry Verification

Verify all entries were posted correctly:

```sql
-- Compare source count vs. posted count
SELECT
  COUNT(*) as total_entries,
  COUNT(DISTINCT reference_id) as unique_references,
  COUNT(DISTINCT ledger_id) as unique_ledgers,
  MIN(timestamp_utc) as earliest_entry,
  MAX(timestamp_utc) as latest_entry
FROM ledger_entries
WHERE reference_id LIKE 'BACKFILL-%'; -- If using a prefix for backfill entries

-- Check for missing entries
SELECT source_id, reference_id
FROM backfill_source_data
WHERE NOT EXISTS (
  SELECT 1 FROM ledger_entries
  WHERE reference_id = backfill_source_data.reference_id
);
```

#### 6.2 Balance Reconciliation

Reconcile balances after the backfill:

```sql
-- Compare expected vs. actual balances
SELECT
  account_id,
  expected_balance,
  actual_balance,
  (expected_balance - actual_balance) as difference
FROM (
  SELECT
    account_id,
    SUM(CASE WHEN side = 'debit' THEN amount ELSE -amount END) as expected_balance,
    (SELECT balance FROM bank_accounts WHERE id = account_id) as actual_balance
  FROM ledger_entries
  WHERE account_id IN (SELECT id FROM bank_accounts)
  GROUP BY account_id
) reconciliation
WHERE ABS(expected_balance - actual_balance) > 0.01; -- Allow small rounding differences
```

#### 6.3 Dual-Ledger Reconciliation

If backfilling from the SCB ledger, reconcile dual-ledger consistency:

```sql
-- Check outbox sync status
SELECT
  status,
  COUNT(*) as count,
  MIN(created_at) as oldest,
  MAX(created_at) as newest
FROM dual_ledger_outbox
WHERE created_at >= (SELECT MIN(timestamp_utc) FROM ledger_entries WHERE reference_id LIKE 'BACKFILL-%')
GROUP BY status;

-- Verify all entries have corresponding outbox records (for SCB sync)
SELECT le.id, le.reference_id
FROM ledger_entries le
WHERE le.reference_id LIKE 'BACKFILL-%'
  AND NOT EXISTS (
    SELECT 1 FROM dual_ledger_outbox dlo
    WHERE dlo.reference_id = le.reference_id
  );
```

---

## Implementation Scripts

### TypeScript Backfill Script

**File**: `dbis_core/scripts/backfill-ledger.ts`

```typescript
#!/usr/bin/env ts-node
import { PrismaClient } from '@prisma/client';
import { readFileSync } from 'fs';
import { parse } from 'csv-parse/sync';

const prisma = new PrismaClient();

interface BackfillConfig {
  sourceFile: string;
  ledgerId: string;
  batchSize: number;
  skipDuplicates: boolean;
}

async function backfillFromCSV(config: BackfillConfig) {
  // Read and parse CSV
  const csvData = readFileSync(config.sourceFile, 'utf-8');
  const records = parse(csvData, {
    columns: true,
    skip_empty_lines: true,
  });

  let processed = 0;
  let successful = 0;
  let failed = 0;

  // Process in batches
  for (let i = 0; i < records.length; i += config.batchSize) {
    const batch = records.slice(i, i + config.batchSize);

    for (const record of batch) {
      processed++;
      try {
        // Transform and post entry
        await prisma.$executeRaw`
          SELECT post_ledger_entry(
            ${config.ledgerId}::TEXT,
            ${record.debitAccountId}::TEXT,
            ${record.creditAccountId}::TEXT,
            ${record.amount}::NUMERIC,
            ${record.currencyCode}::TEXT,
            ${record.assetType || 'fiat'}::TEXT,
            ${record.transactionType}::TEXT,
            ${record.referenceId}::TEXT,
            ${record.fxRate || null}::NUMERIC,
            ${record.metadata ? JSON.stringify(JSON.parse(record.metadata)) : null}::JSONB
          )
        `;
        successful++;
      } catch (error) {
        if (config.skipDuplicates && error.code === '23505') {
          // Skip duplicates
          continue;
        }
        failed++;
        console.error(`Failed to post entry ${record.referenceId}:`, error.message);
      }
    }

    console.log(`Processed ${processed}/${records.length} entries (${successful} successful, ${failed} failed)`);
  }

  return { processed, successful, failed };
}

// CLI entry point
const config: BackfillConfig = {
  sourceFile: process.env.BACKFILL_SOURCE_FILE || 'backfill.csv',
  ledgerId: process.env.LEDGER_ID || 'MASTER',
  batchSize: parseInt(process.env.BATCH_SIZE || '1000', 10),
  skipDuplicates: process.env.SKIP_DUPLICATES === 'true',
};

backfillFromCSV(config)
  .then((result) => {
    console.log('Backfill completed:', result);
    process.exit(0);
  })
  .catch((error) => {
    console.error('Backfill failed:', error);
    process.exit(1);
  })
  .finally(() => {
    prisma.$disconnect();
  });
```

### SQL Backfill Script

**File**: `dbis_core/scripts/backfill-ledger.sql`

```sql
-- Ledger Backfill Script
-- Use this for direct SQL-based backfill from another database

-- Example: Backfill from an external ledger_entries_legacy table
DO $$
DECLARE
  batch_size INTEGER := 1000;
  processed INTEGER := 0;
  successful INTEGER := 0;
  failed INTEGER := 0;
  entry RECORD;
BEGIN
  -- Process entries in order
  FOR entry IN
    SELECT * FROM ledger_entries_legacy
    ORDER BY id
  LOOP
    BEGIN
      -- Post entry using the atomic function
      PERFORM post_ledger_entry(
        entry.ledger_id::TEXT,
        entry.debit_account_id::TEXT,
        entry.credit_account_id::TEXT,
        entry.amount::NUMERIC,
        entry.currency_code::TEXT,
        entry.asset_type::TEXT,
        entry.transaction_type::TEXT,
        entry.reference_id::TEXT,
        entry.fx_rate::NUMERIC,
        entry.metadata::JSONB
      );

      successful := successful + 1;
    EXCEPTION
      WHEN unique_violation THEN
        -- Duplicate reference ID, skip
        failed := failed + 1;
        RAISE NOTICE 'Skipping duplicate reference ID: %', entry.reference_id;
      WHEN OTHERS THEN
        failed := failed + 1;
        RAISE NOTICE 'Error processing entry %: %', entry.reference_id, SQLERRM;
    END;

    processed := processed + 1;

    -- Report progress every batch_size entries
    IF processed % batch_size = 0 THEN
      RAISE NOTICE 'Processed % entries (% successful, % failed)', processed, successful, failed;
    END IF;
  END LOOP;

  RAISE NOTICE 'Backfill completed: % total, % successful, % failed', processed, successful, failed;
END $$;
```

---

## Best Practices

### 1. Idempotency

- Always use a unique `reference_id` for each entry
- Use the atomic posting function, which enforces idempotency
- Skip duplicates during backfill if they already exist

### 2. Checkpointing

- Save a checkpoint after each batch
- Enable resumable backfill from the last checkpoint
- Track progress with metrics (processed, successful, failed)

### 3. Validation

- Validate source data before transformation
- Validate transformed entries before posting
- Verify balances after backfill completion
- Reconcile with the source system if possible

### 4. Error Handling

- Log all errors with full context
- Retry transient errors with exponential backoff
- Skip permanent errors (e.g., duplicate reference IDs)
- Generate an error report after completion
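The `maxRetries` and `retryDelay` fields of `BackfillConfig` are intended for exactly this retry behaviour. One possible sketch of a backoff wrapper (`withRetry` and the `isTransient` classifier are illustrative names, not part of the DBIS codebase):

```typescript
// Retry a transient failure with exponential backoff.
// Permanent errors (e.g., unique_violation / '23505') should be classified
// as non-transient so they are surfaced immediately instead of retried.
const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

async function withRetry<T>(
  fn: () => Promise<T>,
  maxRetries: number,
  retryDelayMs: number,
  isTransient: (err: unknown) => boolean = () => true
): Promise<T> {
  let attempt = 0;
  for (;;) {
    try {
      return await fn();
    } catch (err) {
      attempt++;
      if (!isTransient(err) || attempt > maxRetries) {
        throw err; // permanent error, or retries exhausted
      }
      // Delay doubles on each attempt: retryDelayMs, 2x, 4x, ...
      await sleep(retryDelayMs * 2 ** (attempt - 1));
    }
  }
}
```

For example, a posting call could be wrapped as `withRetry(() => postBackfillEntry(entry, config), config.maxRetries, config.retryDelay, isDeadlockOrTimeout)`, where `isDeadlockOrTimeout` is a hypothetical classifier for retryable database errors.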

### 5. Performance

- Process in batches (1,000-10,000 entries per batch)
- Use parallel processing for large backfills
- Monitor database performance during the backfill
- Schedule during low-traffic periods

### 6. Testing

- Test the backfill process in a staging environment first
- Use a small test dataset to verify the transformation
- Verify balances match expected values
- Test rollback procedures if needed

---

## Monitoring and Metrics

### Key Metrics to Track

1. **Progress Metrics**:
   - Total entries to process
   - Entries processed
   - Entries successful
   - Entries failed
   - Processing rate (entries/second)

2. **Performance Metrics**:
   - Batch processing time
   - Database query time
   - Checkpoint save time
   - Total elapsed time

3. **Quality Metrics**:
   - Duplicate entries skipped
   - Validation errors
   - Balance inconsistencies
   - Reconciliation mismatches

### Monitoring Queries

```sql
-- Check backfill progress
SELECT
  source_id,
  source_type,
  status,
  total_processed,
  total_successful,
  total_failed,
  last_checkpoint_at,
  NOW() - last_checkpoint_at as time_since_last_checkpoint
FROM ledger_backfill_checkpoints
WHERE status = 'IN_PROGRESS';

-- Check for stalled backfills
SELECT *
FROM ledger_backfill_checkpoints
WHERE status = 'IN_PROGRESS'
  AND last_checkpoint_at < NOW() - INTERVAL '1 hour';

-- Verify backfill completion
SELECT
  COUNT(*) as total_entries,
  MIN(timestamp_utc) as earliest,
  MAX(timestamp_utc) as latest
FROM ledger_entries
WHERE reference_id LIKE 'BACKFILL-%';
```

---

## Rollback Procedures

### Scenario 1: Rollback Before Constraints Applied

If constraints have not been applied, rollback is straightforward. Reverse the balance effects first, then remove the backfilled entries (the balance reset sums the backfilled entries, so it must run before they are deleted):

```sql
-- Reverse balance effects of backfilled entries (run BEFORE deleting them)
UPDATE bank_accounts
SET balance = balance - (
  SELECT COALESCE(SUM(amount), 0)
  FROM ledger_entries
  WHERE debit_account_id = bank_accounts.id
    AND reference_id LIKE 'BACKFILL-%'
) + (
  SELECT COALESCE(SUM(amount), 0)
  FROM ledger_entries
  WHERE credit_account_id = bank_accounts.id
    AND reference_id LIKE 'BACKFILL-%'
);

-- Remove backfilled entries
DELETE FROM ledger_entries
WHERE reference_id LIKE 'BACKFILL-%';

-- Remove outbox records
DELETE FROM dual_ledger_outbox
WHERE reference_id LIKE 'BACKFILL-%';
```

### Scenario 2: Rollback After Constraints Applied

If constraints have been applied, rollback is more complex:

1. Temporarily disable constraints
2. Remove backfilled entries
3. Recalculate balances
4. Re-enable constraints
5. Verify balance consistency

**Note**: This should only be done during a maintenance window.

---

## Troubleshooting

### Common Issues

#### 1. Duplicate Reference ID Errors

**Symptom**: `unique_violation` error on `reference_id`

**Solution**:
- Check whether the entries were already backfilled
- Use `skipDuplicates: true` to skip existing entries
- Or regenerate reference IDs for duplicates

#### 2. Balance Inconsistencies

**Symptom**: Balance validation fails

**Solution**:
- Identify accounts with inconsistent balances
- Generate correction entries
- Post corrections before applying constraints

#### 3. Slow Performance

**Symptom**: Backfill processing is slow

**Solution**:
- Increase the batch size (if memory allows)
- Use parallel processing
- Optimize database indexes
- Run during off-peak hours

#### 4. Out of Memory

**Symptom**: The process runs out of memory

**Solution**:
- Reduce the batch size
- Process sequentially instead of in parallel
- Stream the source data instead of loading it all at once

---

## Examples

### Example 1: CSV Backfill

```bash
# Configure environment
export DATABASE_URL="postgresql://user:password@host:port/database"
export BACKFILL_SOURCE_FILE="ledger_export.csv"
export LEDGER_ID="MASTER"
export BATCH_SIZE="1000"
export SKIP_DUPLICATES="true"

# Run backfill script
cd dbis_core
ts-node scripts/backfill-ledger.ts
```

### Example 2: SCB Ledger Sync

```typescript
// Backfill from the SCB ledger via API
async function backfillFromSCB(sovereignBankId: string, startDate: Date, endDate: Date) {
  const scbApi = new SCBLedgerAPI(sovereignBankId);
  const entries = await scbApi.getLedgerEntries(startDate, endDate);

  for (const entry of entries) {
    // Transform SCB entry to DBIS format
    const dbisEntry = transformSCBEntry(entry);

    // Post to DBIS (will create an outbox record for dual-ledger sync)
    await ledgerPostingModule.postEntry(dbisEntry);
  }
}
```

---

## References

- Migration Files: `dbis_core/db/migrations/`
- Ledger Posting Module: `dbis_core/src/core/ledger/ledger-posting.module.ts`
- Atomic Posting Function: `dbis_core/db/migrations/005_post_ledger_entry.sql`
- Block Indexer Backfill: `explorer-monorepo/backend/indexer/backfill/backfill.go` (reference pattern)