#!/usr/bin/env python3
# Read-only preflight snapshot for the mainnet cWUSDC/USDC support lane.
#
# The script runs the support-health checker, resolves the treasury manager and
# flash receiver addresses (environment first, then the latest deployment
# broadcast files), reads their on-chain state through Foundry's `cast call`,
# and prints a JSON snapshot (optionally also writing it to --out).
from __future__ import annotations

import argparse
import json
import os
from datetime import datetime, timezone
from decimal import Decimal, getcontext
from pathlib import Path
import re
import subprocess
import sys

getcontext().prec = 42

ROOT = Path(__file__).resolve().parents[2]
CHECKER_PATH = ROOT / "scripts" / "verify" / "check-mainnet-cwusdc-usdc-support-health.py"
POLICY_PATH = ROOT / "config" / "extraction" / "mainnet-cwusdc-usdc-support-policy.json"
ROOT_ENV_PATH = ROOT / ".env"
SMOM_ENV_PATH = ROOT / "smom-dbis-138" / ".env"
ZERO_ADDRESS = "0x0000000000000000000000000000000000000000"
UINT_RE = re.compile(r"\b\d+\b")
ADDRESS_RE = re.compile(r"0x[a-fA-F0-9]{40}")
# `cast` appends a human-readable scientific-notation hint to large integers,
# e.g. "1500000 [1.5e6]". The hint must be removed before extracting integers,
# otherwise the mantissa digit ("1" in "1.5e6") is picked up as a value.
CAST_ANNOTATION_RE = re.compile(r"\[\d+(?:\.\d+)?e\d+\]")
BROADCAST_RECEIVER = ROOT / "smom-dbis-138" / "broadcast" / "DeployAaveQuotePushFlashReceiver.s.sol" / "1" / "run-latest.json"
BROADCAST_MANAGER = ROOT / "smom-dbis-138" / "broadcast" / "DeployQuotePushTreasuryManager.s.sol" / "1" / "run-latest.json"


def load_json(path: Path) -> dict:
    """Parse the JSON document stored at *path* and return it."""
    return json.loads(path.read_text())


def load_env_file(path: Path) -> dict[str, str]:
    """Read ``KEY=VALUE`` pairs from a dotenv-style file.

    Blank lines, lines starting with ``#`` and lines without ``=`` are
    skipped. Surrounding whitespace and one layer of double and/or single
    quotes are stripped from values. A missing file yields an empty dict.
    """
    values: dict[str, str] = {}
    if not path.exists():
        return values
    for raw_line in path.read_text().splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        values[key.strip()] = value.strip().strip('"').strip("'")
    return values


def merged_env_values() -> dict[str, str]:
    """Merge the root ``.env`` with the smom-dbis-138 ``.env``.

    The smom-dbis-138 file is applied last, so its entries win on conflict.
    """
    values: dict[str, str] = {}
    values.update(load_env_file(ROOT_ENV_PATH))
    values.update(load_env_file(SMOM_ENV_PATH))
    return values


def _is_env_reference(value: str) -> bool:
    """Return True when *value* is a whole-string ``${...}`` reference."""
    return value.startswith("${") and value.endswith("}")


def resolve_env_value(key: str, env_values: dict[str, str], seen: set[str] | None = None) -> str:
    """Resolve *key* from the process environment or *env_values*.

    A non-empty process-environment value takes precedence over the file
    value. Values of the form ``${OTHER}`` or ``${OTHER:-fallback}`` are
    followed recursively; the fallback is used when the target resolves to an
    empty string.

    Args:
        key: Variable name to resolve.
        env_values: Values loaded from env files.
        seen: Keys already visited on the current reference chain (used
            internally to stop reference cycles).

    Returns:
        The resolved value, or ``""`` when nothing could be resolved.
    """
    if seen is None:
        seen = set()
    if key in seen:
        # Reference cycle (including the common self-referential default
        # ``KEY=${KEY:-default}``). An unresolved placeholder must count as
        # "empty" so the caller's fallback applies instead of the literal
        # ``${...}`` text leaking out as the value.
        raw = env_values.get(key, "")
        return "" if _is_env_reference(raw) else raw
    seen.add(key)
    value = os.environ.get(key) or env_values.get(key, "")
    if _is_env_reference(value):
        inner = value[2:-1]
        target, _, fallback = inner.partition(":-")
        resolved = resolve_env_value(target, env_values, seen)
        return resolved or fallback
    return value.rstrip("\r\n")


