399 lines
11 KiB
Go
399 lines
11 KiB
Go
package freshness
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/jackc/pgx/v5"
|
|
)
|
|
|
|
// QueryRowFunc is the single-row query hook that BuildSnapshot calls with a
// SQL string (and optional arguments); the returned pgx.Row is then scanned
// by the caller.
type QueryRowFunc func(ctx context.Context, sql string, args ...any) pgx.Row
|
|
|
|
// Confidence is the string label serialized in the "confidence" field of a
// Reference.
type Confidence string

// Confidence labels available to Reference values.
const (
	ConfidenceHigh    = Confidence("high")
	ConfidenceMedium  = Confidence("medium")
	ConfidenceLow     = Confidence("low")
	ConfidenceUnknown = Confidence("unknown")
)
|
|
|
|
// Completeness is the string label used for the "completeness" field of a
// Reference and for every field of SummaryCompleteness.
type Completeness string

// Completeness labels produced by the classify* helpers in this file.
const (
	CompletenessComplete    = Completeness("complete")
	CompletenessPartial     = Completeness("partial")
	CompletenessStale       = Completeness("stale")
	CompletenessUnavailable = Completeness("unavailable")
)
|
|
|
|
// Source is the string label serialized in the "source" field of a Reference.
type Source string

// Source labels available to Reference values.
const (
	SourceReported    = Source("reported")
	SourceDerived     = Source("derived")
	SourceSampled     = Source("sampled")
	SourceUnavailable = Source("unavailable")
)
|
|
|
|
// Provenance is the string label serialized in the "provenance" field of a
// Reference; it names the subsystem a reference was obtained from.
type Provenance string

