# Ledger Backfill Strategy
**Version**: 1.0.0
**Last Updated**: 2025-01-20
**Status**: Active Documentation
## Overview
This document outlines the strategy for backfilling historical data into the DBIS Core Banking System ledger. The backfill process is designed to preserve data integrity, maintain idempotency, and support resumable operations.
---
## Backfill Scenarios
### Scenario 1: Initial System Setup (Empty Ledger)
**Use Case**: Setting up a new DBIS instance with empty ledger, populating from external source (e.g., legacy system, CSV export, external API).
**Approach**:
1. Validate source data integrity
2. Transform source data to DBIS ledger format
3. Batch insert with idempotency checks
4. Verify balance consistency
5. Apply constraints after backfill
### Scenario 2: Schema Migration (Existing Ledger Data)
**Use Case**: Migrating existing ledger data to new schema (e.g., adding new fields, restructuring).
**Approach**:
1. Audit existing data
2. Transform to new schema format
3. Migrate in batches
4. Verify data integrity
5. Update schema constraints
### Scenario 3: Data Reconciliation (Fix Inconsistencies)
**Use Case**: Fixing inconsistent balances or missing entries discovered during audit.
**Approach**:
1. Identify inconsistencies
2. Generate correction entries
3. Apply corrections via normal posting function
4. Verify balance consistency
5. Document corrections in audit log
### Scenario 4: Dual-Ledger Sync (SCB Ledger Backfill)
**Use Case**: Backfilling historical entries from SCB (Sovereign Central Bank) ledger to DBIS.
**Approach**:
1. Extract entries from SCB ledger
2. Transform to DBIS format
3. Post to DBIS via outbox pattern
4. Track sync status
5. Verify dual-ledger consistency
---
## Backfill Architecture
### Component Overview
```
┌──────────────┐     ┌──────────────┐     ┌──────────────┐
│    Source    │────▶│  Transform   │────▶│   Validate   │
│    Reader    │     │   Service    │     │   Service    │
└──────────────┘     └──────────────┘     └──────┬───────┘
                                                 │
                                                 ▼
                                          ┌──────────────┐
                                          │    Batch     │
                                          │  Processor   │
                                          └──────┬───────┘
                                                 │
                                    ┌────────────┴────────────┐
                                    ▼                         ▼
                             ┌──────────────┐         ┌──────────────┐
                             │    Ledger    │         │  Checkpoint  │
                             │   Posting    │         │   Service    │
                             │    Module    │         └──────────────┘
                             └──────┬───────┘
                                    │
                                    ▼
                             ┌──────────────┐
                             │   Audit &    │
                             │ Verification │
                             └──────────────┘
```
### Key Components
1. **Source Reader**: Reads data from source (CSV, API, database, etc.)
2. **Transform Service**: Transforms source data to DBIS ledger format
3. **Validate Service**: Validates entries before posting
4. **Batch Processor**: Processes entries in batches with checkpointing
5. **Ledger Posting Module**: Uses atomic posting function for entries
6. **Checkpoint Service**: Tracks progress for resumable backfill
7. **Audit & Verification**: Validates backfill results
---
## Backfill Process
### Step 1: Pre-Backfill Preparation
#### 1.1 Audit Existing Data
Before starting backfill, audit existing data:
```sql
-- Check for existing ledger entries
SELECT COUNT(*), MIN(timestamp_utc), MAX(timestamp_utc)
FROM ledger_entries;

-- Check for inconsistent balances
SELECT id, balance, available_balance, reserved_balance
FROM bank_accounts
WHERE available_balance < 0
   OR reserved_balance < 0
   OR available_balance > balance
   OR (available_balance + reserved_balance) > balance;

-- Check for duplicate reference IDs
SELECT ledger_id, reference_id, COUNT(*)
FROM ledger_entries
GROUP BY ledger_id, reference_id
HAVING COUNT(*) > 1;
```
#### 1.2 Verify Schema
Ensure all required migrations are applied:
```sql
-- Verify idempotency constraint exists
SELECT constraint_name
FROM information_schema.table_constraints
WHERE table_name = 'ledger_entries'
  AND constraint_name LIKE '%reference%';

-- Verify outbox table exists
SELECT COUNT(*) FROM dual_ledger_outbox;

-- Verify posting function exists
SELECT routine_name
FROM information_schema.routines
WHERE routine_name = 'post_ledger_entry';
```
#### 1.3 Prepare Source Data
- **CSV Export**: Ensure format matches expected schema
- **API Extraction**: Configure API endpoints and authentication
- **Database Extraction**: Set up connection and query filters
- **Legacy System**: Configure export format and mapping
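
For the CSV case, a minimal export might look like the following. The column names here are illustrative only; they must match whatever mapping your transformation step uses:

```csv
debit_account,credit_account,amount,currency,asset_type,txn_type,reference_id,timestamp
ACC-001,ACC-002,1000.00,USD,fiat,TRANSFER,LEGACY-TXN-000001,2024-06-01T12:00:00Z
ACC-002,ACC-003,250.50,USD,fiat,TRANSFER,LEGACY-TXN-000002,2024-06-01T12:05:00Z
```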
---
### Step 2: Data Transformation
#### 2.1 Source Data Format
Source data should be transformed to this format:
```typescript
interface LedgerEntrySource {
  ledgerId: string;                   // e.g., "MASTER", "SOVEREIGN"
  debitAccountId: string;             // Account ID
  creditAccountId: string;            // Account ID
  amount: string;                     // Decimal as string (e.g., "1000.00")
  currencyCode: string;               // ISO 4217 (e.g., "USD")
  assetType: string;                  // "fiat", "cbdc", "commodity", "security"
  transactionType: string;            // Transaction type classification
  referenceId: string;                // Unique reference ID (required for idempotency)
  timestampUtc?: string;              // ISO 8601 timestamp
  fxRate?: string;                    // FX rate if applicable
  metadata?: Record<string, unknown>; // Additional metadata
}
```
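
As a sketch of what the Validate Service might check before posting, the function below returns a list of problems (empty if the entry is acceptable). The specific rules and the sample currency set are illustrative assumptions, not the module's actual implementation:

```typescript
// Hypothetical pre-posting validation; extend the currency set to the full ISO 4217 list.
const KNOWN_CURRENCIES = new Set(['USD', 'EUR', 'GBP', 'JPY']);

function validateEntry(entry: {
  debitAccountId: string;
  creditAccountId: string;
  amount: string;
  currencyCode: string;
  referenceId: string;
}): string[] {
  const errors: string[] = [];
  if (!entry.referenceId) {
    errors.push('missing referenceId (required for idempotency)');
  }
  if (entry.debitAccountId === entry.creditAccountId) {
    errors.push('debit and credit accounts are identical');
  }
  // Positive decimal string with at most two fractional digits
  if (!/^\d+(\.\d{1,2})?$/.test(entry.amount) || Number(entry.amount) <= 0) {
    errors.push(`invalid amount: ${entry.amount}`);
  }
  if (!KNOWN_CURRENCIES.has(entry.currencyCode)) {
    errors.push(`unknown currency: ${entry.currencyCode}`);
  }
  return errors;
}
```

Collecting all errors (rather than throwing on the first) makes the per-entry error report in Step 4 more useful.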
#### 2.2 Transformation Rules
1. **Account ID Mapping**: Map source account identifiers to DBIS account IDs
2. **Amount Normalization**: Convert amounts to standard format (decimal string)
3. **Currency Validation**: Validate currency codes against ISO 4217
4. **Timestamp Normalization**: Convert timestamps to UTC ISO 8601 format
5. **Reference ID Generation**: Generate unique reference IDs if not present
6. **Metadata Extraction**: Extract relevant metadata from source
#### 2.3 Example Transformation Script
```typescript
// Example: Transform CSV data
function transformCSVToLedgerEntry(csvRow: CSVRow): LedgerEntrySource {
  return {
    ledgerId: csvRow.ledger || 'MASTER',
    debitAccountId: mapAccountId(csvRow.debit_account),
    creditAccountId: mapAccountId(csvRow.credit_account),
    amount: normalizeAmount(csvRow.amount),
    currencyCode: csvRow.currency || 'USD',
    assetType: csvRow.asset_type || 'fiat',
    transactionType: mapTransactionType(csvRow.txn_type),
    referenceId: csvRow.reference_id || generateReferenceId(csvRow),
    timestampUtc: csvRow.timestamp || new Date().toISOString(),
    fxRate: csvRow.fx_rate || undefined,
    metadata: extractMetadata(csvRow),
  };
}
```
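
The helpers referenced above (`mapAccountId`, `normalizeAmount`, `generateReferenceId`, etc.) are left to the implementer. A minimal sketch of two of them follows, under the assumptions that source amounts may carry thousands separators and that a reference ID should be derived deterministically from stable row fields so re-runs stay idempotent:

```typescript
import { createHash } from 'crypto';

// Normalize "1,234.5" -> "1234.50" (two decimal places, no separators).
// Note: Number/toFixed is fine for illustration; real monetary amounts
// should go through a decimal library to avoid float precision loss.
function normalizeAmount(raw: string): string {
  const cleaned = raw.replace(/[,\s]/g, '');
  const value = Number(cleaned);
  if (!Number.isFinite(value)) {
    throw new Error(`unparseable amount: ${raw}`);
  }
  return value.toFixed(2);
}

// Deterministic reference ID: the same source row always yields the same ID,
// so the idempotency constraint rejects accidental re-posts.
function generateReferenceId(row: {
  debit_account: string;
  credit_account: string;
  amount: string;
  timestamp: string;
}): string {
  const digest = createHash('sha256')
    .update(`${row.debit_account}|${row.credit_account}|${row.amount}|${row.timestamp}`)
    .digest('hex')
    .slice(0, 24);
  return `BACKFILL-${digest}`;
}
```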
---
### Step 3: Batch Processing
#### 3.1 Batch Configuration
Configure batch processing parameters:
```typescript
interface BackfillConfig {
  batchSize: number;          // Entries per batch (default: 1000)
  checkpointInterval: number; // Checkpoint every N batches (default: 10)
  maxRetries: number;         // Max retries per batch (default: 3)
  retryDelay: number;         // Initial retry delay in ms (default: 1000)
  parallelWorkers: number;    // Number of parallel workers (default: 1)
  skipDuplicates: boolean;    // Skip entries with duplicate reference IDs (default: true)
  validateBalances: boolean;  // Validate balances after each batch (default: true)
}
```
#### 3.2 Checkpointing Strategy
Use checkpointing to enable resumable backfill:
```sql
-- Create checkpoint table for ledger backfill
CREATE TABLE IF NOT EXISTS ledger_backfill_checkpoints (
  id SERIAL PRIMARY KEY,
  source_id VARCHAR(255) NOT NULL,
  source_type VARCHAR(50) NOT NULL,          -- 'CSV', 'API', 'DATABASE', 'SCB'
  last_processed_id VARCHAR(255),
  last_processed_timestamp TIMESTAMP,
  total_processed BIGINT DEFAULT 0,
  total_successful BIGINT DEFAULT 0,
  total_failed BIGINT DEFAULT 0,
  status VARCHAR(50) DEFAULT 'IN_PROGRESS',  -- 'IN_PROGRESS', 'COMPLETED', 'FAILED', 'PAUSED'
  started_at TIMESTAMP DEFAULT NOW(),
  last_checkpoint_at TIMESTAMP DEFAULT NOW(),
  completed_at TIMESTAMP,
  error_message TEXT,
  metadata JSONB,
  UNIQUE(source_id, source_type)
);

CREATE INDEX idx_backfill_checkpoints_status
  ON ledger_backfill_checkpoints(status);

CREATE INDEX idx_backfill_checkpoints_source
  ON ledger_backfill_checkpoints(source_id, source_type);
```
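
The checkpoint semantics the table supports (an upsert keyed on `(source_id, source_type)`, mirroring its UNIQUE constraint) can be sketched independently of the database. The in-memory store below is purely illustrative; a real implementation would run an `INSERT ... ON CONFLICT (source_id, source_type) DO UPDATE` against `ledger_backfill_checkpoints`:

```typescript
interface Checkpoint {
  sourceId: string;
  sourceType: string;
  lastProcessedId: string | null;
  totalProcessed: number;
  status: 'IN_PROGRESS' | 'COMPLETED' | 'FAILED' | 'PAUSED';
}

// In-memory stand-in for the checkpoint table, keyed like its UNIQUE constraint.
const checkpointStore = new Map<string, Checkpoint>();

// Upsert: later saves for the same (sourceId, sourceType) overwrite earlier ones.
function saveCheckpoint(cp: Checkpoint): void {
  checkpointStore.set(`${cp.sourceId}:${cp.sourceType}`, cp);
}

// A restarted run calls this first and resumes from lastProcessedId if present.
function loadCheckpoint(sourceId: string, sourceType: string): Checkpoint | undefined {
  return checkpointStore.get(`${sourceId}:${sourceType}`);
}
```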
#### 3.3 Batch Processing Loop
```typescript
async function processBackfill(
  source: DataSource,
  config: BackfillConfig
): Promise<BackfillResult> {
  const checkpoint = await loadCheckpoint(source.id, source.type);

  // Resume counters and cursor from the checkpoint, if any
  let processed = checkpoint?.totalProcessed ?? 0;
  let successful = checkpoint?.totalSuccessful ?? 0;
  let failed = checkpoint?.totalFailed ?? 0;
  let lastProcessedId: string | null = checkpoint?.lastProcessedId ?? null;
  let lastProcessedTimestamp: Date | null = checkpoint?.lastProcessedTimestamp ?? null;

  while (true) {
    // Load batch from source, starting after the current cursor
    // (the cursor advances each iteration, so each read begins
    // after the last processed row rather than at the original checkpoint)
    const batch = await source.readBatch({
      startId: lastProcessedId ?? undefined,
      startTimestamp: lastProcessedTimestamp ?? undefined,
      limit: config.batchSize,
    });

    if (batch.length === 0) {
      break; // No more data
    }

    // Process batch
    const results = await processBatch(batch, config);

    // Update counters
    processed += batch.length;
    successful += results.successful;
    failed += results.failed;

    // Advance cursor and persist checkpoint
    lastProcessedId = batch[batch.length - 1].id;
    lastProcessedTimestamp = batch[batch.length - 1].timestamp;
    await saveCheckpoint({
      sourceId: source.id,
      sourceType: source.type,
      lastProcessedId,
      lastProcessedTimestamp,
      totalProcessed: processed,
      totalSuccessful: successful,
      totalFailed: failed,
      status: 'IN_PROGRESS',
    });

    // Validate balances if configured
    if (config.validateBalances && processed % (config.checkpointInterval * config.batchSize) === 0) {
      await validateBalances();
    }
  }

  // Mark as completed
  await saveCheckpoint({
    sourceId: source.id,
    sourceType: source.type,
    status: 'COMPLETED',
    completedAt: new Date(),
  });

  return {
    totalProcessed: processed,
    totalSuccessful: successful,
    totalFailed: failed,
  };
}
```
---
### Step 4: Entry Posting
#### 4.1 Use Atomic Posting Function
Always use the atomic posting function for backfill entries:
```typescript
async function postBackfillEntry(
  entry: LedgerEntrySource,
  options: { skipDuplicates: boolean } = { skipDuplicates: true }
): Promise<void> {
  try {
    // Use atomic posting function via SQL
    const result = await prisma.$executeRaw`
      SELECT post_ledger_entry(
        ${entry.ledgerId}::TEXT,
        ${entry.debitAccountId}::TEXT,
        ${entry.creditAccountId}::TEXT,
        ${entry.amount}::NUMERIC,
        ${entry.currencyCode}::TEXT,
        ${entry.assetType}::TEXT,
        ${entry.transactionType}::TEXT,
        ${entry.referenceId}::TEXT,
        ${entry.fxRate || null}::NUMERIC,
        ${entry.metadata ? JSON.stringify(entry.metadata) : null}::JSONB
      )
    `;

    // Verify result
    if (!result) {
      throw new Error('Failed to post ledger entry');
    }
  } catch (error: any) {
    // Handle idempotency violation (duplicate reference ID)
    if (error.code === '23505' || error.message?.includes('duplicate')) {
      if (options.skipDuplicates) {
        return; // Entry already exists, skip
      }
      throw new Error(`Duplicate reference ID: ${entry.referenceId}`);
    }
    throw error;
  }
}
```
#### 4.2 Batch Posting
Post entries in batches for efficiency:
```typescript
async function processBatch(
  entries: LedgerEntrySource[],
  config: BackfillConfig
): Promise<{ successful: number; failed: number }> {
  let successful = 0;
  let failed = 0;

  // Process in parallel if configured
  if (config.parallelWorkers > 1) {
    const chunks = chunkArray(entries, config.parallelWorkers);
    const results = await Promise.allSettled(
      chunks.map((chunk) => processChunk(chunk, config))
    );
    results.forEach((result, i) => {
      if (result.status === 'fulfilled') {
        successful += result.value.successful;
        failed += result.value.failed;
      } else {
        failed += chunks[i].length; // only the rejected chunk's entries failed
      }
    });
  } else {
    // Sequential processing
    for (const entry of entries) {
      try {
        await postBackfillEntry(entry);
        successful++;
      } catch (error) {
        failed++;
        logError(entry, error);
      }
    }
  }

  return { successful, failed };
}
```
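
`chunkArray` above is assumed to split the batch into one chunk per worker; a sketch:

```typescript
// Split items into at most numChunks roughly equal chunks (one per parallel worker).
function chunkArray<T>(items: T[], numChunks: number): T[][] {
  const size = Math.ceil(items.length / Math.max(1, numChunks));
  const chunks: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    chunks.push(items.slice(i, i + size));
  }
  return chunks;
}
```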
---
### Step 5: Balance Constraints Application
#### 5.1 Pre-Constraint Validation
Before applying balance constraints, validate all balances:
```sql
-- Validate all balances are consistent
DO $$
DECLARE
  inconsistent_count INTEGER;
BEGIN
  SELECT COUNT(*) INTO inconsistent_count
  FROM bank_accounts
  WHERE available_balance < 0
     OR reserved_balance < 0
     OR available_balance > balance
     OR (available_balance + reserved_balance) > balance;

  IF inconsistent_count > 0 THEN
    RAISE EXCEPTION 'Found % inconsistent balances. Fix before applying constraints.', inconsistent_count;
  END IF;
END $$;
```
#### 5.2 Apply Constraints
After backfill completes and balances are validated, apply constraints:
```bash
# Apply balance constraints migration
psql $DATABASE_URL -f db/migrations/004_balance_constraints.sql
```
#### 5.3 Post-Constraint Verification
Verify constraints are applied correctly:
```sql
-- Check constraint exists
SELECT constraint_name, constraint_type
FROM information_schema.table_constraints
WHERE table_name = 'bank_accounts'
  AND constraint_name LIKE '%balance%';

-- Verify constraint is enforced
-- This should fail if constraints are working
UPDATE bank_accounts
SET available_balance = -1
WHERE id = (SELECT id FROM bank_accounts LIMIT 1);
```
---
### Step 6: Verification and Reconciliation
#### 6.1 Entry Verification
Verify all entries were posted correctly:
```sql
-- Compare source count vs. posted count
SELECT
  COUNT(*) AS total_entries,
  COUNT(DISTINCT reference_id) AS unique_references,
  COUNT(DISTINCT ledger_id) AS unique_ledgers,
  MIN(timestamp_utc) AS earliest_entry,
  MAX(timestamp_utc) AS latest_entry
FROM ledger_entries
WHERE reference_id LIKE 'BACKFILL-%'; -- If using a prefix for backfill entries

-- Check for missing entries
SELECT source_id, reference_id
FROM backfill_source_data
WHERE NOT EXISTS (
  SELECT 1 FROM ledger_entries
  WHERE ledger_entries.reference_id = backfill_source_data.reference_id
);
```
#### 6.2 Balance Reconciliation
Reconcile balances after backfill:
```sql
-- Compare expected vs. actual balances
SELECT
  account_id,
  expected_balance,
  actual_balance,
  (expected_balance - actual_balance) AS difference
FROM (
  SELECT
    account_id,
    SUM(CASE WHEN side = 'debit' THEN amount ELSE -amount END) AS expected_balance,
    (SELECT balance FROM bank_accounts WHERE id = account_id) AS actual_balance
  FROM ledger_entries
  WHERE account_id IN (SELECT id FROM bank_accounts)
  GROUP BY account_id
) reconciliation
WHERE ABS(expected_balance - actual_balance) > 0.01; -- Allow small rounding differences
```
#### 6.3 Dual-Ledger Reconciliation
If backfilling from SCB ledger, reconcile dual-ledger consistency:
```sql
-- Check outbox sync status
SELECT
  status,
  COUNT(*) AS count,
  MIN(created_at) AS oldest,
  MAX(created_at) AS newest
FROM dual_ledger_outbox
WHERE created_at >= (
  SELECT MIN(timestamp_utc)
  FROM ledger_entries
  WHERE reference_id LIKE 'BACKFILL-%'
)
GROUP BY status;

-- Verify all entries have corresponding outbox records (for SCB sync)
SELECT le.id, le.reference_id
FROM ledger_entries le
WHERE le.reference_id LIKE 'BACKFILL-%'
  AND NOT EXISTS (
    SELECT 1 FROM dual_ledger_outbox dlo
    WHERE dlo.reference_id = le.reference_id
  );
---
## Implementation Scripts
### TypeScript Backfill Script
**File**: `dbis_core/scripts/backfill-ledger.ts`
```typescript
#!/usr/bin/env ts-node
import { PrismaClient } from '@prisma/client';
import { readFileSync } from 'fs';
import { parse } from 'csv-parse/sync';

const prisma = new PrismaClient();

interface BackfillConfig {
  sourceFile: string;
  ledgerId: string;
  batchSize: number;
  skipDuplicates: boolean;
}

async function backfillFromCSV(config: BackfillConfig) {
  // Read and parse CSV
  const csvData = readFileSync(config.sourceFile, 'utf-8');
  const records = parse(csvData, {
    columns: true,
    skip_empty_lines: true,
  });

  let processed = 0;
  let successful = 0;
  let failed = 0;

  // Process in batches
  for (let i = 0; i < records.length; i += config.batchSize) {
    const batch = records.slice(i, i + config.batchSize);

    for (const record of batch) {
      processed++; // count before try/catch so skipped duplicates are still counted
      try {
        // Transform and post entry
        await prisma.$executeRaw`
          SELECT post_ledger_entry(
            ${config.ledgerId}::TEXT,
            ${record.debitAccountId}::TEXT,
            ${record.creditAccountId}::TEXT,
            ${record.amount}::NUMERIC,
            ${record.currencyCode}::TEXT,
            ${record.assetType || 'fiat'}::TEXT,
            ${record.transactionType}::TEXT,
            ${record.referenceId}::TEXT,
            ${record.fxRate || null}::NUMERIC,
            ${record.metadata ? JSON.stringify(JSON.parse(record.metadata)) : null}::JSONB
          )
        `;
        successful++;
      } catch (error: any) {
        if (config.skipDuplicates && error.code === '23505') {
          continue; // Skip duplicates
        }
        failed++;
        console.error(`Failed to post entry ${record.referenceId}:`, error.message);
      }
    }

    console.log(`Processed ${processed}/${records.length} entries (${successful} successful, ${failed} failed)`);
  }

  return { processed, successful, failed };
}

// CLI entry point
const config: BackfillConfig = {
  sourceFile: process.env.BACKFILL_SOURCE_FILE || 'backfill.csv',
  ledgerId: process.env.LEDGER_ID || 'MASTER',
  batchSize: parseInt(process.env.BATCH_SIZE || '1000', 10),
  skipDuplicates: process.env.SKIP_DUPLICATES === 'true',
};

// Disconnect before exiting: process.exit() would otherwise
// terminate the process before a .finally() handler could run.
backfillFromCSV(config)
  .then(async (result) => {
    console.log('Backfill completed:', result);
    await prisma.$disconnect();
    process.exit(0);
  })
  .catch(async (error) => {
    console.error('Backfill failed:', error);
    await prisma.$disconnect();
    process.exit(1);
  });
```
### SQL Backfill Script
**File**: `dbis_core/scripts/backfill-ledger.sql`
```sql
-- Ledger Backfill Script
-- Use this for direct SQL-based backfill from another database
-- Example: Backfill from external ledger_entries_legacy table
DO $$
DECLARE
  batch_size INTEGER := 1000;
  processed  INTEGER := 0;
  successful INTEGER := 0;
  skipped    INTEGER := 0;
  failed     INTEGER := 0;
  entry RECORD;
BEGIN
  FOR entry IN
    SELECT * FROM ledger_entries_legacy
    ORDER BY id
  LOOP
    BEGIN
      -- Post entry using atomic function
      PERFORM post_ledger_entry(
        entry.ledger_id::TEXT,
        entry.debit_account_id::TEXT,
        entry.credit_account_id::TEXT,
        entry.amount::NUMERIC,
        entry.currency_code::TEXT,
        entry.asset_type::TEXT,
        entry.transaction_type::TEXT,
        entry.reference_id::TEXT,
        entry.fx_rate::NUMERIC,
        entry.metadata::JSONB
      );
      successful := successful + 1;
    EXCEPTION
      WHEN unique_violation THEN
        -- Duplicate reference ID: already posted, count as skipped rather than failed
        skipped := skipped + 1;
        RAISE NOTICE 'Skipping duplicate reference ID: %', entry.reference_id;
      WHEN OTHERS THEN
        failed := failed + 1;
        RAISE NOTICE 'Error processing entry %: %', entry.reference_id, SQLERRM;
    END;

    processed := processed + 1;

    -- Progress notice every batch_size entries
    IF processed % batch_size = 0 THEN
      RAISE NOTICE 'Processed % entries (% successful, % skipped, % failed)', processed, successful, skipped, failed;
    END IF;
  END LOOP;

  RAISE NOTICE 'Backfill completed: % total, % successful, % skipped, % failed', processed, successful, skipped, failed;
END $$;
```
---
## Best Practices
### 1. Idempotency
- Always use unique `reference_id` for each entry
- Use atomic posting function that enforces idempotency
- Skip duplicates during backfill if they already exist
### 2. Checkpointing
- Save checkpoint after each batch
- Enable resumable backfill from last checkpoint
- Track progress with metrics (processed, successful, failed)
### 3. Validation
- Validate source data before transformation
- Validate transformed entries before posting
- Verify balances after backfill completion
- Reconcile with source system if possible
### 4. Error Handling
- Log all errors with full context
- Retry transient errors with exponential backoff
- Skip permanent errors (e.g., duplicate reference IDs)
- Generate error report after completion
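
The retry behavior described above might be wrapped like this. It is a sketch: `maxRetries` and `retryDelay` correspond to the `BackfillConfig` fields, and deciding what counts as transient is left to the caller:

```typescript
// Retry fn up to maxRetries times with exponential backoff,
// but rethrow immediately for errors the caller deems permanent.
async function withRetries<T>(
  fn: () => Promise<T>,
  maxRetries: number,
  retryDelay: number,
  isTransient: (err: unknown) => boolean
): Promise<T> {
  let delay = retryDelay;
  for (let attempt = 0; ; attempt++) {
    try {
      return await fn();
    } catch (err) {
      if (attempt >= maxRetries || !isTransient(err)) {
        throw err; // permanent error, or retries exhausted
      }
      await new Promise((resolve) => setTimeout(resolve, delay));
      delay *= 2; // exponential backoff
    }
  }
}
```

A batch call would then look like `withRetries(() => processBatch(batch, config), config.maxRetries, config.retryDelay, isTransientDbError)`, where `isTransientDbError` is whatever classification your driver supports.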
### 5. Performance
- Process in batches (1000-10000 entries per batch)
- Use parallel processing for large backfills
- Monitor database performance during backfill
- Schedule during low-traffic periods
### 6. Testing
- Test backfill process on staging environment first
- Use small test dataset to verify transformation
- Verify balances match expected values
- Test rollback procedures if needed
---
## Monitoring and Metrics
### Key Metrics to Track
1. **Progress Metrics**:
- Total entries to process
- Entries processed
- Entries successful
- Entries failed
- Processing rate (entries/second)
2. **Performance Metrics**:
- Batch processing time
- Database query time
- Checkpoint save time
- Total elapsed time
3. **Quality Metrics**:
- Duplicate entries skipped
- Validation errors
- Balance inconsistencies
- Reconciliation mismatches
### Monitoring Queries
```sql
-- Check backfill progress
SELECT
  source_id,
  source_type,
  status,
  total_processed,
  total_successful,
  total_failed,
  last_checkpoint_at,
  NOW() - last_checkpoint_at AS time_since_last_checkpoint
FROM ledger_backfill_checkpoints
WHERE status = 'IN_PROGRESS';

-- Check for stalled backfills
SELECT *
FROM ledger_backfill_checkpoints
WHERE status = 'IN_PROGRESS'
  AND last_checkpoint_at < NOW() - INTERVAL '1 hour';

-- Verify backfill completion
SELECT
  COUNT(*) AS total_entries,
  MIN(timestamp_utc) AS earliest,
  MAX(timestamp_utc) AS latest
FROM ledger_entries
WHERE reference_id LIKE 'BACKFILL-%';
```
---
## Rollback Procedures
### Scenario 1: Rollback Before Constraints Applied
If constraints have not been applied, rollback is straightforward:
```sql
-- Reverse the balance effects of backfilled entries.
-- This must run BEFORE deleting the entries, since it reads them.
UPDATE bank_accounts
SET balance = balance
  - (
      SELECT COALESCE(SUM(amount), 0)
      FROM ledger_entries
      WHERE debit_account_id = bank_accounts.id
        AND reference_id LIKE 'BACKFILL-%'
    )
  + (
      SELECT COALESCE(SUM(amount), 0)
      FROM ledger_entries
      WHERE credit_account_id = bank_accounts.id
        AND reference_id LIKE 'BACKFILL-%'
    );

-- Remove backfilled entries
DELETE FROM ledger_entries
WHERE reference_id LIKE 'BACKFILL-%';

-- Remove outbox records
DELETE FROM dual_ledger_outbox
WHERE reference_id LIKE 'BACKFILL-%';
```
### Scenario 2: Rollback After Constraints Applied
If constraints have been applied, rollback is more complex:
1. Temporarily disable constraints
2. Remove backfilled entries
3. Recalculate balances
4. Re-enable constraints
5. Verify balance consistency
**Note**: This should only be done during a maintenance window.
---
## Troubleshooting
### Common Issues
#### 1. Duplicate Reference ID Errors
**Symptom**: `unique_violation` error on `reference_id`
**Solution**:
- Check if entries were already backfilled
- Use `skipDuplicates: true` to skip existing entries
- Or regenerate reference IDs for duplicates
#### 2. Balance Inconsistencies
**Symptom**: Balance validation fails
**Solution**:
- Identify accounts with inconsistent balances
- Generate correction entries
- Post corrections before applying constraints
#### 3. Slow Performance
**Symptom**: Backfill processing is slow
**Solution**:
- Increase batch size (if memory allows)
- Use parallel processing
- Optimize database indexes
- Run during off-peak hours
#### 4. Out of Memory
**Symptom**: Process runs out of memory
**Solution**:
- Reduce batch size
- Process sequentially instead of parallel
- Use streaming instead of loading all data
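
For the streaming approach, one option is an async generator that yields fixed-size batches from any (possibly streaming) iterable, so only one batch is held in memory at a time. This is a sketch; pair it with a streaming CSV parser (for example `csv-parse`'s stream API rather than the sync `parse(...)` over the whole file):

```typescript
// Yield batches of up to batchSize items from any (async) iterable source,
// without ever materializing the full dataset in memory.
async function* inBatches<T>(
  source: AsyncIterable<T> | Iterable<T>,
  batchSize: number
): AsyncGenerator<T[]> {
  let batch: T[] = [];
  for await (const item of source) {
    batch.push(item);
    if (batch.length === batchSize) {
      yield batch;
      batch = [];
    }
  }
  if (batch.length > 0) {
    yield batch; // final partial batch
  }
}
```

The processing loop then becomes `for await (const batch of inBatches(csvStream, config.batchSize)) { await processBatch(batch, config); }`.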
---
## Examples
### Example 1: CSV Backfill
```bash
# Configure environment
export DATABASE_URL="postgresql://user:password@host:port/database"
export BACKFILL_SOURCE_FILE="ledger_export.csv"
export LEDGER_ID="MASTER"
export BATCH_SIZE="1000"
export SKIP_DUPLICATES="true"
# Run backfill script
cd dbis_core
ts-node scripts/backfill-ledger.ts
```
### Example 2: SCB Ledger Sync
```typescript
// Backfill from SCB ledger via API
async function backfillFromSCB(sovereignBankId: string, startDate: Date, endDate: Date) {
  const scbApi = new SCBLedgerAPI(sovereignBankId);
  const entries = await scbApi.getLedgerEntries(startDate, endDate);

  for (const entry of entries) {
    // Transform SCB entry to DBIS format
    const dbisEntry = transformSCBEntry(entry);

    // Post to DBIS (will create outbox record for dual-ledger sync)
    await ledgerPostingModule.postEntry(dbisEntry);
  }
}
```
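
`transformSCBEntry` is assumed to be a field-by-field mapping. The sketch below uses made-up SCB field names (the real shape depends on the SCB API contract) and assumes a two-decimal currency for the minor-unit conversion:

```typescript
// Assumed SCB entry shape; the actual contract comes from the SCB API.
interface SCBEntry {
  id: string;
  debitAccount: string;
  creditAccount: string;
  amountMinor: number; // amount in minor units, e.g. cents
  currency: string;
  postedAt: string;    // ISO 8601
}

function transformSCBEntry(entry: SCBEntry) {
  return {
    ledgerId: 'SOVEREIGN',
    debitAccountId: entry.debitAccount,
    creditAccountId: entry.creditAccount,
    amount: (entry.amountMinor / 100).toFixed(2), // minor units -> decimal string (2-decimal currencies only)
    currencyCode: entry.currency,
    assetType: 'cbdc',
    transactionType: 'SCB_SYNC',
    referenceId: `BACKFILL-SCB-${entry.id}`, // deterministic, so re-syncs stay idempotent
    timestampUtc: entry.postedAt,
  };
}
```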
---
## References
- Migration Files: `dbis_core/db/migrations/`
- Ledger Posting Module: `dbis_core/src/core/ledger/ledger-posting.module.ts`
- Atomic Posting Function: `dbis_core/db/migrations/005_post_ledger_entry.sql`
- Block Indexer Backfill: `explorer-monorepo/backend/indexer/backfill/backfill.go` (reference pattern)