- Introduced a new Diagnostics struct to capture transaction visibility state and activity state. - Updated BuildSnapshot function to return diagnostics alongside snapshot, completeness, and sampling. - Enhanced test cases to validate the new diagnostics data. - Updated frontend components to utilize the new diagnostics information for improved user feedback on freshness context. This change improves the observability of transaction activity and enhances the user experience by providing clearer insights into the freshness of data.
545 lines
17 KiB
Go
545 lines
17 KiB
Go
package freshness
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/jackc/pgx/v5"
|
|
)
|
|
|
|
type QueryRowFunc func(ctx context.Context, sql string, args ...any) pgx.Row
|
|
|
|
type Confidence string
|
|
|
|
const (
|
|
ConfidenceHigh Confidence = "high"
|
|
ConfidenceMedium Confidence = "medium"
|
|
ConfidenceLow Confidence = "low"
|
|
ConfidenceUnknown Confidence = "unknown"
|
|
)
|
|
|
|
type Completeness string
|
|
|
|
const (
|
|
CompletenessComplete Completeness = "complete"
|
|
CompletenessPartial Completeness = "partial"
|
|
CompletenessStale Completeness = "stale"
|
|
CompletenessUnavailable Completeness = "unavailable"
|
|
)
|
|
|
|
type Source string
|
|
|
|
const (
|
|
SourceReported Source = "reported"
|
|
SourceDerived Source = "derived"
|
|
SourceSampled Source = "sampled"
|
|
SourceUnavailable Source = "unavailable"
|
|
)
|
|
|
|
type Provenance string
|
|
|
|
const (
|
|
ProvenanceRPC Provenance = "rpc"
|
|
ProvenanceExplorerIndex Provenance = "explorer_index"
|
|
ProvenanceTxIndex Provenance = "tx_index"
|
|
ProvenanceMissionFeed Provenance = "mission_control_feed"
|
|
ProvenanceComposite Provenance = "composite"
|
|
)
|
|
|
|
type Reference struct {
|
|
BlockNumber *int64 `json:"block_number"`
|
|
Timestamp *string `json:"timestamp"`
|
|
AgeSeconds *int64 `json:"age_seconds"`
|
|
Hash *string `json:"hash,omitempty"`
|
|
DistanceFromHead *int64 `json:"distance_from_head,omitempty"`
|
|
Source Source `json:"source"`
|
|
Confidence Confidence `json:"confidence"`
|
|
Provenance Provenance `json:"provenance"`
|
|
Completeness Completeness `json:"completeness,omitempty"`
|
|
}
|
|
|
|
type Snapshot struct {
|
|
ChainHead Reference `json:"chain_head"`
|
|
LatestIndexedBlock Reference `json:"latest_indexed_block"`
|
|
LatestIndexedTransaction Reference `json:"latest_indexed_transaction"`
|
|
LatestNonEmptyBlock Reference `json:"latest_non_empty_block"`
|
|
}
|
|
|
|
type Diagnostics struct {
|
|
TxVisibilityState string `json:"tx_visibility_state"`
|
|
ActivityState string `json:"activity_state"`
|
|
Explanation string `json:"explanation,omitempty"`
|
|
TxLagBlocks *int64 `json:"tx_lag_blocks,omitempty"`
|
|
TxLagSeconds *int64 `json:"tx_lag_seconds,omitempty"`
|
|
RecentBlockSampleSize *int64 `json:"recent_block_sample_size,omitempty"`
|
|
RecentNonEmptyBlocks *int64 `json:"recent_non_empty_blocks,omitempty"`
|
|
RecentTransactions *int64 `json:"recent_transactions,omitempty"`
|
|
LatestNonEmptyFromBlockFeed Reference `json:"latest_non_empty_block_from_block_feed"`
|
|
Source Source `json:"source"`
|
|
Confidence Confidence `json:"confidence"`
|
|
Provenance Provenance `json:"provenance"`
|
|
Completeness Completeness `json:"completeness"`
|
|
}
|
|
|
|
type SummaryCompleteness struct {
|
|
TransactionsFeed Completeness `json:"transactions_feed"`
|
|
BlocksFeed Completeness `json:"blocks_feed"`
|
|
GasMetrics Completeness `json:"gas_metrics"`
|
|
UtilizationMetric Completeness `json:"utilization_metrics"`
|
|
}
|
|
|
|
type Sampling struct {
|
|
StatsGeneratedAt *string `json:"stats_generated_at"`
|
|
RPCProbeAt *string `json:"rpc_probe_at"`
|
|
StatsWindowSec *int64 `json:"stats_window_seconds,omitempty"`
|
|
Issues map[string]string `json:"issues,omitempty"`
|
|
}
|
|
|
|
type HeadProbeFunc func(ctx context.Context) (*Reference, error)
|
|
|
|
func ptrInt64(value int64) *int64 { return &value }
|
|
|
|
func ptrString(value string) *string { return &value }
|
|
|
|
func unknownReference(provenance Provenance) Reference {
|
|
return Reference{
|
|
Source: SourceUnavailable,
|
|
Confidence: ConfidenceUnknown,
|
|
Provenance: provenance,
|
|
Completeness: CompletenessUnavailable,
|
|
}
|
|
}
|
|
|
|
func timePointer(value time.Time) *string {
|
|
if value.IsZero() {
|
|
return nil
|
|
}
|
|
formatted := value.UTC().Format(time.RFC3339)
|
|
return &formatted
|
|
}
|
|
|
|
func computeAge(timestamp *string, now time.Time) *int64 {
|
|
if timestamp == nil || *timestamp == "" {
|
|
return nil
|
|
}
|
|
parsed, err := time.Parse(time.RFC3339, *timestamp)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
age := int64(now.Sub(parsed).Seconds())
|
|
if age < 0 {
|
|
age = 0
|
|
}
|
|
return &age
|
|
}
|
|
|
|
func classifyIndexedVisibility(age *int64) Completeness {
|
|
if age == nil {
|
|
return CompletenessUnavailable
|
|
}
|
|
switch {
|
|
case *age <= 15*60:
|
|
return CompletenessComplete
|
|
case *age <= 3*60*60:
|
|
return CompletenessPartial
|
|
default:
|
|
return CompletenessStale
|
|
}
|
|
}
|
|
|
|
func classifyBlockFeed(chainHead *int64, indexedHead *int64) Completeness {
|
|
if chainHead == nil || indexedHead == nil {
|
|
return CompletenessUnavailable
|
|
}
|
|
distance := *chainHead - *indexedHead
|
|
if distance < 0 {
|
|
distance = 0
|
|
}
|
|
switch {
|
|
case distance <= 2:
|
|
return CompletenessComplete
|
|
case distance <= 32:
|
|
return CompletenessPartial
|
|
default:
|
|
return CompletenessStale
|
|
}
|
|
}
|
|
|
|
func classifyMetricPresence[T comparable](value *T) Completeness {
|
|
if value == nil {
|
|
return CompletenessUnavailable
|
|
}
|
|
return CompletenessComplete
|
|
}
|
|
|
|
func classifyTxVisibilityState(age *int64) string {
|
|
if age == nil {
|
|
return "unavailable"
|
|
}
|
|
switch {
|
|
case *age <= 15*60:
|
|
return "current"
|
|
case *age <= 3*60*60:
|
|
return "lagging"
|
|
default:
|
|
return "stale"
|
|
}
|
|
}
|
|
|
|
func classifyActivityState(txVisibility string, txLagBlocks, recentTransactions, recentNonEmptyBlocks *int64) (string, string, Completeness) {
|
|
if txVisibility == "unavailable" {
|
|
if recentTransactions != nil && *recentTransactions > 0 {
|
|
return "limited_observability", "Recent blocks show on-chain transaction activity, but indexed transaction freshness is unavailable.", CompletenessPartial
|
|
}
|
|
return "limited_observability", "Transaction freshness is unavailable, and recent block activity is limited.", CompletenessUnavailable
|
|
}
|
|
|
|
if recentTransactions != nil && *recentTransactions > 0 {
|
|
if txLagBlocks != nil && *txLagBlocks > 32 {
|
|
return "fresh_head_stale_transaction_visibility", "Recent block activity is present closer to the head than the visible indexed transaction feed.", CompletenessPartial
|
|
}
|
|
if *recentTransactions <= 3 {
|
|
return "sparse_activity", "Recent blocks contain only a small amount of transaction activity.", CompletenessComplete
|
|
}
|
|
return "active", "Recent blocks contain visible transaction activity close to the head.", CompletenessComplete
|
|
}
|
|
|
|
if recentNonEmptyBlocks != nil && *recentNonEmptyBlocks == 0 {
|
|
return "quiet_chain", "Recent sampled head blocks are empty, which indicates a quiet chain rather than a broken explorer.", CompletenessComplete
|
|
}
|
|
|
|
if txLagBlocks != nil && *txLagBlocks > 32 {
|
|
return "fresh_head_stale_transaction_visibility", "The chain head is current, but the indexed transaction feed trails the current tip.", CompletenessPartial
|
|
}
|
|
|
|
return "sparse_activity", "Recent visible transaction activity is limited.", CompletenessComplete
|
|
}
|
|
|
|
func BuildSnapshot(
|
|
ctx context.Context,
|
|
chainID int,
|
|
queryRow QueryRowFunc,
|
|
probeHead HeadProbeFunc,
|
|
now time.Time,
|
|
averageGasPrice *float64,
|
|
utilization *float64,
|
|
) (Snapshot, SummaryCompleteness, Sampling, Diagnostics, error) {
|
|
snapshot := Snapshot{
|
|
ChainHead: unknownReference(ProvenanceRPC),
|
|
LatestIndexedBlock: unknownReference(ProvenanceExplorerIndex),
|
|
LatestIndexedTransaction: unknownReference(ProvenanceTxIndex),
|
|
LatestNonEmptyBlock: unknownReference(ProvenanceTxIndex),
|
|
}
|
|
diagnostics := Diagnostics{
|
|
TxVisibilityState: "unavailable",
|
|
ActivityState: "limited_observability",
|
|
LatestNonEmptyFromBlockFeed: unknownReference(ProvenanceExplorerIndex),
|
|
Source: SourceReported,
|
|
Confidence: ConfidenceMedium,
|
|
Provenance: ProvenanceComposite,
|
|
Completeness: CompletenessUnavailable,
|
|
}
|
|
issues := map[string]string{}
|
|
|
|
if probeHead != nil {
|
|
if head, err := probeHead(ctx); err == nil && head != nil {
|
|
snapshot.ChainHead = *head
|
|
} else if err != nil {
|
|
issues["chain_head"] = err.Error()
|
|
}
|
|
}
|
|
|
|
var latestIndexedBlockNumber int64
|
|
var latestIndexedBlockTime time.Time
|
|
if err := queryRow(ctx,
|
|
`SELECT number, timestamp
|
|
FROM blocks
|
|
ORDER BY number DESC
|
|
LIMIT 1`,
|
|
).Scan(&latestIndexedBlockNumber, &latestIndexedBlockTime); err == nil {
|
|
timestamp := timePointer(latestIndexedBlockTime)
|
|
snapshot.LatestIndexedBlock = Reference{
|
|
BlockNumber: ptrInt64(latestIndexedBlockNumber),
|
|
Timestamp: timestamp,
|
|
AgeSeconds: computeAge(timestamp, now),
|
|
Source: SourceReported,
|
|
Confidence: ConfidenceHigh,
|
|
Provenance: ProvenanceExplorerIndex,
|
|
Completeness: CompletenessComplete,
|
|
}
|
|
} else {
|
|
issues["latest_indexed_block"] = err.Error()
|
|
}
|
|
|
|
var latestTxHash string
|
|
var latestTxBlock int64
|
|
var latestTxCreatedAt time.Time
|
|
if err := queryRow(ctx,
|
|
`SELECT concat('0x', encode(hash, 'hex')), block_number::bigint, COALESCE(block_timestamp, inserted_at)
|
|
FROM transactions
|
|
WHERE block_number IS NOT NULL
|
|
ORDER BY block_number DESC, "index" DESC
|
|
LIMIT 1`,
|
|
).Scan(&latestTxHash, &latestTxBlock, &latestTxCreatedAt); err == nil {
|
|
timestamp := timePointer(latestTxCreatedAt)
|
|
snapshot.LatestIndexedTransaction = Reference{
|
|
BlockNumber: ptrInt64(latestTxBlock),
|
|
Timestamp: timestamp,
|
|
AgeSeconds: computeAge(timestamp, now),
|
|
Hash: ptrString(latestTxHash),
|
|
Source: SourceReported,
|
|
Confidence: ConfidenceHigh,
|
|
Provenance: ProvenanceTxIndex,
|
|
Completeness: classifyIndexedVisibility(computeAge(timestamp, now)),
|
|
}
|
|
} else {
|
|
issues["latest_indexed_transaction"] = err.Error()
|
|
}
|
|
|
|
var latestNonEmptyBlockNumber int64
|
|
var latestNonEmptyBlockTime time.Time
|
|
if err := queryRow(ctx,
|
|
`SELECT b.number, b.timestamp
|
|
FROM blocks b
|
|
WHERE EXISTS (
|
|
SELECT 1
|
|
FROM transactions t
|
|
WHERE t.block_number = b.number
|
|
)
|
|
ORDER BY b.number DESC
|
|
LIMIT 1`,
|
|
).Scan(&latestNonEmptyBlockNumber, &latestNonEmptyBlockTime); err == nil {
|
|
timestamp := timePointer(latestNonEmptyBlockTime)
|
|
ref := Reference{
|
|
BlockNumber: ptrInt64(latestNonEmptyBlockNumber),
|
|
Timestamp: timestamp,
|
|
AgeSeconds: computeAge(timestamp, now),
|
|
Source: SourceReported,
|
|
Confidence: ConfidenceHigh,
|
|
Provenance: ProvenanceTxIndex,
|
|
Completeness: classifyIndexedVisibility(computeAge(timestamp, now)),
|
|
}
|
|
if snapshot.ChainHead.BlockNumber != nil {
|
|
distance := *snapshot.ChainHead.BlockNumber - latestNonEmptyBlockNumber
|
|
if distance < 0 {
|
|
distance = 0
|
|
}
|
|
ref.DistanceFromHead = ptrInt64(distance)
|
|
}
|
|
snapshot.LatestNonEmptyBlock = ref
|
|
} else {
|
|
issues["latest_non_empty_block"] = err.Error()
|
|
}
|
|
|
|
var latestBlockFeedNonEmptyNumber int64
|
|
var latestBlockFeedNonEmptyTime time.Time
|
|
if err := queryRow(ctx,
|
|
`SELECT b.number, b.timestamp
|
|
FROM blocks b
|
|
JOIN (
|
|
SELECT DISTINCT block_number
|
|
FROM transactions
|
|
WHERE block_number IS NOT NULL
|
|
) tx_blocks
|
|
ON tx_blocks.block_number = b.number
|
|
ORDER BY b.number DESC
|
|
LIMIT 1`,
|
|
).Scan(&latestBlockFeedNonEmptyNumber, &latestBlockFeedNonEmptyTime); err == nil {
|
|
timestamp := timePointer(latestBlockFeedNonEmptyTime)
|
|
ref := Reference{
|
|
BlockNumber: ptrInt64(latestBlockFeedNonEmptyNumber),
|
|
Timestamp: timestamp,
|
|
AgeSeconds: computeAge(timestamp, now),
|
|
Source: SourceDerived,
|
|
Confidence: ConfidenceMedium,
|
|
Provenance: ProvenanceComposite,
|
|
Completeness: snapshot.LatestIndexedTransaction.Completeness,
|
|
}
|
|
if snapshot.ChainHead.BlockNumber != nil {
|
|
distance := *snapshot.ChainHead.BlockNumber - latestBlockFeedNonEmptyNumber
|
|
if distance < 0 {
|
|
distance = 0
|
|
}
|
|
ref.DistanceFromHead = ptrInt64(distance)
|
|
}
|
|
diagnostics.LatestNonEmptyFromBlockFeed = ref
|
|
} else {
|
|
issues["latest_non_empty_block_from_block_feed"] = err.Error()
|
|
}
|
|
|
|
var recentBlockSampleSize, recentNonEmptyBlocks, recentTransactions int64
|
|
if err := queryRow(ctx,
|
|
`SELECT COUNT(*)::bigint,
|
|
COUNT(*) FILTER (WHERE COALESCE(tx_counts.tx_count, 0) > 0)::bigint,
|
|
COALESCE(SUM(COALESCE(tx_counts.tx_count, 0)), 0)::bigint
|
|
FROM (
|
|
SELECT number
|
|
FROM blocks
|
|
ORDER BY number DESC
|
|
LIMIT 128
|
|
) recent_blocks
|
|
LEFT JOIN (
|
|
SELECT block_number, COUNT(*)::bigint AS tx_count
|
|
FROM transactions
|
|
WHERE block_number IS NOT NULL
|
|
GROUP BY block_number
|
|
) tx_counts
|
|
ON tx_counts.block_number = recent_blocks.number`,
|
|
).Scan(&recentBlockSampleSize, &recentNonEmptyBlocks, &recentTransactions); err == nil {
|
|
diagnostics.RecentBlockSampleSize = ptrInt64(recentBlockSampleSize)
|
|
diagnostics.RecentNonEmptyBlocks = ptrInt64(recentNonEmptyBlocks)
|
|
diagnostics.RecentTransactions = ptrInt64(recentTransactions)
|
|
} else {
|
|
issues["recent_block_activity"] = err.Error()
|
|
}
|
|
|
|
if snapshot.ChainHead.BlockNumber != nil && snapshot.LatestIndexedTransaction.BlockNumber != nil {
|
|
lag := *snapshot.ChainHead.BlockNumber - *snapshot.LatestIndexedTransaction.BlockNumber
|
|
if lag < 0 {
|
|
lag = 0
|
|
}
|
|
diagnostics.TxLagBlocks = ptrInt64(lag)
|
|
}
|
|
diagnostics.TxLagSeconds = snapshot.LatestIndexedTransaction.AgeSeconds
|
|
diagnostics.TxVisibilityState = classifyTxVisibilityState(snapshot.LatestIndexedTransaction.AgeSeconds)
|
|
diagnostics.ActivityState, diagnostics.Explanation, diagnostics.Completeness = classifyActivityState(
|
|
diagnostics.TxVisibilityState,
|
|
diagnostics.TxLagBlocks,
|
|
diagnostics.RecentTransactions,
|
|
diagnostics.RecentNonEmptyBlocks,
|
|
)
|
|
|
|
statsGeneratedAt := now.UTC().Format(time.RFC3339)
|
|
sampling := Sampling{
|
|
StatsGeneratedAt: ptrString(statsGeneratedAt),
|
|
StatsWindowSec: ptrInt64(300),
|
|
}
|
|
if len(issues) > 0 {
|
|
sampling.Issues = issues
|
|
}
|
|
if snapshot.ChainHead.Timestamp != nil {
|
|
sampling.RPCProbeAt = snapshot.ChainHead.Timestamp
|
|
}
|
|
|
|
completeness := SummaryCompleteness{
|
|
TransactionsFeed: snapshot.LatestIndexedTransaction.Completeness,
|
|
BlocksFeed: classifyBlockFeed(snapshot.ChainHead.BlockNumber, snapshot.LatestIndexedBlock.BlockNumber),
|
|
GasMetrics: classifyMetricPresence(averageGasPrice),
|
|
UtilizationMetric: classifyMetricPresence(utilization),
|
|
}
|
|
|
|
return snapshot, completeness, sampling, diagnostics, nil
|
|
}
|
|
|
|
func ProbeChainHead(ctx context.Context, rpcURL string) (*Reference, error) {
|
|
rpcURL = strings.TrimSpace(rpcURL)
|
|
if rpcURL == "" {
|
|
return nil, fmt.Errorf("empty rpc url")
|
|
}
|
|
|
|
blockNumberRaw, _, err := postJSONRPC(ctx, rpcURL, "eth_blockNumber", []interface{}{})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
var blockNumberHex string
|
|
if err := json.Unmarshal(blockNumberRaw, &blockNumberHex); err != nil {
|
|
return nil, err
|
|
}
|
|
blockNumber, err := strconv.ParseInt(strings.TrimPrefix(strings.TrimSpace(blockNumberHex), "0x"), 16, 64)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
blockRaw, _, err := postJSONRPC(ctx, rpcURL, "eth_getBlockByNumber", []interface{}{"latest", false})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
var latestBlock struct {
|
|
Timestamp string `json:"timestamp"`
|
|
}
|
|
if err := json.Unmarshal(blockRaw, &latestBlock); err != nil {
|
|
return nil, err
|
|
}
|
|
blockTimeHex := strings.TrimSpace(latestBlock.Timestamp)
|
|
if blockTimeHex == "" {
|
|
return nil, fmt.Errorf("missing block timestamp")
|
|
}
|
|
blockTimestamp, err := strconv.ParseInt(strings.TrimPrefix(blockTimeHex, "0x"), 16, 64)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
ts := time.Unix(blockTimestamp, 0).UTC()
|
|
timestamp := ts.Format(time.RFC3339)
|
|
now := time.Now().UTC()
|
|
age := int64(now.Sub(ts).Seconds())
|
|
if age < 0 {
|
|
age = 0
|
|
}
|
|
|
|
return &Reference{
|
|
BlockNumber: ptrInt64(blockNumber),
|
|
Timestamp: ptrString(timestamp),
|
|
AgeSeconds: ptrInt64(age),
|
|
Source: SourceReported,
|
|
Confidence: ConfidenceHigh,
|
|
Provenance: ProvenanceRPC,
|
|
Completeness: CompletenessComplete,
|
|
}, nil
|
|
}
|
|
|
|
func postJSONRPC(ctx context.Context, rpcURL string, method string, params []interface{}) (json.RawMessage, int64, error) {
|
|
body, err := json.Marshal(map[string]interface{}{
|
|
"jsonrpc": "2.0",
|
|
"id": 1,
|
|
"method": method,
|
|
"params": params,
|
|
})
|
|
if err != nil {
|
|
return nil, 0, err
|
|
}
|
|
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodPost, rpcURL, bytes.NewReader(body))
|
|
if err != nil {
|
|
return nil, 0, err
|
|
}
|
|
req.Header.Set("Content-Type", "application/json")
|
|
|
|
client := &http.Client{Timeout: 6 * time.Second}
|
|
start := time.Now()
|
|
resp, err := client.Do(req)
|
|
latency := time.Since(start).Milliseconds()
|
|
if err != nil {
|
|
return nil, latency, err
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
payload, err := io.ReadAll(io.LimitReader(resp.Body, 1<<20))
|
|
if err != nil {
|
|
return nil, latency, err
|
|
}
|
|
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
|
return nil, latency, fmt.Errorf("http %d", resp.StatusCode)
|
|
}
|
|
|
|
var out struct {
|
|
Result json.RawMessage `json:"result"`
|
|
Error *struct {
|
|
Message string `json:"message"`
|
|
} `json:"error"`
|
|
}
|
|
if err := json.Unmarshal(payload, &out); err != nil {
|
|
return nil, latency, err
|
|
}
|
|
if out.Error != nil && out.Error.Message != "" {
|
|
return nil, latency, fmt.Errorf("rpc error: %s", out.Error.Message)
|
|
}
|
|
return out.Result, latency, nil
|
|
}
|