// Package freshness assembles a snapshot describing how current the indexed
// block and transaction data is relative to the chain head reported by an RPC
// node, together with diagnostics that explain any gap between the two.
package freshness

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"strconv"
	"strings"
	"time"

	"github.com/jackc/pgx/v5"
)

// QueryRowFunc runs a single-row SQL query and returns the pgx.Row to scan.
type QueryRowFunc func(ctx context.Context, sql string, args ...any) pgx.Row

// Confidence is the value serialized in the "confidence" field of a Reference
// or Diagnostics.
type Confidence string

// Confidence values.
const (
	ConfidenceHigh    Confidence = "high"
	ConfidenceMedium  Confidence = "medium"
	ConfidenceLow     Confidence = "low"
	ConfidenceUnknown Confidence = "unknown"
)

// Completeness is the value serialized in the "completeness" fields and in
// every field of SummaryCompleteness.
type Completeness string

// Completeness values.
const (
	CompletenessComplete    Completeness = "complete"
	CompletenessPartial     Completeness = "partial"
	CompletenessStale       Completeness = "stale"
	CompletenessUnavailable Completeness = "unavailable"
)

// Source is the value serialized in the "source" field of a Reference or
// Diagnostics.
type Source string

// Source values.
const (
	SourceReported    Source = "reported"
	SourceDerived     Source = "derived"
	SourceSampled     Source = "sampled"
	SourceUnavailable Source = "unavailable"
)

// Provenance is the value serialized in the "provenance" field of a Reference
// or Diagnostics.
type Provenance string

// Provenance values.
const (
	ProvenanceRPC           Provenance = "rpc"
	ProvenanceExplorerIndex Provenance = "explorer_index"
	ProvenanceTxIndex       Provenance = "tx_index"
	ProvenanceMissionFeed   Provenance = "mission_control_feed"
	ProvenanceComposite     Provenance = "composite"
)

// Thresholds used by the classifiers and by BuildSnapshot.
const (
	// txCurrentMaxAgeSeconds is the largest age still classified as
	// "current" / CompletenessComplete.
	txCurrentMaxAgeSeconds int64 = 15 * 60
	// txLaggingMaxAgeSeconds is the largest age still classified as
	// "lagging" / CompletenessPartial; anything older is stale.
	txLaggingMaxAgeSeconds int64 = 3 * 60 * 60

	// blockFeedCompleteMaxDistance is the largest head-to-index distance
	// (in blocks) for which the block feed is considered complete.
	blockFeedCompleteMaxDistance int64 = 2
	// blockFeedPartialMaxDistance is the largest head-to-index distance
	// (in blocks) for which the block feed is considered partial.
	blockFeedPartialMaxDistance int64 = 32

	// txLagStaleBlocks is the transaction lag (in blocks) above which the
	// activity state reports stale transaction visibility.
	txLagStaleBlocks int64 = 32
	// sparseActivityMaxTransactions is the largest recent transaction count
	// still reported as "sparse_activity" rather than "active".
	sparseActivityMaxTransactions int64 = 3

	// statsWindowSeconds is the value reported in Sampling.StatsWindowSec.
	statsWindowSeconds int64 = 300

	// rpcTimeout bounds a single JSON-RPC HTTP round trip.
	rpcTimeout = 6 * time.Second
	// rpcMaxResponseBytes caps how much of an RPC response body is read.
	rpcMaxResponseBytes = 1 << 20
)

