Golang并发
侧边栏壁纸
  • 累计撰写 11 篇文章
  • 累计收到 1 条评论

Golang并发

tqtqtq
2026-02-13 / 0 评论 / 7 阅读 / 正在检测是否收录...

Golang并发

理解并发

并发(Concurrency):多个任务在同一时间段内交替执行(逻辑上同时)

V.S.

并行(Parallelism):多个任务在同一时刻真正同时执行(物理上同时)

1.1 为什么需要并发?

// 没有并发的世界:一个Web服务器
// 假设每个请求处理需要1秒,那么:
// 第1个用户:等1秒
// 第2个用户:等2秒(排队)
// 第1000个用户:等1000秒

// 有并发的世界:
// 第1个用户:等1秒
// 第2个用户:等1秒(同时处理)
// 第1000个用户:等1秒(同时处理)

1.2 Go 的并发哲学

"不要通过共享内存来通信,而要通过通信来共享内存"

❌ 传统方式:多个人共用一个笔记本,需要排队写(加锁)
✅ Go 的方式:每个人有自己的笔记本,需要时传纸条(channel)


Goroutine —— Go 的超轻量级线程

2.1 什么是 Goroutine?

线程(Thread) = 雇一个正式员工(成本高,一个约1-8MB内存)

Goroutine = 雇一个临时工(成本极低,一个只要约2KB内存)

这意味着:

  • 1GB内存大约能创建 50万个 goroutine
  • 但只能创建约 1000 个线程
  • goroutine 是 Go 运行时管理的,不是操作系统管理的

2.2 第一个 Goroutine

package main

import (
    "fmt"
    "time"
)

// 普通函数
func sayHello() {
    fmt.Println("Hello, 我是 goroutine!")
}

func main() {
    // ============ 普通调用(串行)============
    // sayHello()  // 这是普通调用,main 会等它执行完

    // ============ goroutine 调用(并发)============
    go sayHello() // 加一个 go 关键字,就创建了一个 goroutine!
    // 就这么简单!

    // 重要:main 函数本身也是一个 goroutine(主 goroutine)
    // 如果 main 结束了,所有其他 goroutine 都会被强制终止!

    time.Sleep(1 * time.Second) // 暂时用 Sleep 等待,后面会学更好的方式
    fmt.Println("main 结束了")
}

执行结果:

Hello, 我是 goroutine!
main 结束了

2.3 理解 Goroutine 的执行顺序

package main

import (
    "fmt"
    "time"
)

func printNumbers(name string) {
    for i := 1; i <= 5; i++ {
        fmt.Printf("[%s] 数字: %d\n", name, i)
        time.Sleep(100 * time.Millisecond) // 模拟耗时操作
    }
}

func main() {
    // 启动两个 goroutine
    go printNumbers("协程A")
    go printNumbers("协程B")

    // 主 goroutine 等待
    time.Sleep(1 * time.Second)
    fmt.Println("=== 主程序结束 ===")
}

可能的运行结果(每次可能不同!):

[协程B] 数字: 1
[协程A] 数字: 1
[协程A] 数字: 2
[协程B] 数字: 2
[协程B] 数字: 3
[协程A] 数字: 3
...

关键理解:

  1. goroutine 的执行顺序是不确定的(由 Go 调度器决定)
  2. 每次运行结果可能不同
  3. 这就是并发的本质 —— 你不能假设执行顺序

2.4 匿名函数创建 Goroutine

package main

import (
    "fmt"
    "time"
)

func main() {
    // 方式1:使用命名函数
    go sayHi("张三")

    // 方式2:使用匿名函数(更常用)
    go func() {
        fmt.Println("我是匿名函数 goroutine")
    }() // 注意最后的 () 表示立即调用

    // 方式3:匿名函数带参数
    name := "李四"
    go func(n string) {
        fmt.Printf("你好,%s\n", n)
    }(name) // 把 name 作为参数传入

    // ⚠️ 方式4:闭包陷阱(版本差异点!)
    // 在 Go 1.22 之前,这里有 Bug;在 Go 1.22+,这里是安全的。
    for i := 0; i < 5; i++ {
        go func() {
            // [Go < 1.22] ❌ i 是共享的,结果全是 5
            // [Go ≥ 1.22] ✅ 每次循环 i 都是独立的,结果 0,1,2,3,4
            fmt.Println("方式4 - i =", i) 
        }()
    }

    // ✅ 方式5(正确方式):通过参数传值
    for i := 0; i < 5; i++ {
        go func(n int) {
            fmt.Println("正确方式 - n =", n) // ✅ n 是每个 goroutine 自己的副本
        }(i) // 把当前的 i 值传进去
    }

    time.Sleep(1 * time.Second)
}

func sayHi(name string) {
    fmt.Printf("你好,%s\n", name)
}

传参:为什么有时候要传,有时候不用?

方式 3(传参):

go func(n string) { ... }(name)

这里我们将外部变量 name拷贝了一份,传给了匿名函数的参数 n。此时,n 是这个 Goroutine 私有的,外部怎么变都跟 n 没关系。

go 关键字执行的那一瞬间,Go 语言会把外部变量(如 namei)的当前值复制一份(Snapshot)。

闭包(不传参直接用): 如果在匿名函数内部直接使用外部变量(如方式 4 中的 i),这叫做闭包(Closure)。函数“捕获”了外部变量的引用(地址)


P.S. 注意版本

旧版本 (Go < 1.22):

  • 机制: 整个 for 循环只有一个 i 变量(内存地址不变)。
  • 结果: 主线程跑得太快,瞬间把 i 加到了 5。等 Goroutine 拿起望远镜看的时候,看到的都是 5。
  • 对策: 必须使用方式 5(传参)来强制拷贝。

新版本 (Go ≥ 1.22):

  • 机制: 编译器做了优化,每次循环都会创建一个全新的 i
  • 结果: 即使是闭包,每个 Goroutine 盯着的也是属于它那一轮的那个 i。输出正常的 0, 1, 2, 3, 4
  • 注意: 虽然新版修复了,但在写库代码兼容老项目时,为了保险,依然推荐使用方式 5

在方式5中

for i := 0; i < 5; i++ {
    go func(n int) {
        fmt.Println("正确方式 - n =", n)
    }(i) // <--- 关键在这里!
}

go 关键字执行的那一瞬间,Go 语言会把当前的 i 的值(比如 0),复制一份给参数 n

虽然 Goroutine 还没开始运行,但它口袋里已经揣好了属于它自己的那个数字 0

等到 Goroutine 运行时,它打印的是它口袋里的 n,而不是外部那个已经变成 5 的 i

2.5 闭包陷阱详细解(注意版本)

错误代码:

for i := 0; i < 5; i++ {
    go func() {
        fmt.Println(i)  // 所有 goroutine 共享同一个变量 i
    }()
}

执行过程:
时间线 →→→→→→→→→→→→→→→→→→→→→→→
主goroutine:  i=0 → i=1 → i=2 → i=3 → i=4 → i=5(循环结束)
goroutine1:                                   打印 i → 5
goroutine2:                                   打印 i → 5
goroutine3:                                   打印 i → 5
goroutine4:                                   打印 i → 5
goroutine5:                                   打印 i → 5
                                              ↑ 全部打印5!

原因:goroutine 创建很快,但调度执行有延迟。等 goroutine 真正执行时,循环早已结束,i 已经变成 5。

正确代码:

for i := 0; i < 5; i++ {
    go func(n int) {  // 参数 n 是 i 的副本
        fmt.Println(n)
    }(i)  // 调用时立即把当前 i 的值复制给 n
}

