Feat: Embedded HTTP API Server #7 #9

base: main

```diff
@@ -17,6 +17,7 @@ import (
 	kagentv1alpha2 "github.com/kagent-dev/khook/api/v1alpha2"
 	kclient "github.com/kagent-dev/khook/internal/client"
 	"github.com/kagent-dev/khook/internal/config"
+	"github.com/kagent-dev/khook/internal/sre"
 	"github.com/kagent-dev/khook/internal/workflow"
 )
```
```diff
@@ -76,8 +77,17 @@ func main() {
 		os.Exit(1)
 	}

+	// Start SRE-IDE server
+	sreServer := sre.NewServer(8082, mgr.GetClient())
+	go func() {
+		ctx := context.Background()
+		if err := sreServer.Start(ctx); err != nil {
+			setupLog.Error(err, "problem running SRE-IDE server")
+		}
+	}()
+
 	// Add workflow coordinator to manage hooks and event processing
-	if err := mgr.Add(newWorkflowCoordinator(mgr)); err != nil {
+	if err := mgr.Add(newWorkflowCoordinator(mgr, sreServer)); err != nil {
 		setupLog.Error(err, "unable to add workflow coordinator")
 		os.Exit(1)
 	}
```

Collaborator: I have an issue with nomenclature - I know you built this specifically for your use case, but I would like to make this generic so other people can potentially build on it. I think just calling it "API server" will work better.
```diff
@@ -91,11 +101,15 @@ func main() {

 // workflowCoordinator manages the complete workflow lifecycle using proper services
 type workflowCoordinator struct {
-	mgr ctrl.Manager
+	mgr       ctrl.Manager
+	sreServer *sre.Server
 }

-func newWorkflowCoordinator(mgr ctrl.Manager) *workflowCoordinator {
-	return &workflowCoordinator{mgr: mgr}
+func newWorkflowCoordinator(mgr ctrl.Manager, sreServer *sre.Server) *workflowCoordinator {
+	return &workflowCoordinator{
+		mgr:       mgr,
+		sreServer: sreServer,
+	}
 }

 func (w *workflowCoordinator) NeedLeaderElection() bool { return true }
@@ -121,7 +135,7 @@ func (w *workflowCoordinator) Start(ctx context.Context) error {

 	// Create workflow coordinator
 	eventRecorder := w.mgr.GetEventRecorderFor("khook")
-	coordinator := workflow.NewCoordinator(k8s, w.mgr.GetClient(), kagentCli, eventRecorder)
+	coordinator := workflow.NewCoordinator(k8s, w.mgr.GetClient(), kagentCli, eventRecorder, w.sreServer)

 	// Start the coordinator
 	return coordinator.Start(ctx)
```
```diff
@@ -1,2 +1,3 @@
 Chart.yaml
 Chart.lock
+values-test.yaml
```
```diff
@@ -0,0 +1,23 @@
+{{- if .Values.sre.enabled }}
+apiVersion: v1
+kind: Service
+metadata:
+  name: {{ include "khook.fullname" . }}-sre
+  namespace: {{ include "khook.namespace" . }}
+  labels:
+    {{- include "khook.labels" . | nindent 4 }}
+    app.kubernetes.io/component: sre-server
+  {{- with .Values.sre.service.annotations }}
+  annotations:
+    {{- toYaml . | nindent 4 }}
+  {{- end }}
+spec:
+  type: {{ .Values.sre.service.type }}
+  ports:
+    - name: sre-server
+      port: {{ .Values.sre.service.port }}
+      targetPort: sre-server
+      protocol: TCP
+  selector:
+    {{- include "khook.selectorLabels" . | nindent 4 }}
+{{- end }}
```
```diff
@@ -14,7 +14,7 @@ const (
 	// EventTimeoutDuration is the duration after which events are considered resolved
 	EventTimeoutDuration = 10 * time.Minute
 	// NotificationSuppressionDuration is the window to suppress re-sending after success
-	NotificationSuppressionDuration = 10 * time.Minute
+	NotificationSuppressionDuration = 30 * time.Second

 	// StatusFiring indicates an event is currently active
 	StatusFiring = "firing"
```

Collaborator: This needs to be discussed. While the original 10 minutes may be too long, 30 seconds is IMHO too short. Maybe we should at least make this configurable via values/configmap.
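A minimal sketch of the configurability suggested above, assuming the window is supplied through an environment variable that a Helm value or ConfigMap could populate; the variable name, package name, and fallback default here are illustrative and not part of this PR:

```go
package config

import (
	"os"
	"time"
)

// defaultNotificationSuppression is an illustrative fallback; the value khook
// should actually default to still needs to be agreed on in review.
const defaultNotificationSuppression = 10 * time.Minute

// NotificationSuppressionDuration reads the suppression window from a
// hypothetical NOTIFICATION_SUPPRESSION_DURATION environment variable
// (e.g. "30s" or "10m") and falls back to the default when unset or invalid.
func NotificationSuppressionDuration() time.Duration {
	if raw := os.Getenv("NOTIFICATION_SUPPRESSION_DURATION"); raw != "" {
		if d, err := time.ParseDuration(raw); err == nil && d > 0 {
			return d
		}
	}
	return defaultNotificationSuppression
}
```

The chart could then expose the value in values.yaml and pass it to the container as an environment variable, keeping the compiled-in constant only as a fallback.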
```diff
@@ -14,6 +14,7 @@ import (

 	"github.com/kagent-dev/khook/api/v1alpha2"
 	"github.com/kagent-dev/khook/internal/interfaces"
+	"github.com/kagent-dev/khook/internal/sre"
 )

 // Processor handles the complete event processing pipeline
@@ -22,6 +23,7 @@ type Processor struct {
 	deduplicationManager interfaces.DeduplicationManager
 	kagentClient         interfaces.KagentClient
 	statusManager        interfaces.StatusManager
+	sreServer            interface{}
 	logger               logr.Logger
 }

@@ -31,12 +33,14 @@ func NewProcessor(
 	deduplicationManager interfaces.DeduplicationManager,
 	kagentClient interfaces.KagentClient,
 	statusManager interfaces.StatusManager,
+	sreServer interface{},
 ) *Processor {
 	return &Processor{
 		eventWatcher:         eventWatcher,
 		deduplicationManager: deduplicationManager,
 		kagentClient:         kagentClient,
 		statusManager:        statusManager,
+		sreServer:            sreServer,
 		logger:               log.Log.WithName("event-processor"),
 	}
 }
```

Collaborator: Why is this an `interface{}`?
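One typed alternative to the `interface{}` dependency questioned above, sketched under the assumption that a narrow interface is acceptable here; `AlertSink` and the trimmed-down `Alert` and `Processor` shapes are hypothetical, not the PR's actual types:

```go
package workflow

// Alert is a placeholder for whatever the sre package records; only the
// fields needed for the sketch are included.
type Alert struct {
	ID      string
	Message string
}

// AlertSink is a hypothetical abstraction over the embedded API server.
// *sre.Server would satisfy it by exposing AddAlert, and the processor could
// depend on this interface instead of interface{}, which removes the
// runtime type assertion.
type AlertSink interface {
	AddAlert(alert Alert)
}

// Processor stores the typed dependency directly; nil means no server is wired in.
type Processor struct {
	alertSink AlertSink
}

func (p *Processor) notify(a Alert) {
	if p.alertSink != nil {
		p.alertSink.AddAlert(a)
	}
}
```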
```diff
@@ -133,13 +137,38 @@ func (p *Processor) processEventMatch(ctx context.Context, match EventMatch) error {
 		return fmt.Errorf("failed to record event in deduplication manager: %w", err)
 	}

-	agentRefNs := match.Hook.Namespace
-	if match.Configuration.AgentRef.Namespace != nil {
-		agentRefNs = *match.Configuration.AgentRef.Namespace
-	}
-	agentRef := types.NamespacedName{
-		Name:      match.Configuration.AgentRef.Name,
-		Namespace: agentRefNs,
-	}
+	// Handle both agentId (legacy) and agentRef (new) formats
+	var agentRef types.NamespacedName
+	if match.Configuration.AgentRef.Name != "" {
+		// New format: agentRef
+		agentRefNs := match.Hook.Namespace
+		if match.Configuration.AgentRef.Namespace != nil {
+			agentRefNs = *match.Configuration.AgentRef.Namespace
+		}
+		agentRef = types.NamespacedName{
+			Name:      match.Configuration.AgentRef.Name,
+			Namespace: agentRefNs,
+		}
+	} else {
+		// Legacy format: agentId (parse "namespace/name" format)
+		agentId := match.Configuration.AgentId
+		if agentId == "" {
+			return fmt.Errorf("neither agentRef.name nor agentId is specified")
+		}
+
+		// Parse agentId format: "namespace/name" or just "name"
+		parts := strings.Split(agentId, "/")
+		if len(parts) == 2 {
+			agentRef = types.NamespacedName{
+				Name:      parts[1],
+				Namespace: parts[0],
+			}
+		} else {
+			agentRef = types.NamespacedName{
+				Name:      parts[0],
+				Namespace: match.Hook.Namespace,
+			}
+		}
+	}

 	// Record that the event is firing
```

Collaborator: Not a fan of this, as I mentioned above.
```diff
@@ -167,6 +196,19 @@ func (p *Processor) processEventMatch(ctx context.Context, match EventMatch) error {
 		// Continue even if status recording fails
 	}

+	// Add alert to SRE server if available
+	p.logger.Info("Checking SRE server integration", "sreServer", p.sreServer != nil)
+	if p.sreServer != nil {
+		if sreServer, ok := p.sreServer.(*sre.Server); ok {
+			// Convert event to alert and add to SRE server
+			alert := sre.ConvertEventToAlert(match.Event, match.Hook, agentRef, response)
+			sreServer.AddAlert(alert)
+			p.logger.Info("Added alert to SRE server", "alertId", alert.ID)
+		} else {
+			p.logger.Error(nil, "Type assertion failed for SRE server", "sreServerType", fmt.Sprintf("%T", p.sreServer))
+		}
+	}
+
 	// Mark event as notified to suppress re-sending within suppression window
 	p.deduplicationManager.MarkNotified(hookRef, match.Event)
```

Collaborator: At a high level, why do we have a callout to the server from here, rather than the server sharing the same datasource as this component?
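A rough sketch of the shared-datasource direction raised above, in which the processor writes alerts to a store that the HTTP server also reads, so neither component holds a reference to the other; `AlertStore` and its `Alert` shape are hypothetical and not part of this PR:

```go
package store

import "sync"

// Alert is a placeholder shape; the real fields live in the sre package.
type Alert struct {
	ID      string
	Message string
}

// AlertStore is a small thread-safe store shared by the event processor
// (writer) and the embedded API server (reader).
type AlertStore struct {
	mu     sync.RWMutex
	alerts []Alert
}

func NewAlertStore() *AlertStore {
	return &AlertStore{}
}

// Add records a new alert; the processor would call this when an event fires.
func (s *AlertStore) Add(a Alert) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.alerts = append(s.alerts, a)
}

// List returns a snapshot of recorded alerts; the HTTP handlers would call this.
func (s *AlertStore) List() []Alert {
	s.mu.RLock()
	defer s.mu.RUnlock()
	out := make([]Alert, len(s.alerts))
	copy(out, s.alerts)
	return out
}
```

Under this layout both the server and the workflow coordinator would receive the same `*AlertStore` at construction time, rather than the processor calling into the server.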
Why was this added back in? This format has been deprecated and has many issues.

I guess a rebase gone wrong. @DmarshalTU - we switched to AgentRef instead of AgentId to be aligned with the rest of Kagent.