Files
proxmox/scripts/ccip_monitor.py

307 lines
12 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
"""
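CCIP Monitor Service
Monitors Chainlink CCIP message flow by counting the router's MessageSent and
MessageExecuted events, and exposes Prometheus metrics plus a JSON health-check
endpoint. Latency/fee histograms and the ALERT_WEBHOOK setting are declared but
not yet wired up.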
"""
import json
import logging
import os
import sys
import time
from http.server import HTTPServer, BaseHTTPRequestHandler
from threading import Thread
from typing import Optional, Dict, Any

try:
    from web3 import Web3
    from prometheus_client import Counter, Gauge, Histogram, start_http_server
except ImportError as e:
    print(f"Error importing dependencies: {e}")
    print("Please install dependencies: pip install web3 prometheus-client")
    exit(1)
# Logging: timestamped records at INFO and above, configured on the root logger.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Named logger used throughout this service.
logger = logging.getLogger('ccip-monitor')
# Configuration, read once from environment variables at import time.
# JSON-RPC endpoint of the chain being monitored.
RPC_URL = os.environ.get('RPC_URL_138', 'http://192.168.11.250:8545')
# Contract addresses; each is an empty string when its variable is unset
# (an unset CCIP_ROUTER_ADDRESS disables event scanning).
CCIP_ROUTER_ADDRESS = os.environ.get('CCIP_ROUTER_ADDRESS', '')
CCIP_SENDER_ADDRESS = os.environ.get('CCIP_SENDER_ADDRESS', '')
LINK_TOKEN_ADDRESS = os.environ.get('LINK_TOKEN_ADDRESS', '')
# Health-check HTTP port; the Prometheus exporter is started on METRICS_PORT + 1.
# int() raises ValueError at import if the variable is not an integer.
METRICS_PORT = int(os.environ.get('METRICS_PORT', '8000'))
# Seconds to sleep between monitoring passes.
CHECK_INTERVAL = int(os.environ.get('CHECK_INTERVAL', '60'))
# NOTE(review): ALERT_WEBHOOK and LINK_TOKEN_ADDRESS are not referenced by the
# code visible in this script; webhook alerting appears unimplemented — confirm.
ALERT_WEBHOOK = os.environ.get('ALERT_WEBHOOK', '')
# Prometheus metrics, registered in prometheus_client's default registry.
# Router events seen, labelled by event name (e.g. event="MessageSent").
ccip_messages_total = Counter(
    'ccip_messages_total',
    'Total CCIP messages processed',
    labelnames=['event'],
)
# NOTE(review): nothing in this script as shown calls .observe() on the two
# histograms below, so fee/latency tracking appears not to be wired up yet.
ccip_message_fees = Histogram(
    'ccip_message_fees',
    'CCIP message fees',
    buckets=[0.001, 0.01, 0.1, 1, 10, 100],
)
ccip_message_latency = Histogram(
    'ccip_message_latency_seconds',
    'CCIP message latency in seconds',
    buckets=[1, 5, 10, 30, 60, 300, 600],
)
# Block number reported by the most recent monitoring pass.
ccip_last_block = Gauge(
    'ccip_last_block',
    'Last processed block number',
)
# 1/0 status flags, written by the health-check handler and the main loop.
ccip_service_status = Gauge(
    'ccip_service_status',
    'Service status (1=healthy, 0=unhealthy)',
)
ccip_rpc_connected = Gauge(
    'ccip_rpc_connected',
    'RPC connection status (1=connected, 0=disconnected)',
)
# Mutable module-level state shared by the functions below.
# w3: Web3 client, None until init_web3() assigns it.
# last_processed_block: highest block already scanned for router events;
# 0 means no scan has completed yet, so the next scan starts near the head.
w3, last_processed_block = None, 0
# Minimal CCIP Router ABI covering the two events this service watches.
# NOTE(review): not referenced by monitor_ccip_events() as written, which
# filters logs by raw topic hash instead of using this ABI.
CCIP_ROUTER_ABI = [
    {
        "anonymous": False,
        "inputs": [
            {"indexed": True, "name": "messageId", "type": "bytes32"},
            {"indexed": True, "name": "sourceChainSelector", "type": "uint64"},
            {"indexed": False, "name": "sender", "type": "address"},
            {"indexed": False, "name": "data", "type": "bytes"},
            # A tuple type must list its members in "components"; without them
            # the entry is invalid ABI and its canonical signature cannot be
            # derived. The member types match the (address,uint256)[] used for
            # the MessageSent topic hash. Member names are descriptive only and
            # do not affect the event signature.
            {
                "indexed": False,
                "name": "tokenAmounts",
                "type": "tuple[]",
                "components": [
                    {"name": "token", "type": "address"},
                    {"name": "amount", "type": "uint256"},
                ],
            },
            {"indexed": False, "name": "feeToken", "type": "address"},
            {"indexed": False, "name": "extraArgs", "type": "bytes"},
        ],
        "name": "MessageSent",
        "type": "event",
    },
    {
        "anonymous": False,
        "inputs": [
            {"indexed": True, "name": "messageId", "type": "bytes32"},
            {"indexed": True, "name": "sourceChainSelector", "type": "uint64"},
            {"indexed": False, "name": "sender", "type": "address"},
            {"indexed": False, "name": "data", "type": "bytes"},
        ],
        "name": "MessageExecuted",
        "type": "event",
    },
]
class HealthCheckHandler(BaseHTTPRequestHandler):
    """JSON health-check endpoint reporting RPC connectivity.

    Every GET request, whatever its path, receives exactly one JSON response:

    * 200 ``{"status": "healthy", "rpc_connected": true, "block_number": ...,
      "ccip_router": ..., "ccip_sender": ...}`` when the module-level ``w3``
      client exists and reports itself connected;
    * 503 ``{"status": "unhealthy", "rpc_connected": false}`` when it does not;
    * 503 ``{"status": "error", "error": "<message>"}`` when probing the node
      raises.

    The ``ccip_rpc_connected`` and ``ccip_service_status`` gauges are updated
    to match the outcome.
    """

    def do_GET(self):
        try:
            status, payload = self._probe()
        except Exception as e:
            logger.error(f"Health check error: {e}")
            # The probe's failure points are the RPC calls, so the node is
            # effectively unreachable as well as the service being unhealthy.
            ccip_rpc_connected.set(0)
            ccip_service_status.set(0)
            status, payload = 503, {'status': 'error', 'error': str(e)}
        # Single write path: headers are sent once, after the outcome is known,
        # so a late failure can no longer emit a second status line.
        self._send_json(status, payload)

    def _probe(self):
        """Query the node and return ``(http_status, payload_dict)``; RPC errors propagate."""
        if w3 is not None and w3.is_connected():
            block_number = w3.eth.block_number
            ccip_rpc_connected.set(1)
            ccip_service_status.set(1)
            return 200, {
                'status': 'healthy',
                'rpc_connected': True,
                'block_number': block_number,
                'ccip_router': CCIP_ROUTER_ADDRESS,
                'ccip_sender': CCIP_SENDER_ADDRESS,
            }
        ccip_rpc_connected.set(0)
        ccip_service_status.set(0)
        return 503, {'status': 'unhealthy', 'rpc_connected': False}

    def _send_json(self, status, payload):
        """Send ``payload`` as an ``application/json`` response with ``status``."""
        body = json.dumps(payload).encode()
        try:
            self.send_response(status)
            self.send_header('Content-type', 'application/json')
            self.send_header('Content-Length', str(len(body)))
            self.end_headers()
            self.wfile.write(body)
        except (BrokenPipeError, ConnectionResetError) as e:
            # The client disconnected mid-response; nothing more can be sent.
            logger.debug(f"Health check client disconnected: {e}")

    def log_message(self, format, *args):
        # Suppress BaseHTTPRequestHandler's per-request access log for health checks.
        pass
def init_web3() -> bool:
    """Create the module-level Web3 client and check that the node responds.

    Once the HTTP provider has been constructed, the global ``w3`` is rebound
    to the new client even if the connectivity check that follows fails.

    Returns:
        True when ``is_connected()`` succeeds and the chain id and current
        block number can be read; False otherwise (the reason is logged).
    """
    global w3
    try:
        logger.info(f"Connecting to RPC: {RPC_URL}")
        provider = Web3.HTTPProvider(RPC_URL, request_kwargs={'timeout': 30})
        w3 = Web3(provider)
        if w3.is_connected():
            chain_id, block_number = w3.eth.chain_id, w3.eth.block_number
            logger.info(f"Connected to chain {chain_id}, current block: {block_number}")
            return True
        logger.error(f"Failed to connect to RPC: {RPC_URL}")
        return False
    except Exception as e:
        logger.error(f"Error initializing Web3: {e}")
        return False
# Router events counted by monitor_ccip_events(): (metric label, canonical
# event signature). For a non-anonymous event, topic0 of each log is the
# keccak-256 hash of its canonical signature.
_CCIP_EVENT_SIGNATURES = (
    ('MessageSent', 'MessageSent(bytes32,uint64,address,bytes,(address,uint256)[],address,bytes)'),
    ('MessageExecuted', 'MessageExecuted(bytes32,uint64,address,bytes)'),
)
# On the first scan after start-up, begin this many blocks behind the head.
_INITIAL_LOOKBACK_BLOCKS = 100
# A single scan never starts more than this many blocks behind the head.
_MAX_LOOKBACK_BLOCKS = 1000


def _to_0x_hex(value) -> str:
    """Return bytes-like values (including HexBytes) as a 0x-prefixed hex string.

    ``bytes.hex()`` never adds a prefix, and ``HexBytes.hex()`` stopped adding
    one in hexbytes 1.0 (used by web3.py 7), so the prefix is added explicitly.
    Any other value is returned via ``str()``.
    """
    if isinstance(value, (bytes, bytearray)):
        return '0x' + bytes(value).hex()
    return str(value)


def _fetch_event_logs(router_address: str, signature: str, from_block: int, to_block: int) -> list:
    """Return logs emitted by ``router_address`` in [from_block, to_block] whose topic0 matches ``signature``.

    RPC errors propagate to the caller.
    """
    topic0 = _to_0x_hex(Web3.keccak(text=signature))
    return w3.eth.get_logs({
        "fromBlock": from_block,
        "toBlock": to_block,
        "address": router_address,
        "topics": [topic0],
    })


def monitor_ccip_events():
    """Scan the blocks since the last successful scan for CCIP router events.

    Each matching log increments ``ccip_messages_total{event=<name>}`` and is
    logged with its transaction hash. ``ccip_last_block`` is set to the chain
    head read on this call.

    Scan window:
      * It starts right after ``last_processed_block``, or
        ``_INITIAL_LOOKBACK_BLOCKS`` behind the head on the first scan.
      * It is clamped to block 0 and to at most ``_MAX_LOOKBACK_BLOCKS``
        behind the head; older blocks are skipped with a warning.
      * It ends at the head.

    All event types are fetched before anything is counted, and
    ``last_processed_block`` only advances when every query succeeded. A
    failed RPC call therefore causes the whole window to be retried on the
    next call, rather than being silently dropped or partially double-counted.

    Errors are logged, never raised. Returns early, after a warning, when Web3
    is not connected or CCIP_ROUTER_ADDRESS is empty.
    """
    global last_processed_block

    if not w3 or not w3.is_connected():
        logger.warning("Web3 not connected, skipping event monitoring")
        return

    try:
        current_block = w3.eth.block_number
    except Exception as e:
        logger.error(f"Error in monitor_ccip_events: {e}")
        return
    ccip_last_block.set(current_block)

    if not CCIP_ROUTER_ADDRESS:
        logger.warning("CCIP_ROUTER_ADDRESS not configured, skipping event monitoring")
        return

    if last_processed_block > 0:
        from_block = last_processed_block + 1
    else:
        from_block = current_block - _INITIAL_LOOKBACK_BLOCKS
    # Block numbers cannot be negative (first scan on a young chain).
    from_block = max(from_block, 0)
    oldest_allowed = max(current_block - _MAX_LOOKBACK_BLOCKS, 0)
    if from_block < oldest_allowed:
        logger.warning(
            f"Skipping blocks {from_block} to {oldest_allowed - 1}: more than "
            f"{_MAX_LOOKBACK_BLOCKS} blocks behind head {current_block}"
        )
        from_block = oldest_allowed
    to_block = current_block
    if from_block > to_block:
        return  # no new blocks since the last successful scan

    logger.debug(f"Checking blocks {from_block} to {to_block} for CCIP events")
    try:
        router_address = Web3.to_checksum_address(CCIP_ROUTER_ADDRESS)
        fetched = [
            (event_name, _fetch_event_logs(router_address, signature, from_block, to_block))
            for event_name, signature in _CCIP_EVENT_SIGNATURES
        ]
    except Exception as e:
        # last_processed_block is left untouched so this window is retried.
        logger.error(f"Error monitoring CCIP events: {e}")
        return

    for event_name, logs in fetched:
        for log in logs:
            tx_hash = log.get('transactionHash')
            if tx_hash:
                logger.info(f"CCIP {event_name} event detected: {_to_0x_hex(tx_hash)}")
            ccip_messages_total.labels(event=event_name).inc()
    last_processed_block = to_block
def start_health_server():
    """Serve HealthCheckHandler on 0.0.0.0:METRICS_PORT, blocking in serve_forever().

    Failure to bind the port, or an error escaping serve_forever(), is logged
    with its traceback rather than raised. When run as a background thread,
    the rest of the service therefore keeps running without the endpoint.
    """
    try:
        # The context manager closes the listening socket if serve_forever()
        # ever returns or raises, instead of leaving it to garbage collection.
        with HTTPServer(('0.0.0.0', METRICS_PORT), HealthCheckHandler) as server:
            logger.info(f"Health check server started on port {METRICS_PORT}")
            server.serve_forever()
    except Exception as e:
        logger.error(f"Error starting health server: {e}", exc_info=True)
def main():
    """Run the CCIP monitor service until interrupted.

    Start-up:
      * Log the effective configuration.
      * Connect to the RPC node, exiting with status 1 if this first attempt
        fails.
      * Start the Prometheus exporter on ``METRICS_PORT + 1``; a failure here
        is only a warning.
      * Start the JSON health-check server in a daemon thread.

    Loop, every ``CHECK_INTERVAL`` seconds:
      * Reconnect if the RPC connection dropped.
      * Refresh the connection and service status gauges.
      * Scan for CCIP events.

    Failed reconnects and unexpected errors back off for 30 seconds. A
    KeyboardInterrupt (Ctrl-C) anywhere in the loop, including during a
    back-off sleep, shuts the service down with a log message.
    """
    logger.info("Starting CCIP Monitor Service")
    logger.info(f"RPC URL: {RPC_URL}")
    logger.info(f"CCIP Router: {CCIP_ROUTER_ADDRESS}")
    logger.info(f"CCIP Sender: {CCIP_SENDER_ADDRESS}")
    logger.info(f"Metrics Port: {METRICS_PORT}")
    logger.info(f"Check Interval: {CHECK_INTERVAL} seconds")

    if not init_web3():
        logger.error("Failed to initialize Web3, exiting")
        # sys.exit rather than the site-module exit() helper, which is absent
        # under `python -S` and in some embedded/frozen interpreters.
        sys.exit(1)

    try:
        start_http_server(METRICS_PORT + 1)
        logger.info(f"Prometheus metrics server started on port {METRICS_PORT + 1}")
    except Exception as e:
        logger.warning(f"Could not start Prometheus metrics server: {e}")

    # Daemon thread, so it does not keep the process alive once the loop exits.
    health_thread = Thread(target=start_health_server, daemon=True)
    health_thread.start()

    logger.info("Starting monitoring loop")
    try:
        while True:
            try:
                if not w3.is_connected():
                    logger.warning("Web3 connection lost, attempting to reconnect...")
                    if not init_web3():
                        logger.error("Failed to reconnect to RPC")
                        ccip_rpc_connected.set(0)
                        ccip_service_status.set(0)
                        time.sleep(30)
                        continue
                ccip_rpc_connected.set(1)
                # Keep the service gauge current even when nobody polls the
                # health endpoint, whose handler would otherwise be its only writer.
                ccip_service_status.set(1)
                monitor_ccip_events()
                time.sleep(CHECK_INTERVAL)
            except Exception as e:
                logger.error(f"Error in main loop: {e}", exc_info=True)
                time.sleep(30)
    except KeyboardInterrupt:
        # Caught outside the per-iteration handler so that an interrupt
        # during the back-off sleep above also shuts down cleanly.
        logger.info("Received interrupt signal, shutting down...")
if __name__ == "__main__":
    # Start the service only when executed as a script, not when imported.
    main()