Files
Sankofa/crossplane-provider-proxmox/pkg/controller/resourcediscovery/controller.go
defiQUG 9daf1fd378 Apply Composer changes: comprehensive API updates, migrations, middleware, and infrastructure improvements
- Add comprehensive database migrations (001-024) for schema evolution
- Enhance API schema with expanded type definitions and resolvers
- Add new middleware: audit logging, rate limiting, MFA enforcement, security, tenant auth
- Implement new services: AI optimization, billing, blockchain, compliance, marketplace
- Add adapter layer for cloud integrations (Cloudflare, Kubernetes, Proxmox, storage)
- Update Crossplane provider with enhanced VM management capabilities
- Add comprehensive test suite for API endpoints and services
- Update frontend components with improved GraphQL subscriptions and real-time updates
- Enhance security configurations and headers (CSP, CORS, etc.)
- Update documentation and configuration files
- Add new CI/CD workflows and validation scripts
- Implement design system improvements and UI enhancements
2025-12-12 18:01:35 -08:00

340 lines
11 KiB
Go

package resourcediscovery
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"time"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
proxmoxv1alpha1 "github.com/sankofa/crossplane-provider-proxmox/apis/v1alpha1"
"github.com/sankofa/crossplane-provider-proxmox/pkg/cloudflare"
"github.com/sankofa/crossplane-provider-proxmox/pkg/discovery"
"github.com/sankofa/crossplane-provider-proxmox/pkg/proxmox"
)
// ResourceDiscoveryReconciler reconciles a ResourceDiscovery object
type ResourceDiscoveryReconciler struct {
client.Client
Scheme *runtime.Scheme
K8sClient kubernetes.Interface
InventoryAPI string // API endpoint for resource inventory
}
//+kubebuilder:rbac:groups=proxmox.sankofa.nexus,resources=resourcediscoveries,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=proxmox.sankofa.nexus,resources=resourcediscoveries/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=proxmox.sankofa.nexus,resources=resourcediscoveries/finalizers,verbs=update
// Reconcile is part of the main kubernetes reconciliation loop
func (r *ResourceDiscoveryReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx)
var rd proxmoxv1alpha1.ResourceDiscovery
if err := r.Get(ctx, req.NamespacedName, &rd); err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}
// Check if discovery is enabled
if !rd.Spec.Enabled {
logger.Info("Resource discovery is disabled, skipping")
return ctrl.Result{RequeueAfter: time.Duration(rd.Spec.SyncInterval) * time.Second}, nil
}
// Check if we need to sync (based on sync interval)
syncInterval := time.Duration(rd.Spec.SyncInterval) * time.Second
if rd.Status.LastSyncTime != nil {
timeSinceLastSync := time.Since(rd.Status.LastSyncTime.Time)
if timeSinceLastSync < syncInterval {
requeueAfter := syncInterval - timeSinceLastSync
logger.Info("Sync interval not reached, requeuing", "requeueAfter", requeueAfter)
return ctrl.Result{RequeueAfter: requeueAfter}, nil
}
}
// Update status to IN_PROGRESS
rd.Status.SyncStatus = "IN_PROGRESS"
rd.Status.LastSyncError = ""
if err := r.Status().Update(ctx, &rd); err != nil {
logger.Error(err, "failed to update status")
}
// Perform discovery based on provider
var discoveredResources []discovery.DiscoveredResource
var err error
switch rd.Spec.Provider {
case "PROXMOX":
discoveredResources, err = r.discoverProxmoxResources(ctx, &rd)
case "KUBERNETES":
discoveredResources, err = r.discoverKubernetesResources(ctx, &rd)
case "CLOUDFLARE":
discoveredResources, err = r.discoverCloudflareResources(ctx, &rd)
default:
err = fmt.Errorf("unsupported provider: %s", rd.Spec.Provider)
}
if err != nil {
logger.Error(err, "discovery failed")
rd.Status.SyncStatus = "FAILED"
rd.Status.LastSyncError = err.Error()
if updateErr := r.Status().Update(ctx, &rd); updateErr != nil {
logger.Error(updateErr, "failed to update status with error")
}
return ctrl.Result{RequeueAfter: syncInterval}, err
}
// Sync discovered resources to inventory API
syncedCount, err := r.syncResourcesToAPI(ctx, discoveredResources, &rd)
if err != nil {
logger.Error(err, "failed to sync resources to API")
rd.Status.SyncStatus = "FAILED"
rd.Status.LastSyncError = err.Error()
if updateErr := r.Status().Update(ctx, &rd); updateErr != nil {
logger.Error(updateErr, "failed to update status with error")
}
return ctrl.Result{RequeueAfter: syncInterval}, err
}
// Update status
now := metav1.Now()
rd.Status.LastSyncTime = &now
rd.Status.ResourcesDiscovered = syncedCount
rd.Status.SyncStatus = "SUCCESS"
rd.Status.LastSyncError = ""
if err := r.Status().Update(ctx, &rd); err != nil {
logger.Error(err, "failed to update status")
return ctrl.Result{}, err
}
logger.Info("Discovery completed successfully", "resources", syncedCount)
return ctrl.Result{RequeueAfter: syncInterval}, nil
}
func (r *ResourceDiscoveryReconciler) discoverProxmoxResources(
ctx context.Context,
rd *proxmoxv1alpha1.ResourceDiscovery,
) ([]discovery.DiscoveredResource, error) {
// Validate ProviderConfigReference
if rd.Spec.ProviderConfigReference == nil {
return []discovery.DiscoveredResource{}, errors.New("providerConfigRef is required")
}
if rd.Spec.ProviderConfigReference.Name == "" {
return []discovery.DiscoveredResource{}, errors.New("providerConfigRef.name is required")
}
// Get ProviderConfig
var providerConfig proxmoxv1alpha1.ProviderConfig
if err := r.Get(ctx, client.ObjectKey{Name: rd.Spec.ProviderConfigReference.Name}, &providerConfig); err != nil {
return []discovery.DiscoveredResource{}, errors.Wrapf(err, "cannot get provider config")
}
// Get credentials from ProviderConfig
var credentials struct {
Username string
Password string
Endpoint string
}
// Try to get credentials from ProviderConfig
if providerConfig.Spec.Credentials.SecretRef != nil {
secretRef := providerConfig.Spec.Credentials.SecretRef
secret := &corev1.Secret{}
secretKey := client.ObjectKey{
Namespace: secretRef.Namespace,
Name: secretRef.Name,
}
if err := r.Get(ctx, secretKey, secret); err == nil {
if userData, ok := secret.Data["username"]; ok {
credentials.Username = string(userData)
}
if passData, ok := secret.Data["password"]; ok {
credentials.Password = string(passData)
}
if endpointData, ok := secret.Data["endpoint"]; ok {
credentials.Endpoint = string(endpointData)
}
}
}
// Find endpoint from ProviderConfig Sites
var endpoint string
var insecureSkipTLS bool
if rd.Spec.Site != "" {
// Find the site matching rd.Spec.Site
for _, site := range providerConfig.Spec.Sites {
if site.Name == rd.Spec.Site {
endpoint = site.Endpoint
insecureSkipTLS = site.InsecureSkipTLSVerify
break
}
}
} else if len(providerConfig.Spec.Sites) > 0 {
// Use first site if no site specified
endpoint = providerConfig.Spec.Sites[0].Endpoint
insecureSkipTLS = providerConfig.Spec.Sites[0].InsecureSkipTLSVerify
}
// Override with endpoint from credentials if provided
if credentials.Endpoint != "" {
endpoint = credentials.Endpoint
}
if endpoint == "" {
return []discovery.DiscoveredResource{}, errors.New("no endpoint found in ProviderConfig sites or credentials")
}
client, err := proxmox.NewClient(endpoint, credentials.Username, credentials.Password, insecureSkipTLS)
if err != nil {
return []discovery.DiscoveredResource{}, errors.Wrap(err, "failed to create Proxmox client")
}
agent := discovery.NewProxmoxDiscoveryAgent(client, rd.Spec.Site, rd.Spec.Region)
return agent.DiscoverAll(ctx)
}
func (r *ResourceDiscoveryReconciler) discoverKubernetesResources(
ctx context.Context,
rd *proxmoxv1alpha1.ResourceDiscovery,
) ([]discovery.DiscoveredResource, error) {
if r.K8sClient == nil {
return nil, fmt.Errorf("kubernetes client not configured")
}
agent := discovery.NewKubernetesDiscoveryAgent(r.K8sClient, rd.Spec.Site, rd.Spec.Region)
return agent.DiscoverAll(ctx)
}
func (r *ResourceDiscoveryReconciler) discoverCloudflareResources(
ctx context.Context,
rd *proxmoxv1alpha1.ResourceDiscovery,
) ([]discovery.DiscoveredResource, error) {
// Get ProviderConfig
var providerConfig proxmoxv1alpha1.ProviderConfig
if err := r.Get(ctx, client.ObjectKey{Name: rd.Spec.ProviderConfigReference.Name}, &providerConfig); err != nil {
return nil, errors.Wrapf(err, "cannot get provider config")
}
// Get credentials from ProviderConfig
var apiToken, accountID string
if providerConfig.Spec.Credentials.SecretRef != nil {
secretRef := providerConfig.Spec.Credentials.SecretRef
secret := &corev1.Secret{}
secretKey := client.ObjectKey{
Namespace: secretRef.Namespace,
Name: secretRef.Name,
}
if err := r.Get(ctx, secretKey, secret); err == nil {
if tokenData, ok := secret.Data["apiToken"]; ok {
apiToken = string(tokenData)
}
if accountData, ok := secret.Data["accountID"]; ok {
accountID = string(accountData)
}
}
}
client := cloudflare.NewClient(apiToken, accountID)
agent := discovery.NewCloudflareDiscoveryAgent(client, rd.Spec.Site, rd.Spec.Region)
return agent.DiscoverAll(ctx)
}
// syncResourcesToAPI syncs discovered resources to the inventory API
func (r *ResourceDiscoveryReconciler) syncResourcesToAPI(
ctx context.Context,
resources []discovery.DiscoveredResource,
rd *proxmoxv1alpha1.ResourceDiscovery,
) (int, error) {
if r.InventoryAPI == "" {
// If no API endpoint configured, just return count
return len(resources), nil
}
// Prepare resources for API
type ResourceInventoryItem struct {
ResourceType string `json:"resourceType"`
Provider string `json:"provider"`
ProviderID string `json:"providerId"`
ProviderResourceID string `json:"providerResourceId"`
Name string `json:"name"`
Region string `json:"region"`
SiteID string `json:"siteId"`
Metadata map[string]interface{} `json:"metadata"`
Tags []string `json:"tags"`
}
items := make([]ResourceInventoryItem, len(resources))
for i, res := range resources {
items[i] = ResourceInventoryItem{
ResourceType: res.ResourceType,
Provider: res.Provider,
ProviderID: res.ProviderID,
ProviderResourceID: res.ProviderResourceID,
Name: res.Name,
Region: res.Region,
SiteID: res.SiteID,
Metadata: res.Metadata,
Tags: res.Tags,
}
}
// Make API call to sync resources
jsonData, err := json.Marshal(map[string]interface{}{
"provider": rd.Spec.Provider,
"resources": items,
})
if err != nil {
return 0, errors.Wrap(err, "failed to marshal resources")
}
req, err := http.NewRequestWithContext(ctx, "POST", r.InventoryAPI+"/sync", bytes.NewBuffer(jsonData))
if err != nil {
return 0, errors.Wrap(err, "failed to create request")
}
req.Header.Set("Content-Type", "application/json")
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return 0, errors.Wrap(err, "failed to send request")
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return 0, fmt.Errorf("API returned status %d", resp.StatusCode)
}
var result struct {
Synced int `json:"synced"`
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
// If response doesn't match expected format, assume all were synced
return len(resources), nil
}
return result.Synced, nil
}
// SetupWithManager sets up the controller with the Manager.
func (r *ResourceDiscoveryReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&proxmoxv1alpha1.ResourceDiscovery{}).
Complete(r)
}