From c96be22a4687e14c918cf0e372ab8a531a12ce0f Mon Sep 17 00:00:00 2001 From: KengoWada Date: Tue, 29 Apr 2025 13:51:08 +0300 Subject: [PATCH 1/4] feat: Add Worker interface to support customizable worker implementations (#9) --- worker.go | 159 +++++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 135 insertions(+), 24 deletions(-) diff --git a/worker.go b/worker.go index aa8ce9f..c2fa28a 100644 --- a/worker.go +++ b/worker.go @@ -7,39 +7,150 @@ import ( "time" ) -// Worker represents a single worker that processes tasks from a broker. -// It is responsible for consuming tasks, executing handlers, and managing retries. -type Worker struct { - id int // The unique identifier for the worker. - broker Broker // The broker from which tasks are consumed. - backoff *BackoffPolicy // The backoff policy for retrying tasks in case of failure. - handlers map[string]TaskHandlerFunc // A map of task names to their corresponding handler functions. - wg *sync.WaitGroup // A WaitGroup to synchronize worker shutdown and wait for all tasks to finish. +// Worker defines the contract for a task-processing worker. +// +// Implementations of Worker are responsible for registering task handlers +// and starting the task execution loop, typically consuming tasks from a Broker. +// +// Methods: +// - Register: Associates a task name with a handler function. +// - Start: Begins processing tasks using the provided context for cancellation. +// +// This interface allows different worker implementations to be plugged into the Manager, +// enabling customizable behavior such as logging, metrics, or concurrency models. +// +// Example usage: +// +// type MyWorker struct { ... } +// func (w *MyWorker) Register(name string, handler TaskHandlerFunc) { ... } +// func (w *MyWorker) Start(ctx context.Context) { ... } +type Worker interface { + Register(name string, handler TaskHandlerFunc) + Start(ctx context.Context) } -// NewWorker creates a new worker instance with the given ID, broker, backoff policy, and WaitGroup. -// The worker is responsible for consuming tasks from the broker, executing the appropriate handlers, -// and handling retries in case of failures. -func NewWorker(id int, broker Broker, backoff *BackoffPolicy, wg *sync.WaitGroup) *Worker { - return &Worker{ - id: id, - broker: broker, - backoff: backoff, +// WorkerFactory defines a function that creates a new Worker instance using the provided configuration. +// +// This allows users to customize how Workers are constructed, enabling injection of custom +// dependencies (e.g., loggers, metrics, settings) without modifying the Manager. +// +// The WorkerConfig contains common fields such as worker ID, broker, backoff policy, and a wait group. +// +// Example: +// +// func MyWorkerFactory(cfg WorkerConfig) Worker { +// return &MyCustomWorker{ +// id: cfg.ID, +// broker: cfg.Broker, +// backoff: cfg.Backoff, +// wg: cfg.WG, +// logger: myLogger, // custom dependency +// } +// } +type WorkerFactory func(cfg WorkerConfig) Worker + +// WorkerConfig provides the configuration necessary to initialize a Worker. +// +// It is passed to the WorkerFactory function by the Manager to ensure that all workers +// have the required shared dependencies and context-specific data. +// +// Fields: +// - ID: A unique identifier for the worker, typically assigned by the Manager. +// - Broker: The Broker instance used to retrieve and dispatch tasks. +// - Backoff: The policy used for retrying tasks on failure. +// - WG: A shared WaitGroup used by the Manager to coordinate worker shutdown. +// +// This struct is designed to be extended if additional shared dependencies need to +// be passed to workers in the future. +type WorkerConfig struct { + ID int + Broker Broker + Backoff *BackoffPolicy + WG *sync.WaitGroup +} + +// DefaultWorker is the standard implementation of the Worker interface. +// +// It is responsible for consuming tasks from the provided Broker, executing them using +// registered handler functions, and retrying failed tasks based on a backoff policy. +// +// Fields: +// - id: A unique identifier for this worker instance. +// - broker: The Broker used to fetch tasks for execution. +// - backoff: The BackoffPolicy used to delay retries on task failure. +// - handlers: A map of task names to their associated TaskHandlerFunc implementations. +// - wg: A shared WaitGroup used by the Manager to coordinate worker shutdown. +// +// DefaultWorker is intended to be created using a WorkerFactory and managed by a Manager. +// It supports context-based cancellation for graceful shutdown. +type DefaultWorker struct { + id int + broker Broker + backoff *BackoffPolicy + handlers map[string]TaskHandlerFunc + wg *sync.WaitGroup +} + +// DefaultWorkerFactory creates and returns a new instance of DefaultWorker using the provided WorkerConfig. +// +// The worker is initialized with: +// - A unique ID +// - A broker for task consumption +// - A backoff policy for retrying failed tasks +// - A shared WaitGroup for graceful shutdown coordination +// - An empty handler map ready for task registration +// +// This function returns a Worker interface, allowing the DefaultWorker to be used polymorphically. +// +// Typically used within a WorkerFactory passed to the Manager. +func DefaultWorkerFactory(cfg WorkerConfig) Worker { + return &DefaultWorker{ + id: cfg.ID, + broker: cfg.Broker, + backoff: cfg.Backoff, handlers: make(map[string]TaskHandlerFunc), - wg: wg, + wg: cfg.WG, } } -// Register registers a task handler function for a specific task name. -// The handler will be invoked when a task with the specified name is consumed from the broker. -func (w *Worker) Register(name string, handler TaskHandlerFunc) { +// Register registers a task handler for a specific task name. +// +// The handler function will be associated with the provided task name and +// invoked when a task with that name is received by the worker. +// +// Parameters: +// - name: The unique name of the task being registered. +// - handler: The TaskHandlerFunc that will handle the task when it is dispatched. +// +// This method allows users to dynamically register multiple tasks for a worker, +// enabling the worker to handle a variety of task types. +// +// Example: +// +// worker.Register("send_email", sendEmailHandler) +func (w *DefaultWorker) Register(name string, handler TaskHandlerFunc) { w.handlers[name] = handler } -// Start begins the worker's task-consuming loop. It continuously consumes tasks from the broker and -// invokes the registered task handlers. If a task handler returns an error, the task may be retried -// based on the backoff policy and retry logic. The method ensures graceful shutdown using a context and WaitGroup. -func (w *Worker) Start(ctx context.Context) { +// Start begins processing tasks for the worker and continuously listens for new tasks +// from the broker until the provided context is cancelled or the task channel is closed. +// +// It runs a task consumption loop that will: +// - Consume tasks from the broker +// - Attempt to handle each task using the corresponding registered handler +// - Retry failed tasks according to the worker's backoff policy (if provided) +// - Exit when the context is cancelled or the task channel is closed +// +// Parameters: +// - ctx: The context used to control the lifetime of the worker. When the context +// is cancelled, the worker will shut down gracefully. +// +// The worker will log information about task successes, failures, retries, and backoff delays. +// +// Example: +// +// worker.Start(ctx) +func (w *DefaultWorker) Start(ctx context.Context) { defer w.wg.Done() tasks, err := w.broker.Consume() From b30de34678bc56e7c04933d3ff8d0ccfd96aa130 Mon Sep 17 00:00:00 2001 From: KengoWada Date: Tue, 29 Apr 2025 13:52:04 +0300 Subject: [PATCH 2/4] test: Update worker tests to align with new Worker interface (#9) --- tests/worker_test.go | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/tests/worker_test.go b/tests/worker_test.go index c9017eb..94f14bc 100644 --- a/tests/worker_test.go +++ b/tests/worker_test.go @@ -35,7 +35,8 @@ func TestWorkerProcesses(t *testing.T) { t.Run("should run task successfully", func(t *testing.T) { mockBroker := NewMockBroker(1) wg := &sync.WaitGroup{} - worker := taskqueue.NewWorker(1, mockBroker, nil, wg) + cfg := taskqueue.WorkerConfig{ID: 1, Broker: mockBroker, Backoff: nil, WG: wg} + worker := taskqueue.DefaultWorkerFactory(cfg) worker.Register(taskName, taskHandler) ctx, cancel := context.WithCancel(context.Background()) @@ -69,7 +70,8 @@ func TestWorkerProcesses(t *testing.T) { mockBroker := NewMockBroker(5) wg := &sync.WaitGroup{} - worker := taskqueue.NewWorker(1, mockBroker, backoff, wg) + cfg := taskqueue.WorkerConfig{ID: 1, Broker: mockBroker, Backoff: backoff, WG: wg} + worker := taskqueue.DefaultWorkerFactory(cfg) worker.Register(failTaskName, failTaskHandler) ctx, cancel := context.WithCancel(context.Background()) @@ -97,7 +99,8 @@ func TestWorkerProcesses(t *testing.T) { t.Run("should handle failing task with no backoff", func(t *testing.T) { mockBroker := NewMockBroker(5) wg := &sync.WaitGroup{} - worker := taskqueue.NewWorker(1, mockBroker, nil, wg) + cfg := taskqueue.WorkerConfig{ID: 1, Broker: mockBroker, Backoff: nil, WG: wg} + worker := taskqueue.DefaultWorkerFactory(cfg) worker.Register(failTaskName, failTaskHandler) ctx, cancel := context.WithCancel(context.Background()) @@ -125,7 +128,8 @@ func TestWorkerProcesses(t *testing.T) { t.Run("should not execute task with no handler", func(t *testing.T) { mockBroker := NewMockBroker(1) wg := &sync.WaitGroup{} - worker := taskqueue.NewWorker(1, mockBroker, nil, wg) + cfg := taskqueue.WorkerConfig{ID: 1, Broker: mockBroker, Backoff: nil, WG: wg} + worker := taskqueue.DefaultWorkerFactory(cfg) worker.Register(taskName, taskHandler) ctx, cancel := context.WithCancel(context.Background()) @@ -153,7 +157,8 @@ func TestWorkerProcesses(t *testing.T) { mockBroker := NewMockBroker(1) mockBroker.badConsume = true wg := &sync.WaitGroup{} - worker := taskqueue.NewWorker(1, mockBroker, nil, wg) + cfg := taskqueue.WorkerConfig{ID: 1, Broker: mockBroker, Backoff: nil, WG: wg} + worker := taskqueue.DefaultWorkerFactory(cfg) worker.Register(taskName, taskHandler) ctx, cancel := context.WithCancel(context.Background()) @@ -174,7 +179,8 @@ func TestWorkerProcesses(t *testing.T) { mockBroker := NewMockBroker(1) mockBroker.closeChannel = true wg := &sync.WaitGroup{} - worker := taskqueue.NewWorker(1, mockBroker, nil, wg) + cfg := taskqueue.WorkerConfig{ID: 1, Broker: mockBroker, Backoff: nil, WG: wg} + worker := taskqueue.DefaultWorkerFactory(cfg) worker.Register(taskName, taskHandler) ctx, cancel := context.WithCancel(context.Background()) From 67ed7a41b0e4909d021204be9264017644c2ac7b Mon Sep 17 00:00:00 2001 From: KengoWada Date: Tue, 29 Apr 2025 13:53:06 +0300 Subject: [PATCH 3/4] refactor: Update manager to depend on Worker interface (#9) --- manager.go | 61 +++++++++++++++++++++++++++++++----------------------- 1 file changed, 35 insertions(+), 26 deletions(-) diff --git a/manager.go b/manager.go index b77f391..fc79282 100644 --- a/manager.go +++ b/manager.go @@ -24,7 +24,7 @@ type ManagerConfig struct { // coordinating task consumption and processing, and gracefully stopping workers. type Manager struct { broker Broker - workers []*Worker + workers []Worker ctx context.Context // The context used for managing the lifecycle of the Manager. cancel context.CancelFunc // The cancel function associated with the context, used to stop the Manager. @@ -43,41 +43,45 @@ func WithBackoffPolicy(bp *BackoffPolicy) ManagerOption { } } -// NewManager creates and returns a new Manager instance. The Manager is responsible for -// managing workers that process tasks from the provided Broker. It allows for a custom -// number of workers and optional configuration such as a backoff policy for task retries. -// -// The function accepts a Broker, the number of workers to create, and an optional list of -// ManagerOption functions for additional configuration. If no backoff policy is provided, -// a default backoff policy will be applied. +// NewManager creates a new Manager instance responsible for coordinating and supervising workers. // // Parameters: -// - broker: The Broker to be used for publishing and consuming tasks. -// - numWorkers: The number of workers to be created (defaults to 1 if less than or equal to 0). -// - opts: A variadic list of ManagerOption functions for additional configuration. +// - broker: The Broker used by all workers to fetch and process tasks. +// - wf: A WorkerFactory function used to create each Worker with a provided WorkerConfig. +// - numWorkers: The number of workers to spawn. If set to 0 or less, DefaultNumOfWorkers is used. +// - opts: Optional functional configuration parameters for customizing Manager behavior (e.g. backoff policy). +// +// The Manager sets up a cancellable context for controlling the lifecycle of its workers, +// and assigns a shared WaitGroup to coordinate shutdowns. If no BackoffPolicy is provided in +// the options, a DefaultBackoffPolicy is used. +// +// Each worker is created using the WorkerFactory and given a unique ID, shared broker, +// backoff policy, and reference to the Manager's WaitGroup. // // Returns: -// - A pointer to the created Manager instance. // -// Example usage: +// A pointer to a fully initialized Manager ready to register task handlers and start processing. +// +// Example: // -// broker := NewRabbitMQBroker(...) -// manager := NewManager(broker, 3, WithBackoffPolicy(customBackoffPolicy)) -func NewManager(broker Broker, numWorkers int, opts ...ManagerOption) *Manager { +// mgr := NewManager(broker, MyWorkerFactory(), 5, WithBackoff(customBackoff)) +// mgr.RegisterTask("send_email", emailHandler) +// mgr.Start() +func NewManager(broker Broker, wf WorkerFactory, numWorkers int, opts ...ManagerOption) *Manager { ctx, cancel := context.WithCancel(context.Background()) if numWorkers <= 0 { numWorkers = DefaultNumOfWorkers } - cfg := &ManagerConfig{} + managerConfig := &ManagerConfig{} for _, opt := range opts { - opt(cfg) + opt(managerConfig) } // Set default backoff if cfg.BackoffPolicy == nil - if cfg.BackoffPolicy == nil { - cfg.BackoffPolicy = &BackoffPolicy{ + if managerConfig.BackoffPolicy == nil { + managerConfig.BackoffPolicy = &BackoffPolicy{ BaseDelay: 1 * time.Second, MaxDelay: 30 * time.Second, UseJitter: true, @@ -85,14 +89,19 @@ func NewManager(broker Broker, numWorkers int, opts ...ManagerOption) *Manager { } } - m := &Manager{broker: broker, ctx: ctx, cancel: cancel} - for i := 1; i <= numWorkers; i++ { - m.wg.Add(1) - worker := NewWorker(i, broker, cfg.BackoffPolicy, &m.wg) - m.workers = append(m.workers, worker) + manager := &Manager{broker: broker, ctx: ctx, cancel: cancel} + for i := range numWorkers { + manager.wg.Add(1) + workerConfig := WorkerConfig{ + ID: i + 1, + Broker: manager.broker, + Backoff: managerConfig.BackoffPolicy, + WG: &manager.wg, + } + manager.workers = append(manager.workers, wf(workerConfig)) } - return m + return manager } // RegisterTask registers a task handler for the specified task name across all workers managed by the Manager. From 3d3ea1a712a404f6d238071437c06a9abe002ab0 Mon Sep 17 00:00:00 2001 From: KengoWada Date: Tue, 29 Apr 2025 13:53:50 +0300 Subject: [PATCH 4/4] docs: Update example to use DefaultWorkerFactory (#9) --- example/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/example/main.go b/example/main.go index efb4f40..8ef1e2a 100644 --- a/example/main.go +++ b/example/main.go @@ -39,7 +39,7 @@ func main() { JitterRangeMs: 500, } - manager := taskqueue.NewManager(broker, 5, taskqueue.WithBackoffPolicy(backoffPolicy)) + manager := taskqueue.NewManager(broker, taskqueue.DefaultWorkerFactory, 5, taskqueue.WithBackoffPolicy(backoffPolicy)) manager.RegisterTask(sendEmailTaskName, sendEmailTask) manager.Start()