执行过程:
主goroutine:  i=0(复制0给n) → i=1(复制1给n) → i=2(复制2给n) → ...
goroutine1(n=0):                                     打印 0 
goroutine2(n=1):                                     打印 1 
goroutine3(n=2):                                     打印 2 
...

Channel —— Goroutine 之间的通信管道

3.1 为什么需要 Channel?

生活类比:

你(main goroutine)派小王(goroutine)去买咖啡。

没有 channel 的情况:
你: "小王去买咖啡"(go buyCoffee())
你: 继续干活...然后走了(main结束)
小王: 买到了!但你已经走了...拿着咖啡不知道给谁

有 channel 的情况:
你们之间有一个"传递窗口"(channel)
你: "小王去买咖啡,买到了放传递窗口"
你: 在传递窗口这里等着...
小王: 买到了!放到传递窗口
你: 从传递窗口拿到咖啡

Channel = goroutine 之间传递数据的管道/传递窗口

3.2 Channel 基础操作

package main

import "fmt"

func main() {
    // ========== 创建 channel ==========
    // 语法:make(chan 数据类型)
    ch := make(chan string) // 创建一个传递 string 的 channel

    // ========== 发送和接收 ==========
    // 发送:ch <- 数据
    // 接收:数据 := <-ch

    // 启动一个 goroutine 发送数据
    go func() {
        ch <- "你好,我是来自 goroutine 的消息" // 发送数据到 channel
        fmt.Println("goroutine: 消息已发送")
    }()

    // 主 goroutine 接收数据
    msg := <-ch // 从 channel 接收数据(会阻塞,直到收到数据)
    fmt.Println("main: 收到消息 →", msg)
}

输出:

goroutine: 消息已发送
main: 收到消息 → 你好,我是来自 goroutine 的消息

3.3 Channel 的阻塞特性(重要!)

package main

import "fmt"

func main() {
    ch := make(chan int)

    go func() {
        fmt.Println("goroutine: 准备发送数据...")
        ch <- 42 // 发送。如果没人接收,这里会阻塞等待
        fmt.Println("goroutine: 数据已发送")
    }()

    fmt.Println("main: 准备接收数据...")
    value := <-ch // 接收。如果没有数据,这里会阻塞等待
    fmt.Println("main: 收到数据 →", value)
}

Channel 阻塞规则(无缓冲 channel):

发送方(ch <- data):
→ 如果没有接收方在等待,发送方会【阻塞】,直到有人来接收

接收方(data := <-ch):
→ 如果没有发送方发送数据,接收方会【阻塞】,直到有人来发送

就像面对面递东西:

  • 你伸出手递东西,对方没来 → 你就一直举着手等
  • 你伸出手要接东西,对方没来 → 你就一直举着手等
  • 双方都到了 → 一手交钱一手交货,各走各的

3.4 Channel 的方向

package main

import "fmt"

// 只能发送的 channel(只写)
func producer(ch chan<- string) {
    ch <- "产品A"
    ch <- "产品B"
    ch <- "产品C"
    close(ch) // 发送完毕,关闭 channel
}

// 只能接收的 channel(只读)
func consumer(ch <-chan string) {
    for item := range ch { // range 会一直接收,直到 channel 被关闭
        fmt.Println("消费了:", item)
    }
}

func main() {
    ch := make(chan string)

    go producer(ch) // 双向 channel 自动转为 只写
    consumer(ch)    // 双向 channel 自动转为 只读
}

Channel 方向总结:

chan string → 双向 channel(可读可写)
chan<- string → 只写 channel(只能发送)——箭头指向 chan,数据进入
<-chan string → 只读 channel(只能接收)——箭头远离 chan,数据出来

记忆技巧:看箭头方向
chan<- 箭头指向chan = 数据流入chan = 只能写/发送
<-chan 箭头离开chan = 数据流出chan = 只能读/接收

3.5 带缓冲的 Channel

package main

import "fmt"

func main() {
    // ========== 无缓冲 channel ==========
    // ch1 := make(chan int)  // 必须有人接收,发送方才能继续

    // ========== 有缓冲 channel ==========
    ch2 := make(chan int, 3) // 缓冲区大小为3,最多存3个数据

    // 可以连续发送3个数据,不需要有人接收
    ch2 <- 10
    ch2 <- 20
    ch2 <- 30
    // ch2 <- 40  // 如果再发第4个,就会阻塞(缓冲满了)

    fmt.Println("缓冲区长度:", len(ch2)) // 3(当前有3个数据)
    fmt.Println("缓冲区容量:", cap(ch2)) // 3(最多能存3个)

    // 接收数据(先进先出 FIFO)
    fmt.Println(<-ch2) // 10
    fmt.Println(<-ch2) // 20
    fmt.Println(<-ch2) // 30
}

使用场景:
无缓冲:需要严格同步的场景(发送方要确认接收方收到)
有缓冲:允许一定程度异步的场景(提高吞吐量)

3.6 关闭 Channel 和 range 遍历

package main

import "fmt"

func main() {
    ch := make(chan int, 5)

    // 发送数据
    go func() {
        for i := 1; i <= 5; i++ {
            ch <- i
            fmt.Printf("发送: %d\n", i)
        }
        close(ch) // 发送完毕一定要关闭!否则接收方会永远等待(死锁)
    }()

    // ========== 接收方式1:for range(推荐)==========
    for value := range ch {
        // range 会自动检测 channel 是否关闭
        // 关闭后自动退出循环
        fmt.Printf("接收: %d\n", value)
    }

    fmt.Println("所有数据接收完毕")

    // ========== 接收方式2:ok 判断 ==========
    ch2 := make(chan string, 2)
    go func() {
        ch2 <- "hello"
        ch2 <- "world"
        close(ch2)
    }()

    for {
        value, ok := <-ch2
        if !ok {
            // ok 为 false 表示 channel 已关闭且没有数据了
            fmt.Println("channel 已关闭")
            break
        }
        fmt.Println("收到:", value)
    }
}

关闭 Channel 的规则:

  1. 只有发送方应该关闭 channel(谁发送,谁关闭)
  2. 接收方不应该关闭 channel
  3. 关闭已经关闭的 channel 会 panic
  4. 向已关闭的 channel 发送数据会 panic
  5. 从已关闭的 channel 接收数据:

    • 如果还有数据,正常接收
    • 如果没有数据了,返回零值(int→0, string→"", bool→false)

3.7 select 多路复用

package main

import (
    "fmt"
    "time"
)

func main() {
    ch1 := make(chan string)
    ch2 := make(chan string)

    // goroutine 1:2秒后发送
    go func() {
        time.Sleep(2 * time.Second)
        ch1 <- "来自 channel 1 的数据"
    }()

    // goroutine 2:1秒后发送
    go func() {
        time.Sleep(1 * time.Second)
        ch2 <- "来自 channel 2 的数据"
    }()

    // select 同时监听多个 channel
    // 哪个先来就处理哪个
    for i := 0; i < 2; i++ {
        select {
        case msg1 := <-ch1:
            fmt.Println("收到:", msg1)
        case msg2 := <-ch2:
            fmt.Println("收到:", msg2)
        }
    }
}

输出:

收到: 来自 channel 2 的数据    (1秒后)
收到: 来自 channel 1 的数据    (2秒后)

select 就像你同时等多个外卖:

​ 美团外卖(ch1)

​ 饿了么外卖(ch2)

