- Submodule pins: dbis_core, cross-chain-pmm-lps, mcp-proxmox (local, push may be pending), metamask-integration, smom-dbis-138
- Atomic swap + cross-chain-pmm-lops-publish, deploy-portal workflow, phoenix deploy-targets, routing/aggregator matrices
- Docs, token-lists, forge proxy, phoenix API, runbooks, verify scripts

Made-with: Cursor
181 lines
6.2 KiB
Python
181 lines
6.2 KiB
Python
#!/usr/bin/env python3
|
|
"""Merge deployment-status.json + Uni V2 pair-discovery report into a mesh matrix.
|
|
|
|
Read-only: does not call RPC. Use after running promod_uniswap_v2_live_pair_discovery.py
|
|
or pointing at an existing reports/extraction/promod-uniswap-v2-live-pair-discovery-*.json.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
from pathlib import Path
|
|
|
|
|
|
# Two directory levels above the folder that contains this script
# (presumably the repository root -- confirm against the repo layout).
ROOT = Path(__file__).resolve().parents[2]

# Default inputs, used when the matching command-line option is omitted.
DEFAULT_STATUS = ROOT.joinpath("cross-chain-pmm-lps", "config", "deployment-status.json")
DEFAULT_DISCOVERY = ROOT.joinpath(
    "reports",
    "extraction",
    "promod-uniswap-v2-live-pair-discovery-latest.json",
)
|
|
|
|
|
|
def load_json(path: Path) -> dict:
    """Read the file at *path* and return its parsed JSON content.

    The file is decoded explicitly as UTF-8 rather than with the platform's
    locale-dependent default, so non-ASCII content (network names, notes)
    parses the same way on every host.

    Raises:
        OSError: if the file cannot be read.
        ValueError: if the bytes are not valid UTF-8 or not valid JSON
            (``json.JSONDecodeError`` and ``UnicodeDecodeError`` are both
            subclasses of ``ValueError``).
    """
    return json.loads(path.read_text(encoding="utf-8"))
|
|
|
|
|
|
def pmm_settlement_pools(pools: list) -> list[str]:
    """Return the sorted, de-duplicated ``base/quote`` labels of settlement pools.

    A pool is included when its ``base`` is ``cWUSDC`` or ``cWUSDT`` and its
    ``quote`` is ``USDC`` or ``USDT``. A falsy *pools* (``None`` or empty)
    yields an empty list.
    """
    wrapped_bases = ("cWUSDC", "cWUSDT")
    settlement_quotes = ("USDC", "USDT")
    labels = {
        f"{pool.get('base')}/{pool.get('quote')}"
        for pool in (pools or [])
        if pool.get("base") in wrapped_bases and pool.get("quote") in settlement_quotes
    }
    return sorted(labels)
|
|
|
|
|
|
def pmm_has_cw_mesh(pools: list) -> bool:
    """Report whether any pool has base ``cWUSDT`` and quote ``cWUSDC``.

    Only that exact orientation is recognised; a pool listed with the two
    tokens swapped does not count. A falsy *pools* yields ``False``.
    """
    return any(
        pool.get("base") == "cWUSDT" and pool.get("quote") == "cWUSDC"
        for pool in (pools or [])
    )
|
|
|
|
|
|
def discovery_mesh_entry(entries: list, chain_id: int) -> dict | None:
    """Return the first discovery entry whose ``chain_id`` equals *chain_id*.

    Each entry's ``chain_id`` is coerced with ``int()`` so numeric strings
    match integer chain ids. Entries whose ``chain_id`` is null or not
    numeric are skipped instead of aborting the whole lookup, and a falsy
    *entries* is treated as an empty list (matching the ``pools or []``
    handling of the PMM helpers in this file).

    Returns:
        The matching entry dict, or ``None`` when no entry matches.
    """
    for entry in entries or []:
        try:
            entry_chain_id = int(entry.get("chain_id", -1))
        except (TypeError, ValueError):
            # A null / non-numeric chain_id can never match a real chain;
            # ignore that entry rather than crash the report.
            continue
        if entry_chain_id == chain_id:
            return entry
    return None
|
|
|
|
|
|
def pair_mesh_state(entry: dict | None) -> tuple[bool | None, bool | None, str | None]:
    """Summarise the cWUSDT/cWUSDC pair of one discovery entry.

    Returns a ``(live, healthy, pool_address)`` tuple:

    * ``(None, None, None)`` when *entry* is falsy (no discovery data);
    * ``(False, None, None)`` when ``pairsChecked`` holds no pair with base
      ``cWUSDT`` and quote ``cWUSDC``;
    * otherwise the values of the first such pair, where ``healthy`` is
      ``None`` if the report carries no health verdict, and both ``healthy``
      and ``pool_address`` are ``None`` if the pool address is empty or the
      zero address.
    """
    if not entry:
        return None, None, None

    # Only the first cWUSDT/cWUSDC record is considered.
    pair = next(
        (
            candidate
            for candidate in (entry.get("pairsChecked") or [])
            if candidate.get("base") == "cWUSDT" and candidate.get("quote") == "cWUSDC"
        ),
        None,
    )
    if pair is None:
        return False, None, None

    is_live = bool(pair.get("live"))
    health_flag = (pair.get("health") or {}).get("healthy")
    pool_address = pair.get("poolAddress") or ""

    # Without a real pool address the health verdict is not reported.
    if pool_address in ("", "0x0000000000000000000000000000000000000000"):
        return is_live, None, None
    if health_flag is None:
        return is_live, None, pool_address
    return is_live, bool(health_flag), pool_address
|
|
|
|
|
|
def build_rows(status_path: Path, discovery_path: Path) -> list[dict]:
    """Build one matrix row per chain listed in the deployment-status file.

    Chains are taken from the ``chains`` mapping of the status JSON and
    emitted in ascending numeric chain-id order. Each row combines the
    chain's ``cwTokens`` / ``pmmPools`` data with the cWUSDT/cWUSDC pair
    state found for the same chain id in the discovery report's ``entries``.

    NOTE: the ``uniswapV2CWUSDTvsCWDCPool`` key ("CWDC", sic) is kept exactly
    as spelled because it is part of the emitted row schema.
    """
    deployment = load_json(status_path)
    discovery_entries = load_json(discovery_path).get("entries") or []
    chains = deployment.get("chains") or {}

    def _row(chain_id: int, chain: dict) -> dict:
        """Assemble the row for a single chain."""
        cw_tokens = chain.get("cwTokens") or {}
        pmm_pools = chain.get("pmmPools") or []
        discovered = discovery_mesh_entry(discovery_entries, chain_id)
        live, healthy, pool_addr = pair_mesh_state(discovered)
        return {
            "chainId": chain_id,
            "network": chain.get("name", ""),
            "activationState": chain.get("activationState", ""),
            "hasCWUSDT": "cWUSDT" in cw_tokens,
            "hasCWUSDC": "cWUSDC" in cw_tokens,
            "cwTokenCount": len(cw_tokens),
            "pmmCWUSDTvsCWUSDC": pmm_has_cw_mesh(pmm_pools),
            "pmmSettlementRails": pmm_settlement_pools(pmm_pools),
            "uniswapV2PairDiscoveryPresent": discovered is not None,
            "uniswapV2CWUSDTvsCWUSDCLive": live,
            "uniswapV2CWUSDTvsCWUSDCHealthy": healthy,
            "uniswapV2CWUSDTvsCWDCPool": pool_addr,
        }

    # Chain ids are string keys in the JSON; order them numerically.
    return [_row(int(key), chains[key]) for key in sorted(chains, key=int)]
|
|
|
|
|
|
def print_markdown(rows: list[dict], generated_from: dict[str, str]) -> None:
    """Print the mesh matrix to stdout as a Markdown document.

    Output consists of a title, one bullet per item of *generated_from*,
    a table with one line per element of *rows*, and a short notes section.
    Network names are cut to 26 characters and settlement lists longer than
    48 characters are shortened to 45 characters plus an ellipsis.
    """

    def _tick(flag) -> str:
        """Render a yes/no cell."""
        return "✓" if flag else "—"

    def _tri_state(value: bool | None) -> str:
        """Render a cell that may be unknown (``None``)."""
        return "—" if value is None else str(value)

    print("# cW* mesh deployment matrix\n")
    for label, source in generated_from.items():
        print(f"- **{label}:** `{source}`")
    print()
    print(
        "| Chain | Network | cWUSDT | cWUSDC | PMM cWUSDT↔cWUSDC | PMM settlement | "
        "UniV2 cWUSDT/cWUSDC live | healthy | Pool |"
    )
    print("|------:|:---|:---:|:---:|:---:|:---|:---:|:---:|:---|")

    for row in rows:
        rails = row["pmmSettlementRails"]
        settlement = ", ".join(rails) if rails else "—"
        if len(settlement) > 48:
            settlement = settlement[:45] + "…"

        chain = row["chainId"]
        network = row["network"][:26]
        has_usdt = _tick(row["hasCWUSDT"])
        has_usdc = _tick(row["hasCWUSDC"])
        pmm_mesh = _tick(row["pmmCWUSDTvsCWUSDC"])
        live = _tri_state(row["uniswapV2CWUSDTvsCWUSDCLive"])
        healthy = _tri_state(row["uniswapV2CWUSDTvsCWUSDCHealthy"])
        pool = row["uniswapV2CWUSDTvsCWDCPool"] or "—"
        print(
            f"| {chain} | {network} | {has_usdt} | {has_usdc} | "
            f"{pmm_mesh} | {settlement} | {live} | {healthy} | `{pool}` |"
        )

    print()
    print("## Notes\n")
    print(
        "- **PMM settlement**: pools where base is `cWUSDC` or `cWUSDT` and quote is `USDC` or `USDT` "
        "in `deployment-status.json`."
    )
    print(
        "- **Uni V2** columns come from the pair-discovery report (reserves/health are snapshot, not live RPC)."
    )
|
|
|
|
|
|
def main() -> None:
    """Parse the command line, build the matrix rows and emit them.

    The rows are written as a JSON payload when ``--json-out`` is given and
    printed as Markdown unless ``--no-markdown`` is set.
    """
    parser = argparse.ArgumentParser(description=__doc__)

    # The three path-valued options share the same shape: (flag, default, help).
    path_options = (
        ("--deployment-status", DEFAULT_STATUS, f"Default: {DEFAULT_STATUS}"),
        ("--pair-discovery", DEFAULT_DISCOVERY, f"Default: {DEFAULT_DISCOVERY}"),
        (
            "--json-out",
            None,
            "Optional path to write machine-readable rows (e.g. reports/status/cw-mesh-deployment-matrix-latest.json)",
        ),
    )
    for flag, default, help_text in path_options:
        parser.add_argument(flag, type=Path, default=default, help=help_text)
    parser.add_argument(
        "--no-markdown",
        action="store_true",
        help="Do not print markdown (useful with --json-out only)",
    )
    opts = parser.parse_args()

    rows = build_rows(opts.deployment_status, opts.pair_discovery)
    sources = {
        "deploymentStatus": str(opts.deployment_status.resolve()),
        "pairDiscovery": str(opts.pair_discovery.resolve()),
    }
    payload = {
        "schemaVersion": "1.0.0",
        "description": "Per-chain merge of deployment-status cwTokens/pmmPools and Uni V2 pair-discovery snapshot.",
        "generatedFrom": sources,
        "rows": rows,
    }

    out_path = opts.json_out
    if out_path:
        # Create missing parent directories so a fresh reports/ tree works.
        out_path.parent.mkdir(parents=True, exist_ok=True)
        out_path.write_text(json.dumps(payload, indent=2) + "\n")

    if opts.no_markdown:
        return
    print_markdown(rows, payload["generatedFrom"])
|
|
|
|
|
|
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|