跳到主要内容

GMP 模型是怎么处理阻塞操作的

系统调用的分类和处理策略

在 GMP 模型中,系统调用和阻塞操作的处理是调度器设计的核心部分。Go 运行时需要确保当某些 Goroutine 被阻塞时,其他 Goroutine 仍然能够正常执行,从而最大化 CPU 利用率。

gopark 和 goready 机制详解

goparkgoready 是 Go 运行时中处理 Goroutine 阻塞和唤醒的核心函数:

gopark 和 goready 的代码示例

// 模拟 gopark/goready 的使用场景
package main

import (
"fmt"
"runtime"
"sync"
"time"
)

// 自定义条件变量示例,展示 gopark/goready 的使用模式
type CustomCond struct {
L sync.Locker
waiters []chan struct{}
mu sync.Mutex
}

func NewCustomCond(l sync.Locker) *CustomCond {
return &CustomCond{L: l}
}

func (c *CustomCond) Wait() {
// 创建一个专属的等待 channel
waiter := make(chan struct{})

c.mu.Lock()
c.waiters = append(c.waiters, waiter)
c.mu.Unlock()

// 释放外部锁
c.L.Unlock()

// 这里相当于 gopark,Goroutine 会阻塞等待
fmt.Printf("Goroutine %d: 进入等待状态\n", getGoroutineID())
<-waiter
fmt.Printf("Goroutine %d: 被唤醒,重新获取锁\n", getGoroutineID())

// 重新获取外部锁
c.L.Lock()
}

func (c *CustomCond) Signal() {
c.mu.Lock()
defer c.mu.Unlock()

if len(c.waiters) > 0 {
// 选择第一个等待者唤醒(相当于 goready)
waiter := c.waiters[0]
c.waiters = c.waiters[1:]

fmt.Printf("发送信号唤醒一个等待的 Goroutine\n")
close(waiter) // 唤醒等待的 Goroutine
}
}

func (c *CustomCond) Broadcast() {
c.mu.Lock()
defer c.mu.Unlock()

fmt.Printf("广播信号唤醒所有等待的 Goroutines\n")
// 唤醒所有等待者
for _, waiter := range c.waiters {
close(waiter)
}
c.waiters = nil
}

// 获取当前 Goroutine ID(仅用于演示)
func getGoroutineID() int {
buf := make([]byte, 64)
n := runtime.Stack(buf, false)
// 简化的ID提取,实际应用中不推荐
return int(buf[10]) - int('0')
}

func demonstrateCustomCond() {
var mu sync.Mutex
cond := NewCustomCond(&mu)
ready := false

var wg sync.WaitGroup

// 启动多个等待者
for i := 0; i < 3; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()

mu.Lock()
defer mu.Unlock()

for !ready {
fmt.Printf("Worker %d: 条件未满足,进入等待\n", id)
cond.Wait() // 这里会导致 Goroutine 阻塞
fmt.Printf("Worker %d: 被唤醒,检查条件\n", id)
}

fmt.Printf("Worker %d: 条件满足,开始工作\n", id)
}(i)
}

// 主 Goroutine 等待一下,然后改变条件
time.Sleep(2 * time.Second)

mu.Lock()
ready = true
mu.Unlock()

// 广播唤醒所有等待者
cond.Broadcast()

wg.Wait()
fmt.Println("所有 worker 完成")
}

阻塞型系统调用的处理机制

Handoff 机制(移交机制)

实际代码示例:文件操作

package main

import (
"fmt"
"os"
"runtime"
"sync"
"time"
)

func demonstrateFileIO() {
fmt.Printf("开始时 - Goroutines: %d, GOMAXPROCS: %d\n",
runtime.NumGoroutine(), runtime.GOMAXPROCS(0))

var wg sync.WaitGroup

// 创建多个进行文件IO的Goroutine
for i := 0; i < 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()

filename := fmt.Sprintf("test_%d.txt", id)

// 写文件操作 - 会触发系统调用
file, err := os.Create(filename)
if err != nil {
fmt.Printf("Goroutine %d: 创建文件失败 %v\n", id, err)
return
}
defer os.Remove(filename) // 清理文件

fmt.Printf("Goroutine %d: 开始写文件 - 当前Goroutines: %d\n",
id, runtime.NumGoroutine())

// 写入大量数据,触发多次系统调用
for j := 0; j < 1000; j++ {
file.WriteString(fmt.Sprintf("Line %d from goroutine %d\n", j, id))
}
file.Close()

// 读文件操作 - 又会触发系统调用
fmt.Printf("Goroutine %d: 开始读文件\n", id)
data, err := os.ReadFile(filename)
if err != nil {
fmt.Printf("Goroutine %d: 读取文件失败 %v\n", id, err)
return
}

fmt.Printf("Goroutine %d: 完成文件操作,读取了 %d 字节\n", id, len(data))
}(i)
}

