#!/usr/bin/env python3
"""
Validation and Decision Logic Tree for Besu Network Configuration
Prevents erroneous configurations through comprehensive validation
"""

import ipaddress
import re
from pathlib import Path
from typing import Dict, Any, List, Optional, Tuple


class ValidationError(Exception):
    """Custom exception for validation errors"""
    pass


class ConfigurationValidator:
    """Validates configuration and applies decision logic.

    Findings are accumulated in ``self.errors`` (blocking problems) and
    ``self.warnings`` (advisory notes). ``self.decision_tree_applied`` records
    which decision-tree branches fired in :meth:`apply_decision_tree`.
    """

    # The ``besu`` section uses plural node-type keys while ``vmSizes`` uses
    # singular ones; map explicitly instead of stripping characters.
    _VM_SIZE_KEYS = {'validators': 'validator', 'sentries': 'sentry', 'rpc': 'rpc'}

    def __init__(self, config: Dict[str, Any], project_root: Path):
        """Store the configuration and initialise empty result lists."""
        self.config = config
        self.project_root = project_root
        self.errors: List[str] = []
        self.warnings: List[str] = []
        self.decision_tree_applied: List[str] = []

    def validate_all(self) -> Tuple[bool, List[str], List[str]]:
        """Validate entire configuration.

        Returns:
            ``(is_valid, errors, warnings)`` where ``is_valid`` is True when
            no errors were recorded. The returned lists are the validator's
            own ``errors`` / ``warnings`` attributes.
        """
        self.errors = []
        self.warnings = []
        self.decision_tree_applied = []

        # Run validation in order
        self.validate_genesis()
        self.validate_network()
        self.validate_besu_config()
        self.validate_deployment()
        self.validate_dependencies()
        self.apply_decision_tree()
        self.validate_resources()
        self.validate_security()

        return len(self.errors) == 0, self.errors, self.warnings

    @staticmethod
    def _is_number(value: Any) -> bool:
        """Return True for int/float values, excluding bool."""
        return isinstance(value, (int, float)) and not isinstance(value, bool)

    def validate_genesis(self):
        """Validate genesis configuration.

        Checks chain ID, block period, epoch length, request timeout, gas
        limit and the validator address list. Values of the wrong type are
        reported as errors rather than raising.
        """
        genesis = self.config.get('genesis', {})
        is_number = self._is_number

        # Chain ID validation
        chain_id = genesis.get('chainId', 0)
        if not is_number(chain_id) or chain_id < 1 or chain_id > 2147483647:
            self.errors.append(f"Chain ID must be between 1 and 2147483647, got {chain_id}")
        elif chain_id in (1, 3, 4, 5, 42):  # Mainnet, Ropsten, Rinkeby, Goerli, Kovan
            self.warnings.append(f"Chain ID {chain_id} is reserved for Ethereum networks. Consider using a different chain ID.")

        # Block period validation
        block_period = genesis.get('blockPeriodSeconds', 0)
        if not is_number(block_period) or block_period < 1 or block_period > 60:
            self.errors.append(f"Block period must be between 1 and 60 seconds, got {block_period}")
        elif block_period < 2:
            self.warnings.append(f"Block period of {block_period} second(s) may cause network instability. Recommended minimum: 2 seconds")

        # Epoch length validation
        epoch_length = genesis.get('epochLength', 0)
        if not is_number(epoch_length) or epoch_length < 1000 or epoch_length > 1000000:
            self.errors.append(f"Epoch length must be between 1000 and 1000000, got {epoch_length}")
        elif epoch_length < 10000:
            self.warnings.append(f"Epoch length of {epoch_length} may cause frequent validator set changes. Recommended minimum: 10000")

        # Request timeout validation
        # NOTE(review): Besu's IBFT 2.0 guidance appears to recommend a request
        # timeout of roughly twice the block period - confirm whether the
        # direction of the warning below is intended.
        request_timeout = genesis.get('requestTimeoutSeconds', 0)
        if not is_number(request_timeout) or request_timeout < 1 or request_timeout > 60:
            self.errors.append(f"Request timeout must be between 1 and 60 seconds, got {request_timeout}")
        elif is_number(block_period) and request_timeout >= block_period:
            self.warnings.append(f"Request timeout ({request_timeout}s) should be less than block period ({block_period}s)")

        # Gas limit validation (expects a hexadecimal string)
        gas_limit = genesis.get('gasLimit', '0x0')
        try:
            gas_limit_int = int(gas_limit, 16)
        except (ValueError, TypeError):
            # TypeError: value is not a string (e.g. an unquoted YAML hex
            # literal that was already parsed into an int).
            self.errors.append(f"Invalid gas limit format: {gas_limit}")
        else:
            if gas_limit_int < 5000:
                self.errors.append(f"Gas limit too low: {gas_limit}. Minimum: 0x1388 (5000)")
            elif gas_limit_int > 0x7fffffffffffffff:
                self.errors.append(f"Gas limit too high: {gas_limit}. Maximum: 0x7fffffffffffffff")

        # Validator count validation
        validators = self.config.get('validators', [])
        if len(validators) < 1:
            self.errors.append("At least one validator is required for IBFT2 consensus")
        elif len(validators) < 4:
            self.warnings.append(f"Only {len(validators)} validator(s) configured. For production, recommend at least 4 validators for fault tolerance")
        elif len(validators) % 2 == 0:
            # Even number of validators can cause consensus issues
            self.warnings.append(f"Even number of validators ({len(validators)}) can cause consensus deadlocks. Consider using an odd number")

        # Validator address validation
        for i, validator in enumerate(validators):
            if not isinstance(validator, str) or not re.match(r'^0x[a-fA-F0-9]{40}$', validator):
                self.errors.append(f"Invalid validator address {i+1}: {validator}. Must be a valid Ethereum address (0x followed by 40 hex characters)")

    def validate_network(self):
        """Validate network configuration.

        Checks cluster and resource-group names, the VNet address space, each
        subnet (syntax always; containment only when the VNet itself is
        valid), node counts and VM size naming.
        """
        network = self.config.get('network', {})
        nodes = self.config.get('nodes', {})

        # Cluster name validation
        cluster_name = network.get('clusterName', '')
        if not cluster_name:
            self.errors.append("Cluster name is required")
        elif not re.match(r'^[a-z0-9-]+$', cluster_name):
            self.errors.append(f"Invalid cluster name: {cluster_name}. Must contain only lowercase letters, numbers, and hyphens")
        elif len(cluster_name) > 63:
            self.errors.append(f"Cluster name too long: {cluster_name}. Maximum length: 63 characters")

        # Resource group validation
        resource_group = network.get('resourceGroup', '')
        if not resource_group:
            self.errors.append("Resource group name is required")
        elif not re.match(r'^[a-zA-Z0-9._()-]+$', resource_group):
            self.errors.append(f"Invalid resource group name: {resource_group}. Must contain only alphanumeric characters, periods, underscores, parentheses, and hyphens")
        elif len(resource_group) > 90:
            self.errors.append(f"Resource group name too long: {resource_group}. Maximum length: 90 characters")

        # VNet address space validation
        vnet_address = network.get('vnetAddressSpace', '')
        vnet_valid = self._validate_cidr(vnet_address)
        if not vnet_valid:
            self.errors.append(f"Invalid VNet address space: {vnet_address}. Must be valid CIDR notation")

        # Subnet validation
        subnets = network.get('subnets', {})
        # Containment is only meaningful against a well-formed VNet.
        vnet_cidr = self._parse_cidr(vnet_address) if vnet_valid else None
        for subnet_name, subnet_cidr in subnets.items():
            if not self._validate_cidr(subnet_cidr):
                self.errors.append(f"Invalid {subnet_name} subnet: {subnet_cidr}. Must be valid CIDR notation")
                continue
            if vnet_cidr is None:
                continue
            vnet_network, vnet_mask = vnet_cidr
            subnet_network, subnet_mask = self._parse_cidr(subnet_cidr)
            if not self._is_subnet_of(subnet_network, subnet_mask, vnet_network, vnet_mask):
                self.errors.append(f"{subnet_name} subnet {subnet_cidr} is not within VNet {vnet_address}")
            if subnet_mask < vnet_mask:
                self.errors.append(f"{subnet_name} subnet {subnet_cidr} has larger mask than VNet {vnet_address}")

        # Node count validation
        validator_count = nodes.get('validatorCount', 0)
        sentry_count = nodes.get('sentryCount', 0)
        rpc_count = nodes.get('rpcCount', 0)

        if validator_count < 1:
            self.errors.append("At least one validator is required")
        if sentry_count < 1:
            self.warnings.append("No sentries configured. Validators will be directly exposed to the network")
        if rpc_count < 1:
            self.warnings.append("No RPC nodes configured. No public RPC access will be available")

        # VM size validation
        vm_sizes = self.config.get('vmSizes', {})
        for vm_type, vm_size in vm_sizes.items():
            if not vm_size:
                self.errors.append(f"VM size for {vm_type} is required")
            elif not re.match(r'^Standard_[A-Z][0-9]+[a-z]*_v[0-9]+$', vm_size):
                self.warnings.append(f"VM size {vm_size} may not be valid. Standard format: Standard_[Series][Size]_v[Version]")

    def validate_besu_config(self):
        """Validate Besu configuration.

        Checks port values (range, privileged range, conflicts), per-node-type
        RPC/P2P settings and CORS origins.
        """
        besu = self.config.get('besu', {})
        ports = self.config.get('ports', {})

        # Port validation
        used_ports = set()
        port_configs = [
            ('p2p', ports.get('p2p', 30303)),
            ('rpcHttp', ports.get('rpcHttp', 8545)),
            ('rpcWs', ports.get('rpcWs', 8546)),
            ('metrics', ports.get('metrics', 9545)),
        ]

        for port_name, port_value in port_configs:
            port_is_int = isinstance(port_value, int) and not isinstance(port_value, bool)
            if not port_is_int or not 0 <= port_value <= 65535:
                self.errors.append(f"Port {port_name} ({port_value}) is invalid. Must be an integer between 0 and 65535")
                continue
            if 0 < port_value < 1024:
                self.warnings.append(f"Port {port_name} ({port_value}) is in privileged range (0-1023). May require root access")
            # Checked independently of the privileged-range warning so that
            # two services sharing a privileged port are still reported.
            if port_value in used_ports:
                self.errors.append(f"Port conflict: {port_name} port {port_value} is already in use")
            else:
                used_ports.add(port_value)

        # RPC configuration validation
        validators_config = besu.get('validators', {})
        rpc_config = besu.get('rpc', {})

        # Validators should not have RPC enabled
        if validators_config.get('rpcHttpEnabled', False):
            self.warnings.append("Validators have RPC enabled. For security, validators should have RPC disabled")

        # RPC nodes should have RPC enabled
        if not rpc_config.get('rpcHttpEnabled', False):
            self.errors.append("RPC nodes must have RPC HTTP enabled")

        # RPC nodes should not have P2P enabled (enabled is the default)
        if rpc_config.get('p2pEnabled', True):
            self.warnings.append("RPC nodes have P2P enabled. For security, RPC nodes should have P2P disabled")

        # CORS validation
        cors_origins = rpc_config.get('corsOrigins', [])
        for origin in cors_origins or []:
            if origin == '*':
                self.warnings.append("CORS origin '*' allows all origins. Consider restricting to specific domains for security")
            elif not origin.startswith(('http://', 'https://')):
                self.warnings.append(f"CORS origin '{origin}' should include protocol (http:// or https://)")

    def validate_deployment(self):
        """Validate deployment configuration.

        Checks the deployment type and, when VM deployment is enabled, the SSH
        public key file and the list of regions.
        """
        deployment = self.config.get('deployment', {})

        # Deployment type validation
        deployment_type = deployment.get('type', '')
        if deployment_type not in ['aks', 'vm', 'both']:
            self.errors.append(f"Invalid deployment type: {deployment_type}. Must be 'aks', 'vm', or 'both'")

        # VM deployment validation
        if not deployment.get('vmEnabled', False):
            return
        vm_config = deployment.get('vm', {})

        # SSH key validation
        ssh_key_path = vm_config.get('sshPublicKey', '')
        if ssh_key_path:
            expanded_path = Path(ssh_key_path).expanduser()
            if not expanded_path.exists():
                self.errors.append(f"SSH public key not found: {ssh_key_path}")
            else:
                # Validate SSH key format
                try:
                    with open(expanded_path, 'r', encoding='utf-8') as f:
                        key_content = f.read().strip()
                except (OSError, UnicodeDecodeError) as e:
                    self.errors.append(f"Error reading SSH key: {e}")
                else:
                    if not key_content.startswith(('ssh-rsa', 'ssh-ed25519')):
                        self.warnings.append(f"SSH key may not be in correct format: {ssh_key_path}")

        # Regions validation
        regions = vm_config.get('regions', [])
        if not regions:
            self.errors.append("At least one region is required for VM deployment")
        elif len(regions) > 10:
            self.warnings.append(f"Deploying to {len(regions)} regions may be costly. Consider reducing number of regions")

    def validate_dependencies(self):
        """Validate dependencies between configuration sections"""
        # Check if validator count matches genesis validators
        genesis_validators = len(self.config.get('validators', []))
        node_validators = self.config.get('nodes', {}).get('validatorCount', 0)

        if genesis_validators > 0 and node_validators > 0 and genesis_validators != node_validators:
            self.warnings.append(f"Validator count mismatch: {genesis_validators} validators in genesis, {node_validators} validators in node configuration")

        # Check if monitoring is enabled but no monitoring components selected
        monitoring = self.config.get('monitoring', {})
        if monitoring.get('enabled', False):
            has_monitoring = (
                monitoring.get('prometheusEnabled', False)
                or monitoring.get('grafanaEnabled', False)
                or monitoring.get('lokiEnabled', False)
            )
            if not has_monitoring:
                self.warnings.append("Monitoring is enabled but no monitoring components are selected")

        # Check if Blockscout is enabled but RPC is disabled
        blockscout = self.config.get('blockscout', {})
        if blockscout.get('enabled', False):
            rpc_config = self.config.get('besu', {}).get('rpc', {})
            if not rpc_config.get('rpcHttpEnabled', False):
                self.errors.append("Blockscout requires RPC to be enabled. Enable RPC HTTP for RPC nodes")

    def apply_decision_tree(self):
        """Apply decision tree logic to warn about risky configurations.

        Each branch that fires appends a short tag to
        ``self.decision_tree_applied`` and a message to ``self.warnings`` (or
        ``self.errors``). The configuration itself is not modified.
        """
        nodes = self.config.get('nodes', {})
        vm_sizes = self.config.get('vmSizes', {})
        besu_config = self.config.get('besu', {})
        rpc_config = besu_config.get('rpc', {})
        deployment = self.config.get('deployment', {})

        # Decision Tree 1: Validator Count and Consensus
        genesis_validator_count = len(self.config.get('validators', []))
        if genesis_validator_count == 1:
            self.decision_tree_applied.append("Single validator detected - network will be centralized")
            self.warnings.append("Single validator configuration: Network will be centralized. Not suitable for production")
        elif genesis_validator_count == 2:
            self.decision_tree_applied.append("Two validators detected - risk of deadlock")
            self.warnings.append("Two validators: Risk of consensus deadlock if one validator goes offline")
        elif genesis_validator_count == 3:
            self.decision_tree_applied.append("Three validators detected - can tolerate 1 failure")
            self.warnings.append("Three validators: Can tolerate 1 failure. For production, recommend at least 4 validators")

        # Decision Tree 2: Network Architecture
        sentry_count = nodes.get('sentryCount', 0)
        validator_count = nodes.get('validatorCount', 0)
        if sentry_count == 0 and validator_count > 0:
            self.decision_tree_applied.append("No sentries configured - validators exposed directly")
            self.warnings.append("No sentries: Validators will be directly exposed to the network. Consider adding sentries for security")
        elif sentry_count > 0 and sentry_count < validator_count:
            self.decision_tree_applied.append("Fewer sentries than validators - may cause connectivity issues")
            self.warnings.append(f"Fewer sentries ({sentry_count}) than validators ({validator_count}). Recommend at least {validator_count} sentries")

        # Decision Tree 3: RPC Configuration
        rpc_count = nodes.get('rpcCount', 0)
        if rpc_count > 0:
            if not rpc_config.get('rpcHttpEnabled', False):
                self.errors.append("RPC nodes configured but RPC HTTP is disabled")
            if rpc_config.get('p2pEnabled', True):
                self.decision_tree_applied.append("RPC nodes have P2P enabled - security risk")
                self.warnings.append("RPC nodes have P2P enabled. For security, disable P2P on RPC nodes")

        # Decision Tree 4: Deployment Type and Resources
        if deployment.get('vmEnabled', False):
            total_nodes = nodes.get('validatorCount', 0) + nodes.get('sentryCount', 0) + nodes.get('rpcCount', 0)
            if total_nodes > 50:
                self.decision_tree_applied.append("Large VM deployment detected - cost consideration")
                self.warnings.append(f"Large VM deployment: {total_nodes} nodes. Consider using VM Scale Sets for cost optimization")

            # Check VM sizes are appropriate
            for vm_type, vm_size in vm_sizes.items():
                if 'D2' in vm_size and vm_type == 'rpc':
                    self.warnings.append(f"RPC nodes using {vm_size} may have insufficient resources. Recommend D4s_v3 or larger")
                if 'D2' in vm_size and vm_type == 'validator':
                    self.warnings.append(f"Validators using {vm_size} may have insufficient resources. Recommend D4s_v3 or larger")

        # Decision Tree 5: Security Configuration
        cors_origins = rpc_config.get('corsOrigins', [])
        host_allowlist = rpc_config.get('hostAllowlist', [])
        if rpc_config.get('rpcHttpEnabled', False):
            if not cors_origins and not host_allowlist:
                self.decision_tree_applied.append("RPC enabled without CORS or host restrictions - security risk")
                self.warnings.append("RPC enabled without CORS or host restrictions. Consider adding security restrictions")
            if '0.0.0.0' in host_allowlist or '*' in host_allowlist:
                self.decision_tree_applied.append("RPC host allowlist allows all hosts - security risk")
                self.warnings.append("RPC host allowlist allows all hosts. Consider restricting to specific hosts")

        # Decision Tree 6: Resource Allocation
        if validator_count > 10:
            self.decision_tree_applied.append("Large validator count - resource consideration")
            self.warnings.append(f"Large validator count ({validator_count}). Ensure sufficient network bandwidth and resources")

        # Check JVM options match VM sizes
        for node_type, vm_size_key in self._VM_SIZE_KEYS.items():
            jvm_options = besu_config.get(node_type, {}).get('jvmOptions', '')
            vm_size = vm_sizes.get(vm_size_key, '')
            if 'Xmx' not in jvm_options:
                continue
            # Extract memory from JVM options
            memory_match = re.search(r'-Xmx(\d+)([gGmM])', jvm_options)
            if not memory_match:
                continue
            memory_value = int(memory_match.group(1))
            memory_unit = memory_match.group(2).upper()
            memory_gb = memory_value if memory_unit == 'G' else memory_value / 1024

            # Check if memory is appropriate for VM size
            if 'D2' in vm_size and memory_gb > 8:
                self.warnings.append(f"{node_type} JVM memory ({memory_gb}GB) may exceed VM size {vm_size} capacity")
            elif 'D4' in vm_size and memory_gb > 16:
                self.warnings.append(f"{node_type} JVM memory ({memory_gb}GB) may exceed VM size {vm_size} capacity")

    def validate_resources(self):
        """Validate resource allocation"""
        nodes = self.config.get('nodes', {})
        vm_sizes = self.config.get('vmSizes', {})

        # Check if node counts are reasonable
        total_nodes = nodes.get('validatorCount', 0) + nodes.get('sentryCount', 0) + nodes.get('rpcCount', 0)
        if total_nodes > 100:
            self.warnings.append(f"Total node count ({total_nodes}) is very high. Consider if this is necessary")

        # Check VM size consistency
        validator_size = vm_sizes.get('validator', '')
        rpc_size = vm_sizes.get('rpc', '')

        # RPC nodes typically need more resources
        if rpc_size and validator_size:
            if self._compare_vm_sizes(rpc_size, validator_size) < 0:
                self.warnings.append("RPC nodes have smaller VM size than validators. RPC nodes typically need more resources")

    def validate_security(self):
        """Validate security configuration"""
        # Check if validators have RPC enabled (security risk)
        besu_config = self.config.get('besu', {})
        validator_config = besu_config.get('validators', {})
        if validator_config.get('rpcHttpEnabled', False):
            self.warnings.append("Security: Validators have RPC enabled. Validators should not expose RPC endpoints")

        # Check CORS configuration
        rpc_config = besu_config.get('rpc', {})
        cors_origins = rpc_config.get('corsOrigins', [])
        if '*' in cors_origins:
            self.warnings.append("Security: CORS allows all origins ('*'). This is a security risk in production")

        # Check host allowlist
        host_allowlist = rpc_config.get('hostAllowlist', [])
        if '0.0.0.0' in host_allowlist or '*' in host_allowlist:
            self.warnings.append("Security: Host allowlist allows all hosts. This is a security risk in production")

    def _validate_cidr(self, cidr: str) -> bool:
        """Return True when ``cidr`` is IPv4 ``address/prefix`` notation.

        The prefix must be an integer in 0-32. Host bits may be set
        (``10.0.0.5/16`` is accepted). Non-string input yields False.
        """
        parsed = self._parse_cidr(cidr)
        if parsed is None:
            return False
        try:
            ipaddress.IPv4Network(parsed, strict=False)
        except ValueError:
            # AddressValueError / NetmaskValueError both derive from ValueError.
            return False
        return True

    def _parse_cidr(self, cidr: str) -> Optional[Tuple[str, int]]:
        """Split ``address/prefix`` into ``(address, prefix)``.

        Only the shape is checked (one ``/`` and an integer prefix); the
        address part is not validated. Returns None when the shape is wrong
        or ``cidr`` is not a string.
        """
        try:
            parts = cidr.split('/')
            if len(parts) != 2:
                return None
            return (parts[0], int(parts[1]))
        except (ValueError, AttributeError):
            return None

    def _is_subnet_of(self, subnet_ip: str, subnet_mask: int, network_ip: str, network_mask: int) -> bool:
        """Check if subnet is within network.

        Returns True only when every address of ``subnet_ip/subnet_mask``
        lies inside ``network_ip/network_mask``. Malformed input yields False.
        """
        try:
            subnet = ipaddress.IPv4Network((subnet_ip, subnet_mask), strict=False)
            network = ipaddress.IPv4Network((network_ip, network_mask), strict=False)
        except (ValueError, TypeError):
            return False
        return subnet.subnet_of(network)

    def _compare_vm_sizes(self, size1: str, size2: str) -> int:
        """Compare VM sizes. Returns -1 if size1 < size2, 0 if equal, 1 if size1 > size2.

        Sizes are ranked by the number following the first ``D`` in the name
        (e.g. ``D4s_v3`` -> 4); names without such a number rank as 0.
        """
        def extract_size(size: str) -> int:
            match = re.search(r'D(\d+)', size)
            return int(match.group(1)) if match else 0

        size1_num = extract_size(size1)
        size2_num = extract_size(size2)

        if size1_num < size2_num:
            return -1
        if size1_num > size2_num:
            return 1
        return 0