select {
  case 饭 := <-美团:     //哪个先到
      吃(饭)             //就先吃哪个
  case 饭 := <-饿了么:
      吃(饭)
  }

3.8 select 的高级用法

package main

import (
    "fmt"
    "time"
)

func main() {
    ch := make(chan int)

    // ========== 1. 带超时的 select ==========
    fmt.Println("--- 超时示例 ---")
    select {
    case data := <-ch:
        fmt.Println("收到数据:", data)
    case <-time.After(2 * time.Second): // 2秒后超时
        fmt.Println("超时了!没有收到数据")
    }

    // ========== 2. 带 default 的非阻塞 select ==========
    fmt.Println("\n--- 非阻塞示例 ---")
    select {
    case data := <-ch:
        fmt.Println("收到数据:", data)
    default:
        // 如果所有 channel 都没有数据,立即执行 default
        fmt.Println("没有数据可读,不等了")
    }

    // ========== 3. 定时器 + select 实现定期任务 ==========
    fmt.Println("\n--- 定时任务示例 ---")
    ticker := time.NewTicker(500 * time.Millisecond) // 每500ms触发一次
    defer ticker.Stop()

    done := make(chan bool)
    go func() {
        time.Sleep(2 * time.Second)
        done <- true
    }()

    for {
        select {
        case <-done:
            fmt.Println("任务完成,退出")
            return
        case t := <-ticker.C:
            fmt.Println("定时任务执行:", t.Format("15:04:05.000"))
        }
    }
}

default:

普通的 select确实是设计用来“等待”channel 数据的。但是,一旦加上了 default 分支,select 的行为模式就完全变了。

select 语句中所有其他的 case(无论是读还是写)都无法立即执行(即会被阻塞)时,default 分支就会立即触发。

它的核心作用就是把“阻塞等待”变成了“非阻塞检查”

情况 1:没有 Default(阻塞模式)

select {
case msg := <-ch:
    fmt.Println("收到数据:", msg)
}
// 如果 ch 是空的,程序永远不会执行到这一行(除非 ch 被关闭)

情况2:有 Default(非阻塞模式)

Go 运行时会检查 ch

  1. 如果有数据:执行 case,读取数据。
  2. 如果没数据:Go 运行时发现读取 ch 会导致阻塞,于是立刻放弃等待,转而执行 default
select {
case msg := <-ch:
    fmt.Println("收到数据:", msg)
default:
    fmt.Println("通道没数据,我不想等,我先溜了")
}
// 这一行会立刻被执行

常见用途:

  1. 非阻塞接收(探测)
select {
case msg := <-ch:
    // 有数据才处理
    process(msg)
default:
    // 没数据也不阻塞,继续干别的
    doOtherWork()
}
  1. 非阻塞发送(防止阻塞)
select {
case ch <- value:
    // 发送成功
default:
    // 通道满或没有接收者,不阻塞,直接丢弃或处理
    log.Println("发送失败,通道繁忙")
}

WaitGroup —— 等待一组 Goroutine 完成

在前面的讲解中,我们一直在使用time.Sleep作为权宜之计,但这有很多的问题:

  • 不知道该等多久
  • 等太久浪费时间
  • 等太短可能 goroutine 还没执行完

WaitGroup 就是来解决这个问题的!

4.1 WaitGroup 基础

在 Go 语言的并发编程中,sync.WaitGroup 是一个非常实用的“计数器”,它的核心作用是等待一组并发任务(goroutine)全部完成后,再继续执行后面的代码

如果不使用 WaitGroup,主线程(main goroutine)可能会在子任务还没跑完时就提前结束,导致程序直接退出。

WaitGroup 只有三个简单的方法,就像是在维护一个计数器:

  • Add(int):计数器 加 N。通常在启动 goroutine 之前调用,表示“我有 N 个任务要做了”。
  • Done():计数器 减 1。通常在 goroutine 内部的逻辑结束后调用,表示“这个任务做完了”。
  • Wait()阻塞主线程。直到计数器归零,才会解除阻塞,继续往下走。
package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var wg sync.WaitGroup // 创建 WaitGroup(零值可用,不需要初始化)

    // 启动3个 goroutine
    for i := 1; i <= 3; i++ {
        wg.Add(1) // 计数器 +1:告诉 WaitGroup "我要启动一个 goroutine"

        go func(id int) {
            defer wg.Done() // 计数器 -1:当 goroutine 完成时调用(用 defer 保证一定执行)

            fmt.Printf("Worker %d: 开始工作\n", id)
            time.Sleep(time.Duration(id) * time.Second) // 模拟不同耗时
            fmt.Printf("Worker %d: 工作完成\n", id)
        }(i)
    }

    fmt.Println("Main: 等待所有 worker 完成...")
    wg.Wait() // 阻塞,直到计数器变为 0
    fmt.Println("Main: 所有 worker 都完成了!")
}

输出:

Main: 等待所有 worker 完成...
Worker 1: 开始工作
Worker 3: 开始工作
Worker 2: 开始工作
Worker 1: 工作完成       (1秒后)
Worker 2: 工作完成       (2秒后)
Worker 3: 工作完成       (3秒后)
Main: 所有 worker 都完成了!

WaitGroup 就像餐厅的"叫号系统":

wg.Add(1) → 来了一桌客人,等待人数 +1
wg.Done() → 一桌客人吃完走了,等待人数 -1
wg.Wait() → 服务员说:"等所有客人都吃完,我再打扫关门"

计数器变化:
Add(1) → 计数器: 0→1
Add(1) → 计数器: 1→2
Add(1) → 计数器: 2→3
Done() → 计数器: 3→2
Done() → 计数器: 2→1
Done() → 计数器: 1→0
Wait() → 检测到计数器=0,不再阻塞,继续执行

4.2 WaitGroup 常见避坑

  • 传递指针,而非值: 在函数间传递 WaitGroup 时,必须使用指针。如果你直接传值,函数内部会复制一个新的计数器,导致主线程的 Wait() 永远接收不到归零信号,引发 Deadlock(死锁)。下面还会讲
  • Add 的时机: 永远要在 go func() 之前调用 Add()。如果写在 goroutine 内部,可能会出现主线程已经运行到 Wait(),而子协程还没来得及 Add 的情况,导致程序误以为没有任务直接跳过。
  • 计数器不能为负数: 如果 Done() 的调用次数超过了 Add() 的次数,程序会直接 panic。这就是为什么建议使用 defer wg.Done() 来确保即使函数出错也能正常减去计数。

4.3 WaitGroup 传递给函数(必须用指针)

package main

import (
    "fmt"
    "sync"
    "time"
)

// 必须传指针 *sync.WaitGroup
func worker(id int, wg *sync.WaitGroup) {
    defer wg.Done() // 完成时计数器 -1

    fmt.Printf("Worker %d: 开始\n", id)
    time.Sleep(time.Second)
    fmt.Printf("Worker %d: 完成\n", id)
}

// 错误!:传值会复制一份,Done() 操作的是副本,原来的计数器不变 → 死锁!
// func worker(id int, wg sync.WaitGroup) { ... }

func main() {
    var wg sync.WaitGroup

    for i := 1; i <= 3; i++ {
        wg.Add(1)
        go worker(i, &wg) // 传指针!
    }

    wg.Wait()
    fmt.Println("全部完成")
}

在 Go 语言中,所有的参数传递都是值传递

