Golang并发
理解并发
并发(Concurrency):多个任务在同一时间段内交替执行(逻辑上同时)
V.S.
并行(Parallelism):多个任务在同一时刻真正同时执行(物理上同时)
1.1 为什么需要并发?
// 没有并发的世界:一个Web服务器
// 假设每个请求处理需要1秒,那么:
// 第1个用户:等1秒
// 第2个用户:等2秒(排队)
// 第1000个用户:等1000秒
// 有并发的世界:
// 第1个用户:等1秒
// 第2个用户:等1秒(同时处理)
// 第1000个用户:等1秒(同时处理)1.2 Go 的并发哲学
"不要通过共享内存来通信,而要通过通信来共享内存"
❌ 传统方式:多个人共用一个笔记本,需要排队写(加锁)
✅ Go 的方式:每个人有自己的笔记本,需要时传纸条(channel)
Goroutine —— Go 的超轻量级线程
2.1 什么是 Goroutine?
线程(Thread) = 雇一个正式员工(成本高,一个约1-8MB内存)
Goroutine = 雇一个临时工(成本极低,一个只要约2KB内存)
这意味着:
- 1GB内存大约能创建 50万个 goroutine
- 但只能创建约 1000 个线程
- goroutine 是 Go 运行时管理的,不是操作系统管理的
2.2 第一个 Goroutine
package main
import (
"fmt"
"time"
)
// 普通函数
func sayHello() {
fmt.Println("Hello, 我是 goroutine!")
}
func main() {
// ============ 普通调用(串行)============
// sayHello() // 这是普通调用,main 会等它执行完
// ============ goroutine 调用(并发)============
go sayHello() // 加一个 go 关键字,就创建了一个 goroutine!
// 就这么简单!
// 重要:main 函数本身也是一个 goroutine(主 goroutine)
// 如果 main 结束了,所有其他 goroutine 都会被强制终止!
time.Sleep(1 * time.Second) // 暂时用 Sleep 等待,后面会学更好的方式
fmt.Println("main 结束了")
}执行结果:
Hello, 我是 goroutine!
main 结束了2.3 理解 Goroutine 的执行顺序
package main
import (
"fmt"
"time"
)
func printNumbers(name string) {
for i := 1; i <= 5; i++ {
fmt.Printf("[%s] 数字: %d\n", name, i)
time.Sleep(100 * time.Millisecond) // 模拟耗时操作
}
}
func main() {
// 启动两个 goroutine
go printNumbers("协程A")
go printNumbers("协程B")
// 主 goroutine 等待
time.Sleep(1 * time.Second)
fmt.Println("=== 主程序结束 ===")
}可能的运行结果(每次可能不同!):
[协程B] 数字: 1
[协程A] 数字: 1
[协程A] 数字: 2
[协程B] 数字: 2
[协程B] 数字: 3
[协程A] 数字: 3
...关键理解:
- goroutine 的执行顺序是不确定的(由 Go 调度器决定)
- 每次运行结果可能不同
- 这就是并发的本质 —— 你不能假设执行顺序
2.4 匿名函数创建 Goroutine
package main
import (
"fmt"
"time"
)
func main() {
// 方式1:使用命名函数
go sayHi("张三")
// 方式2:使用匿名函数(更常用)
go func() {
fmt.Println("我是匿名函数 goroutine")
}() // 注意最后的 () 表示立即调用
// 方式3:匿名函数带参数
name := "李四"
go func(n string) {
fmt.Printf("你好,%s\n", n)
}(name) // 把 name 作为参数传入
// ⚠️ 方式4:闭包陷阱(版本差异点!)
// 在 Go 1.22 之前,这里有 Bug;在 Go 1.22+,这里是安全的。
for i := 0; i < 5; i++ {
go func() {
// [Go < 1.22] ❌ i 是共享的,结果全是 5
// [Go ≥ 1.22] ✅ 每次循环 i 都是独立的,结果 0,1,2,3,4
fmt.Println("方式4 - i =", i)
}()
}
// ✅ 方式5(正确方式):通过参数传值
for i := 0; i < 5; i++ {
go func(n int) {
fmt.Println("正确方式 - n =", n) // ✅ n 是每个 goroutine 自己的副本
}(i) // 把当前的 i 值传进去
}
time.Sleep(1 * time.Second)
}
func sayHi(name string) {
fmt.Printf("你好,%s\n", name)
}传参:为什么有时候要传,有时候不用?
方式 3(传参):
go func(n string) { ... }(name)这里我们将外部变量 name 的值拷贝了一份,传给了匿名函数的参数 n。此时,n 是这个 Goroutine 私有的,外部怎么变都跟 n 没关系。
go 关键字执行的那一瞬间,Go 语言会把外部变量(如 name 或 i)的当前值复制一份(Snapshot)。
闭包(不传参直接用): 如果在匿名函数内部直接使用外部变量(如方式 4 中的 i),这叫做闭包(Closure)。函数“捕获”了外部变量的引用(地址)。
P.S. 注意版本
旧版本 (Go < 1.22):
- 机制: 整个
for循环只有一个i变量(内存地址不变)。 - 结果: 主线程跑得太快,瞬间把
i加到了 5。等 Goroutine 拿起望远镜看的时候,看到的都是 5。 - 对策: 必须使用方式 5(传参)来强制拷贝。
新版本 (Go ≥ 1.22):
- 机制: 编译器做了优化,每次循环都会创建一个全新的
i。 - 结果: 即使是闭包,每个 Goroutine 盯着的也是属于它那一轮的那个
i。输出正常的0, 1, 2, 3, 4。 - 注意: 虽然新版修复了,但在写库代码或兼容老项目时,为了保险,依然推荐使用方式 5。
在方式5中
for i := 0; i < 5; i++ {
go func(n int) {
fmt.Println("正确方式 - n =", n)
}(i) // <--- 关键在这里!
}go 关键字执行的那一瞬间,Go 语言会把当前的 i 的值(比如 0),复制一份给参数 n。
虽然 Goroutine 还没开始运行,但它口袋里已经揣好了属于它自己的那个数字 0。
等到 Goroutine 运行时,它打印的是它口袋里的 n,而不是外部那个已经变成 5 的 i。
2.5 闭包陷阱详细解(注意版本)
错误代码:
for i := 0; i < 5; i++ {
go func() {
fmt.Println(i) // 所有 goroutine 共享同一个变量 i
}()
}
执行过程:
时间线 →→→→→→→→→→→→→→→→→→→→→→→
主goroutine: i=0 → i=1 → i=2 → i=3 → i=4 → i=5(循环结束)
goroutine1: 打印 i → 5
goroutine2: 打印 i → 5
goroutine3: 打印 i → 5
goroutine4: 打印 i → 5
goroutine5: 打印 i → 5
↑ 全部打印5!原因:goroutine 创建很快,但调度执行有延迟。等 goroutine 真正执行时,循环早已结束,i 已经变成 5。
正确代码:
for i := 0; i < 5; i++ {
go func(n int) { // 参数 n 是 i 的副本
fmt.Println(n)
}(i) // 调用时立即把当前 i 的值复制给 n
}
执行过程:
主goroutine: i=0(复制0给n) → i=1(复制1给n) → i=2(复制2给n) → ...
goroutine1(n=0): 打印 0
goroutine2(n=1): 打印 1
goroutine3(n=2): 打印 2
...Channel —— Goroutine 之间的通信管道
3.1 为什么需要 Channel?
生活类比:
你(main goroutine)派小王(goroutine)去买咖啡。
没有 channel 的情况:
你: "小王去买咖啡"(go buyCoffee())
你: 继续干活...然后走了(main结束)
小王: 买到了!但你已经走了...拿着咖啡不知道给谁
有 channel 的情况:
你们之间有一个"传递窗口"(channel)
你: "小王去买咖啡,买到了放传递窗口"
你: 在传递窗口这里等着...
小王: 买到了!放到传递窗口
你: 从传递窗口拿到咖啡
Channel = goroutine 之间传递数据的管道/传递窗口
3.2 Channel 基础操作
package main
import "fmt"
func main() {
// ========== 创建 channel ==========
// 语法:make(chan 数据类型)
ch := make(chan string) // 创建一个传递 string 的 channel
// ========== 发送和接收 ==========
// 发送:ch <- 数据
// 接收:数据 := <-ch
// 启动一个 goroutine 发送数据
go func() {
ch <- "你好,我是来自 goroutine 的消息" // 发送数据到 channel
fmt.Println("goroutine: 消息已发送")
}()
// 主 goroutine 接收数据
msg := <-ch // 从 channel 接收数据(会阻塞,直到收到数据)
fmt.Println("main: 收到消息 →", msg)
}输出:
goroutine: 消息已发送
main: 收到消息 → 你好,我是来自 goroutine 的消息3.3 Channel 的阻塞特性(重要!)
package main
import "fmt"
func main() {
ch := make(chan int)
go func() {
fmt.Println("goroutine: 准备发送数据...")
ch <- 42 // 发送。如果没人接收,这里会阻塞等待
fmt.Println("goroutine: 数据已发送")
}()
fmt.Println("main: 准备接收数据...")
value := <-ch // 接收。如果没有数据,这里会阻塞等待
fmt.Println("main: 收到数据 →", value)
}Channel 阻塞规则(无缓冲 channel):
发送方(ch <- data):
→ 如果没有接收方在等待,发送方会【阻塞】,直到有人来接收
接收方(data := <-ch):
→ 如果没有发送方发送数据,接收方会【阻塞】,直到有人来发送
就像面对面递东西:
- 你伸出手递东西,对方没来 → 你就一直举着手等
- 你伸出手要接东西,对方没来 → 你就一直举着手等
- 双方都到了 → 一手交钱一手交货,各走各的
3.4 Channel 的方向
package main
import "fmt"
// 只能发送的 channel(只写)
func producer(ch chan<- string) {
ch <- "产品A"
ch <- "产品B"
ch <- "产品C"
close(ch) // 发送完毕,关闭 channel
}
// 只能接收的 channel(只读)
func consumer(ch <-chan string) {
for item := range ch { // range 会一直接收,直到 channel 被关闭
fmt.Println("消费了:", item)
}
}
func main() {
ch := make(chan string)
go producer(ch) // 双向 channel 自动转为 只写
consumer(ch) // 双向 channel 自动转为 只读
}Channel 方向总结:
chan string → 双向 channel(可读可写)
chan<- string → 只写 channel(只能发送)——箭头指向 chan,数据进入
<-chan string → 只读 channel(只能接收)——箭头远离 chan,数据出来
记忆技巧:看箭头方向
chan<- 箭头指向chan = 数据流入chan = 只能写/发送
<-chan 箭头离开chan = 数据流出chan = 只能读/接收
3.5 带缓冲的 Channel
package main
import "fmt"
func main() {
// ========== 无缓冲 channel ==========
// ch1 := make(chan int) // 必须有人接收,发送方才能继续
// ========== 有缓冲 channel ==========
ch2 := make(chan int, 3) // 缓冲区大小为3,最多存3个数据
// 可以连续发送3个数据,不需要有人接收
ch2 <- 10
ch2 <- 20
ch2 <- 30
// ch2 <- 40 // 如果再发第4个,就会阻塞(缓冲满了)
fmt.Println("缓冲区长度:", len(ch2)) // 3(当前有3个数据)
fmt.Println("缓冲区容量:", cap(ch2)) // 3(最多能存3个)
// 接收数据(先进先出 FIFO)
fmt.Println(<-ch2) // 10
fmt.Println(<-ch2) // 20
fmt.Println(<-ch2) // 30
}使用场景:
无缓冲:需要严格同步的场景(发送方要确认接收方收到)
有缓冲:允许一定程度异步的场景(提高吞吐量)
3.6 关闭 Channel 和 range 遍历
package main
import "fmt"
func main() {
ch := make(chan int, 5)
// 发送数据
go func() {
for i := 1; i <= 5; i++ {
ch <- i
fmt.Printf("发送: %d\n", i)
}
close(ch) // 发送完毕一定要关闭!否则接收方会永远等待(死锁)
}()
// ========== 接收方式1:for range(推荐)==========
for value := range ch {
// range 会自动检测 channel 是否关闭
// 关闭后自动退出循环
fmt.Printf("接收: %d\n", value)
}
fmt.Println("所有数据接收完毕")
// ========== 接收方式2:ok 判断 ==========
ch2 := make(chan string, 2)
go func() {
ch2 <- "hello"
ch2 <- "world"
close(ch2)
}()
for {
value, ok := <-ch2
if !ok {
// ok 为 false 表示 channel 已关闭且没有数据了
fmt.Println("channel 已关闭")
break
}
fmt.Println("收到:", value)
}
}关闭 Channel 的规则:
- 只有发送方应该关闭 channel(谁发送,谁关闭)
- 接收方不应该关闭 channel
- 关闭已经关闭的 channel 会 panic
- 向已关闭的 channel 发送数据会 panic
从已关闭的 channel 接收数据:
- 如果还有数据,正常接收
- 如果没有数据了,返回零值(int→0, string→"", bool→false)
3.7 select 多路复用
package main
import (
"fmt"
"time"
)
func main() {
ch1 := make(chan string)
ch2 := make(chan string)
// goroutine 1:2秒后发送
go func() {
time.Sleep(2 * time.Second)
ch1 <- "来自 channel 1 的数据"
}()
// goroutine 2:1秒后发送
go func() {
time.Sleep(1 * time.Second)
ch2 <- "来自 channel 2 的数据"
}()
// select 同时监听多个 channel
// 哪个先来就处理哪个
for i := 0; i < 2; i++ {
select {
case msg1 := <-ch1:
fmt.Println("收到:", msg1)
case msg2 := <-ch2:
fmt.Println("收到:", msg2)
}
}
}输出:
收到: 来自 channel 2 的数据 (1秒后)
收到: 来自 channel 1 的数据 (2秒后)select 就像你同时等多个外卖:
美团外卖(ch1)
饿了么外卖(ch2)
select {
case 饭 := <-美团: //哪个先到
吃(饭) //就先吃哪个
case 饭 := <-饿了么:
吃(饭)
}3.8 select 的高级用法
package main
import (
"fmt"
"time"
)
func main() {
ch := make(chan int)
// ========== 1. 带超时的 select ==========
fmt.Println("--- 超时示例 ---")
select {
case data := <-ch:
fmt.Println("收到数据:", data)
case <-time.After(2 * time.Second): // 2秒后超时
fmt.Println("超时了!没有收到数据")
}
// ========== 2. 带 default 的非阻塞 select ==========
fmt.Println("\n--- 非阻塞示例 ---")
select {
case data := <-ch:
fmt.Println("收到数据:", data)
default:
// 如果所有 channel 都没有数据,立即执行 default
fmt.Println("没有数据可读,不等了")
}
// ========== 3. 定时器 + select 实现定期任务 ==========
fmt.Println("\n--- 定时任务示例 ---")
ticker := time.NewTicker(500 * time.Millisecond) // 每500ms触发一次
defer ticker.Stop()
done := make(chan bool)
go func() {
time.Sleep(2 * time.Second)
done <- true
}()
for {
select {
case <-done:
fmt.Println("任务完成,退出")
return
case t := <-ticker.C:
fmt.Println("定时任务执行:", t.Format("15:04:05.000"))
}
}
}
default:
普通的 select确实是设计用来“等待”channel 数据的。但是,一旦加上了 default 分支,select 的行为模式就完全变了。
当 select 语句中所有其他的 case(无论是读还是写)都无法立即执行(即会被阻塞)时,default 分支就会立即触发。
它的核心作用就是把“阻塞等待”变成了“非阻塞检查”。
情况 1:没有 Default(阻塞模式)
select {
case msg := <-ch:
fmt.Println("收到数据:", msg)
}
// 如果 ch 是空的,程序永远不会执行到这一行(除非 ch 被关闭)情况2:有 Default(非阻塞模式)
Go 运行时会检查 ch:
- 如果有数据:执行
case,读取数据。 - 如果没数据:Go 运行时发现读取
ch会导致阻塞,于是立刻放弃等待,转而执行default。
select {
case msg := <-ch:
fmt.Println("收到数据:", msg)
default:
fmt.Println("通道没数据,我不想等,我先溜了")
}
// 这一行会立刻被执行常见用途:
- 非阻塞接收(探测)
select {
case msg := <-ch:
// 有数据才处理
process(msg)
default:
// 没数据也不阻塞,继续干别的
doOtherWork()
}- 非阻塞发送(防止阻塞)
select {
case ch <- value:
// 发送成功
default:
// 通道满或没有接收者,不阻塞,直接丢弃或处理
log.Println("发送失败,通道繁忙")
}WaitGroup —— 等待一组 Goroutine 完成
在前面的讲解中,我们一直在使用time.Sleep作为权宜之计,但这有很多的问题:
- 不知道该等多久
- 等太久浪费时间
- 等太短可能 goroutine 还没执行完
WaitGroup 就是来解决这个问题的!
4.1 WaitGroup 基础
在 Go 语言的并发编程中,sync.WaitGroup 是一个非常实用的“计数器”,它的核心作用是等待一组并发任务(goroutine)全部完成后,再继续执行后面的代码。
如果不使用 WaitGroup,主线程(main goroutine)可能会在子任务还没跑完时就提前结束,导致程序直接退出。
WaitGroup 只有三个简单的方法,就像是在维护一个计数器:
Add(int):计数器 加 N。通常在启动 goroutine 之前调用,表示“我有 N 个任务要做了”。Done():计数器 减 1。通常在 goroutine 内部的逻辑结束后调用,表示“这个任务做完了”。Wait():阻塞主线程。直到计数器归零,才会解除阻塞,继续往下走。
package main
import (
"fmt"
"sync"
"time"
)
func main() {
var wg sync.WaitGroup // 创建 WaitGroup(零值可用,不需要初始化)
// 启动3个 goroutine
for i := 1; i <= 3; i++ {
wg.Add(1) // 计数器 +1:告诉 WaitGroup "我要启动一个 goroutine"
go func(id int) {
defer wg.Done() // 计数器 -1:当 goroutine 完成时调用(用 defer 保证一定执行)
fmt.Printf("Worker %d: 开始工作\n", id)
time.Sleep(time.Duration(id) * time.Second) // 模拟不同耗时
fmt.Printf("Worker %d: 工作完成\n", id)
}(i)
}
fmt.Println("Main: 等待所有 worker 完成...")
wg.Wait() // 阻塞,直到计数器变为 0
fmt.Println("Main: 所有 worker 都完成了!")
}输出:
Main: 等待所有 worker 完成...
Worker 1: 开始工作
Worker 3: 开始工作
Worker 2: 开始工作
Worker 1: 工作完成 (1秒后)
Worker 2: 工作完成 (2秒后)
Worker 3: 工作完成 (3秒后)
Main: 所有 worker 都完成了!WaitGroup 就像餐厅的"叫号系统":
wg.Add(1) → 来了一桌客人,等待人数 +1
wg.Done() → 一桌客人吃完走了,等待人数 -1
wg.Wait() → 服务员说:"等所有客人都吃完,我再打扫关门"
计数器变化:
Add(1) → 计数器: 0→1
Add(1) → 计数器: 1→2
Add(1) → 计数器: 2→3
Done() → 计数器: 3→2
Done() → 计数器: 2→1
Done() → 计数器: 1→0
Wait() → 检测到计数器=0,不再阻塞,继续执行
4.2 WaitGroup 常见避坑
- 传递指针,而非值: 在函数间传递
WaitGroup时,必须使用指针。如果你直接传值,函数内部会复制一个新的计数器,导致主线程的Wait()永远接收不到归零信号,引发 Deadlock(死锁)。下面还会讲 - Add 的时机: 永远要在
go func()之前调用Add()。如果写在 goroutine 内部,可能会出现主线程已经运行到Wait(),而子协程还没来得及Add的情况,导致程序误以为没有任务直接跳过。 - 计数器不能为负数: 如果
Done()的调用次数超过了Add()的次数,程序会直接 panic。这就是为什么建议使用defer wg.Done()来确保即使函数出错也能正常减去计数。
4.3 WaitGroup 传递给函数(必须用指针)
package main
import (
"fmt"
"sync"
"time"
)
// 必须传指针 *sync.WaitGroup
func worker(id int, wg *sync.WaitGroup) {
defer wg.Done() // 完成时计数器 -1
fmt.Printf("Worker %d: 开始\n", id)
time.Sleep(time.Second)
fmt.Printf("Worker %d: 完成\n", id)
}
// 错误!:传值会复制一份,Done() 操作的是副本,原来的计数器不变 → 死锁!
// func worker(id int, wg sync.WaitGroup) { ... }
func main() {
var wg sync.WaitGroup
for i := 1; i <= 3; i++ {
wg.Add(1)
go worker(i, &wg) // 传指针!
}
wg.Wait()
fmt.Println("全部完成")
}在 Go 语言中,所有的参数传递都是值传递。
如果你传值 (sync.WaitGroup): 这就好比你手里有一个原件计数器,但在交给子协程时,你把它放进复印机,“复印”了一份交给它。
- 子协程在复印件上按下了
Done(),复印件归零了。 - 但主线程(main)手里拿的还是原件,原件的计数器依然是 3。
- 主线程执行
wg.Wait()时,它只盯着原件看。原件永远不会变成 0,于是主线程永远等下去,程序报错 Deadlock(死锁)。
如果你传指针 (*sync.WaitGroup): 这就好比你把计数器的家庭地址告诉了子协程。大家顺着地址找到的是同一个实体。子协程减 1,主线程看到的也是减了 1。
在实际开发中,除了传指针,还有一种更“稳妥”且常见的写法:使用闭包。
如果你不想显式传递指针,可以直接在 main 函数的作用域内启动协程,这样就不存在传参拷贝的问题了:
func main() {
var wg sync.WaitGroup
for i := 1; i <= 3; i++ {
wg.Add(1)
// 使用闭包直接引用外部变量 wg
go func(id int) {
defer wg.Done()
fmt.Printf("Worker %d 运行中\n", id)
}(i)
}
wg.Wait()
}4.4 实战:并发下载文件
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
// 模拟下载文件
func downloadFile(filename string, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Printf("开始下载: %s\n", filename)
// 模拟下载耗时(1-3秒随机)
duration := time.Duration(rand.Intn(3)+1) * time.Second
time.Sleep(duration)
fmt.Printf("下载完成: %s (耗时 %v)\n", filename, duration)
}
func main() {
files := []string{
"photo1.jpg",
"video.mp4",
"document.pdf",
"music.mp3",
"archive.zip",
}
var wg sync.WaitGroup
start := time.Now()
for _, file := range files {
wg.Add(1)
go downloadFile(file, &wg)
}
wg.Wait()
elapsed := time.Since(start)
fmt.Printf("\n 所有文件下载完成!总耗时: %v\n", elapsed)
fmt.Println("(如果串行下载,至少需要5秒以上)")
}竞态检测(Race Condition Detection)
5.1 什么是竞态条件(Race Condition)?
你和室友共用一个银行账户(余额 1000 元)
【正常情况 —— 串行操作】:
你:查看余额 1000 → 取 500 → 余额更新为 500
室友:查看余额 500 → 取 300 → 余额更新为 200
最终余额:200
【竞态条件 —— 并发操作】:
你:查看余额 1000 → 准备取 500...
室友:查看余额 1000 → 准备取 300...(你还没取完,他看到的还是1000)
你:余额更新为 1000 - 500 = 500
室友:余额更新为 1000 - 300 = 700(覆盖了你的结果!)
最终余额:700 (实际应该是 200!凭空多了 500 元!)
这就是竞态条件:
多个 goroutine 同时读写同一个变量,结果取决于执行顺序,导致程序行为不可预测。
代码演示:
package main
import (
"fmt"
"sync"
)
func main() {
counter := 0
var wg sync.WaitGroup
// 启动 1000 个 goroutine,每个对 counter +1
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter++ // 竞态!多个 goroutine 同时读写 counter
}()
}
wg.Wait()
fmt.Println("期望值: 1000")
fmt.Println("实际值:", counter) // 每次运行结果可能不同!
}多次运行可能的结果:

如果哪次等于1000,就是纯碰巧
为什么 counter++ 不是原子操作?
counter++ 在底层其实是三步操作:
- 步骤1:从内存读取 counter 的值到寄存器 (READ)
- 步骤2:在寄存器中将值 +1 (ADD)
- 步骤3:将新值写回内存 (WRITE)

结果:两次 +1 操作,counter 只变成了 1 而不是 2,丢失了一次更新!
5.2 Go 的竞态检测器(Race Detector)
Go 内置了一个非常强大的竞态检测工具,编译时加 -race 标志即可使用。
5.2.1 基本使用
# 运行时检测竞态
go run -race main.go
# 测试时检测竞态
go test -race ./...
# 编译时嵌入竞态检测(生成的二进制文件运行时会检测)
go build -race -o myapp main.go
./myapp5.2.2 检测上面的竞态代码
将之前的代码保存为 main.go,执行:
go run -race main.go
5.3 常见竞态场景与检测
5.3.1 场景一:共享变量并发读写
package main
import (
"fmt"
"sync"
)
// 有竞态!
func raceExample() {
data := make(map[string]int) // map 不是并发安全的!
var wg sync.WaitGroup
// 多个 goroutine 同时写 map
for i := 0; i < 10; i++ {
wg.Add(1)
go func(n int) {
defer wg.Done()
key := fmt.Sprintf("key-%d", n)
data[key] = n // 并发写 map → 竞态!甚至可能 panic!
}(i)
}
wg.Wait()
fmt.Println(data)
}
func main() {
raceExample()
}
输出:fatal error: concurrent map writes
map 的并发写甚至不需要 race detector,直接 panic!
5.3.2 场景二:共享 slice 并发 append
package main
import (
"fmt"
"sync"
)
func main() {
var results []int // slice 不是并发安全的
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func(n int) {
defer wg.Done()
results = append(results, n) // 竞态!
}(i)
}
wg.Wait()
fmt.Println("期望长度: 100")
fmt.Println("实际长度:", len(results)) // 可能不是 100!
}
5.3.3 场景三:结构体字段并发访问
package main
import (
"fmt"
"sync"
)
type User struct {
Name string
Balance int
}
func main() {
user := &User{Name: "张三", Balance: 1000}
var wg sync.WaitGroup
// 多个 goroutine 同时修改同一个结构体字段
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
user.Balance-- // 竞态!并发修改结构体字段
}()
}
wg.Wait()
fmt.Printf("期望: %d, 实际: %d\n", 900, user.Balance)
}5.3.4 场景四:隐蔽的竞态 —— 并发读写 bool/interface
package main
import (
"fmt"
"sync"
"time"
)
// 很多人以为 bool 赋值是原子的 —— 这是错误的!
func main() {
var running bool = true // 即使是 bool,也有竞态!
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
time.Sleep(1 * time.Second)
running = false // 一个 goroutine 写
fmt.Println("已设置 running = false")
}()
// 另一个 goroutine 读
wg.Add(1)
go func() {
defer wg.Done()
for running { // 另一个 goroutine 读
time.Sleep(100 * time.Millisecond)
}
fmt.Println("检测到 running 为 false,退出循环")
}()
wg.Wait()
}
5.4 修复竞态的四种方法
5.4.1 方法一:sync.Mutex(互斥锁)
package main
import (
"fmt"
"sync"
)
func main() {
counter := 0
var mu sync.Mutex // 互斥锁
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
mu.Lock() // 🔒 加锁:同一时间只有一个 goroutine 能进入
counter++ // 安全地操作共享变量
mu.Unlock() // 🔓 解锁:让其他 goroutine 可以进入
}()
}
wg.Wait()
fmt.Println("期望值: 1000")
fmt.Println("实际值:", counter) // 一定是 1000
}5.4.2 方法二:sync.RWMutex(读写锁)
核心规则:
- 读锁之间不互斥 → 多个读操作可并发执行
- 写锁与读锁互斥 → 写时阻止所有读
- 写锁与写锁互斥 → 同时只能一个写操作
主要方法
sync.RWMutex 提供了以下 4 个核心方法:
写操作(独占锁):
Lock(): 加写锁。如果已有读锁或写锁,会阻塞直到锁可用。Unlock(): 解写锁。
读操作(共享锁):
RLock(): 加读锁。只要没有写锁(或写锁请求),就可以立即获得。RUnlock(): 解读锁。
代码示例:并发安全的 Map
这是 RWMutex 最经典的使用场景。假设我们有一个配置中心,读取配置的请求非常多,但修改配置的操作很少。
package main
import (
"fmt"
"sync"
"time"
)
// SafeMap 包装了一个普通的 map 和一个 RWMutex
type SafeMap struct {
mu sync.RWMutex // 读写锁
data map[string]string // 被保护的共享数据
}
// Get 读取数据 -> 使用读锁 (RLock)
func (m *SafeMap) Get(key string) string {
m.mu.RLock() // 加读锁(允许多个读者同时进入)
defer m.mu.RUnlock() // 延迟解读锁(函数返回时自动释放)
return m.data[key]
}
// Set 修改数据 -> 使用写锁 (Lock)
func (m *SafeMap) Set(key string, value string) {
m.mu.Lock() // 加写锁(独占,阻止所有读写)
defer m.mu.Unlock() // 确保函数退出时解写锁,即使 panic 也会解锁,防止死锁
m.data[key] = value
}
func main() {
sm := SafeMap{data: make(map[string]string)}
// 模拟写入(写少)
go func() {
for i := 0; i < 5; i++ {
sm.Set("config", fmt.Sprintf("v%d", i))
fmt.Println("Write: Data updated")
time.Sleep(time.Millisecond * 100)
}
}()
// 模拟并发读取(读多)
for i := 0; i < 10; i++ {
go func(id int) {
for {
val := sm.Get("config")
// 多个 reader 可以同时打印,不会互相阻塞
fmt.Printf("Reader %d got: %s\n", id, val)
time.Sleep(time.Millisecond * 50)
}
}(i)
}
time.Sleep(time.Second)
}关键实现细节:写锁优先(防止写饥饿)
在早期的读写锁设计中,容易出现“写饥饿” (Writer Starvation) 现象:如果读请求源源不断,写者可能永远抢不到锁。
Go 语言的 RWMutex 采用了写锁优先的策略来解决这个问题:
- 当一个协程调用
Lock()(申请写锁) 时,它会阻塞等待。 - 关键点: 此时,新的
RLock()(读锁) 请求会被阻塞,即使当前持有锁的也是读者。 - 系统会等待当前已经持有读锁的协程执行完
RUnlock(),然后立即让等待的写者获得锁。 - 这保证了写操作不会因为读流量过大而被无限期推迟。
Mutex vs RWMutex 对比:
sync.Mutex(互斥锁):
Lock() → 独占。不管读还是写,同时只能一个 goroutine。
Unlock()
适用:读写都频繁,或者操作很快
sync.RWMutex(读写锁):
Lock() / Unlock() → 写锁。写的时候独占,所有读写都等待。
RLock() / RUnlock() → 读锁。读的时候共享,多个读可以并发。
适用:读多写少的场景(如配置、缓存)
5.4.3 方法三:sync/atomic(原子操作)
原子操作(Atomic Operation)是指不可分割的操作,就像化学中的"原子"一样,是最小的、不可再分的单位。
在并发编程中,原子操作保证:
- 要么完全执行
- 要么完全不执行
- 不会被其他 goroutine 打断
为什么需要原子操作?
问题场景:数据竞争:
var counter int = 0 // 非原子操作
// 1000 个 goroutine 同时执行 counter++
for i := 0; i < 1000; i++ {
go func() {
counter++ // 危险!不是原子的
}()
}为什么 counter++ 不安全?
因为 counter++ 实际上是 3 个步骤:
- 读取 counter 的值(假设是 5)
- 计算 5 + 1 = 6
- 写回 6 到 counter
在并发时可能发生:
goroutine A: 读取 counter = 5
goroutine B: 读取 counter = 5 ← 还没来得及写回
goroutine A: 计算 5+1 = 6
goroutine B: 计算 5+1 = 6
goroutine A: 写回 6
goroutine B: 写回 6 ← 丢失了一次累加!结果:预期 1000,实际可能只有 900 多。
解决方案:
方案 1:加锁(传统方式)
var mu sync.Mutex
mu.Lock()
counter++
mu.Unlock()缺点:性能开销大(涉及操作系统调度)
方案 2:原子操作
优点:CPU 级别的指令,性能高
atomic.AddInt64(&counter, 1) // 一步到位,无需加锁优点:CPU 级别的指令,性能高
Go 原子操作详解
package main
import (
"fmt"
"sync"
"sync/atomic"
)
func main() {
var counter int64 = 0 // 注意:atomic 操作需要固定位数的类型
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
atomic.AddInt64(&counter, 1) // 原子 +1,无需加锁
}()
}
wg.Wait()
// 读取也要用原子操作
finalValue := atomic.LoadInt64(&counter)
fmt.Println("期望值: 1000")
fmt.Println("实际值:", finalValue) // 一定是 1000
fmt.Println("\n--- 其他原子操作 ---")
// 原子存储
var flag int32
atomic.StoreInt32(&flag, 1)
fmt.Println("flag:", atomic.LoadInt32(&flag)) // 1
// 原子比较并交换 (CAS - Compare And Swap)
// 如果当前值是 1,就换成 2
swapped := atomic.CompareAndSwapInt32(&flag, 1, 2)
fmt.Println("CAS 成功:", swapped) // true
fmt.Println("flag:", atomic.LoadInt32(&flag)) // 2
// 再试一次,当前值是2不是1,所以不会交换
swapped = atomic.CompareAndSwapInt32(&flag, 1, 3)
fmt.Println("CAS 成功:", swapped) // false
fmt.Println("flag:", atomic.LoadInt32(&flag)) // 仍然是 2
// Go 1.19+ 新增的泛型原子类型
var atomicBool atomic.Bool
atomicBool.Store(true)
fmt.Println("atomicBool:", atomicBool.Load()) // true
var atomicValue atomic.Value
atomicValue.Store("hello")
fmt.Println("atomicValue:", atomicValue.Load()) // hello
}Context —— 并发控制的"遥控器"
6.1 为什么需要 Context?
你是项目经理(main goroutine),派了一个团队去做项目:
- 张三负责前端(goroutine 1)
- 李四负责后端(goroutine 2)
- 王五负责数据库(goroutine 3,由李四启动的子goroutine)
问题来了:
- 客户突然说"项目取消了",你怎么通知所有人停下?
- 项目有 deadline(截止时间),到期了怎么让所有人自动停下?
- 你想传递一些项目信息(如项目ID)给所有人
如果没有 Context:
- 你要给每个人单独打电话通知(每个 goroutine 一个 channel)
- 王五是李四启动的,你甚至联系不到他
- 非常混乱!
有了 Context:
- 你有一个"群广播"(context),一键通知所有人
- 支持"到期自动通知"
- 支持"附带信息"
- 子 goroutine 也会自动收到通知(树状传递)
6.2 Context 的基本概念
Context 是 Go 的上下文管理机制,用于:
- 取消信号传递:通知 goroutine 停止工作
- 超时控制:自动取消超时的操作
- 截止时间:设置任务的最后期限
- 传递请求范围的值:(不推荐滥用)
核心接口:
type Context interface {
// 返回 context 的截止时间(如果有)
Deadline() (deadline time.Time, ok bool)
// 返回一个 channel,当 context 被取消时关闭
Done() <-chan struct{}
// 返回取消的原因
Err() error
// 返回与 context 关联的值
Value(key interface{}) interface{}
}6.3 Context 的四种创建方式
1. Background 和 TODO
// Background:根 context,通常用于 main 函数、初始化、测试
ctx := context.Background()
// TODO:当不确定用什么 context 时使用(占位符)
ctx := context.TODO()它们的底层实现完全一致(都是 emptyCtx),只是语义不同:
context.Background():通常作为所有 Context 树的根节点(Root),用于主函数、初始化或测试代码中。
context.TODO():用于当你还不确定要用什么 Context,或者代码还在开发中(To Do)时作为一个占位符。它明确告诉阅读代码的人:“这里未来可能会传入一个具体的 Context” 。
2. WithCancel:手动取消
用于手动控制 Goroutine 的停止。
用法: ctx, cancel := context.WithCancel(parent)
原理:
- 返回一个
cancel函数。调用这个函数会关闭内部的DoneChannel。 - 利用 Go 的 Channel 广播机制:关闭 Channel 会通知所有监听该 Channel 的 Goroutine(
case <-ctx.Done():会立即解除阻塞)。 - 级联取消:
WithCancel会构建父子关系。当父 Context 取消时,它会遍历并取消所有子 Context;但子 Context 取消不会影响父 Context 。
ctx, cancel := context.WithCancel(context.Background())
go func() {
// 监听取消信号
<-ctx.Done()
fmt.Println("任务被取消了:", ctx.Err())
}()
// 手动取消
time.Sleep(1 * time.Second)
cancel() // 调用 cancel() 会关闭 Done() channel3. WithTimeout:超时自动取消
// 3秒后自动取消
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel() // 养成好习惯:总是 defer cancel()
go func() {
select {
case <-time.After(5 * time.Second):
fmt.Println("任务完成")
case <-ctx.Done():
fmt.Println("超时取消:", ctx.Err()) // context deadline exceeded
}
}()
time.Sleep(4 * time.Second)4. WithDeadline:指定截止时间
底层逻辑与 WithCancel 类似,只是多了一个定时器。一旦时间到了,会自动调用 cancel 函数
区分取消原因: 可以通过 ctx.Err() 判断任务是因为手动取消(Canceled)还是因为超时(DeadlineExceeded)结束的
// 指定绝对时间点
deadline := time.Now().Add(2 * time.Second)
ctx, cancel := context.WithDeadline(context.Background(), deadline)
defer cancel()
go func() {
<-ctx.Done()
fmt.Println("截止时间到:", ctx.Err())
}()
time.Sleep(3 * time.Second)5. WithValue:传递值(慎用!)
// 传递请求ID
ctx := context.WithValue(context.Background(), "requestID", "12345")
go func(ctx context.Context) {
requestID := ctx.Value("requestID").(string)
fmt.Println("请求ID:", requestID)
}(ctx)警告:
- 不要用 Context 传递函数参数!
- 只用于传递请求范围的元数据(如请求ID、用户身份等)
- 优先使用函数参数,Context 只是补充
6.4 Context 的树状传递(父子关系)
1. Context 树的构成
Context 的树状结构并非一开始就存在,而是通过派生(Derivation)逐步建立的。
- 根节点 (Root): 树的起点通常是
context.Background()(用于 main 函数、初始化)或context.TODO()(用于占位)。它们是空的 Context,没有值,也不会被取消。 - 子节点 (Children): 通过
WithCancel、WithDeadline、WithTimeout或WithValue函数,基于一个“父 Context”创建一个新的“子 Context”。
2. 信号传递:自上而下的取消
这是 Context 最重要的特性。取消信号只向叶子节点方向传递,不可逆流。
传递规则:
- 父死子必亡: 当父 Context 被取消(或超时)时,它会自动关闭自己的
Donechannel。所有基于该父 Context 派生的子 Context(以及孙 Context)都会收到取消信号,它们的Donechannel 也会被关闭。 - 子死父不受影响: 子 Context 被取消时,只会影响它自己以及它的子树,不会影响父 Context 或兄弟 Context。

