333 lines
12 KiB
Bash
Executable File
333 lines
12 KiB
Bash
Executable File
#!/usr/bin/env bash
# Preamble: strict mode, repository-relative default paths, and the settings
# that can be seeded from MEV_* environment variables before option parsing.

set -euo pipefail

# Repository root = two directory levels above the directory holding this
# script, resolved to an absolute path.
script_dir="$(dirname "${BASH_SOURCE[0]}")"
ROOT="$(cd "${script_dir}/../.." && pwd)"

# Built-in locations used when neither an environment variable nor a
# command-line option supplies a path.
CONFIG_DEFAULT="$ROOT/MEV_Bot/mev-platform/config.toml"
ENV_DEFAULT="$ROOT/config/mev-platform/mev-platform-backend-ct.env.example"

# Optional values: empty means "not provided".
BASE_URL="${MEV_BASE_URL:-}"
API_KEY="${MEV_API_KEY:-}"
RPC_URL_OVERRIDE="${MEV_RPC_URL:-}"

# Values with a fallback when the environment variable is unset or empty.
CHAIN_ID="${MEV_CHAIN_ID:-1}"
CONFIG_PATH="${MEV_CONFIG_PATH:-$CONFIG_DEFAULT}"
ENV_PATH="${MEV_ENV_FILE:-$ENV_DEFAULT}"
|
|
|
|
# Print the command-line help text to stdout. Callers redirect it to stderr
# when reporting a usage error.
usage() {
  local -a help_lines=(
    'Usage: check-mev-execution-readiness.sh [options]'
    ''
    'Checks the execution-critical MEV values required to move from shadow mode'
    'toward live submission. It validates local config/env sources and can compare'
    'them with the live admin API safety endpoint when a base URL and API key are'
    'provided.'
    ''
    'Options:'
    '--config PATH TOML config file to inspect (default: MEV_Bot/mev-platform/config.toml)'
    '--env-file PATH Env file to inspect for runtime values (default: config/mev-platform/mev-platform-backend-ct.env.example)'
    '--base URL Optional admin API base URL, e.g. https://mev.defi-oracle.io'
    '--api-key KEY Optional API key for protected routes'
    '--chain ID Chain ID to inspect (default: 1)'
    '--rpc-url URL Optional RPC URL for on-chain executor checks'
    '-h, --help Show this help'
    ''
    'Exit codes:'
    '0 Ready for live execution inputs'
    '1 Missing or invalid required values'
    '2 Usage error'
  )
  # One array element per output line; the single-quoted elements are emitted
  # literally, with no shell expansion.
  printf '%s\n' "${help_lines[@]}"
}
|
|
|
|
# Abort with a usage error (exit 2) when a value-taking option is the last
# argument. Without this guard, reading "$2" under `set -u` kills the script
# with an "unbound variable" message instead of the documented usage error.
# Arguments:
#   $1 - the option being parsed (used in the error message)
#   $2 - number of arguments still unparsed, including the option itself
require_option_value() {
  local option="$1"
  local remaining="$2"
  if ((remaining < 2)); then
    echo "Missing value for option: ${option}" >&2
    usage >&2
    exit 2
  fi
}

# Command-line options override the environment-derived settings. Each
# value-taking option consumes two arguments.
while [[ $# -gt 0 ]]; do
  case "$1" in
    --config)
      require_option_value "$1" "$#"
      CONFIG_PATH="$2"
      shift 2
      ;;
    --env-file)
      require_option_value "$1" "$#"
      ENV_PATH="$2"
      shift 2
      ;;
    --base)
      require_option_value "$1" "$#"
      BASE_URL="$2"
      shift 2
      ;;
    --api-key)
      require_option_value "$1" "$#"
      API_KEY="$2"
      shift 2
      ;;
    --chain)
      require_option_value "$1" "$#"
      CHAIN_ID="$2"
      shift 2
      ;;
    --rpc-url)
      require_option_value "$1" "$#"
      RPC_URL_OVERRIDE="$2"
      shift 2
      ;;
    -h | --help)
      usage
      exit 0
      ;;
    *)
      echo "Unknown argument: $1" >&2
      usage >&2
      exit 2
      ;;
  esac