// SQL statements issued by BuildSnapshot. Long statements are assembled from
// concatenated pieces purely for readability.
const (
	latestIndexedBlockSQL = `SELECT number, timestamp FROM blocks ORDER BY number DESC LIMIT 1`

	latestIndexedTransactionSQL = `SELECT concat('0x', encode(hash, 'hex')), block_number::bigint, COALESCE(block_timestamp, inserted_at) ` +
		`FROM transactions WHERE block_number IS NOT NULL ` +
		`ORDER BY block_number DESC, "index" DESC LIMIT 1`

	latestNonEmptyBlockSQL = `SELECT b.number, b.timestamp FROM blocks b ` +
		`WHERE EXISTS ( SELECT 1 FROM transactions t WHERE t.block_number = b.number ) ` +
		`ORDER BY b.number DESC LIMIT 1`

	latestNonEmptyFromBlockFeedSQL = `SELECT b.number, b.timestamp FROM blocks b ` +
		`JOIN ( SELECT DISTINCT block_number FROM transactions WHERE block_number IS NOT NULL ) tx_blocks ` +
		`ON tx_blocks.block_number = b.number ` +
		`ORDER BY b.number DESC LIMIT 1`

	recentBlockActivitySQL = `SELECT COUNT(*)::bigint, ` +
		`COUNT(*) FILTER (WHERE COALESCE(tx_counts.tx_count, 0) > 0)::bigint, ` +
		`COALESCE(SUM(COALESCE(tx_counts.tx_count, 0)), 0)::bigint ` +
		`FROM ( SELECT number FROM blocks ORDER BY number DESC LIMIT 128 ) recent_blocks ` +
		`LEFT JOIN ( SELECT block_number, COUNT(*)::bigint AS tx_count FROM transactions WHERE block_number IS NOT NULL GROUP BY block_number ) tx_counts ` +
		`ON tx_counts.block_number = recent_blocks.number`
)

// rpcHTTPClient is shared by all JSON-RPC calls so the underlying transport
// can reuse connections instead of building a new client per request.
var rpcHTTPClient = &http.Client{Timeout: rpcTimeout}

// Reference describes a single block or transaction observation: where it
// sits on the chain, when it happened, how old it is, and how it was obtained.
type Reference struct {
	BlockNumber      *int64       `json:"block_number"`
	Timestamp        *string      `json:"timestamp"`
	AgeSeconds       *int64       `json:"age_seconds"`
	Hash             *string      `json:"hash,omitempty"`
	DistanceFromHead *int64       `json:"distance_from_head,omitempty"`
	Source           Source       `json:"source"`
	Confidence       Confidence   `json:"confidence"`
	Provenance       Provenance   `json:"provenance"`
	Completeness     Completeness `json:"completeness,omitempty"`
}

// Snapshot groups the references produced by BuildSnapshot.
type Snapshot struct {
	ChainHead                Reference `json:"chain_head"`
	LatestIndexedBlock       Reference `json:"latest_indexed_block"`
	LatestIndexedTransaction Reference `json:"latest_indexed_transaction"`
	LatestNonEmptyBlock      Reference `json:"latest_non_empty_block"`
}

// Diagnostics carries the derived transaction-visibility and activity
// classification together with the raw counters it was derived from.
type Diagnostics struct {
	TxVisibilityState           string       `json:"tx_visibility_state"`
	ActivityState               string       `json:"activity_state"`
	Explanation                 string       `json:"explanation,omitempty"`
	TxLagBlocks                 *int64       `json:"tx_lag_blocks,omitempty"`
	TxLagSeconds                *int64       `json:"tx_lag_seconds,omitempty"`
	RecentBlockSampleSize       *int64       `json:"recent_block_sample_size,omitempty"`
	RecentNonEmptyBlocks        *int64       `json:"recent_non_empty_blocks,omitempty"`
	RecentTransactions          *int64       `json:"recent_transactions,omitempty"`
	LatestNonEmptyFromBlockFeed Reference    `json:"latest_non_empty_block_from_block_feed"`
	Source                      Source       `json:"source"`
	Confidence                  Confidence   `json:"confidence"`
	Provenance                  Provenance   `json:"provenance"`
	Completeness                Completeness `json:"completeness"`
}

// SummaryCompleteness reports a Completeness value per data feed or metric.
type SummaryCompleteness struct {
	TransactionsFeed  Completeness `json:"transactions_feed"`
	BlocksFeed        Completeness `json:"blocks_feed"`
	GasMetrics        Completeness `json:"gas_metrics"`
	UtilizationMetric Completeness `json:"utilization_metrics"`
}