// Provenance labels available to Reference values.
const (
	ProvenanceRPC           = Provenance("rpc")
	ProvenanceExplorerIndex = Provenance("explorer_index")
	ProvenanceTxIndex       = Provenance("tx_index")
	ProvenanceMissionFeed   = Provenance("mission_control_feed")
	ProvenanceComposite     = Provenance("composite")
)
|
|
|
|
type Reference struct {
|
|
BlockNumber *int64 `json:"block_number"`
|
|
Timestamp *string `json:"timestamp"`
|
|
AgeSeconds *int64 `json:"age_seconds"`
|
|
Hash *string `json:"hash,omitempty"`
|
|
DistanceFromHead *int64 `json:"distance_from_head,omitempty"`
|
|
Source Source `json:"source"`
|
|
Confidence Confidence `json:"confidence"`
|
|
Provenance Provenance `json:"provenance"`
|
|
Completeness Completeness `json:"completeness,omitempty"`
|
|
}
|
|
|
|
type Snapshot struct {
|
|
ChainHead Reference `json:"chain_head"`
|
|
LatestIndexedBlock Reference `json:"latest_indexed_block"`
|
|
LatestIndexedTransaction Reference `json:"latest_indexed_transaction"`
|
|
LatestNonEmptyBlock Reference `json:"latest_non_empty_block"`
|
|
}
|
|
|
|
type SummaryCompleteness struct {
|
|
TransactionsFeed Completeness `json:"transactions_feed"`
|
|
BlocksFeed Completeness `json:"blocks_feed"`
|
|
GasMetrics Completeness `json:"gas_metrics"`
|
|
UtilizationMetric Completeness `json:"utilization_metrics"`
|
|
}
|
|
|
|
// Sampling records when and how a snapshot was taken.
//
// StatsGeneratedAt and RPCProbeAt are always serialized (null when unset);
// StatsWindowSec and Issues are dropped from the JSON when unset or empty.
// Issues maps a reference name to the error text seen while loading it.
type Sampling struct {
	StatsGeneratedAt *string           `json:"stats_generated_at"`
	RPCProbeAt       *string           `json:"rpc_probe_at"`
	StatsWindowSec   *int64            `json:"stats_window_seconds,omitempty"`
	Issues           map[string]string `json:"issues,omitempty"`
}
|
|
|
|
// HeadProbeFunc fetches the current chain head as a Reference. BuildSnapshot
// stores a returned error under the "chain_head" issue key and, in that case
// or when the reference is nil, keeps its unknown chain-head placeholder.
type HeadProbeFunc func(ctx context.Context) (*Reference, error)
|
|
|
|
// ptrInt64 returns a pointer to a freshly allocated copy of value.
func ptrInt64(value int64) *int64 {
	out := new(int64)
	*out = value
	return out
}
|
|
|
|
// ptrString returns a pointer to a freshly allocated copy of value.
func ptrString(value string) *string {
	out := new(string)
	*out = value
	return out
}
|
|
|
|
func unknownReference(provenance Provenance) Reference {
|
|
return Reference{
|
|
Source: SourceUnavailable,
|
|
Confidence: ConfidenceUnknown,
|
|
Provenance: provenance,
|
|
Completeness: CompletenessUnavailable,
|
|
}
|
|
}
|
|
|
|
// timePointer renders value as an RFC 3339 string in UTC and returns a
// pointer to it. The zero time yields nil instead of a formatted string.
func timePointer(value time.Time) *string {
	var out *string
	if !value.IsZero() {
		text := value.UTC().Format(time.RFC3339)
		out = &text
	}
	return out
}
|
|
|
|
// computeAge returns how many whole seconds lie between the RFC 3339
// timestamp and now. It returns nil when timestamp is nil, empty, or not
// parseable; a timestamp at or after now yields an age of zero.
func computeAge(timestamp *string, now time.Time) *int64 {
	if timestamp == nil {
		return nil
	}
	raw := *timestamp
	if raw == "" {
		return nil
	}

	at, parseErr := time.Parse(time.RFC3339, raw)
	if parseErr != nil {
		return nil
	}

	// Only a positive gap contributes; anything else stays at zero.
	var seconds int64
	if elapsed := now.Sub(at); elapsed > 0 {
		seconds = int64(elapsed.Seconds())
	}
	return &seconds
}
|
|
|
|
func classifyIndexedVisibility(age *int64) Completeness {
|
|
if age == nil {
|
|
return CompletenessUnavailable
|
|
}
|
|
switch {
|
|
case *age <= 15*60:
|
|
return CompletenessComplete
|
|
case *age <= 3*60*60:
|
|
return CompletenessPartial
|
|
default:
|
|
return CompletenessStale
|
|
}
|
|
}
|
|
|
|
func classifyBlockFeed(chainHead *int64, indexedHead *int64) Completeness {
|
|
if chainHead == nil || indexedHead == nil {
|
|
return CompletenessUnavailable
|
|
}
|
|
distance := *chainHead - *indexedHead
|
|
if distance < 0 {
|
|
distance = 0
|
|
}
|
|
switch {
|
|
case distance <= 2:
|
|
return CompletenessComplete
|
|
case distance <= 32:
|
|
return CompletenessPartial
|
|
default:
|
|
return CompletenessStale
|
|
}
|
|
}
|
|
|
|
func classifyMetricPresence[T comparable](value *T) Completeness {
|
|
if value == nil {
|
|
return CompletenessUnavailable
|
|
}
|
|
return CompletenessComplete
|
|
}
|
|
|
|
func BuildSnapshot(
|
|
ctx context.Context,
|
|
chainID int,
|
|
queryRow QueryRowFunc,
|
|
probeHead HeadProbeFunc,
|
|
now time.Time,
|
|
averageGasPrice *float64,
|
|
utilization *float64,
|
|
) (Snapshot, SummaryCompleteness, Sampling, error) {
|
|
snapshot := Snapshot{
|
|
ChainHead: unknownReference(ProvenanceRPC),
|
|
LatestIndexedBlock: unknownReference(ProvenanceExplorerIndex),
|
|
LatestIndexedTransaction: unknownReference(ProvenanceTxIndex),
|
|
LatestNonEmptyBlock: unknownReference(ProvenanceTxIndex),
|
|
}
|
|
issues := map[string]string{}
|
|
|
|
if probeHead != nil {
|
|
if head, err := probeHead(ctx); err == nil && head != nil {
|
|
snapshot.ChainHead = *head
|
|
} else if err != nil {
|
|
issues["chain_head"] = err.Error()
|
|
}
|
|
}
|
|
|
|
var latestIndexedBlockNumber int64
|
|
var latestIndexedBlockTime time.Time
|
|
if err := queryRow(ctx,
|
|
`SELECT number, timestamp
|
|
FROM blocks
|
|
ORDER BY number DESC
|
|
LIMIT 1`,
|
|
).Scan(&latestIndexedBlockNumber, &latestIndexedBlockTime); err == nil {
|
|
timestamp := timePointer(latestIndexedBlockTime)
|
|
snapshot.LatestIndexedBlock = Reference{
|
|
BlockNumber: ptrInt64(latestIndexedBlockNumber),
|
|
Timestamp: timestamp,
|
|
AgeSeconds: computeAge(timestamp, now),
|
|
Source: SourceReported,
|
|
Confidence: ConfidenceHigh,
|
|
Provenance: ProvenanceExplorerIndex,
|
|
Completeness: CompletenessComplete,
|
|
}
|
|
} else {
|
|
issues["latest_indexed_block"] = err.Error()
|
|
}
|
|
|
|
var latestTxHash string
|
|
var latestTxBlock int64
|
|
var latestTxCreatedAt time.Time
|
|
if err := queryRow(ctx,
|
|
`SELECT concat('0x', encode(hash, 'hex')), block_number::bigint, COALESCE(block_timestamp, inserted_at)
|
|
FROM transactions
|
|
WHERE block_number IS NOT NULL
|
|
ORDER BY block_number DESC, "index" DESC
|
|
LIMIT 1`,
|
|
).Scan(&latestTxHash, &latestTxBlock, &latestTxCreatedAt); err == nil {
|
|
timestamp := timePointer(latestTxCreatedAt)
|
|
snapshot.LatestIndexedTransaction = Reference{
|
|
BlockNumber: ptrInt64(latestTxBlock),
|
|
Timestamp: timestamp,
|
|
AgeSeconds: computeAge(timestamp, now),
|
|
Hash: ptrString(latestTxHash),
|
|
Source: SourceReported,
|
|
Confidence: ConfidenceHigh,
|
|
Provenance: ProvenanceTxIndex,
|
|
Completeness: classifyIndexedVisibility(computeAge(timestamp, now)),
|
|
}
|
|
} else {
|
|
issues["latest_indexed_transaction"] = err.Error()
|
|
}
|
|
|
|
var latestNonEmptyBlockNumber int64
|
|
var latestNonEmptyBlockTime time.Time
|
|
if err := queryRow(ctx,
|
|
`SELECT b.number, b.timestamp
|
|
FROM blocks b
|
|
WHERE EXISTS (
|
|
SELECT 1
|
|
FROM transactions t
|
|
WHERE t.block_number = b.number
|
|
)
|
|
ORDER BY b.number DESC
|
|
LIMIT 1`,
|
|
).Scan(&latestNonEmptyBlockNumber, &latestNonEmptyBlockTime); err == nil {
|
|
timestamp := timePointer(latestNonEmptyBlockTime)
|
|
ref := Reference{
|
|
BlockNumber: ptrInt64(latestNonEmptyBlockNumber),
|
|
Timestamp: timestamp,
|
|
AgeSeconds: computeAge(timestamp, now),
|
|
Source: SourceReported,
|
|
Confidence: ConfidenceHigh,
|
|
Provenance: ProvenanceTxIndex,
|
|
Completeness: classifyIndexedVisibility(computeAge(timestamp, now)),
|
|
}
|
|
if snapshot.ChainHead.BlockNumber != nil {
|
|
distance := *snapshot.ChainHead.BlockNumber - latestNonEmptyBlockNumber
|
|
if distance < 0 {
|
|
distance = 0
|
|
}
|
|
ref.DistanceFromHead = ptrInt64(distance)
|
|
}
|
|
snapshot.LatestNonEmptyBlock = ref
|
|
} else {
|
|
issues["latest_non_empty_block"] = err.Error()
|
|
}
|
|
|
|
statsGeneratedAt := now.UTC().Format(time.RFC3339)
|
|
sampling := Sampling{
|
|
StatsGeneratedAt: ptrString(statsGeneratedAt),
|
|
StatsWindowSec: ptrInt64(300),
|
|
}
|
|
if len(issues) > 0 {
|
|
sampling.Issues = issues
|
|
}
|
|
if snapshot.ChainHead.Timestamp != nil {
|
|
sampling.RPCProbeAt = snapshot.ChainHead.Timestamp
|
|
}
|
|
|
|
completeness := SummaryCompleteness{
|
|
TransactionsFeed: snapshot.LatestIndexedTransaction.Completeness,
|
|
BlocksFeed: classifyBlockFeed(snapshot.ChainHead.BlockNumber, snapshot.LatestIndexedBlock.BlockNumber),
|
|
GasMetrics: classifyMetricPresence(averageGasPrice),
|
|
UtilizationMetric: classifyMetricPresence(utilization),
|
|
}
|
|
|
|
return snapshot, completeness, sampling, nil
|
|
}
|
|
|
|
func ProbeChainHead(ctx context.Context, rpcURL string) (*Reference, error) {
|
|
rpcURL = strings.TrimSpace(rpcURL)
|
|
if rpcURL == "" {
|
|
return nil, fmt.Errorf("empty rpc url")
|
|
}
|
|
|
|
blockNumberRaw, _, err := postJSONRPC(ctx, rpcURL, "eth_blockNumber", []interface{}{})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
var blockNumberHex string
|
|
if err := json.Unmarshal(blockNumberRaw, &blockNumberHex); err != nil {
|
|
return nil, err
|
|
}
|
|
blockNumber, err := strconv.ParseInt(strings.TrimPrefix(strings.TrimSpace(blockNumberHex), "0x"), 16, 64)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
blockRaw, _, err := postJSONRPC(ctx, rpcURL, "eth_getBlockByNumber", []interface{}{"latest", false})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
var latestBlock struct {
|
|
Timestamp string `json:"timestamp"`
|
|
}
|
|
if err := json.Unmarshal(blockRaw, &latestBlock); err != nil {
|
|
return nil, err
|
|
}
|
|
blockTimeHex := strings.TrimSpace(latestBlock.Timestamp)
|
|
if blockTimeHex == "" {
|
|
return nil, fmt.Errorf("missing block timestamp")
|
|
}
|
|
blockTimestamp, err := strconv.ParseInt(strings.TrimPrefix(blockTimeHex, "0x"), 16, 64)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
ts := time.Unix(blockTimestamp, 0).UTC()
|
|
timestamp := ts.Format(time.RFC3339)
|
|
now := time.Now().UTC()
|
|
age := int64(now.Sub(ts).Seconds())
|
|
if age < 0 {
|
|
age = 0
|
|
}
|
|
|
|
return &Reference{
|
|
BlockNumber: ptrInt64(blockNumber),
|
|
Timestamp: ptrString(timestamp),
|
|
AgeSeconds: ptrInt64(age),
|
|
Source: SourceReported,
|
|
Confidence: ConfidenceHigh,
|
|
Provenance: ProvenanceRPC,
|
|
Completeness: CompletenessComplete,
|
|
}, nil
|
|
}
|
|
|
|
// postJSONRPC sends a single JSON-RPC 2.0 request (id 1) for method with
// params to rpcURL and returns the raw "result" member of the response.
//
// The second return value is the time in milliseconds spent inside the HTTP
// round trip (until response headers arrive); it is zero when the request
// could not be built. The call is bounded both by ctx and by a 6 second
// client timeout, and at most 1 MiB of the response body is read.
//
// An error is returned when the request cannot be built or sent, when the
// body cannot be read, when the HTTP status is outside 2xx ("http <code>"),
// when the body is not valid JSON, or when the response contains an "error"
// object. An error object always fails the call, even if its message is
// empty; in that case the error code, when present, is reported instead.
//
// A nil params slice is sent as an empty array rather than JSON null.
func postJSONRPC(ctx context.Context, rpcURL string, method string, params []any) (json.RawMessage, int64, error) {
	if params == nil {
		// encoding/json renders a nil slice as null, which is not a valid
		// "params" value for servers that validate the request structure.
		params = []any{}
	}

	body, err := json.Marshal(map[string]any{
		"jsonrpc": "2.0",
		"id":      1,
		"method":  method,
		"params":  params,
	})
	if err != nil {
		return nil, 0, err
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, rpcURL, bytes.NewReader(body))
	if err != nil {
		return nil, 0, err
	}
	req.Header.Set("Content-Type", "application/json")

	// The client has no Transport of its own, so it uses
	// http.DefaultTransport; only the timeout is specific to this call.
	client := &http.Client{Timeout: 6 * time.Second}
	start := time.Now()
	resp, err := client.Do(req)
	latency := time.Since(start).Milliseconds()
	if err != nil {
		return nil, latency, err
	}
	defer resp.Body.Close()

	payload, err := io.ReadAll(io.LimitReader(resp.Body, 1<<20))
	if err != nil {
		return nil, latency, err
	}
	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		return nil, latency, fmt.Errorf("http %d", resp.StatusCode)
	}

	var out struct {
		Result json.RawMessage `json:"result"`
		Error  *struct {
			// Code is kept raw so an unexpected (non-integer) code cannot
			// turn an RPC error into a less useful decode error.
			Code    json.RawMessage `json:"code"`
			Message string          `json:"message"`
		} `json:"error"`
	}
	if err := json.Unmarshal(payload, &out); err != nil {
		return nil, latency, err
	}
	if out.Error != nil {
		detail := out.Error.Message
		if detail == "" {
			detail = "unspecified error"
			if len(out.Error.Code) > 0 {
				detail = "code " + string(out.Error.Code)
			}
		}
		return nil, latency, fmt.Errorf("rpc error: %s", detail)
	}
	return out.Result, latency, nil
}
|