// 同时运行CPU密集型任务
for i := 0; i < 3; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
fmt.Printf("CPU任务 %d: 开始计算\n", id)

sum := 0
for j := 0; j < 10000000; j++ {
sum += j
}

fmt.Printf("CPU任务 %d: 计算完成,结果: %d\n", id, sum)
}(i)
}

wg.Wait()
fmt.Printf("结束时 - Goroutines: %d\n", runtime.NumGoroutine())
}

func main() {
demonstrateFileIO()
}

网络 I/O 的特殊处理 - Netpoller

Go 对网络 I/O 采用了特殊的优化策略,使用 netpoller(网络轮询器)来处理网络操作:

下面是网络 I/O 示例

package main

import (
"fmt"
"io"
"net/http"
"runtime"
"sync"
"time"
)

func demonstrateNetworkIO() {
fmt.Printf("开始网络请求测试 - Goroutines: %d\n", runtime.NumGoroutine())

var wg sync.WaitGroup
urls := []string{
"https://httpbin.org/delay/1", // 延迟1秒
"https://httpbin.org/delay/2", // 延迟2秒
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/3", // 延迟3秒
}

start := time.Now()

for i, url := range urls {
wg.Add(1)
go func(id int, url string) {
defer wg.Done()

fmt.Printf("Goroutine %d: 开始请求 %s - 当前Goroutines: %d\n",
id, url, runtime.NumGoroutine())

// 这里会触发网络I/O,Goroutine会被netpoller管理
resp, err := http.Get(url)
if err != nil {
fmt.Printf("Goroutine %d: 请求失败 %v\n", id, err)
return
}
defer resp.Body.Close()

// 读取响应体也是I/O操作
body, err := io.ReadAll(resp.Body)
if err != nil {
fmt.Printf("Goroutine %d: 读取响应失败 %v\n", id, err)
return
}

fmt.Printf("Goroutine %d: 请求完成,响应大小: %d 字节,耗时: %v\n",
id, len(body), time.Since(start))
}(i, url)
}

// 同时运行其他任务证明没有被阻塞
for i := 0; i < 3; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for j := 0; j < 5; j++ {
fmt.Printf("后台任务 %d: 执行中... %d/5\n", id, j+1)
time.Sleep(500 * time.Millisecond)
}
}(i)
}

wg.Wait()
fmt.Printf("所有任务完成,总耗时: %v,最终Goroutines: %d\n",
time.Since(start), runtime.NumGoroutine())
}

func main() {
demonstrateNetworkIO()
}

我来为你补充一个清晰的 timeline,对比 Handoff 和 Netpoller 机制的异同点:

Handoff vs Netpoller 时序对比

相同点

特性HandoffNetpoller
目标避免阻塞调度器避免阻塞调度器
核心思想隔离阻塞操作隔离阻塞操作
调度连续性保持其他 Goroutine 执行保持其他 Goroutine 执行

关键差异

维度Handoff 机制Netpoller 机制
适用场景文件 I/O、同步系统调用网络 I/O(TCP/UDP socket)
线程处理阻塞当前 M 线程不阻塞 M 线程
资源开销可能创建新的 OS 线程复用现有线程
实现方式线程移交 (hand off)事件驱动 (epoll/kqueue)
唤醒机制系统调用返回后主动调度I/O 事件就绪时异步通知
并发数限制受线程数量限制几乎无限制(受 fd 限制)

这里 Netpoller 之所以不用阻塞,是因为利用了 epoll 的异步通知特性,M 线程可以继续执行其他 Goroutine,而不需要等待网络 I/O 完成。

不同类型阻塞操作的对比

锁阻塞机制

锁阻塞示例代码

package main

import (
"fmt"
"runtime"
"sync"
"time"
)

