From e072a9f0ed1ec975bc110a30e5a206012875e6c1 Mon Sep 17 00:00:00 2001 From: KengoWada Date: Mon, 28 Apr 2025 16:14:20 +0300 Subject: [PATCH 1/6] test: Add mock broker for testing Implemented a mock Broker for testing purposes to simulate task consumption and handle errors for Worker tests. Issue #4 --- tests/utils.go | 48 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) create mode 100644 tests/utils.go diff --git a/tests/utils.go b/tests/utils.go new file mode 100644 index 0000000..8dbc273 --- /dev/null +++ b/tests/utils.go @@ -0,0 +1,48 @@ +package taskqueue_test + +import ( + "errors" + "sync" + + "github.com/KengoWada/taskqueue" +) + +var errSimulated = errors.New("simulated error") + +type MockBroker struct { + mu sync.Mutex + tasks chan taskqueue.Task + publishedTasks []taskqueue.Task + badConsume bool + badPublish bool + closeChannel bool +} + +func NewMockBroker(buffer int) *MockBroker { + return &MockBroker{ + tasks: make(chan taskqueue.Task, buffer), + } +} + +func (m *MockBroker) Publish(task taskqueue.Task) error { + if m.badPublish { + return errSimulated + } + + m.mu.Lock() + defer m.mu.Unlock() + m.publishedTasks = append(m.publishedTasks, task) + m.tasks <- task + return nil +} + +func (m *MockBroker) Consume() (<-chan taskqueue.Task, error) { + if m.badConsume { + return nil, errSimulated + } + + if m.closeChannel { + close(m.tasks) + } + return m.tasks, nil +} From 34c9d48d10c82a9e26dab2036ff4b0a87bb27274 Mon Sep 17 00:00:00 2001 From: KengoWada Date: Mon, 28 Apr 2025 16:14:32 +0300 Subject: [PATCH 2/6] test: Add unit tests for Worker Added unit tests for the Worker to cover task consumption, handler registration, retries, and backoff policy handling. Issue #4 --- tests/worker_test.go | 193 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 193 insertions(+) create mode 100644 tests/worker_test.go diff --git a/tests/worker_test.go b/tests/worker_test.go new file mode 100644 index 0000000..c9017eb --- /dev/null +++ b/tests/worker_test.go @@ -0,0 +1,193 @@ +package taskqueue_test + +import ( + "bytes" + "context" + "errors" + "fmt" + "log" + "sync" + "testing" + "time" + + "github.com/KengoWada/taskqueue" + "github.com/stretchr/testify/assert" +) + +func TestWorkerProcesses(t *testing.T) { + taskName := "test_task" + taskHandlerCalled := false + taskHandler := func(args taskqueue.TaskArgs) error { + taskHandlerCalled = true + return nil + } + + failTaskName := "fail_task" + failCount := 0 + failTaskHandler := func(args taskqueue.TaskArgs) error { + failCount += 1 + fmt.Println("here") + return errors.New("simulated error") + } + + maxRetries := 3 + + t.Run("should run task successfully", func(t *testing.T) { + mockBroker := NewMockBroker(1) + wg := &sync.WaitGroup{} + worker := taskqueue.NewWorker(1, mockBroker, nil, wg) + worker.Register(taskName, taskHandler) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + wg.Add(1) + go worker.Start(ctx) + + task := taskqueue.Task{ + Name: taskName, + Args: taskqueue.TaskArgs{}, + } + taskHandlerCalled = false + err := mockBroker.Publish(task) + assert.Nil(t, err) + + time.Sleep(100 * time.Millisecond) + cancel() + wg.Wait() + + assert.True(t, taskHandlerCalled) + }) + + t.Run("should handle failing task with backoff", func(t *testing.T) { + backoff := &taskqueue.BackoffPolicy{ + BaseDelay: 5 * time.Millisecond, // small delay for fast test + MaxDelay: 20 * time.Millisecond, + UseJitter: false, + JitterRangeMs: 0, + } + + mockBroker := NewMockBroker(5) + wg := &sync.WaitGroup{} + worker := taskqueue.NewWorker(1, mockBroker, backoff, wg) + worker.Register(failTaskName, failTaskHandler) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + wg.Add(1) + go worker.Start(ctx) + + task := taskqueue.Task{ + Name: failTaskName, + Args: taskqueue.TaskArgs{}, + MaxRetry: maxRetries, + } + failCount = 0 + err := mockBroker.Publish(task) + assert.Nil(t, err) + + time.Sleep(100 * time.Millisecond) + cancel() + wg.Wait() + + assert.Equal(t, maxRetries+1, failCount) + }) + + t.Run("should handle failing task with no backoff", func(t *testing.T) { + mockBroker := NewMockBroker(5) + wg := &sync.WaitGroup{} + worker := taskqueue.NewWorker(1, mockBroker, nil, wg) + worker.Register(failTaskName, failTaskHandler) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + wg.Add(1) + go worker.Start(ctx) + + task := taskqueue.Task{ + Name: failTaskName, + Args: taskqueue.TaskArgs{}, + MaxRetry: maxRetries, + } + failCount = 0 + err := mockBroker.Publish(task) + assert.Nil(t, err) + + time.Sleep(100 * time.Millisecond) + cancel() + wg.Wait() + + assert.Equal(t, maxRetries+1, failCount) + }) + + t.Run("should not execute task with no handler", func(t *testing.T) { + mockBroker := NewMockBroker(1) + wg := &sync.WaitGroup{} + worker := taskqueue.NewWorker(1, mockBroker, nil, wg) + worker.Register(taskName, taskHandler) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + wg.Add(1) + go worker.Start(ctx) + + task := taskqueue.Task{ + Name: "invalid_task_name", + Args: taskqueue.TaskArgs{}, + } + taskHandlerCalled = false + err := mockBroker.Publish(task) + assert.Nil(t, err) + + time.Sleep(100 * time.Millisecond) + cancel() + wg.Wait() + + assert.False(t, taskHandlerCalled) + }) + + t.Run("should log and return after error", func(t *testing.T) { + mockBroker := NewMockBroker(1) + mockBroker.badConsume = true + wg := &sync.WaitGroup{} + worker := taskqueue.NewWorker(1, mockBroker, nil, wg) + worker.Register(taskName, taskHandler) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var logOutput bytes.Buffer + log.SetOutput(&logOutput) + defer log.SetOutput(nil) + + wg.Add(1) + go worker.Start(ctx) + time.Sleep(100 * time.Millisecond) + + assert.Contains(t, logOutput.String(), "Failed to consume tasks: simulated error") + }) + + t.Run("should log when task channel is closed", func(t *testing.T) { + mockBroker := NewMockBroker(1) + mockBroker.closeChannel = true + wg := &sync.WaitGroup{} + worker := taskqueue.NewWorker(1, mockBroker, nil, wg) + worker.Register(taskName, taskHandler) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var logOutput bytes.Buffer + log.SetOutput(&logOutput) + defer log.SetOutput(nil) + + wg.Add(1) + go worker.Start(ctx) + time.Sleep(100 * time.Millisecond) + + assert.Contains(t, logOutput.String(), "Task channel closed, worker exiting...") + }) +} From 5338f4494838177b19890f428d853220d6afc2aa Mon Sep 17 00:00:00 2001 From: KengoWada Date: Mon, 28 Apr 2025 16:14:42 +0300 Subject: [PATCH 3/6] fix: Change log.Fatalf to log.Printf in Worker Replaced log.Fatalf with log.Printf to avoid premature termination. Issue #4 --- worker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/worker.go b/worker.go index 888098b..aa8ce9f 100644 --- a/worker.go +++ b/worker.go @@ -44,7 +44,7 @@ func (w *Worker) Start(ctx context.Context) { tasks, err := w.broker.Consume() if err != nil { - log.Fatalf("Failed to consume tasks: %v", err) + log.Printf("Failed to consume tasks: %v\n", err) return } From cc7e3205073448ea684c75f59eb790005e2fcea3 Mon Sep 17 00:00:00 2001 From: KengoWada Date: Mon, 28 Apr 2025 16:20:36 +0300 Subject: [PATCH 4/6] chore: Add assert library for testing --- go.mod | 8 +++++++- go.sum | 10 ++++++++++ 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 7469d11..64bcf16 100644 --- a/go.mod +++ b/go.mod @@ -2,9 +2,15 @@ module github.com/KengoWada/taskqueue go 1.23.6 -require github.com/redis/go-redis/v9 v9.7.3 +require ( + github.com/redis/go-redis/v9 v9.7.3 + github.com/stretchr/testify v1.10.0 +) require ( github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index bc52aa2..2803bb1 100644 --- a/go.sum +++ b/go.sum @@ -4,7 +4,17 @@ github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/redis/go-redis/v9 v9.7.3 h1:YpPyAayJV+XErNsatSElgRZZVCwXX9QzkKYNvO7x0wM= github.com/redis/go-redis/v9 v9.7.3/go.mod h1:bGUrSggJ9X9GUmZpZNEOQKaANxSGgOEBRltRTZHSvrA= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= From 8842db6fa6490384a5b64ea91ab3234cbbf4a0ea Mon Sep 17 00:00:00 2001 From: KengoWada Date: Mon, 28 Apr 2025 16:50:06 +0300 Subject: [PATCH 5/6] feat: Add codecov.yml to ignore example and tests directories in coverage report --- codecov.yml | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 codecov.yml diff --git a/codecov.yml b/codecov.yml new file mode 100644 index 0000000..5c48b54 --- /dev/null +++ b/codecov.yml @@ -0,0 +1,3 @@ +ignore: + - "example" + - "tests" From 8969131685cfeac4f86288d4092e84a3fdc07a2e Mon Sep 17 00:00:00 2001 From: KengoWada Date: Mon, 28 Apr 2025 16:53:21 +0300 Subject: [PATCH 6/6] docs: Add badges for test coverage and codecov status to README --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index 08a8f11..41f0f2f 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,7 @@ # TaskQueue: Go-based Distributed Task Queue +[![Test and Coverage](https://github.com/KengoWada/taskqueue/actions/workflows/test.yml/badge.svg?branch=develop)](https://github.com/KengoWada/taskqueue/actions/workflows/test.yml) [![codecov](https://codecov.io/gh/KengoWada/taskqueue/graph/badge.svg?token=Y0VW883H2V)](https://codecov.io/gh/KengoWada/taskqueue) ![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg) + TaskQueue is a Go library that provides a simple and efficient way to manage and execute asynchronous tasks. It's inspired by [Celery](https://docs.celeryq.dev/en/stable/getting-started/introduction.html) and designed to be highly extensible, allowing you to easily distribute tasks across multiple workers. ## ⚠️ Warning: Not Production Ready