Files
Sankofa/infrastructure/omada/api/omada_client.py
defiQUG 9daf1fd378 Apply Composer changes: comprehensive API updates, migrations, middleware, and infrastructure improvements
- Add comprehensive database migrations (001-024) for schema evolution
- Enhance API schema with expanded type definitions and resolvers
- Add new middleware: audit logging, rate limiting, MFA enforcement, security, tenant auth
- Implement new services: AI optimization, billing, blockchain, compliance, marketplace
- Add adapter layer for cloud integrations (Cloudflare, Kubernetes, Proxmox, storage)
- Update Crossplane provider with enhanced VM management capabilities
- Add comprehensive test suite for API endpoints and services
- Update frontend components with improved GraphQL subscriptions and real-time updates
- Enhance security configurations and headers (CSP, CORS, etc.)
- Update documentation and configuration files
- Add new CI/CD workflows and validation scripts
- Implement design system improvements and UI enhancements
2025-12-12 18:01:35 -08:00

374 lines
10 KiB
Python

#!/usr/bin/env python3
"""
TP-Link Omada Controller API Client
A Python client library for interacting with the TP-Link Omada Controller API.
"""
import requests
import json
from typing import Dict, List, Optional, Any
from urllib.parse import urljoin
class OmadaError(Exception):
    """Root of the exception hierarchy raised by the Omada API client."""
class AuthenticationError(OmadaError):
    """Raised when authentication with the controller fails."""