// Sampling records when the snapshot was generated and which lookups failed.
// Issues maps a lookup name to the error text of its failure.
type Sampling struct {
	StatsGeneratedAt *string           `json:"stats_generated_at"`
	RPCProbeAt       *string           `json:"rpc_probe_at"`
	StatsWindowSec   *int64            `json:"stats_window_seconds,omitempty"`
	Issues           map[string]string `json:"issues,omitempty"`
}

// HeadProbeFunc fetches the current chain head. ProbeChainHead, bound to an
// RPC URL, is one implementation.
type HeadProbeFunc func(ctx context.Context) (*Reference, error)

// ptrInt64 returns a pointer to a copy of value.
func ptrInt64(value int64) *int64 { return &value }

// ptrString returns a pointer to a copy of value.
func ptrString(value string) *string { return &value }

// unknownReference returns the placeholder Reference used before (or instead
// of) a successful lookup: every quality field is set to its
// "unavailable"/"unknown" value and only the provenance is caller-chosen.
func unknownReference(provenance Provenance) Reference {
	return Reference{
		Source:       SourceUnavailable,
		Confidence:   ConfidenceUnknown,
		Provenance:   provenance,
		Completeness: CompletenessUnavailable,
	}
}

// timePointer formats value as an RFC 3339 UTC string, or returns nil for the
// zero time.
func timePointer(value time.Time) *string {
	if value.IsZero() {
		return nil
	}
	formatted := value.UTC().Format(time.RFC3339)
	return &formatted
}

// computeAge returns the whole seconds elapsed between the RFC 3339 timestamp
// and now, clamped to zero for timestamps in the future. It returns nil when
// the timestamp is nil, empty, or not valid RFC 3339.
func computeAge(timestamp *string, now time.Time) *int64 {
	if timestamp == nil || *timestamp == "" {
		return nil
	}
	parsed, err := time.Parse(time.RFC3339, *timestamp)
	if err != nil {
		return nil
	}
	age := int64(now.Sub(parsed).Seconds())
	if age < 0 {
		age = 0
	}
	return &age
}

// clampedDistance returns head-number, or zero when number is ahead of head.
func clampedDistance(head, number int64) int64 {
	if distance := head - number; distance > 0 {
		return distance
	}
	return 0
}

// distanceFromHead returns the clamped distance between the chain head and
// number, or nil when the head block number is unknown.
func distanceFromHead(head *int64, number int64) *int64 {
	if head == nil {
		return nil
	}
	return ptrInt64(clampedDistance(*head, number))
}

// reportedReference builds the Reference for a row read from the index:
// reported source, high confidence, and a completeness derived from the age of
// the observation.
func reportedReference(number int64, observedAt, now time.Time, provenance Provenance) Reference {
	timestamp := timePointer(observedAt)
	age := computeAge(timestamp, now)
	return Reference{
		BlockNumber:  ptrInt64(number),
		Timestamp:    timestamp,
		AgeSeconds:   age,
		Source:       SourceReported,
		Confidence:   ConfidenceHigh,
		Provenance:   provenance,
		Completeness: classifyIndexedVisibility(age),
	}
}

// classifyIndexedVisibility maps the age of the newest indexed item to a
// Completeness: complete up to 15 minutes, partial up to 3 hours, stale
// beyond that, and unavailable when the age is unknown.
func classifyIndexedVisibility(age *int64) Completeness {
	if age == nil {
		return CompletenessUnavailable
	}
	switch {
	case *age <= txCurrentMaxAgeSeconds:
		return CompletenessComplete
	case *age <= txLaggingMaxAgeSeconds:
		return CompletenessPartial
	default:
		return CompletenessStale
	}
}

// classifyBlockFeed maps the distance between the chain head and the newest
// indexed block to a Completeness: complete within 2 blocks, partial within
// 32, stale beyond that, and unavailable when either number is unknown.
func classifyBlockFeed(chainHead *int64, indexedHead *int64) Completeness {
	if chainHead == nil || indexedHead == nil {
		return CompletenessUnavailable
	}
	distance := clampedDistance(*chainHead, *indexedHead)
	switch {
	case distance <= blockFeedCompleteMaxDistance:
		return CompletenessComplete
	case distance <= blockFeedPartialMaxDistance:
		return CompletenessPartial
	default:
		return CompletenessStale
	}
}

