package freshness import ( "bytes" "context" "encoding/json" "fmt" "io" "net/http" "strconv" "strings" "time" "github.com/jackc/pgx/v5" ) type QueryRowFunc func(ctx context.Context, sql string, args ...any) pgx.Row type Confidence string const ( ConfidenceHigh Confidence = "high" ConfidenceMedium Confidence = "medium" ConfidenceLow Confidence = "low" ConfidenceUnknown Confidence = "unknown" ) type Completeness string const ( CompletenessComplete Completeness = "complete" CompletenessPartial Completeness = "partial" CompletenessStale Completeness = "stale" CompletenessUnavailable Completeness = "unavailable" ) type Source string const ( SourceReported Source = "reported" SourceDerived Source = "derived" SourceSampled Source = "sampled" SourceUnavailable Source = "unavailable" ) type Provenance string const ( ProvenanceRPC Provenance = "rpc" ProvenanceExplorerIndex Provenance = "explorer_index" ProvenanceTxIndex Provenance = "tx_index" ProvenanceMissionFeed Provenance = "mission_control_feed" ProvenanceComposite Provenance = "composite" ) type Reference struct { BlockNumber *int64 `json:"block_number"` Timestamp *string `json:"timestamp"` AgeSeconds *int64 `json:"age_seconds"` Hash *string `json:"hash,omitempty"` DistanceFromHead *int64 `json:"distance_from_head,omitempty"` Source Source `json:"source"` Confidence Confidence `json:"confidence"` Provenance Provenance `json:"provenance"` Completeness Completeness `json:"completeness,omitempty"` } type Snapshot struct { ChainHead Reference `json:"chain_head"` LatestIndexedBlock Reference `json:"latest_indexed_block"` LatestIndexedTransaction Reference `json:"latest_indexed_transaction"` LatestNonEmptyBlock Reference `json:"latest_non_empty_block"` } type SummaryCompleteness struct { TransactionsFeed Completeness `json:"transactions_feed"` BlocksFeed Completeness `json:"blocks_feed"` GasMetrics Completeness `json:"gas_metrics"` UtilizationMetric Completeness `json:"utilization_metrics"` } type Sampling struct { StatsGeneratedAt *string `json:"stats_generated_at"` RPCProbeAt *string `json:"rpc_probe_at"` StatsWindowSec *int64 `json:"stats_window_seconds,omitempty"` Issues map[string]string `json:"issues,omitempty"` } type HeadProbeFunc func(ctx context.Context) (*Reference, error) func ptrInt64(value int64) *int64 { return &value } func ptrString(value string) *string { return &value } func unknownReference(provenance Provenance) Reference { return Reference{ Source: SourceUnavailable, Confidence: ConfidenceUnknown, Provenance: provenance, Completeness: CompletenessUnavailable, } } func timePointer(value time.Time) *string { if value.IsZero() { return nil } formatted := value.UTC().Format(time.RFC3339) return &formatted } func computeAge(timestamp *string, now time.Time) *int64 { if timestamp == nil || *timestamp == "" { return nil } parsed, err := time.Parse(time.RFC3339, *timestamp) if err != nil { return nil } age := int64(now.Sub(parsed).Seconds()) if age < 0 { age = 0 } return &age } func classifyIndexedVisibility(age *int64) Completeness { if age == nil { return CompletenessUnavailable } switch { case *age <= 15*60: return CompletenessComplete case *age <= 3*60*60: return CompletenessPartial default: return CompletenessStale } } func classifyBlockFeed(chainHead *int64, indexedHead *int64) Completeness { if chainHead == nil || indexedHead == nil { return CompletenessUnavailable } distance := *chainHead - *indexedHead if distance < 0 { distance = 0 } switch { case distance <= 2: return CompletenessComplete case distance <= 32: return CompletenessPartial default: return CompletenessStale } } func classifyMetricPresence[T comparable](value *T) Completeness { if value == nil { return CompletenessUnavailable } return CompletenessComplete } func BuildSnapshot( ctx context.Context, chainID int, queryRow QueryRowFunc, probeHead HeadProbeFunc, now time.Time, averageGasPrice *float64, utilization *float64, ) (Snapshot, SummaryCompleteness, Sampling, error) { snapshot := Snapshot{ ChainHead: unknownReference(ProvenanceRPC), LatestIndexedBlock: unknownReference(ProvenanceExplorerIndex), LatestIndexedTransaction: unknownReference(ProvenanceTxIndex), LatestNonEmptyBlock: unknownReference(ProvenanceTxIndex), } issues := map[string]string{} if probeHead != nil { if head, err := probeHead(ctx); err == nil && head != nil { snapshot.ChainHead = *head } else if err != nil { issues["chain_head"] = err.Error() } } var latestIndexedBlockNumber int64 var latestIndexedBlockTime time.Time if err := queryRow(ctx, `SELECT number, timestamp FROM blocks ORDER BY number DESC LIMIT 1`, ).Scan(&latestIndexedBlockNumber, &latestIndexedBlockTime); err == nil { timestamp := timePointer(latestIndexedBlockTime) snapshot.LatestIndexedBlock = Reference{ BlockNumber: ptrInt64(latestIndexedBlockNumber), Timestamp: timestamp, AgeSeconds: computeAge(timestamp, now), Source: SourceReported, Confidence: ConfidenceHigh, Provenance: ProvenanceExplorerIndex, Completeness: CompletenessComplete, } } else { issues["latest_indexed_block"] = err.Error() } var latestTxHash string var latestTxBlock int64 var latestTxCreatedAt time.Time if err := queryRow(ctx, `SELECT concat('0x', encode(hash, 'hex')), block_number::bigint, COALESCE(block_timestamp, inserted_at) FROM transactions WHERE block_number IS NOT NULL ORDER BY block_number DESC, "index" DESC LIMIT 1`, ).Scan(&latestTxHash, &latestTxBlock, &latestTxCreatedAt); err == nil { timestamp := timePointer(latestTxCreatedAt) snapshot.LatestIndexedTransaction = Reference{ BlockNumber: ptrInt64(latestTxBlock), Timestamp: timestamp, AgeSeconds: computeAge(timestamp, now), Hash: ptrString(latestTxHash), Source: SourceReported, Confidence: ConfidenceHigh, Provenance: ProvenanceTxIndex, Completeness: classifyIndexedVisibility(computeAge(timestamp, now)), } } else { issues["latest_indexed_transaction"] = err.Error() } var latestNonEmptyBlockNumber int64 var latestNonEmptyBlockTime time.Time if err := queryRow(ctx, `SELECT b.number, b.timestamp FROM blocks b WHERE EXISTS ( SELECT 1 FROM transactions t WHERE t.block_number = b.number ) ORDER BY b.number DESC LIMIT 1`, ).Scan(&latestNonEmptyBlockNumber, &latestNonEmptyBlockTime); err == nil { timestamp := timePointer(latestNonEmptyBlockTime) ref := Reference{ BlockNumber: ptrInt64(latestNonEmptyBlockNumber), Timestamp: timestamp, AgeSeconds: computeAge(timestamp, now), Source: SourceReported, Confidence: ConfidenceHigh, Provenance: ProvenanceTxIndex, Completeness: classifyIndexedVisibility(computeAge(timestamp, now)), } if snapshot.ChainHead.BlockNumber != nil { distance := *snapshot.ChainHead.BlockNumber - latestNonEmptyBlockNumber if distance < 0 { distance = 0 } ref.DistanceFromHead = ptrInt64(distance) } snapshot.LatestNonEmptyBlock = ref } else { issues["latest_non_empty_block"] = err.Error() } statsGeneratedAt := now.UTC().Format(time.RFC3339) sampling := Sampling{ StatsGeneratedAt: ptrString(statsGeneratedAt), StatsWindowSec: ptrInt64(300), } if len(issues) > 0 { sampling.Issues = issues } if snapshot.ChainHead.Timestamp != nil { sampling.RPCProbeAt = snapshot.ChainHead.Timestamp } completeness := SummaryCompleteness{ TransactionsFeed: snapshot.LatestIndexedTransaction.Completeness, BlocksFeed: classifyBlockFeed(snapshot.ChainHead.BlockNumber, snapshot.LatestIndexedBlock.BlockNumber), GasMetrics: classifyMetricPresence(averageGasPrice), UtilizationMetric: classifyMetricPresence(utilization), } return snapshot, completeness, sampling, nil } func ProbeChainHead(ctx context.Context, rpcURL string) (*Reference, error) { rpcURL = strings.TrimSpace(rpcURL) if rpcURL == "" { return nil, fmt.Errorf("empty rpc url") } blockNumberRaw, _, err := postJSONRPC(ctx, rpcURL, "eth_blockNumber", []interface{}{}) if err != nil { return nil, err } var blockNumberHex string if err := json.Unmarshal(blockNumberRaw, &blockNumberHex); err != nil { return nil, err } blockNumber, err := strconv.ParseInt(strings.TrimPrefix(strings.TrimSpace(blockNumberHex), "0x"), 16, 64) if err != nil { return nil, err } blockRaw, _, err := postJSONRPC(ctx, rpcURL, "eth_getBlockByNumber", []interface{}{"latest", false}) if err != nil { return nil, err } var latestBlock struct { Timestamp string `json:"timestamp"` } if err := json.Unmarshal(blockRaw, &latestBlock); err != nil { return nil, err } blockTimeHex := strings.TrimSpace(latestBlock.Timestamp) if blockTimeHex == "" { return nil, fmt.Errorf("missing block timestamp") } blockTimestamp, err := strconv.ParseInt(strings.TrimPrefix(blockTimeHex, "0x"), 16, 64) if err != nil { return nil, err } ts := time.Unix(blockTimestamp, 0).UTC() timestamp := ts.Format(time.RFC3339) now := time.Now().UTC() age := int64(now.Sub(ts).Seconds()) if age < 0 { age = 0 } return &Reference{ BlockNumber: ptrInt64(blockNumber), Timestamp: ptrString(timestamp), AgeSeconds: ptrInt64(age), Source: SourceReported, Confidence: ConfidenceHigh, Provenance: ProvenanceRPC, Completeness: CompletenessComplete, }, nil } func postJSONRPC(ctx context.Context, rpcURL string, method string, params []interface{}) (json.RawMessage, int64, error) { body, err := json.Marshal(map[string]interface{}{ "jsonrpc": "2.0", "id": 1, "method": method, "params": params, }) if err != nil { return nil, 0, err } req, err := http.NewRequestWithContext(ctx, http.MethodPost, rpcURL, bytes.NewReader(body)) if err != nil { return nil, 0, err } req.Header.Set("Content-Type", "application/json") client := &http.Client{Timeout: 6 * time.Second} start := time.Now() resp, err := client.Do(req) latency := time.Since(start).Milliseconds() if err != nil { return nil, latency, err } defer resp.Body.Close() payload, err := io.ReadAll(io.LimitReader(resp.Body, 1<<20)) if err != nil { return nil, latency, err } if resp.StatusCode < 200 || resp.StatusCode >= 300 { return nil, latency, fmt.Errorf("http %d", resp.StatusCode) } var out struct { Result json.RawMessage `json:"result"` Error *struct { Message string `json:"message"` } `json:"error"` } if err := json.Unmarshal(payload, &out); err != nil { return nil, latency, err } if out.Error != nil && out.Error.Message != "" { return nil, latency, fmt.Errorf("rpc error: %s", out.Error.Message) } return out.Result, latency, nil }