Day 27: Worker Pools: Managing Goroutines Effectively

Venkat Annangi

Introduction

Worker pools are a powerful pattern for controlling concurrency and managing resource consumption in Go applications. They allow you to process tasks concurrently while limiting the number of goroutines running simultaneously.

Basic Worker Pool Implementation

Simple Worker Pool

package main

import (
    "fmt"
    "sync"
    "time"
)

type WorkerPool struct {
    tasks   chan func()
    workers int
    wg      sync.WaitGroup
}

func NewWorkerPool(workers int) *WorkerPool {
    pool := &WorkerPool{
        tasks:   make(chan func()),
        workers: workers,
    }
    pool.Start()
    return pool
}

func (p *WorkerPool) Start() {
    for i := 0; i < p.workers; i++ {
        p.wg.Add(1)
        go func(workerID int) {
            defer p.wg.Done()
            for task := range p.tasks {
                task()
            }
        }(i)
    }
}

func (p *WorkerPool) Submit(task func()) {
    p.tasks <- task
}

func (p *WorkerPool) Stop() {
    close(p.tasks)
    p.wg.Wait()
}

Usage Example

func main() {
    pool := NewWorkerPool(3)
    
    // Submit tasks
    for i := 0; i < 10; i++ {
        taskID := i // capture a copy of the loop variable (required before Go 1.22)
        pool.Submit(func() {
            fmt.Printf("Processing task %d\n", taskID)
            time.Sleep(time.Second)
        })
    }
    
    pool.Stop()
}

Advanced Worker Pool Patterns

1. Worker Pool with Results

type Job struct {
    ID   int
    Data interface{}
}

type Result struct {
    JobID  int
    Output interface{}
    Error  error
}

type ResultWorkerPool struct {
    workers int
    jobs    chan Job
    results chan Result
    wg      sync.WaitGroup
}

func NewResultWorkerPool(workers int) *ResultWorkerPool {
    return &ResultWorkerPool{
        workers: workers,
        jobs:    make(chan Job),
        results: make(chan Result),
    }
}
}

func (p *ResultWorkerPool) Start() {
    for i := 0; i < p.workers; i++ {
        p.wg.Add(1)
        go func(workerID int) {
            defer p.wg.Done()
            for job := range p.jobs {
                result := Result{JobID: job.ID}
                
                // Process the job
                output, err := processJob(job)
                result.Output = output
                result.Error = err
                
                // Send result
                p.results <- result
            }
        }(i)
    }
}

func (p *ResultWorkerPool) SubmitJob(job Job) {
    p.jobs <- job
}

func (p *ResultWorkerPool) Results() <-chan Result {
    return p.results
}

func (p *ResultWorkerPool) Stop() {
    close(p.jobs)
    p.wg.Wait()
    close(p.results)
}
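
Here is one way to drive this pool end to end. The processJob function below is a placeholder for whatever per-job work you need (the post doesn't define it), and note that the results channel is unbuffered, so something must be reading results while jobs are in flight or Stop will deadlock:

// processJob is a placeholder; substitute your real per-job logic.
func processJob(job Job) (interface{}, error) {
    return fmt.Sprintf("processed job %d", job.ID), nil
}

func main() {
    pool := NewResultWorkerPool(3)
    pool.Start()

    // Consume results concurrently: the results channel is unbuffered,
    // so workers block on send until someone reads.
    done := make(chan struct{})
    go func() {
        defer close(done)
        for result := range pool.Results() {
            if result.Error != nil {
                fmt.Printf("job %d failed: %v\n", result.JobID, result.Error)
                continue
            }
            fmt.Printf("job %d -> %v\n", result.JobID, result.Output)
        }
    }()

    for i := 0; i < 10; i++ {
        pool.SubmitJob(Job{ID: i})
    }

    pool.Stop() // closes jobs, waits for workers, then closes results
    <-done      // wait for the consumer to finish draining
}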

2. Rate-Limited Worker Pool

type RateLimitedPool struct {
    workers     int
    rateLimit   time.Duration
    tasks       chan func()
    rateLimiter *time.Ticker
    wg          sync.WaitGroup
}

func NewRateLimitedPool(workers int, rateLimit time.Duration) *RateLimitedPool {
    return &RateLimitedPool{
        workers:     workers,
        rateLimit:   rateLimit,
        tasks:       make(chan func()),
        rateLimiter: time.NewTicker(rateLimit),
    }
}

func (p *RateLimitedPool) Start() {
    for i := 0; i < p.workers; i++ {
        p.wg.Add(1)
        go func() {
            defer p.wg.Done()
            for task := range p.tasks {
                <-p.rateLimiter.C // Wait for the ticker; it's shared, so the limit applies pool-wide
                task()
            }
        }()
    }
}
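
The snippet above only shows Start. A minimal Submit and Stop to round it out might look like this; the key detail is stopping the ticker only after the workers have drained the queue, since a worker blocked on a stopped ticker's channel would never wake up:

func (p *RateLimitedPool) Submit(task func()) {
    p.tasks <- task
}

func (p *RateLimitedPool) Stop() {
    close(p.tasks)       // let workers drain any queued tasks
    p.wg.Wait()          // wait for every worker to exit
    p.rateLimiter.Stop() // safe to stop the ticker only now
}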

3. Priority Worker Pool

type Priority int

const (
    LowPriority Priority = iota
    MediumPriority
    HighPriority
)

type PriorityTask struct {
    priority Priority
    task     func()
}

type PriorityWorkerPool struct {
    workers int
    tasks   map[Priority]chan func()
    wg      sync.WaitGroup
}

func NewPriorityWorkerPool(workers int) *PriorityWorkerPool {
    pool := &PriorityWorkerPool{
        workers: workers,
        tasks: map[Priority]chan func(){
            LowPriority:    make(chan func(), 100),
            MediumPriority: make(chan func(), 100),
            HighPriority:   make(chan func(), 100),
        },
    }
    pool.Start()
    return pool
}

func (p *PriorityWorkerPool) Start() {
    for i := 0; i < p.workers; i++ {
        p.wg.Add(1)
        go func() {
            defer p.wg.Done()
            for {
                // Check priorities in order
                select {
                case task := <-p.tasks[HighPriority]:
                    task()
                    continue
                default:
                }

                select {
                case task := <-p.tasks[MediumPriority]:
                    task()
                    continue
                default:
                }

                select {
                case task := <-p.tasks[LowPriority]:
                    task()
                default:
                    time.Sleep(time.Millisecond)
                }
            }
        }()
    }
}
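
The post doesn't show how tasks enter this pool. A minimal Submit, reusing the PriorityTask type defined earlier, could look like the sketch below; keep in mind the workers above busy-wait (with a short sleep) when idle and have no shutdown path, so a production version would also want a stop channel:

func (p *PriorityWorkerPool) Submit(t PriorityTask) {
    p.tasks[t.priority] <- t.task
}

// Usage:
//   pool := NewPriorityWorkerPool(4)
//   pool.Submit(PriorityTask{priority: HighPriority, task: func() { fmt.Println("urgent") }})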

Error Handling and Recovery

1. Worker Pool with Error Recovery

type RecoverableWorkerPool struct {
    tasks    chan func() error
    errors   chan error
    workers  int
    recovery func(interface{})
}

func (p *RecoverableWorkerPool) worker() {
    for task := range p.tasks {
        func() {
            defer func() {
                if r := recover(); r != nil {
                    if p.recovery != nil {
                        p.recovery(r)
                    }
                    p.errors <- fmt.Errorf("panic recovered: %v", r)
                }
            }()
            
            if err := task(); err != nil {
                p.errors <- err
            }
        }()
    }
}
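
A sketch of how this pool might be wired up; the constructor values and the error consumer below are assumptions rather than part of the original code, and the standard errors, log, and time packages are assumed imported. Note the errors channel is buffered here so workers don't block while reporting:

pool := &RecoverableWorkerPool{
    tasks:   make(chan func() error),
    errors:  make(chan error, 10), // buffered so workers don't block on reporting
    workers: 3,
    recovery: func(r interface{}) {
        log.Printf("recovered from panic: %v", r)
    },
}

for i := 0; i < pool.workers; i++ {
    go pool.worker()
}

// Drain errors as they arrive.
go func() {
    for err := range pool.errors {
        log.Printf("task error: %v", err)
    }
}()

pool.tasks <- func() error { panic("boom") }                   // recovered and reported
pool.tasks <- func() error { return errors.New("plain fail") } // reported directly

close(pool.tasks)
time.Sleep(100 * time.Millisecond) // crude wait; a real pool would track workers with a WaitGroup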

2. Graceful Shutdown

type GracefulPool struct {
    tasks      chan func()
    shutdown   chan struct{}
    completed  chan struct{}
    maxTimeout time.Duration
}

func (p *GracefulPool) Shutdown() error {
    close(p.shutdown)
    
    select {
    case <-p.completed:
        return nil
    case <-time.After(p.maxTimeout):
        return errors.New("shutdown timeout exceeded")
    }
}

func (p *GracefulPool) worker() {
    for {
        select {
        case <-p.shutdown:
            return
        case task := <-p.tasks:
            task()
        }
    }
}
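
One gap in the snippet above: nothing ever closes completed, so Shutdown as written would always hit the timeout. A Start method along these lines (an assumption, since the original doesn't show one) wires it up:

func (p *GracefulPool) Start(workers int) {
    var wg sync.WaitGroup
    for i := 0; i < workers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            p.worker()
        }()
    }
    // Signal completion once every worker has seen the shutdown and returned.
    go func() {
        wg.Wait()
        close(p.completed)
    }()
}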

Monitoring and Metrics

1. Worker Pool with Metrics

type PoolMetrics struct {
    ActiveWorkers  int64
    CompletedTasks int64
    FailedTasks    int64
    QueuedTasks    int64
    ProcessingTime time.Duration
}

type MetricWorkerPool struct {
    metrics *PoolMetrics
    tasks   chan func()
    workers int
    mu      sync.RWMutex
}

func (p *MetricWorkerPool) worker() {
    for task := range p.tasks {
        atomic.AddInt64(&p.metrics.ActiveWorkers, 1)
        start := time.Now()
        
        func() {
            defer func() {
                atomic.AddInt64(&p.metrics.ActiveWorkers, -1)
                if r := recover(); r != nil {
                    atomic.AddInt64(&p.metrics.FailedTasks, 1)
                    return
                }
                atomic.AddInt64(&p.metrics.CompletedTasks, 1)
                atomic.AddInt64(&p.metrics.QueuedTasks, -1)
                
                p.mu.Lock()
                p.metrics.ProcessingTime += time.Since(start)
                p.mu.Unlock()
            }()
            task()
        }()
    }
}
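
QueuedTasks is decremented above but never incremented, because Submit isn't shown. A matching Submit, plus a reader-side Snapshot method (both assumptions, not in the original), might be:

func (p *MetricWorkerPool) Submit(task func()) {
    atomic.AddInt64(&p.metrics.QueuedTasks, 1)
    p.tasks <- task
}

// Snapshot returns a copy of the current metrics for reporting.
func (p *MetricWorkerPool) Snapshot() PoolMetrics {
    p.mu.RLock()
    processing := p.metrics.ProcessingTime
    p.mu.RUnlock()

    return PoolMetrics{
        ActiveWorkers:  atomic.LoadInt64(&p.metrics.ActiveWorkers),
        CompletedTasks: atomic.LoadInt64(&p.metrics.CompletedTasks),
        FailedTasks:    atomic.LoadInt64(&p.metrics.FailedTasks),
        QueuedTasks:    atomic.LoadInt64(&p.metrics.QueuedTasks),
        ProcessingTime: processing,
    }
}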

Best Practices and Patterns

1. Task Batching

type BatchWorkerPool struct {
    batchSize int
    tasks     chan interface{}
    results   chan []interface{}
}

func (p *BatchWorkerPool) processBatch(tasks []interface{}) {
    if len(tasks) == 0 {
        return
    }
    
    results := make([]interface{}, len(tasks))
    for i, task := range tasks {
        // Process task and store result
        results[i] = processTask(task)
    }
    
    p.results <- results
}

func (p *BatchWorkerPool) worker() {
    batch := make([]interface{}, 0, p.batchSize)
    
    for task := range p.tasks {
        batch = append(batch, task)
        
        if len(batch) >= p.batchSize {
            p.processBatch(batch)
            batch = batch[:0]
        }
    }
    
    // Process remaining tasks
    if len(batch) > 0 {
        p.processBatch(batch)
    }
}
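
One caveat: the worker above only flushes when a batch fills or the channel closes, so a slow trickle of tasks can sit in a partial batch indefinitely. A common refinement, sketched here as a variation rather than part of the original design, adds a periodic time-based flush:

func (p *BatchWorkerPool) workerWithFlush(flushEvery time.Duration) {
    batch := make([]interface{}, 0, p.batchSize)
    ticker := time.NewTicker(flushEvery)
    defer ticker.Stop()

    flush := func() {
        if len(batch) > 0 {
            p.processBatch(batch) // synchronous, so reusing batch below is safe
            batch = batch[:0]
        }
    }

    for {
        select {
        case task, ok := <-p.tasks:
            if !ok {
                flush() // channel closed: process the remainder and exit
                return
            }
            batch = append(batch, task)
            if len(batch) >= p.batchSize {
                flush()
            }
        case <-ticker.C:
            flush() // don't let a partial batch linger
        }
    }
}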

2. Dynamic Pool Sizing

type DynamicPool struct {
    minWorkers     int
    maxWorkers     int
    currentWorkers int
    tasks          chan func()
    metrics        *PoolMetrics
}

func (p *DynamicPool) adjustWorkerCount() {
    for {
        time.Sleep(time.Second)
        
        queueSize := atomic.LoadInt64(&p.metrics.QueuedTasks)

        switch {
        case queueSize > int64(p.currentWorkers) && p.currentWorkers < p.maxWorkers:
            // Add workers
            p.addWorkers(1)
        case queueSize == 0 && p.currentWorkers > p.minWorkers:
            // Remove workers
            p.removeWorkers(1)
        }
    }
}
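
addWorkers and removeWorkers are referenced but never defined. One plausible implementation, assuming a quit chan struct{} field is added to DynamicPool (both methods here are sketches, not the original design), retires workers through that channel. currentWorkers is safe to mutate without locking only because adjustWorkerCount is the single goroutine calling these methods:

func (p *DynamicPool) addWorkers(n int) {
    for i := 0; i < n; i++ {
        p.currentWorkers++
        go func() {
            for {
                select {
                case <-p.quit: // hypothetical quit channel for retiring workers
                    return
                case task, ok := <-p.tasks:
                    if !ok {
                        return
                    }
                    task()
                }
            }
        }()
    }
}

func (p *DynamicPool) removeWorkers(n int) {
    for i := 0; i < n; i++ {
        p.currentWorkers--
        p.quit <- struct{}{} // blocks until an idle worker receives it
    }
}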

Testing Strategies

1. Load Testing

func TestWorkerPoolUnderLoad(t *testing.T) {
    pool := NewWorkerPool(5)
    taskCount := 1000

    start := time.Now()
    var completed atomic.Int64
    
    for i := 0; i < taskCount; i++ {
        pool.Submit(func() {
            time.Sleep(time.Millisecond)
            completed.Add(1)
        })
    }
    
    pool.Stop()
    duration := time.Since(start)
    
    if completed.Load() != int64(taskCount) {
        t.Errorf("Expected %d completed tasks, got %d", taskCount, completed.Load())
    }
    
    t.Logf("Processed %d tasks in %v", taskCount, duration)
}

2. Stress Testing

func TestWorkerPoolStress(t *testing.T) {
    pool := NewWorkerPool(10)
    var wg sync.WaitGroup
    
    // Multiple goroutines submitting tasks
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for j := 0; j < 100; j++ {
                pool.Submit(func() {
                    time.Sleep(time.Millisecond)
                })
            }
        }()
    }
    
    wg.Wait()
    pool.Stop()
}
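
The conclusion below also calls out resource leaks. A simple, if coarse, way to catch leaked worker goroutines is to compare runtime.NumGoroutine before and after a full run; this assumes no other background goroutines start during the test:

func TestWorkerPoolNoGoroutineLeak(t *testing.T) {
    before := runtime.NumGoroutine()

    pool := NewWorkerPool(5)
    for i := 0; i < 100; i++ {
        pool.Submit(func() {})
    }
    pool.Stop() // all workers should exit here

    // Give the scheduler a moment to reclaim exited goroutines.
    time.Sleep(100 * time.Millisecond)

    if after := runtime.NumGoroutine(); after > before {
        t.Errorf("possible goroutine leak: %d goroutines before, %d after", before, after)
    }
}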

Conclusion

Worker pools are essential for managing concurrent workloads efficiently in Go applications. Key takeaways:

  1. Choose the Right Pattern
    • Simple worker pools for basic concurrent tasks
    • Result-based pools when output collection is needed
    • Priority pools for task scheduling
    • Rate-limited pools for controlled processing
  2. Consider Implementation Details
    • Error handling and recovery
    • Graceful shutdown
    • Monitoring and metrics
    • Dynamic sizing
  3. Follow Best Practices
    • Proper resource management
    • Error handling
    • Monitoring and metrics
    • Testing under load
  4. Testing
    • Load testing
    • Stress testing
    • Edge cases
    • Resource leaks