如果你传值 (sync.WaitGroup): 这就好比你手里有一个原件计数器,但在交给子协程时,你把它放进复印机,“复印”了一份交给它。

  • 子协程在复印件上按下了 Done(),复印件归零了。
  • 但主线程(main)手里拿的还是原件,原件的计数器依然是 3。
  • 主线程执行 wg.Wait() 时,它只盯着原件看。原件永远不会变成 0,于是主线程永远等下去,程序报错 Deadlock(死锁)

如果你传指针 (*sync.WaitGroup): 这就好比你把计数器的家庭地址告诉了子协程。大家顺着地址找到的是同一个实体。子协程减 1,主线程看到的也是减了 1。

在实际开发中,除了传指针,还有一种更“稳妥”且常见的写法:使用闭包

如果你不想显式传递指针,可以直接在 main 函数的作用域内启动协程,这样就不存在传参拷贝的问题了:

func main() {
    var wg sync.WaitGroup

    for i := 1; i <= 3; i++ {
        wg.Add(1)
        // 使用闭包直接引用外部变量 wg
        go func(id int) {
            defer wg.Done()
            fmt.Printf("Worker %d 运行中\n", id)
        }(i) 
    }

    wg.Wait()
}

4.4 实战:并发下载文件

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// 模拟下载文件
func downloadFile(filename string, wg *sync.WaitGroup) {
    defer wg.Done()

    fmt.Printf("开始下载: %s\n", filename)

    // 模拟下载耗时(1-3秒随机)
    duration := time.Duration(rand.Intn(3)+1) * time.Second
    time.Sleep(duration)

    fmt.Printf("下载完成: %s (耗时 %v)\n", filename, duration)
}

func main() {
    files := []string{
        "photo1.jpg",
        "video.mp4",
        "document.pdf",
        "music.mp3",
        "archive.zip",
    }

    var wg sync.WaitGroup
    start := time.Now()

    for _, file := range files {
        wg.Add(1)
        go downloadFile(file, &wg)
    }

    wg.Wait()

    elapsed := time.Since(start)
    fmt.Printf("\n 所有文件下载完成!总耗时: %v\n", elapsed)
    fmt.Println("(如果串行下载,至少需要5秒以上)")
}

竞态检测(Race Condition Detection)

5.1 什么是竞态条件(Race Condition)?

你和室友共用一个银行账户(余额 1000 元)

【正常情况 —— 串行操作】:
你:查看余额 1000 → 取 500 → 余额更新为 500
室友:查看余额 500 → 取 300 → 余额更新为 200
最终余额:200

【竞态条件 —— 并发操作】:
你:查看余额 1000 → 准备取 500...
室友:查看余额 1000 → 准备取 300...(你还没取完,他看到的还是1000)
你:余额更新为 1000 - 500 = 500
室友:余额更新为 1000 - 300 = 700(覆盖了你的结果!)
最终余额:700 (实际应该是 200!凭空多了 500 元!)

这就是竞态条件:
多个 goroutine 同时读写同一个变量,结果取决于执行顺序,导致程序行为不可预测。

代码演示:

package main

import (
    "fmt"
    "sync"
)

func main() {
    counter := 0
    var wg sync.WaitGroup

    // 启动 1000 个 goroutine,每个对 counter +1
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            counter++ // 竞态!多个 goroutine 同时读写 counter
        }()
    }

    wg.Wait()
    fmt.Println("期望值: 1000")
    fmt.Println("实际值:", counter) // 每次运行结果可能不同!
}

多次运行可能的结果:

image-20260211022326573

如果哪次等于1000,就是纯碰巧

为什么 counter++ 不是原子操作?

counter++ 在底层其实是三步操作:

  • 步骤1:从内存读取 counter 的值到寄存器 (READ)
  • 步骤2:在寄存器中将值 +1 (ADD)
  • 步骤3:将新值写回内存 (WRITE)

image-20260211021926984

结果:两次 +1 操作,counter 只变成了 1 而不是 2,丢失了一次更新!

5.2 Go 的竞态检测器(Race Detector)

Go 内置了一个非常强大的竞态检测工具,编译时加 -race 标志即可使用。

5.2.1 基本使用

# 运行时检测竞态
go run -race main.go

# 测试时检测竞态
go test -race ./...

# 编译时嵌入竞态检测(生成的二进制文件运行时会检测)
go build -race -o myapp main.go
./myapp

5.2.2 检测上面的竞态代码

将之前的代码保存为 main.go,执行:

go run -race main.go

image-20260211024332073

5.3 常见竞态场景与检测

5.3.1 场景一:共享变量并发读写

package main

import (
    "fmt"
    "sync"
)

// 有竞态!
func raceExample() {
    data := make(map[string]int) // map 不是并发安全的!
    var wg sync.WaitGroup

    // 多个 goroutine 同时写 map
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(n int) {
            defer wg.Done()
            key := fmt.Sprintf("key-%d", n)
            data[key] = n // 并发写 map → 竞态!甚至可能 panic!
        }(i)
    }
    wg.Wait()
    fmt.Println(data)
}

func main() {
    raceExample()
}

image-20260211024845924

输出:fatal error: concurrent map writes

map 的并发写甚至不需要 race detector,直接 panic!

5.3.2 场景二:共享 slice 并发 append

package main

import (
    "fmt"
    "sync"
)

func main() {
    var results []int // slice 不是并发安全的
    var wg sync.WaitGroup

    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func(n int) {
            defer wg.Done()
            results = append(results, n) // 竞态!
        }(i)
    }

    wg.Wait()
    fmt.Println("期望长度: 100")
    fmt.Println("实际长度:", len(results)) // 可能不是 100!
}

image-20260211025124281

5.3.3 场景三:结构体字段并发访问

package main

import (
    "fmt"
    "sync"
)

type User struct {
    Name    string
    Balance int
}

func main() {
    user := &User{Name: "张三", Balance: 1000}
    var wg sync.WaitGroup

    // 多个 goroutine 同时修改同一个结构体字段
    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            user.Balance-- // 竞态!并发修改结构体字段
        }()
    }

    wg.Wait()
    fmt.Printf("期望: %d, 实际: %d\n", 900, user.Balance)
}

5.3.4 场景四:隐蔽的竞态 —— 并发读写 bool/interface

package main

import (
    "fmt"
    "sync"
    "time"
)

// 很多人以为 bool 赋值是原子的 —— 这是错误的!
func main() {
    var running bool = true // 即使是 bool,也有竞态!
    var wg sync.WaitGroup

    wg.Add(1)
    go func() {
        defer wg.Done()
        time.Sleep(1 * time.Second)
        running = false // 一个 goroutine 写
        fmt.Println("已设置 running = false")
    }()

    // 另一个 goroutine 读
    wg.Add(1)
    go func() {
        defer wg.Done()
        for running { // 另一个 goroutine 读
            time.Sleep(100 * time.Millisecond)
        }
        fmt.Println("检测到 running 为 false,退出循环")
    }()

    wg.Wait()
}

image-20260211025621366

5.4 修复竞态的四种方法

5.4.1 方法一:sync.Mutex(互斥锁)

package main

import (
    "fmt"
    "sync"
)

func main() {
    counter := 0
    var mu sync.Mutex // 互斥锁
    var wg sync.WaitGroup

    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()

            mu.Lock()   // 🔒 加锁:同一时间只有一个 goroutine 能进入
            counter++    // 安全地操作共享变量
            mu.Unlock() // 🔓 解锁:让其他 goroutine 可以进入
        }()
    }

    wg.Wait()
    fmt.Println("期望值: 1000")
    fmt.Println("实际值:", counter) // 一定是 1000
}

