Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# TaskQueue: Go-based Distributed Task Queue

[![Test and Coverage](https://github.com/KengoWada/taskqueue/actions/workflows/test.yml/badge.svg?branch=develop)](https://github.com/KengoWada/taskqueue/actions/workflows/test.yml) [![codecov](https://codecov.io/gh/KengoWada/taskqueue/graph/badge.svg?token=Y0VW883H2V)](https://codecov.io/gh/KengoWada/taskqueue) ![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)

TaskQueue is a Go library that provides a simple and efficient way to manage and execute asynchronous tasks. It's inspired by [Celery](https://docs.celeryq.dev/en/stable/getting-started/introduction.html) and designed to be highly extensible, allowing you to easily distribute tasks across multiple workers.

## ⚠️ Warning: Not Production Ready
Expand Down
3 changes: 3 additions & 0 deletions codecov.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
ignore:
- "example"
- "tests"
8 changes: 7 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,15 @@ module github.com/KengoWada/taskqueue

go 1.23.6

require github.com/redis/go-redis/v9 v9.7.3
require (
github.com/redis/go-redis/v9 v9.7.3
github.com/stretchr/testify v1.10.0
)

require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
10 changes: 10 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,17 @@ github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/redis/go-redis/v9 v9.7.3 h1:YpPyAayJV+XErNsatSElgRZZVCwXX9QzkKYNvO7x0wM=
github.com/redis/go-redis/v9 v9.7.3/go.mod h1:bGUrSggJ9X9GUmZpZNEOQKaANxSGgOEBRltRTZHSvrA=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
48 changes: 48 additions & 0 deletions tests/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package taskqueue_test

import (
"errors"
"sync"

"github.com/KengoWada/taskqueue"
)

var errSimulated = errors.New("simulated error")

type MockBroker struct {
mu sync.Mutex
tasks chan taskqueue.Task
publishedTasks []taskqueue.Task
badConsume bool
badPublish bool
closeChannel bool
}

func NewMockBroker(buffer int) *MockBroker {
return &MockBroker{
tasks: make(chan taskqueue.Task, buffer),
}
}

func (m *MockBroker) Publish(task taskqueue.Task) error {
if m.badPublish {
return errSimulated
}

m.mu.Lock()
defer m.mu.Unlock()
m.publishedTasks = append(m.publishedTasks, task)
m.tasks <- task
return nil
}

func (m *MockBroker) Consume() (<-chan taskqueue.Task, error) {
if m.badConsume {
return nil, errSimulated
}

if m.closeChannel {
close(m.tasks)
}
return m.tasks, nil
}
193 changes: 193 additions & 0 deletions tests/worker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
package taskqueue_test

import (
"bytes"
"context"
"errors"
"fmt"
"log"
"sync"
"testing"
"time"

"github.com/KengoWada/taskqueue"
"github.com/stretchr/testify/assert"
)

func TestWorkerProcesses(t *testing.T) {
taskName := "test_task"
taskHandlerCalled := false
taskHandler := func(args taskqueue.TaskArgs) error {
taskHandlerCalled = true
return nil
}

failTaskName := "fail_task"
failCount := 0
failTaskHandler := func(args taskqueue.TaskArgs) error {
failCount += 1
fmt.Println("here")
return errors.New("simulated error")
}

maxRetries := 3

t.Run("should run task successfully", func(t *testing.T) {
mockBroker := NewMockBroker(1)
wg := &sync.WaitGroup{}
worker := taskqueue.NewWorker(1, mockBroker, nil, wg)
worker.Register(taskName, taskHandler)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

wg.Add(1)
go worker.Start(ctx)

task := taskqueue.Task{
Name: taskName,
Args: taskqueue.TaskArgs{},
}
taskHandlerCalled = false
err := mockBroker.Publish(task)
assert.Nil(t, err)

time.Sleep(100 * time.Millisecond)
cancel()
wg.Wait()

assert.True(t, taskHandlerCalled)
})

t.Run("should handle failing task with backoff", func(t *testing.T) {
backoff := &taskqueue.BackoffPolicy{
BaseDelay: 5 * time.Millisecond, // small delay for fast test
MaxDelay: 20 * time.Millisecond,
UseJitter: false,
JitterRangeMs: 0,
}

mockBroker := NewMockBroker(5)
wg := &sync.WaitGroup{}
worker := taskqueue.NewWorker(1, mockBroker, backoff, wg)
worker.Register(failTaskName, failTaskHandler)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

wg.Add(1)
go worker.Start(ctx)

task := taskqueue.Task{
Name: failTaskName,
Args: taskqueue.TaskArgs{},
MaxRetry: maxRetries,
}
failCount = 0
err := mockBroker.Publish(task)
assert.Nil(t, err)

time.Sleep(100 * time.Millisecond)
cancel()
wg.Wait()

assert.Equal(t, maxRetries+1, failCount)
})

t.Run("should handle failing task with no backoff", func(t *testing.T) {
mockBroker := NewMockBroker(5)
wg := &sync.WaitGroup{}
worker := taskqueue.NewWorker(1, mockBroker, nil, wg)
worker.Register(failTaskName, failTaskHandler)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

wg.Add(1)
go worker.Start(ctx)

task := taskqueue.Task{
Name: failTaskName,
Args: taskqueue.TaskArgs{},
MaxRetry: maxRetries,
}
failCount = 0
err := mockBroker.Publish(task)
assert.Nil(t, err)

time.Sleep(100 * time.Millisecond)
cancel()
wg.Wait()

assert.Equal(t, maxRetries+1, failCount)
})

t.Run("should not execute task with no handler", func(t *testing.T) {
mockBroker := NewMockBroker(1)
wg := &sync.WaitGroup{}
worker := taskqueue.NewWorker(1, mockBroker, nil, wg)
worker.Register(taskName, taskHandler)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

wg.Add(1)
go worker.Start(ctx)

task := taskqueue.Task{
Name: "invalid_task_name",
Args: taskqueue.TaskArgs{},
}
taskHandlerCalled = false
err := mockBroker.Publish(task)
assert.Nil(t, err)

time.Sleep(100 * time.Millisecond)
cancel()
wg.Wait()

assert.False(t, taskHandlerCalled)
})

t.Run("should log and return after error", func(t *testing.T) {
mockBroker := NewMockBroker(1)
mockBroker.badConsume = true
wg := &sync.WaitGroup{}
worker := taskqueue.NewWorker(1, mockBroker, nil, wg)
worker.Register(taskName, taskHandler)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var logOutput bytes.Buffer
log.SetOutput(&logOutput)
defer log.SetOutput(nil)

wg.Add(1)
go worker.Start(ctx)
time.Sleep(100 * time.Millisecond)

assert.Contains(t, logOutput.String(), "Failed to consume tasks: simulated error")
})

t.Run("should log when task channel is closed", func(t *testing.T) {
mockBroker := NewMockBroker(1)
mockBroker.closeChannel = true
wg := &sync.WaitGroup{}
worker := taskqueue.NewWorker(1, mockBroker, nil, wg)
worker.Register(taskName, taskHandler)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var logOutput bytes.Buffer
log.SetOutput(&logOutput)
defer log.SetOutput(nil)

wg.Add(1)
go worker.Start(ctx)
time.Sleep(100 * time.Millisecond)

assert.Contains(t, logOutput.String(), "Task channel closed, worker exiting...")
})
}
2 changes: 1 addition & 1 deletion worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (w *Worker) Start(ctx context.Context) {

tasks, err := w.broker.Consume()
if err != nil {
log.Fatalf("Failed to consume tasks: %v", err)
log.Printf("Failed to consume tasks: %v\n", err)
return
}

Expand Down