done
|
|
|
|
# Stop with a usage error (exit 2) when a required input file is absent.
# Arguments:
#   $1 - label used in the error message ("Config" or "Env")
#   $2 - path that must be an existing regular file
ensure_file_exists() {
  local label="$1"
  local path="$2"
  [[ -f "$path" ]] && return 0
  echo "$label file not found: $path" >&2
  exit 2
}

# Both inputs are checked up front, before the Python checker is started.
ensure_file_exists "Config" "$CONFIG_PATH"
ensure_file_exists "Env" "$ENV_PATH"
|
|
|
|
# Run the embedded Python checker. `python3 -` reads the program from stdin,
# and the quoted 'PY' delimiter stops the shell from expanding anything inside
# the heredoc, so the six settings are handed over as positional arguments
# (sys.argv[1]..sys.argv[6] on the Python side).
# NOTE(review): API_KEY is passed in the process argument list here, which is
# typically visible to other local users through process listings — consider
# handing it over via the environment instead; confirm the threat model.
python3 - "$CONFIG_PATH" "$ENV_PATH" "$BASE_URL" "$API_KEY" "$CHAIN_ID" "$RPC_URL_OVERRIDE" <<'PY'
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
import tomllib
|
|
import urllib.request
|
|
from pathlib import Path
|
|
|
|
# Positional arguments supplied by the shell wrapper, in order:
# config path, env-file path, admin API base URL, API key, chain ID, RPC URL.
cli_args = sys.argv
config_path, env_path = Path(cli_args[1]), Path(cli_args[2])
# Trailing slashes are dropped so endpoint paths can be appended with "/".
base_url = cli_args[3].rstrip("/")
api_key, chain_id, rpc_url_override = cli_args[4], cli_args[5], cli_args[6]
|
|
|
|
|
|
def parse_env_file(path: Path) -> dict[str, str]:
    """Parse a ``KEY=VALUE`` env file into a dictionary.

    Blank lines, lines starting with ``#`` and lines without ``=`` are skipped.
    Each remaining line is split at the first ``=``; the key is stripped of
    whitespace and the value of whitespace plus any leading/trailing quote
    characters. A leading ``export`` keyword (``export KEY=value``) is dropped
    so the entry is stored under the bare variable name instead of the
    never-matching key ``"export KEY"``. Later duplicates overwrite earlier
    ones.

    Args:
        path: Env file to read; decoded as UTF-8.

    Returns:
        Mapping of variable name to unquoted value.
    """
    values: dict[str, str] = {}
    for raw_line in path.read_text(encoding="utf-8").splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        key = key.strip()
        key_parts = key.split()
        if len(key_parts) == 2 and key_parts[0] == "export":
            key = key_parts[1]
        if not key:
            # "=value" carries no variable name; nothing could ever look it up.
            continue
        values[key] = value.strip().strip('"').strip("'")
    return values
|
|
|
|
|
|
def is_zero_address(value: str | None) -> bool:
|
|
if not value:
|
|
return True
|
|
normalized = value.lower()
|
|
return normalized in {
|
|
"",
|
|
"0x0",
|
|
"0x0000000000000000000000000000000000000000",
|
|
}
|
|
|
|
|
|
def run_cast_call(address: str, signature: str, rpc_url: str) -> str | None:
|
|
try:
|
|
result = subprocess.run(
|
|
["cast", "call", address, signature, "--rpc-url", rpc_url],
|
|
check=True,
|
|
capture_output=True,
|
|
text=True,
|
|
)
|
|
except Exception: # noqa: BLE001
|
|
return None
|
|
return result.stdout.strip() or None
|
|
|
|
|
|
def run_cast_wallet_address(private_key: str) -> str | None:
|
|
try:
|
|
result = subprocess.run(
|
|
["cast", "wallet", "address", "--private-key", private_key],
|
|
check=True,
|
|
capture_output=True,
|
|
text=True,
|
|
)
|
|
except Exception: # noqa: BLE001
|
|
return None
|
|
return result.stdout.strip() or None
|
|
|
|
|
|
# Load the TOML config from bytes: TOML documents are UTF-8 by specification,
# so decoding must not depend on the process locale. A malformed or
# non-UTF-8 file is reported as a one-line error instead of a traceback.
try:
    with config_path.open("rb") as config_file:
        config = tomllib.load(config_file)
