From 790294672ee96f9d7de3e63f58101b109f57f110 Mon Sep 17 00:00:00 2001 From: Denis Tu Date: Fri, 19 Sep 2025 12:01:34 +0300 Subject: [PATCH 1/5] Feat: Embedded HTTP API Server #7 Signed-off-by: Denis Tu Signed-off-by: Denis Tu --- cmd/main.go | 24 +- internal/pipeline/processor.go | 17 + internal/sre/server.go | 762 ++++++++++++++++++++++++++ internal/workflow/coordinator.go | 2 + internal/workflow/workflow_manager.go | 5 +- 5 files changed, 804 insertions(+), 6 deletions(-) create mode 100644 internal/sre/server.go diff --git a/cmd/main.go b/cmd/main.go index 309d579..21b1b57 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -17,6 +17,7 @@ import ( kagentv1alpha2 "github.com/kagent-dev/khook/api/v1alpha2" kclient "github.com/kagent-dev/khook/internal/client" "github.com/kagent-dev/khook/internal/config" + "github.com/kagent-dev/khook/internal/sre" "github.com/kagent-dev/khook/internal/workflow" ) @@ -76,8 +77,17 @@ func main() { os.Exit(1) } + // Start SRE-IDE server + sreServer := sre.NewServer(8082, mgr.GetClient()) + go func() { + ctx := context.Background() + if err := sreServer.Start(ctx); err != nil { + setupLog.Error(err, "problem running SRE-IDE server") + } + }() + // Add workflow coordinator to manage hooks and event processing - if err := mgr.Add(newWorkflowCoordinator(mgr)); err != nil { + if err := mgr.Add(newWorkflowCoordinator(mgr, sreServer)); err != nil { setupLog.Error(err, "unable to add workflow coordinator") os.Exit(1) } @@ -91,11 +101,15 @@ func main() { // workflowCoordinator manages the complete workflow lifecycle using proper services type workflowCoordinator struct { - mgr ctrl.Manager + mgr ctrl.Manager + sreServer *sre.Server } -func newWorkflowCoordinator(mgr ctrl.Manager) *workflowCoordinator { - return &workflowCoordinator{mgr: mgr} +func newWorkflowCoordinator(mgr ctrl.Manager, sreServer *sre.Server) *workflowCoordinator { + return &workflowCoordinator{ + mgr: mgr, + sreServer: sreServer, + } } func (w *workflowCoordinator) NeedLeaderElection() bool { return true } @@ -121,7 +135,7 @@ func (w *workflowCoordinator) Start(ctx context.Context) error { // Create workflow coordinator eventRecorder := w.mgr.GetEventRecorderFor("khook") - coordinator := workflow.NewCoordinator(k8s, w.mgr.GetClient(), kagentCli, eventRecorder) + coordinator := workflow.NewCoordinator(k8s, w.mgr.GetClient(), kagentCli, eventRecorder, w.sreServer) // Start the coordinator return coordinator.Start(ctx) diff --git a/internal/pipeline/processor.go b/internal/pipeline/processor.go index 288962e..dd017b0 100644 --- a/internal/pipeline/processor.go +++ b/internal/pipeline/processor.go @@ -14,6 +14,7 @@ import ( "github.com/kagent-dev/khook/api/v1alpha2" "github.com/kagent-dev/khook/internal/interfaces" + "github.com/kagent-dev/khook/internal/sre" ) // Processor handles the complete event processing pipeline @@ -22,6 +23,7 @@ type Processor struct { deduplicationManager interfaces.DeduplicationManager kagentClient interfaces.KagentClient statusManager interfaces.StatusManager + sreServer interface{} logger logr.Logger } @@ -31,12 +33,14 @@ func NewProcessor( deduplicationManager interfaces.DeduplicationManager, kagentClient interfaces.KagentClient, statusManager interfaces.StatusManager, + sreServer interface{}, ) *Processor { return &Processor{ eventWatcher: eventWatcher, deduplicationManager: deduplicationManager, kagentClient: kagentClient, statusManager: statusManager, + sreServer: sreServer, logger: log.Log.WithName("event-processor"), } } @@ -167,6 +171,19 @@ func (p *Processor) processEventMatch(ctx context.Context, match EventMatch) err // Continue even if status recording fails } + // Add alert to SRE server if available + p.logger.Info("Checking SRE server integration", "sreServer", p.sreServer != nil) + if p.sreServer != nil { + if sreServer, ok := p.sreServer.(*sre.Server); ok { + // Convert event to alert and add to SRE server + alert := sre.ConvertEventToAlert(match.Event, match.Hook, agentRef, response) + sreServer.AddAlert(alert) + p.logger.Info("Added alert to SRE server", "alertId", alert.ID) + } else { + p.logger.Error(nil, "Type assertion failed for SRE server", "sreServerType", fmt.Sprintf("%T", p.sreServer)) + } + } + // Mark event as notified to suppress re-sending within suppression window p.deduplicationManager.MarkNotified(hookRef, match.Event) diff --git a/internal/sre/server.go b/internal/sre/server.go new file mode 100644 index 0000000..bddd15b --- /dev/null +++ b/internal/sre/server.go @@ -0,0 +1,762 @@ +package sre + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "strings" + "sync" + "time" + + "github.com/go-logr/logr" + "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/kagent-dev/khook/api/v1alpha2" + "github.com/kagent-dev/khook/internal/interfaces" + "k8s.io/apimachinery/pkg/types" +) + +// Alert represents an alert for SRE-IDE +type Alert struct { + ID string `json:"id"` + HookName string `json:"hookName"` + Namespace string `json:"namespace"` + EventType string `json:"eventType"` + ResourceName string `json:"resourceName"` + Severity string `json:"severity"` + Status string `json:"status"` + FirstSeen string `json:"firstSeen"` + LastSeen string `json:"lastSeen"` + Message string `json:"message"` + AgentID string `json:"agentId"` + SessionID *string `json:"sessionId,omitempty"` + TaskID *string `json:"taskId,omitempty"` + RemediationStatus *string `json:"remediationStatus,omitempty"` +} + +// AlertSummary represents alert statistics +type AlertSummary struct { + Total int `json:"total"` + Firing int `json:"firing"` + Resolved int `json:"resolved"` + Acknowledged int `json:"acknowledged"` + BySeverity map[string]int `json:"bySeverity"` + ByEventType map[string]int `json:"byEventType"` +} + +// Server provides HTTP API for SRE-IDE integration +type Server struct { + port int + logger logr.Logger + alerts map[string]*Alert + mu sync.RWMutex + clients map[chan Alert]bool + clientsMu sync.RWMutex + client client.Client +} + +// NewServer creates a new SRE-IDE server +func NewServer(port int, client client.Client) *Server { + return &Server{ + port: port, + logger: log.Log.WithName("sre-server"), + alerts: make(map[string]*Alert), + clients: make(map[chan Alert]bool), + client: client, + } +} + +// Start starts the HTTP server +func (s *Server) Start(ctx context.Context) error { + mux := http.NewServeMux() + + // API v1 endpoints + mux.HandleFunc("/api/v1/events", s.handleEvents) + mux.HandleFunc("/api/v1/events/", s.handleEventActions) + mux.HandleFunc("/api/v1/stats/events/summary", s.handleEventSummary) + mux.HandleFunc("/api/v1/stats/events/by-type", s.handleEventStatsByType) + mux.HandleFunc("/api/v1/events/stream", s.handleEventStream) + + // Hooks endpoints + mux.HandleFunc("/api/v1/hooks", s.handleHooks) + mux.HandleFunc("/api/v1/hooks/", s.handleHookActions) + + // Health and diagnostics + mux.HandleFunc("/api/v1/health", s.handleHealth) + mux.HandleFunc("/api/v1/diagnostics", s.handleDiagnostics) + mux.HandleFunc("/api/v1/metrics", s.handleMetrics) + + // Legacy endpoints for backward compatibility + mux.HandleFunc("/api/alerts", s.handleAlerts) + mux.HandleFunc("/api/alerts/summary", s.handleAlertSummary) + mux.HandleFunc("/api/alerts/stream", s.handleAlertStream) + mux.HandleFunc("/api/alerts/", s.handleAlertActions) + mux.HandleFunc("/api/hooks", s.handleHooks) + mux.HandleFunc("/api/hooks/", s.handleHookActions) + mux.HandleFunc("/health", s.handleHealth) + + server := &http.Server{ + Addr: fmt.Sprintf(":%d", s.port), + Handler: mux, + } + + s.logger.Info("Starting SRE-IDE server", "port", s.port) + + // Start server in goroutine + go func() { + if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { + s.logger.Error(err, "SRE-IDE server failed") + } + }() + + // Wait for context cancellation + <-ctx.Done() + + // Graceful shutdown + shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + return server.Shutdown(shutdownCtx) +} + +// AddAlert adds or updates an alert +func (s *Server) AddAlert(alert *Alert) { + s.mu.Lock() + defer s.mu.Unlock() + + // Update timestamps + now := time.Now().Format(time.RFC3339) + if existing, exists := s.alerts[alert.ID]; exists { + alert.FirstSeen = existing.FirstSeen + alert.LastSeen = now + } else { + alert.FirstSeen = now + alert.LastSeen = now + } + + s.alerts[alert.ID] = alert + + // Broadcast to streaming clients + s.broadcastAlert(*alert) + + s.logger.Info("Alert added/updated", "id", alert.ID, "eventType", alert.EventType, "status", alert.Status) +} + +// UpdateAlertStatus updates an alert's status +func (s *Server) UpdateAlertStatus(alertID, status string) error { + s.mu.Lock() + defer s.mu.Unlock() + + alert, exists := s.alerts[alertID] + if !exists { + return fmt.Errorf("alert not found: %s", alertID) + } + + alert.Status = status + alert.LastSeen = time.Now().Format(time.RFC3339) + + // Broadcast update + s.broadcastAlert(*alert) + + s.logger.Info("Alert status updated", "id", alertID, "status", status) + return nil +} + +// broadcastAlert sends alert to all streaming clients +func (s *Server) broadcastAlert(alert Alert) { + s.clientsMu.RLock() + defer s.clientsMu.RUnlock() + + for client := range s.clients { + select { + case client <- alert: + default: + // Client channel is full, skip + } + } +} + +// handleAlerts handles GET /api/alerts +func (s *Server) handleAlerts(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + s.mu.RLock() + alerts := make([]Alert, 0, len(s.alerts)) + for _, alert := range s.alerts { + alerts = append(alerts, *alert) + } + s.mu.RUnlock() + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]interface{}{ + "data": alerts, + }) +} + +// handleAlertSummary handles GET /api/alerts/summary +func (s *Server) handleAlertSummary(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + s.mu.RLock() + summary := s.calculateSummary() + s.mu.RUnlock() + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]interface{}{ + "data": summary, + }) +} + +// handleAlertStream handles GET /api/alerts/stream (SSE) +func (s *Server) handleAlertStream(w http.ResponseWriter, r *http.Request) { + // Set SSE headers + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Access-Control-Allow-Headers", "Cache-Control") + + // Create client channel + clientChan := make(chan Alert, 100) + + s.clientsMu.Lock() + s.clients[clientChan] = true + s.clientsMu.Unlock() + + // Send initial data + s.mu.RLock() + for _, alert := range s.alerts { + select { + case clientChan <- *alert: + default: + } + } + s.mu.RUnlock() + + // Send heartbeat + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + + // Cleanup function + defer func() { + s.clientsMu.Lock() + delete(s.clients, clientChan) + s.clientsMu.Unlock() + close(clientChan) + }() + + for { + select { + case alert := <-clientChan: + data, _ := json.Marshal(alert) + fmt.Fprintf(w, "event: alert\ndata: %s\n\n", data) + w.(http.Flusher).Flush() + + case <-ticker.C: + fmt.Fprintf(w, "event: heartbeat\ndata: %s\n\n", time.Now().Format(time.RFC3339)) + w.(http.Flusher).Flush() + + case <-r.Context().Done(): + return + } + } +} + +// handleAlertActions handles POST /api/alerts/{id}/{action} +func (s *Server) handleAlertActions(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + // Parse URL path: /api/alerts/{id}/{action} + path := r.URL.Path + if len(path) < len("/api/alerts/") { + http.Error(w, "Invalid path", http.StatusBadRequest) + return + } + + // Extract alert ID and action + parts := path[len("/api/alerts/"):] + if len(parts) == 0 { + http.Error(w, "Missing alert ID", http.StatusBadRequest) + return + } + + // Find the last slash to separate ID and action + lastSlash := -1 + for i := len(parts) - 1; i >= 0; i-- { + if parts[i] == '/' { + lastSlash = i + break + } + } + + if lastSlash == -1 { + http.Error(w, "Missing action", http.StatusBadRequest) + return + } + + alertID := parts[:lastSlash] + action := parts[lastSlash+1:] + + var status string + switch action { + case "acknowledge": + status = "acknowledged" + case "resolve": + status = "resolved" + default: + http.Error(w, "Invalid action", http.StatusBadRequest) + return + } + + if err := s.UpdateAlertStatus(alertID, status); err != nil { + http.Error(w, err.Error(), http.StatusNotFound) + return + } + + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(map[string]string{"status": "success"}) +} + +// handleHealth handles GET /health +func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(map[string]string{"status": "healthy"}) +} + +// calculateSummary calculates alert statistics +func (s *Server) calculateSummary() AlertSummary { + summary := AlertSummary{ + BySeverity: make(map[string]int), + ByEventType: make(map[string]int), + } + + for _, alert := range s.alerts { + summary.Total++ + + switch alert.Status { + case "firing": + summary.Firing++ + case "resolved": + summary.Resolved++ + case "acknowledged": + summary.Acknowledged++ + } + + summary.BySeverity[alert.Severity]++ + summary.ByEventType[alert.EventType]++ + } + + return summary +} + +// convertAgentIDFormat converts agent ID from kagent/k8s-agent to kagent__NS__k8s_agent format +// This matches the Python identifier format used by kagent +func convertAgentIDFormat(agentRef string) string { + // Parse the agent reference (e.g., "kagent/k8s-agent") + parts := strings.Split(agentRef, "/") + if len(parts) != 2 { + // If not in namespace/name format, assume it's just the name + return fmt.Sprintf("kagent__NS__%s", agentRef) + } + + namespace := parts[0] + agentName := parts[1] + + // Convert from namespace/name to namespace__NS__name format + // This matches the ConvertToPythonIdentifier function in kagent + return fmt.Sprintf("%s__NS__%s", namespace, agentName) +} + +// ConvertEventToAlert converts a khook event to an SRE-IDE alert +func ConvertEventToAlert( + event interfaces.Event, + hook *v1alpha2.Hook, + agentRef types.NamespacedName, + response *interfaces.AgentResponse, +) *Alert { + // Generate unique alert ID + alertID := fmt.Sprintf("%s-%s-%s-%s", + hook.Namespace, hook.Name, event.Type, event.ResourceName) + + // Determine severity based on event type + severity := "medium" + switch event.Type { + case "pod-restart": + severity = "high" + case "oom-kill": + severity = "critical" + case "probe-failed": + severity = "high" + case "pod-pending": + severity = "medium" + } + + // Determine remediation status + remediationStatus := "pending" + if response != nil && response.RequestId != "" { + remediationStatus = "in_progress" + } + + // Convert agent ID format from kagent/k8s-agent to kagent__NS__k8s_agent + agentID := convertAgentIDFormat(agentRef.Name) + + alert := &Alert{ + ID: alertID, + HookName: hook.Name, + Namespace: hook.Namespace, + EventType: event.Type, + ResourceName: event.ResourceName, + Severity: severity, + Status: "firing", + Message: event.Message, + AgentID: agentID, + RemediationStatus: &remediationStatus, + } + + // Add session/task info if available + if response != nil && response.RequestId != "" { + alert.TaskID = &response.RequestId + } + + return alert +} + +// syncActiveEventsWithAlerts creates alerts for all active events in hooks +func (s *Server) syncActiveEventsWithAlerts(hookList *v1alpha2.HookList) { + s.mu.Lock() + defer s.mu.Unlock() + + for _, hook := range hookList.Items { + if hook.Status.ActiveEvents == nil { + continue + } + + for _, activeEvent := range hook.Status.ActiveEvents { + // Only create alerts for firing events + if activeEvent.Status != "firing" { + continue + } + + // Create alert ID based on hook and event + alertID := fmt.Sprintf("%s-%s-%s-%s", + hook.Name, + activeEvent.EventType, + activeEvent.ResourceName, + activeEvent.FirstSeen.Format("20060102150405")) + + // Check if alert already exists + if _, exists := s.alerts[alertID]; exists { + continue + } + + // Find the matching event configuration + var agentID string + for _, config := range hook.Spec.EventConfigurations { + if config.EventType == activeEvent.EventType { + // Convert agent ID format from kagent/k8s-agent to kagent__NS__k8s_agent + agentID = convertAgentIDFormat(config.AgentRef.Name) + break + } + } + + if agentID == "" { + continue + } + + // Create alert from active event + alert := Alert{ + ID: alertID, + HookName: hook.Name, + Namespace: hook.Namespace, + EventType: activeEvent.EventType, + ResourceName: activeEvent.ResourceName, + Severity: "medium", // Default severity + Status: "firing", + FirstSeen: activeEvent.FirstSeen.Format(time.RFC3339), + LastSeen: activeEvent.LastSeen.Format(time.RFC3339), + Message: fmt.Sprintf("%s event for %s", activeEvent.EventType, activeEvent.ResourceName), + AgentID: agentID, + } + + // Add alert to server + s.alerts[alertID] = &alert + } + } +} + +// handleHooks handles GET /api/hooks +func (s *Server) handleHooks(w http.ResponseWriter, r *http.Request) { + if r.Method == http.MethodGet { + // Query the Kubernetes API for Hook resources + var hookList v1alpha2.HookList + if err := s.client.List(context.Background(), &hookList); err != nil { + s.logger.Error(err, "Failed to list hooks") + http.Error(w, "Failed to list hooks", http.StatusInternalServerError) + return + } + + // Sync active events with alerts + s.syncActiveEventsWithAlerts(&hookList) + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(hookList) + return + } + + if r.Method == http.MethodPost { + // Create hook - for now, just return success + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]string{"status": "success"}) + return + } + + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) +} + +// handleHookActions handles /api/hooks/{namespace}/{name} and /api/hooks/{id} +func (s *Server) handleHookActions(w http.ResponseWriter, r *http.Request) { + // For now, return 404 for all hook actions + // In a real implementation, this would handle CRUD operations on Hook resources + http.Error(w, "Hook management not implemented", http.StatusNotImplemented) +} + +// handleEvents handles GET /api/v1/events with query parameters +func (s *Server) handleEvents(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + // Parse query parameters + query := r.URL.Query() + namespace := query.Get("namespace") + eventType := query.Get("eventType") + resourceName := query.Get("resourceName") + status := query.Get("status") + + s.mu.RLock() + alerts := make([]Alert, 0, len(s.alerts)) + for _, alert := range s.alerts { + // Apply filters + if namespace != "" && alert.Namespace != namespace { + continue + } + if eventType != "" && alert.EventType != eventType { + continue + } + if resourceName != "" && alert.ResourceName != resourceName { + continue + } + if status != "" && alert.Status != status { + continue + } + alerts = append(alerts, *alert) + } + s.mu.RUnlock() + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]interface{}{ + "data": alerts, + "total": len(alerts), + }) +} + +// handleEventActions handles /api/v1/events/{namespace}/{name}/events +func (s *Server) handleEventActions(w http.ResponseWriter, r *http.Request) { + // Parse path to extract namespace and name + path := r.URL.Path + if len(path) < len("/api/v1/events/") { + http.Error(w, "Invalid path", http.StatusBadRequest) + return + } + + parts := strings.Split(path[len("/api/v1/events/"):], "/") + if len(parts) < 2 { + http.Error(w, "Missing namespace or name", http.StatusBadRequest) + return + } + + namespace := parts[0] + hookName := parts[1] + + // Filter alerts by namespace and hook name + s.mu.RLock() + alerts := make([]Alert, 0) + for _, alert := range s.alerts { + if alert.Namespace == namespace && alert.HookName == hookName { + alerts = append(alerts, *alert) + } + } + s.mu.RUnlock() + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]interface{}{ + "data": alerts, + "total": len(alerts), + }) +} + +// handleEventSummary handles GET /api/v1/stats/events/summary +func (s *Server) handleEventSummary(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + s.mu.RLock() + summary := s.calculateSummary() + s.mu.RUnlock() + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]interface{}{ + "summary": summary, + }) +} + +// handleEventStatsByType handles GET /api/v1/stats/events/by-type +func (s *Server) handleEventStatsByType(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + s.mu.RLock() + byType := make(map[string]map[string]interface{}) + total := 0 + for _, alert := range s.alerts { + total++ + if byType[alert.EventType] == nil { + byType[alert.EventType] = make(map[string]interface{}) + byType[alert.EventType]["count"] = 0 + } + byType[alert.EventType]["count"] = byType[alert.EventType]["count"].(int) + 1 + } + + // Calculate percentages + for eventType, stats := range byType { + count := stats["count"].(int) + percentage := float64(count) / float64(total) * 100 + byType[eventType]["percentage"] = percentage + } + s.mu.RUnlock() + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]interface{}{ + "byType": byType, + }) +} + +// handleEventStream handles GET /api/v1/events/stream (SSE) +func (s *Server) handleEventStream(w http.ResponseWriter, r *http.Request) { + // Set SSE headers + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Access-Control-Allow-Headers", "Cache-Control") + + // Create client channel + clientChan := make(chan Alert, 100) + + s.clientsMu.Lock() + s.clients[clientChan] = true + s.clientsMu.Unlock() + + // Send initial data + s.mu.RLock() + for _, alert := range s.alerts { + select { + case clientChan <- *alert: + default: + } + } + s.mu.RUnlock() + + // Send heartbeat + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + + // Cleanup function + defer func() { + s.clientsMu.Lock() + delete(s.clients, clientChan) + s.clientsMu.Unlock() + close(clientChan) + }() + + for { + select { + case alert := <-clientChan: + data, _ := json.Marshal(alert) + fmt.Fprintf(w, "event: event\ndata: %s\n\n", data) + w.(http.Flusher).Flush() + + case <-ticker.C: + fmt.Fprintf(w, "event: heartbeat\ndata: %s\n\n", time.Now().Format(time.RFC3339)) + w.(http.Flusher).Flush() + + case <-r.Context().Done(): + return + } + } +} + +// handleDiagnostics handles GET /api/v1/diagnostics +func (s *Server) handleDiagnostics(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + diagnostics := map[string]interface{}{ + "api_server_status": "running", + "uptime": time.Since(time.Now()).String(), // This would need to track actual start time + "event_processing_pipeline_health": "healthy", + "kagent_api_connectivity": "unknown", // Would need to check actual connectivity + "plugin_status": map[string]string{ + "kubernetes_events": "active", + }, + "memory_usage": map[string]interface{}{ + "alerts_count": len(s.alerts), + "active_connections": len(s.clients), + }, + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(diagnostics) +} + +// handleMetrics handles GET /api/v1/metrics +func (s *Server) handleMetrics(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + s.mu.RLock() + summary := s.calculateSummary() + s.mu.RUnlock() + + metrics := map[string]interface{}{ + "khook_events_total": summary.Total, + "khook_active_events": summary.Firing, + "khook_resolved_events": summary.Resolved, + "khook_acknowledged_events": summary.Acknowledged, + "khook_events_by_type": summary.ByEventType, + "khook_events_by_severity": summary.BySeverity, + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(metrics) +} diff --git a/internal/workflow/coordinator.go b/internal/workflow/coordinator.go index f70c444..24e032e 100644 --- a/internal/workflow/coordinator.go +++ b/internal/workflow/coordinator.go @@ -31,6 +31,7 @@ func NewCoordinator( ctrlClient client.Client, kagentClient interfaces.KagentClient, eventRecorder interfaces.EventRecorder, + sreServer interface{}, ) *Coordinator { dedupManager := deduplication.NewManager() statusManager := status.NewManager(ctrlClient, eventRecorder) @@ -43,6 +44,7 @@ func NewCoordinator( kagentClient, statusManager, eventRecorder, + sreServer, ) return &Coordinator{ diff --git a/internal/workflow/workflow_manager.go b/internal/workflow/workflow_manager.go index 96521ed..a7dc2ad 100644 --- a/internal/workflow/workflow_manager.go +++ b/internal/workflow/workflow_manager.go @@ -24,6 +24,7 @@ type WorkflowManager struct { kagentClient interfaces.KagentClient statusManager interfaces.StatusManager eventRecorder interfaces.EventRecorder + sreServer interface{} logger logr.Logger } @@ -35,6 +36,7 @@ func NewWorkflowManager( kagentClient interfaces.KagentClient, statusManager interfaces.StatusManager, eventRecorder interfaces.EventRecorder, + sreServer interface{}, ) *WorkflowManager { return &WorkflowManager{ k8sClient: k8sClient, @@ -43,6 +45,7 @@ func NewWorkflowManager( kagentClient: kagentClient, statusManager: statusManager, eventRecorder: eventRecorder, + sreServer: sreServer, logger: log.Log.WithName("workflow-manager"), } } @@ -101,7 +104,7 @@ func (wm *WorkflowManager) runNamespaceWorkflow( wm.logger.Info("Namespace workflow started", "namespace", namespace) watcher := event.NewWatcher(wm.k8sClient, namespace) - processor := pipeline.NewProcessor(watcher, wm.dedupManager, wm.kagentClient, wm.statusManager) + processor := pipeline.NewProcessor(watcher, wm.dedupManager, wm.kagentClient, wm.statusManager, wm.sreServer) if err := processor.ProcessEventWorkflow(ctx, eventTypes, hooks); err != nil { wm.logger.Error(err, "Namespace workflow exited with error", "namespace", namespace) From 28dc87bc402bdf2b1f52e502e5275fcd5518c971 Mon Sep 17 00:00:00 2001 From: Denis Tu Date: Fri, 19 Sep 2025 14:46:25 +0300 Subject: [PATCH 2/5] added promised api and missing chart deps Signed-off-by: Denis Tu Signed-off-by: Denis Tu --- api/v1alpha2/hook_types.go | 10 +- go.mod | 1 + go.sum | 2 + helm/khook/templates/deployment.yaml | 5 + helm/khook/templates/sre-service.yaml | 23 + helm/khook/values.yaml | 9 + internal/deduplication/manager.go | 2 +- internal/pipeline/processor.go | 39 +- internal/sre/server.go | 1089 ++++++++++++++++++++++--- internal/workflow/coordinator.go | 10 + 10 files changed, 1047 insertions(+), 143 deletions(-) create mode 100644 helm/khook/templates/sre-service.yaml diff --git a/api/v1alpha2/hook_types.go b/api/v1alpha2/hook_types.go index 8c64713..9a6e2b2 100644 --- a/api/v1alpha2/hook_types.go +++ b/api/v1alpha2/hook_types.go @@ -29,9 +29,13 @@ type EventConfiguration struct { // +kubebuilder:validation:Required EventType string `json:"eventType"` - // AgentRef specifies the Kagent agent to call when this event occurs - // +kubebuilder:validation:Required - AgentRef ObjectReference `json:"agentRef"` + // AgentRef specifies the Kagent agent to call when this event occurs (new format) + // +kubebuilder:validation:Optional + AgentRef ObjectReference `json:"agentRef,omitempty"` + + // AgentId specifies the Kagent agent to call when this event occurs (legacy format) + // +kubebuilder:validation:Optional + AgentId string `json:"agentId,omitempty"` // Prompt specifies the prompt template to send to the agent // +kubebuilder:validation:Required diff --git a/go.mod b/go.mod index 595ce6b..d03f14c 100644 --- a/go.mod +++ b/go.mod @@ -38,6 +38,7 @@ require ( github.com/google/gnostic-models v0.7.0 // indirect github.com/google/go-cmp v0.7.0 // indirect github.com/google/uuid v1.6.0 // indirect + github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect github.com/jackc/pgx/v5 v5.6.0 // indirect diff --git a/go.sum b/go.sum index 3cfd1c1..0fb7bb1 100644 --- a/go.sum +++ b/go.sum @@ -57,6 +57,8 @@ github.com/google/pprof v0.0.0-20250607225305-033d6d78b36a h1://KbezygeMJZCSHH+H github.com/google/pprof v0.0.0-20250607225305-033d6d78b36a/go.mod h1:5hDyRhoBCxViHszMt12TnOpEI4VVi+U8Gm9iphldiMA= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 h1:JeSE6pjso5THxAzdVpqr6/geYxZytqFMBCOtn/ujyeo= +github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674/go.mod h1:r4w70xmWCQKmi1ONH4KIaBptdivuRPyosB9RmPlGEwA= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= diff --git a/helm/khook/templates/deployment.yaml b/helm/khook/templates/deployment.yaml index ddc9cb6..3eec0c3 100644 --- a/helm/khook/templates/deployment.yaml +++ b/helm/khook/templates/deployment.yaml @@ -64,6 +64,11 @@ spec: - name: health containerPort: 8081 protocol: TCP + {{- if .Values.sre.enabled }} + - name: sre-server + containerPort: {{ .Values.sre.port }} + protocol: TCP + {{- end }} livenessProbe: {{- toYaml .Values.healthCheck.livenessProbe | nindent 10 }} readinessProbe: diff --git a/helm/khook/templates/sre-service.yaml b/helm/khook/templates/sre-service.yaml new file mode 100644 index 0000000..0a2058e --- /dev/null +++ b/helm/khook/templates/sre-service.yaml @@ -0,0 +1,23 @@ +{{- if .Values.sre.enabled }} +apiVersion: v1 +kind: Service +metadata: + name: {{ include "khook.fullname" . }}-sre + namespace: {{ include "khook.namespace" . }} + labels: + {{- include "khook.labels" . | nindent 4 }} + app.kubernetes.io/component: sre-server + {{- with .Values.sre.service.annotations }} + annotations: + {{- toYaml . | nindent 4 }} + {{- end }} +spec: + type: {{ .Values.sre.service.type }} + ports: + - name: sre-server + port: {{ .Values.sre.service.port }} + targetPort: sre-server + protocol: TCP + selector: + {{- include "khook.selectorLabels" . | nindent 4 }} +{{- end }} diff --git a/helm/khook/values.yaml b/helm/khook/values.yaml index fbe6e81..fe5e4fc 100644 --- a/helm/khook/values.yaml +++ b/helm/khook/values.yaml @@ -85,6 +85,15 @@ metrics: labels: {} annotations: {} +# SRE API Server configuration +sre: + enabled: true + port: 8082 + service: + type: ClusterIP + port: 8082 + annotations: {} + # RBAC configuration rbac: create: true diff --git a/internal/deduplication/manager.go b/internal/deduplication/manager.go index fe57ae5..4f17fc3 100644 --- a/internal/deduplication/manager.go +++ b/internal/deduplication/manager.go @@ -14,7 +14,7 @@ const ( // EventTimeoutDuration is the duration after which events are considered resolved EventTimeoutDuration = 10 * time.Minute // NotificationSuppressionDuration is the window to suppress re-sending after success - NotificationSuppressionDuration = 10 * time.Minute + NotificationSuppressionDuration = 30 * time.Second // StatusFiring indicates an event is currently active StatusFiring = "firing" diff --git a/internal/pipeline/processor.go b/internal/pipeline/processor.go index dd017b0..63c7743 100644 --- a/internal/pipeline/processor.go +++ b/internal/pipeline/processor.go @@ -137,13 +137,38 @@ func (p *Processor) processEventMatch(ctx context.Context, match EventMatch) err return fmt.Errorf("failed to record event in deduplication manager: %w", err) } - agentRefNs := match.Hook.Namespace - if match.Configuration.AgentRef.Namespace != nil { - agentRefNs = *match.Configuration.AgentRef.Namespace - } - agentRef := types.NamespacedName{ - Name: match.Configuration.AgentRef.Name, - Namespace: agentRefNs, + // Handle both agentId (legacy) and agentRef (new) formats + var agentRef types.NamespacedName + if match.Configuration.AgentRef.Name != "" { + // New format: agentRef + agentRefNs := match.Hook.Namespace + if match.Configuration.AgentRef.Namespace != nil { + agentRefNs = *match.Configuration.AgentRef.Namespace + } + agentRef = types.NamespacedName{ + Name: match.Configuration.AgentRef.Name, + Namespace: agentRefNs, + } + } else { + // Legacy format: agentId (parse "namespace/name" format) + agentId := match.Configuration.AgentId + if agentId == "" { + return fmt.Errorf("neither agentRef.name nor agentId is specified") + } + + // Parse agentId format: "namespace/name" or just "name" + parts := strings.Split(agentId, "/") + if len(parts) == 2 { + agentRef = types.NamespacedName{ + Name: parts[1], + Namespace: parts[0], + } + } else { + agentRef = types.NamespacedName{ + Name: parts[0], + Namespace: match.Hook.Namespace, + } + } } // Record that the event is firing diff --git a/internal/sre/server.go b/internal/sre/server.go index bddd15b..3f63215 100644 --- a/internal/sre/server.go +++ b/internal/sre/server.go @@ -5,13 +5,15 @@ import ( "encoding/json" "fmt" "net/http" + "strconv" "strings" "sync" "time" "github.com/go-logr/logr" - "sigs.k8s.io/controller-runtime/pkg/log" + "github.com/gorilla/websocket" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" "github.com/kagent-dev/khook/api/v1alpha2" "github.com/kagent-dev/khook/internal/interfaces" @@ -20,74 +22,91 @@ import ( // Alert represents an alert for SRE-IDE type Alert struct { - ID string `json:"id"` - HookName string `json:"hookName"` - Namespace string `json:"namespace"` - EventType string `json:"eventType"` - ResourceName string `json:"resourceName"` - Severity string `json:"severity"` - Status string `json:"status"` - FirstSeen string `json:"firstSeen"` - LastSeen string `json:"lastSeen"` - Message string `json:"message"` - AgentID string `json:"agentId"` - SessionID *string `json:"sessionId,omitempty"` - TaskID *string `json:"taskId,omitempty"` - RemediationStatus *string `json:"remediationStatus,omitempty"` + ID string `json:"id"` + HookName string `json:"hookName"` + Namespace string `json:"namespace"` + EventType string `json:"eventType"` + ResourceName string `json:"resourceName"` + Severity string `json:"severity"` + Status string `json:"status"` + Timestamp time.Time `json:"timestamp"` + FirstSeen string `json:"firstSeen"` + LastSeen string `json:"lastSeen"` + Message string `json:"message"` + AgentID string `json:"agentId"` + SessionID *string `json:"sessionId,omitempty"` + TaskID *string `json:"taskId,omitempty"` + RemediationStatus *string `json:"remediationStatus,omitempty"` } // AlertSummary represents alert statistics type AlertSummary struct { - Total int `json:"total"` - Firing int `json:"firing"` - Resolved int `json:"resolved"` - Acknowledged int `json:"acknowledged"` - BySeverity map[string]int `json:"bySeverity"` - ByEventType map[string]int `json:"byEventType"` + Total int `json:"total"` + Firing int `json:"firing"` + Resolved int `json:"resolved"` + Acknowledged int `json:"acknowledged"` + BySeverity map[string]int `json:"bySeverity"` + ByEventType map[string]int `json:"byEventType"` } // Server provides HTTP API for SRE-IDE integration type Server struct { - port int - logger logr.Logger - alerts map[string]*Alert - mu sync.RWMutex - clients map[chan Alert]bool + port int + logger logr.Logger + alerts map[string]*Alert + mu sync.RWMutex + clients map[chan Alert]bool clientsMu sync.RWMutex - client client.Client + wsClients map[*websocket.Conn]bool + wsMu sync.RWMutex + client client.Client + startTime time.Time + upgrader websocket.Upgrader } // NewServer creates a new SRE-IDE server func NewServer(port int, client client.Client) *Server { return &Server{ - port: port, - logger: log.Log.WithName("sre-server"), - alerts: make(map[string]*Alert), - clients: make(map[chan Alert]bool), - client: client, + port: port, + logger: log.Log.WithName("sre-server"), + alerts: make(map[string]*Alert), + clients: make(map[chan Alert]bool), + wsClients: make(map[*websocket.Conn]bool), + client: client, + startTime: time.Now(), + upgrader: websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { + return true // Allow all origins for now + }, + }, } } // Start starts the HTTP server func (s *Server) Start(ctx context.Context) error { mux := http.NewServeMux() - + // API v1 endpoints mux.HandleFunc("/api/v1/events", s.handleEvents) - mux.HandleFunc("/api/v1/events/", s.handleEventActions) + mux.HandleFunc("/api/v1/events/types/", s.handleEventTypes) + mux.HandleFunc("/api/v1/events/", s.handleEventsByNamespace) mux.HandleFunc("/api/v1/stats/events/summary", s.handleEventSummary) mux.HandleFunc("/api/v1/stats/events/by-type", s.handleEventStatsByType) + mux.HandleFunc("/api/v1/stats/hooks/", s.handleHookStats) + mux.HandleFunc("/api/v1/stats/trends", s.handleEventTrends) mux.HandleFunc("/api/v1/events/stream", s.handleEventStream) - + mux.HandleFunc("/api/v1/events/ws", s.handleWebSocket) + // Hooks endpoints mux.HandleFunc("/api/v1/hooks", s.handleHooks) + mux.HandleFunc("/api/v1/hooks/validate", s.handleHookValidation) mux.HandleFunc("/api/v1/hooks/", s.handleHookActions) - + // Health and diagnostics mux.HandleFunc("/api/v1/health", s.handleHealth) mux.HandleFunc("/api/v1/diagnostics", s.handleDiagnostics) mux.HandleFunc("/api/v1/metrics", s.handleMetrics) - + // Legacy endpoints for backward compatibility mux.HandleFunc("/api/alerts", s.handleAlerts) mux.HandleFunc("/api/alerts/summary", s.handleAlertSummary) @@ -95,37 +114,62 @@ func (s *Server) Start(ctx context.Context) error { mux.HandleFunc("/api/alerts/", s.handleAlertActions) mux.HandleFunc("/api/hooks", s.handleHooks) mux.HandleFunc("/api/hooks/", s.handleHookActions) + mux.HandleFunc("/api/health", s.handleHealth) mux.HandleFunc("/health", s.handleHealth) - + + // Add CORS middleware + handler := s.corsMiddleware(mux) + server := &http.Server{ Addr: fmt.Sprintf(":%d", s.port), - Handler: mux, + Handler: handler, } - + s.logger.Info("Starting SRE-IDE server", "port", s.port) - + // Start server in goroutine go func() { if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { s.logger.Error(err, "SRE-IDE server failed") } }() - + // Wait for context cancellation <-ctx.Done() - + // Graceful shutdown shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - + return server.Shutdown(shutdownCtx) } +// corsMiddleware adds CORS headers to all responses +func (s *Server) corsMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Handle preflight requests + if r.Method == "OPTIONS" { + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS") + w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Authorization, X-Requested-With") + w.WriteHeader(http.StatusOK) + return + } + + // Add CORS headers to all responses + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS") + w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Authorization, X-Requested-With") + + next.ServeHTTP(w, r) + }) +} + // AddAlert adds or updates an alert func (s *Server) AddAlert(alert *Alert) { s.mu.Lock() defer s.mu.Unlock() - + // Update timestamps now := time.Now().Format(time.RFC3339) if existing, exists := s.alerts[alert.ID]; exists { @@ -135,12 +179,12 @@ func (s *Server) AddAlert(alert *Alert) { alert.FirstSeen = now alert.LastSeen = now } - + s.alerts[alert.ID] = alert - + // Broadcast to streaming clients s.broadcastAlert(*alert) - + s.logger.Info("Alert added/updated", "id", alert.ID, "eventType", alert.EventType, "status", alert.Status) } @@ -148,27 +192,26 @@ func (s *Server) AddAlert(alert *Alert) { func (s *Server) UpdateAlertStatus(alertID, status string) error { s.mu.Lock() defer s.mu.Unlock() - + alert, exists := s.alerts[alertID] if !exists { return fmt.Errorf("alert not found: %s", alertID) } - + alert.Status = status alert.LastSeen = time.Now().Format(time.RFC3339) - + // Broadcast update s.broadcastAlert(*alert) - + s.logger.Info("Alert status updated", "id", alertID, "status", status) return nil } // broadcastAlert sends alert to all streaming clients func (s *Server) broadcastAlert(alert Alert) { + // Broadcast to SSE clients s.clientsMu.RLock() - defer s.clientsMu.RUnlock() - for client := range s.clients { select { case client <- alert: @@ -176,6 +219,25 @@ func (s *Server) broadcastAlert(alert Alert) { // Client channel is full, skip } } + s.clientsMu.RUnlock() + + // Broadcast to WebSocket clients + s.wsMu.Lock() + defer s.wsMu.Unlock() + + alertJSON, err := json.Marshal(alert) + if err != nil { + s.logger.Error(err, "Failed to marshal alert for WebSocket broadcast") + return + } + + for conn := range s.wsClients { + if err := conn.WriteMessage(websocket.TextMessage, alertJSON); err != nil { + s.logger.Error(err, "Failed to send alert to WebSocket client") + delete(s.wsClients, conn) + conn.Close() + } + } } // handleAlerts handles GET /api/alerts @@ -184,14 +246,14 @@ func (s *Server) handleAlerts(w http.ResponseWriter, r *http.Request) { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return } - + s.mu.RLock() alerts := make([]Alert, 0, len(s.alerts)) for _, alert := range s.alerts { alerts = append(alerts, *alert) } s.mu.RUnlock() - + w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(map[string]interface{}{ "data": alerts, @@ -204,11 +266,11 @@ func (s *Server) handleAlertSummary(w http.ResponseWriter, r *http.Request) { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return } - + s.mu.RLock() summary := s.calculateSummary() s.mu.RUnlock() - + w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(map[string]interface{}{ "data": summary, @@ -223,14 +285,14 @@ func (s *Server) handleAlertStream(w http.ResponseWriter, r *http.Request) { w.Header().Set("Connection", "keep-alive") w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Access-Control-Allow-Headers", "Cache-Control") - + // Create client channel clientChan := make(chan Alert, 100) - + s.clientsMu.Lock() s.clients[clientChan] = true s.clientsMu.Unlock() - + // Send initial data s.mu.RLock() for _, alert := range s.alerts { @@ -240,11 +302,11 @@ func (s *Server) handleAlertStream(w http.ResponseWriter, r *http.Request) { } } s.mu.RUnlock() - + // Send heartbeat ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() - + // Cleanup function defer func() { s.clientsMu.Lock() @@ -252,18 +314,18 @@ func (s *Server) handleAlertStream(w http.ResponseWriter, r *http.Request) { s.clientsMu.Unlock() close(clientChan) }() - + for { select { case alert := <-clientChan: data, _ := json.Marshal(alert) fmt.Fprintf(w, "event: alert\ndata: %s\n\n", data) w.(http.Flusher).Flush() - + case <-ticker.C: fmt.Fprintf(w, "event: heartbeat\ndata: %s\n\n", time.Now().Format(time.RFC3339)) w.(http.Flusher).Flush() - + case <-r.Context().Done(): return } @@ -276,21 +338,21 @@ func (s *Server) handleAlertActions(w http.ResponseWriter, r *http.Request) { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return } - + // Parse URL path: /api/alerts/{id}/{action} path := r.URL.Path if len(path) < len("/api/alerts/") { http.Error(w, "Invalid path", http.StatusBadRequest) return } - + // Extract alert ID and action parts := path[len("/api/alerts/"):] if len(parts) == 0 { http.Error(w, "Missing alert ID", http.StatusBadRequest) return } - + // Find the last slash to separate ID and action lastSlash := -1 for i := len(parts) - 1; i >= 0; i-- { @@ -299,15 +361,15 @@ func (s *Server) handleAlertActions(w http.ResponseWriter, r *http.Request) { break } } - + if lastSlash == -1 { http.Error(w, "Missing action", http.StatusBadRequest) return } - + alertID := parts[:lastSlash] action := parts[lastSlash+1:] - + var status string switch action { case "acknowledge": @@ -318,12 +380,12 @@ func (s *Server) handleAlertActions(w http.ResponseWriter, r *http.Request) { http.Error(w, "Invalid action", http.StatusBadRequest) return } - + if err := s.UpdateAlertStatus(alertID, status); err != nil { http.Error(w, err.Error(), http.StatusNotFound) return } - + w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(map[string]string{"status": "success"}) } @@ -340,10 +402,10 @@ func (s *Server) calculateSummary() AlertSummary { BySeverity: make(map[string]int), ByEventType: make(map[string]int), } - + for _, alert := range s.alerts { summary.Total++ - + switch alert.Status { case "firing": summary.Firing++ @@ -352,11 +414,11 @@ func (s *Server) calculateSummary() AlertSummary { case "acknowledged": summary.Acknowledged++ } - + summary.BySeverity[alert.Severity]++ summary.ByEventType[alert.EventType]++ } - + return summary } @@ -369,10 +431,10 @@ func convertAgentIDFormat(agentRef string) string { // If not in namespace/name format, assume it's just the name return fmt.Sprintf("kagent__NS__%s", agentRef) } - + namespace := parts[0] agentName := parts[1] - + // Convert from namespace/name to namespace__NS__name format // This matches the ConvertToPythonIdentifier function in kagent return fmt.Sprintf("%s__NS__%s", namespace, agentName) @@ -386,9 +448,9 @@ func ConvertEventToAlert( response *interfaces.AgentResponse, ) *Alert { // Generate unique alert ID - alertID := fmt.Sprintf("%s-%s-%s-%s", + alertID := fmt.Sprintf("%s-%s-%s-%s", hook.Namespace, hook.Name, event.Type, event.ResourceName) - + // Determine severity based on event type severity := "medium" switch event.Type { @@ -401,16 +463,16 @@ func ConvertEventToAlert( case "pod-pending": severity = "medium" } - + // Determine remediation status remediationStatus := "pending" if response != nil && response.RequestId != "" { remediationStatus = "in_progress" } - - // Convert agent ID format from kagent/k8s-agent to kagent__NS__k8s_agent - agentID := convertAgentIDFormat(agentRef.Name) - + + // Use agent name directly for SRE-IDE compatibility + agentID := agentRef.Name + alert := &Alert{ ID: alertID, HookName: hook.Name, @@ -419,16 +481,17 @@ func ConvertEventToAlert( ResourceName: event.ResourceName, Severity: severity, Status: "firing", + Timestamp: time.Now(), Message: event.Message, AgentID: agentID, RemediationStatus: &remediationStatus, } - + // Add session/task info if available if response != nil && response.RequestId != "" { alert.TaskID = &response.RequestId } - + return alert } @@ -509,8 +572,11 @@ func (s *Server) handleHooks(w http.ResponseWriter, r *http.Request) { // Sync active events with alerts s.syncActiveEventsWithAlerts(&hookList) + // Return the full Kubernetes Hook objects as expected by SRE-IDE w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(hookList) + json.NewEncoder(w).Encode(map[string]interface{}{ + "items": hookList.Items, + }) return } @@ -524,11 +590,158 @@ func (s *Server) handleHooks(w http.ResponseWriter, r *http.Request) { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) } -// handleHookActions handles /api/hooks/{namespace}/{name} and /api/hooks/{id} +// handleHookActions handles /api/v1/hooks/{namespace}/{name} and /api/v1/hooks/validate func (s *Server) handleHookActions(w http.ResponseWriter, r *http.Request) { - // For now, return 404 for all hook actions - // In a real implementation, this would handle CRUD operations on Hook resources - http.Error(w, "Hook management not implemented", http.StatusNotImplemented) + path := r.URL.Path + + // Handle validation endpoint + if strings.HasSuffix(path, "/validate") { + s.handleHookValidation(w, r) + return + } + + // Parse path to extract namespace and name + if len(path) < len("/api/v1/hooks/") { + http.Error(w, "Invalid path", http.StatusBadRequest) + return + } + + parts := strings.Split(path[len("/api/v1/hooks/"):], "/") + if len(parts) < 2 { + http.Error(w, "Missing namespace or name", http.StatusBadRequest) + return + } + + namespace := parts[0] + hookName := parts[1] + + switch r.Method { + case http.MethodGet: + s.handleGetHook(w, r, namespace, hookName) + case http.MethodPut: + s.handleUpdateHook(w, r, namespace, hookName) + case http.MethodDelete: + s.handleDeleteHook(w, r, namespace, hookName) + default: + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + } +} + +// handleGetHook handles GET /api/v1/hooks/{namespace}/{name} +func (s *Server) handleGetHook(w http.ResponseWriter, r *http.Request, namespace, name string) { + var hook v1alpha2.Hook + key := types.NamespacedName{Namespace: namespace, Name: name} + + if err := s.client.Get(context.Background(), key, &hook); err != nil { + s.logger.Error(err, "Failed to get hook", "namespace", namespace, "name", name) + http.Error(w, "Hook not found", http.StatusNotFound) + return + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(hook) +} + +// handleUpdateHook handles PUT /api/v1/hooks/{namespace}/{name} +func (s *Server) handleUpdateHook(w http.ResponseWriter, r *http.Request, namespace, name string) { + var hook v1alpha2.Hook + key := types.NamespacedName{Namespace: namespace, Name: name} + + // Get existing hook + if err := s.client.Get(context.Background(), key, &hook); err != nil { + s.logger.Error(err, "Failed to get hook for update", "namespace", namespace, "name", name) + http.Error(w, "Hook not found", http.StatusNotFound) + return + } + + // Parse request body + var updateHook v1alpha2.Hook + if err := json.NewDecoder(r.Body).Decode(&updateHook); err != nil { + http.Error(w, "Invalid JSON", http.StatusBadRequest) + return + } + + // Update the spec + hook.Spec = updateHook.Spec + + // Update the hook + if err := s.client.Update(context.Background(), &hook); err != nil { + s.logger.Error(err, "Failed to update hook", "namespace", namespace, "name", name) + http.Error(w, "Failed to update hook", http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(hook) +} + +// handleDeleteHook handles DELETE /api/v1/hooks/{namespace}/{name} +func (s *Server) handleDeleteHook(w http.ResponseWriter, r *http.Request, namespace, name string) { + var hook v1alpha2.Hook + key := types.NamespacedName{Namespace: namespace, Name: name} + + // Get existing hook + if err := s.client.Get(context.Background(), key, &hook); err != nil { + s.logger.Error(err, "Failed to get hook for deletion", "namespace", namespace, "name", name) + http.Error(w, "Hook not found", http.StatusNotFound) + return + } + + // Delete the hook + if err := s.client.Delete(context.Background(), &hook); err != nil { + s.logger.Error(err, "Failed to delete hook", "namespace", namespace, "name", name) + http.Error(w, "Failed to delete hook", http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusNoContent) +} + +// handleHookValidation handles POST /api/v1/hooks/validate +func (s *Server) handleHookValidation(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + var hook v1alpha2.Hook + if err := json.NewDecoder(r.Body).Decode(&hook); err != nil { + http.Error(w, "Invalid JSON", http.StatusBadRequest) + return + } + + // Basic validation + validationResult := map[string]interface{}{ + "valid": true, + "errors": []string{}, + } + + // Validate event configurations + if len(hook.Spec.EventConfigurations) == 0 { + validationResult["valid"] = false + validationResult["errors"] = append(validationResult["errors"].([]string), "At least one event configuration is required") + } + + for i, config := range hook.Spec.EventConfigurations { + if config.EventType == "" { + validationResult["valid"] = false + validationResult["errors"] = append(validationResult["errors"].([]string), + fmt.Sprintf("Event configuration %d: eventType is required", i)) + } + if config.AgentRef.Name == "" { + validationResult["valid"] = false + validationResult["errors"] = append(validationResult["errors"].([]string), + fmt.Sprintf("Event configuration %d: agentRef.name is required", i)) + } + if config.Prompt == "" { + validationResult["valid"] = false + validationResult["errors"] = append(validationResult["errors"].([]string), + fmt.Sprintf("Event configuration %d: prompt is required", i)) + } + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(validationResult) } // handleEvents handles GET /api/v1/events with query parameters @@ -537,7 +750,7 @@ func (s *Server) handleEvents(w http.ResponseWriter, r *http.Request) { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return } - + // Parse query parameters query := r.URL.Query() namespace := query.Get("namespace") @@ -545,8 +758,42 @@ func (s *Server) handleEvents(w http.ResponseWriter, r *http.Request) { resourceName := query.Get("resourceName") status := query.Get("status") + // Time-based filtering + var startTime, endTime *time.Time + if startTimeStr := query.Get("startTime"); startTimeStr != "" { + if t, err := time.Parse(time.RFC3339, startTimeStr); err == nil { + startTime = &t + } + } + if endTimeStr := query.Get("endTime"); endTimeStr != "" { + if t, err := time.Parse(time.RFC3339, endTimeStr); err == nil { + endTime = &t + } + } + + // Sorting + sortBy := query.Get("sort") // timestamp, eventType, resourceName + sortOrder := query.Get("order") // asc, desc (default: desc) + if sortOrder == "" { + sortOrder = "desc" + } + + // Pagination parameters + limit := 100 // default limit + offset := 0 // default offset + if limitStr := query.Get("limit"); limitStr != "" { + if l, err := strconv.Atoi(limitStr); err == nil && l > 0 && l <= 1000 { + limit = l + } + } + if offsetStr := query.Get("offset"); offsetStr != "" { + if o, err := strconv.Atoi(offsetStr); err == nil && o >= 0 { + offset = o + } + } + s.mu.RLock() - alerts := make([]Alert, 0, len(s.alerts)) + allAlerts := make([]Alert, 0, len(s.alerts)) for _, alert := range s.alerts { // Apply filters if namespace != "" && alert.Namespace != namespace { @@ -561,14 +808,133 @@ func (s *Server) handleEvents(w http.ResponseWriter, r *http.Request) { if status != "" && alert.Status != status { continue } - alerts = append(alerts, *alert) + + // Time-based filtering + if startTime != nil && alert.Timestamp.Before(*startTime) { + continue + } + if endTime != nil && alert.Timestamp.After(*endTime) { + continue + } + + allAlerts = append(allAlerts, *alert) } s.mu.RUnlock() - + + // Apply sorting + if sortBy != "" { + s.sortAlerts(allAlerts, sortBy, sortOrder) + } + + // Apply pagination + total := len(allAlerts) + start := offset + end := offset + limit + + if start > total { + start = total + } + if end > total { + end = total + } + + var alerts []Alert + if start < end { + alerts = allAlerts[start:end] + } + w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(map[string]interface{}{ - "data": alerts, - "total": len(alerts), + "data": alerts, + "total": total, + "limit": limit, + "offset": offset, + "has_more": end < total, + }) +} + +// handleEventsByNamespace handles /api/v1/events/{namespace} +func (s *Server) handleEventsByNamespace(w http.ResponseWriter, r *http.Request) { + // Parse path to extract namespace + path := r.URL.Path + if len(path) < len("/api/v1/events/") { + http.Error(w, "Invalid path", http.StatusBadRequest) + return + } + + namespace := path[len("/api/v1/events/"):] + if namespace == "" { + http.Error(w, "Missing namespace", http.StatusBadRequest) + return + } + + // Parse query parameters + query := r.URL.Query() + eventType := query.Get("eventType") + resourceName := query.Get("resourceName") + status := query.Get("status") + limitStr := query.Get("limit") + offsetStr := query.Get("offset") + + // Parse pagination + limit := 100 // Default limit + offset := 0 // Default offset + if limitStr != "" { + if l, err := strconv.Atoi(limitStr); err == nil && l > 0 { + limit = l + } + } + if offsetStr != "" { + if o, err := strconv.Atoi(offsetStr); err == nil && o >= 0 { + offset = o + } + } + + // Filter alerts by namespace and other parameters + s.mu.RLock() + allAlerts := make([]Alert, 0) + for _, alert := range s.alerts { + if alert.Namespace == namespace { + // Apply additional filters + if eventType != "" && alert.EventType != eventType { + continue + } + if resourceName != "" && alert.ResourceName != resourceName { + continue + } + if status != "" && alert.Status != status { + continue + } + allAlerts = append(allAlerts, *alert) + } + } + s.mu.RUnlock() + + // Apply pagination + total := len(allAlerts) + start := offset + end := offset + limit + + if start > total { + start = total + } + if end > total { + end = total + } + + var alerts []Alert + if start < end { + alerts = allAlerts[start:end] + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]interface{}{ + "data": alerts, + "total": total, + "limit": limit, + "offset": offset, + "has_more": end < total, + "namespace": namespace, }) } @@ -580,16 +946,16 @@ func (s *Server) handleEventActions(w http.ResponseWriter, r *http.Request) { http.Error(w, "Invalid path", http.StatusBadRequest) return } - + parts := strings.Split(path[len("/api/v1/events/"):], "/") if len(parts) < 2 { http.Error(w, "Missing namespace or name", http.StatusBadRequest) return } - + namespace := parts[0] hookName := parts[1] - + // Filter alerts by namespace and hook name s.mu.RLock() alerts := make([]Alert, 0) @@ -599,10 +965,10 @@ func (s *Server) handleEventActions(w http.ResponseWriter, r *http.Request) { } } s.mu.RUnlock() - + w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(map[string]interface{}{ - "data": alerts, + "data": alerts, "total": len(alerts), }) } @@ -613,11 +979,11 @@ func (s *Server) handleEventSummary(w http.ResponseWriter, r *http.Request) { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return } - + s.mu.RLock() summary := s.calculateSummary() s.mu.RUnlock() - + w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(map[string]interface{}{ "summary": summary, @@ -630,7 +996,7 @@ func (s *Server) handleEventStatsByType(w http.ResponseWriter, r *http.Request) http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return } - + s.mu.RLock() byType := make(map[string]map[string]interface{}) total := 0 @@ -642,7 +1008,7 @@ func (s *Server) handleEventStatsByType(w http.ResponseWriter, r *http.Request) } byType[alert.EventType]["count"] = byType[alert.EventType]["count"].(int) + 1 } - + // Calculate percentages for eventType, stats := range byType { count := stats["count"].(int) @@ -650,7 +1016,7 @@ func (s *Server) handleEventStatsByType(w http.ResponseWriter, r *http.Request) byType[eventType]["percentage"] = percentage } s.mu.RUnlock() - + w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(map[string]interface{}{ "byType": byType, @@ -665,14 +1031,14 @@ func (s *Server) handleEventStream(w http.ResponseWriter, r *http.Request) { w.Header().Set("Connection", "keep-alive") w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Access-Control-Allow-Headers", "Cache-Control") - + // Create client channel clientChan := make(chan Alert, 100) - + s.clientsMu.Lock() s.clients[clientChan] = true s.clientsMu.Unlock() - + // Send initial data s.mu.RLock() for _, alert := range s.alerts { @@ -682,11 +1048,11 @@ func (s *Server) handleEventStream(w http.ResponseWriter, r *http.Request) { } } s.mu.RUnlock() - + // Send heartbeat ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() - + // Cleanup function defer func() { s.clientsMu.Lock() @@ -694,18 +1060,18 @@ func (s *Server) handleEventStream(w http.ResponseWriter, r *http.Request) { s.clientsMu.Unlock() close(clientChan) }() - + for { select { case alert := <-clientChan: data, _ := json.Marshal(alert) fmt.Fprintf(w, "event: event\ndata: %s\n\n", data) w.(http.Flusher).Flush() - + case <-ticker.C: fmt.Fprintf(w, "event: heartbeat\ndata: %s\n\n", time.Now().Format(time.RFC3339)) w.(http.Flusher).Flush() - + case <-r.Context().Done(): return } @@ -718,21 +1084,25 @@ func (s *Server) handleDiagnostics(w http.ResponseWriter, r *http.Request) { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return } - + diagnostics := map[string]interface{}{ - "api_server_status": "running", - "uptime": time.Since(time.Now()).String(), // This would need to track actual start time + "api_server_status": "running", + "uptime": time.Since(s.startTime).String(), "event_processing_pipeline_health": "healthy", - "kagent_api_connectivity": "unknown", // Would need to check actual connectivity + "kagent_api_connectivity": s.checkKagentConnectivity(), "plugin_status": map[string]string{ "kubernetes_events": "active", }, "memory_usage": map[string]interface{}{ - "alerts_count": len(s.alerts), + "alerts_count": len(s.alerts), "active_connections": len(s.clients), }, + "server_info": map[string]interface{}{ + "port": s.port, + "start_time": s.startTime.Format(time.RFC3339), + }, } - + w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(diagnostics) } @@ -743,20 +1113,475 @@ func (s *Server) handleMetrics(w http.ResponseWriter, r *http.Request) { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return } - + s.mu.RLock() summary := s.calculateSummary() s.mu.RUnlock() - + metrics := map[string]interface{}{ - "khook_events_total": summary.Total, - "khook_active_events": summary.Firing, - "khook_resolved_events": summary.Resolved, + "khook_events_total": summary.Total, + "khook_active_events": summary.Firing, + "khook_resolved_events": summary.Resolved, "khook_acknowledged_events": summary.Acknowledged, - "khook_events_by_type": summary.ByEventType, - "khook_events_by_severity": summary.BySeverity, + "khook_events_by_type": summary.ByEventType, + "khook_events_by_severity": summary.BySeverity, } - + w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(metrics) } + +// checkKagentConnectivity checks if the Kagent API is reachable +func (s *Server) checkKagentConnectivity() string { + // This is a simplified connectivity check + // In a real implementation, you might want to make an actual HTTP request + // to the Kagent API endpoint to verify connectivity + + // For now, we'll return "unknown" since we don't have direct access + // to the Kagent client configuration in this context + // A more sophisticated implementation would: + // 1. Get the Kagent API URL from environment variables or config + // 2. Make a health check request to the API + // 3. Return "connected", "disconnected", or "unknown" based on the response + + return "unknown" +} + +// handleEventTypes handles GET /api/v1/events/types/{eventType} +func (s *Server) handleEventTypes(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + // Parse path to extract eventType + path := r.URL.Path + if len(path) < len("/api/v1/events/types/") { + http.Error(w, "Invalid path", http.StatusBadRequest) + return + } + + eventType := path[len("/api/v1/events/types/"):] + if eventType == "" { + http.Error(w, "Missing event type", http.StatusBadRequest) + return + } + + // Parse query parameters for filtering + query := r.URL.Query() + namespace := query.Get("namespace") + resourceName := query.Get("resourceName") + status := query.Get("status") + + // Time-based filtering + var startTime, endTime *time.Time + if startTimeStr := query.Get("startTime"); startTimeStr != "" { + if t, err := time.Parse(time.RFC3339, startTimeStr); err == nil { + startTime = &t + } + } + if endTimeStr := query.Get("endTime"); endTimeStr != "" { + if t, err := time.Parse(time.RFC3339, endTimeStr); err == nil { + endTime = &t + } + } + + s.mu.RLock() + alerts := make([]Alert, 0) + for _, alert := range s.alerts { + // Filter by event type + if alert.EventType != eventType { + continue + } + // Apply other filters + if namespace != "" && alert.Namespace != namespace { + continue + } + if resourceName != "" && alert.ResourceName != resourceName { + continue + } + if status != "" && alert.Status != status { + continue + } + // Time-based filtering + if startTime != nil && alert.Timestamp.Before(*startTime) { + continue + } + if endTime != nil && alert.Timestamp.After(*endTime) { + continue + } + alerts = append(alerts, *alert) + } + s.mu.RUnlock() + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]interface{}{ + "data": alerts, + "total": len(alerts), + "eventType": eventType, + }) +} + +// handleHookStats handles GET /api/v1/stats/hooks/{namespace}/{name}/metrics +func (s *Server) handleHookStats(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + // Parse path to extract namespace and name + path := r.URL.Path + if len(path) < len("/api/v1/stats/hooks/") { + http.Error(w, "Invalid path", http.StatusBadRequest) + return + } + + parts := strings.Split(path[len("/api/v1/stats/hooks/"):], "/") + if len(parts) < 2 { + http.Error(w, "Missing namespace or name", http.StatusBadRequest) + return + } + + namespace := parts[0] + hookName := parts[1] + + // Get hook from Kubernetes + var hook v1alpha2.Hook + key := types.NamespacedName{Name: hookName, Namespace: namespace} + if err := s.client.Get(context.Background(), key, &hook); err != nil { + http.Error(w, "Hook not found", http.StatusNotFound) + return + } + + // Count events for this hook + s.mu.RLock() + totalEvents := 0 + eventsByType := make(map[string]int) + eventsByStatus := make(map[string]int) + eventsBySeverity := make(map[string]int) + + for _, alert := range s.alerts { + if alert.Namespace == namespace && alert.HookName == hookName { + totalEvents++ + eventsByType[alert.EventType]++ + eventsByStatus[alert.Status]++ + eventsBySeverity[alert.Severity]++ + } + } + s.mu.RUnlock() + + metrics := map[string]interface{}{ + "hook": map[string]interface{}{ + "name": hookName, + "namespace": namespace, + "status": "active", // Hook is active if it exists + }, + "events": map[string]interface{}{ + "total": totalEvents, + "by_type": eventsByType, + "by_status": eventsByStatus, + "by_severity": eventsBySeverity, + }, + "configuration": map[string]interface{}{ + "event_configurations": len(hook.Spec.EventConfigurations), + "created_at": hook.CreationTimestamp.Time.Format(time.RFC3339), + }, + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(metrics) +} + +// handleEventTrends handles GET /api/v1/stats/trends +func (s *Server) handleEventTrends(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + // Parse query parameters + query := r.URL.Query() + timeRange := query.Get("timeRange") // 1h, 24h, 7d, 30d + if timeRange == "" { + timeRange = "24h" + } + + // Calculate time window + var window time.Duration + switch timeRange { + case "1h": + window = time.Hour + case "24h": + window = 24 * time.Hour + case "7d": + window = 7 * 24 * time.Hour + case "30d": + window = 30 * 24 * time.Hour + default: + window = 24 * time.Hour + } + + now := time.Now() + startTime := now.Add(-window) + + s.mu.RLock() + trends := make(map[string]interface{}) + hourlyCounts := make(map[string]int) + dailyCounts := make(map[string]int) + + // Group events by hour and day + for _, alert := range s.alerts { + if alert.Timestamp.Before(startTime) { + continue + } + + hourKey := alert.Timestamp.Format("2006-01-02 15:00") + dayKey := alert.Timestamp.Format("2006-01-02") + + hourlyCounts[hourKey]++ + dailyCounts[dayKey]++ + } + s.mu.RUnlock() + + // Calculate trends + trends["time_range"] = timeRange + trends["hourly_counts"] = hourlyCounts + trends["daily_counts"] = dailyCounts + trends["total_events"] = len(hourlyCounts) + trends["window_start"] = startTime.Format(time.RFC3339) + trends["window_end"] = now.Format(time.RFC3339) + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(trends) +} + +// sortAlerts sorts alerts by the specified field and order +func (s *Server) sortAlerts(alerts []Alert, sortBy, order string) { + switch sortBy { + case "timestamp": + if order == "asc" { + // Sort by timestamp ascending + for i := 0; i < len(alerts)-1; i++ { + for j := i + 1; j < len(alerts); j++ { + if alerts[i].Timestamp.After(alerts[j].Timestamp) { + alerts[i], alerts[j] = alerts[j], alerts[i] + } + } + } + } else { + // Sort by timestamp descending (default) + for i := 0; i < len(alerts)-1; i++ { + for j := i + 1; j < len(alerts); j++ { + if alerts[i].Timestamp.Before(alerts[j].Timestamp) { + alerts[i], alerts[j] = alerts[j], alerts[i] + } + } + } + } + case "eventType": + if order == "asc" { + for i := 0; i < len(alerts)-1; i++ { + for j := i + 1; j < len(alerts); j++ { + if alerts[i].EventType > alerts[j].EventType { + alerts[i], alerts[j] = alerts[j], alerts[i] + } + } + } + } else { + for i := 0; i < len(alerts)-1; i++ { + for j := i + 1; j < len(alerts); j++ { + if alerts[i].EventType < alerts[j].EventType { + alerts[i], alerts[j] = alerts[j], alerts[i] + } + } + } + } + case "resourceName": + if order == "asc" { + for i := 0; i < len(alerts)-1; i++ { + for j := i + 1; j < len(alerts); j++ { + if alerts[i].ResourceName > alerts[j].ResourceName { + alerts[i], alerts[j] = alerts[j], alerts[i] + } + } + } + } else { + for i := 0; i < len(alerts)-1; i++ { + for j := i + 1; j < len(alerts); j++ { + if alerts[i].ResourceName < alerts[j].ResourceName { + alerts[i], alerts[j] = alerts[j], alerts[i] + } + } + } + } + } +} + +// handleWebSocket handles WebSocket connections for real-time event streaming +func (s *Server) handleWebSocket(w http.ResponseWriter, r *http.Request) { + // Upgrade HTTP connection to WebSocket + conn, err := s.upgrader.Upgrade(w, r, nil) + if err != nil { + s.logger.Error(err, "Failed to upgrade connection to WebSocket") + return + } + defer conn.Close() + + // Add client to WebSocket clients + s.wsMu.Lock() + s.wsClients[conn] = true + s.wsMu.Unlock() + + // Remove client when connection closes + defer func() { + s.wsMu.Lock() + delete(s.wsClients, conn) + s.wsMu.Unlock() + }() + + s.logger.Info("WebSocket client connected") + + // Send initial data + initialData := map[string]interface{}{ + "type": "connected", + "message": "WebSocket connection established", + "time": time.Now().Format(time.RFC3339), + } + if err := conn.WriteJSON(initialData); err != nil { + s.logger.Error(err, "Failed to send initial data to WebSocket client") + return + } + + // Keep connection alive and handle incoming messages + for { + // Read message from client (for ping/pong or commands) + messageType, message, err := conn.ReadMessage() + if err != nil { + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { + s.logger.Error(err, "WebSocket error") + } + break + } + + // Handle different message types + switch messageType { + case websocket.TextMessage: + // Handle text messages (could be commands like "ping", "subscribe", etc.) + command := string(message) + switch command { + case "ping": + // Respond with pong + if err := conn.WriteMessage(websocket.TextMessage, []byte("pong")); err != nil { + s.logger.Error(err, "Failed to send pong to WebSocket client") + return + } + case "subscribe": + // Client wants to subscribe to all events + response := map[string]interface{}{ + "type": "subscribed", + "message": "Subscribed to all events", + "time": time.Now().Format(time.RFC3339), + } + if err := conn.WriteJSON(response); err != nil { + s.logger.Error(err, "Failed to send subscription confirmation") + return + } + } + case websocket.PingMessage: + // Respond to ping with pong + if err := conn.WriteMessage(websocket.PongMessage, nil); err != nil { + s.logger.Error(err, "Failed to send pong to WebSocket client") + return + } + } + } + + s.logger.Info("WebSocket client disconnected") +} + +// LoadExistingEvents loads existing active events from all hooks +func (s *Server) LoadExistingEvents(ctx context.Context) { + s.loadExistingEvents(ctx) +} + +// loadExistingEvents loads existing active events from all hooks on startup +func (s *Server) loadExistingEvents(ctx context.Context) { + s.logger.Info("Loading existing events from hooks...") + + // Get all hooks from all namespaces + var hookList v1alpha2.HookList + if err := s.client.List(ctx, &hookList); err != nil { + s.logger.Error(err, "Failed to list hooks") + return + } + + loadedCount := 0 + for _, hook := range hookList.Items { + // Load active events from hook status + for _, activeEvent := range hook.Status.ActiveEvents { + // Find the agent ID for this specific event type + agentID := "k8s-agent" // Default fallback + for _, eventConfig := range hook.Spec.EventConfigurations { + if eventConfig.EventType == activeEvent.EventType { + if eventConfig.AgentRef.Name != "" { + agentID = eventConfig.AgentRef.Name + } else if eventConfig.AgentId != "" { + // Parse legacy agentId format + parts := strings.Split(eventConfig.AgentId, "/") + if len(parts) == 2 { + agentID = parts[1] // Use the name part + } else { + agentID = parts[0] // Use the whole string if no slash + } + } + break // Use the first matching configuration + } + } + // Convert active event to alert + alert := &Alert{ + ID: fmt.Sprintf("%s-%s-%s-%s", hook.Namespace, hook.Name, activeEvent.EventType, activeEvent.ResourceName), + HookName: hook.Name, + Namespace: hook.Namespace, + EventType: activeEvent.EventType, + ResourceName: activeEvent.ResourceName, + Severity: s.determineSeverity(activeEvent.EventType), + Status: activeEvent.Status, + Timestamp: activeEvent.FirstSeen.Time, + FirstSeen: activeEvent.FirstSeen.Format(time.RFC3339), + LastSeen: activeEvent.LastSeen.Format(time.RFC3339), + Message: fmt.Sprintf("Event %s for resource %s", activeEvent.EventType, activeEvent.ResourceName), + AgentID: agentID, // Use the determined agent ID + } + + // Add to alerts map + s.mu.Lock() + s.alerts[alert.ID] = alert + s.mu.Unlock() + + loadedCount++ + s.logger.Info("Loaded existing event", + "alertId", alert.ID, + "eventType", alert.EventType, + "status", alert.Status, + "hook", hook.Name) + } + } + + s.logger.Info("Finished loading existing events", "count", loadedCount) +} + +// determineSeverity determines alert severity based on event type +func (s *Server) determineSeverity(eventType string) string { + switch eventType { + case "pod-restart": + return "high" + case "oom-kill": + return "critical" + case "probe-failed": + return "high" + case "pod-pending": + return "medium" + default: + return "medium" + } +} diff --git a/internal/workflow/coordinator.go b/internal/workflow/coordinator.go index 24e032e..71d7267 100644 --- a/internal/workflow/coordinator.go +++ b/internal/workflow/coordinator.go @@ -2,6 +2,7 @@ package workflow import ( "context" + "fmt" "time" "github.com/go-logr/logr" @@ -59,6 +60,15 @@ func NewCoordinator( func (c *Coordinator) Start(ctx context.Context) error { c.logger.Info("Starting workflow coordinator") + // Load existing events into SRE server if available + c.logger.Info("Attempting to load existing events", "sreServerType", fmt.Sprintf("%T", c.workflowManager.sreServer)) + if sreServer, ok := c.workflowManager.sreServer.(interface{ LoadExistingEvents(context.Context) }); ok { + c.logger.Info("Loading existing events into SRE server") + sreServer.LoadExistingEvents(ctx) + } else { + c.logger.Info("SRE server does not support LoadExistingEvents method") + } + ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() From 1adb29c2876c84c17f8239b783f8a02e6e6808de Mon Sep 17 00:00:00 2001 From: Denis Tu Date: Fri, 19 Sep 2025 14:50:15 +0300 Subject: [PATCH 3/5] ignore values test file Signed-off-by: Denis Tu Signed-off-by: Denis Tu --- helm/.gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/helm/.gitignore b/helm/.gitignore index ec92b06..3d3e77d 100644 --- a/helm/.gitignore +++ b/helm/.gitignore @@ -1,2 +1,3 @@ Chart.yaml Chart.lock +values-test.yaml \ No newline at end of file From d758c9c385bc1975e3d4cd9b5e4fb1d82a1c941d Mon Sep 17 00:00:00 2001 From: Denis Tu Date: Fri, 19 Sep 2025 15:46:09 +0300 Subject: [PATCH 4/5] restore after rebase Signed-off-by: Denis Tu --- internal/sre/server.go | 38 +++++++++++++++++++++++--------------- 1 file changed, 23 insertions(+), 15 deletions(-) diff --git a/internal/sre/server.go b/internal/sre/server.go index 3f63215..25f8820 100644 --- a/internal/sre/server.go +++ b/internal/sre/server.go @@ -76,8 +76,10 @@ func NewServer(port int, client client.Client) *Server { startTime: time.Now(), upgrader: websocket.Upgrader{ CheckOrigin: func(r *http.Request) bool { - return true // Allow all origins for now + return true }, + ReadBufferSize: 1024, + WriteBufferSize: 1024, }, } } @@ -581,9 +583,7 @@ func (s *Server) handleHooks(w http.ResponseWriter, r *http.Request) { } if r.Method == http.MethodPost { - // Create hook - for now, just return success - w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(map[string]string{"status": "success"}) + s.handleCreateHook(w, r) return } @@ -697,6 +697,24 @@ func (s *Server) handleDeleteHook(w http.ResponseWriter, r *http.Request, namesp w.WriteHeader(http.StatusNoContent) } +// handleCreateHook creates a new Hook resource +func (s *Server) handleCreateHook(w http.ResponseWriter, r *http.Request) { + var hook v1alpha2.Hook + if err := json.NewDecoder(r.Body).Decode(&hook); err != nil { + http.Error(w, "Invalid JSON", http.StatusBadRequest) + return + } + + if err := s.client.Create(context.Background(), &hook); err != nil { + s.logger.Error(err, "Failed to create hook") + http.Error(w, "Failed to create hook", http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(hook) +} + // handleHookValidation handles POST /api/v1/hooks/validate func (s *Server) handleHookValidation(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { @@ -1133,17 +1151,7 @@ func (s *Server) handleMetrics(w http.ResponseWriter, r *http.Request) { // checkKagentConnectivity checks if the Kagent API is reachable func (s *Server) checkKagentConnectivity() string { - // This is a simplified connectivity check - // In a real implementation, you might want to make an actual HTTP request - // to the Kagent API endpoint to verify connectivity - - // For now, we'll return "unknown" since we don't have direct access - // to the Kagent client configuration in this context - // A more sophisticated implementation would: - // 1. Get the Kagent API URL from environment variables or config - // 2. Make a health check request to the API - // 3. Return "connected", "disconnected", or "unknown" based on the response - + // TODO: Implement actual Kagent API connectivity check return "unknown" } From d4525479b1481bd2b66f09a579b8b71aced84a2e Mon Sep 17 00:00:00 2001 From: Denis Tu Date: Fri, 19 Sep 2025 15:55:51 +0300 Subject: [PATCH 5/5] remove checkKagentConnectivity Signed-off-by: Denis Tu --- internal/sre/server.go | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/internal/sre/server.go b/internal/sre/server.go index 25f8820..d8fd2b5 100644 --- a/internal/sre/server.go +++ b/internal/sre/server.go @@ -1107,7 +1107,7 @@ func (s *Server) handleDiagnostics(w http.ResponseWriter, r *http.Request) { "api_server_status": "running", "uptime": time.Since(s.startTime).String(), "event_processing_pipeline_health": "healthy", - "kagent_api_connectivity": s.checkKagentConnectivity(), + "kagent_api_connectivity": "managed_by_controller", "plugin_status": map[string]string{ "kubernetes_events": "active", }, @@ -1149,11 +1149,6 @@ func (s *Server) handleMetrics(w http.ResponseWriter, r *http.Request) { json.NewEncoder(w).Encode(metrics) } -// checkKagentConnectivity checks if the Kagent API is reachable -func (s *Server) checkKagentConnectivity() string { - // TODO: Implement actual Kagent API connectivity check - return "unknown" -} // handleEventTypes handles GET /api/v1/events/types/{eventType} func (s *Server) handleEventTypes(w http.ResponseWriter, r *http.Request) {