def cast_call(rpc_url: str, target: str, signature: str, *args: str) -> str:
    """Run ``cast call`` against *target* and return its stripped stdout.

    Raises:
        subprocess.CalledProcessError: If ``cast`` exits with a non-zero status.
    """
    cmd = ["cast", "call", target, signature, *args, "--rpc-url", rpc_url]
    return subprocess.check_output(cmd, text=True).strip()


def parse_uint(value: str) -> int:
    """Return the first unsigned integer found in *value*.

    Raises:
        ValueError: If *value* contains no integer.
    """
    matches = UINT_RE.findall(CAST_ANNOTATION_RE.sub(" ", value))
    if not matches:
        raise ValueError(f"could not parse integer from {value!r}")
    return int(matches[0])


def parse_uints(value: str, count: int) -> list[int]:
    """Return the first *count* unsigned integers found in *value*.

    Scientific-notation hints such as ``[1.5e6]`` are removed first so that
    their digits are not mistaken for return values.

    Raises:
        ValueError: If fewer than *count* integers are present.
    """
    cleaned = CAST_ANNOTATION_RE.sub(" ", value)
    matches = [int(match) for match in UINT_RE.findall(cleaned)]
    if len(matches) < count:
        raise ValueError(f"expected at least {count} integers from {value!r}")
    return matches[:count]


def parse_address(value: str) -> str:
    """Return the first 20-byte hex address (``0x`` + 40 hex digits) in *value*.

    Raises:
        ValueError: If no address is present.
    """
    match = ADDRESS_RE.search(value)
    if not match:
        raise ValueError(f"could not parse address from {value!r}")
    return match.group(0)


def normalize_units(raw: int, decimals: int) -> Decimal:
    """Scale the integer amount *raw* down by ``10 ** decimals``."""
    return Decimal(raw) / (Decimal(10) ** decimals)


def run_health_checker() -> dict:
    """Run the support-health checker script and return its parsed JSON stdout."""
    output = subprocess.check_output(["python3", str(CHECKER_PATH)], text=True)
    return json.loads(output)


def read_latest_create_address(path: Path, expected_contract_name: str) -> str:
    """Return the address of the last matching CREATE in a broadcast file.

    The ``transactions`` list is scanned from the end, so the most recently
    recorded deployment of *expected_contract_name* wins. Returns ``""`` when
    the file does not exist or contains no matching transaction.
    """
    if not path.exists():
        return ""
    data = load_json(path)
    for tx in reversed(data.get("transactions", [])):
        if tx.get("transactionType") == "CREATE" and tx.get("contractName") == expected_contract_name:
            return str(tx.get("contractAddress") or "").strip()
    return ""


def query_token_meta(rpc_url: str, token: str) -> dict:
    """Read ``decimals()`` and ``symbol()`` from the token contract at *token*."""
    decimals = parse_uint(cast_call(rpc_url, token, "decimals()(uint8)"))
    symbol = cast_call(rpc_url, token, "symbol()(string)")
    return {"address": token, "symbol": symbol, "decimals": decimals}


