#!/usr/bin/env python3 import argparse import json import subprocess import sys from pathlib import Path PROJECT_ROOT = Path(__file__).resolve().parents[2] ENV_PATH = PROJECT_ROOT / '.env' MINIMAL_REFERRER = 'add_header Referrer-Policy "strict-origin-when-cross-origin" always;' MANAGED_ADVANCED_CONFIG = { 'explorer.d-bis.org': MINIMAL_REFERRER, 'sankofa.nexus': '', 'phoenix.sankofa.nexus': '', 'the-order.sankofa.nexus': '', 'rpc-ws-pub.d-bis.org': MINIMAL_REFERRER, 'rpc-http-prv.d-bis.org': MINIMAL_REFERRER, 'rpc-ws-prv.d-bis.org': MINIMAL_REFERRER, 'dbis-admin.d-bis.org': MINIMAL_REFERRER, 'dbis-api.d-bis.org': '', 'dbis-api-2.d-bis.org': '', 'secure.d-bis.org': MINIMAL_REFERRER, 'mim4u.org': MINIMAL_REFERRER, 'www.mim4u.org': MINIMAL_REFERRER, 'secure.mim4u.org': MINIMAL_REFERRER, 'training.mim4u.org': MINIMAL_REFERRER, 'rpc.public-0138.defi-oracle.io': MINIMAL_REFERRER, 'studio.sankofa.nexus': '', } def load_env(path: Path) -> dict[str, str]: data: dict[str, str] = {} for raw_line in path.read_text().splitlines(): line = raw_line.strip() if not line or line.startswith('#') or '=' not in line: continue key, value = line.split('=', 1) data[key] = value return data def curl_json(url: str, token: str | None, method: str = 'GET', payload: dict | None = None) -> dict | list: cmd = [ 'curl', '-s', '-k', '-L', '--connect-timeout', '10', '--max-time', '120', '-X', method, url, ] if token: cmd.extend(['-H', f'Authorization: Bearer {token}']) if payload is not None: cmd.extend(['-H', 'Content-Type: application/json', '-d', json.dumps(payload)]) raw = subprocess.check_output(cmd, text=True) return json.loads(raw) def normalize_adv(value: str | None) -> str: return (value or '').replace('\r\n', '\n').strip() def main() -> int: parser = argparse.ArgumentParser(description='Normalize managed NPMplus per-host advanced_config values.') parser.add_argument('--dry-run', action='store_true', help='Show proposed changes without applying them') args = parser.parse_args() env = load_env(ENV_PATH) npm_url = env.get('NPM_URL', 'https://192.168.11.167:81') auth = {'identity': env['NPM_EMAIL'], 'secret': env['NPM_PASSWORD']} token = curl_json(f'{npm_url}/api/tokens', None, method='POST', payload=auth)['token'] hosts = curl_json(f'{npm_url}/api/nginx/proxy-hosts', token) changed = 0 for host in hosts: domains = [domain.lower() for domain in host.get('domain_names', [])] matched_domain = next((domain for domain in domains if domain in MANAGED_ADVANCED_CONFIG), None) if not matched_domain: continue desired_adv = MANAGED_ADVANCED_CONFIG[matched_domain] current_adv = normalize_adv(host.get('advanced_config')) if current_adv == normalize_adv(desired_adv): print(f'SKIP {matched_domain}') continue changed += 1 if desired_adv == '': mode = 'clear' elif normalize_adv(desired_adv) == normalize_adv(MINIMAL_REFERRER): mode = 'minimal-referrer' else: mode = 'custom' print(f'UPDATE {matched_domain} -> {mode}') if args.dry_run: continue payload = { 'forward_scheme': host['forward_scheme'], 'forward_host': host['forward_host'], 'forward_port': host['forward_port'], 'allow_websocket_upgrade': host['allow_websocket_upgrade'], 'block_exploits': host['block_exploits'], 'advanced_config': desired_adv, } response = curl_json(f"{npm_url}/api/nginx/proxy-hosts/{host['id']}", token, method='PUT', payload=payload) if str(response.get('id')) != str(host['id']): print(f'FAILED {matched_domain}: unexpected response {json.dumps(response)[:400]}', file=sys.stderr) return 1 if changed == 0: print('No managed advanced_config changes were needed.') elif args.dry_run: print(f'Dry run complete. {changed} host(s) would change.') else: print(f'Applied {changed} host(s).') return 0 if __name__ == '__main__': raise SystemExit(main())