// 展示不同锁的阻塞行为
func demonstrateLockBlocking() {
fmt.Printf("开始锁竞争测试 - Goroutines: %d\n", runtime.NumGoroutine())

var mu sync.Mutex
var rwmu sync.RWMutex
var wg sync.WaitGroup
counter := 0

// Mutex 竞争场景
fmt.Println("\n=== Mutex 竞争测试 ===")
for i := 0; i < 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()

for j := 0; j < 3; j++ {
fmt.Printf("Goroutine %d: 尝试获取 Mutex\n", id)
mu.Lock()
fmt.Printf("Goroutine %d: 获取到 Mutex,开始工作\n", id)

// 模拟临界区工作
oldCounter := counter
time.Sleep(500 * time.Millisecond) // 模拟工作时间
counter = oldCounter + 1

fmt.Printf("Goroutine %d: 完成工作,释放 Mutex (counter=%d)\n", id, counter)
mu.Unlock()
}
}(i)
}

wg.Wait()

// RWMutex 读写竞争场景
fmt.Println("\n=== RWMutex 读写竞争测试 ===")
data := make(map[string]int)
data["key"] = 0

// 启动多个读者
for i := 0; i < 3; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()

for j := 0; j < 2; j++ {
fmt.Printf("Reader %d: 尝试获取读锁\n", id)
rwmu.RLock()
fmt.Printf("Reader %d: 获取读锁成功,读取数据: %d\n", id, data["key"])
time.Sleep(300 * time.Millisecond)
rwmu.RUnlock()
fmt.Printf("Reader %d: 释放读锁\n", id)
}
}(i)
}

// 启动写者
wg.Add(1)
go func() {
defer wg.Done()

time.Sleep(500 * time.Millisecond) // 让读者先运行

for i := 0; i < 2; i++ {
fmt.Printf("Writer: 尝试获取写锁\n")
rwmu.Lock()
fmt.Printf("Writer: 获取写锁成功,修改数据\n")
data["key"]++
time.Sleep(400 * time.Millisecond)
fmt.Printf("Writer: 数据修改完成,释放写锁 (新值: %d)\n", data["key"])
rwmu.Unlock()
}
}()

wg.Wait()
fmt.Printf("锁竞争测试完成 - 最终 Goroutines: %d\n", runtime.NumGoroutine())
}

func main() {
demonstrateLockBlocking()
}

Channel 操作的阻塞处理

Channel 阻塞示例

package main

import (
"fmt"
"runtime"
"sync"
"time"
)

func demonstrateChannelBlocking() {
fmt.Printf("开始 Channel 阻塞测试 - Goroutines: %d\n", runtime.NumGoroutine())

// 1. 无缓冲 Channel 阻塞
fmt.Println("\n=== 无缓冲 Channel 测试 ===")
unbufferedCh := make(chan int)
var wg sync.WaitGroup

// 发送者 - 会阻塞等待接收者
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 3; i++ {
fmt.Printf("发送者: 准备发送 %d\n", i)
unbufferedCh <- i // 这里会阻塞,直到有接收者
fmt.Printf("发送者: 成功发送 %d\n", i)
}
close(unbufferedCh)
fmt.Println("发送者: 关闭 channel")
}()

// 接收者 - 延迟接收
wg.Add(1)
go func() {
defer wg.Done()
time.Sleep(1 * time.Second) // 让发送者先阻塞

for data := range unbufferedCh {
fmt.Printf("接收者: 收到数据 %d\n", data)
time.Sleep(500 * time.Millisecond) // 模拟处理时间
}
fmt.Println("接收者: channel 已关闭")
}()

wg.Wait()

// 2. 有缓冲 Channel 溢出阻塞
fmt.Println("\n=== 有缓冲 Channel 溢出测试 ===")
bufferedCh := make(chan string, 2) // 缓冲大小为 2

wg.Add(1)
go func() {
defer wg.Done()
defer close(bufferedCh)

messages := []string{"msg1", "msg2", "msg3", "msg4"}
for i, msg := range messages {
fmt.Printf("发送者: 准备发送 %s (第%d个)\n", msg, i+1)
bufferedCh <- msg
fmt.Printf("发送者: 成功发送 %s\n", msg)

if i == 1 {
fmt.Println("发送者: 缓冲区已满,下一次发送将阻塞...")
}
}
}()

