Day 26: Mutex and RWMutex: Locking and Synchronization

Venkat Annangi
Venkat Annangi
16/07/2025 04:15 7 min read 7 views
# RWMutex #108 days of golang #mutex #Concurrency Control
Day 26: Mutex and RWMutex: Locking and Synchronization

Introduction

In concurrent programming, protecting shared resources from simultaneous access is crucial. Go's sync package provides two powerful types of mutexes: sync.Mutex and sync.RWMutex. This comprehensive guide explores how to effectively use these synchronization primitives to prevent race conditions and ensure data consistency in concurrent programs.

Understanding Race Conditions

Let's start with a practical example of why we need mutexes:

type BankAccount struct {
    balance int }

// This code has a race condition func (a *BankAccount) UnsafeTransfer(amount int) {
    currentBalance := a.balance
    // Simulate some processing time     time.Sleep(time.Millisecond)
    a.balance = currentBalance + amount
}

func main() {
    account := &BankAccount{balance: 100}
    
    // Run 100 concurrent transfers     for i := 0; i < 100; i++ {
        go account.UnsafeTransfer(1)
    }
    // Final balance will be unpredictable }

Mutex: The Basic Lock

Simple Mutex Implementation

 
 
type SafeBankAccount struct {
    mu      sync.Mutex
    balance int }

func (a *SafeBankAccount) Transfer(amount int) {
    a.mu.Lock()
    defer a.mu.Unlock()
    
    a.balance += amount
}

func (a *SafeBankAccount) Balance() int {
    a.mu.Lock()
    defer a.mu.Unlock()
    return a.balance
}

Advanced Mutex Patterns

1. Structured Data Access

 
 
type UserSession struct {
    mu sync.Mutex
    data struct {
        lastAccess time.Time
        userID     string         settings   map[string]interface{}
    }
}

func (s *UserSession) UpdateLastAccess() {
    s.mu.Lock()
    defer s.mu.Unlock()
    s.data.lastAccess = time.Now()
}

func (s *UserSession) GetSessionInfo() (time.Time, string) {
    s.mu.Lock()
    defer s.mu.Unlock()
    return s.data.lastAccess, s.data.userID
}

2. Mutex with Timeout

 
 
type TimedMutex struct {
    mu      sync.Mutex
    timeout time.Duration
}

func (tm *TimedMutex) LockWithTimeout() error {
    c := make(chan struct{})
    go func() {
        tm.mu.Lock()
        close(c)
    }()

    select {
    case <-c:
        return nil     case <-time.After(tm.timeout):
        return fmt.Errorf("failed to acquire lock within %v", tm.timeout)
    }
}

func (tm *TimedMutex) Unlock() {
    tm.mu.Unlock()
}

RWMutex: Optimizing for Readers

Basic RWMutex Implementation

 
 
type ConfigStore struct {
    mu     sync.RWMutex
    config map[string]interface{}
}

func (cs *ConfigStore) Get(key string) (interface{}, bool) {
    cs.mu.RLock()
    defer cs.mu.RUnlock()
    val, exists := cs.config[key]
    return val, exists
}

func (cs *ConfigStore) Set(key string, value interface{}) {
    cs.mu.Lock()
    defer cs.mu.Unlock()
    cs.config[key] = value
}

func (cs *ConfigStore) GetMultiple(keys []string) map[string]interface{} {
    cs.mu.RLock()
    defer cs.mu.RUnlock()
    
    result := make(map[string]interface{})
    for _, key := range keys {
        if val, exists := cs.config[key]; exists {
            result[key] = val
        }
    }
    return result
}

Advanced RWMutex Patterns

1. Cached Resource Manager

 
 
type ResourceManager struct {
    mu       sync.RWMutex
    cache    map[string]*Resource
    loadFunc func(string) (*Resource, error) }

func (rm *ResourceManager) GetResource(id string) (*Resource, error) {
    // Try read lock first     rm.mu.RLock()
    if resource, exists := rm.cache[id]; exists {
        rm.mu.RUnlock()
        return resource, nil     }
    rm.mu.RUnlock()

    // Need to load resource - acquire write lock     rm.mu.Lock()
    defer rm.mu.Unlock()

    // Double-check after write lock (another goroutine might have loaded it)     if resource, exists := rm.cache[id]; exists {
        return resource, nil     }

    // Load and cache the resource     resource, err := rm.loadFunc(id)
    if err != nil {
        return nil, err
    }
    rm.cache[id] = resource
    return resource, nil }

2. Sharded Data Store

 
 
type ShardedStore struct {
    shards    []*Shard
    shardMask int }

type Shard struct {
    mu   sync.RWMutex
    data map[string]interface{}
}

func NewShardedStore(shardCount int) *ShardedStore {
    shards := make([]*Shard, shardCount)
    for i := range shards {
        shards[i] = &Shard{
            data: make(map[string]interface{}),
        }
    }
    return &ShardedStore{
        shards:    shards,
        shardMask: shardCount - 1,
    }
}

func (s *ShardedStore) getShard(key string) *Shard {
    hash := fnv.New32a()
    hash.Write([]byte(key))
    return s.shards[hash.Sum32()&uint32(s.shardMask)]
}