5.4.2 方法二:sync.RWMutex(读写锁)

核心规则:

  1. 读锁之间不互斥 → 多个读操作可并发执行
  2. 写锁与读锁互斥 → 写时阻止所有读
  3. 写锁与写锁互斥 → 同时只能一个写操作

主要方法

sync.RWMutex 提供了以下 4 个核心方法:

  • 写操作(独占锁):

    • Lock(): 加写锁。如果已有读锁或写锁,会阻塞直到锁可用。
    • Unlock(): 解写锁。
  • 读操作(共享锁):

    • RLock(): 加读锁。只要没有写锁(或写锁请求),就可以立即获得。
    • RUnlock(): 解读锁。

代码示例:并发安全的 Map

这是 RWMutex 最经典的使用场景。假设我们有一个配置中心,读取配置的请求非常多,但修改配置的操作很少。

package main

import (
    "fmt"
    "sync"
    "time"
)

// SafeMap 包装了一个普通的 map 和一个 RWMutex
type SafeMap struct {
    mu   sync.RWMutex       // 读写锁
    data map[string]string  // 被保护的共享数据
}

// Get 读取数据 -> 使用读锁 (RLock)
func (m *SafeMap) Get(key string) string {
    m.mu.RLock()         // 加读锁(允许多个读者同时进入)
    defer m.mu.RUnlock() // 延迟解读锁(函数返回时自动释放)
    return m.data[key]
}

// Set 修改数据 -> 使用写锁 (Lock)
func (m *SafeMap) Set(key string, value string) {
    m.mu.Lock()         // 加写锁(独占,阻止所有读写)
    defer m.mu.Unlock() // 确保函数退出时解写锁,即使 panic 也会解锁,防止死锁
    m.data[key] = value
}

func main() {
    sm := SafeMap{data: make(map[string]string)}

    // 模拟写入(写少)
    go func() {
        for i := 0; i < 5; i++ {
            sm.Set("config", fmt.Sprintf("v%d", i))
            fmt.Println("Write: Data updated")
            time.Sleep(time.Millisecond * 100)
        }
    }()

    // 模拟并发读取(读多)
    for i := 0; i < 10; i++ {
        go func(id int) {
            for {
                val := sm.Get("config")
                // 多个 reader 可以同时打印,不会互相阻塞
                fmt.Printf("Reader %d got: %s\n", id, val)
                time.Sleep(time.Millisecond * 50)
            }
        }(i)
    }

    time.Sleep(time.Second)
}

关键实现细节:写锁优先(防止写饥饿)

在早期的读写锁设计中,容易出现“写饥饿” (Writer Starvation) 现象:如果读请求源源不断,写者可能永远抢不到锁。

Go 语言的 RWMutex 采用了写锁优先的策略来解决这个问题:

  1. 当一个协程调用 Lock() (申请写锁) 时,它会阻塞等待。
  2. 关键点: 此时,新的 RLock() (读锁) 请求会被阻塞,即使当前持有锁的也是读者。
  3. 系统会等待当前已经持有读锁的协程执行完 RUnlock(),然后立即让等待的写者获得锁。
  4. 这保证了写操作不会因为读流量过大而被无限期推迟。

Mutex vs RWMutex 对比:

sync.Mutex(互斥锁):
Lock() → 独占。不管读还是写,同时只能一个 goroutine。
Unlock()
适用:读写都频繁,或者操作很快

sync.RWMutex(读写锁):
Lock() / Unlock() → 写锁。写的时候独占,所有读写都等待。
RLock() / RUnlock() → 读锁。读的时候共享,多个读可以并发。
适用:读多写少的场景(如配置、缓存)

5.4.3 方法三:sync/atomic(原子操作)

原子操作(Atomic Operation)是指不可分割的操作,就像化学中的"原子"一样,是最小的、不可再分的单位。

在并发编程中,原子操作保证:

  • 要么完全执行
  • 要么完全不执行
  • 不会被其他 goroutine 打断

为什么需要原子操作?

问题场景:数据竞争:

var counter int = 0  // 非原子操作

// 1000 个 goroutine 同时执行 counter++
for i := 0; i < 1000; i++ {
    go func() {
        counter++  // 危险!不是原子的
    }()
}

为什么 counter++ 不安全?

因为 counter++ 实际上是 3 个步骤:

  1. 读取 counter 的值(假设是 5)
  2. 计算 5 + 1 = 6
  3. 写回 6 到 counter

在并发时可能发生:

goroutine A: 读取 counter = 5
goroutine B: 读取 counter = 5  ← 还没来得及写回
goroutine A: 计算 5+1 = 6
goroutine B: 计算 5+1 = 6
goroutine A: 写回 6
goroutine B: 写回 6  ← 丢失了一次累加!

结果:预期 1000,实际可能只有 900 多。

解决方案:

方案 1:加锁(传统方式)

var mu sync.Mutex
mu.Lock()
counter++
mu.Unlock()

缺点:性能开销大(涉及操作系统调度)

方案 2:原子操作

优点:CPU 级别的指令,性能高

atomic.AddInt64(&counter, 1)  // 一步到位,无需加锁

优点:CPU 级别的指令,性能高

Go 原子操作详解

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

func main() {
    var counter int64 = 0 // 注意:atomic 操作需要固定位数的类型
    var wg sync.WaitGroup

    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            atomic.AddInt64(&counter, 1) // 原子 +1,无需加锁
        }()
    }

    wg.Wait()
    // 读取也要用原子操作
    finalValue := atomic.LoadInt64(&counter)
    fmt.Println("期望值: 1000")
    fmt.Println("实际值:", finalValue) // 一定是 1000 

    fmt.Println("\n--- 其他原子操作 ---")

    // 原子存储
    var flag int32
    atomic.StoreInt32(&flag, 1)
    fmt.Println("flag:", atomic.LoadInt32(&flag)) // 1

    // 原子比较并交换 (CAS - Compare And Swap)
    // 如果当前值是 1,就换成 2
    swapped := atomic.CompareAndSwapInt32(&flag, 1, 2)
    fmt.Println("CAS 成功:", swapped)                  // true
    fmt.Println("flag:", atomic.LoadInt32(&flag))       // 2

    // 再试一次,当前值是2不是1,所以不会交换
    swapped = atomic.CompareAndSwapInt32(&flag, 1, 3)
    fmt.Println("CAS 成功:", swapped)                  // false
    fmt.Println("flag:", atomic.LoadInt32(&flag))       // 仍然是 2

    // Go 1.19+ 新增的泛型原子类型
    var atomicBool atomic.Bool
    atomicBool.Store(true)
    fmt.Println("atomicBool:", atomicBool.Load()) // true

    var atomicValue atomic.Value
    atomicValue.Store("hello")
    fmt.Println("atomicValue:", atomicValue.Load()) // hello
}

Context —— 并发控制的"遥控器"

6.1 为什么需要 Context?

你是项目经理(main goroutine),派了一个团队去做项目:

  • 张三负责前端(goroutine 1)
  • 李四负责后端(goroutine 2)
  • 王五负责数据库(goroutine 3,由李四启动的子goroutine)

问题来了:

  1. 客户突然说"项目取消了",你怎么通知所有人停下?
  2. 项目有 deadline(截止时间),到期了怎么让所有人自动停下?
  3. 你想传递一些项目信息(如项目ID)给所有人

如果没有 Context:

  • 你要给每个人单独打电话通知(每个 goroutine 一个 channel)
  • 王五是李四启动的,你甚至联系不到他
  • 非常混乱!

