Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func main() {
JitterRangeMs: 500,
}

manager := taskqueue.NewManager(broker, 5, taskqueue.WithBackoffPolicy(backoffPolicy))
manager := taskqueue.NewManager(broker, taskqueue.DefaultWorkerFactory, 5, taskqueue.WithBackoffPolicy(backoffPolicy))
manager.RegisterTask(sendEmailTaskName, sendEmailTask)
manager.Start()

Expand Down
61 changes: 35 additions & 26 deletions manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type ManagerConfig struct {
// coordinating task consumption and processing, and gracefully stopping workers.
type Manager struct {
broker Broker
workers []*Worker
workers []Worker

ctx context.Context // The context used for managing the lifecycle of the Manager.
cancel context.CancelFunc // The cancel function associated with the context, used to stop the Manager.
Expand All @@ -43,56 +43,65 @@ func WithBackoffPolicy(bp *BackoffPolicy) ManagerOption {
}
}

// NewManager creates and returns a new Manager instance. The Manager is responsible for
// managing workers that process tasks from the provided Broker. It allows for a custom
// number of workers and optional configuration such as a backoff policy for task retries.
//
// The function accepts a Broker, the number of workers to create, and an optional list of
// ManagerOption functions for additional configuration. If no backoff policy is provided,
// a default backoff policy will be applied.
// NewManager creates a new Manager instance responsible for coordinating and supervising workers.
//
// Parameters:
// - broker: The Broker to be used for publishing and consuming tasks.
// - numWorkers: The number of workers to be created (defaults to 1 if less than or equal to 0).
// - opts: A variadic list of ManagerOption functions for additional configuration.
// - broker: The Broker used by all workers to fetch and process tasks.
// - wf: A WorkerFactory function used to create each Worker with a provided WorkerConfig.
// - numWorkers: The number of workers to spawn. If set to 0 or less, DefaultNumOfWorkers is used.
// - opts: Optional functional configuration parameters for customizing Manager behavior (e.g. backoff policy).
//
// The Manager sets up a cancellable context for controlling the lifecycle of its workers,
// and assigns a shared WaitGroup to coordinate shutdowns. If no BackoffPolicy is provided in
// the options, a DefaultBackoffPolicy is used.
//
// Each worker is created using the WorkerFactory and given a unique ID, shared broker,
// backoff policy, and reference to the Manager's WaitGroup.
//
// Returns:
// - A pointer to the created Manager instance.
//
// Example usage:
// A pointer to a fully initialized Manager ready to register task handlers and start processing.
//
// Example:
//
// broker := NewRabbitMQBroker(...)
// manager := NewManager(broker, 3, WithBackoffPolicy(customBackoffPolicy))
func NewManager(broker Broker, numWorkers int, opts ...ManagerOption) *Manager {
// mgr := NewManager(broker, MyWorkerFactory(), 5, WithBackoff(customBackoff))
// mgr.RegisterTask("send_email", emailHandler)
// mgr.Start()
func NewManager(broker Broker, wf WorkerFactory, numWorkers int, opts ...ManagerOption) *Manager {
ctx, cancel := context.WithCancel(context.Background())

if numWorkers <= 0 {
numWorkers = DefaultNumOfWorkers
}

cfg := &ManagerConfig{}
managerConfig := &ManagerConfig{}
for _, opt := range opts {
opt(cfg)
opt(managerConfig)
}

// Set default backoff if cfg.BackoffPolicy == nil
if cfg.BackoffPolicy == nil {
cfg.BackoffPolicy = &BackoffPolicy{
if managerConfig.BackoffPolicy == nil {
managerConfig.BackoffPolicy = &BackoffPolicy{
BaseDelay: 1 * time.Second,
MaxDelay: 30 * time.Second,
UseJitter: true,
JitterRangeMs: 300,
}
}

m := &Manager{broker: broker, ctx: ctx, cancel: cancel}
for i := 1; i <= numWorkers; i++ {
m.wg.Add(1)
worker := NewWorker(i, broker, cfg.BackoffPolicy, &m.wg)
m.workers = append(m.workers, worker)
manager := &Manager{broker: broker, ctx: ctx, cancel: cancel}
for i := range numWorkers {
manager.wg.Add(1)
workerConfig := WorkerConfig{
ID: i + 1,
Broker: manager.broker,
Backoff: managerConfig.BackoffPolicy,
WG: &manager.wg,
}
manager.workers = append(manager.workers, wf(workerConfig))
}

return m
return manager
}

// RegisterTask registers a task handler for the specified task name across all workers managed by the Manager.
Expand Down
18 changes: 12 additions & 6 deletions tests/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ func TestWorkerProcesses(t *testing.T) {
t.Run("should run task successfully", func(t *testing.T) {
mockBroker := NewMockBroker(1)
wg := &sync.WaitGroup{}
worker := taskqueue.NewWorker(1, mockBroker, nil, wg)
cfg := taskqueue.WorkerConfig{ID: 1, Broker: mockBroker, Backoff: nil, WG: wg}
worker := taskqueue.DefaultWorkerFactory(cfg)
worker.Register(taskName, taskHandler)

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -69,7 +70,8 @@ func TestWorkerProcesses(t *testing.T) {

mockBroker := NewMockBroker(5)
wg := &sync.WaitGroup{}
worker := taskqueue.NewWorker(1, mockBroker, backoff, wg)
cfg := taskqueue.WorkerConfig{ID: 1, Broker: mockBroker, Backoff: backoff, WG: wg}
worker := taskqueue.DefaultWorkerFactory(cfg)
worker.Register(failTaskName, failTaskHandler)

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -97,7 +99,8 @@ func TestWorkerProcesses(t *testing.T) {
t.Run("should handle failing task with no backoff", func(t *testing.T) {
mockBroker := NewMockBroker(5)
wg := &sync.WaitGroup{}
worker := taskqueue.NewWorker(1, mockBroker, nil, wg)
cfg := taskqueue.WorkerConfig{ID: 1, Broker: mockBroker, Backoff: nil, WG: wg}
worker := taskqueue.DefaultWorkerFactory(cfg)
worker.Register(failTaskName, failTaskHandler)

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -125,7 +128,8 @@ func TestWorkerProcesses(t *testing.T) {
t.Run("should not execute task with no handler", func(t *testing.T) {
mockBroker := NewMockBroker(1)
wg := &sync.WaitGroup{}
worker := taskqueue.NewWorker(1, mockBroker, nil, wg)
cfg := taskqueue.WorkerConfig{ID: 1, Broker: mockBroker, Backoff: nil, WG: wg}
worker := taskqueue.DefaultWorkerFactory(cfg)
worker.Register(taskName, taskHandler)

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -153,7 +157,8 @@ func TestWorkerProcesses(t *testing.T) {
mockBroker := NewMockBroker(1)
mockBroker.badConsume = true
wg := &sync.WaitGroup{}
worker := taskqueue.NewWorker(1, mockBroker, nil, wg)
cfg := taskqueue.WorkerConfig{ID: 1, Broker: mockBroker, Backoff: nil, WG: wg}
worker := taskqueue.DefaultWorkerFactory(cfg)
worker.Register(taskName, taskHandler)

ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -174,7 +179,8 @@ func TestWorkerProcesses(t *testing.T) {
mockBroker := NewMockBroker(1)
mockBroker.closeChannel = true
wg := &sync.WaitGroup{}
worker := taskqueue.NewWorker(1, mockBroker, nil, wg)
cfg := taskqueue.WorkerConfig{ID: 1, Broker: mockBroker, Backoff: nil, WG: wg}
worker := taskqueue.DefaultWorkerFactory(cfg)
worker.Register(taskName, taskHandler)

ctx, cancel := context.WithCancel(context.Background())
Expand Down
159 changes: 135 additions & 24 deletions worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,39 +7,150 @@ import (
"time"
)

// Worker represents a single worker that processes tasks from a broker.
// It is responsible for consuming tasks, executing handlers, and managing retries.
type Worker struct {
id int // The unique identifier for the worker.
broker Broker // The broker from which tasks are consumed.
backoff *BackoffPolicy // The backoff policy for retrying tasks in case of failure.
handlers map[string]TaskHandlerFunc // A map of task names to their corresponding handler functions.
wg *sync.WaitGroup // A WaitGroup to synchronize worker shutdown and wait for all tasks to finish.
// Worker defines the contract for a task-processing worker.
//
// Implementations of Worker are responsible for registering task handlers
// and starting the task execution loop, typically consuming tasks from a Broker.
//
// Methods:
// - Register: Associates a task name with a handler function.
// - Start: Begins processing tasks using the provided context for cancellation.
//
// This interface allows different worker implementations to be plugged into the Manager,
// enabling customizable behavior such as logging, metrics, or concurrency models.
//
// Example usage:
//
// type MyWorker struct { ... }
// func (w *MyWorker) Register(name string, handler TaskHandlerFunc) { ... }
// func (w *MyWorker) Start(ctx context.Context) { ... }
type Worker interface {
Register(name string, handler TaskHandlerFunc)
Start(ctx context.Context)
}

// NewWorker creates a new worker instance with the given ID, broker, backoff policy, and WaitGroup.
// The worker is responsible for consuming tasks from the broker, executing the appropriate handlers,
// and handling retries in case of failures.
func NewWorker(id int, broker Broker, backoff *BackoffPolicy, wg *sync.WaitGroup) *Worker {
return &Worker{
id: id,
broker: broker,
backoff: backoff,
// WorkerFactory defines a function that creates a new Worker instance using the provided configuration.
//
// This allows users to customize how Workers are constructed, enabling injection of custom
// dependencies (e.g., loggers, metrics, settings) without modifying the Manager.
//
// The WorkerConfig contains common fields such as worker ID, broker, backoff policy, and a wait group.
//
// Example:
//
// func MyWorkerFactory(cfg WorkerConfig) Worker {
// return &MyCustomWorker{
// id: cfg.ID,
// broker: cfg.Broker,
// backoff: cfg.Backoff,
// wg: cfg.WG,
// logger: myLogger, // custom dependency
// }
// }
type WorkerFactory func(cfg WorkerConfig) Worker

// WorkerConfig provides the configuration necessary to initialize a Worker.
//
// It is passed to the WorkerFactory function by the Manager to ensure that all workers
// have the required shared dependencies and context-specific data.
//
// Fields:
// - ID: A unique identifier for the worker, typically assigned by the Manager.
// - Broker: The Broker instance used to retrieve and dispatch tasks.
// - Backoff: The policy used for retrying tasks on failure.
// - WG: A shared WaitGroup used by the Manager to coordinate worker shutdown.
//
// This struct is designed to be extended if additional shared dependencies need to
// be passed to workers in the future.
type WorkerConfig struct {
ID int
Broker Broker
Backoff *BackoffPolicy
WG *sync.WaitGroup
}

// DefaultWorker is the standard implementation of the Worker interface.
//
// It is responsible for consuming tasks from the provided Broker, executing them using
// registered handler functions, and retrying failed tasks based on a backoff policy.
//
// Fields:
// - id: A unique identifier for this worker instance.
// - broker: The Broker used to fetch tasks for execution.
// - backoff: The BackoffPolicy used to delay retries on task failure.
// - handlers: A map of task names to their associated TaskHandlerFunc implementations.
// - wg: A shared WaitGroup used by the Manager to coordinate worker shutdown.
//
// DefaultWorker is intended to be created using a WorkerFactory and managed by a Manager.
// It supports context-based cancellation for graceful shutdown.
type DefaultWorker struct {
id int
broker Broker
backoff *BackoffPolicy
handlers map[string]TaskHandlerFunc
wg *sync.WaitGroup
}

// DefaultWorkerFactory creates and returns a new instance of DefaultWorker using the provided WorkerConfig.
//
// The worker is initialized with:
// - A unique ID
// - A broker for task consumption
// - A backoff policy for retrying failed tasks
// - A shared WaitGroup for graceful shutdown coordination
// - An empty handler map ready for task registration
//
// This function returns a Worker interface, allowing the DefaultWorker to be used polymorphically.
//
// Typically used within a WorkerFactory passed to the Manager.
func DefaultWorkerFactory(cfg WorkerConfig) Worker {
return &DefaultWorker{
id: cfg.ID,
broker: cfg.Broker,
backoff: cfg.Backoff,
handlers: make(map[string]TaskHandlerFunc),
wg: wg,
wg: cfg.WG,
}
}

// Register registers a task handler function for a specific task name.
// The handler will be invoked when a task with the specified name is consumed from the broker.
func (w *Worker) Register(name string, handler TaskHandlerFunc) {
// Register registers a task handler for a specific task name.
//
// The handler function will be associated with the provided task name and
// invoked when a task with that name is received by the worker.
//
// Parameters:
// - name: The unique name of the task being registered.
// - handler: The TaskHandlerFunc that will handle the task when it is dispatched.
//
// This method allows users to dynamically register multiple tasks for a worker,
// enabling the worker to handle a variety of task types.
//
// Example:
//
// worker.Register("send_email", sendEmailHandler)
func (w *DefaultWorker) Register(name string, handler TaskHandlerFunc) {
w.handlers[name] = handler
}

// Start begins the worker's task-consuming loop. It continuously consumes tasks from the broker and
// invokes the registered task handlers. If a task handler returns an error, the task may be retried
// based on the backoff policy and retry logic. The method ensures graceful shutdown using a context and WaitGroup.
func (w *Worker) Start(ctx context.Context) {
// Start begins processing tasks for the worker and continuously listens for new tasks
// from the broker until the provided context is cancelled or the task channel is closed.
//
// It runs a task consumption loop that will:
// - Consume tasks from the broker
// - Attempt to handle each task using the corresponding registered handler
// - Retry failed tasks according to the worker's backoff policy (if provided)
// - Exit when the context is cancelled or the task channel is closed
//
// Parameters:
// - ctx: The context used to control the lifetime of the worker. When the context
// is cancelled, the worker will shut down gracefully.
//
// The worker will log information about task successes, failures, retries, and backoff delays.
//
// Example:
//
// worker.Start(ctx)
func (w *DefaultWorker) Start(ctx context.Context) {
defer w.wg.Done()

tasks, err := w.broker.Consume()
Expand Down