def query_manager_state(rpc_url: str, manager: str) -> dict:
    """Read the treasury manager's view functions into a JSON-friendly dict.

    Raw amounts are stored as decimal strings under ``*Raw`` keys; each one
    also gets a ``*Units`` companion scaled by the quote token's decimals.
    """
    quote_token = parse_address(cast_call(rpc_url, manager, "quoteToken()(address)"))
    receiver = parse_address(cast_call(rpc_url, manager, "receiver()(address)"))
    state = {
        "manager": manager,
        "quoteToken": query_token_meta(rpc_url, quote_token),
        "receiver": receiver,
        "receiverOwner": parse_address(cast_call(rpc_url, manager, "receiverOwner()(address)")),
        "isReceiverOwnedByManager": "true" in cast_call(rpc_url, manager, "isReceiverOwnedByManager()(bool)").lower(),
        "quoteBalanceRaw": str(parse_uint(cast_call(rpc_url, manager, "quoteBalance()(uint256)"))),
        "availableQuoteRaw": str(parse_uint(cast_call(rpc_url, manager, "availableQuote()(uint256)"))),
        "receiverSweepableQuoteRaw": str(parse_uint(cast_call(rpc_url, manager, "receiverSweepableQuote()(uint256)"))),
        "receiverReserveRetainedRaw": str(parse_uint(cast_call(rpc_url, manager, "receiverReserveRetained()(uint256)"))),
        "managerReserveRetainedRaw": str(parse_uint(cast_call(rpc_url, manager, "managerReserveRetained()(uint256)"))),
        "gasRecipient": parse_address(cast_call(rpc_url, manager, "gasRecipient()(address)")),
        "recycleRecipient": parse_address(cast_call(rpc_url, manager, "recycleRecipient()(address)")),
    }
    decimals = state["quoteToken"]["decimals"]
    # Derive the human-readable "*Units" value for every raw amount, in the
    # same key order the snapshot has always used.
    for field in (
        "quoteBalance",
        "availableQuote",
        "receiverSweepableQuote",
        "receiverReserveRetained",
        "managerReserveRetained",
    ):
        state[f"{field}Units"] = str(normalize_units(int(state[f"{field}Raw"]), decimals))
    return state


def query_receiver_state(rpc_url: str, receiver: str, quote_token: str) -> dict:
    """Read the receiver's ``owner()`` and its balance of *quote_token*."""
    return {
        "receiver": receiver,
        "owner": parse_address(cast_call(rpc_url, receiver, "owner()(address)")),
        "quoteBalanceRaw": str(parse_uint(cast_call(rpc_url, quote_token, "balanceOf(address)(uint256)", receiver))),
    }


def query_defended_quotes(rpc_url: str, defended_pool: str, trader: str, policy: dict) -> list[dict]:
    """Call ``querySellQuote`` on the defended pool for each policy tier.

    One row is produced per entry of
    ``policy["managedCycle"]["quoteAmountByDeviationBps"]``, using that
    entry's ``flashQuoteAmountRaw`` as the amount and *trader* as the trader
    argument.
    """
    rows: list[dict] = []
    for item in policy["managedCycle"]["quoteAmountByDeviationBps"]:
        amount_raw = int(item["flashQuoteAmountRaw"])
        base_out_raw, mt_fee_raw = parse_uints(
            cast_call(rpc_url, defended_pool, "querySellQuote(address,uint256)(uint256,uint256)", trader, str(amount_raw)),
            2,
        )
        rows.append(
            {
                "minDeviationBps": int(item["minDeviationBps"]),
                "flashQuoteAmountRaw": amount_raw,
                "receiveBaseAmountRaw": str(base_out_raw),
                "mtFeeRaw": str(mt_fee_raw),
            }
        )
    return rows


def build_summary(snapshot: dict) -> dict:
    """Flatten the headline figures of *snapshot* into a single-level dict.

    Health sub-sections and the decision block are required keys; the
    treasury manager section may be missing or ``None``, in which case its
    summary fields are ``None``.
    """
    public_health = snapshot["health"]["publicPairHealth"]
    defended_health = snapshot["health"]["defendedVenueHealth"]
    treasury = snapshot.get("treasuryManager") or {}
    return {
        "publicPairDeviationBps": public_health.get("deviationBps"),
        "publicPairBaseReserveUnits": public_health.get("baseReserveUnits"),
        "publicPairQuoteReserveUnits": public_health.get("quoteReserveUnits"),
        "defendedMidPrice": defended_health.get("midPrice"),
        "defendedDeviationBps": defended_health.get("deviationBps"),
        "defendedBaseReserveRaw": defended_health.get("baseReserveRaw"),
        "defendedQuoteReserveRaw": defended_health.get("quoteReserveRaw"),
        "managerAvailableQuoteUnits": treasury.get("availableQuoteUnits"),
        "receiverSweepableQuoteUnits": treasury.get("receiverSweepableQuoteUnits"),
        "decisionAction": snapshot["health"]["decision"]["action"],
        "decisionSeverity": snapshot["health"]["decision"]["severity"],
        "flashQuoteAmountRaw": snapshot["health"]["decision"]["flashQuoteAmountRaw"],
    }