package main
import (
"context"
"fmt"
"time"
)
func serviceA(ctx context.Context) {
// serviceA 创建子 context,传给 serviceB
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
go serviceB(ctx)
select {
case <-ctx.Done():
fmt.Println("⛔ ServiceA 停止:", ctx.Err())
}
}
func serviceB(ctx context.Context) {
// serviceB 创建子 context,传给 serviceC
ctx = context.WithValue(ctx, "requestID", "req-123")
go serviceC(ctx)
select {
case <-ctx.Done():
fmt.Println("⛔ ServiceB 停止:", ctx.Err())
}
}
func serviceC(ctx context.Context) {
reqID := ctx.Value("requestID") // 可以获取父 context 传递的值
for {
select {
case <-ctx.Done():
fmt.Printf("⛔ ServiceC 停止 (requestID=%v): %v\n", reqID, ctx.Err())
return
default:
fmt.Printf("🔄 ServiceC 工作中 (requestID=%v)\n", reqID)
time.Sleep(500 * time.Millisecond)
}
}
}
func main() {
// 顶层 context
ctx, cancel := context.WithCancel(context.Background())
go serviceA(ctx)
time.Sleep(2 * time.Second)
fmt.Println("\n🔴 Main: 取消顶层 context")
cancel() // 取消顶层,所有子 context 都会被取消!
time.Sleep(1 * time.Second)
fmt.Println("✅ 程序结束")
}Worker Pool —— 并发任务的"流水线工厂"
7.1 为什么需要 Worker Pool?
假设你开了一家快递分拣中心:
没有 Worker Pool(暴力并发):
今天有 10000 个包裹要分拣
你雇了 10000 个临时工,每人分拣一个
结果:场地挤不下,工人互相碰撞,效率反而更低!
有 Worker Pool(流水线模式):
你雇了 10 个固定员工(Worker)
包裹放在传送带上(Job Channel)
每个员工从传送带取一个包裹,分拣完再取下一个
分拣好的包裹放到另一条传送带(Result Channel)
结果:井然有序,效率最高!
Worker Pool 解决的核心问题:
- 资源控制:限制并发数量,防止 goroutine 爆炸(OOM)
- 复用 goroutine:避免频繁创建和销毁的开销
- 背压控制:任务太多时自动排队,不会压垮系统
- 有序收集结果:通过 result channel 统一收集处理结果
7.2 最简单的 Worker Pool
package main
import (
"fmt"
"sync"
"time"
)
// worker 从 jobs channel 接收任务,处理后将结果发送到 results channel
func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
for job := range jobs { // 不断从 jobs 中取任务,直到 jobs 被关闭
fmt.Printf("Worker %d: 开始处理任务 %d\n", id, job)
time.Sleep(500 * time.Millisecond) // 模拟耗时操作
result := job * 2 // 模拟处理逻辑
fmt.Printf("Worker %d: 任务 %d 完成,结果 = %d\n", id, job, result)
results <- result // 将结果发送到 results channel
}
}
func main() {
const numWorkers = 3 // 工人数量
const numJobs = 10 // 任务数量
jobs := make(chan int, numJobs) // 任务队列(带缓冲)
results := make(chan int, numJobs) // 结果队列(带缓冲)
// ========== 第1步:启动 Worker ==========
var wg sync.WaitGroup
for w := 1; w <= numWorkers; w++ {
wg.Add(1)
go worker(w, jobs, results, &wg)
}
// ========== 第2步:发送任务 ==========
for j := 1; j <= numJobs; j++ {
jobs <- j
}
close(jobs) // 所有任务发送完毕,关闭 jobs channel
// ========== 第3步:等待所有 Worker 完成,然后关闭 results ==========
// 用一个额外的 goroutine 来等待,避免阻塞主 goroutine
go func() {
wg.Wait()
close(results) // 所有 worker 完成后,关闭 results channel
}()
// ========== 第4步:收集结果 ==========
var total int
for result := range results {
total += result
}
fmt.Printf("\n所有任务完成!结果总和 = %d\n", total)
}输出示例:
Worker 3: 开始处理任务 1
Worker 1: 开始处理任务 2
Worker 2: 开始处理任务 3
Worker 3: 任务 1 完成,结果 = 2
Worker 3: 开始处理任务 4
Worker 1: 任务 2 完成,结果 = 4
Worker 1: 开始处理任务 5
Worker 2: 任务 3 完成,结果 = 6
Worker 2: 开始处理任务 6
...
所有任务完成!结果总和 = 110执行流程图解:
时间 →→→→→→→→→→→→→→→→→→→→→→→→→→→→→→→→→
Worker1: [任务1处理中...] [任务4处理中...] [任务7处理中...] [任务10]
Worker2: [任务2处理中...] [任务5处理中...] [任务8处理中...]
Worker3: [任务3处理中...] [任务6处理中...] [任务9处理中...]
3个 Worker 并发处理 10 个任务,每个 Worker 处理完一个就取下一个7.3 理解 Worker Pool 的关键细节
为什么用 for job := range jobs ?
func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
for job := range jobs {
// 处理任务...
}
// range 退出 = jobs channel 已关闭且数据已读完
// 此时 worker 自然退出,defer wg.Done() 执行
}range 会持续从 channel 读取数据。当 jobs 被 close() && 缓冲区清空后,range 自动退出循环。这比手动检查 ok 更简洁。
为什么要单开一个 goroutine 等待并关闭 results ?
// 错误做法:在主 goroutine 中先 Wait 再 range
wg.Wait() // 阻塞在这里等 worker 完成
close(results)
for r := range results { ... } // 永远执行不到这里!因为 worker 往 results 发送时也会阻塞
// 正确做法:开一个 goroutine 去等待
go func() {
wg.Wait()
close(results)
}()
for r := range results { ... } // 主 goroutine 可以一边消费 results 一边等主 goroutine 有两件事要做:消费 results 和 等待 wg 归零。如果串行做,会导致死锁——worker 写 results 时可能阻塞(没人读),而主 goroutine 在 wg.Wait() 等 worker 完成。所以需要并行:主 goroutine 负责消费 results,另一个 goroutine 负责等待并关闭。
7.4 带 Context 取消的 Worker Pool
在实际场景中,我们往往需要在任务执行过程中取消所有 Worker(比如用户取消请求、超时等)。
package main
import (
"context"
"fmt"
"math/rand"
"sync"
"time"
)
// Job 表示一个任务
type Job struct {
ID int
Payload string
}
// Result 表示任务的处理结果
type Result struct {
Job Job
Output string
WorkerID int
}
func worker(ctx context.Context, id int, jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) {
defer wg.Done()
for {
select {
case <-ctx.Done():
// 收到取消信号,立即退出
fmt.Printf("Worker %d: 收到取消信号,退出\n", id)
return
case job, ok := <-jobs:
if !ok {
// jobs channel 已关闭,没有更多任务了
fmt.Printf("Worker %d: 没有更多任务,退出\n", id)
return
}
// 模拟处理任务(可能耗时)
fmt.Printf("Worker %d: 处理任务 %d (%s)\n", id, job.ID, job.Payload)
duration := time.Duration(rand.Intn(3)+1) * time.Second
// 在处理过程中也要检查取消信号
select {
case <-ctx.Done():
fmt.Printf("Worker %d: 任务 %d 处理中被取消\n", id, job.ID)
return
case <-time.After(duration):
// 处理完成
results <- Result{
Job: job,
Output: fmt.Sprintf("结果_%s_已处理", job.Payload),
WorkerID: id,
}
}
}
}
}
func main() {
// 创建可取消的 context,3秒后自动取消
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
const numWorkers = 3
jobs := make(chan Job, 10)
results := make(chan Result, 10)
// 启动 Worker
var wg sync.WaitGroup
for w := 1; w <= numWorkers; w++ {
wg.Add(1)
go worker(ctx, w, jobs, results, &wg)
}
// 发送任务(可能发到一半就被取消了)
go func() {
for i := 1; i <= 20; i++ {
select {
case <-ctx.Done():
fmt.Println("任务发送被取消,停止发送")
close(jobs)
return
case jobs <- Job{ID: i, Payload: fmt.Sprintf("数据_%d", i)}:
}
}
close(jobs)
}()
// 等待所有 worker 结束并关闭 results
go func() {
wg.Wait()
close(results)
}()
// 收集结果
var count int
for result := range results {
count++
fmt.Printf("收到结果: 任务%d 由Worker%d完成 → %s\n",
result.Job.ID, result.WorkerID, result.Output)
}
fmt.Printf("\n总共完成 %d 个任务(3秒超时)\n", count)
}关键改进点:
select双监听:Worker 同时监听ctx.Done()和jobs,取消信号到来时优先退出- 任务处理中也检查取消:模拟耗时操作时用
select监听,而不是裸time.Sleep - 发送端也检查取消:避免向已经无人消费的 channel 继续发送导致阻塞
7.5 动态调整 Worker 数量
有时候任务量波动很大,我们希望根据负载动态增加或减少 Worker。
package main
import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"
)
// Pool 是一个可动态调整大小的 Worker Pool
type Pool struct {
jobs chan int // 任务队列
results chan int // 结果队列
workerNum int64 // 当前 worker 数量(原子操作)
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
}
// NewPool 创建一个新的 Worker Pool
func NewPool(bufferSize int) *Pool {
ctx, cancel := context.WithCancel(context.Background())
return &Pool{
jobs: make(chan int, bufferSize),
results: make(chan int, bufferSize),
ctx: ctx,
cancel: cancel,
}
}
// AddWorker 添加一个 Worker
func (p *Pool) AddWorker() {
p.wg.Add(1)
id := atomic.AddInt64(&p.workerNum, 1)
go func(workerID int64) {
defer p.wg.Done()
defer atomic.AddInt64(&p.workerNum, -1)
fmt.Printf("🟢 Worker %d 上线\n", workerID)
for {
select {
case <-p.ctx.Done():
fmt.Printf("🔴 Worker %d 下线(Pool关闭)\n", workerID)
return
case job, ok := <-p.jobs:
if !ok {
fmt.Printf("🔴 Worker %d 下线(无更多任务)\n", workerID)
return
}
// 处理任务
time.Sleep(200 * time.Millisecond)
p.results <- job * 10
}
}
}(id)
}
// Submit 提交任务
func (p *Pool) Submit(job int) {
p.jobs <- job
}
// Shutdown 优雅关闭
func (p *Pool) Shutdown() {
close(p.jobs) // 先关闭任务队列,让 worker 自然退出
p.wg.Wait() // 等待所有 worker 完成
close(p.results)
}
// ForceShutdown 强制关闭(通过 context 取消)
func (p *Pool) ForceShutdown() {
p.cancel() // 发送取消信号
p.wg.Wait() // 等待所有 worker 响应取消
close(p.results)
}
func main() {
pool := NewPool(20)
// 一开始启动 2 个 Worker
fmt.Println("=== 初始:2 个 Worker ===")
pool.AddWorker()
pool.AddWorker()
// 提交一批任务
go func() {
for i := 1; i <= 15; i++ {
pool.Submit(i)
fmt.Printf("提交任务 %d\n", i)
}
}()
time.Sleep(1 * time.Second)
// 发现任务堆积,动态增加 Worker
fmt.Println("\n=== 负载增加,扩容到 5 个 Worker ===")
pool.AddWorker()
pool.AddWorker()
pool.AddWorker()
fmt.Printf("当前 Worker 数: %d\n", atomic.LoadInt64(&pool.workerNum))
// 在另一个 goroutine 收集结果
go func() {
for result := range pool.results {
fmt.Printf("收到结果: %d\n", result)
}
}()
time.Sleep(3 * time.Second)
// 优雅关闭
fmt.Println("\n=== 关闭 Pool ===")
pool.Shutdown()
fmt.Println(" Pool 已关闭")
}7.6 实战:并发爬虫 Worker Pool
模拟一个并发 URL 爬取场景,用 Worker Pool 控制并发请求数量。
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
// CrawlJob 爬取任务
type CrawlJob struct {
URL string
}
// CrawlResult 爬取结果
type CrawlResult struct {
URL string
StatusCode int
Body string
Duration time.Duration
WorkerID int
Err error
}
// crawlWorker 爬虫 Worker
func crawlWorker(id int, jobs <-chan CrawlJob, results chan<- CrawlResult, wg *sync.WaitGroup) {
defer wg.Done()
for job := range jobs {
start := time.Now()
// 模拟 HTTP 请求(实际项目中使用 http.Get)
time.Sleep(time.Duration(rand.Intn(2000)+500) * time.Millisecond)
// 模拟结果
statusCode := 200
if rand.Float32() < 0.1 { // 10% 概率失败
statusCode = 500
}
results <- CrawlResult{
URL: job.URL,
StatusCode: statusCode,
Body: fmt.Sprintf("<html>%s的内容</html>", job.URL),
Duration: time.Since(start),
WorkerID: id,
}
}
}
func main() {
// 模拟待爬取的 URL 列表
urls := []string{
"https://example.com/page/1",
"https://example.com/page/2",
"https://example.com/page/3",
"https://example.com/page/4",
"https://example.com/page/5",
"https://example.com/page/6",
"https://example.com/page/7",
"https://example.com/page/8",
"https://example.com/page/9",
"https://example.com/page/10",
}
const numWorkers = 3 // 限制并发数为 3,避免被目标网站封禁
jobs := make(chan CrawlJob, len(urls))
results := make(chan CrawlResult, len(urls))
// 启动 Worker
var wg sync.WaitGroup
for w := 1; w <= numWorkers; w++ {
wg.Add(1)
go crawlWorker(w, jobs, results, &wg)
}
// 发送任务
start := time.Now()
for _, url := range urls {
jobs <- CrawlJob{URL: url}
}
close(jobs)
// 等待完成并关闭结果通道
go func() {
wg.Wait()
close(results)
}()
// 收集并统计结果
var successCount, failCount int
for result := range results {
if result.StatusCode == 200 {
successCount++
fmt.Printf("✅ [Worker%d] %s → %d (耗时 %v)\n",
result.WorkerID, result.URL, result.StatusCode, result.Duration)
} else {
failCount++
fmt.Printf("❌ [Worker%d] %s → %d (耗时 %v)\n",
result.WorkerID, result.URL, result.StatusCode, result.Duration)
}
}
elapsed := time.Since(start)
fmt.Printf("\n========== 爬取统计 ==========\n")
fmt.Printf("总 URL 数: %d\n", len(urls))
fmt.Printf("成功: %d, 失败: %d\n", successCount, failCount)
fmt.Printf("并发数: %d\n", numWorkers)
fmt.Printf("总耗时: %v\n", elapsed)
fmt.Printf("(串行预估耗时: %v+)\n", time.Duration(len(urls))*time.Second)
}输出示例:
✅ [Worker2] https://example.com/page/2 → 200 (耗时 732ms)
✅ [Worker1] https://example.com/page/1 → 200 (耗时 1.15s)
✅ [Worker3] https://example.com/page/3 → 200 (耗时 1.85s)
✅ [Worker2] https://example.com/page/4 → 200 (耗时 1.02s)
❌ [Worker1] https://example.com/page/5 → 500 (耗时 580ms)
...
========== 爬取统计 ==========
总 URL 数: 10
成功: 9, 失败: 1
并发数: 3
总耗时: 5.2s
(串行预估耗时: 10s+)7.7 Worker Pool 模式总结
核心三件套:
jobs channel → N 个 Worker goroutine → results channel
(任务入口) (并发处理) (结果出口)设计要点:
Worker 数量:根据任务类型选择
- CPU 密集型:Worker 数 ≈ CPU 核心数(
runtime.NumCPU()) - IO 密集型:Worker 数可以更多(10~100),因为大部分时间在等待 IO
- CPU 密集型:Worker 数 ≈ CPU 核心数(
Channel 缓冲区大小:
- 太小:任务生产者会频繁阻塞
- 太大:占用过多内存
- 经验值:Worker 数量的 2~5 倍
优雅关闭顺序:
- 停止发送新任务(
close(jobs)) - 等待 Worker 处理完剩余任务(
wg.Wait()) - 关闭结果通道(
close(results))
- 停止发送新任务(
- 错误处理:Result 结构体中包含
Err字段,让调用者决定如何处理
与直接创建 goroutine 的对比:

评论 (0)