// classifyMetricPresence reports a metric as complete when a value is present
// and unavailable when it is nil. Only the nil check matters, so any pointer
// type is accepted.
func classifyMetricPresence[T any](value *T) Completeness {
	if value == nil {
		return CompletenessUnavailable
	}
	return CompletenessComplete
}

// classifyTxVisibilityState maps the age of the newest indexed transaction to
// "current", "lagging", "stale", or "unavailable", using the same thresholds
// as classifyIndexedVisibility.
func classifyTxVisibilityState(age *int64) string {
	if age == nil {
		return "unavailable"
	}
	switch {
	case *age <= txCurrentMaxAgeSeconds:
		return "current"
	case *age <= txLaggingMaxAgeSeconds:
		return "lagging"
	default:
		return "stale"
	}
}

// classifyActivityState combines transaction visibility with recent block
// activity and returns the activity state, a human-readable explanation, and
// the completeness to report for the diagnostics.
//
// Nil counters are treated as "unknown": they never satisfy a threshold.
func classifyActivityState(txVisibility string, txLagBlocks, recentTransactions, recentNonEmptyBlocks *int64) (string, string, Completeness) {
	hasRecentTransactions := recentTransactions != nil && *recentTransactions > 0
	txFeedTrailsHead := txLagBlocks != nil && *txLagBlocks > txLagStaleBlocks

	if txVisibility == "unavailable" {
		if hasRecentTransactions {
			return "limited_observability", "Recent blocks show on-chain transaction activity, but indexed transaction freshness is unavailable.", CompletenessPartial
		}
		return "limited_observability", "Transaction freshness is unavailable, and recent block activity is limited.", CompletenessUnavailable
	}

	if hasRecentTransactions {
		switch {
		case txFeedTrailsHead:
			return "fresh_head_stale_transaction_visibility", "Recent block activity is present closer to the head than the visible indexed transaction feed.", CompletenessPartial
		case *recentTransactions <= sparseActivityMaxTransactions:
			return "sparse_activity", "Recent blocks contain only a small amount of transaction activity.", CompletenessComplete
		default:
			return "active", "Recent blocks contain visible transaction activity close to the head.", CompletenessComplete
		}
	}

	// The quiet-chain check deliberately precedes the lag check: an empty
	// sample explains a trailing transaction feed.
	if recentNonEmptyBlocks != nil && *recentNonEmptyBlocks == 0 {
		return "quiet_chain", "Recent sampled head blocks are empty, which indicates a quiet chain rather than a broken explorer.", CompletenessComplete
	}
	if txFeedTrailsHead {
		return "fresh_head_stale_transaction_visibility", "The chain head is current, but the indexed transaction feed trails the current tip.", CompletenessPartial
	}
	return "sparse_activity", "Recent visible transaction activity is limited.", CompletenessComplete
}

