跳到主要内容

Go 的锁饥饿模式是怎么样的

1. 饥饿模式触发条件

核心触发逻辑

const starvationThresholdNs = 1e6  // 1毫秒 = 1,000,000纳秒

// 在 lockSlow() 函数中
func (m *Mutex) lockSlow() {
var waitStartTime int64
starving := false

for {
// ... 其他逻辑

// 被唤醒后检查等待时间
if waitStartTime == 0 {
waitStartTime = runtime_nanotime() // 记录开始等待时间
}

// 阻塞等待
runtime_SemacquireMutex(&m.sema, queueLifo, 1)

// 被唤醒后计算等待时间,判断是否需要饥饿模式
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs

old = m.state
if old&mutexStarving != 0 {
// 已经在饥饿模式下...
} else if starving && old&mutexLocked != 0 {
// 需要设置饥饿模式
new |= mutexStarving
}
}
}

2. 饥饿模式的触发时机图解

3. 饥饿模式判断的详细流程

4. 什么情况下容易触发饥饿模式

场景1:高并发写入场景

package main

import (
"sync"
"time"
"fmt"
)

func demonstrateStarvation() {
var mu sync.Mutex
var counter int

// 持锁时间较长的goroutine
go func() {
for i := 0; i < 5; i++ {
mu.Lock()
time.Sleep(2 * time.Millisecond) // 持锁2ms
counter++
fmt.Printf("Long holder: %d\n", counter)
mu.Unlock()
time.Sleep(1 * time.Millisecond) // 短暂释放
}
}()

// 大量竞争者
for i := 0; i < 10; i++ {
go func(id int) {
for j := 0; j < 3; j++ {
start := time.Now()
mu.Lock()
waitTime := time.Since(start)

counter++
fmt.Printf("Goroutine %d: waited %v, counter=%d\n",
id, waitTime, counter)

// 如果等待时间超过1ms,很可能触发了饥饿模式
if waitTime > time.Millisecond {
fmt.Printf("*** Goroutine %d likely triggered starvation mode! ***\n", id)
}

mu.Unlock()
}
}(i)
}

time.Sleep(100 * time.Millisecond)
}

场景2:新来的Goroutine不断抢占

场景3:CPU密集型任务中的锁竞争

func cpuIntensiveWithLock() {
var mu sync.Mutex
var sharedData int

// CPU密集型任务
worker := func(id int) {
for i := 0; i < 1000; i++ {
// 先做一些CPU密集计算
result := 0
for j := 0; j < 10000; j++ {
result += j * j
}

// 然后竞争锁
start := time.Now()
mu.Lock()
waitTime := time.Since(start)

sharedData += result % 1000

// 长时间持锁会增加其他goroutine的等待时间
time.Sleep(time.Microsecond * 1500) // 1.5ms

mu.Unlock()

if waitTime > time.Millisecond {
fmt.Printf("Worker %d: waited %v (likely starvation mode)\n",
id, waitTime)
}
}
}

// 启动多个worker
for i := 0; i < 8; i++ {
go worker(i)
}
}

5. 饥饿模式的设置过程

// 在 lockSlow() 中设置饥饿模式的关键代码
func (m *Mutex) lockSlow() {
var waitStartTime int64
starving := false

for {
// ... 前面的逻辑

// 计算新的state值
new := old
if old&mutexStarving == 0 {
new |= mutexLocked // 非饥饿模式才尝试获锁
}
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift // 增加等待者计数
}

// 关键:设置饥饿模式标志
if starving && old&mutexLocked != 0 {
new |= mutexStarving // 设置饥饿标志位
}

if awoke {
new &^= mutexWoken
}

// CAS更新state
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 如果成功获锁,退出循环
if old&(mutexLocked|mutexStarving) == 0 {
break
}

// 需要等待,记录或更新等待时间
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime() // 首次等待
}

// 阻塞等待
runtime_SemacquireMutex(&m.sema, queueLifo, 1)

// 被唤醒后,检查是否应该进入饥饿模式
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
// 注意:starving一旦为true就会保持,直到获锁成功

old = m.state
// ... 后续处理
}
}
}

6. 饥饿模式的状态变化

7. 避免触发饥饿模式的最佳实践

1. 减少锁持有时间

// 不好的做法 - 容易触发饥饿模式
func badPractice() {
mu.Lock()
defer mu.Unlock()

// 长时间持锁操作
time.Sleep(5 * time.Millisecond) // 持锁5ms!
result := heavyComputation() // 重计算
writeToDatabase(result) // 数据库操作
}

// 好的做法 - 减少锁持有时间
func goodPractice() {
// 先做计算
result := heavyComputation()

// 只在必要时持锁
mu.Lock()
sharedState = result // 快速更新共享状态
mu.Unlock()

// 锁外做数据库操作
writeToDatabase(result)
}

2. 使用buffered channel替代锁

// 使用channel避免锁竞争
type SafeCounter struct {
ch chan func()
value int
}

func NewSafeCounter() *SafeCounter {
sc := &SafeCounter{
ch: make(chan func(), 100), // 带缓冲的channel
}

go func() {
for f := range sc.ch {
f() // 串行执行所有操作
}
}()

return sc
}

func (sc *SafeCounter) Increment() {
done := make(chan struct{})
sc.ch <- func() {
sc.value++
close(done)
}
<-done
}

3. 使用读写锁分离读写操作

type SafeMap struct {
mu sync.RWMutex
data map[string]interface{}
}

func (sm *SafeMap) Get(key string) interface{} {
sm.mu.RLock() // 读锁,多个goroutine可以同时读
defer sm.mu.RUnlock()
return sm.data[key]
}

func (sm *SafeMap) Set(key string, value interface{}) {
sm.mu.Lock() // 写锁,独占访问
defer sm.mu.Unlock()
sm.data[key] = value
}

8. 监控饥饿模式的方法

// 自定义Mutex包装器,用于监控
type MonitoredMutex struct {
mu sync.Mutex
starvationCount int64
}

func (mm *MonitoredMutex) Lock() {
start := time.Now()
mm.mu.Lock()
waitTime := time.Since(start)

if waitTime > time.Millisecond {
atomic.AddInt64(&mm.starvationCount, 1)
fmt.Printf("Potential starvation detected: waited %v\n", waitTime)
}
}

func (mm *MonitoredMutex) Unlock() {
mm.mu.Unlock()
}

func (mm *MonitoredMutex) GetStarvationCount() int64 {
return atomic.LoadInt64(&mm.starvationCount)
}

总结

饥饿模式触发的关键条件

  1. 时间阈值:等待时间 ≥ 1毫秒
  2. 竞争失败:被唤醒后仍然无法获取锁
  3. 持续等待:等待时间是累计的,不会重置

容易触发饥饿模式的场景

  • 高并发环境下的频繁锁竞争
  • 持锁时间过长(> 1ms)
  • 新来的goroutine不断抢占
  • CPU密集型任务中的锁使用

预防策略

  • 尽量减少锁的持有时间
  • 使用适当的并发模式(channel、读写锁等)
  • 避免在锁内进行耗时操作
  • 合理设计并发架构

饥饿模式是Go为了保证公平性而设计的机制,虽然会牺牲一些性能,但能防止某些goroutine永远无法获得锁的情况!