Files
smom-dbis-138/scripts/configure-network-advanced.py
defiQUG d717b504a6
Some checks failed
CI/CD Pipeline / Solidity Contracts (push) Failing after 1m12s
CI/CD Pipeline / Security Scanning (push) Successful in 2m21s
CI/CD Pipeline / Lint and Format (push) Failing after 36s
CI/CD Pipeline / Terraform Validation (push) Failing after 22s
CI/CD Pipeline / Kubernetes Validation (push) Successful in 25s
HYBX OMNL TypeScript & anchor / token-aggregation build + reconcile artifact (push) Failing after 23s
Validation / validate-genesis (push) Successful in 26s
Validation / validate-terraform (push) Failing after 21s
Validation / validate-kubernetes (push) Failing after 9s
Validation / validate-smart-contracts (push) Failing after 8s
Validation / validate-security (push) Failing after 1m15s
Validation / validate-documentation (push) Failing after 15s
OMNL reconcile anchor / Run omnl:reconcile and upload artifacts (push) Failing after 26s
Verify Deployment / Verify Deployment (push) Failing after 56s
feat(omnl): settlement terminal, compliance gates, and HYBX integration foundation
Add operator settlement terminal UI/API, swift-listener service, compliance
idempotency gates, GRU reserve dashboards, and @dbis/integration-foundation
for typed HYBX/ISO adapter contracts.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-06-07 21:51:32 -07:00