// BuildSnapshot probes the chain head, queries the index for the newest
// block, newest transaction, newest non-empty block and recent block
// activity, and classifies the result.
//
// Parameters:
//   - chainID is accepted for interface compatibility; this function does not
//     read it.
//   - queryRow executes each SQL lookup and must not be nil.
//   - probeHead, when non-nil, supplies the chain head reference.
//   - now is the instant ages are measured against and the value reported as
//     Sampling.StatsGeneratedAt.
//   - averageGasPrice and utilization are only checked for presence.
//
// Individual lookup failures do not abort the snapshot: the affected reference
// keeps its "unavailable" placeholder and the error text is recorded in
// Sampling.Issues under the lookup's name. The returned error is non-nil only
// when queryRow is nil.
func BuildSnapshot(
	ctx context.Context,
	chainID int,
	queryRow QueryRowFunc,
	probeHead HeadProbeFunc,
	now time.Time,
	averageGasPrice *float64,
	utilization *float64,
) (Snapshot, SummaryCompleteness, Sampling, Diagnostics, error) {
	if queryRow == nil {
		return Snapshot{}, SummaryCompleteness{}, Sampling{}, Diagnostics{}, fmt.Errorf("freshness: queryRow must not be nil")
	}

	snapshot := Snapshot{
		ChainHead:                unknownReference(ProvenanceRPC),
		LatestIndexedBlock:       unknownReference(ProvenanceExplorerIndex),
		LatestIndexedTransaction: unknownReference(ProvenanceTxIndex),
		LatestNonEmptyBlock:      unknownReference(ProvenanceTxIndex),
	}
	diagnostics := Diagnostics{
		TxVisibilityState:           "unavailable",
		ActivityState:               "limited_observability",
		LatestNonEmptyFromBlockFeed: unknownReference(ProvenanceExplorerIndex),
		Source:                      SourceReported,
		Confidence:                  ConfidenceMedium,
		Provenance:                  ProvenanceComposite,
		Completeness:                CompletenessUnavailable,
	}
	issues := map[string]string{}

	// Chain head. A probe that returns (nil, nil) leaves the placeholder in
	// place without recording an issue.
	if probeHead != nil {
		head, err := probeHead(ctx)
		switch {
		case err != nil:
			issues["chain_head"] = err.Error()
		case head != nil:
			snapshot.ChainHead = *head
		}
	}

	// Newest indexed block. Its presence alone makes the reference complete,
	// regardless of age.
	var (
		indexedBlockNumber int64
		indexedBlockTime   time.Time
	)
	if err := queryRow(ctx, latestIndexedBlockSQL).Scan(&indexedBlockNumber, &indexedBlockTime); err != nil {
		issues["latest_indexed_block"] = err.Error()
	} else {
		ref := reportedReference(indexedBlockNumber, indexedBlockTime, now, ProvenanceExplorerIndex)
		ref.Completeness = CompletenessComplete
		snapshot.LatestIndexedBlock = ref
	}

	// Newest indexed transaction.
	var (
		txHash  string
		txBlock int64
		txTime  time.Time
	)
	if err := queryRow(ctx, latestIndexedTransactionSQL).Scan(&txHash, &txBlock, &txTime); err != nil {
		issues["latest_indexed_transaction"] = err.Error()
	} else {
		ref := reportedReference(txBlock, txTime, now, ProvenanceTxIndex)
		ref.Hash = ptrString(txHash)
		snapshot.LatestIndexedTransaction = ref
	}

	// Newest block that contains at least one indexed transaction.
	var (
		nonEmptyNumber int64
		nonEmptyTime   time.Time
	)
	if err := queryRow(ctx, latestNonEmptyBlockSQL).Scan(&nonEmptyNumber, &nonEmptyTime); err != nil {
		issues["latest_non_empty_block"] = err.Error()
	} else {
		ref := reportedReference(nonEmptyNumber, nonEmptyTime, now, ProvenanceTxIndex)
		ref.DistanceFromHead = distanceFromHead(snapshot.ChainHead.BlockNumber, nonEmptyNumber)
		snapshot.LatestNonEmptyBlock = ref
	}

	// Same question answered through a join on the block feed; reported as a
	// derived value that inherits the transaction feed's completeness.
	var (
		feedNonEmptyNumber int64
		feedNonEmptyTime   time.Time
	)
	if err := queryRow(ctx, latestNonEmptyFromBlockFeedSQL).Scan(&feedNonEmptyNumber, &feedNonEmptyTime); err != nil {
		issues["latest_non_empty_block_from_block_feed"] = err.Error()
	} else {
		timestamp := timePointer(feedNonEmptyTime)
		diagnostics.LatestNonEmptyFromBlockFeed = Reference{
			BlockNumber:      ptrInt64(feedNonEmptyNumber),
			Timestamp:        timestamp,
			AgeSeconds:       computeAge(timestamp, now),
			DistanceFromHead: distanceFromHead(snapshot.ChainHead.BlockNumber, feedNonEmptyNumber),
			Source:           SourceDerived,
			Confidence:       ConfidenceMedium,
			Provenance:       ProvenanceComposite,
			Completeness:     snapshot.LatestIndexedTransaction.Completeness,
		}
	}

	// Activity across the most recent indexed blocks.
	var sampleSize, nonEmptyBlocks, transactions int64
	if err := queryRow(ctx, recentBlockActivitySQL).Scan(&sampleSize, &nonEmptyBlocks, &transactions); err != nil {
		issues["recent_block_activity"] = err.Error()
	} else {
		diagnostics.RecentBlockSampleSize = ptrInt64(sampleSize)
		diagnostics.RecentNonEmptyBlocks = ptrInt64(nonEmptyBlocks)
		diagnostics.RecentTransactions = ptrInt64(transactions)
	}

	if latestTxBlock := snapshot.LatestIndexedTransaction.BlockNumber; latestTxBlock != nil {
		diagnostics.TxLagBlocks = distanceFromHead(snapshot.ChainHead.BlockNumber, *latestTxBlock)
	}
	diagnostics.TxLagSeconds = snapshot.LatestIndexedTransaction.AgeSeconds
	diagnostics.TxVisibilityState = classifyTxVisibilityState(snapshot.LatestIndexedTransaction.AgeSeconds)
	diagnostics.ActivityState, diagnostics.Explanation, diagnostics.Completeness = classifyActivityState(
		diagnostics.TxVisibilityState,
		diagnostics.TxLagBlocks,
		diagnostics.RecentTransactions,
		diagnostics.RecentNonEmptyBlocks,
	)

	sampling := Sampling{
		StatsGeneratedAt: ptrString(now.UTC().Format(time.RFC3339)),
		// RPCProbeAt mirrors the chain head reference's Timestamp (nil when
		// the head is unknown).
		RPCProbeAt:     snapshot.ChainHead.Timestamp,
		StatsWindowSec: ptrInt64(statsWindowSeconds),
	}
	if len(issues) > 0 {
		sampling.Issues = issues
	}

	completeness := SummaryCompleteness{
		TransactionsFeed:  snapshot.LatestIndexedTransaction.Completeness,
		BlocksFeed:        classifyBlockFeed(snapshot.ChainHead.BlockNumber, snapshot.LatestIndexedBlock.BlockNumber),
		GasMetrics:        classifyMetricPresence(averageGasPrice),
		UtilizationMetric: classifyMetricPresence(utilization),
	}

	return snapshot, completeness, sampling, diagnostics, nil
}

