Merged
32 changes: 30 additions & 2 deletions examples/README.md
@@ -6,6 +6,18 @@ This directory contains examples demonstrating various aspects of the [go-pkgz/p

## Available Examples

### [basic](./basic)
Minimal "hello world" example to get started quickly. Demonstrates:
- Simplest pool creation and usage
- Submitting work items
- Basic metrics

### [chunking](./chunking)
Shows how to use `WithChunkFn` for consistent work distribution by key. Demonstrates:
- Key-based routing (same key always goes to same worker)
- Per-key aggregation without synchronization
- Worker ID tracking via context

### [tokenizer_stateful](./tokenizer_stateful)
Shows how to use stateful workers where each worker maintains its own independent state (word frequency counters). Demonstrates:
- Worker state isolation
@@ -53,11 +65,23 @@ Shows how to handle and categorize errors in parallel processing. Demonstrates:
- Timing information tracking
- Statistical reporting on errors

### [pool_completion](./pool_completion)
Shows how to use the pool completion callback for final aggregation. Demonstrates:
- Pool completion callback (`WithPoolCompleteFn`)
- Final cleanup when all workers finish
- Difference between worker and pool completion callbacks

## Running Examples

Each example can be run from its directory:
```bash
cd tokenizer_stateful
cd basic
go run main.go

cd ../chunking
go run main.go

cd ../tokenizer_stateful
go run main.go -file input.txt

cd ../tokenizer_stateless
@@ -77,15 +101,19 @@ go run main.go

cd ../collector_errors
go run main.go -workers 8 -jobs 100 -error-rate 0.3

cd ../pool_completion
go run main.go
```

## Common Patterns

While the examples are simplified, they showcase important pool package features:
- Basic pool usage (basic)
- Consistent work distribution by key (chunking)
- Worker state management (stateful vs stateless)
- Result collection strategies
- Error handling approaches
- Metrics and monitoring
- Work distribution patterns
- Middleware integration
- Multi-stage processing pipelines
66 changes: 66 additions & 0 deletions examples/basic/README.md
@@ -0,0 +1,66 @@
# Basic Example

Minimal "hello world" example demonstrating the simplest pool usage.

## What it demonstrates

- Creating a pool with `pool.New[T]`
- Using `pool.WorkerFunc` adapter for simple functions
- Submitting work items with `Submit()`
- Closing pool and waiting with `Close()`
- Basic metrics via `Metrics().GetStats()`

## Running

```bash
go run main.go
```

## Output

```
processing: date
processing: apple
processing: banana
processing: cherry
processing: elderberry

done: processed 5 items in 102ms
```

## Code walkthrough

### Create the pool

```go
p := pool.New[string](3, pool.WorkerFunc[string](func(_ context.Context, item string) error {
	fmt.Printf("processing: %s\n", item)
	time.Sleep(50 * time.Millisecond)
	return nil
}))
```

- `pool.New[string](3, ...)` creates a pool with 3 workers processing strings
- `pool.WorkerFunc` adapts a function to the `Worker` interface

### Start, submit, close

```go
p.Go(ctx) // start workers
p.Submit("apple") // submit work (not thread-safe)
p.Close(ctx) // close input and wait for completion
```

### Get metrics

```go
stats := p.Metrics().GetStats()
fmt.Printf("processed %d items in %v\n", stats.Processed, stats.TotalTime)
```

## Next steps

After understanding this basic example, explore:
- [chunking](../chunking) - consistent work distribution by key
- [tokenizer_stateful](../tokenizer_stateful) - workers with state
- [middleware](../middleware) - adding retry, timeout, validation
9 changes: 9 additions & 0 deletions examples/basic/go.mod
@@ -0,0 +1,9 @@
module examples/basic

go 1.24.0

require github.com/go-pkgz/pool v0.7.0

require golang.org/x/sync v0.19.0 // indirect

replace github.com/go-pkgz/pool => ../..
10 changes: 10 additions & 0 deletions examples/basic/go.sum
@@ -0,0 +1,10 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4=
golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
42 changes: 42 additions & 0 deletions examples/basic/main.go
@@ -0,0 +1,42 @@
// Example basic demonstrates the simplest possible pool usage.
// This is a minimal "hello world" example to get started quickly.
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-pkgz/pool"
)

func main() {
	// create a pool with 3 workers using a simple function
	p := pool.New[string](3, pool.WorkerFunc[string](func(_ context.Context, item string) error {
		fmt.Printf("processing: %s\n", item)
		time.Sleep(50 * time.Millisecond) // simulate work
		return nil
	}))

	// start the pool
	ctx := context.Background()
	if err := p.Go(ctx); err != nil {
		fmt.Printf("failed to start: %v\n", err)
		return
	}

	// submit work items
	items := []string{"apple", "banana", "cherry", "date", "elderberry"}
	for _, item := range items {
		p.Submit(item)
	}

	// close and wait for completion
	if err := p.Close(ctx); err != nil {
		fmt.Printf("error: %v\n", err)
	}

	// print stats
	stats := p.Metrics().GetStats()
	fmt.Printf("\ndone: processed %d items in %v\n", stats.Processed, stats.TotalTime.Round(time.Millisecond))
}
88 changes: 88 additions & 0 deletions examples/chunking/README.md
@@ -0,0 +1,88 @@
# Chunking Example

Demonstrates `WithChunkFn` for consistent work distribution by key.

## What it demonstrates

- Key-based routing with `WithChunkFn`
- Same key always goes to the same worker (consistent hashing)
- Per-key aggregation without synchronization
- Getting worker ID from context with `metrics.WorkerID(ctx)`

## Use cases

- Aggregating events by user/session ID
- Processing messages by partition key
- Any scenario where items with the same key must be handled by the same worker

## Running

```bash
go run main.go
```

## Output

```
worker 1: user=charlie action=login
worker 2: user=alice action=login
worker 1: user=charlie action=view_page
worker 2: user=bob action=login
worker 2: user=alice action=view_page
...

worker assignment (each user always goes to same worker):
worker 1: charlie(4)
worker 2: bob(3) alice(4)

processed 11 events in 76ms
```

Notice that charlie always goes to worker 1, while alice and bob always go to worker 2.

## Code walkthrough

### Define chunk function

```go
p := pool.New[Event](3, worker).WithChunkFn(func(e Event) string {
	return e.UserID // route by user ID
})
```

The chunk function extracts a key from each item. Items with the same key are routed to the same worker using consistent hashing (FNV-1a).

### Get worker ID in worker

```go
import "github.com/go-pkgz/pool/metrics"

func worker(ctx context.Context, e Event) error {
	workerID := metrics.WorkerID(ctx)
	// workerID is consistent for this user
	fmt.Printf("worker %d handles %s\n", workerID, e.UserID)
	return nil
}
```

### Benefits

Without chunking:
- Workers compete for items randomly
- Aggregating by key requires synchronization (mutex/channel)

With chunking:
- Same key → same worker, always
- Per-key state can be local to worker (no locks needed)
- Useful for building per-user counters, session state, etc.
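
The pattern can be shown with nothing but the standard library (a hypothetical stdlib-only sketch, not the pool's code): each worker goroutine owns a private map, and keys are fanned out by hash, so counting needs no mutex.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

// countByKey fans keys out to per-worker channels by FNV-1a hash, lets each
// worker count its keys in a private map (no locks), and merges the results
// once all workers finish.
func countByKey(keys []string, numWorkers int) map[string]int {
	chans := make([]chan string, numWorkers)
	results := make([]map[string]int, numWorkers)
	var wg sync.WaitGroup

	for i := 0; i < numWorkers; i++ {
		chans[i] = make(chan string)
		results[i] = map[string]int{}
		wg.Add(1)
		go func(id int) { // worker id owns results[id] exclusively
			defer wg.Done()
			for key := range chans[id] {
				results[id][key]++ // local per-key state, no mutex needed
			}
		}(i)
	}

	for _, key := range keys {
		h := fnv.New32a()
		h.Write([]byte(key))
		chans[int(h.Sum32())%numWorkers] <- key // same key -> same worker
	}
	for _, ch := range chans {
		close(ch)
	}
	wg.Wait()

	merged := map[string]int{}
	for _, m := range results {
		for k, c := range m {
			merged[k] += c
		}
	}
	return merged
}

func main() {
	counts := countByKey([]string{"alice", "bob", "alice", "alice", "bob"}, 2)
	fmt.Println(counts["alice"], counts["bob"]) // 3 2
}
```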

## How it works

1. `WithChunkFn` creates per-worker channels (instead of a single shared channel)
2. On `Submit()`, the chunk function extracts the key
3. The key is hashed with FNV-1a: `hash(key) % numWorkers`
4. The item is sent to that worker's dedicated channel
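
The hashing step can be reproduced with the standard library's `hash/fnv` (a simplified sketch; the pool's internals may differ):

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// routeTo maps a key to a worker index: FNV-1a hash modulo worker count.
// The same key always yields the same index, which is what makes the
// distribution consistent.
func routeTo(key string, numWorkers int) int {
	h := fnv.New32a()
	h.Write([]byte(key)) // Write on an fnv hash never returns an error
	return int(h.Sum32()) % numWorkers
}

func main() {
	for _, key := range []string{"alice", "bob", "alice"} {
		fmt.Printf("%s -> worker %d\n", key, routeTo(key, 3))
	}
}
```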

## Related examples

- [basic](../basic) - simplest pool usage
- [tokenizer_stateful](../tokenizer_stateful) - stateful workers
- [direct_chain](../direct_chain) - multi-stage pipelines
9 changes: 9 additions & 0 deletions examples/chunking/go.mod
@@ -0,0 +1,9 @@
module examples/chunking

go 1.24.0

require github.com/go-pkgz/pool v0.7.0

require golang.org/x/sync v0.19.0 // indirect

replace github.com/go-pkgz/pool => ../..
10 changes: 10 additions & 0 deletions examples/chunking/go.sum
@@ -0,0 +1,10 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4=
golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
90 changes: 90 additions & 0 deletions examples/chunking/main.go
@@ -0,0 +1,90 @@
// Example chunking demonstrates WithChunkFn for consistent work distribution.
// Items with the same key always go to the same worker, enabling per-key
// aggregation without synchronization. Useful for grouping events by user,
// session, or any other key.
package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/go-pkgz/pool"
	"github.com/go-pkgz/pool/metrics"
)

// Event represents something happening for a user
type Event struct {
	UserID string
	Action string
}

func main() {
	// track which worker processes which users (for demonstration)
	var mu sync.Mutex
	workerUsers := make(map[int]map[string]int) // workerID -> userID -> count

	// create pool with chunking by UserID
	// this ensures all events for a user go to the same worker
	p := pool.New[Event](3, pool.WorkerFunc[Event](func(ctx context.Context, e Event) error {
		// get worker ID from context
		workerID := metrics.WorkerID(ctx)

		mu.Lock()
		if workerUsers[workerID] == nil {
			workerUsers[workerID] = make(map[string]int)
		}
		workerUsers[workerID][e.UserID]++
		mu.Unlock()

		fmt.Printf("worker %d: user=%s action=%s\n", workerID, e.UserID, e.Action)
		time.Sleep(10 * time.Millisecond)
		return nil
	})).WithChunkFn(func(e Event) string {
		return e.UserID // route by user ID
	})

	ctx := context.Background()
	if err := p.Go(ctx); err != nil {
		fmt.Printf("failed to start: %v\n", err)
		return
	}

	// submit events for different users
	// notice: same user's events will always go to the same worker
	events := []Event{
		{"alice", "login"},
		{"bob", "login"},
		{"charlie", "login"},
		{"alice", "view_page"},
		{"bob", "view_page"},
		{"alice", "click_button"},
		{"charlie", "view_page"},
		{"bob", "logout"},
		{"alice", "logout"},
		{"charlie", "click_button"},
		{"charlie", "logout"},
	}

	for _, e := range events {
		p.Submit(e)
	}

	if err := p.Close(ctx); err != nil {
		fmt.Printf("error: %v\n", err)
	}

	// show which worker handled which users
	fmt.Printf("\nworker assignment (each user always goes to same worker):\n")
	for workerID, users := range workerUsers {
		fmt.Printf(" worker %d: ", workerID)
		for user, count := range users {
			fmt.Printf("%s(%d) ", user, count)
		}
		fmt.Println()
	}

	stats := p.Metrics().GetStats()
	fmt.Printf("\nprocessed %d events in %v\n", stats.Processed, stats.TotalTime.Round(time.Millisecond))
}