diff --git a/examples/README.md b/examples/README.md index 60e537e..96ae4ac 100644 --- a/examples/README.md +++ b/examples/README.md @@ -6,6 +6,18 @@ This directory contains examples demonstrating various aspects of the [go-pkgz/p ## Available Examples +### [basic](./basic) +Minimal "hello world" example to get started quickly. Demonstrates: +- Simplest pool creation and usage +- Submitting work items +- Basic metrics + +### [chunking](./chunking) +Shows how to use WithChunkFn for consistent work distribution by key. Demonstrates: +- Key-based routing (same key always goes to same worker) +- Per-key aggregation without synchronization +- Worker ID tracking via context + ### [tokenizer_stateful](./tokenizer_stateful) Shows how to use stateful workers where each worker maintains its own independent state (word frequency counters). Demonstrates: - Worker state isolation @@ -53,11 +65,23 @@ Shows how to handle and categorize errors in parallel processing. Demonstrates: - Timing information tracking - Statistical reporting on errors +### [pool_completion](./pool_completion) +Shows how to use the pool completion callback for final aggregation. Demonstrates: +- Pool completion callback (WithPoolCompleteFn) +- Final cleanup when all workers finish +- Difference between worker and pool completion callbacks + ## Running Examples Each example can be run from its directory: ```bash -cd tokenizer_stateful +cd basic +go run main.go + +cd ../chunking +go run main.go + +cd ../tokenizer_stateful go run main.go -file input.txt cd ../tokenizer_stateless @@ -77,15 +101,19 @@ go run main.go cd ../collector_errors go run main.go -workers 8 -jobs 100 -error-rate 0.3 + +cd ../pool_completion +go run main.go ``` ## Common Patterns While the examples are simplified, they showcase important pool package features: +- Basic pool usage (basic) +- Consistent work distribution by key (chunking) - Worker state management (stateful vs stateless) - Result collection strategies - Error handling approaches - Metrics and monitoring -- Work distribution patterns - Middleware integration - Multi-stage processing pipelines \ No newline at end of file diff --git a/examples/basic/README.md b/examples/basic/README.md new file mode 100644 index 0000000..f294882 --- /dev/null +++ b/examples/basic/README.md @@ -0,0 +1,66 @@ +# Basic Example + +Minimal "hello world" example demonstrating the simplest pool usage. 
+ +## What it demonstrates + +- Creating a pool with `pool.New[T]` +- Using `pool.WorkerFunc` adapter for simple functions +- Submitting work items with `Submit()` +- Closing pool and waiting with `Close()` +- Basic metrics via `Metrics().GetStats()` + +## Running + +```bash +go run main.go +``` + +## Output + +``` +processing: date +processing: apple +processing: banana +processing: cherry +processing: elderberry + +done: processed 5 items in 102ms +``` + +## Code walkthrough + +### Create the pool + +```go +p := pool.New[string](3, pool.WorkerFunc[string](func(_ context.Context, item string) error { + fmt.Printf("processing: %s\n", item) + time.Sleep(50 * time.Millisecond) + return nil +})) +``` + +- `pool.New[string](3, ...)` creates a pool with 3 workers processing strings +- `pool.WorkerFunc` adapts a function to the `Worker` interface + +### Start, submit, close + +```go +p.Go(ctx) // start workers +p.Submit("apple") // submit work (not thread-safe) +p.Close(ctx) // close input and wait for completion +``` + +### Get metrics + +```go +stats := p.Metrics().GetStats() +fmt.Printf("processed %d items in %v\n", stats.Processed, stats.TotalTime) +``` + +## Next steps + +After understanding this basic example, explore: +- [chunking](../chunking) - consistent work distribution by key +- [tokenizer_stateful](../tokenizer_stateful) - workers with state +- [middleware](../middleware) - adding retry, timeout, validation diff --git a/examples/basic/go.mod b/examples/basic/go.mod new file mode 100644 index 0000000..d80b649 --- /dev/null +++ b/examples/basic/go.mod @@ -0,0 +1,9 @@ +module examples/basic + +go 1.24.0 + +require github.com/go-pkgz/pool v0.7.0 + +require golang.org/x/sync v0.19.0 // indirect + +replace github.com/go-pkgz/pool => ../.. diff --git a/examples/basic/go.sum b/examples/basic/go.sum new file mode 100644 index 0000000..becd57e --- /dev/null +++ b/examples/basic/go.sum @@ -0,0 +1,10 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= +golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/examples/basic/main.go b/examples/basic/main.go new file mode 100644 index 0000000..f5e9c25 --- /dev/null +++ b/examples/basic/main.go @@ -0,0 +1,42 @@ +// Example basic demonstrates the simplest possible pool usage. +// This is a minimal "hello world" example to get started quickly. 
+package main + +import ( + "context" + "fmt" + "time" + + "github.com/go-pkgz/pool" +) + +func main() { + // create a pool with 3 workers using a simple function + p := pool.New[string](3, pool.WorkerFunc[string](func(_ context.Context, item string) error { + fmt.Printf("processing: %s\n", item) + time.Sleep(50 * time.Millisecond) // simulate work + return nil + })) + + // start the pool + ctx := context.Background() + if err := p.Go(ctx); err != nil { + fmt.Printf("failed to start: %v\n", err) + return + } + + // submit work items + items := []string{"apple", "banana", "cherry", "date", "elderberry"} + for _, item := range items { + p.Submit(item) + } + + // close and wait for completion + if err := p.Close(ctx); err != nil { + fmt.Printf("error: %v\n", err) + } + + // print stats + stats := p.Metrics().GetStats() + fmt.Printf("\ndone: processed %d items in %v\n", stats.Processed, stats.TotalTime.Round(time.Millisecond)) +} diff --git a/examples/chunking/README.md b/examples/chunking/README.md new file mode 100644 index 0000000..76f3726 --- /dev/null +++ b/examples/chunking/README.md @@ -0,0 +1,88 @@ +# Chunking Example + +Demonstrates `WithChunkFn` for consistent work distribution by key. + +## What it demonstrates + +- Key-based routing with `WithChunkFn` +- Same key always goes to the same worker (consistent hashing) +- Per-key aggregation without synchronization +- Getting worker ID from context with `metrics.WorkerID(ctx)` + +## Use cases + +- Aggregating events by user/session ID +- Processing messages by partition key +- Any scenario where items with the same key must be handled by the same worker + +## Running + +```bash +go run main.go +``` + +## Output + +``` +worker 1: user=charlie action=login +worker 2: user=alice action=login +worker 1: user=charlie action=view_page +worker 2: user=bob action=login +worker 2: user=alice action=view_page +... + +worker assignment (each user always goes to same worker): + worker 1: charlie(4) + worker 2: bob(3) alice(4) + +processed 11 events in 76ms +``` + +Notice: charlie always goes to worker 1, alice and bob always go to worker 2. + +## Code walkthrough + +### Define chunk function + +```go +p := pool.New[Event](3, worker).WithChunkFn(func(e Event) string { + return e.UserID // route by user ID +}) +``` + +The chunk function extracts a key from each item. Items with the same key are routed to the same worker using consistent hashing (FNV-1a). + +### Get worker ID in worker + +```go +import "github.com/go-pkgz/pool/metrics" + +func worker(ctx context.Context, e Event) error { + workerID := metrics.WorkerID(ctx) + // workerID is consistent for this user +} +``` + +### Benefits + +Without chunking: +- Workers compete for items randomly +- Aggregating by key requires synchronization (mutex/channel) + +With chunking: +- Same key → same worker, always +- Per-key state can be local to worker (no locks needed) +- Useful for building per-user counters, session state, etc. + +## How it works + +1. `WithChunkFn` creates per-worker channels (instead of shared channel) +2. On `Submit()`, the chunk function extracts the key +3. Key is hashed with FNV-1a: `hash(key) % numWorkers` +4. 
Item is sent to that worker's dedicated channel + +## Related examples + +- [basic](../basic) - simplest pool usage +- [tokenizer_stateful](../tokenizer_stateful) - stateful workers +- [direct_chain](../direct_chain) - multi-stage pipelines diff --git a/examples/chunking/go.mod b/examples/chunking/go.mod new file mode 100644 index 0000000..f0cb5f9 --- /dev/null +++ b/examples/chunking/go.mod @@ -0,0 +1,9 @@ +module examples/chunking + +go 1.24.0 + +require github.com/go-pkgz/pool v0.7.0 + +require golang.org/x/sync v0.19.0 // indirect + +replace github.com/go-pkgz/pool => ../.. diff --git a/examples/chunking/go.sum b/examples/chunking/go.sum new file mode 100644 index 0000000..becd57e --- /dev/null +++ b/examples/chunking/go.sum @@ -0,0 +1,10 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= +golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/examples/chunking/main.go b/examples/chunking/main.go new file mode 100644 index 0000000..b1a1ff6 --- /dev/null +++ b/examples/chunking/main.go @@ -0,0 +1,90 @@ +// Example chunking demonstrates WithChunkFn for consistent work distribution. +// Items with the same key always go to the same worker, enabling per-key +// aggregation without synchronization. Useful for grouping events by user, +// session, or any other key. 
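+//
+// Note: the mutex-guarded workerUsers map below exists only to print the final
+// per-worker assignment; for real per-key aggregation keep the state inside each
+// worker (see the tokenizer_stateful example), which needs no locking.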
+package main + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/go-pkgz/pool" + "github.com/go-pkgz/pool/metrics" +) + +// Event represents something happening for a user +type Event struct { + UserID string + Action string +} + +func main() { + // track which worker processes which users (for demonstration) + var mu sync.Mutex + workerUsers := make(map[int]map[string]int) // workerID -> userID -> count + + // create pool with chunking by UserID + // this ensures all events for a user go to the same worker + p := pool.New[Event](3, pool.WorkerFunc[Event](func(ctx context.Context, e Event) error { + // get worker ID from context + workerID := metrics.WorkerID(ctx) + + mu.Lock() + if workerUsers[workerID] == nil { + workerUsers[workerID] = make(map[string]int) + } + workerUsers[workerID][e.UserID]++ + mu.Unlock() + + fmt.Printf("worker %d: user=%s action=%s\n", workerID, e.UserID, e.Action) + time.Sleep(10 * time.Millisecond) + return nil + })).WithChunkFn(func(e Event) string { + return e.UserID // route by user ID + }) + + ctx := context.Background() + if err := p.Go(ctx); err != nil { + fmt.Printf("failed to start: %v\n", err) + return + } + + // submit events for different users + // notice: same user's events will always go to the same worker + events := []Event{ + {"alice", "login"}, + {"bob", "login"}, + {"charlie", "login"}, + {"alice", "view_page"}, + {"bob", "view_page"}, + {"alice", "click_button"}, + {"charlie", "view_page"}, + {"bob", "logout"}, + {"alice", "logout"}, + {"charlie", "click_button"}, + {"charlie", "logout"}, + } + + for _, e := range events { + p.Submit(e) + } + + if err := p.Close(ctx); err != nil { + fmt.Printf("error: %v\n", err) + } + + // show which worker handled which users + fmt.Printf("\nworker assignment (each user always goes to same worker):\n") + for workerID, users := range workerUsers { + fmt.Printf(" worker %d: ", workerID) + for user, count := range users { + fmt.Printf("%s(%d) ", user, count) + } + fmt.Println() + } + + stats := p.Metrics().GetStats() + fmt.Printf("\nprocessed %d events in %v\n", stats.Processed, stats.TotalTime.Round(time.Millisecond)) +} diff --git a/examples/collector_errors/go.sum b/examples/collector_errors/go.sum index 8f48f08..becd57e 100644 --- a/examples/collector_errors/go.sum +++ b/examples/collector_errors/go.sum @@ -2,12 +2,9 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= -github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -golang.org/x/sync v0.11.0 h1:GGz8+XQP4FvTTrjZPzNKTMFtSXH80RAzG+5ghFPgK9w= -golang.org/x/sync v0.11.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= -golang.org/x/sync v0.14.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= -golang.org/x/sync v0.15.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= gopkg.in/yaml.v3 
v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/examples/collector_errors/main.go b/examples/collector_errors/main.go index 0625946..9bc8752 100644 --- a/examples/collector_errors/main.go +++ b/examples/collector_errors/main.go @@ -1,3 +1,6 @@ +// Example collector_errors demonstrates error collection and categorization patterns. +// It shows how to capture both successes and failures with detailed metadata, +// group errors by type, and calculate timing statistics for analysis. package main import ( @@ -6,7 +9,6 @@ import ( "flag" "fmt" "math/rand" - "os" "sort" "strings" "time" @@ -45,12 +47,12 @@ func main() { // start the pool if err := p.Go(ctx); err != nil { fmt.Printf("Failed to start pool: %v\n", err) - os.Exit(1) + return } // submit jobs in the background, this is usually done in a separate goroutine go func() { - for i := 0; i < *jobs; i++ { + for i := range *jobs { jobID := fmt.Sprintf("job-%03d", i+1) p.Submit(jobID) } @@ -70,7 +72,7 @@ func main() { }() // collect results using the collector's Iter method - var results []Result + results := make([]Result, 0, *jobs) for result, err := range collector.Iter() { if err != nil { fmt.Printf("Collector error: %v\n", err) @@ -187,7 +189,7 @@ func worker(p workerParam) func(ctx context.Context, jobID string) error { } if p.verbose { - fmt.Printf("❌ %s failed: %v\n", jobID, err) + fmt.Printf("[FAIL] %s: %v\n", jobID, err) } // submit error result to collector @@ -204,7 +206,7 @@ func worker(p workerParam) func(ctx context.Context, jobID string) error { } if p.verbose { - fmt.Printf("✅ %s completed successfully\n", jobID) + fmt.Printf("[OK] %s completed\n", jobID) } // submit success result to collector diff --git a/examples/collectors_chain/go.sum b/examples/collectors_chain/go.sum index c33ea5f..becd57e 100644 --- a/examples/collectors_chain/go.sum +++ b/examples/collectors_chain/go.sum @@ -1,15 +1,10 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/go-pkgz/pool v0.5.0 h1:fP0WpEGMAcFEBQ7l7aAZsh7RBkzx34FVgufJoVvDTYY= -github.com/go-pkgz/pool v0.5.0/go.mod h1:e1qn5EYmXshPcOk2buL2ZC20w7RTAWUgbug+L2SyH7I= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= -github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -golang.org/x/sync v0.11.0 h1:GGz8+XQP4FvTTrjZPzNKTMFtSXH80RAzG+5ghFPgK9w= -golang.org/x/sync v0.11.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= -golang.org/x/sync v0.14.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= -golang.org/x/sync v0.15.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/examples/collectors_chain/main.go 
b/examples/collectors_chain/main.go index be08905..21645e7 100644 --- a/examples/collectors_chain/main.go +++ b/examples/collectors_chain/main.go @@ -1,3 +1,6 @@ +// Example collectors_chain demonstrates multi-stage pipeline using collectors for +// coordination between pools. Each stage has independent scaling and uses iterator-based +// streaming for type-safe data transformation between pools. package main import ( @@ -96,7 +99,7 @@ func newSquarePool(ctx context.Context, workers int) *squarePool { // ProcessStrings demonstrates chaining multiple pools together to create a processing pipeline. // Each pool runs concurrently and processes items as they become available from the previous stage. -func ProcessStrings(ctx context.Context, strings []string) ([]finalData, error) { +func ProcessStrings(ctx context.Context, inputs []string) ([]finalData, error) { // create all pools before starting any processing counter := newCounterPool(ctx, 2) multiplier := newMultiplierPool(ctx, 4) @@ -111,8 +114,8 @@ func ProcessStrings(ctx context.Context, strings []string) ([]finalData, error) // first goroutine feeds input data into the pipeline // we use a goroutine to simulate a real-world scenario where data is coming from an external source go func() { - for i := range strings { - fmt.Printf("submitting: %q\n", strings[i]) + for i := range inputs { + fmt.Printf("submitting: %q\n", inputs[i]) counter.WorkerGroup.Submit(stringData{idx: i, ts: time.Now()}) time.Sleep(time.Duration(rand.Intn(3)) * time.Millisecond) } @@ -142,7 +145,7 @@ func ProcessStrings(ctx context.Context, strings []string) ([]finalData, error) }() // collect final results until all work is done - var results []finalData + results := make([]finalData, 0, len(inputs)) // iter will stop on completion of squares pool which is the last in the chain // this is a blocking operation and will return when all pools are done // we don't need to wait for each pool to finish explicitly, the iter handles it diff --git a/examples/direct_chain/go.sum b/examples/direct_chain/go.sum index c33ea5f..becd57e 100644 --- a/examples/direct_chain/go.sum +++ b/examples/direct_chain/go.sum @@ -1,15 +1,10 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/go-pkgz/pool v0.5.0 h1:fP0WpEGMAcFEBQ7l7aAZsh7RBkzx34FVgufJoVvDTYY= -github.com/go-pkgz/pool v0.5.0/go.mod h1:e1qn5EYmXshPcOk2buL2ZC20w7RTAWUgbug+L2SyH7I= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= -github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -golang.org/x/sync v0.11.0 h1:GGz8+XQP4FvTTrjZPzNKTMFtSXH80RAzG+5ghFPgK9w= -golang.org/x/sync v0.11.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= -golang.org/x/sync v0.14.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= -golang.org/x/sync v0.15.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= gopkg.in/yaml.v3 v3.0.1 
h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/examples/direct_chain/main.go b/examples/direct_chain/main.go index 109fee5..8dfb2fc 100644 --- a/examples/direct_chain/main.go +++ b/examples/direct_chain/main.go @@ -1,3 +1,6 @@ +// Example direct_chain demonstrates multi-stage processing pipeline with direct +// pool-to-pool submission using Send() (thread-safe). Workers in one pool submit +// directly to the next pool, with pool completion callbacks triggering downstream closure. package main import ( @@ -100,7 +103,7 @@ func ProcessStrings(ctx context.Context, input []string) ([]finalData, error) { pCounter.Close(ctx) }() - var results []finalData + results := make([]finalData, 0, 100) // preallocate for expected results for v := range collector.Iter() { results = append(results, v) } diff --git a/examples/middleware/go.sum b/examples/middleware/go.sum index aecd43f..21e0ee5 100644 --- a/examples/middleware/go.sum +++ b/examples/middleware/go.sum @@ -2,15 +2,11 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= -github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -golang.org/x/sync v0.14.0 h1:woo0S4Yywslg6hp4eUFjTVOyKt0RookbpAHG4c1HmhQ= -golang.org/x/sync v0.14.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= -golang.org/x/sync v0.15.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= -golang.org/x/time v0.11.0 h1:/bpjEDfN9tkoN/ryeYHnv5hcMlc8ncjMcM4XBk5NWV0= -golang.org/x/time v0.11.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg= -golang.org/x/time v0.12.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg= +golang.org/x/time v0.14.0 h1:MRx4UaLrDotUKUdCIqzPC48t1Y9hANFKIRpNx+Te8PI= golang.org/x/time v0.14.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/examples/middleware/main.go b/examples/middleware/main.go index 98262c7..429bc3c 100644 --- a/examples/middleware/main.go +++ b/examples/middleware/main.go @@ -1,4 +1,6 @@ -// file: examples/middleware/main.go +// Example middleware demonstrates built-in middleware (Validator, Retry, Recovery, +// RateLimiter) and custom middleware implementation (structured logging). +// Shows middleware stacking and composition for robust error handling. 
package main import ( @@ -82,7 +84,7 @@ func runPool(ctx context.Context, p *pool.WorkerGroup[Task], cfg config) error { // demonstrate rate limiting cfg.logger.Info("submitting rate-limited tasks") start := time.Now() - for i := 0; i < 10; i++ { + for i := range 10 { p.Submit(Task{ID: fmt.Sprintf("rate-%d", i), Priority: 3, Payload: "rate limited task"}) } @@ -103,7 +105,7 @@ func makePool(cfg config) *pool.WorkerGroup[Task] { return pool.New[Task](cfg.workers, makeWorker()).Use( middleware.Validator(makeValidator()), // validate tasks middleware.Retry[Task](cfg.retries, time.Second), // retry failed tasks - middleware.Recovery[Task](func(p interface{}) { // recover from panics + middleware.Recovery[Task](func(p any) { // recover from panics cfg.logger.Error("panic recovered", "error", fmt.Sprint(p)) }), middleware.RateLimiter[Task](5, 3), // rate limit: 5 tasks/second with burst of 3 diff --git a/examples/parallel_files/go.sum b/examples/parallel_files/go.sum index 8f48f08..becd57e 100644 --- a/examples/parallel_files/go.sum +++ b/examples/parallel_files/go.sum @@ -2,12 +2,9 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= -github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -golang.org/x/sync v0.11.0 h1:GGz8+XQP4FvTTrjZPzNKTMFtSXH80RAzG+5ghFPgK9w= -golang.org/x/sync v0.11.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= -golang.org/x/sync v0.14.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= -golang.org/x/sync v0.15.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/examples/parallel_files/main.go b/examples/parallel_files/main.go index c390b89..52832d1 100644 --- a/examples/parallel_files/main.go +++ b/examples/parallel_files/main.go @@ -1,3 +1,6 @@ +// Example parallel_files demonstrates chunk-based file processing with stateful workers. +// Each worker maintains local word counts with custom metrics tracking (long vs short words). +// Worker completion callbacks merge results from per-worker state. 
package main import ( @@ -72,7 +75,8 @@ func main() { // start pool processing if err := p.Go(ctx); err != nil { - log.Fatal(err) + log.Printf("failed to start pool: %v", err) + return } // process files @@ -80,8 +84,9 @@ func main() { if err != nil || info.IsDir() { return err } - if matched, err := filepath.Match(*pattern, filepath.Base(path)); err != nil || !matched { - return err + matched, matchErr := filepath.Match(*pattern, filepath.Base(path)) + if matchErr != nil || !matched { + return matchErr } file, err := os.Open(path) diff --git a/examples/pool_completion/README.md b/examples/pool_completion/README.md new file mode 100644 index 0000000..a382873 --- /dev/null +++ b/examples/pool_completion/README.md @@ -0,0 +1,95 @@ +# Pool Completion Example + +Demonstrates `WithPoolCompleteFn` callback that runs once when all workers finish. + +## What it demonstrates + +- Pool completion callback with `WithPoolCompleteFn` +- Difference between worker completion and pool completion +- Final aggregation/cleanup when all work is done + +## Worker vs Pool completion + +| Callback | When it runs | Use case | +|----------|-------------|----------| +| `WithWorkerCompleteFn` | Once per worker, when that worker finishes | Collect per-worker results | +| `WithPoolCompleteFn` | Once, when ALL workers have finished | Final aggregation, cleanup, trigger downstream | + +## Running + +```bash +go run main.go +``` + +## Output + +``` +worker 2 completed +worker 0 completed +worker 1 completed + +all workers completed, total items processed: 10 +performing final cleanup... + +final stats: processed=10, time=66ms +``` + +Notice: worker callbacks fire individually, pool callback fires once at the end. + +## Code walkthrough + +### Setup both callbacks + +```go +p := pool.New[int](3, worker). + WithWorkerCompleteFn(func(_ context.Context, workerID int, _ pool.Worker[int]) error { + // called 3 times (once per worker) + fmt.Printf("worker %d completed\n", workerID) + return nil + }). + WithPoolCompleteFn(func(_ context.Context) error { + // called once when ALL workers done + fmt.Println("all workers completed") + return nil + }) +``` + +### Typical use cases + +**Worker completion** - collect per-worker state: +```go +WithWorkerCompleteFn(func(ctx context.Context, id int, w pool.Worker[T]) error { + results := w.(*MyWorker).GetResults() + collector.Submit(results) + return nil +}) +``` + +**Pool completion** - final actions: +```go +WithPoolCompleteFn(func(ctx context.Context) error { + // close downstream collectors + collector.Close() + // trigger next stage + nextPool.Close(ctx) + // send notification + notifyComplete() + return nil +}) +``` + +## Pipeline coordination + +Pool completion is essential for chaining pools. 
See [direct_chain](../direct_chain): + +```go +pool1 := pool.New[A](...).WithPoolCompleteFn(func(ctx context.Context) error { + return pool2.Close(ctx) // close next pool when this one finishes +}) +``` + +## Related examples + +- [tokenizer_stateful](../tokenizer_stateful) - worker completion for result collection +- [direct_chain](../direct_chain) - pool completion for pipeline coordination +- [collectors_chain](../collectors_chain) - alternative coordination via collectors diff --git a/examples/pool_completion/go.mod b/examples/pool_completion/go.mod new file mode 100644 index 0000000..aa4a784 --- /dev/null +++ b/examples/pool_completion/go.mod @@ -0,0 +1,9 @@ +module examples/pool_completion + +go 1.24.0 + +require github.com/go-pkgz/pool v0.7.0 + +require golang.org/x/sync v0.19.0 // indirect + +replace github.com/go-pkgz/pool => ../.. diff --git a/examples/pool_completion/go.sum b/examples/pool_completion/go.sum new file mode 100644 index 0000000..becd57e --- /dev/null +++ b/examples/pool_completion/go.sum @@ -0,0 +1,10 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= +golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/examples/pool_completion/main.go b/examples/pool_completion/main.go new file mode 100644 index 0000000..4f73385 --- /dev/null +++ b/examples/pool_completion/main.go @@ -0,0 +1,60 @@ +// Example pool_completion demonstrates the pool completion callback (WithPoolCompleteFn). +// This callback executes once when ALL workers have finished, useful for final +// aggregation, cleanup, or triggering downstream processes. +package main + +import ( + "context" + "fmt" + "sync/atomic" + "time" + + "github.com/go-pkgz/pool" +) + +func main() { + // track total items processed across all workers + var totalProcessed atomic.Int64 + + // create a pool with worker completion and pool completion callbacks + p := pool.New[int](3, pool.WorkerFunc[int](func(_ context.Context, _ int) error { + // simulate some work + time.Sleep(10 * time.Millisecond) + totalProcessed.Add(1) + return nil + })). + WithContinueOnError(). + WithWorkerCompleteFn(func(_ context.Context, workerID int, _ pool.Worker[int]) error { + // called when each individual worker completes + fmt.Printf("worker %d completed\n", workerID) + return nil + }). 
+ WithPoolCompleteFn(func(_ context.Context) error { + // called ONCE when ALL workers have finished + // useful for final aggregation, cleanup, or triggering downstream processes + fmt.Printf("\nall workers completed, total items processed: %d\n", totalProcessed.Load()) + fmt.Println("performing final cleanup...") + return nil + }) + + // start the pool + ctx := context.Background() + if err := p.Go(ctx); err != nil { + fmt.Printf("failed to start pool: %v\n", err) + return + } + + // submit some work + for i := 1; i <= 10; i++ { + p.Submit(i) + } + + // close and wait for completion + if err := p.Close(ctx); err != nil { + fmt.Printf("pool error: %v\n", err) + } + + // print final metrics + stats := p.Metrics().GetStats() + fmt.Printf("\nfinal stats: processed=%d, time=%v\n", stats.Processed, stats.TotalTime.Round(time.Millisecond)) +} diff --git a/examples/tokenizer_stateful/go.sum b/examples/tokenizer_stateful/go.sum index 1fd44af..becd57e 100644 --- a/examples/tokenizer_stateful/go.sum +++ b/examples/tokenizer_stateful/go.sum @@ -1,15 +1,10 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/go-pkgz/pool v0.3.0 h1:aN5/ZhBbMPGXj+naZ6De2KNqg0D2Svpc7U1cYEue9t8= -github.com/go-pkgz/pool v0.3.0/go.mod h1:e1qn5EYmXshPcOk2buL2ZC20w7RTAWUgbug+L2SyH7I= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= -github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -golang.org/x/sync v0.11.0 h1:GGz8+XQP4FvTTrjZPzNKTMFtSXH80RAzG+5ghFPgK9w= -golang.org/x/sync v0.11.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= -golang.org/x/sync v0.14.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= -golang.org/x/sync v0.15.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/examples/tokenizer_stateful/main.go b/examples/tokenizer_stateful/main.go index 094e873..afe3357 100644 --- a/examples/tokenizer_stateful/main.go +++ b/examples/tokenizer_stateful/main.go @@ -1,3 +1,6 @@ +// Example tokenizer_stateful demonstrates stateful workers where each worker +// maintains independent state (word frequency counters). It shows worker completion +// callbacks for result collection and per-worker statistics tracking. 
package main import ( @@ -36,8 +39,7 @@ func (w *TokenizingWorker) Do(ctx context.Context, line string) error { } // split line into words and clean them up - words := strings.Fields(line) - for _, word := range words { + for word := range strings.FieldsSeq(line) { select { case <-ctx.Done(): return ctx.Err() @@ -100,7 +102,8 @@ func main() { // start the pool if err := p.Go(ctx); err != nil { - log.Fatal(err) + log.Printf("failed to start pool: %v", err) + return } // read file line by line and submit to pool diff --git a/examples/tokenizer_stateless/go.sum b/examples/tokenizer_stateless/go.sum index 1fd44af..becd57e 100644 --- a/examples/tokenizer_stateless/go.sum +++ b/examples/tokenizer_stateless/go.sum @@ -1,15 +1,10 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/go-pkgz/pool v0.3.0 h1:aN5/ZhBbMPGXj+naZ6De2KNqg0D2Svpc7U1cYEue9t8= -github.com/go-pkgz/pool v0.3.0/go.mod h1:e1qn5EYmXshPcOk2buL2ZC20w7RTAWUgbug+L2SyH7I= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= -github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -golang.org/x/sync v0.11.0 h1:GGz8+XQP4FvTTrjZPzNKTMFtSXH80RAzG+5ghFPgK9w= -golang.org/x/sync v0.11.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= -golang.org/x/sync v0.14.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= -golang.org/x/sync v0.15.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/examples/tokenizer_stateless/main.go b/examples/tokenizer_stateless/main.go index 5fec5ca..b530e4f 100644 --- a/examples/tokenizer_stateless/main.go +++ b/examples/tokenizer_stateless/main.go @@ -1,3 +1,6 @@ +// Example tokenizer_stateless demonstrates stateless workers using WorkerFunc adapter. +// Workers share a collector for result gathering. This pattern works well when +// workers don't need to maintain state between processing calls. package main import ( @@ -44,8 +47,7 @@ func main() { } // split line into words and submit each word - words := strings.Fields(line) - for _, word := range words { + for word := range strings.FieldsSeq(line) { // check context between words select { case <-ctx.Done(): @@ -71,7 +73,8 @@ func main() { // start the pool if err := p.Go(ctx); err != nil { - log.Fatal(err) + log.Printf("failed to start pool: %v", err) + return } // read file line by line and submit to pool diff --git a/metrics/metrics.go b/metrics/metrics.go index 59a267b..03c97f2 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -184,7 +184,9 @@ func (m *Value) IncDropped(wid int) { m.workerStats[wid].Dropped++ } -// GetStats returns combined stats from all workers +// GetStats returns combined stats from all workers. 
+// For accurate results, call this method after pool completion (Close/Wait returned). +// Calling during active processing may return inconsistent values due to concurrent writes. func (m *Value) GetStats() Stats { var result Stats diff --git a/middleware/middleware_test.go b/middleware/middleware_test.go index 134c5f4..d1ea492 100644 --- a/middleware/middleware_test.go +++ b/middleware/middleware_test.go @@ -201,6 +201,85 @@ func TestValidate(t *testing.T) { }) } +func TestRetry_InvalidValues(t *testing.T) { + t.Run("zero attempts uses default", func(t *testing.T) { + var attempts atomic.Int32 + worker := pool.WorkerFunc[string](func(_ context.Context, v string) error { + attempts.Add(1) + return nil + }) + + p := pool.New[string](1, worker).Use(Retry[string](0, 0)) + require.NoError(t, p.Go(context.Background())) + + p.Submit("test") + require.NoError(t, p.Close(context.Background())) + assert.Equal(t, int32(1), attempts.Load(), "should process with default attempts") + }) + + t.Run("negative values use defaults", func(t *testing.T) { + var attempts atomic.Int32 + worker := pool.WorkerFunc[string](func(_ context.Context, v string) error { + if attempts.Add(1) <= 2 { + return errors.New("temporary error") + } + return nil + }) + + p := pool.New[string](1, worker).Use(Retry[string](-1, -1)) + require.NoError(t, p.Go(context.Background())) + + p.Submit("test") + require.NoError(t, p.Close(context.Background())) + assert.Equal(t, int32(3), attempts.Load(), "should retry with default max attempts") + }) +} + +func TestTimeout_InvalidValue(t *testing.T) { + t.Run("zero timeout uses default", func(t *testing.T) { + worker := pool.WorkerFunc[string](func(_ context.Context, v string) error { + return nil + }) + + p := pool.New[string](1, worker).Use(Timeout[string](0)) + require.NoError(t, p.Go(context.Background())) + + p.Submit("test") + require.NoError(t, p.Close(context.Background())) + }) + + t.Run("negative timeout uses default", func(t *testing.T) { + worker := pool.WorkerFunc[string](func(_ context.Context, v string) error { + return nil + }) + + p := pool.New[string](1, worker).Use(Timeout[string](-1)) + require.NoError(t, p.Go(context.Background())) + + p.Submit("test") + require.NoError(t, p.Close(context.Background())) + }) +} + +func TestRecovery_PanicWithInt(t *testing.T) { + var recovered any + worker := pool.WorkerFunc[string](func(_ context.Context, v string) error { + panic(42) + }) + + p := pool.New[string](1, worker).Use(Recovery[string](func(p any) { + recovered = p + })) + require.NoError(t, p.Go(context.Background())) + + p.Submit("test") + err := p.Close(context.Background()) + require.Error(t, err) + assert.Contains(t, err.Error(), "panic recovered") + assert.Contains(t, err.Error(), "42") + assert.Equal(t, 42, recovered) +} + func TestRateLimiter(t *testing.T) { t.Run("allows tasks within rate limit", func(t *testing.T) { var processed atomic.Int32 diff --git a/pool.go b/pool.go index 02bfd6c..a60db01 100644 --- a/pool.go +++ b/pool.go @@ -40,7 +40,7 @@ type WorkerGroup[T any] struct { sharedBatchCh chan []T // shared batch channel eg *errgroup.Group - activated bool + activated atomic.Bool ctx context.Context sendMu sync.Mutex @@ -180,9 +180,12 @@ func (p *WorkerGroup[T]) WithContinueOnError() *WorkerGroup[T] { // WithBatchSize enables item batching with specified size. // Items are accumulated until batch is full before processing. -// Set to 0 to disable batching. +// Set to 0 to disable batching. Negative values are treated as 0. 
// Default: 10 func (p *WorkerGroup[T]) WithBatchSize(size int) *WorkerGroup[T] { + if size < 0 { + size = 0 + } p.batchSize = size if size > 0 { // initialize accumulators with capacity @@ -268,10 +271,9 @@ func (p *WorkerGroup[T]) Send(v T) { // Go activates the pool and starts worker goroutines. // Must be called before submitting items. func (p *WorkerGroup[T]) Go(ctx context.Context) error { - if p.activated { - return fmt.Errorf("workers poll already activated") + if !p.activated.CompareAndSwap(false, true) { + return fmt.Errorf("workers pool already activated") } - defer func() { p.activated = true }() var egCtx context.Context p.eg, egCtx = errgroup.WithContext(ctx) @@ -327,12 +329,12 @@ func (p *WorkerGroup[T]) workerProc(wCtx context.Context, r workerRequest[T]) fu processItem := func(v T) error { waitTime := time.Since(lastActivity) r.m.AddWaitTime(r.id, waitTime) - lastActivity = time.Now() procEndTmr := r.m.StartTimer(r.id, metrics.TimerProc) - defer procEndTmr() if err := worker.Do(wCtx, v); err != nil { + procEndTmr() + lastActivity = time.Now() // update after processing completes r.m.IncErrors(r.id) totalErrs++ if !p.continueOnError { @@ -341,6 +343,8 @@ func (p *WorkerGroup[T]) workerProc(wCtx context.Context, r workerRequest[T]) fu lastErr = fmt.Errorf("worker %d failed: %w", r.id, err) return nil // continue on error } + procEndTmr() + lastActivity = time.Now() // update after processing completes r.m.IncProcessed(r.id) return nil } @@ -349,16 +353,16 @@ func (p *WorkerGroup[T]) workerProc(wCtx context.Context, r workerRequest[T]) fu processBatch := func(items []T) error { waitTime := time.Since(lastActivity) r.m.AddWaitTime(r.id, waitTime) - lastActivity = time.Now() procEndTmr := r.m.StartTimer(r.id, metrics.TimerProc) - defer procEndTmr() for _, v := range items { if err := worker.Do(wCtx, v); err != nil { r.m.IncErrors(r.id) totalErrs++ if !p.continueOnError { + procEndTmr() + lastActivity = time.Now() // update after processing completes return fmt.Errorf("worker %d failed: %w", r.id, err) } lastErr = fmt.Errorf("worker %d failed: %w", r.id, err) @@ -366,6 +370,8 @@ func (p *WorkerGroup[T]) workerProc(wCtx context.Context, r workerRequest[T]) fu } r.m.IncProcessed(r.id) } + procEndTmr() + lastActivity = time.Now() // update after processing completes return nil } @@ -437,49 +443,93 @@ func (p *WorkerGroup[T]) finishWorker(ctx context.Context, id int, worker Worker } // Close pool. Has to be called by consumer as the indication of "all records submitted". -// The call is blocking till all processing completed by workers. After this call poll can't be reused. -// Returns an error if any happened during the run +// The call is blocking till all processing completed by workers or context is cancelled. +// After this call pool can't be reused. Returns an error if any happened during the run. +// Note: Close always closes channels to ensure workers can exit, even if context times out. +// Workers must respect either the context or channel closure to exit cleanly. 
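+//
+// Minimal usage sketch:
+//
+//	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
+//	defer cancel()
+//	if err := p.Close(ctx); err != nil {
+//		// err is a worker/callback error, or ctx.Err() if the timeout fired first
+//	}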
func (p *WorkerGroup[T]) Close(ctx context.Context) error { - // if context canceled, return immediately - switch { - case ctx.Err() != nil: - return ctx.Err() - default: - } - - // flush any remaining items in accumulators - if p.batchSize > 0 { - for i, acc := range p.accumulators { - if len(acc) > 0 { - // ensure we flush any non-empty accumulator, regardless of size - if p.chunkFn == nil { - p.sharedBatchCh <- acc - } else { - p.workerBatchCh[i] <- acc - } - p.accumulators[i] = nil // help GC - } - } - } + ctxErr := p.flushAccumulators(ctx) + // always close channels to allow workers to exit close(p.sharedCh) close(p.sharedBatchCh) for i := range p.poolSize { close(p.workersCh[i]) close(p.workerBatchCh[i]) } - return p.eg.Wait() + + // if context timed out during flush, return that error + if ctxErr != nil { + return ctxErr + } + + // wait for workers with context respect + done := make(chan error, 1) + go func() { + done <- p.eg.Wait() + }() + + select { + case err := <-done: + return err + case <-ctx.Done(): + return ctx.Err() + } +} + +// flushAccumulators flushes remaining batch items. Returns context error if flush was interrupted. +func (p *WorkerGroup[T]) flushAccumulators(ctx context.Context) error { + // if context already canceled, skip flush + if ctx.Err() != nil { + return ctx.Err() + } + + if p.batchSize == 0 { + return nil + } + + for i, acc := range p.accumulators { + if len(acc) == 0 { + continue + } + + ch := p.sharedBatchCh + if p.chunkFn != nil { + ch = p.workerBatchCh[i] + } + + select { + case ch <- acc: + case <-ctx.Done(): + return ctx.Err() + case <-p.ctx.Done(): + // pool context cancelled, workers exiting + } + p.accumulators[i] = nil // help GC + } + return nil } // Wait till workers completed and the result channel closed. +// Respects context cancellation and timeouts. 
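+// Unlike Close, Wait does not close the input channels; it only blocks until all
+// workers have finished.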
func (p *WorkerGroup[T]) Wait(ctx context.Context) error { - // if context canceled, return immediately - switch { - case ctx.Err() != nil: + // if context already cancelled, return immediately + if ctx.Err() != nil { + return ctx.Err() + } + + // wait for workers with context respect + done := make(chan error, 1) + go func() { + done <- p.eg.Wait() + }() + + select { + case err := <-done: + return err + case <-ctx.Done(): return ctx.Err() - default: } - return p.eg.Wait() } // Metrics returns combined metrics from all workers diff --git a/pool_test.go b/pool_test.go index d522bf0..3c7c6f4 100644 --- a/pool_test.go +++ b/pool_test.go @@ -141,10 +141,13 @@ func TestPool_StatefulWorker(t *testing.T) { count int } + var totalProcessed atomic.Int32 + workerMaker := func() Worker[string] { w := &statefulWorker{} return WorkerFunc[string](func(_ context.Context, _ string) error { w.count++ + totalProcessed.Add(1) time.Sleep(time.Millisecond) // even with sleep it's safe return nil }) @@ -158,6 +161,7 @@ func TestPool_StatefulWorker(t *testing.T) { p.Submit("test") } assert.NoError(t, p.Close(context.Background())) + assert.Equal(t, int32(100), totalProcessed.Load(), "all items should be processed by stateful workers") } func TestPool_WithWorkerChanSize(t *testing.T) { @@ -1602,3 +1606,145 @@ func TestPool_BatchedSend(t *testing.T) { assert.Equal(t, int32(items), stage1.Load(), "stage1 count should match input") assert.Equal(t, int32(items), stage2.Load(), "stage2 count should match input") } + +func TestPool_DoubleGoActivation(t *testing.T) { + worker := WorkerFunc[int](func(context.Context, int) error { return nil }) + p := New[int](2, worker) + + require.NoError(t, p.Go(context.Background())) + + // second Go() call should return error + err := p.Go(context.Background()) + require.Error(t, err) + assert.Contains(t, err.Error(), "already activated") + + require.NoError(t, p.Close(context.Background())) +} + +func TestPool_NegativeBatchSize(t *testing.T) { + var processed atomic.Int32 + worker := WorkerFunc[int](func(context.Context, int) error { + processed.Add(1) + return nil + }) + + // negative batch size should be treated as 0 (direct mode, no batching) + p := New[int](2, worker).WithBatchSize(-1) + require.NoError(t, p.Go(context.Background())) + + p.Submit(1) + p.Submit(2) + p.Submit(3) + + require.NoError(t, p.Close(context.Background())) + assert.Equal(t, int32(3), processed.Load(), "all items should be processed") +} + +func TestPool_CloseRespectsContextTimeout(t *testing.T) { + started := make(chan struct{}) + worker := WorkerFunc[int](func(ctx context.Context, v int) error { + close(started) + <-ctx.Done() // block until pool context is cancelled + return ctx.Err() + }) + + // t.Context() is cancelled when test ends, ensuring worker cleanup + p := New[int](1, worker).WithBatchSize(0) + require.NoError(t, p.Go(t.Context())) + p.Submit(1) + + <-started // wait for worker to start processing + + // close with short timeout - should return timeout error, not hang + ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) + defer cancel() + + err := p.Close(ctx) + require.Error(t, err) + assert.ErrorIs(t, err, context.DeadlineExceeded) +} + +func TestPool_WaitRespectsContextTimeout(t *testing.T) { + started := make(chan struct{}) + worker := WorkerFunc[int](func(ctx context.Context, v int) error { + close(started) + <-ctx.Done() // block until pool context is cancelled + return ctx.Err() + }) + + // t.Context() is cancelled when test ends, ensuring worker cleanup + p 
:= New[int](1, worker).WithBatchSize(0) + require.NoError(t, p.Go(t.Context())) + p.Submit(1) + + <-started // wait for worker to start processing + + // wait with short timeout - should return timeout error, not hang + ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) + defer cancel() + + err := p.Wait(ctx) + require.Error(t, err) + assert.ErrorIs(t, err, context.DeadlineExceeded) +} + +func TestPool_CloseWithCancelledContext(t *testing.T) { + worker := WorkerFunc[int](func(context.Context, int) error { return nil }) + p := New[int](1, worker) + require.NoError(t, p.Go(context.Background())) + + ctx, cancel := context.WithCancel(context.Background()) + cancel() // pre-cancel + + err := p.Close(ctx) + require.ErrorIs(t, err, context.Canceled) +} + +func TestPool_CloseFlushDoesNotDeadlock(t *testing.T) { + // worker that fails immediately, causing workers to exit + worker := WorkerFunc[int](func(ctx context.Context, v int) error { + return errors.New("fail immediately") + }) + + p := New[int](1, worker).WithBatchSize(10) + require.NoError(t, p.Go(context.Background())) + + // submit items but don't fill batch - these sit in accumulator + for i := range 5 { + p.Submit(i) + } + + // close with timeout - should not deadlock trying to flush to dead worker + ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel() + + err := p.Close(ctx) + // expect error (either from worker failure or timeout), but not hang + require.Error(t, err) +} + +func TestPool_WorkerCompleteFnErrorWithExistingError(t *testing.T) { + workerErr := errors.New("worker error") + completeErr := errors.New("complete callback error") + + worker := WorkerFunc[int](func(context.Context, int) error { + return workerErr + }) + + var completeCalled atomic.Bool + p := New[int](1, worker). + WithContinueOnError(). + WithWorkerCompleteFn(func(context.Context, int, Worker[int]) error { + completeCalled.Store(true) + return completeErr + }) + + require.NoError(t, p.Go(context.Background())) + p.Submit(1) + err := p.Close(context.Background()) + + // with continueOnError, we expect to get an error + require.Error(t, err) + // worker complete function should have been called + assert.True(t, completeCalled.Load(), "worker complete function should be called") +}
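+
+// TestPool_PoolCompleteFnRunsOnce is a sketch of a sanity check for the pool
+// completion callback documented in examples/pool_completion: it assumes the
+// existing WithPoolCompleteFn option and verifies the callback fires exactly
+// once after Close returns.
+func TestPool_PoolCompleteFnRunsOnce(t *testing.T) {
+	var completions atomic.Int32
+	worker := WorkerFunc[int](func(context.Context, int) error { return nil })
+
+	p := New[int](3, worker).WithPoolCompleteFn(func(context.Context) error {
+		completions.Add(1)
+		return nil
+	})
+
+	require.NoError(t, p.Go(context.Background()))
+	for i := range 10 {
+		p.Submit(i)
+	}
+	require.NoError(t, p.Close(context.Background()))
+	assert.Equal(t, int32(1), completions.Load(), "pool completion callback should run exactly once")
+}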