// parseHexInt64 parses a hexadecimal quantity with an optional "0x"/"0X"
// prefix and surrounding whitespace.
func parseHexInt64(value string) (int64, error) {
	digits := strings.TrimSpace(value)
	if len(digits) >= 2 && digits[0] == '0' && (digits[1] == 'x' || digits[1] == 'X') {
		digits = digits[2:]
	}
	return strconv.ParseInt(digits, 16, 64)
}

// ProbeChainHead asks the JSON-RPC endpoint at rpcURL for the current block
// number (eth_blockNumber) and the latest block (eth_getBlockByNumber) and
// returns a Reference describing the chain head.
//
// The age is measured against the local clock at the time of the call. It
// returns an error when rpcURL is blank, when either RPC call fails, or when
// the responses cannot be decoded.
func ProbeChainHead(ctx context.Context, rpcURL string) (*Reference, error) {
	rpcURL = strings.TrimSpace(rpcURL)
	if rpcURL == "" {
		return nil, fmt.Errorf("empty rpc url")
	}

	blockNumberRaw, _, err := postJSONRPC(ctx, rpcURL, "eth_blockNumber", []any{})
	if err != nil {
		return nil, fmt.Errorf("eth_blockNumber: %w", err)
	}
	var blockNumberHex string
	if err := json.Unmarshal(blockNumberRaw, &blockNumberHex); err != nil {
		return nil, fmt.Errorf("decoding eth_blockNumber result: %w", err)
	}
	blockNumber, err := parseHexInt64(blockNumberHex)
	if err != nil {
		return nil, fmt.Errorf("parsing block number %q: %w", blockNumberHex, err)
	}

	blockRaw, _, err := postJSONRPC(ctx, rpcURL, "eth_getBlockByNumber", []any{"latest", false})
	if err != nil {
		return nil, fmt.Errorf("eth_getBlockByNumber: %w", err)
	}
	var latestBlock struct {
		// Number is decoded lazily so an unexpected encoding cannot fail
		// the whole response.
		Number    json.RawMessage `json:"number"`
		Timestamp string          `json:"timestamp"`
	}
	if err := json.Unmarshal(blockRaw, &latestBlock); err != nil {
		return nil, fmt.Errorf("decoding eth_getBlockByNumber result: %w", err)
	}

	blockTimeHex := strings.TrimSpace(latestBlock.Timestamp)
	if blockTimeHex == "" {
		return nil, fmt.Errorf("missing block timestamp")
	}
	blockTimestamp, err := parseHexInt64(blockTimeHex)
	if err != nil {
		return nil, fmt.Errorf("parsing block timestamp %q: %w", blockTimeHex, err)
	}

	// The head may advance between the two calls. Prefer the number carried
	// by the block whose timestamp is reported so both describe the same
	// block; fall back to eth_blockNumber when it is absent or unparsable.
	var latestNumberHex string
	if len(latestBlock.Number) > 0 && json.Unmarshal(latestBlock.Number, &latestNumberHex) == nil {
		if number, parseErr := parseHexInt64(latestNumberHex); parseErr == nil {
			blockNumber = number
		}
	}

	blockTime := time.Unix(blockTimestamp, 0).UTC()
	age := int64(time.Now().UTC().Sub(blockTime).Seconds())
	if age < 0 {
		age = 0
	}

	return &Reference{
		BlockNumber:  ptrInt64(blockNumber),
		Timestamp:    ptrString(blockTime.Format(time.RFC3339)),
		AgeSeconds:   ptrInt64(age),
		Source:       SourceReported,
		Confidence:   ConfidenceHigh,
		Provenance:   ProvenanceRPC,
		Completeness: CompletenessComplete,
	}, nil
}

