diff --git a/manager.go b/manager.go index 3dcc4a3..4049d55 100644 --- a/manager.go +++ b/manager.go @@ -10,6 +10,8 @@ import ( const DefaultNumOfWorkers int = 1 +var ErrEmptyTaskName = errors.New("task name must not be empty") + // ManagerOption is a function type that modifies the configuration of a ManagerConfig. // It allows for functional configuration of the Manager, enabling users to customize // various settings, such as the backoff policy, when creating a Manager. @@ -24,6 +26,7 @@ type ManagerConfig struct { // coordinating task consumption and processing, and gracefully stopping workers. type Manager struct { broker Broker + backoff Backoff workers []Worker ctx context.Context // The context used for managing the lifecycle of the Manager. @@ -83,13 +86,19 @@ func NewManager(broker Broker, wf WorkerFactory, numWorkers int, opts ...Manager managerConfig.BackoffPolicy = &DefaultBackoffPolicy } - manager := &Manager{broker: broker, ctx: ctx, cancel: cancel} + manager := &Manager{ + broker: broker, + backoff: managerConfig.BackoffPolicy, + ctx: ctx, + cancel: cancel, + } + for i := range numWorkers { manager.wg.Add(1) workerConfig := WorkerConfig{ ID: i + 1, Broker: manager.broker, - Backoff: managerConfig.BackoffPolicy, + Backoff: manager.backoff, WG: &manager.wg, } manager.workers = append(manager.workers, wf(workerConfig)) @@ -142,7 +151,7 @@ func (m *Manager) RegisterTask(taskName string, handler TaskHandlerFunc) { // } func (m *Manager) PublishTask(taskName string, args TaskArgs, maxRetry int) error { if taskName == "" { - return errors.New("task name must not be empty") + return ErrEmptyTaskName } task := Task{ @@ -185,3 +194,11 @@ func (m *Manager) Stop() { m.wg.Wait() log.Println("All workers have shut down.") } + +func (m *Manager) Workers() []Worker { + return m.workers +} + +func (m *Manager) BackoffPolicy() Backoff { + return m.backoff +} diff --git a/tests/manager_test.go b/tests/manager_test.go new file mode 100644 index 0000000..4844d6d --- /dev/null +++ b/tests/manager_test.go @@ -0,0 +1,112 @@ +package taskqueue_test + +import ( + "testing" + "time" + + "github.com/KengoWada/taskqueue" + "github.com/stretchr/testify/assert" +) + +func TestManager(t *testing.T) { + t.Run("should initialize manager with workers and default backoff", func(t *testing.T) { + mockBroker := NewMockBroker(1) + tests := []struct { + numOfWorkers int + expected int + setBackoff bool + }{ + {numOfWorkers: -1, expected: 1, setBackoff: true}, + {numOfWorkers: 0, expected: 1, setBackoff: false}, + {numOfWorkers: 5, expected: 5, setBackoff: true}, + {numOfWorkers: 10, expected: 10, setBackoff: false}, + } + + backoffPolicy := &taskqueue.BackoffPolicy{ + BaseDelay: 5 * time.Millisecond, // small delay for fast test + MaxDelay: 20 * time.Millisecond, + UseJitter: false, + JitterRangeMs: 0, + } + + for _, tt := range tests { + var manager *taskqueue.Manager + var expectedBackoff *taskqueue.BackoffPolicy + if tt.setBackoff { + manager = taskqueue.NewManager(mockBroker, MockWorkerFactory, tt.numOfWorkers, taskqueue.WithBackoffPolicy(backoffPolicy)) + expectedBackoff = backoffPolicy + } else { + manager = taskqueue.NewManager(mockBroker, MockWorkerFactory, tt.numOfWorkers) + expectedBackoff = &taskqueue.DefaultBackoffPolicy + } + + assert.Equal(t, tt.expected, len(manager.Workers())) + assert.Equal(t, expectedBackoff, manager.BackoffPolicy().(*taskqueue.BackoffPolicy)) + } + }) + + t.Run("should register handler with all workers", func(t *testing.T) { + mockBroker := NewMockBroker(1) + numOfWorkers := 5 + manager := taskqueue.NewManager(mockBroker, MockWorkerFactory, numOfWorkers) + + taskName := "task_name" + manager.RegisterTask(taskName, func(ta taskqueue.TaskArgs) error { return nil }) + + workers := manager.Workers() + assert.Equal(t, numOfWorkers, len(workers)) + + for _, worker := range workers { + w := worker.(*MockWorker) + task, exists := w.Handler[taskName] + assert.True(t, exists) + assert.Nil(t, task(taskqueue.TaskArgs{})) + } + }) + + t.Run("should publish task to the broker", func(t *testing.T) { + mockBroker := NewMockBroker(1) + numOfWorkers := 5 + manager := taskqueue.NewManager(mockBroker, MockWorkerFactory, numOfWorkers) + + // Invalid task name as empty string + err := manager.PublishTask("", taskqueue.TaskArgs{}, 3) + assert.Equal(t, taskqueue.ErrEmptyTaskName, err) + + // Valid task name and arguments + err = manager.PublishTask("task_name", taskqueue.TaskArgs{}, 2) + assert.Nil(t, err) + + // Broker fails to publish task + mockBroker.badPublish = true + err = manager.PublishTask("task_name", taskqueue.TaskArgs{}, 2) + assert.NotNil(t, err) + assert.Equal(t, errSimulated, err) + }) + + t.Run("should start and stop all workers", func(t *testing.T) { + mockBroker := NewMockBroker(1) + numOfWorkers := 5 + manager := taskqueue.NewManager(mockBroker, MockWorkerFactory, numOfWorkers) + + manager.Start() + + time.Sleep(100 * time.Millisecond) + + workers := manager.Workers() + assert.Equal(t, numOfWorkers, len(workers)) + for _, worker := range workers { + w := worker.(*MockWorker) + assert.True(t, w.Started) + } + + manager.Stop() + + time.Sleep(100 * time.Millisecond) + + for _, worker := range workers { + w := worker.(*MockWorker) + assert.False(t, w.Started) + } + }) +} diff --git a/tests/utils_test.go b/tests/utils_test.go index 8dbc273..5a3f608 100644 --- a/tests/utils_test.go +++ b/tests/utils_test.go @@ -1,6 +1,7 @@ package taskqueue_test import ( + "context" "errors" "sync" @@ -9,6 +10,34 @@ import ( var errSimulated = errors.New("simulated error") +type MockWorker struct { + id int + Handler taskqueue.HandlerRegistry + Started bool + wg *sync.WaitGroup +} + +func (w *MockWorker) Register(name string, handler taskqueue.TaskHandlerFunc) { + w.Handler[name] = handler +} + +func (w *MockWorker) Start(ctx context.Context) { + w.Started = true + defer func() { w.Started = false }() + defer w.wg.Done() + + <-ctx.Done() +} + +func MockWorkerFactory(cfg taskqueue.WorkerConfig) taskqueue.Worker { + return &MockWorker{ + id: cfg.ID, + wg: cfg.WG, + Handler: make(taskqueue.HandlerRegistry), + Started: false, + } +} + type MockBroker struct { mu sync.Mutex tasks chan taskqueue.Task diff --git a/worker.go b/worker.go index bb61f84..4e254f8 100644 --- a/worker.go +++ b/worker.go @@ -29,6 +29,8 @@ type Worker interface { Start(ctx context.Context) } +type HandlerRegistry map[string]TaskHandlerFunc + // WorkerFactory defines a function that creates a new Worker instance using the provided configuration. // // This allows users to customize how Workers are constructed, enabling injection of custom @@ -87,7 +89,7 @@ type DefaultWorker struct { id int broker Broker backoff Backoff - handlers map[string]TaskHandlerFunc + handlers HandlerRegistry wg *sync.WaitGroup } @@ -108,7 +110,7 @@ func DefaultWorkerFactory(cfg WorkerConfig) Worker { id: cfg.ID, broker: cfg.Broker, backoff: cfg.Backoff, - handlers: make(map[string]TaskHandlerFunc), + handlers: make(HandlerRegistry), wg: cfg.WG, } }