except (tomllib.TOMLDecodeError, UnicodeDecodeError) as exc:
    print(f"Invalid TOML in {config_path}: {exc}", file=sys.stderr)
    sys.exit(1)

env_values = parse_env_file(env_path)

# Chain sections are looked up by the chain ID as a string key under "chains".
chain_key = str(chain_id)
chains = config.get("chains", {})
chain = chains.get(chain_key)  # None when the config has no such section

# Accumulators filled by the checks below:
#   rows   - (name, source, value, status) tuples for the report table
#   issues - human-readable blocking problems; any entry makes the run exit 1
rows: list[tuple[str, str, str, str]] = []
issues: list[str] = []
|
|
|
|
|
|
def add_row(name: str, source: str, value: str, status: str) -> None:
    """Append one entry to the module-level ``rows`` report table.

    Args:
        name: Setting or check identifier shown in the report.
        source: Where the value was read from (file path or RPC URL).
        value: Value to display.
        status: Status label; it is upper-cased when the table is printed.
    """
    record = (name, source, value, status)
    rows.append(record)
|
|
|
|
|
|
def _resolve_setting(name: str) -> tuple[str, str]:
    """Return ``(value, source)`` for a runtime setting.

    A non-empty process environment variable wins over the env file. The
    source label says where the value actually came from, so the report does
    not attribute an environment override to the env file.
    """
    from_process = os.environ.get(name)
    if from_process:
        return from_process, "environment"
    return env_values.get(name, ""), str(env_path)


# --- Signer key: must be present; the key itself is never printed. ---
signer_key, signer_source = _resolve_setting("MEV_EXECUTOR_PRIVATE_KEY")
signer_address = run_cast_wallet_address(signer_key) if signer_key else None
if not signer_key:
    add_row("MEV_EXECUTOR_PRIVATE_KEY", signer_source, "(missing)", "missing")
    issues.append("MEV_EXECUTOR_PRIVATE_KEY is not configured")
elif signer_address is None:
    # The key exists but no address could be derived from it (cast missing or
    # key rejected). Not blocking, but surfaced as a warning because later
    # comparisons against the signer address cannot be made.
    add_row(
        "MEV_EXECUTOR_PRIVATE_KEY",
        signer_source,
        "(present, masked; signer address could not be derived)",
        "warning",
    )
else:
    add_row("MEV_EXECUTOR_PRIVATE_KEY", signer_source, "(present, masked)", "ok")

# --- Submission kill switch: any truthy spelling blocks live execution. ---
submit_disabled, submit_source = _resolve_setting("MEV_SUBMIT_DISABLED")
truthy = {"1", "true", "yes", "on"}
if submit_disabled.strip().lower() in truthy:
    add_row("MEV_SUBMIT_DISABLED", submit_source, submit_disabled, "blocking")
    issues.append("MEV_SUBMIT_DISABLED is enabled")
else:
    add_row("MEV_SUBMIT_DISABLED", submit_source, submit_disabled or "0", "ok")

# --- Supervisor URL: must be configured. ---
supervisor_url, supervisor_source = _resolve_setting("MEV_SUPERVISOR_URL")
if supervisor_url:
    add_row("MEV_SUPERVISOR_URL", supervisor_source, supervisor_url, "ok")
else:
    add_row("MEV_SUPERVISOR_URL", supervisor_source, "(missing)", "missing")
    issues.append("MEV_SUPERVISOR_URL is not configured")