wg.Add(1)
go func() {
defer wg.Done()
time.Sleep(2 * time.Second) // 让缓冲区先填满

for msg := range bufferedCh {
fmt.Printf("接收者: 收到消息 %s\n", msg)
time.Sleep(1 * time.Second)
}
}()

wg.Wait()

// 3. Select 语句阻塞
fmt.Println("\n=== Select 语句阻塞测试 ===")
ch1 := make(chan string)
ch2 := make(chan string)
done := make(chan bool)

// 选择器 Goroutine
wg.Add(1)
go func() {
defer wg.Done()

for {
select {
case msg1 := <-ch1:
fmt.Printf("Select: 从 ch1 收到 %s\n", msg1)
case msg2 := <-ch2:
fmt.Printf("Select: 从 ch2 收到 %s\n", msg2)
case <-done:
fmt.Println("Select: 收到结束信号")
return
default:
fmt.Println("Select: 所有 channel 都没准备好,使用 default")
time.Sleep(500 * time.Millisecond)
}
}
}()

// 发送数据到不同 channel
wg.Add(1)
go func() {
defer wg.Done()
time.Sleep(1 * time.Second)
ch1 <- "hello from ch1"

time.Sleep(1 * time.Second)
ch2 <- "hello from ch2"

time.Sleep(1 * time.Second)
done <- true
}()

wg.Wait()
fmt.Printf("Channel 阻塞测试完成 - 最终 Goroutines: %d\n", runtime.NumGoroutine())
}

func main() {
demonstrateChannelBlocking()
}

定时器相关的阻塞处理

定时器阻塞示例

package main

import (
"context"
"fmt"
"runtime"
"sync"
"time"
)

func demonstrateTimerBlocking() {
fmt.Printf("开始定时器阻塞测试 - Goroutines: %d\n", runtime.NumGoroutine())

var wg sync.WaitGroup

// 1. time.Sleep 阻塞
fmt.Println("\n=== time.Sleep 阻塞测试 ===")
for i := 0; i < 3; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()

sleepDuration := time.Duration(id+1) * time.Second
fmt.Printf("Goroutine %d: 开始睡眠 %v\n", id, sleepDuration)

start := time.Now()
time.Sleep(sleepDuration) // 阻塞当前 Goroutine
elapsed := time.Since(start)

fmt.Printf("Goroutine %d: 睡眠结束,实际耗时 %v\n", id, elapsed)
}(i)
}

// 2. time.Ticker 使用
fmt.Println("\n=== time.Ticker 测试 ===")
wg.Add(1)
go func() {
defer wg.Done()

ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()

count := 0
for {
select {
case t := <-ticker.C: // 阻塞等待下一次触发
count++
fmt.Printf("Ticker: 第 %d 次触发,时间 %v\n", count, t.Format("15:04:05.000"))

if count >= 5 {
fmt.Println("Ticker: 达到 5 次,停止")
return
}
}
}
}()

// 3. time.After 和 context 超时
fmt.Println("\n=== time.After 和 Context 超时测试 ===")
wg.Add(1)
go func() {
defer wg.Done()

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

select {
case <-time.After(3 * time.Second): // 这个会更晚触发
fmt.Println("time.After: 3秒超时触发")
case <-ctx.Done(): // 这个会先触发
fmt.Printf("Context: 超时触发,错误: %v\n", ctx.Err())
}
}()

// 4. 定时器与其他阻塞操作的混合
fmt.Println("\n=== 定时器与 Channel 混合测试 ===")
ch := make(chan string)

wg.Add(1)
go func() {
defer wg.Done()

for i := 0; i < 3; i++ {
select {
case msg := <-ch:
fmt.Printf("收到消息: %s\n", msg)
case <-time.After(1 * time.Second):
fmt.Printf("超时 %d: 1秒内没有收到消息\n", i+1)
}
}
}()

wg.Add(1)
go func() {
defer wg.Done()
defer close(ch)

// 发送一些消息,但不够频繁
time.Sleep(500 * time.Millisecond)
ch <- "第一条消息"

time.Sleep(1500 * time.Millisecond) // 超过超时时间
ch <- "第二条消息"

time.Sleep(800 * time.Millisecond)
ch <- "第三条消息"
}()