327 lines
13 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Advanced Interactive CLI for configuring Besu network
Extended version with additional configuration options
"""
import json
import os
import sys
import shutil
from pathlib import Path
from typing import Dict, Any, List, Optional
import subprocess
import re
import base64
# Import base configuration tool
sys.path.insert(0, str(Path(__file__).parent))
from configure_network import (
NetworkConfigurator,
Colors,
print_header,
print_success,
print_error,
print_warning,
print_info,
input_yes_no,
input_int,
input_with_default,
)
class AdvancedNetworkConfigurator(NetworkConfigurator):
    """Extended network configurator with advanced options.

    Adds security, monitoring, backup and oracle prompts, plus the matching
    file generators, on top of ``NetworkConfigurator``.

    This class relies on the base class for ``self.config``,
    ``self.project_root``, ``self.backup_dir``, ``backup_existing_files`` and
    the base ``collect_*`` / ``generate_*`` steps called from :meth:`run`;
    none of those are defined in this file.
    """

    def collect_security_config(self) -> None:
        """Prompt for security settings and store them in ``self.config['security']``.

        The stored mapping holds four boolean hardening toggles plus two
        nested sections, ``keyManagement`` and ``accessControl``.
        """
        print_header("Security Configuration")
        security = {
            'enableNetworkPolicies': input_yes_no("Enable Kubernetes Network Policies", True),
            'enableRBAC': input_yes_no("Enable RBAC", True),
            'enablePodSecurity': input_yes_no("Enable Pod Security Standards", True),
            'enableWAF': input_yes_no("Enable Web Application Firewall (WAF)", True),
        }
        self.config['security'] = security

        # Key management
        print_info("\nKey Management")
        use_key_vault = input_yes_no("Use Azure Key Vault", True)
        # The answer is held in a local because the 'keyManagement' entry does
        # not exist until the dict below has been assigned; looking it up
        # through self.config while building that dict raises KeyError.
        rotation_enabled = input_yes_no("Enable key rotation", False)
        # Trailing arguments are presumably (default, min, max) -- confirm
        # against input_int in configure_network.
        rotation_interval = (
            input_int("Key rotation interval (days)", 90, 1, 365)
            if rotation_enabled
            else 0
        )
        security['keyManagement'] = {
            'useAzureKeyVault': use_key_vault,
            'enableKeyRotation': rotation_enabled,
            'keyRotationInterval': rotation_interval,
        }

        # Access control
        print_info("\nAccess Control")
        # Ask once and reuse the answer so the stored flag and the IP list can
        # never disagree.
        whitelist_enabled = input_yes_no("Enable IP whitelisting", False)
        admin_ips: List[str] = []
        if whitelist_enabled:
            raw_ips = input_with_default("Admin IPs (comma-separated)", "")
            # Drop blanks so an empty answer yields [] rather than [''].
            admin_ips = [ip.strip() for ip in raw_ips.split(',') if ip.strip()]
        security['accessControl'] = {
            'enableIPWhitelist': whitelist_enabled,
            'adminIPs': admin_ips,
        }
        print_success("Security configuration collected")

    def collect_monitoring_config(self) -> None:
        """Prompt for monitoring settings and store them in ``self.config['monitoring']``.

        Every component question is asked regardless of the top-level
        ``enabled`` answer. An ``alerts`` section is added only when
        Alertmanager is enabled.
        """
        print_header("Monitoring Configuration")
        self.config['monitoring'] = {
            'enabled': input_yes_no("Enable monitoring", True),
            'prometheus': {
                'enabled': input_yes_no("Enable Prometheus", True),
                'retention': input_with_default("Retention period", "30d"),
                'scrapeInterval': input_with_default("Scrape interval", "30s"),
            },
            'grafana': {
                'enabled': input_yes_no("Enable Grafana", True),
                # NOTE(review): the default admin password is the well-known
                # value "admin" and is kept in the plain config dict --
                # consider requiring an explicit value before deployment.
                'adminPassword': input_with_default("Grafana admin password", "admin"),
            },
            'loki': {
                'enabled': input_yes_no("Enable Loki", True),
                'retention': input_with_default("Retention period", "7d"),
            },
            'alertmanager': {
                'enabled': input_yes_no("Enable Alertmanager", True),
            },
        }

        # Alerting
        if self.config['monitoring']['alertmanager']['enabled']:
            print_info("\nAlert Configuration")
            self.config['monitoring']['alerts'] = {
                'email': input_with_default("Alert email", ""),
                'slackWebhook': input_with_default("Slack webhook URL", ""),
            }
        print_success("Monitoring configuration collected")

    def collect_backup_config(self) -> None:
        """Prompt for backup settings and store them in ``self.config['backup']``."""
        print_header("Backup Configuration")
        self.config['backup'] = {
            'enabled': input_yes_no("Enable backups", True),
            # The third argument is presumably a validator callback -- confirm
            # against input_with_default in configure_network.
            'frequency': input_with_default(
                "Backup frequency (daily/weekly/monthly)",
                "daily",
                lambda x: x in ('daily', 'weekly', 'monthly'),
            ),
            'retention': input_int("Retention period (days)", 30, 1, 365),
            'storageAccount': input_with_default("Storage account name", ""),
            'storageContainer': input_with_default("Storage container name", "backups"),
        }
        print_success("Backup configuration collected")

    def collect_oracle_config(self) -> None:
        """Prompt for oracle publisher settings and store them in ``self.config['oracle']``.

        ``publisherCount`` is 0 when the publisher is disabled. The update
        interval and aggregator address are asked in either case.
        """
        print_header("Oracle Publisher Configuration")
        # Ask once and reuse the answer so 'enabled' and 'publisherCount'
        # always agree.
        oracle_enabled = input_yes_no("Enable oracle publisher", True)
        publisher_count = (
            input_int("Number of oracle publishers", 1, 1, 10)
            if oracle_enabled
            else 0
        )
        self.config['oracle'] = {
            'enabled': oracle_enabled,
            'publisherCount': publisher_count,
            'updateInterval': input_int("Update interval (seconds)", 60, 1, 3600),
            'aggregatorAddress': input_with_default("Aggregator contract address", "0x0"),
        }
        print_success("Oracle configuration collected")

    def generate_permissions_config(self) -> None:
        """Write empty node and account allowlist files under ``<project_root>/config``.

        Both files are placeholders: the allowlists are written empty and are
        meant to be filled in by hand (node enodes after deployment, allowed
        account addresses).
        """
        config_dir = self.project_root / "config"
        # (file name, header title, allowlist key, trailing hint)
        placeholders = (
            (
                "permissions-nodes.toml",
                "Permissions Nodes Configuration",
                "nodes-allowlist",
                "Add node enodes here after deployment",
            ),
            (
                "permissions-accounts.toml",
                "Permissions Accounts Configuration",
                "accounts-allowlist",
                "Add allowed account addresses here",
            ),
        )
        for file_name, title, allowlist_key, hint in placeholders:
            target = config_dir / file_name
            target.parent.mkdir(parents=True, exist_ok=True)
            with open(target, 'w', encoding='utf-8') as f:
                f.write(f"# {title}\n")
                f.write("# Generated by configure-network.py\n\n")
                f.write(f"{allowlist_key}=[]\n")
                f.write(f"# {hint}\n")
            print_success(f"Generated {target}")

    def generate_static_nodes(self) -> None:
        """Write an empty ``config/static-nodes.json`` placeholder."""
        # This would be populated with actual node enodes after deployment
        static_nodes: List[str] = []
        static_nodes_file = self.project_root / "config" / "static-nodes.json"
        static_nodes_file.parent.mkdir(parents=True, exist_ok=True)
        with open(static_nodes_file, 'w', encoding='utf-8') as f:
            json.dump(static_nodes, f, indent=2)
        print_success(f"Generated {static_nodes_file}")
        print_info("Note: Add node enodes to this file after deployment")

    def generate_k8s_network_policies(self) -> None:
        """Write a default-deny NetworkPolicy manifest under ``k8s/network-policies``.

        Does nothing unless ``security.enableNetworkPolicies`` is set in the
        collected configuration.
        """
        if not self.config.get('security', {}).get('enableNetworkPolicies', False):
            return
        network_policies_dir = self.project_root / "k8s" / "network-policies"
        network_policies_dir.mkdir(parents=True, exist_ok=True)

        # Default deny policy. The nesting is spelled out explicitly because
        # YAML is indentation-sensitive: name/namespace must sit under
        # metadata, and podSelector/policyTypes under spec.
        default_deny = (
            "apiVersion: networking.k8s.io/v1\n"
            "kind: NetworkPolicy\n"
            "metadata:\n"
            "  name: default-deny-all\n"
            "  namespace: besu-network\n"
            "spec:\n"
            "  podSelector: {}\n"
            "  policyTypes:\n"
            "    - Ingress\n"
            "    - Egress\n"
        )
        default_deny_file = network_policies_dir / "default-deny.yaml"
        with open(default_deny_file, 'w', encoding='utf-8') as f:
            f.write(default_deny)
        print_success(f"Generated {default_deny_file}")

    def generate_monitoring_config(self) -> None:
        """Write ``monitoring/prometheus/prometheus.yml`` from the collected settings.

        Does nothing unless monitoring is enabled. The Prometheus file is
        written only when Prometheus itself is enabled.

        Raises:
            ImportError: if PyYAML is not installed and Prometheus is enabled.
        """
        if not self.config.get('monitoring', {}).get('enabled', False):
            return
        monitoring_dir = self.project_root / "monitoring"
        monitoring_dir.mkdir(parents=True, exist_ok=True)

        # Prometheus configuration
        if self.config['monitoring']['prometheus']['enabled']:
            # Imported before the output file is opened so that a missing
            # PyYAML cannot leave a truncated, empty prometheus.yml behind.
            import yaml

            prometheus_config = {
                'global': {
                    'scrape_interval': self.config['monitoring']['prometheus']['scrapeInterval'],
                    'evaluation_interval': '30s',
                },
                'scrape_configs': [
                    {
                        'job_name': 'besu',
                        'static_configs': [
                            {
                                'targets': [
                                    'besu-validators:9545',
                                    'besu-sentries:9545',
                                    'besu-rpc:9545',
                                ]
                            }
                        ],
                    }
                ],
            }
            prometheus_file = monitoring_dir / "prometheus" / "prometheus.yml"
            prometheus_file.parent.mkdir(parents=True, exist_ok=True)
            with open(prometheus_file, 'w', encoding='utf-8') as f:
                yaml.dump(prometheus_config, f, default_flow_style=False)
            print_success(f"Generated {prometheus_file}")

    def generate_backup_config(self) -> None:
        """Write ``scripts/backup/backup-config.json`` from the collected settings.

        Does nothing unless backups are enabled.
        """
        if not self.config.get('backup', {}).get('enabled', False):
            return
        backup_dir = self.project_root / "scripts" / "backup"
        backup_dir.mkdir(parents=True, exist_ok=True)

        backup_settings = self.config['backup']
        # Only these four keys are persisted; the 'enabled' flag is left out.
        backup_config = {
            key: backup_settings[key]
            for key in ('frequency', 'retention', 'storageAccount', 'storageContainer')
        }
        backup_config_file = backup_dir / "backup-config.json"
        with open(backup_config_file, 'w', encoding='utf-8') as f:
            json.dump(backup_config, f, indent=2)
        print_success(f"Generated {backup_config_file}")

    def run(self) -> None:
        """Run the advanced configuration process.

        Asks for confirmation, backs up existing files, collects the base and
        advanced settings, then generates all configuration files. Exits the
        process with status 1 on Ctrl-C or on any unexpected error; for the
        latter the traceback is printed first.
        """
        try:
            print_header("Advanced Besu Network Configuration Tool")
            print_info("This tool will help you configure all necessary files for your Besu network.")
            print_warning("Existing configuration files will be backed up.")
            if not input_yes_no("Continue?", True):
                print_info("Configuration cancelled")
                return

            # Backup existing files
            self.backup_existing_files()

            # Collect configuration (base + advanced)
            self.collect_genesis_config()
            self.collect_network_config()
            self.collect_besu_config()
            self.collect_deployment_config()
            self.collect_security_config()
            self.collect_monitoring_config()
            self.collect_backup_config()
            self.collect_oracle_config()

            # Generate files (base + advanced)
            print_header("Generating Configuration Files")
            self.generate_genesis_json()
            for node_type in ('validators', 'sentries', 'rpc'):
                self.generate_besu_config(node_type)
            self.generate_terraform_vars()
            self.generate_helm_values()
            self.generate_permissions_config()
            self.generate_static_nodes()
            self.generate_k8s_network_policies()
            self.generate_monitoring_config()
            self.generate_backup_config()
            self.generate_config_summary()

            print_header("Configuration Complete")
            print_success("All configuration files have been generated successfully!")
            print_info(f"Configuration summary: {self.project_root / 'CONFIG_SUMMARY.md'}")
            print_info(f"Backup directory: {self.backup_dir}")
            print_warning("Please review the generated files before deploying.")
            print_info("Next steps:")
            print_info("1. Review CONFIG_SUMMARY.md")
            print_info("2. Generate validator keys: ./scripts/key-management/generate-validator-keys.sh")
            print_info("3. Update permissions-nodes.toml with node enodes after deployment")
            print_info("4. Deploy infrastructure: cd terraform && terraform apply")
        except KeyboardInterrupt:
            print_error("\nConfiguration cancelled by user")
            sys.exit(1)
        except Exception as e:
            # Top-level boundary of the CLI: report, show the traceback for
            # debugging, and signal failure through the exit status.
            print_error(f"Error: {e}")
            import traceback
            traceback.print_exc()
            sys.exit(1)
def main() -> None:
    """Build the advanced configurator for this repository and run it.

    The project root is taken to be the directory two levels above this
    script file (the parent of the directory that contains it).
    """
    script_dir = Path(__file__).parent
    AdvancedNetworkConfigurator(script_dir.parent).run()


# Run the interactive tool only when executed as a script, not on import.
if __name__ == "__main__":
    main()