有了 Context:

  • 你有一个"群广播"(context),一键通知所有人
  • 支持"到期自动通知"
  • 支持"附带信息"
  • 子 goroutine 也会自动收到通知(树状传递)

6.2 Context 的基本概念

Context 是 Go 的上下文管理机制,用于:

  1. 取消信号传递:通知 goroutine 停止工作
  2. 超时控制:自动取消超时的操作
  3. 截止时间:设置任务的最后期限
  4. 传递请求范围的值:(不推荐滥用)

核心接口

type Context interface {
    // 返回 context 的截止时间(如果有)
    Deadline() (deadline time.Time, ok bool)

    // 返回一个 channel,当 context 被取消时关闭
    Done() <-chan struct{}

    // 返回取消的原因
    Err() error

    // 返回与 context 关联的值
    Value(key interface{}) interface{}
}

6.3 Context 的四种创建方式

1. Background 和 TODO

// Background:根 context,通常用于 main 函数、初始化、测试
ctx := context.Background()

// TODO:当不确定用什么 context 时使用(占位符)
ctx := context.TODO()

它们的底层实现完全一致(都是 emptyCtx),只是语义不同:

context.Background():通常作为所有 Context 树的根节点(Root),用于主函数、初始化或测试代码中。

context.TODO():用于当你还不确定要用什么 Context,或者代码还在开发中(To Do)时作为一个占位符。它明确告诉阅读代码的人:“这里未来可能会传入一个具体的 Context” 。

2. WithCancel:手动取消

用于手动控制 Goroutine 的停止。

用法: ctx, cancel := context.WithCancel(parent)

原理:

  • 返回一个 cancel 函数。调用这个函数会关闭内部的 Done Channel。
  • 利用 Go 的 Channel 广播机制:关闭 Channel 会通知所有监听该 Channel 的 Goroutine(case <-ctx.Done(): 会立即解除阻塞)。
  • 级联取消: WithCancel 会构建父子关系。当父 Context 取消时,它会遍历并取消所有子 Context;但子 Context 取消不会影响父 Context 。
ctx, cancel := context.WithCancel(context.Background())

go func() {
    // 监听取消信号
    <-ctx.Done()
    fmt.Println("任务被取消了:", ctx.Err())
}()

// 手动取消
time.Sleep(1 * time.Second)
cancel()  // 调用 cancel() 会关闭 Done() channel

3. WithTimeout:超时自动取消

// 3秒后自动取消
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()  // 养成好习惯:总是 defer cancel()

go func() {
    select {
    case <-time.After(5 * time.Second):
        fmt.Println("任务完成")
    case <-ctx.Done():
        fmt.Println("超时取消:", ctx.Err())  // context deadline exceeded
    }
}()

time.Sleep(4 * time.Second)

4. WithDeadline:指定截止时间

底层逻辑与 WithCancel 类似,只是多了一个定时器。一旦时间到了,会自动调用 cancel 函数

区分取消原因: 可以通过 ctx.Err() 判断任务是因为手动取消(Canceled)还是因为超时(DeadlineExceeded)结束的

// 指定绝对时间点
deadline := time.Now().Add(2 * time.Second)
ctx, cancel := context.WithDeadline(context.Background(), deadline)
defer cancel()

go func() {
    <-ctx.Done()
    fmt.Println("截止时间到:", ctx.Err())
}()

time.Sleep(3 * time.Second)

5. WithValue:传递值(慎用!)

// 传递请求ID
ctx := context.WithValue(context.Background(), "requestID", "12345")

go func(ctx context.Context) {
    requestID := ctx.Value("requestID").(string)
    fmt.Println("请求ID:", requestID)
}(ctx)

警告

  • 不要用 Context 传递函数参数!
  • 只用于传递请求范围的元数据(如请求ID、用户身份等)
  • 优先使用函数参数,Context 只是补充

6.4 Context 的树状传递(父子关系)

1. Context 树的构成

Context 的树状结构并非一开始就存在,而是通过派生(Derivation)逐步建立的。

  • 根节点 (Root): 树的起点通常是 context.Background()(用于 main 函数、初始化)或 context.TODO()(用于占位)。它们是空的 Context,没有值,也不会被取消。
  • 子节点 (Children): 通过 WithCancelWithDeadlineWithTimeoutWithValue 函数,基于一个“父 Context”创建一个新的“子 Context”。

2. 信号传递:自上而下的取消

这是 Context 最重要的特性。取消信号只向叶子节点方向传递,不可逆流。

传递规则:

  1. 父死子必亡: 当父 Context 被取消(或超时)时,它会自动关闭自己的 Done channel。所有基于该父 Context 派生的子 Context(以及孙 Context)都会收到取消信号,它们的 Done channel 也会被关闭。
  2. 子死父不受影响: 子 Context 被取消时,只会影响它自己以及它的子树,不会影响父 Context 或兄弟 Context。

image-20260211014021393

package main

import (
    "context"
    "fmt"
    "time"
)

func serviceA(ctx context.Context) {
    // serviceA 创建子 context,传给 serviceB
    ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
    defer cancel()

    go serviceB(ctx)

    select {
    case <-ctx.Done():
        fmt.Println("⛔ ServiceA 停止:", ctx.Err())
    }
}

func serviceB(ctx context.Context) {
    // serviceB 创建子 context,传给 serviceC
    ctx = context.WithValue(ctx, "requestID", "req-123")

    go serviceC(ctx)

    select {
    case <-ctx.Done():
        fmt.Println("⛔ ServiceB 停止:", ctx.Err())
    }
}

func serviceC(ctx context.Context) {
    reqID := ctx.Value("requestID") // 可以获取父 context 传递的值

    for {
        select {
        case <-ctx.Done():
            fmt.Printf("⛔ ServiceC 停止 (requestID=%v): %v\n", reqID, ctx.Err())
            return
        default:
            fmt.Printf("🔄 ServiceC 工作中 (requestID=%v)\n", reqID)
            time.Sleep(500 * time.Millisecond)
        }
    }
}

func main() {
    // 顶层 context
    ctx, cancel := context.WithCancel(context.Background())

    go serviceA(ctx)

    time.Sleep(2 * time.Second)

    fmt.Println("\n🔴 Main: 取消顶层 context")
    cancel() // 取消顶层,所有子 context 都会被取消!

    time.Sleep(1 * time.Second)
    fmt.Println("✅ 程序结束")
}

Worker Pool —— 并发任务的"流水线工厂"

7.1 为什么需要 Worker Pool?

假设你开了一家快递分拣中心:

没有 Worker Pool(暴力并发):
今天有 10000 个包裹要分拣
你雇了 10000 个临时工,每人分拣一个
结果:场地挤不下,工人互相碰撞,效率反而更低!

有 Worker Pool(流水线模式):
你雇了 10 个固定员工(Worker)
包裹放在传送带上(Job Channel)
每个员工从传送带取一个包裹,分拣完再取下一个
分拣好的包裹放到另一条传送带(Result Channel)
结果:井然有序,效率最高!

Worker Pool 解决的核心问题:

  1. 资源控制:限制并发数量,防止 goroutine 爆炸(OOM)
  2. 复用 goroutine:避免频繁创建和销毁的开销
  3. 背压控制:任务太多时自动排队,不会压垮系统
  4. 有序收集结果:通过 result channel 统一收集处理结果

7.2 最简单的 Worker Pool

package main

import (
    "fmt"
    "sync"
    "time"
)