wg.Wait()
fmt.Printf("定时器阻塞测试完成 - 最终 Goroutines: %d\n", runtime.NumGoroutine())
}

func main() {
demonstrateTimerBlocking()
}

GC 相关的阻塞处理

完整的阻塞场景总结

阻塞类型处理机制使用函数唤醒条件性能影响
系统调用Handoffgopark/goready系统调用完成可能创建新线程
网络I/ONetpollergopark/goreadyI/O事件就绪高效,无线程阻塞
Mutex等待队列gopark/goready锁被释放低开销,快速唤醒
Channel等待队列gopark/goready有对应操作高效的CSP实现
定时器定时器堆gopark/goready时间到期系统监控线程处理
GCSTW机制全局暂停GC阶段完成全局影响,但时间短

这个完整的文档涵盖了 GMP 模型处理各种阻塞操作的机制,包括:

  1. 完整的阻塞分类:系统调用、同步原语、Channel、GC、定时器
  2. gopark/goready 机制:详细的调用流程和代码示例
  3. 全局队列交互:负载均衡和工作窃取机制
  4. 各种阻塞场景的具体处理:锁竞争、Channel操作、定时器等
  5. 性能监控和调试:实际可运行的示例代码

监控系统调用的工具示例

package main

import (
"context"
"fmt"
"net/http"
"os"
"runtime"
"sync"
"time"
)

// 监控运行时状态
func monitorRuntime(ctx context.Context) {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
var m runtime.MemStats
runtime.ReadMemStats(&m)

fmt.Printf("[监控] Goroutines: %d, Threads: %d, GOMAXPROCS: %d\n",
runtime.NumGoroutine(),
runtime.NumGoroutine(), // 注意:这里应该是获取M的数量,但Go没有直接API
runtime.GOMAXPROCS(0))
}
}
}

// 混合负载测试:同时包含不同类型的阻塞操作
func mixedWorkload() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

// 启动监控
go monitorRuntime(ctx)

var wg sync.WaitGroup

// 1. 文件I/O密集型任务
for i := 0; i < 3; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
filename := fmt.Sprintf("temp_%d.txt", id)

for j := 0; j < 5; j++ {
// 创建和写入文件 - 阻塞型系统调用
file, _ := os.Create(filename)
file.WriteString(fmt.Sprintf("Data from goroutine %d, iteration %d\n", id, j))
file.Close()

// 读取文件 - 阻塞型系统调用
os.ReadFile(filename)
time.Sleep(200 * time.Millisecond)
}
os.Remove(filename)
fmt.Printf("文件任务 %d 完成\n", id)
}(i)
}

// 2. 网络I/O任务
for i := 0; i < 2; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()

for j := 0; j < 3; j++ {
// HTTP请求 - 使用netpoller
resp, err := http.Get("https://httpbin.org/delay/1")
if err == nil {
resp.Body.Close()
}
fmt.Printf("网络任务 %d 完成请求 %d\n", id, j+1)
}
}(i)
}

// 3. CPU密集型任务
for i := 0; i < 2; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()

for j := 0; j < 10; j++ {
// 纯计算任务 - 不会阻塞
sum := 0
for k := 0; k < 1000000; k++ {
sum += k
}
fmt.Printf("CPU任务 %d 完成计算 %d\n", id, j+1)
time.Sleep(100 * time.Millisecond)
}
}(i)
}

wg.Wait()
fmt.Println("所有任务完成")
}

func main() {
fmt.Printf("系统信息 - CPU核心: %d, GOMAXPROCS: %d\n",
runtime.NumCPU(), runtime.GOMAXPROCS(0))

mixedWorkload()
}

关键要点总结

  1. 系统调用分离:Go 运行时能够区分阻塞和非阻塞系统调用,采用不同策略处理

  2. Handoff 机制:当 M 因系统调用阻塞时,会释放 P 给其他 M,确保 CPU 不会闲置

  3. Netpoller 优化:网络 I/O 使用专门的轮询器,避免创建过多系统线程

  4. 自动恢复:系统调用完成后,Goroutine 会尝试重新获取 P 继续执行

  5. 负载均衡:通过工作窃取和全局队列,确保所有 P 都能得到充分利用

这种设计使得 Go 能够在面对大量阻塞操作时仍然保持高效的并发执行,这也是 Go 在网络服务和 I/O 密集型应用中表现优异的重要原因。