def main() -> int:
    """Build the preflight snapshot, print it, and optionally write it to --out.

    On-chain query failures are recorded in the snapshot's ``warnings`` list
    instead of aborting, so a partial snapshot is still produced.

    Raises:
        RuntimeError: If none of the policy's RPC env keys resolves to a URL.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--out", help="Write the JSON snapshot to this file.")
    args = parser.parse_args()

    env_values = merged_env_values()
    policy = load_json(POLICY_PATH)
    health = run_health_checker()

    # Use the first RPC env key from the policy that resolves to a value.
    rpc_url = ""
    for key in policy["network"].get("rpcEnvKeys", []):
        rpc_url = resolve_env_value(key, env_values)
        if rpc_url:
            break
    if not rpc_url:
        raise RuntimeError("missing mainnet RPC URL")

    # Addresses come from the environment, falling back to the latest
    # deployment broadcast files.
    manager_addr = resolve_env_value("QUOTE_PUSH_TREASURY_MANAGER_MAINNET", env_values)
    receiver_addr = resolve_env_value("AAVE_QUOTE_PUSH_RECEIVER_MAINNET", env_values)
    if not receiver_addr:
        receiver_addr = read_latest_create_address(BROADCAST_RECEIVER, "AaveQuotePushFlashReceiver")
    if not manager_addr:
        manager_addr = read_latest_create_address(BROADCAST_MANAGER, "QuotePushTreasuryManager")

    defended_pool = health["defendedVenue"]["poolAddress"]
    treasury_manager = None
    receiver_state = None
    defended_quotes = []
    warnings: list[str] = []

    if manager_addr and manager_addr.lower() != ZERO_ADDRESS:
        try:
            treasury_manager = query_manager_state(rpc_url, manager_addr)
            # The manager's on-chain receiver overrides the configured one.
            receiver_addr = treasury_manager["receiver"]
        except Exception as exc:
            warnings.append(f"treasury manager query failed: {exc}")
    else:
        warnings.append("treasury manager query skipped: QUOTE_PUSH_TREASURY_MANAGER_MAINNET not configured")

    if receiver_addr and receiver_addr.lower() != ZERO_ADDRESS:
        quote_token = treasury_manager["quoteToken"]["address"] if treasury_manager else health["publicPair"]["quoteAddress"]
        try:
            receiver_state = query_receiver_state(rpc_url, receiver_addr, quote_token)
            # Without manager metadata the quote token is assumed to use 6 decimals.
            quote_decimals = treasury_manager["quoteToken"]["decimals"] if treasury_manager else 6
            receiver_state["quoteBalanceUnits"] = str(normalize_units(int(receiver_state["quoteBalanceRaw"]), quote_decimals))
        except Exception as exc:
            warnings.append(f"receiver query failed: {exc}")
    else:
        warnings.append("receiver query skipped: AAVE_QUOTE_PUSH_RECEIVER_MAINNET not configured")

    # Quotes are requested with the receiver as the trader argument.
    trader = receiver_addr if receiver_addr and receiver_addr.lower() != ZERO_ADDRESS else ""
    if trader:
        try:
            defended_quotes = query_defended_quotes(rpc_url, defended_pool, trader, policy)
        except Exception as exc:
            warnings.append(f"defended quote query failed: {exc}")
    else:
        warnings.append("defended quote query skipped: no receiver configured")

    snapshot = {
        "generatedAt": datetime.now(timezone.utc).isoformat(),
        "mode": "read_only_preflight",
        "policyPath": str(POLICY_PATH),
        "checkerPath": str(CHECKER_PATH),
        "resolvedAddresses": {
            "receiver": receiver_addr or None,
            "treasuryManager": manager_addr or None,
        },
        "health": health,
        "treasuryManager": treasury_manager,
        "receiver": receiver_state,
        "defendedLaneQuotes": defended_quotes,
        "warnings": warnings,
    }
    snapshot["summary"] = build_summary(snapshot)

    output = json.dumps(snapshot, indent=2)
    if args.out:
        out_path = Path(args.out)
        out_path.parent.mkdir(parents=True, exist_ok=True)
        out_path.write_text(output + "\n")
    print(output)
    return 0


if __name__ == "__main__":
    sys.exit(main())