// worker 从 jobs channel 接收任务,处理后将结果发送到 results channel
func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
    defer wg.Done()

    for job := range jobs { // 不断从 jobs 中取任务,直到 jobs 被关闭
        fmt.Printf("Worker %d: 开始处理任务 %d\n", id, job)
        time.Sleep(500 * time.Millisecond) // 模拟耗时操作
        result := job * 2                  // 模拟处理逻辑
        fmt.Printf("Worker %d: 任务 %d 完成,结果 = %d\n", id, job, result)
        results <- result // 将结果发送到 results channel
    }
}

func main() {
    const numWorkers = 3 // 工人数量
    const numJobs = 10   // 任务数量

    jobs := make(chan int, numJobs)       // 任务队列(带缓冲)
    results := make(chan int, numJobs)    // 结果队列(带缓冲)

    // ========== 第1步:启动 Worker ==========
    var wg sync.WaitGroup
    for w := 1; w <= numWorkers; w++ {
        wg.Add(1)
        go worker(w, jobs, results, &wg)
    }

    // ========== 第2步:发送任务 ==========
    for j := 1; j <= numJobs; j++ {
        jobs <- j
    }
    close(jobs) // 所有任务发送完毕,关闭 jobs channel

    // ========== 第3步:等待所有 Worker 完成,然后关闭 results ==========
    // 用一个额外的 goroutine 来等待,避免阻塞主 goroutine
    go func() {
        wg.Wait()
        close(results) // 所有 worker 完成后,关闭 results channel
    }()

    // ========== 第4步:收集结果 ==========
    var total int
    for result := range results {
        total += result
    }
    fmt.Printf("\n所有任务完成!结果总和 = %d\n", total)
}

输出示例:

Worker 3: 开始处理任务 1
Worker 1: 开始处理任务 2
Worker 2: 开始处理任务 3
Worker 3: 任务 1 完成,结果 = 2
Worker 3: 开始处理任务 4
Worker 1: 任务 2 完成,结果 = 4
Worker 1: 开始处理任务 5
Worker 2: 任务 3 完成,结果 = 6
Worker 2: 开始处理任务 6
...
所有任务完成!结果总和 = 110

执行流程图解:

时间 →→→→→→→→→→→→→→→→→→→→→→→→→→→→→→→→→
Worker1: [任务1处理中...] [任务4处理中...] [任务7处理中...] [任务10]
Worker2: [任务2处理中...] [任务5处理中...] [任务8处理中...]
Worker3: [任务3处理中...] [任务6处理中...] [任务9处理中...]

3个 Worker 并发处理 10 个任务,每个 Worker 处理完一个就取下一个

7.3 理解 Worker Pool 的关键细节

为什么用 for job := range jobs

func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
    defer wg.Done()
    for job := range jobs {
        // 处理任务...
    }
    // range 退出 = jobs channel 已关闭且数据已读完
    // 此时 worker 自然退出,defer wg.Done() 执行
}

range 会持续从 channel 读取数据。当 jobsclose() && 缓冲区清空后,range 自动退出循环。这比手动检查 ok 更简洁。

为什么要单开一个 goroutine 等待并关闭 results ?

// 错误做法:在主 goroutine 中先 Wait 再 range
wg.Wait()       // 阻塞在这里等 worker 完成
close(results)
for r := range results { ... }  // 永远执行不到这里!因为 worker 往 results 发送时也会阻塞

// 正确做法:开一个 goroutine 去等待
go func() {
    wg.Wait()
    close(results)
}()
for r := range results { ... }  // 主 goroutine 可以一边消费 results 一边等

主 goroutine 有两件事要做:消费 results 和 等待 wg 归零。如果串行做,会导致死锁——worker 写 results 时可能阻塞(没人读),而主 goroutine 在 wg.Wait() 等 worker 完成。所以需要并行:主 goroutine 负责消费 results,另一个 goroutine 负责等待并关闭。

7.4 带 Context 取消的 Worker Pool

在实际场景中,我们往往需要在任务执行过程中取消所有 Worker(比如用户取消请求、超时等)。

package main