|
|
|
|
# Per-chain checks: static config values first, then (when an RPC URL and a
# non-zero executor address are available) the executor contract's on-chain
# state read through `cast call`.
if chain is None:
    add_row(f"chains.{chain_key}", str(config_path), "(missing chain section)", "missing")
    issues.append(f"chains.{chain_key} section is missing")
else:
    execution = chain.get("execution", {})
    executor_contract = execution.get("executor_contract", "")
    flash_loan_provider = execution.get("flash_loan_provider", "")
    relay_url = execution.get("relay_url", "")
    # An explicit override beats the chain's configured RPC URL.
    rpc_url = rpc_url_override or chain.get("rpc_url", "")

    add_row(
        f"chains.{chain_key}.execution.executor_contract",
        str(config_path),
        executor_contract or "(missing)",
        "ok" if not is_zero_address(executor_contract) else "missing",
    )
    if is_zero_address(executor_contract):
        issues.append(f"chain {chain_key}: executor_contract is zero address")

    add_row(
        f"chains.{chain_key}.execution.flash_loan_provider",
        str(config_path),
        flash_loan_provider or "(missing)",
        "ok" if not is_zero_address(flash_loan_provider) else "missing",
    )
    if is_zero_address(flash_loan_provider):
        issues.append(f"chain {chain_key}: flash_loan_provider is zero address")

    add_row(
        f"chains.{chain_key}.execution.relay_url",
        str(config_path),
        relay_url or "(missing)",
        "ok" if relay_url else "missing",
    )
    if not relay_url:
        issues.append(f"chain {chain_key}: relay_url is missing")

    # Only factories for these DEX identifiers are required to carry a router.
    factories = chain.get("factories", [])
    router_required_dexes = {"uniswap_v2", "sushiswap"}
    required_factories = [f for f in factories if f.get("dex") in router_required_dexes]
    if not required_factories:
        add_row(
            f"chains.{chain_key}.factories",
            str(config_path),
            "(no router-required V2 factories configured)",
            "warning",
        )
    for factory in required_factories:
        dex = factory.get("dex", "(unknown)")
        router = factory.get("router", "")
        status = "ok" if not is_zero_address(router) else "missing"
        value = router or "(missing)"
        add_row(f"chains.{chain_key}.factories[{dex}].router", str(config_path), value, status)
        if is_zero_address(router):
            issues.append(f"chain {chain_key}: router missing for dex {dex}")

    if not rpc_url:
        # Without an RPC URL none of the on-chain checks below can run; say so
        # in the report instead of skipping them silently.
        add_row(
            f"chains.{chain_key}.rpc_url",
            str(config_path),
            "(missing; on-chain executor checks skipped)",
            "warning",
        )

    if rpc_url and not is_zero_address(executor_contract):
        owner = run_cast_call(executor_contract, "owner()(address)", rpc_url)
        pending_owner = run_cast_call(executor_contract, "pendingOwner()(address)", rpc_url)
        paused = run_cast_call(executor_contract, "paused()(bool)", rpc_url)
        onchain_provider = run_cast_call(executor_contract, "flashLoanProvider()(address)", rpc_url)
        onchain_treasury = run_cast_call(executor_contract, "treasury()(address)", rpc_url)

        if owner is None:
            add_row(f"chains.{chain_key}.onchain.owner", rpc_url, "(unavailable)", "missing")
            issues.append(f"chain {chain_key}: failed to read owner() from executor")
        else:
            # The comparison is only possible when a signer address was derived.
            # A mismatch is shown as "blocking" on the row itself, matching how
            # the other on-chain rows flag the issues they raise.
            owner_matches_signer = not signer_address or owner.lower() == signer_address.lower()
            add_row(
                f"chains.{chain_key}.onchain.owner",
                rpc_url,
                owner,
                "ok" if owner_matches_signer else "blocking",
            )
            if not owner_matches_signer:
                issues.append(f"chain {chain_key}: executor owner does not match signer address")

        if pending_owner is None:
            add_row(f"chains.{chain_key}.onchain.pendingOwner", rpc_url, "(unavailable)", "missing")
            issues.append(f"chain {chain_key}: failed to read pendingOwner() from executor")
        else:
            # A non-zero pendingOwner is treated as a blocker.
            pending_status = "ok" if is_zero_address(pending_owner) else "blocking"
            add_row(f"chains.{chain_key}.onchain.pendingOwner", rpc_url, pending_owner, pending_status)
            if not is_zero_address(pending_owner):
                issues.append(f"chain {chain_key}: pendingOwner must accept ownership")

        if paused is None:
            add_row(f"chains.{chain_key}.onchain.paused", rpc_url, "(unavailable)", "missing")
            issues.append(f"chain {chain_key}: failed to read paused() from executor")
        else:
            # Anything other than the literal "false" is treated as paused.
            paused_normalized = paused.lower()
            paused_status = "ok" if paused_normalized == "false" else "blocking"
            add_row(f"chains.{chain_key}.onchain.paused", rpc_url, paused, paused_status)
            if paused_normalized != "false":
                issues.append(f"chain {chain_key}: executor is paused")

        if onchain_provider is None:
            add_row(f"chains.{chain_key}.onchain.flashLoanProvider", rpc_url, "(unavailable)", "missing")
            issues.append(f"chain {chain_key}: failed to read flashLoanProvider() from executor")
        else:
            # Case-insensitive match of the on-chain provider against the config.
            provider_matches = onchain_provider.lower() == flash_loan_provider.lower()
            provider_status = "ok" if provider_matches else "blocking"
            add_row(f"chains.{chain_key}.onchain.flashLoanProvider", rpc_url, onchain_provider, provider_status)
            if not provider_matches:
                issues.append(f"chain {chain_key}: on-chain flashLoanProvider does not match config")

        # Treasury is informational: an unreadable value is shown but not blocking.
        if onchain_treasury is None:
            add_row(f"chains.{chain_key}.onchain.treasury", rpc_url, "(unavailable)", "missing")
        else:
            add_row(f"chains.{chain_key}.onchain.treasury", rpc_url, onchain_treasury, "ok")
