- Add comprehensive database migrations (001-024) for schema evolution - Enhance API schema with expanded type definitions and resolvers - Add new middleware: audit logging, rate limiting, MFA enforcement, security, tenant auth - Implement new services: AI optimization, billing, blockchain, compliance, marketplace - Add adapter layer for cloud integrations (Cloudflare, Kubernetes, Proxmox, storage) - Update Crossplane provider with enhanced VM management capabilities - Add comprehensive test suite for API endpoints and services - Update frontend components with improved GraphQL subscriptions and real-time updates - Enhance security configurations and headers (CSP, CORS, etc.) - Update documentation and configuration files - Add new CI/CD workflows and validation scripts - Implement design system improvements and UI enhancements
374 lines
10 KiB
Python
374 lines
10 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
TP-Link Omada Controller API Client
|
|
|
|
A Python client library for interacting with the TP-Link Omada Controller API.
|
|
"""
|
|
|
|
import requests
|
|
import json
|
|
from typing import Dict, List, Optional, Any
|
|
from urllib.parse import urljoin
|
|
|
|
|
|
class OmadaError(Exception):
|
|
"""Base exception for Omada API errors"""
|
|
pass
|
|
|
|
|
|
class AuthenticationError(OmadaError):
|
|
"""Authentication failed"""
|
|
pass
|
|
|
|
|
|
class OmadaController:
|
|
"""TP-Link Omada Controller API Client"""
|
|
|
|
def __init__(
|
|
self,
|
|
host: str,
|
|
username: str,
|
|
password: str,
|
|
port: int = 8043,
|
|
verify_ssl: bool = True,
|
|
timeout: int = 30
|
|
):
|
|
"""
|
|
Initialize Omada Controller client
|
|
|
|
Args:
|
|
host: Omada Controller hostname or IP
|
|
username: Controller username
|
|
password: Controller password
|
|
port: Controller port (default: 8043)
|
|
verify_ssl: Verify SSL certificates (default: True)
|
|
timeout: Request timeout in seconds (default: 30)
|
|
"""
|
|
self.base_url = f"https://{host}:{port}"
|
|
self.username = username
|
|
self.password = password
|
|
self.verify_ssl = verify_ssl
|
|
self.timeout = timeout
|
|
self.session = requests.Session()
|
|
self.session.verify = verify_ssl
|
|
self.token = None
|
|
self.authenticated = False
|
|
|
|
def _request(
|
|
self,
|
|
method: str,
|
|
endpoint: str,
|
|
data: Optional[Dict] = None,
|
|
params: Optional[Dict] = None
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Make API request
|
|
|
|
Args:
|
|
method: HTTP method (GET, POST, PUT, DELETE)
|
|
endpoint: API endpoint
|
|
data: Request body data
|
|
params: Query parameters
|
|
|
|
Returns:
|
|
API response as dictionary
|
|
|
|
Raises:
|
|
OmadaError: If API request fails
|
|
"""
|
|
url = urljoin(self.base_url, endpoint)
|
|
headers = {
|
|
"Content-Type": "application/json",
|
|
"Accept": "application/json"
|
|
}
|
|
|
|
if self.token:
|
|
headers["Authorization"] = f"Bearer {self.token}"
|
|
|
|
try:
|
|
response = self.session.request(
|
|
method=method,
|
|
url=url,
|
|
headers=headers,
|
|
json=data,
|
|
params=params,
|
|
timeout=self.timeout
|
|
)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
except requests.exceptions.HTTPError as e:
|
|
if e.response.status_code == 401:
|
|
raise AuthenticationError("Authentication failed") from e
|
|
raise OmadaError(f"API request failed: {e}") from e
|
|
except requests.exceptions.RequestException as e:
|
|
raise OmadaError(f"Request failed: {e}") from e
|
|
|
|
def login(self) -> bool:
|
|
"""
|
|
Authenticate with Omada Controller
|
|
|
|
Returns:
|
|
True if authentication successful
|
|
|
|
Raises:
|
|
AuthenticationError: If authentication fails
|
|
"""
|
|
endpoint = "/api/v2/login"
|
|
data = {
|
|
"username": self.username,
|
|
"password": self.password
|
|
}
|
|
|
|
try:
|
|
response = self._request("POST", endpoint, data=data)
|
|
self.token = response.get("token")
|
|
self.authenticated = True
|
|
return True
|
|
except OmadaError as e:
|
|
self.authenticated = False
|
|
raise AuthenticationError(f"Login failed: {e}") from e
|
|
|
|
def logout(self) -> None:
|
|
"""Logout from Omada Controller"""
|
|
if self.authenticated:
|
|
endpoint = "/api/v2/logout"
|
|
try:
|
|
self._request("POST", endpoint)
|
|
except OmadaError:
|
|
pass # Ignore errors on logout
|
|
finally:
|
|
self.token = None
|
|
self.authenticated = False
|
|
|
|
def is_authenticated(self) -> bool:
|
|
"""Check if authenticated"""
|
|
return self.authenticated
|
|
|
|
def get_sites(self) -> List[Dict[str, Any]]:
|
|
"""
|
|
Get all sites
|
|
|
|
Returns:
|
|
List of site dictionaries
|
|
"""
|
|
endpoint = "/api/v2/sites"
|
|
response = self._request("GET", endpoint)
|
|
return response.get("data", [])
|
|
|
|
def get_site(self, site_id: str) -> Dict[str, Any]:
|
|
"""
|
|
Get site by ID
|
|
|
|
Args:
|
|
site_id: Site ID
|
|
|
|
Returns:
|
|
Site dictionary
|
|
"""
|
|
endpoint = f"/api/v2/sites/{site_id}"
|
|
response = self._request("GET", endpoint)
|
|
return response.get("data", {})
|
|
|
|
def create_site(
|
|
self,
|
|
name: str,
|
|
timezone: str = "UTC",
|
|
description: Optional[str] = None
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Create a new site
|
|
|
|
Args:
|
|
name: Site name
|
|
timezone: Timezone (e.g., "America/New_York")
|
|
description: Site description
|
|
|
|
Returns:
|
|
Created site dictionary
|
|
"""
|
|
endpoint = "/api/v2/sites"
|
|
data = {
|
|
"name": name,
|
|
"timezone": timezone
|
|
}
|
|
if description:
|
|
data["description"] = description
|
|
|
|
response = self._request("POST", endpoint, data=data)
|
|
return response.get("data", {})
|
|
|
|
def get_access_points(self, site_id: str) -> List[Dict[str, Any]]:
|
|
"""
|
|
Get all access points for a site
|
|
|
|
Args:
|
|
site_id: Site ID
|
|
|
|
Returns:
|
|
List of access point dictionaries
|
|
"""
|
|
endpoint = f"/api/v2/sites/{site_id}/access-points"
|
|
response = self._request("GET", endpoint)
|
|
return response.get("data", [])
|
|
|
|
def get_access_point(self, ap_id: str) -> Dict[str, Any]:
|
|
"""
|
|
Get access point by ID
|
|
|
|
Args:
|
|
ap_id: Access point ID
|
|
|
|
Returns:
|
|
Access point dictionary
|
|
"""
|
|
endpoint = f"/api/v2/access-points/{ap_id}"
|
|
response = self._request("GET", endpoint)
|
|
return response.get("data", {})
|
|
|
|
def configure_ap(
|
|
self,
|
|
ap_id: str,
|
|
name: Optional[str] = None,
|
|
location: Optional[str] = None,
|
|
radio_config: Optional[Dict] = None
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Configure access point
|
|
|
|
Args:
|
|
ap_id: Access point ID
|
|
name: Access point name
|
|
location: Physical location
|
|
radio_config: Radio configuration
|
|
|
|
Returns:
|
|
Updated access point dictionary
|
|
"""
|
|
endpoint = f"/api/v2/access-points/{ap_id}"
|
|
data = {}
|
|
|
|
if name:
|
|
data["name"] = name
|
|
if location:
|
|
data["location"] = location
|
|
if radio_config:
|
|
data["radio_config"] = radio_config
|
|
|
|
response = self._request("PUT", endpoint, data=data)
|
|
return response.get("data", {})
|
|
|
|
def get_ssids(self, site_id: str) -> List[Dict[str, Any]]:
|
|
"""
|
|
Get all SSIDs for a site
|
|
|
|
Args:
|
|
site_id: Site ID
|
|
|
|
Returns:
|
|
List of SSID dictionaries
|
|
"""
|
|
endpoint = f"/api/v2/sites/{site_id}/ssids"
|
|
response = self._request("GET", endpoint)
|
|
return response.get("data", [])
|
|
|
|
def create_ssid(
|
|
self,
|
|
site_id: str,
|
|
name: str,
|
|
security: str = "wpa3",
|
|
password: Optional[str] = None,
|
|
vlan: Optional[int] = None,
|
|
radios: Optional[List[str]] = None
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Create SSID
|
|
|
|
Args:
|
|
site_id: Site ID
|
|
name: SSID name
|
|
security: Security type (open, wpa2, wpa3)
|
|
password: WPA password (required for wpa2/wpa3)
|
|
vlan: VLAN ID
|
|
radios: List of radios (["2.4GHz", "5GHz"])
|
|
|
|
Returns:
|
|
Created SSID dictionary
|
|
"""
|
|
endpoint = f"/api/v2/sites/{site_id}/ssids"
|
|
data = {
|
|
"name": name,
|
|
"security": security
|
|
}
|
|
|
|
if password:
|
|
data["password"] = password
|
|
if vlan:
|
|
data["vlan"] = vlan
|
|
if radios:
|
|
data["radios"] = radios
|
|
|
|
response = self._request("POST", endpoint, data=data)
|
|
return response.get("data", {})
|
|
|
|
def get_clients(self, site_id: str) -> List[Dict[str, Any]]:
|
|
"""
|
|
Get all client devices for a site
|
|
|
|
Args:
|
|
site_id: Site ID
|
|
|
|
Returns:
|
|
List of client dictionaries
|
|
"""
|
|
endpoint = f"/api/v2/sites/{site_id}/clients"
|
|
response = self._request("GET", endpoint)
|
|
return response.get("data", [])
|
|
|
|
def get_client(self, mac: str) -> Dict[str, Any]:
|
|
"""
|
|
Get client device by MAC address
|
|
|
|
Args:
|
|
mac: MAC address
|
|
|
|
Returns:
|
|
Client dictionary
|
|
"""
|
|
endpoint = f"/api/v2/clients/{mac}"
|
|
response = self._request("GET", endpoint)
|
|
return response.get("data", {})
|
|
|
|
|
|
# Example usage
|
|
if __name__ == "__main__":
|
|
# Initialize controller
|
|
controller = OmadaController(
|
|
host="omada.sankofa.nexus",
|
|
username="admin",
|
|
password="secure-password"
|
|
)
|
|
|
|
try:
|
|
# Authenticate
|
|
controller.login()
|
|
print("Authenticated successfully")
|
|
|
|
# Get sites
|
|
sites = controller.get_sites()
|
|
print(f"Found {len(sites)} sites")
|
|
|
|
# Get access points for first site
|
|
if sites:
|
|
site_id = sites[0]["id"]
|
|
aps = controller.get_access_points(site_id)
|
|
print(f"Found {len(aps)} access points")
|
|
|
|
# Logout
|
|
controller.logout()
|
|
print("Logged out")
|
|
except AuthenticationError as e:
|
|
print(f"Authentication failed: {e}")
|
|
except OmadaError as e:
|
|
print(f"Error: {e}")
|
|
|