diff --git a/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/edf/edf.go b/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/edf/edf.go new file mode 100644 index 000000000..0aea48038 --- /dev/null +++ b/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/edf/edf.go @@ -0,0 +1,124 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package edf + +import ( + "time" + + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types" +) + +// EDFPolicyName is the name of the Earliest Deadline First (EDF) intra-flow dispatch policy. +// +// This policy implements a deadline-urgency scheduling strategy by selecting the request with the earliest absolute +// deadline, computed as `EnqueueTime() + EffectiveTTL()`. Requests without a valid TTL (i.e., EffectiveTTL <= 0) are +// treated as having no deadline and are scheduled after all time-bound requests, using FCFS as a tie-breaker for fairness. +const EDFPolicyName = "EDF" + +func init() { + dispatch.MustRegisterPolicy(dispatch.RegisteredPolicyName(EDFPolicyName), + func() (framework.IntraFlowDispatchPolicy, error) { + return newEDFPolicy(), nil + }) +} + +// EDFPolicy implements an intra-flow dispatch policy based on the Earliest Deadline First (EDF) scheduling algorithm. +// Requests with earlier absolute deadlines (EnqueueTime + EffectiveTTL) are dispatched first. +// See the documentation for the exported `EDFPolicyName` constant for detailed behavioral guarantees. +type EDFPolicy struct { + comparator framework.ItemComparator +} + +var _ framework.IntraFlowDispatchPolicy = &EDFPolicy{} + +func newEDFPolicy() framework.IntraFlowDispatchPolicy { + return &EDFPolicy{ + comparator: &edfComparator{}, + } +} + +func (p *EDFPolicy) Name() string { + return EDFPolicyName +} + +// RequiredQueueCapabilities returns the queue capabilities required by this policy. +// It requires a priority-configurable queue (e.g., heap-based) to maintain items in deadline-sorted order. +func (p *EDFPolicy) RequiredQueueCapabilities() []framework.QueueCapability { + return []framework.QueueCapability{framework.CapabilityPriorityConfigurable} +} + +func (p *EDFPolicy) Comparator() framework.ItemComparator { + return p.comparator +} + +// SelectItem selects the next item to dispatch by returning the head of the queue. +// This assumes the underlying queue is ordered according to the policy's comparator +// (enforced via RequiredQueueCapabilities). Thus, the most urgent request is always at the head. +// Returns (nil, nil) if the queue is empty or nil. +func (p *EDFPolicy) SelectItem(queue framework.FlowQueueAccessor) (selectedItem types.QueueItemAccessor, err error) { + if queue == nil { + return nil, nil + } + return queue.PeekHead(), nil +} + +var maxDeadlineTime = time.Unix(0, 1<<63-1) + +// calculateDeadline computes the absolute deadline for a request. +// The deadline is defined as the logical enqueue time plus the effective time-to-live (TTL). +// If EffectiveTTL is zero or negative, the request is considered non-time-sensitive and assigned a +// far-future deadline so it sorts after all SLO-bound requests. +func calculateDeadline(item types.QueueItemAccessor) time.Time { + ttl := item.EffectiveTTL() + if ttl <= 0 { + // No TTL: treat as "never expire", but still respect enqueue time for fairness. + return maxDeadlineTime + } + return item.EnqueueTime().Add(ttl) +} + +type edfComparator struct{} + +func (d *edfComparator) Func() framework.ItemComparatorFunc { + return func(a, b types.QueueItemAccessor) bool { + if a == nil && b == nil { + return false + } + if a == nil { // Treat nil as lowest priority + return false + } + if b == nil { // Treat non-nil 'a' as higher priority than nil 'b' + return true + } + deadlineA := calculateDeadline(a) + deadlineB := calculateDeadline(b) + + if !deadlineA.Equal(deadlineB) { + return deadlineA.Before(deadlineB) // earlier deadline = higher priority + } + + // Same deadline: FCFS (earlier enqueue time = higher priority) + return a.EnqueueTime().Before(b.EnqueueTime()) + } +} + +// ScoreType indicates this policy uses EDF-based scoring. +func (d *edfComparator) ScoreType() string { + return string(framework.EDFPriorityScoreType) +} diff --git a/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/edf/edf_test.go b/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/edf/edf_test.go new file mode 100644 index 000000000..7a12d77f4 --- /dev/null +++ b/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/edf/edf_test.go @@ -0,0 +1,161 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package edf + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework" + frameworkmocks "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/mocks" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types" + typesmocks "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types/mocks" +) + +var testFlowKey = types.FlowKey{ID: "test-flow", Priority: 0} + +func TestEDFPolicy_Name(t *testing.T) { + t.Parallel() + policy := newEDFPolicy() + assert.Equal(t, EDFPolicyName, policy.Name()) +} + +func TestEDFPolicy_RequiredQueueCapabilities(t *testing.T) { + t.Parallel() + policy := newEDFPolicy() + caps := policy.RequiredQueueCapabilities() + require.Len(t, caps, 1) + assert.Equal(t, framework.CapabilityPriorityConfigurable, caps[0]) +} + +func TestEDFPolicy_SelectItem(t *testing.T) { + t.Parallel() + policy := newEDFPolicy() + + mockItem := typesmocks.NewMockQueueItemAccessor(1, "item1", testFlowKey) + mockQueue := &frameworkmocks.MockFlowQueueAccessor{ + PeekHeadV: mockItem, + LenV: 1, + } + + item, err := policy.SelectItem(mockQueue) + require.NoError(t, err) + assert.Equal(t, mockItem, item, "Should return the head of the queue") +} + +func TestEDFComparator_Func(t *testing.T) { + t.Parallel() + comparator := &edfComparator{} + compareFunc := comparator.Func() + require.NotNil(t, compareFunc) + + now := time.Now() + + // Item A: TTL=10s → deadline = now + 10s + itemA := typesmocks.NewMockQueueItemAccessor(10, "itemA", testFlowKey) + itemA.EnqueueTimeV = now + itemA.EffectiveTTLV = 10 * time.Second + + // Item B: TTL=5s → deadline = now + 5s (earlier than A) + itemB := typesmocks.NewMockQueueItemAccessor(20, "itemB", testFlowKey) + itemB.EnqueueTimeV = now.Add(1 * time.Second) // enqueued later, but tighter deadline + itemB.EffectiveTTLV = 5 * time.Second + + // Item C: TTL <= 0 → treated as far-future deadline + itemC := typesmocks.NewMockQueueItemAccessor(30, "itemC", testFlowKey) + itemC.EnqueueTimeV = now.Add(-5 * time.Second) // enqueued earlier, but no deadline + itemC.EffectiveTTLV = -1 * time.Second + + // Item D: same deadline as B, but enqueued earlier → should win tie-breaker + itemD := typesmocks.NewMockQueueItemAccessor(40, "itemD", testFlowKey) + itemD.EnqueueTimeV = now.Add(-1 * time.Second) + itemD.EffectiveTTLV = 6 * time.Second // deadline = now + 5s (same as B) + + // Item E: another non-deadline item + itemE := typesmocks.NewMockQueueItemAccessor(40, "itemD", testFlowKey) + itemE.EnqueueTimeV = now.Add(-10 * time.Second) // earlier than C + itemE.EffectiveTTLV = 0 + + testCases := []struct { + name string + a types.QueueItemAccessor + b types.QueueItemAccessor + expected bool // true if a should be dispatched before b + }{ + {"B before A (earlier deadline)", itemB, itemA, true}, + {"A after B", itemA, itemB, false}, + + {"Deadline-bound before non-deadline (A vs C)", itemA, itemC, true}, + {"Non-deadline after deadline-bound (C vs A)", itemC, itemA, false}, + + {"Tie-breaker: D before B (same deadline, D enqueued earlier)", itemD, itemB, true}, + {"Tie-breaker: B after D", itemB, itemD, false}, + + {"Non-deadline items: FCFS (E enqueued earlier than C → E before C)", itemE, itemC, true}, + {"Non-deadline items: C after E", itemC, itemE, false}, + + {"a is nil → b wins", nil, itemA, false}, + {"b is nil → a wins", itemA, nil, true}, + {"both nil → false", nil, nil, false}, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + assert.Equal(t, tc.expected, compareFunc(tc.a, tc.b)) + }) + } +} + +func TestEDFComparator_ScoreType(t *testing.T) { + t.Parallel() + comparator := &edfComparator{} + assert.Equal(t, string(framework.EDFPriorityScoreType), comparator.ScoreType()) +} + +func TestCalculateDeadline(t *testing.T) { + t.Parallel() + + now := time.Now() + + // Valid TTL + itemWithTTL := typesmocks.NewMockQueueItemAccessor(1, "test", testFlowKey) + itemWithTTL.EnqueueTimeV = now + itemWithTTL.EffectiveTTLV = 5 * time.Second + + deadline := calculateDeadline(itemWithTTL) + assert.Equal(t, now.Add(5*time.Second), deadline) + + // Zero TTL → far future + itemZeroTTL := typesmocks.NewMockQueueItemAccessor(2, "test2", testFlowKey) + itemZeroTTL.EnqueueTimeV = now + itemZeroTTL.EffectiveTTLV = 0 + + deadlineZero := calculateDeadline(itemZeroTTL) + assert.Equal(t, maxDeadlineTime, deadlineZero) + + // Negative TTL → far future + itemNegTTL := typesmocks.NewMockQueueItemAccessor(3, "test3", testFlowKey) + itemNegTTL.EnqueueTimeV = now + itemNegTTL.EffectiveTTLV = -10 * time.Second + + deadlineNeg := calculateDeadline(itemNegTTL) + assert.Equal(t, maxDeadlineTime, deadlineNeg) +} diff --git a/pkg/epp/flowcontrol/framework/policies.go b/pkg/epp/flowcontrol/framework/policies.go index eeea034eb..82d5fda0d 100644 --- a/pkg/epp/flowcontrol/framework/policies.go +++ b/pkg/epp/flowcontrol/framework/policies.go @@ -25,6 +25,9 @@ const ( // EnqueueTimePriorityScoreType indicates that the priority is based on the item's enqueue time, with earlier times // being higher priority. EnqueueTimePriorityScoreType PriorityScoreType = "enqueue_time_ns_asc" + + // EDFPriorityScoreType indicates priority scoring based on the Earliest Deadline First (EDF) scheduling policy. + EDFPriorityScoreType PriorityScoreType = "earliest_deadline_first" ) // ItemComparatorFunc defines the function signature for comparing two `types.QueueItemAccessor` instances to determine