Introduction
Worker pools are a powerful pattern for controlling concurrency and managing resource consumption in Go applications. They allow you to process tasks concurrently while limiting the number of goroutines running simultaneously.
Basic Worker Pool Implementation
Simple Worker Pool
type WorkerPool struct {
    tasks   chan func()
    workers int
    wg      sync.WaitGroup
}

func NewWorkerPool(workers int) *WorkerPool {
    pool := &WorkerPool{
        tasks:   make(chan func()),
        workers: workers,
    }
    pool.Start()
    return pool
}

func (p *WorkerPool) Start() {
    for i := 0; i < p.workers; i++ {
        p.wg.Add(1)
        go func(workerID int) {
            defer p.wg.Done()
            for task := range p.tasks {
                task()
            }
        }(i)
    }
}

func (p *WorkerPool) Submit(task func()) {
    p.tasks <- task
}

func (p *WorkerPool) Stop() {
    close(p.tasks)
    p.wg.Wait()
}
Usage Example
func main() {
    pool := NewWorkerPool(3)

    // Submit tasks
    for i := 0; i < 10; i++ {
        taskID := i
        pool.Submit(func() {
            fmt.Printf("Processing task %d\n", taskID)
            time.Sleep(time.Second)
        })
    }

    pool.Stop()
}
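Because the tasks channel is unbuffered, Submit blocks until a worker is free, which gives the pool natural backpressure. If you would rather queue pending work, a buffered channel is a one-line change in the constructor, e.g. tasks: make(chan func(), 64), with the capacity chosen to fit your workload.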
Advanced Worker Pool Patterns
1. Worker Pool with Results
type Job struct {
    ID   int
    Data interface{}
}

type Result struct {
    JobID  int
    Output interface{}
    Error  error
}

type ResultWorkerPool struct {
    workers int
    jobs    chan Job
    results chan Result
    done    chan struct{}
    wg      sync.WaitGroup
}

func NewResultWorkerPool(workers int) *ResultWorkerPool {
    return &ResultWorkerPool{
        workers: workers,
        jobs:    make(chan Job),
        results: make(chan Result),
        done:    make(chan struct{}),
    }
}

func (p *ResultWorkerPool) Start() {
    for i := 0; i < p.workers; i++ {
        p.wg.Add(1)
        go func(workerID int) {
            defer p.wg.Done()
            for job := range p.jobs {
                result := Result{JobID: job.ID}

                // Process the job
                output, err := processJob(job)
                result.Output = output
                result.Error = err

                // Send the result
                p.results <- result
            }
        }(i)
    }
}

func (p *ResultWorkerPool) SubmitJob(job Job) {
    p.jobs <- job
}

func (p *ResultWorkerPool) Results() <-chan Result {
    return p.results
}

func (p *ResultWorkerPool) Stop() {
    close(p.jobs)
    p.wg.Wait()
    close(p.results)
}
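Both jobs and results are unbuffered, so a consumer has to drain Results() while jobs are still being submitted, or SubmitJob and Stop will block. A minimal usage sketch, assuming processJob is defined as above:

func main() {
    pool := NewResultWorkerPool(4)
    pool.Start()

    // Submit in a separate goroutine so we can range over results here.
    go func() {
        for i := 0; i < 20; i++ {
            pool.SubmitJob(Job{ID: i})
        }
        pool.Stop() // closes jobs, waits for workers, then closes results
    }()

    for result := range pool.Results() {
        if result.Error != nil {
            log.Printf("job %d failed: %v", result.JobID, result.Error)
            continue
        }
        fmt.Printf("job %d -> %v\n", result.JobID, result.Output)
    }
}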
2. Rate-Limited Worker Pool
type RateLimitedPool struct {
    workers     int
    rateLimit   time.Duration
    tasks       chan func()
    rateLimiter *time.Ticker
    wg          sync.WaitGroup
}

func NewRateLimitedPool(workers int, rateLimit time.Duration) *RateLimitedPool {
    return &RateLimitedPool{
        workers:     workers,
        rateLimit:   rateLimit,
        tasks:       make(chan func()),
        rateLimiter: time.NewTicker(rateLimit),
    }
}

func (p *RateLimitedPool) Start() {
    for i := 0; i < p.workers; i++ {
        p.wg.Add(1)
        go func() {
            defer p.wg.Done()
            for task := range p.tasks {
                <-p.rateLimiter.C // wait for the rate limiter before running
                task()
            }
        }()
    }
}
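Because all workers share one ticker, the pool as a whole starts at most one task per tick, which is usually the intent of rate limiting. The listing stops here; Submit and Stop methods mirroring the simple pool might look like this (a sketch, not part of the original listing; note the ticker must be stopped to release its resources):

func (p *RateLimitedPool) Submit(task func()) {
    p.tasks <- task
}

func (p *RateLimitedPool) Stop() {
    close(p.tasks)
    p.wg.Wait()
    p.rateLimiter.Stop() // release the ticker's resources
}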
3. Priority Worker Pool
type Priority int

const (
    LowPriority Priority = iota
    MediumPriority
    HighPriority
)

type PriorityTask struct {
    priority Priority
    task     func()
}

type PriorityWorkerPool struct {
    workers int
    tasks   map[Priority]chan func()
    wg      sync.WaitGroup
}

func NewPriorityWorkerPool(workers int) *PriorityWorkerPool {
    pool := &PriorityWorkerPool{
        workers: workers,
        tasks: map[Priority]chan func(){
            LowPriority:    make(chan func(), 100),
            MediumPriority: make(chan func(), 100),
            HighPriority:   make(chan func(), 100),
        },
    }
    pool.Start()
    return pool
}

func (p *PriorityWorkerPool) Start() {
    for i := 0; i < p.workers; i++ {
        p.wg.Add(1)
        go func() {
            defer p.wg.Done()
            for {
                // Check priorities in order: high, then medium, then low.
                select {
                case task := <-p.tasks[HighPriority]:
                    task()
                    continue
                default:
                }
                select {
                case task := <-p.tasks[MediumPriority]:
                    task()
                    continue
                default:
                }
                select {
                case task := <-p.tasks[LowPriority]:
                    task()
                default:
                    time.Sleep(time.Millisecond) // avoid a busy loop when all queues are empty
                }
            }
        }()
    }
}
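The listing omits submission; a minimal Submit sketch (the method name is assumed):

func (p *PriorityWorkerPool) Submit(priority Priority, task func()) {
    p.tasks[priority] <- task
}

Note also that the workers above loop forever, so this pool as written has no shutdown path; a done channel checked in the final select is one way to add one.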
Error Handling and Recovery
1. Worker Pool with Error Recovery
type RecoverableWorkerPool struct {
    tasks    chan func() error
    errors   chan error
    workers  int
    recovery func(interface{})
}

func (p *RecoverableWorkerPool) worker() {
    for task := range p.tasks {
        func() {
            defer func() {
                if r := recover(); r != nil {
                    if p.recovery != nil {
                        p.recovery(r)
                    }
                    p.errors <- fmt.Errorf("panic recovered: %v", r)
                }
            }()
            if err := task(); err != nil {
                p.errors <- err
            }
        }()
    }
}
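Something must be reading the errors channel, or workers will block on the send the first time a task fails; a minimal consumer sketch:

go func() {
    for err := range pool.errors {
        log.Printf("task error: %v", err)
    }
}()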
2. Graceful Shutdown
type GracefulPool struct {
    tasks      chan func()
    shutdown   chan struct{}
    completed  chan struct{}
    maxTimeout time.Duration
}

func (p *GracefulPool) Shutdown() error {
    close(p.shutdown)
    select {
    case <-p.completed:
        return nil
    case <-time.After(p.maxTimeout):
        return errors.New("shutdown timeout exceeded")
    }
}

func (p *GracefulPool) worker() {
    for {
        select {
        case <-p.shutdown:
            return
        case task := <-p.tasks:
            task()
        }
    }
}
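Nothing in the listing closes completed, so Shutdown would always hit the timeout. One way to wire it up, assuming a Start method that tracks workers with a WaitGroup (both the method and its worker count parameter are assumptions):

func (p *GracefulPool) Start(workers int) {
    var wg sync.WaitGroup
    for i := 0; i < workers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            p.worker()
        }()
    }
    // Close completed once every worker has returned, which unblocks Shutdown.
    go func() {
        wg.Wait()
        close(p.completed)
    }()
}

Note too that the worker's select returns as soon as shutdown closes, so tasks still sitting in the channel are abandoned; drain them first if that matters for your workload.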
Monitoring and Metrics
1. Worker Pool with Metrics
type PoolMetrics struct {
    ActiveWorkers  int64
    CompletedTasks int64
    FailedTasks    int64
    QueuedTasks    int64
    ProcessingTime time.Duration
}

type MetricWorkerPool struct {
    metrics *PoolMetrics
    tasks   chan func()
    workers int
    mu      sync.RWMutex
}

func (p *MetricWorkerPool) worker() {
    for task := range p.tasks {
        atomic.AddInt64(&p.metrics.ActiveWorkers, 1)
        start := time.Now()
        func() {
            defer func() {
                atomic.AddInt64(&p.metrics.ActiveWorkers, -1)
                if r := recover(); r != nil {
                    atomic.AddInt64(&p.metrics.FailedTasks, 1)
                    return
                }
                atomic.AddInt64(&p.metrics.CompletedTasks, 1)
                atomic.AddInt64(&p.metrics.QueuedTasks, -1)

                // ProcessingTime cannot be updated atomically, so guard it with the mutex.
                p.mu.Lock()
                p.metrics.ProcessingTime += time.Since(start)
                p.mu.Unlock()
            }()
            task()
        }()
    }
}
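Readers of the metrics need the same discipline the worker uses: atomic loads for the counters and the mutex for ProcessingTime. An accessor sketch (Snapshot is an assumed name, not part of the original):

func (p *MetricWorkerPool) Snapshot() PoolMetrics {
    p.mu.RLock()
    processing := p.metrics.ProcessingTime
    p.mu.RUnlock()
    return PoolMetrics{
        ActiveWorkers:  atomic.LoadInt64(&p.metrics.ActiveWorkers),
        CompletedTasks: atomic.LoadInt64(&p.metrics.CompletedTasks),
        FailedTasks:    atomic.LoadInt64(&p.metrics.FailedTasks),
        QueuedTasks:    atomic.LoadInt64(&p.metrics.QueuedTasks),
        ProcessingTime: processing,
    }
}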
Best Practices and Patterns
1. Task Batching
type BatchWorkerPool struct {
    batchSize int
    tasks     chan interface{}
    results   chan []interface{}
}

func (p *BatchWorkerPool) processBatch(tasks []interface{}) {
    if len(tasks) == 0 {
        return
    }
    results := make([]interface{}, len(tasks))
    for i, task := range tasks {
        // Process each task and store its result
        results[i] = processTask(task)
    }
    p.results <- results
}

func (p *BatchWorkerPool) worker() {
    batch := make([]interface{}, 0, p.batchSize)
    for task := range p.tasks {
        batch = append(batch, task)
        if len(batch) >= p.batchSize {
            p.processBatch(batch)
            batch = batch[:0]
        }
    }

    // Process any remaining tasks after the channel closes
    if len(batch) > 0 {
        p.processBatch(batch)
    }
}
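As written, a partial batch waits until the tasks channel closes. A common refinement flushes on a timer as well, so latency stays bounded under light load; a sketch of that variant (workerWithFlush and its interval parameter are assumptions, not part of the original):

func (p *BatchWorkerPool) workerWithFlush(flushEvery time.Duration) {
    batch := make([]interface{}, 0, p.batchSize)
    ticker := time.NewTicker(flushEvery)
    defer ticker.Stop()
    for {
        select {
        case task, ok := <-p.tasks:
            if !ok {
                // Channel closed: flush whatever is left and exit.
                p.processBatch(batch)
                return
            }
            batch = append(batch, task)
            if len(batch) >= p.batchSize {
                p.processBatch(batch)
                batch = batch[:0]
            }
        case <-ticker.C:
            // Periodic flush so partial batches don't wait indefinitely.
            p.processBatch(batch)
            batch = batch[:0]
        }
    }
}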
2. Dynamic Pool Sizing
type DynamicPool struct {
    minWorkers     int
    maxWorkers     int
    currentWorkers int
    tasks          chan func()
    metrics        *PoolMetrics
}

func (p *DynamicPool) adjustWorkerCount() {
    for {
        time.Sleep(time.Second)
        queueSize := atomic.LoadInt64(&p.metrics.QueuedTasks)
        switch {
        case queueSize > int64(p.currentWorkers) && p.currentWorkers < p.maxWorkers:
            // Backlog exceeds worker count: add workers
            p.addWorkers(1)
        case queueSize == 0 && p.currentWorkers > p.minWorkers:
            // Queue is empty: remove workers
            p.removeWorkers(1)
        }
    }
}
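addWorkers and removeWorkers are not shown. A sketch of one way to implement them, assuming the pool also carries a quit chan struct{} field (an assumption, not in the struct above); currentWorkers is only safe here because adjustWorkerCount is the sole goroutine touching it:

func (p *DynamicPool) addWorkers(n int) {
    for i := 0; i < n; i++ {
        p.currentWorkers++
        go func() {
            for {
                select {
                case task := <-p.tasks:
                    task()
                case <-p.quit:
                    return // retire this worker
                }
            }
        }()
    }
}

func (p *DynamicPool) removeWorkers(n int) {
    for i := 0; i < n; i++ {
        p.currentWorkers--
        p.quit <- struct{}{} // one signal retires one worker
    }
}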
Testing Strategies
1. Load Testing
func TestWorkerPoolUnderLoad(t *testing.T) {
    pool := NewWorkerPool(5)
    taskCount := 1000
    start := time.Now()
    var completed atomic.Int64
    for i := 0; i < taskCount; i++ {
        pool.Submit(func() {
            time.Sleep(time.Millisecond)
            completed.Add(1)
        })
    }
    pool.Stop()
    duration := time.Since(start)
    if completed.Load() != int64(taskCount) {
        t.Errorf("Expected %d completed tasks, got %d", taskCount, completed.Load())
    }
    t.Logf("Processed %d tasks in %v", taskCount, duration)
}
2. Stress Testing
func TestWorkerPoolStress(t *testing.T) {
    pool := NewWorkerPool(10)
    var wg sync.WaitGroup

    // Multiple goroutines submitting tasks concurrently
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for j := 0; j < 100; j++ {
                pool.Submit(func() {
                    time.Sleep(time.Millisecond)
                })
            }
        }()
    }
    wg.Wait()
    pool.Stop()
}
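Resource leaks (called out in the conclusion below) deserve their own test; a crude check compares goroutine counts across a full pool lifecycle, though dedicated tools such as go.uber.org/goleak do this more reliably (the test below is a sketch):

func TestWorkerPoolNoLeaks(t *testing.T) {
    before := runtime.NumGoroutine()
    pool := NewWorkerPool(5)
    for i := 0; i < 100; i++ {
        pool.Submit(func() {})
    }
    pool.Stop()
    time.Sleep(100 * time.Millisecond) // give exited goroutines time to unwind
    if after := runtime.NumGoroutine(); after > before {
        t.Errorf("possible goroutine leak: %d goroutines before, %d after", before, after)
    }
}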
Conclusion
Worker pools are essential for managing concurrent workloads efficiently in Go applications. Key takeaways:
- Choose the Right Pattern
  - Simple worker pools for basic concurrent tasks
  - Result-based pools when output collection is needed
  - Priority pools for task scheduling
  - Rate-limited pools for controlled processing
- Consider Implementation Details
  - Error handling and recovery
  - Graceful shutdown
  - Monitoring and metrics
  - Dynamic sizing
- Follow Best Practices
  - Proper resource management
  - Error handling
  - Monitoring and metrics
  - Testing under load
- Test Thoroughly
  - Load testing
  - Stress testing
  - Edge cases
  - Resource leaks