// postJSONRPC sends a JSON-RPC 2.0 request to rpcURL and returns the raw
// "result" member together with the time, in milliseconds, until the response
// headers arrived.
//
// It returns an error for transport failures, non-2xx HTTP statuses,
// undecodable bodies, and any response that carries a non-null "error" member.
// At most rpcMaxResponseBytes of the body are read.
func postJSONRPC(ctx context.Context, rpcURL string, method string, params []any) (json.RawMessage, int64, error) {
	body, err := json.Marshal(map[string]any{
		"jsonrpc": "2.0",
		"id":      1,
		"method":  method,
		"params":  params,
	})
	if err != nil {
		return nil, 0, err
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, rpcURL, bytes.NewReader(body))
	if err != nil {
		return nil, 0, err
	}
	req.Header.Set("Content-Type", "application/json")

	start := time.Now()
	resp, err := rpcHTTPClient.Do(req)
	latency := time.Since(start).Milliseconds()
	if err != nil {
		return nil, latency, err
	}
	defer resp.Body.Close()

	payload, err := io.ReadAll(io.LimitReader(resp.Body, rpcMaxResponseBytes))
	if err != nil {
		return nil, latency, err
	}
	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		return nil, latency, fmt.Errorf("http %d", resp.StatusCode)
	}

	var out struct {
		Result json.RawMessage `json:"result"`
		Error  *struct {
			Message string `json:"message"`
		} `json:"error"`
	}
	if err := json.Unmarshal(payload, &out); err != nil {
		return nil, latency, err
	}
	// A non-null error member signals failure even when it has no message;
	// treating it as success would hand callers an empty result.
	if out.Error != nil {
		if out.Error.Message == "" {
			return nil, latency, fmt.Errorf("rpc error: (no message)")
		}
		return nil, latency, fmt.Errorf("rpc error: %s", out.Error.Message)
	}
	return out.Result, latency, nil
}