class OmadaController:
    """TP-Link Omada Controller API client.

    Wraps a ``requests.Session`` and exposes helpers for the site,
    access-point, SSID and client endpoints under ``/api/v2``.

    The instance can be used as a context manager. Entering the ``with``
    block does not perform any network I/O; leaving it logs out (best
    effort) and closes the underlying HTTP session.
    """

    def __init__(
        self,
        host: str,
        username: str,
        password: str,
        port: int = 8043,
        verify_ssl: bool = True,
        timeout: int = 30
    ):
        """
        Initialize Omada Controller client

        Args:
            host: Omada Controller hostname or IP
            username: Controller username
            password: Controller password
            port: Controller port (default: 8043)
            verify_ssl: Verify SSL certificates (default: True)
            timeout: Request timeout in seconds (default: 30)
        """
        self.base_url = f"https://{host}:{port}"
        self.username = username
        self.password = password
        self.verify_ssl = verify_ssl
        self.timeout = timeout
        self.session = requests.Session()
        self.session.verify = verify_ssl
        # Bearer token returned by login(); None until authenticated.
        self.token = None
        self.authenticated = False

    def __enter__(self) -> "OmadaController":
        """Return the client itself; login() must still be called explicitly."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Log out (errors ignored by logout()) and close the HTTP session."""
        try:
            self.logout()
        finally:
            self.close()

    def close(self) -> None:
        """Close the underlying HTTP session and release its connections."""
        self.session.close()

    def _request(
        self,
        method: str,
        endpoint: str,
        data: Optional[Dict] = None,
        params: Optional[Dict] = None
    ) -> Dict[str, Any]:
        """
        Make API request

        Args:
            method: HTTP method (GET, POST, PUT, DELETE)
            endpoint: API endpoint, joined onto the controller base URL
            data: Request body data, sent as JSON
            params: Query parameters

        Returns:
            API response as dictionary

        Raises:
            AuthenticationError: If the controller answers with HTTP 401
            OmadaError: If the request fails or the body is not valid JSON
        """
        url = urljoin(self.base_url, endpoint)
        headers = {
            "Content-Type": "application/json",
            "Accept": "application/json"
        }
        if self.token:
            headers["Authorization"] = f"Bearer {self.token}"
        try:
            response = self.session.request(
                method=method,
                url=url,
                headers=headers,
                json=data,
                params=params,
                timeout=self.timeout
            )
            response.raise_for_status()
            return response.json()
        except requests.exceptions.HTTPError as e:
            if e.response is not None and e.response.status_code == 401:
                raise AuthenticationError("Authentication failed") from e
            raise OmadaError(f"API request failed: {e}") from e
        except requests.exceptions.RequestException as e:
            raise OmadaError(f"Request failed: {e}") from e
        except ValueError as e:
            # Depending on the installed requests version, response.json()
            # may raise a ValueError that is not a RequestException. Convert
            # it so callers only ever have to handle OmadaError.
            raise OmadaError(f"Invalid JSON in response: {e}") from e

    def login(self) -> bool:
        """
        Authenticate with Omada Controller

        Any previously stored token is discarded before the login request
        is sent, so a stale token is never attached to the request and is
        not kept around if the login fails.

        Returns:
            True if authentication successful

        Raises:
            AuthenticationError: If authentication fails
        """
        self.token = None
        self.authenticated = False
        credentials = {
            "username": self.username,
            "password": self.password
        }
        try:
            response = self._request("POST", "/api/v2/login", data=credentials)
        except OmadaError as e:
            raise AuthenticationError(f"Login failed: {e}") from e
        # NOTE(review): a successful response without a "token" key leaves
        # token as None while authenticated becomes True -- confirm whether
        # the controller then relies on session cookies instead.
        self.token = response.get("token")
        self.authenticated = True
        return True

    def logout(self) -> None:
        """
        Logout from Omada Controller

        Does nothing when not authenticated. Errors from the logout request
        are deliberately ignored; the local token and authenticated flag
        are cleared either way.
        """
        if self.authenticated:
            endpoint = "/api/v2/logout"
            try:
                self._request("POST", endpoint)
            except OmadaError:
                pass  # Ignore errors on logout
            finally:
                self.token = None
                self.authenticated = False

    def is_authenticated(self) -> bool:
        """Check if authenticated"""
        return self.authenticated

    def get_sites(self) -> List[Dict[str, Any]]:
        """
        Get all sites

        Returns:
            List of site dictionaries (empty if the response has no "data")
        """
        endpoint = "/api/v2/sites"
        response = self._request("GET", endpoint)
        return response.get("data", [])

    def get_site(self, site_id: str) -> Dict[str, Any]:
        """
        Get site by ID

        Args:
            site_id: Site ID

        Returns:
            Site dictionary (empty if the response has no "data")
        """
        endpoint = f"/api/v2/sites/{site_id}"
        response = self._request("GET", endpoint)
        return response.get("data", {})

    def create_site(
        self,
        name: str,
        timezone: str = "UTC",
        description: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Create a new site

        Args:
            name: Site name
            timezone: Timezone (e.g., "America/New_York")
            description: Site description; omitted from the request when
                None or empty

        Returns:
            Created site dictionary
        """
        endpoint = "/api/v2/sites"
        data = {
            "name": name,
            "timezone": timezone
        }
        if description:
            data["description"] = description
        response = self._request("POST", endpoint, data=data)
        return response.get("data", {})

    def get_access_points(self, site_id: str) -> List[Dict[str, Any]]:
        """
        Get all access points for a site

        Args:
            site_id: Site ID

        Returns:
            List of access point dictionaries
        """
        endpoint = f"/api/v2/sites/{site_id}/access-points"
        response = self._request("GET", endpoint)
        return response.get("data", [])

    def get_access_point(self, ap_id: str) -> Dict[str, Any]:
        """
        Get access point by ID

        Args:
            ap_id: Access point ID

        Returns:
            Access point dictionary
        """
        endpoint = f"/api/v2/access-points/{ap_id}"
        response = self._request("GET", endpoint)
        return response.get("data", {})

    def configure_ap(
        self,
        ap_id: str,
        name: Optional[str] = None,
        location: Optional[str] = None,
        radio_config: Optional[Dict] = None
    ) -> Dict[str, Any]:
        """
        Configure access point

        Only the arguments that are given (non-empty) are sent.

        Args:
            ap_id: Access point ID
            name: Access point name
            location: Physical location
            radio_config: Radio configuration

        Returns:
            Updated access point dictionary
        """
        endpoint = f"/api/v2/access-points/{ap_id}"
        data = {}
        if name:
            data["name"] = name
        if location:
            data["location"] = location
        if radio_config:
            data["radio_config"] = radio_config
        response = self._request("PUT", endpoint, data=data)
        return response.get("data", {})

    def get_ssids(self, site_id: str) -> List[Dict[str, Any]]:
        """
        Get all SSIDs for a site

        Args:
            site_id: Site ID

        Returns:
            List of SSID dictionaries
        """
        endpoint = f"/api/v2/sites/{site_id}/ssids"
        response = self._request("GET", endpoint)
        return response.get("data", [])

    def create_ssid(
        self,
        site_id: str,
        name: str,
        security: str = "wpa3",
        password: Optional[str] = None,
        vlan: Optional[int] = None,
        radios: Optional[List[str]] = None
    ) -> Dict[str, Any]:
        """
        Create SSID

        Args:
            site_id: Site ID
            name: SSID name
            security: Security type (open, wpa2, wpa3)
            password: WPA password (required for wpa2/wpa3)
            vlan: VLAN ID; sent whenever it is not None, including 0
            radios: List of radios (["2.4GHz", "5GHz"])

        Returns:
            Created SSID dictionary
        """
        endpoint = f"/api/v2/sites/{site_id}/ssids"
        data = {
            "name": name,
            "security": security
        }
        if password:
            data["password"] = password
        # Compare against None: a truthiness test would silently drop an
        # explicitly requested VLAN ID of 0.
        if vlan is not None:
            data["vlan"] = vlan
        if radios:
            data["radios"] = radios
        response = self._request("POST", endpoint, data=data)
        return response.get("data", {})

    def get_clients(self, site_id: str) -> List[Dict[str, Any]]:
        """
        Get all client devices for a site

        Args:
            site_id: Site ID

        Returns:
            List of client dictionaries
        """
        endpoint = f"/api/v2/sites/{site_id}/clients"
        response = self._request("GET", endpoint)
        return response.get("data", [])

    def get_client(self, mac: str) -> Dict[str, Any]:
        """
        Get client device by MAC address

        Args:
            mac: MAC address

        Returns:
            Client dictionary
        """
        endpoint = f"/api/v2/clients/{mac}"
        response = self._request("GET", endpoint)
        return response.get("data", {})
# Example usage
if __name__ == "__main__":
    # Initialize controller (placeholder host and credentials)
    controller = OmadaController(
        host="omada.sankofa.nexus",
        username="admin",
        password="secure-password"
    )
    try:
        # Authenticate
        controller.login()
        print("Authenticated successfully")
        # Get sites
        sites = controller.get_sites()
        print(f"Found {len(sites)} sites")
        # Get access points for first site
        if sites:
            site_id = sites[0]["id"]
            aps = controller.get_access_points(site_id)
            print(f"Found {len(aps)} access points")
    except AuthenticationError as e:
        print(f"Authentication failed: {e}")
    except OmadaError as e:
        print(f"Error: {e}")
    finally:
        # Logout here rather than at the end of the try body so the
        # controller session is released even when a call above fails.
        if controller.is_authenticated():
            controller.logout()
            print("Logged out")