Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 20 additions & 3 deletions manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (

const DefaultNumOfWorkers int = 1

var ErrEmptyTaskName = errors.New("task name must not be empty")

// ManagerOption is a function type that modifies the configuration of a ManagerConfig.
// It allows for functional configuration of the Manager, enabling users to customize
// various settings, such as the backoff policy, when creating a Manager.
Expand All @@ -24,6 +26,7 @@ type ManagerConfig struct {
// coordinating task consumption and processing, and gracefully stopping workers.
type Manager struct {
broker Broker
backoff Backoff
workers []Worker

ctx context.Context // The context used for managing the lifecycle of the Manager.
Expand Down Expand Up @@ -83,13 +86,19 @@ func NewManager(broker Broker, wf WorkerFactory, numWorkers int, opts ...Manager
managerConfig.BackoffPolicy = &DefaultBackoffPolicy
}

manager := &Manager{broker: broker, ctx: ctx, cancel: cancel}
manager := &Manager{
broker: broker,
backoff: managerConfig.BackoffPolicy,
ctx: ctx,
cancel: cancel,
}

for i := range numWorkers {
manager.wg.Add(1)
workerConfig := WorkerConfig{
ID: i + 1,
Broker: manager.broker,
Backoff: managerConfig.BackoffPolicy,
Backoff: manager.backoff,
WG: &manager.wg,
}
manager.workers = append(manager.workers, wf(workerConfig))
Expand Down Expand Up @@ -142,7 +151,7 @@ func (m *Manager) RegisterTask(taskName string, handler TaskHandlerFunc) {
// }
func (m *Manager) PublishTask(taskName string, args TaskArgs, maxRetry int) error {
if taskName == "" {
return errors.New("task name must not be empty")
return ErrEmptyTaskName
}

task := Task{
Expand Down Expand Up @@ -185,3 +194,11 @@ func (m *Manager) Stop() {
m.wg.Wait()
log.Println("All workers have shut down.")
}

func (m *Manager) Workers() []Worker {
return m.workers
}

func (m *Manager) BackoffPolicy() Backoff {
return m.backoff
}
112 changes: 112 additions & 0 deletions tests/manager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package taskqueue_test

import (
"testing"
"time"

"github.com/KengoWada/taskqueue"
"github.com/stretchr/testify/assert"
)

func TestManager(t *testing.T) {
t.Run("should initialize manager with workers and default backoff", func(t *testing.T) {
mockBroker := NewMockBroker(1)
tests := []struct {
numOfWorkers int
expected int
setBackoff bool
}{
{numOfWorkers: -1, expected: 1, setBackoff: true},
{numOfWorkers: 0, expected: 1, setBackoff: false},
{numOfWorkers: 5, expected: 5, setBackoff: true},
{numOfWorkers: 10, expected: 10, setBackoff: false},
}

backoffPolicy := &taskqueue.BackoffPolicy{
BaseDelay: 5 * time.Millisecond, // small delay for fast test
MaxDelay: 20 * time.Millisecond,
UseJitter: false,
JitterRangeMs: 0,
}

for _, tt := range tests {
var manager *taskqueue.Manager
var expectedBackoff *taskqueue.BackoffPolicy
if tt.setBackoff {
manager = taskqueue.NewManager(mockBroker, MockWorkerFactory, tt.numOfWorkers, taskqueue.WithBackoffPolicy(backoffPolicy))
expectedBackoff = backoffPolicy
} else {
manager = taskqueue.NewManager(mockBroker, MockWorkerFactory, tt.numOfWorkers)
expectedBackoff = &taskqueue.DefaultBackoffPolicy
}

assert.Equal(t, tt.expected, len(manager.Workers()))
assert.Equal(t, expectedBackoff, manager.BackoffPolicy().(*taskqueue.BackoffPolicy))
}
})

t.Run("should register handler with all workers", func(t *testing.T) {
mockBroker := NewMockBroker(1)
numOfWorkers := 5
manager := taskqueue.NewManager(mockBroker, MockWorkerFactory, numOfWorkers)

taskName := "task_name"
manager.RegisterTask(taskName, func(ta taskqueue.TaskArgs) error { return nil })

workers := manager.Workers()
assert.Equal(t, numOfWorkers, len(workers))

for _, worker := range workers {
w := worker.(*MockWorker)
task, exists := w.Handler[taskName]
assert.True(t, exists)
assert.Nil(t, task(taskqueue.TaskArgs{}))
}
})

t.Run("should publish task to the broker", func(t *testing.T) {
mockBroker := NewMockBroker(1)
numOfWorkers := 5
manager := taskqueue.NewManager(mockBroker, MockWorkerFactory, numOfWorkers)

// Invalid task name as empty string
err := manager.PublishTask("", taskqueue.TaskArgs{}, 3)
assert.Equal(t, taskqueue.ErrEmptyTaskName, err)

// Valid task name and arguments
err = manager.PublishTask("task_name", taskqueue.TaskArgs{}, 2)
assert.Nil(t, err)

// Broker fails to publish task
mockBroker.badPublish = true
err = manager.PublishTask("task_name", taskqueue.TaskArgs{}, 2)
assert.NotNil(t, err)
assert.Equal(t, errSimulated, err)
})

t.Run("should start and stop all workers", func(t *testing.T) {
mockBroker := NewMockBroker(1)
numOfWorkers := 5
manager := taskqueue.NewManager(mockBroker, MockWorkerFactory, numOfWorkers)

manager.Start()

time.Sleep(100 * time.Millisecond)

workers := manager.Workers()
assert.Equal(t, numOfWorkers, len(workers))
for _, worker := range workers {
w := worker.(*MockWorker)
assert.True(t, w.Started)
}

manager.Stop()

time.Sleep(100 * time.Millisecond)

for _, worker := range workers {
w := worker.(*MockWorker)
assert.False(t, w.Started)
}
})
}
29 changes: 29 additions & 0 deletions tests/utils_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package taskqueue_test

import (
"context"
"errors"
"sync"

Expand All @@ -9,6 +10,34 @@ import (

var errSimulated = errors.New("simulated error")

type MockWorker struct {
id int
Handler taskqueue.HandlerRegistry
Started bool
wg *sync.WaitGroup
}

func (w *MockWorker) Register(name string, handler taskqueue.TaskHandlerFunc) {
w.Handler[name] = handler
}

func (w *MockWorker) Start(ctx context.Context) {
w.Started = true
defer func() { w.Started = false }()
defer w.wg.Done()

<-ctx.Done()
}

func MockWorkerFactory(cfg taskqueue.WorkerConfig) taskqueue.Worker {
return &MockWorker{
id: cfg.ID,
wg: cfg.WG,
Handler: make(taskqueue.HandlerRegistry),
Started: false,
}
}

type MockBroker struct {
mu sync.Mutex
tasks chan taskqueue.Task
Expand Down
6 changes: 4 additions & 2 deletions worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ type Worker interface {
Start(ctx context.Context)
}

type HandlerRegistry map[string]TaskHandlerFunc

// WorkerFactory defines a function that creates a new Worker instance using the provided configuration.
//
// This allows users to customize how Workers are constructed, enabling injection of custom
Expand Down Expand Up @@ -87,7 +89,7 @@ type DefaultWorker struct {
id int
broker Broker
backoff Backoff
handlers map[string]TaskHandlerFunc
handlers HandlerRegistry
wg *sync.WaitGroup
}

Expand All @@ -108,7 +110,7 @@ func DefaultWorkerFactory(cfg WorkerConfig) Worker {
id: cfg.ID,
broker: cfg.Broker,
backoff: cfg.Backoff,
handlers: make(map[string]TaskHandlerFunc),
handlers: make(HandlerRegistry),
wg: cfg.WG,
}
}
Expand Down