func (s *ShardedStore) Get(key string) (interface{}, bool) {
    shard := s.getShard(key)
    shard.mu.RLock()
    defer shard.mu.RUnlock()
    val, exists := shard.data[key]
    return val, exists
}

Performance Optimization Techniques

1. Lock Striping

 
 
type StripedMap struct {
    stripes    []*stripe
    stripeMask int }

type stripe struct {
    mu   sync.RWMutex
    data map[string]interface{}
}

func (sm *StripedMap) getStripe(key string) *stripe {
    hash := fnv.New32a()
    hash.Write([]byte(key))
    return sm.stripes[hash.Sum32()&uint32(sm.stripeMask)]
}

2. Copy-on-Write Pattern

 
 
type COWCache struct {
    mu       sync.RWMutex
    data     map[string]interface{}
    version  uint64 }

func (c *COWCache) Update(updates map[string]interface{}) {
    c.mu.Lock()
    defer c.mu.Unlock()

    // Create a new map with current data     newData := make(map[string]interface{}, len(c.data))
    for k, v := range c.data {
        newData[k] = v
    }

    // Apply updates     for k, v := range updates {
        newData[k] = v
    }

    // Switch to new map     c.data = newData
    c.version++
}

Testing Strategies

1. Race Detector Tests

 
 
func TestConcurrentAccess(t *testing.T) {
    store := NewConfigStore()
    var wg sync.WaitGroup

    // Concurrent reads and writes     for i := 0; i < 100; i++ {
        wg.Add(2)
        
        go func(val int) {
            defer wg.Done()
            store.Set(fmt.Sprintf("key-%d", val), val)
        }(i)

        go func(val int) {
            defer wg.Done()
            store.Get(fmt.Sprintf("key-%d", val))
        }(i)
    }

    wg.Wait()
}

2. Stress Testing

 
 
func BenchmarkConcurrentAccess(b *testing.B) {
    store := NewConfigStore()
    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            key := fmt.Sprintf("key-%d", rand.Intn(1000))
            if rand.Float32() < 0.2 { // 20% writes                 store.Set(key, rand.Int())
            } else { // 80% reads                 store.Get(key)
            }
        }
    })
}

Best Practices and Common Pitfalls

1. Proper Lock Ordering

 
 
func transfer(from, to *Account, amount int) {
    // Prevent deadlocks by consistent lock ordering     if from.id < to.id {
        from.mu.Lock()
        to.mu.Lock()
    } else {
        to.mu.Lock()
        from.mu.Lock()
    }
    defer func() {
        from.mu.Unlock()
        to.mu.Unlock()
    }()

    from.balance -= amount
    to.balance += amount
}

2. Avoiding Mutex Copying

 
 
// BAD: mutex copied by value type BadCounter struct {
    sync.Mutex
    count int }

// GOOD: use pointer receiver or embedded struct type GoodCounter struct {
    mu    sync.Mutex
    count int }

3. Granular Locking

 
 
type UserManager struct {
    mu      sync.RWMutex
    users   map[string]*User
    metrics *Metrics
}

func (um *UserManager) UpdateMetrics() {
    um.mu.RLock()
    userCount := len(um.users)
    um.mu.RUnlock()

    // Don't hold lock while updating metrics     um.metrics.SetUserCount(userCount)
}

Real-World Applications

1. Connection Pool

 
 
type ConnectionPool struct {
    mu          sync.Mutex
    connections []*Connection
    maxSize     int }

func (p *ConnectionPool) GetConnection() (*Connection, error) {
    p.mu.Lock()
    defer p.mu.Unlock()

    if len(p.connections) == 0 {
        if len(p.connections) >= p.maxSize {
            return nil, errors.New("pool exhausted")
        }
        return p.createConnection()
    }

    // Get last connection     conn := p.connections[len(p.connections)-1]
    p.connections = p.connections[:len(p.connections)-1]
    return conn, nil }

2. Rate Limiter

 
 
type RateLimiter struct {
    mu       sync.Mutex
    tokens   int     capacity int     rate     time.Duration
    lastTime time.Time
}

func (r *RateLimiter) Allow() bool {
    r.mu.Lock()
    defer r.mu.Unlock()

    now := time.Now()
    elapsed := now.Sub(r.lastTime)
    r.lastTime = now

    // Add tokens based on elapsed time     r.tokens += int(elapsed / r.rate)
    if r.tokens > r.capacity {
        r.tokens = r.capacity
    }

    if r.tokens <= 0 {
        return false     }

    r.tokens--
    return true }

Conclusion

Understanding and properly using Mutex and RWMutex is crucial for writing concurrent Go programs. Key takeaways:

  1. Choose the Right Tool
    • Use Mutex for simple exclusive access
    • Use RWMutex when reads significantly outnumber writes
    • Consider atomic operations for simple counters
  2. Follow Best Practices
    • Keep critical sections small
    • Use defer for unlocking
    • Maintain consistent lock ordering
    • Avoid copying mutexes
  3. Optimize Performance
    • Use lock striping for high concurrency
    • Implement granular locking
    • Consider copy-on-write for read-heavy scenarios
  4. Test Thoroughly
    • Use the race detector
    • Implement stress tests
    • Test edge cases and error conditions

Remember that while mutexes are powerful, they should be used judiciously. Sometimes other synchronization primitives (channels, atomic operations) might be more appropriate for your specific use case.

Comments