import (
    "context"
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// Job 表示一个任务
type Job struct {
    ID       int
    Payload  string
}

// Result 表示任务的处理结果
type Result struct {
    Job      Job
    Output   string
    WorkerID int
}

func worker(ctx context.Context, id int, jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) {
    defer wg.Done()

    for {
        select {
        case <-ctx.Done():
            // 收到取消信号,立即退出
            fmt.Printf("Worker %d: 收到取消信号,退出\n", id)
            return

        case job, ok := <-jobs:
            if !ok {
                // jobs channel 已关闭,没有更多任务了
                fmt.Printf("Worker %d: 没有更多任务,退出\n", id)
                return
            }

            // 模拟处理任务(可能耗时)
            fmt.Printf("Worker %d: 处理任务 %d (%s)\n", id, job.ID, job.Payload)
            duration := time.Duration(rand.Intn(3)+1) * time.Second

            // 在处理过程中也要检查取消信号
            select {
            case <-ctx.Done():
                fmt.Printf("Worker %d: 任务 %d 处理中被取消\n", id, job.ID)
                return
            case <-time.After(duration):
                // 处理完成
                results <- Result{
                    Job:      job,
                    Output:   fmt.Sprintf("结果_%s_已处理", job.Payload),
                    WorkerID: id,
                }
            }
        }
    }
}

func main() {
    // 创建可取消的 context,3秒后自动取消
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel()

    const numWorkers = 3
    jobs := make(chan Job, 10)
    results := make(chan Result, 10)

    // 启动 Worker
    var wg sync.WaitGroup
    for w := 1; w <= numWorkers; w++ {
        wg.Add(1)
        go worker(ctx, w, jobs, results, &wg)
    }

    // 发送任务(可能发到一半就被取消了)
    go func() {
        for i := 1; i <= 20; i++ {
            select {
            case <-ctx.Done():
                fmt.Println("任务发送被取消,停止发送")
                close(jobs)
                return
            case jobs <- Job{ID: i, Payload: fmt.Sprintf("数据_%d", i)}:
            }
        }
        close(jobs)
    }()

    // 等待所有 worker 结束并关闭 results
    go func() {
        wg.Wait()
        close(results)
    }()

    // 收集结果
    var count int
    for result := range results {
        count++
        fmt.Printf("收到结果: 任务%d 由Worker%d完成 → %s\n",
            result.Job.ID, result.WorkerID, result.Output)
    }

    fmt.Printf("\n总共完成 %d 个任务(3秒超时)\n", count)
}

关键改进点:

  1. select 双监听:Worker 同时监听 ctx.Done()jobs,取消信号到来时优先退出
  2. 任务处理中也检查取消:模拟耗时操作时用 select 监听,而不是裸 time.Sleep
  3. 发送端也检查取消:避免向已经无人消费的 channel 继续发送导致阻塞

7.5 动态调整 Worker 数量

有时候任务量波动很大,我们希望根据负载动态增加或减少 Worker。

package main

import (
    "context"
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)

// Pool 是一个可动态调整大小的 Worker Pool
type Pool struct {
    jobs       chan int            // 任务队列
    results    chan int            // 结果队列
    workerNum  int64              // 当前 worker 数量(原子操作)
    ctx        context.Context
    cancel     context.CancelFunc
    wg         sync.WaitGroup
}

// NewPool 创建一个新的 Worker Pool
func NewPool(bufferSize int) *Pool {
    ctx, cancel := context.WithCancel(context.Background())
    return &Pool{
        jobs:    make(chan int, bufferSize),
        results: make(chan int, bufferSize),
        ctx:     ctx,
        cancel:  cancel,
    }
}

// AddWorker 添加一个 Worker
func (p *Pool) AddWorker() {
    p.wg.Add(1)
    id := atomic.AddInt64(&p.workerNum, 1)

    go func(workerID int64) {
        defer p.wg.Done()
        defer atomic.AddInt64(&p.workerNum, -1)

        fmt.Printf("🟢 Worker %d 上线\n", workerID)

        for {
            select {
            case <-p.ctx.Done():
                fmt.Printf("🔴 Worker %d 下线(Pool关闭)\n", workerID)
                return
            case job, ok := <-p.jobs:
                if !ok {
                    fmt.Printf("🔴 Worker %d 下线(无更多任务)\n", workerID)
                    return
                }
                // 处理任务
                time.Sleep(200 * time.Millisecond)
                p.results <- job * 10
            }
        }
    }(id)
}

// Submit 提交任务
func (p *Pool) Submit(job int) {
    p.jobs <- job
}

// Shutdown 优雅关闭
func (p *Pool) Shutdown() {
    close(p.jobs)  // 先关闭任务队列,让 worker 自然退出
    p.wg.Wait()    // 等待所有 worker 完成
    close(p.results)
}

// ForceShutdown 强制关闭(通过 context 取消)
func (p *Pool) ForceShutdown() {
    p.cancel()     // 发送取消信号
    p.wg.Wait()    // 等待所有 worker 响应取消
    close(p.results)
}

func main() {
    pool := NewPool(20)

    // 一开始启动 2 个 Worker
    fmt.Println("=== 初始:2 个 Worker ===")
    pool.AddWorker()
    pool.AddWorker()

    // 提交一批任务
    go func() {
        for i := 1; i <= 15; i++ {
            pool.Submit(i)
            fmt.Printf("提交任务 %d\n", i)
        }
    }()

    time.Sleep(1 * time.Second)

    // 发现任务堆积,动态增加 Worker
    fmt.Println("\n=== 负载增加,扩容到 5 个 Worker ===")
    pool.AddWorker()
    pool.AddWorker()
    pool.AddWorker()
    fmt.Printf("当前 Worker 数: %d\n", atomic.LoadInt64(&pool.workerNum))

    // 在另一个 goroutine 收集结果
    go func() {
        for result := range pool.results {
            fmt.Printf("收到结果: %d\n", result)
        }
    }()

    time.Sleep(3 * time.Second)

    // 优雅关闭
    fmt.Println("\n=== 关闭 Pool ===")
    pool.Shutdown()
    fmt.Println(" Pool 已关闭")
}

7.6 实战:并发爬虫 Worker Pool

模拟一个并发 URL 爬取场景,用 Worker Pool 控制并发请求数量。

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// CrawlJob 爬取任务
type CrawlJob struct {
    URL string
}

// CrawlResult 爬取结果
type CrawlResult struct {
    URL        string
    StatusCode int
    Body       string
    Duration   time.Duration
    WorkerID   int
    Err        error
}

// crawlWorker 爬虫 Worker
func crawlWorker(id int, jobs <-chan CrawlJob, results chan<- CrawlResult, wg *sync.WaitGroup) {
    defer wg.Done()

    for job := range jobs {
        start := time.Now()

        // 模拟 HTTP 请求(实际项目中使用 http.Get)
        time.Sleep(time.Duration(rand.Intn(2000)+500) * time.Millisecond)

        // 模拟结果
        statusCode := 200
        if rand.Float32() < 0.1 { // 10% 概率失败
            statusCode = 500
        }

        results <- CrawlResult{
            URL:        job.URL,
            StatusCode: statusCode,
            Body:       fmt.Sprintf("<html>%s的内容</html>", job.URL),
            Duration:   time.Since(start),
            WorkerID:   id,
        }
    }
}

func main() {
    // 模拟待爬取的 URL 列表
    urls := []string{
        "https://example.com/page/1",
        "https://example.com/page/2",
        "https://example.com/page/3",
        "https://example.com/page/4",
        "https://example.com/page/5",
        "https://example.com/page/6",
        "https://example.com/page/7",
        "https://example.com/page/8",
        "https://example.com/page/9",
        "https://example.com/page/10",
    }

    const numWorkers = 3 // 限制并发数为 3,避免被目标网站封禁

    jobs := make(chan CrawlJob, len(urls))
    results := make(chan CrawlResult, len(urls))

    // 启动 Worker
    var wg sync.WaitGroup
    for w := 1; w <= numWorkers; w++ {
        wg.Add(1)
        go crawlWorker(w, jobs, results, &wg)
    }

    // 发送任务
    start := time.Now()
    for _, url := range urls {
        jobs <- CrawlJob{URL: url}
    }
    close(jobs)

    // 等待完成并关闭结果通道
    go func() {
        wg.Wait()
        close(results)
    }()

    // 收集并统计结果
    var successCount, failCount int
    for result := range results {
        if result.StatusCode == 200 {
            successCount++
            fmt.Printf("✅ [Worker%d] %s → %d (耗时 %v)\n",
                result.WorkerID, result.URL, result.StatusCode, result.Duration)
        } else {
            failCount++
            fmt.Printf("❌ [Worker%d] %s → %d (耗时 %v)\n",
                result.WorkerID, result.URL, result.StatusCode, result.Duration)
        }
    }

    elapsed := time.Since(start)
    fmt.Printf("\n========== 爬取统计 ==========\n")
    fmt.Printf("总 URL 数: %d\n", len(urls))
    fmt.Printf("成功: %d, 失败: %d\n", successCount, failCount)
    fmt.Printf("并发数: %d\n", numWorkers)
    fmt.Printf("总耗时: %v\n", elapsed)
    fmt.Printf("(串行预估耗时: %v+)\n", time.Duration(len(urls))*time.Second)
}

输出示例:

✅ [Worker2] https://example.com/page/2 → 200 (耗时 732ms)
✅ [Worker1] https://example.com/page/1 → 200 (耗时 1.15s)
✅ [Worker3] https://example.com/page/3 → 200 (耗时 1.85s)
✅ [Worker2] https://example.com/page/4 → 200 (耗时 1.02s)
❌ [Worker1] https://example.com/page/5 → 500 (耗时 580ms)
...

========== 爬取统计 ==========
总 URL 数: 10
成功: 9, 失败: 1
并发数: 3
总耗时: 5.2s
(串行预估耗时: 10s+)

7.7 Worker Pool 模式总结

核心三件套:

jobs channel   →   N 个 Worker goroutine   →   results channel
  (任务入口)          (并发处理)                   (结果出口)

设计要点:

  1. Worker 数量:根据任务类型选择

    • CPU 密集型:Worker 数 ≈ CPU 核心数(runtime.NumCPU()
    • IO 密集型:Worker 数可以更多(10~100),因为大部分时间在等待 IO
  2. Channel 缓冲区大小

    • 太小:任务生产者会频繁阻塞
    • 太大:占用过多内存
    • 经验值:Worker 数量的 2~5 倍
  3. 优雅关闭顺序

    • 停止发送新任务(close(jobs)
    • 等待 Worker 处理完剩余任务(wg.Wait()
    • 关闭结果通道(close(results)
  4. 错误处理:Result 结构体中包含 Err 字段,让调用者决定如何处理

与直接创建 goroutine 的对比:

image-20260213210418302

0

评论 (0)

取消