|
|
|
|
# Report header, followed by a blank separator line.
for header_line in (
    "MEV execution readiness",
    f"config: {config_path}",
    f"env: {env_path}",
    f"chain: {chain_key}",
    "",
):
    print(header_line)

# Column widths follow the longest entry; the defaults apply only when no rows
# were collected.
name_width = max((len(row[0]) for row in rows), default=10)
source_width = max((len(row[1]) for row in rows), default=10)
status_width = max((len(row[3]) for row in rows), default=6)

for row_name, row_source, row_value, row_status in rows:
    print(f"{row_status.upper():<{status_width}} {row_name:<{name_width}} {row_source:<{source_width}} {row_value}")

# Optional live probe: fetch the admin API signer safety endpoint and print
# its JSON payload. The result is displayed only; it does not add issues.
live_payload = None
live_error = None
if base_url:
    signer_endpoint = f"{base_url}/api/safety/signer"
    request = urllib.request.Request(signer_endpoint)
    if api_key:
        request.add_header("X-API-Key", api_key)
    try:
        with urllib.request.urlopen(request, timeout=10) as response:
            live_payload = json.loads(response.read().decode("utf-8"))
    except Exception as exc:  # noqa: BLE001
        # Best effort: any failure is shown in the report, never raised.
        live_error = str(exc)

    print("")
    print(f"live api: {signer_endpoint}")
    if live_payload is None:
        print(f"(unavailable: {live_error})")
    else:
        print(json.dumps(live_payload, indent=2, sort_keys=True))

# Verdict: list any blocking issues and exit 1, otherwise report readiness.
print("")
if not issues:
    print("ready: no local execution blockers detected")
else:
    print("blocking issues:")
    for issue in issues:
        print(f"- {issue}")
    sys.exit(1)
|
|
PY
|