首页
关于
Search
1
Golang闭包
10 阅读
2
ROS2(二):第一个自定义 Python 节点从编写到运行
9 阅读
3
Golang并发
7 阅读
4
Golang匿名函数
3 阅读
5
Golang Time包入门
2 阅读
默认分类
编程
ROS
登录
Search
tqtqtq
累计撰写
11
篇文章
累计收到
1
条评论
首页
栏目
默认分类
编程
ROS
页面
关于
搜索到
8
篇与
的结果
2026-04-16
Leetcode 239. 滑动窗口最大值
239. 滑动窗口最大值题目描述给你一个整数数组 nums,有一个大小为 k 的滑动窗口从数组的最左侧移动到数组的最右侧。 你只可以看到在滑动窗口内的 k 个数字。滑动窗口每次只向右移动一位。返回 滑动窗口中的最大值。示例 1输入:nums = [1,3,-1,-3,5,3,6,7], k = 3 输出:[3,3,5,5,6,7] 解释: 滑动窗口的位置 最大值 --------------- ----- [1 3 -1] -3 5 3 6 7 3 1 [3 -1 -3] 5 3 6 7 3 1 3 [-1 -3 5] 3 6 7 5 1 3 -1 [-3 5 3] 6 7 5 1 3 -1 -3 [5 3 6] 7 6 1 3 -1 -3 5 [3 6 7] 7示例 2输入:nums = [1], k = 1 输出:[1]提示1 <= nums.length <= 10^5-10^4 <= nums[i] <= 10^41 <= k <= nums.length题解笔记:暴力、大顶堆、单调队列一、先理解题目本质这道题的核心是:窗口大小固定为 k窗口不断向右滑动每次都要快速得到当前窗口的最大值最直接的想法是: 每形成一个窗口,就把窗口里的元素都扫一遍,找最大值。这就是暴力解法。但如果数组很长,每次都重新扫,会有很多重复计算,所以就会继续优化到:大顶堆单调队列解法一:暴力解法思路每次窗口形成后,直接遍历这个窗口里的所有元素,找到最大值。例如窗口是:[3, -1, -3]那就把这 3 个数重新比较一遍,得到最大值 3。代码from typing import List class Solution: def maxSlidingWindow(self, nums: List[int], k: int) -> List[int]: ans = [] for left in range(len(nums) - k + 1): max_num = nums[left] for right in range(left, left + k): max_num = max(max_num, nums[right]) ans.append(max_num) return ans复杂度时间复杂度:O(n * k)空间复杂度:O(1)(不算返回结果)优缺点优点最容易理解非常适合刚开始学这题时打基础缺点每个窗口都重新扫描一遍重复计算很多数据大时容易超时解法二:大顶堆先说结论大顶堆可以做,而且比暴力解法更高效。 不过它不是最优解,通常复杂度是:时间复杂度:O(n log n)(严格说更稳妥)空间复杂度:O(n)1. 为什么会想到堆因为这题每次都在问:当前窗口里的最大值是谁?而堆最擅长做的事就是:插入一个数快速拿到最大值(或最小值)所以一个很自然的想法就是:把窗口相关元素放进堆里每次从堆顶拿最大值2. 但为什么不能直接只存值问题在于: 窗口会向右移动,旧元素会离开窗口。例如:[3, -1, -3]下一步变成:[-1, -3, 5]这里的 3 已经离开窗口了。 但如果它还在堆里,并且还在堆顶,那就会影响答案。所以只存值不够,还必须存:值下标3. 为什么 Python 里要存 (-值, 下标)因为 Python 的 heapq 默认是小顶堆,没有原生大顶堆。所以我们常用一个技巧:把值取负原本最大的值,负数后反而最小这样小顶堆就能模拟大顶堆例如数字 5,下标为 4,会存成:(-5, 4)其中:-5:用来模拟大顶堆4:表示它原来的下标4. 什么叫“过期元素”假设当前窗口范围是:[left, right]如果某个元素的下标:idx < left就说明它已经滑出窗口,不能再参与当前窗口最大值比较了。这类元素就叫:过期元素5. 什么叫“懒删除”堆不擅长删除中间任意位置的元素。所以我们不主动去堆里搜索并删除所有过期元素,而是:只要堆顶过期,就把堆顶弹出一直弹到堆顶合法为止这就叫:懒删除代码里通常是:while heap[0][1] < left: heapq.heappop(heap)代码from typing import List import heapq class Solution: def maxSlidingWindow(self, nums: List[int], k: int) -> List[int]: heap = [] ans = [] for i, num in enumerate(nums): # 入堆:(-值, 下标) heapq.heappush(heap, (-num, i)) # 窗口形成后再开始记录答案 if i >= k - 1: left = i - k + 1 # 懒删除:弹掉所有过期堆顶 while heap[0][1] < left: heapq.heappop(heap) # 堆顶就是当前窗口最大值 ans.append(-heap[0][0]) return ans堆解法动画复杂度时间复杂度:O(n log n)空间复杂度:O(n)优缺点优点思路比单调队列更直观很适合从暴力解法继续进阶也能解决这道题缺点不是最优解代码里要理解“过期元素”和“懒删除”堆中可能保留一些已经过期但暂时不在堆顶的元素解法三:单调队列先说结论这是这道题的最优解。时间复杂度:O(n)空间复杂度:O(k)1. 核心思想我们希望队列里维护的是:可能成为当前窗口最大值的元素下标并且让这些下标对应的值,保持单调递减。也就是说:队首最大队尾最小这样每次窗口形成后:队首就是当前窗口最大值的下标直接取 nums[deque[0]] 就行2. 为什么要存下标,不直接存值因为窗口会移动,我们需要判断队首元素是否已经离开窗口。如果只存值,就不知道它对应的是哪个位置。所以单调队列里存的是:下标3. 队列维护规则遍历到 i 时,做三件事。第一步:移除队首过期元素如果队首下标已经不在窗口内:deque[0] < i - k + 1就把它弹掉。第二步:保持单调递减如果当前元素 nums[i] 比队尾对应的值还大,那么队尾那个元素以后就不可能成为最大值了。因为:它比当前元素小它还比当前元素更早过期所以直接弹出队尾,直到队列重新单调递减。第三步:把当前下标加入队尾这样队列中保留的,都是未来仍有机会成为最大值的元素。代码from typing import List from collections import deque class Solution: def maxSlidingWindow(self, nums: List[int], k: int) -> List[int]: q = deque() ans = [] for i, num in enumerate(nums): # 1. 删除窗口左边已经过期的下标 while q and q[0] < i - k + 1: q.popleft() # 2. 维护单调递减队列 while q and nums[q[-1]] < num: q.pop() # 3. 当前下标加入队尾 q.append(i) # 4. 窗口形成后,队首就是最大值 if i >= k - 1: ans.append(nums[q[0]]) return ans复杂度时间复杂度:O(n)空间复杂度:O(k)为什么是 O(n)因为每个元素最多只会:进队一次出队一次不会反复进出,所以总操作次数和 n 成正比。优缺点优点最优解性能最好是这道题的经典标准答案缺点对初学者来说抽象一点第一次接触“单调队列”时不如堆直观
2026年04月16日
2 阅读
0 评论
0 点赞
2026-03-15
Python核心数据结构
Python核心数据结构1. List(列表):灵活的多面手列表是 Python 中最常用的数据结构,就像一个可以随时增删改查的动态数组。特性:有序(按添加顺序排列)、可变(随时可以修改元素)、允许重复。适用场景:需要按顺序存储同类或不同类数据,且后续需要频繁修改数据的场景(如:待办事项列表、爬虫抓取的数据集)。创建与核心操作# 创建列表 my_list = [1, 2, "hello", 3.14, 1] # 增加元素 my_list.append([3, 4]) # 在末尾追加,将列表 [3, 4] 作为单个元素添加,结果为:[1, 2, "hello", 3.14, 1, [3, 4]] my_list.insert(1, "插入") # 在指定索引位置插入 my_list.extend([4, 5]) # 将另一个列表的元素拆散合并进来 # 删除元素 my_list.remove("hello") # 删除第一个匹配的元素 popped_item = my_list.pop() # 移除并返回最后一个元素 # 修改与查询 my_list[0] = 100 # 修改指定索引的值 print(my_list[1]) # 通过索引访问2. Tuple(元组):带锁的列表元组和列表非常相似,但它一旦创建,里面的元素就不能被修改(Immutable)。特性:有序、不可变、允许重复。适用场景:存储不需要更改的数据(如:地理坐标经纬度、数据库查询返回的单条记录)。因为不可变,元组的内存占用比列表小,遍历速度稍快,且可以作为字典的 Key(键)。创建与核心操作# 创建元组 my_tuple = (1, 2, "hello", 1) single_tuple = (1,) # 注意:只有一个元素的元组必须加逗号 # 访问元素 (与列表相同) print(my_tuple[0]) # 输出: 1 # 核心方法 (因为不可变,所以只有查询方法) print(my_tuple.count(1)) # 统计元素 1 出现的次数 (输出: 2) print(my_tuple.index("hello")) # 查找元素的索引位置 """ 如果尝试修改,会抛出TypeError: 't = (1, 2, 3) result = t[0] = 100 # 尝试修改 tuple' object does not support item assignment """ 注意:虽然元组本身不可变,但如果元组里面包含可变对象(如列表),该列表内部的元素是可以修改的。3. Dict(字典):高效的键值对字典就像一本真正的字典,通过唯一的词条(Key)去快速查找对应的释义(Value)。特性:键值对映射、可变、Key 必须唯一且不可变(如字符串、数字、元组)、Value 可以是任意类型。(注:Python 3.7+ 版本后,字典默认保留插入顺序)。适用场景:需要通过特定标识符快速检索数据的场景(如:用户个人信息、配置参数存储、JSON数据处理)。创建与核心操作# 创建字典 my_dict = {"name": "Alice", "age": 25, "city": "New York"} # 增加/修改元素 my_dict["job"] = "Engineer" # 增加新键值对 my_dict["age"] = 26 # 修改已有键的值 # 删除元素 del my_dict["city"] # 删除键值对 age = my_dict.pop("age") # 移除并返回对应的值 # 查询元素 print(my_dict["name"]) # 可能会触发 KeyError 如果键不存在 print(my_dict.get("phone", "Not Found")) # 安全获取,不存在则返回默认值 # 遍历字典 for key, value in my_dict.items(): print(f"{key}: {value}")4. Set(集合):去重与数学运算利器集合就像是一个没有 Value 的字典,它里面只存放独一无二的元素。特性:无序(不支持索引)、可变、元素必须唯一且不可变。适用场景:数据快速去重、测试成员是否属于某个群体、进行数学集合运算(交集、并集、差集)。创建与核心操作# 创建集合 my_set = {1, 2, 3, 3, 4} # 重复的 3 会被自动过滤,结果为 {1, 2, 3, 4} empty_set = set() # 注意:空集合必须用 set(),不能用 {} (那是空字典) # 增删元素 my_set.add(5) # 增加元素 my_set.remove(2) # 移除元素 (如果不存在会报错) my_set.discard(10) # 移除元素 (不存在不会报错) # 集合数学运算 set_a = {1, 2, 3} set_b = {3, 4, 5} print(set_a | set_b) # 并集: {1, 2, 3, 4, 5} (也可用 set_a.union(set_b)) print(set_a & set_b) # 交集: {3} (也可用 set_a.intersection(set_b)) print(set_a - set_b) # 差集: {1, 2} (也可用 set_a.difference(set_b))综合对比总结数据结构符号有序性可变性是否允许重复核心优势 / 用途List (列表)[]有序可变允许灵活存储和操作一连串数据Tuple (元组)()有序不可变允许保护数据不被修改,比列表更省内存Dict (字典){}保持插入顺序可变Key 唯一,Value 允许极其快速的键值查找 (O(1) 复杂度)Set (集合){}无序可变不允许快速去重,强大的集合数学运算
2026年03月15日
1 阅读
0 评论
0 点赞
2026-03-15
Python列表推导式
Python列表推导式1. 基本语法列表推导式的核心语法如下:[expression for item in iterable]expression: 计算结果将被添加到新列表中的表达式。它可以是对 item 的操作,或者仅仅是 item 本身。item: 代表 iterable 中当前遍历到的元素。iterable: 任何可以被遍历的对象(例如列表、字符串、range 对象等)。示例:创建一个包含 0 到 9 平方的列表传统方法 (使用 for 循环):squares = [] for x in range(10): squares.append(x**2) print(squares) # 输出: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]使用列表推导式:squares = [x**2 for x in range(10)] print(squares) # 输出: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]2. 添加条件判断 (过滤)你可以在列表推导式中添加 if 语句,以过滤掉不满足条件的元素。[expression for item in iterable if condition]condition: 一个布尔表达式。只有当 condition 为 True 时,对应的 expression 结果才会被添加到新列表中。示例:获取 0 到 19 之间的所有偶数evens = [x for x in range(20) if x % 2 == 0] print(evens) # 输出: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]示例:过滤出长度大于 3 的单词,并转换为大写words = ["apple", "bat", "cat", "banana", "dog"] long_words = [word.upper() for word in words if len(word) > 3] print(long_words) # 输出: ['APPLE', 'BANANA']3. 使用 if-else 结构当你想根据条件改变 expression 的结果,而不是过滤掉元素时,你需要将 if-else 结构放在 for 循环之前。[expression_if_true if condition else expression_if_false for item in iterable]示例:将列表中的正数保持不变,将负数替换为 0numbers = [1, -5, 3, -2, 8] processed_numbers = [x if x > 0 else 0 for x in numbers] print(processed_numbers) # 输出: [1, 0, 3, 0, 8]4. 嵌套循环列表推导式也支持多个 for 循环,这就相当于嵌套的 for 循环。[expression for item1 in iterable1 for item2 in iterable2]示例:生成两个列表元素的笛卡尔积colors = ["red", "blue"] sizes = ["S", "M", "L"] combinations = [(color, size) for color in colors for size in sizes] print(combinations) # 输出: [('red', 'S'), ('red', 'M'), ('red', 'L'), ('blue', 'S'), ('blue', 'M'), ('blue', 'L')]等效的传统代码:combinations = [] for color in colors: for size in sizes: combinations.append((color, size))5. 多维列表的展平可以使用嵌套的列表推导式将一个二维列表(或多维)展平为一个一维列表。示例:展平一个二维矩阵matrix = [ [1, 2, 3], [4, 5, 6], [7, 8, 9] ] flattened = [num for row in matrix for num in row] print(flattened) # 输出: [1, 2, 3, 4, 5, 6, 7, 8, 9]
2026年03月15日
2 阅读
0 评论
0 点赞
2026-02-27
Golang错误机制
Golang错误机制Go的错误处理哲学Go的设计者认为:错误是常态,应该显式处理。// Go的方式:错误是返回值的一部分,你必须处理 result, err := 做某事() if err != nil { // 出错了!必须在这里处理 return } // 没出错,继续使用result核心原则:不隐藏错误,强制开发者面对问题在 Go 语言中,错误(Error)就像普通的数据一样被服务员平平静静地“递”回来。Go 鼓励程序员在每一步都主动检查:“刚才的任务成功了吗?”error接口——错误的本质在Go中,error就是一个接口:// error接口的定义(内置在Go中) type error interface { Error() string // 只要实现了Error()方法,就是error }任何类型,只要有一个 Error() string 方法,就可以当作错误使用。// 定义一个包含状态码的专属错误 type MyVIPError struct { Code int // 错误代码,比如 404 Message string // 错误信息 } // 只要它有 "Error()" ,Go 就认它是一个错误 func (e *MyVIPError) Error() string { // 把代码和信息拼起来 return fmt.Sprintf("错误码: %d, 原因: %s", e.Code, e.Message) }创建最基本的错误package main import ( "errors" // 标准库提供的错误工具包 "fmt" ) func main() { // 方式1:使用errors.New创建简单错误 err1 := errors.New(" Something went wrong") fmt.Println(err1) // 输出: Something went wrong // 方式2:使用fmt.Errorf创建格式化错误 name := "Alice" age := -5 err2 := fmt.Errorf("invalid age: %d for user %s", age, name) fmt.Println(err2) // 输出: invalid age: -5 for user Alice }errors.New = 写一张简单的便签fmt.Errorf = 写一张可以填入信息的模板便签函数如何返回错误多返回值Go函数可以一次返回多个值,这是错误处理的基础:package main import ( "errors" "fmt" ) // 函数返回两个值:(结果, 错误) // 如果出错,结果通常为零值,错误不为nil // 如果成功,错误为nil func divide(a, b float64) (float64, error) { if b == 0 { // 除数为0,返回错误 return 0, errors.New("cannot divide by zero") } // 正常情况,错误返回nil return a / b, nil } func main() { // 调用函数,接收两个返回值 result, err := divide(10, 2) if err != nil { fmt.Println("计算失败:", err) return } fmt.Printf("10 / 2 = %.2f\n", result) // 输出: 10 / 2 = 5.00 // 测试错误情况 result2, err2 := divide(10, 0) if err2 != nil { fmt.Println("计算失败:", err2) // 输出: 计算失败: cannot divide by zero return } fmt.Printf("10 / 0 = %.2f\n", result2) }错误处理的标准模式// 标准5步曲 result, err := 某个函数() if err != nil { // 1. 记录日志 // 2. 清理资源 // 3. 返回错误(或降级处理) return err } // 4. 使用结果 使用(result) // 5. 继续后续操作关键约定:错误是最后一个返回值错误变量名永远叫err(约定俗成)成功时 err == nil,失败时 err != nil错误的判断与分类判断错误是否发生if err != nil { // 有错误! }判断具体是什么错误方式一:错误值比较(Sentinel Errors)预定义一些"标准错误",像常量一样使用:package main import ( "errors" "fmt" ) // 在包级别定义标准错误(哨兵错误) // 习惯用 ErrXXX 命名 var ( ErrNotFound = errors.New("resource not found") ErrInvalidInput = errors.New("invalid input") ErrPermissionDenied = errors.New("permission denied") ErrTimeout = errors.New("operation timeout") ) // 模拟数据库查询 func findUser(id int) (string, error) { if id < 1 { return "", ErrInvalidInput // 返回特定错误 } if id == 999 { return "", ErrNotFound // 返回特定错误 } return "User" + fmt.Sprint(id), nil } func main() { // 测试不同错误 testCases := []int{-5, 999, 42} for _, id := range testCases { user, err := findUser(id) if err != nil { // 判断具体是哪种错误 if err == ErrInvalidInput { fmt.Printf("ID=%d: 输入无效,请检查参数\n", id) } else if err == ErrNotFound { fmt.Printf("ID=%d: 用户不存在,显示404页面\n", id) } else { fmt.Printf("ID=%d: 未知错误: %v\n", id, err) } continue } fmt.Printf("ID=%d: 找到用户 %s\n", id, user) } }输出:ID=-5: 输入无效,请检查参数 ID=999: 用户不存在,显示404页面 ID=42: 找到用户 User42方式二:errors.Is(Go 1.13+ 推荐)问题:错误可能被包装,直接比较会失败。package main import ( "errors" "fmt" ) var ErrNotFound = errors.New("not found") func findInDB() error { return ErrNotFound } func findInCache() error { return fmt.Errorf("cache miss: %w", ErrNotFound) // 包装错误 } func main() { err := findInCache() // 直接比较:失败!因为err是包装后的 fmt.Println("直接比较:", err == ErrNotFound) // false // 使用errors.Is:成功!会自动"拆包装" fmt.Println("errors.Is:", errors.Is(err, ErrNotFound)) // true }errors.Is的作用:像剥洋葱一样,一层层拆开包装,直到找到目标错误。方式三:errors.As(获取错误的具体类型)在 Go 1.13 版本之前,如果遇到错误,我们通常只是简单地返回它。但有时候,为了增加更多的上下文信息(比如是在哪个函数、哪个步骤出错的),我们会把原始错误“包装”起来。以前,直接用类型断言 err.(*PathError) 只能检查最外层。如果错误被套在了里面,断言就会失败。 errors.As 就是为了解决这个问题而诞生的。errors.As 是一个专门用来“拆套娃并提取内容”的工具。它的工作原理是:剥开外衣:它会自动一层一层地剥开(Unwrap)错误套娃。寻找目标:在每一层检查,看当前的错误是不是你想要的那个“特定类型”。赋值提取:如果找到了,它就把那个错误赋值给你准备好的变量,并返回 true。如果剥到最里面都没找到,就返回 false。当需要调用错误类型的方法或访问其字段时:package main import ( "errors" "fmt" "net" ) func main() { // 模拟一个网络错误 err := &net.OpError{ Op: "dial", Net: "tcp", Err: errors.New("connection refused"), } // 我们想判断这是否是网络错误,并获取详细信息 // 方式1:类型断言(旧方式,不推荐) if netErr, ok := err.(*net.OpError); ok { fmt.Printf("网络操作: %s, 网络类型: %s\n", netErr.Op, netErr.Net) } // 方式2:errors.As(推荐,支持错误链) var netErr *net.OpError // 参数1: 被包装的总错误 (套娃) // 参数2: 目标变量的指针 (必须传指针的地址 &) if errors.As(err, &netErr) { fmt.Printf("操作: %s, 类型: %s\n", netErr.Op, netErr.Net) } }初学者在使用 errors.As 时,最容易在第二个参数上犯错。请记住这个口诀:“一定要传目标变量的指针(地址)”。为什么? 因为 errors.As 是一个函数,如果你只传变量的值进去,它是无法修改你外面的变量的(Go 是值传递)。只有把变量的“内存地址”交出去(用 &),errors.As 才能顺着地址把找到的错误塞进你的变量里。错误的创建与包装创建错误package main import ( "errors" "fmt" ) func main() { // 方法1:简单错误 err1 := errors.New("something went wrong") // 方法2:格式化错误 err2 := fmt.Errorf("user %s not found", "Alice") // 方法3:包装错误(保留原始错误) original := errors.New("connection refused") wrapped := fmt.Errorf("database connection failed: %w", original) fmt.Println(err1) fmt.Println(err2) fmt.Println(wrapped) // 输出: database connection failed: connection refused }错误包装的艺术为什么要包装?添加上下文(在哪里出错的)保留原始错误(用于后续判断)构建错误链package main import ( "errors" "fmt" ) var ErrNotFound = errors.New("record not found") // 底层函数 func queryDB(id string) error { return fmt.Errorf("database query failed for id=%s: %w", id, ErrNotFound) } // 中层函数 func getUser(id string) error { if err := queryDB(id); err != nil { return fmt.Errorf("getUser failed: %w", err) } return nil } // 顶层函数 func handleRequest(id string) error { if err := getUser(id); err != nil { return fmt.Errorf("request processing failed: %w", err) } return nil } func main() { err := handleRequest("123") fmt.Println("=== 完整错误链 ===") fmt.Println(err) // 输出: request processing failed: getUser failed: database query failed for id=123: record not found fmt.Println("\n=== 判断根因 ===") if errors.Is(err, ErrNotFound) { fmt.Println("根本原因是:记录不存在") } fmt.Println("\n=== 逐层展开 ===") // 手动展开(实际很少需要) current := err for current != nil { fmt.Printf("→ %v\n", current) current = errors.Unwrap(current) } }输出:=== 完整错误链 === request processing failed: getUser failed: database query failed for id=123: record not found === 判断根因 === 根本原因是:记录不存在 === 逐层展开 === → request processing failed: getUser failed: database query failed for id=123: record not found → getUser failed: database query failed for id=123: record not found → database query failed for id=123: record not found → record not found关键:使用 %w 动词包装错误,才能用 errors.Is 和 errors.As 追溯。自定义错误类型为什么需要自定义错误?标准错误只是一段文字,但有时我们需要:错误码(给程序判断)HTTP状态码(给API返回)详细信息(给日志记录)堆栈信息(给调试)创建自定义错误类型package main import ( "fmt" "time" ) // 定义一个结构体作为错误类型 type AppError struct { Code int // 错误码 Message string // 给用户看的消息 Detail string // 给开发者看的详情 Timestamp time.Time // 发生时间 Path string // 发生位置 } // 实现error接口(这是关键!) func (e *AppError) Error() string { return fmt.Sprintf("[%d] %s: %s (at %s)", e.Code, e.Message, e.Detail, e.Path) } // 构造函数 func NewAppError(code int, message, detail, path string) *AppError { return &AppError{ Code: code, Message: message, Detail: detail, Timestamp: time.Now(), Path: path, } } // 使用示例 func doSomething() error { return NewAppError( 1001, // 错误码 "操作失败", // 用户消息 "数据库连接超时超过30秒", // 详细原因 "/api/v1/users", // 发生位置 ) } func main() { err := doSomething() // 作为普通错误打印 fmt.Println(err) // 输出: [1001] 操作失败: 数据库连接超时超过30秒 (at /api/v1/users) // 类型断言获取详细信息 if appErr, ok := err.(*AppError); ok { fmt.Printf("\n错误码: %d\n", appErr.Code) fmt.Printf("用户提示: %s\n", appErr.Message) fmt.Printf("发生时间: %v\n", appErr.Timestamp) } }
2026年02月27日
2 阅读
0 评论
0 点赞
2026-02-13
Golang并发
Golang并发理解并发并发(Concurrency):多个任务在同一时间段内交替执行(逻辑上同时)V.S.并行(Parallelism):多个任务在同一时刻真正同时执行(物理上同时)1.1 为什么需要并发?// 没有并发的世界:一个Web服务器 // 假设每个请求处理需要1秒,那么: // 第1个用户:等1秒 // 第2个用户:等2秒(排队) // 第1000个用户:等1000秒 // 有并发的世界: // 第1个用户:等1秒 // 第2个用户:等1秒(同时处理) // 第1000个用户:等1秒(同时处理)1.2 Go 的并发哲学"不要通过共享内存来通信,而要通过通信来共享内存"❌ 传统方式:多个人共用一个笔记本,需要排队写(加锁)✅ Go 的方式:每个人有自己的笔记本,需要时传纸条(channel)Goroutine —— Go 的超轻量级线程2.1 什么是 Goroutine?线程(Thread) = 雇一个正式员工(成本高,一个约1-8MB内存)Goroutine = 雇一个临时工(成本极低,一个只要约2KB内存)这意味着:1GB内存大约能创建 50万个 goroutine但只能创建约 1000 个线程goroutine 是 Go 运行时管理的,不是操作系统管理的2.2 第一个 Goroutinepackage main import ( "fmt" "time" ) // 普通函数 func sayHello() { fmt.Println("Hello, 我是 goroutine!") } func main() { // ============ 普通调用(串行)============ // sayHello() // 这是普通调用,main 会等它执行完 // ============ goroutine 调用(并发)============ go sayHello() // 加一个 go 关键字,就创建了一个 goroutine! // 就这么简单! // 重要:main 函数本身也是一个 goroutine(主 goroutine) // 如果 main 结束了,所有其他 goroutine 都会被强制终止! time.Sleep(1 * time.Second) // 暂时用 Sleep 等待,后面会学更好的方式 fmt.Println("main 结束了") }执行结果:Hello, 我是 goroutine! main 结束了2.3 理解 Goroutine 的执行顺序package main import ( "fmt" "time" ) func printNumbers(name string) { for i := 1; i <= 5; i++ { fmt.Printf("[%s] 数字: %d\n", name, i) time.Sleep(100 * time.Millisecond) // 模拟耗时操作 } } func main() { // 启动两个 goroutine go printNumbers("协程A") go printNumbers("协程B") // 主 goroutine 等待 time.Sleep(1 * time.Second) fmt.Println("=== 主程序结束 ===") }可能的运行结果(每次可能不同!):[协程B] 数字: 1 [协程A] 数字: 1 [协程A] 数字: 2 [协程B] 数字: 2 [协程B] 数字: 3 [协程A] 数字: 3 ...关键理解:goroutine 的执行顺序是不确定的(由 Go 调度器决定)每次运行结果可能不同这就是并发的本质 —— 你不能假设执行顺序2.4 匿名函数创建 Goroutinepackage main import ( "fmt" "time" ) func main() { // 方式1:使用命名函数 go sayHi("张三") // 方式2:使用匿名函数(更常用) go func() { fmt.Println("我是匿名函数 goroutine") }() // 注意最后的 () 表示立即调用 // 方式3:匿名函数带参数 name := "李四" go func(n string) { fmt.Printf("你好,%s\n", n) }(name) // 把 name 作为参数传入 // ⚠️ 方式4:闭包陷阱(版本差异点!) // 在 Go 1.22 之前,这里有 Bug;在 Go 1.22+,这里是安全的。 for i := 0; i < 5; i++ { go func() { // [Go < 1.22] ❌ i 是共享的,结果全是 5 // [Go ≥ 1.22] ✅ 每次循环 i 都是独立的,结果 0,1,2,3,4 fmt.Println("方式4 - i =", i) }() } // ✅ 方式5(正确方式):通过参数传值 for i := 0; i < 5; i++ { go func(n int) { fmt.Println("正确方式 - n =", n) // ✅ n 是每个 goroutine 自己的副本 }(i) // 把当前的 i 值传进去 } time.Sleep(1 * time.Second) } func sayHi(name string) { fmt.Printf("你好,%s\n", name) }传参:为什么有时候要传,有时候不用?方式 3(传参):go func(n string) { ... }(name)这里我们将外部变量 name 的值拷贝了一份,传给了匿名函数的参数 n。此时,n 是这个 Goroutine 私有的,外部怎么变都跟 n 没关系。go 关键字执行的那一瞬间,Go 语言会把外部变量(如 name 或 i)的当前值复制一份(Snapshot)。闭包(不传参直接用): 如果在匿名函数内部直接使用外部变量(如方式 4 中的 i),这叫做闭包(Closure)。函数“捕获”了外部变量的引用(地址)。P.S. 注意版本旧版本 (Go < 1.22):机制: 整个 for 循环只有一个 i 变量(内存地址不变)。结果: 主线程跑得太快,瞬间把 i 加到了 5。等 Goroutine 拿起望远镜看的时候,看到的都是 5。对策: 必须使用方式 5(传参)来强制拷贝。新版本 (Go ≥ 1.22):机制: 编译器做了优化,每次循环都会创建一个全新的 i。结果: 即使是闭包,每个 Goroutine 盯着的也是属于它那一轮的那个 i。输出正常的 0, 1, 2, 3, 4。注意: 虽然新版修复了,但在写库代码或兼容老项目时,为了保险,依然推荐使用方式 5。在方式5中for i := 0; i < 5; i++ { go func(n int) { fmt.Println("正确方式 - n =", n) }(i) // <--- 关键在这里! }go 关键字执行的那一瞬间,Go 语言会把当前的 i 的值(比如 0),复制一份给参数 n。虽然 Goroutine 还没开始运行,但它口袋里已经揣好了属于它自己的那个数字 0。等到 Goroutine 运行时,它打印的是它口袋里的 n,而不是外部那个已经变成 5 的 i。2.5 闭包陷阱详细解(注意版本)错误代码:for i := 0; i < 5; i++ { go func() { fmt.Println(i) // 所有 goroutine 共享同一个变量 i }() } 执行过程: 时间线 →→→→→→→→→→→→→→→→→→→→→→→ 主goroutine: i=0 → i=1 → i=2 → i=3 → i=4 → i=5(循环结束) goroutine1: 打印 i → 5 goroutine2: 打印 i → 5 goroutine3: 打印 i → 5 goroutine4: 打印 i → 5 goroutine5: 打印 i → 5 ↑ 全部打印5!原因:goroutine 创建很快,但调度执行有延迟。等 goroutine 真正执行时,循环早已结束,i 已经变成 5。正确代码:for i := 0; i < 5; i++ { go func(n int) { // 参数 n 是 i 的副本 fmt.Println(n) }(i) // 调用时立即把当前 i 的值复制给 n } 执行过程: 主goroutine: i=0(复制0给n) → i=1(复制1给n) → i=2(复制2给n) → ... goroutine1(n=0): 打印 0 goroutine2(n=1): 打印 1 goroutine3(n=2): 打印 2 ...Channel —— Goroutine 之间的通信管道3.1 为什么需要 Channel?生活类比:你(main goroutine)派小王(goroutine)去买咖啡。没有 channel 的情况: 你: "小王去买咖啡"(go buyCoffee()) 你: 继续干活...然后走了(main结束) 小王: 买到了!但你已经走了...拿着咖啡不知道给谁有 channel 的情况: 你们之间有一个"传递窗口"(channel) 你: "小王去买咖啡,买到了放传递窗口" 你: 在传递窗口这里等着... 小王: 买到了!放到传递窗口 你: 从传递窗口拿到咖啡 Channel = goroutine 之间传递数据的管道/传递窗口3.2 Channel 基础操作package main import "fmt" func main() { // ========== 创建 channel ========== // 语法:make(chan 数据类型) ch := make(chan string) // 创建一个传递 string 的 channel // ========== 发送和接收 ========== // 发送:ch <- 数据 // 接收:数据 := <-ch // 启动一个 goroutine 发送数据 go func() { ch <- "你好,我是来自 goroutine 的消息" // 发送数据到 channel fmt.Println("goroutine: 消息已发送") }() // 主 goroutine 接收数据 msg := <-ch // 从 channel 接收数据(会阻塞,直到收到数据) fmt.Println("main: 收到消息 →", msg) }输出:goroutine: 消息已发送 main: 收到消息 → 你好,我是来自 goroutine 的消息3.3 Channel 的阻塞特性(重要!)package main import "fmt" func main() { ch := make(chan int) go func() { fmt.Println("goroutine: 准备发送数据...") ch <- 42 // 发送。如果没人接收,这里会阻塞等待 fmt.Println("goroutine: 数据已发送") }() fmt.Println("main: 准备接收数据...") value := <-ch // 接收。如果没有数据,这里会阻塞等待 fmt.Println("main: 收到数据 →", value) }Channel 阻塞规则(无缓冲 channel):发送方(ch <- data): → 如果没有接收方在等待,发送方会【阻塞】,直到有人来接收接收方(data := <-ch): → 如果没有发送方发送数据,接收方会【阻塞】,直到有人来发送就像面对面递东西:你伸出手递东西,对方没来 → 你就一直举着手等你伸出手要接东西,对方没来 → 你就一直举着手等双方都到了 → 一手交钱一手交货,各走各的3.4 Channel 的方向package main import "fmt" // 只能发送的 channel(只写) func producer(ch chan<- string) { ch <- "产品A" ch <- "产品B" ch <- "产品C" close(ch) // 发送完毕,关闭 channel } // 只能接收的 channel(只读) func consumer(ch <-chan string) { for item := range ch { // range 会一直接收,直到 channel 被关闭 fmt.Println("消费了:", item) } } func main() { ch := make(chan string) go producer(ch) // 双向 channel 自动转为 只写 consumer(ch) // 双向 channel 自动转为 只读 }Channel 方向总结:chan string → 双向 channel(可读可写)chan<- string → 只写 channel(只能发送)——箭头指向 chan,数据进入<-chan string → 只读 channel(只能接收)——箭头远离 chan,数据出来记忆技巧:看箭头方向 chan<- 箭头指向chan = 数据流入chan = 只能写/发送 <-chan 箭头离开chan = 数据流出chan = 只能读/接收3.5 带缓冲的 Channelpackage main import "fmt" func main() { // ========== 无缓冲 channel ========== // ch1 := make(chan int) // 必须有人接收,发送方才能继续 // ========== 有缓冲 channel ========== ch2 := make(chan int, 3) // 缓冲区大小为3,最多存3个数据 // 可以连续发送3个数据,不需要有人接收 ch2 <- 10 ch2 <- 20 ch2 <- 30 // ch2 <- 40 // 如果再发第4个,就会阻塞(缓冲满了) fmt.Println("缓冲区长度:", len(ch2)) // 3(当前有3个数据) fmt.Println("缓冲区容量:", cap(ch2)) // 3(最多能存3个) // 接收数据(先进先出 FIFO) fmt.Println(<-ch2) // 10 fmt.Println(<-ch2) // 20 fmt.Println(<-ch2) // 30 }使用场景: 无缓冲:需要严格同步的场景(发送方要确认接收方收到) 有缓冲:允许一定程度异步的场景(提高吞吐量)3.6 关闭 Channel 和 range 遍历package main import "fmt" func main() { ch := make(chan int, 5) // 发送数据 go func() { for i := 1; i <= 5; i++ { ch <- i fmt.Printf("发送: %d\n", i) } close(ch) // 发送完毕一定要关闭!否则接收方会永远等待(死锁) }() // ========== 接收方式1:for range(推荐)========== for value := range ch { // range 会自动检测 channel 是否关闭 // 关闭后自动退出循环 fmt.Printf("接收: %d\n", value) } fmt.Println("所有数据接收完毕") // ========== 接收方式2:ok 判断 ========== ch2 := make(chan string, 2) go func() { ch2 <- "hello" ch2 <- "world" close(ch2) }() for { value, ok := <-ch2 if !ok { // ok 为 false 表示 channel 已关闭且没有数据了 fmt.Println("channel 已关闭") break } fmt.Println("收到:", value) } }关闭 Channel 的规则:只有发送方应该关闭 channel(谁发送,谁关闭)接收方不应该关闭 channel关闭已经关闭的 channel 会 panic向已关闭的 channel 发送数据会 panic从已关闭的 channel 接收数据:如果还有数据,正常接收如果没有数据了,返回零值(int→0, string→"", bool→false)3.7 select 多路复用package main import ( "fmt" "time" ) func main() { ch1 := make(chan string) ch2 := make(chan string) // goroutine 1:2秒后发送 go func() { time.Sleep(2 * time.Second) ch1 <- "来自 channel 1 的数据" }() // goroutine 2:1秒后发送 go func() { time.Sleep(1 * time.Second) ch2 <- "来自 channel 2 的数据" }() // select 同时监听多个 channel // 哪个先来就处理哪个 for i := 0; i < 2; i++ { select { case msg1 := <-ch1: fmt.Println("收到:", msg1) case msg2 := <-ch2: fmt.Println("收到:", msg2) } } }输出:收到: 来自 channel 2 的数据 (1秒后) 收到: 来自 channel 1 的数据 (2秒后)select 就像你同时等多个外卖: 美团外卖(ch1) 饿了么外卖(ch2)select { case 饭 := <-美团: //哪个先到 吃(饭) //就先吃哪个 case 饭 := <-饿了么: 吃(饭) }3.8 select 的高级用法package main import ( "fmt" "time" ) func main() { ch := make(chan int) // ========== 1. 带超时的 select ========== fmt.Println("--- 超时示例 ---") select { case data := <-ch: fmt.Println("收到数据:", data) case <-time.After(2 * time.Second): // 2秒后超时 fmt.Println("超时了!没有收到数据") } // ========== 2. 带 default 的非阻塞 select ========== fmt.Println("\n--- 非阻塞示例 ---") select { case data := <-ch: fmt.Println("收到数据:", data) default: // 如果所有 channel 都没有数据,立即执行 default fmt.Println("没有数据可读,不等了") } // ========== 3. 定时器 + select 实现定期任务 ========== fmt.Println("\n--- 定时任务示例 ---") ticker := time.NewTicker(500 * time.Millisecond) // 每500ms触发一次 defer ticker.Stop() done := make(chan bool) go func() { time.Sleep(2 * time.Second) done <- true }() for { select { case <-done: fmt.Println("任务完成,退出") return case t := <-ticker.C: fmt.Println("定时任务执行:", t.Format("15:04:05.000")) } } } default:普通的 select确实是设计用来“等待”channel 数据的。但是,一旦加上了 default 分支,select 的行为模式就完全变了。当 select 语句中所有其他的 case(无论是读还是写)都无法立即执行(即会被阻塞)时,default 分支就会立即触发。它的核心作用就是把“阻塞等待”变成了“非阻塞检查”。情况 1:没有 Default(阻塞模式)select { case msg := <-ch: fmt.Println("收到数据:", msg) } // 如果 ch 是空的,程序永远不会执行到这一行(除非 ch 被关闭)情况2:有 Default(非阻塞模式)Go 运行时会检查 ch:如果有数据:执行 case,读取数据。如果没数据:Go 运行时发现读取 ch 会导致阻塞,于是立刻放弃等待,转而执行 default。select { case msg := <-ch: fmt.Println("收到数据:", msg) default: fmt.Println("通道没数据,我不想等,我先溜了") } // 这一行会立刻被执行常见用途:非阻塞接收(探测)select { case msg := <-ch: // 有数据才处理 process(msg) default: // 没数据也不阻塞,继续干别的 doOtherWork() }非阻塞发送(防止阻塞)select { case ch <- value: // 发送成功 default: // 通道满或没有接收者,不阻塞,直接丢弃或处理 log.Println("发送失败,通道繁忙") }WaitGroup —— 等待一组 Goroutine 完成在前面的讲解中,我们一直在使用time.Sleep作为权宜之计,但这有很多的问题:不知道该等多久等太久浪费时间等太短可能 goroutine 还没执行完WaitGroup 就是来解决这个问题的!4.1 WaitGroup 基础在 Go 语言的并发编程中,sync.WaitGroup 是一个非常实用的“计数器”,它的核心作用是等待一组并发任务(goroutine)全部完成后,再继续执行后面的代码。如果不使用 WaitGroup,主线程(main goroutine)可能会在子任务还没跑完时就提前结束,导致程序直接退出。WaitGroup 只有三个简单的方法,就像是在维护一个计数器:Add(int):计数器 加 N。通常在启动 goroutine 之前调用,表示“我有 N 个任务要做了”。Done():计数器 减 1。通常在 goroutine 内部的逻辑结束后调用,表示“这个任务做完了”。Wait():阻塞主线程。直到计数器归零,才会解除阻塞,继续往下走。package main import ( "fmt" "sync" "time" ) func main() { var wg sync.WaitGroup // 创建 WaitGroup(零值可用,不需要初始化) // 启动3个 goroutine for i := 1; i <= 3; i++ { wg.Add(1) // 计数器 +1:告诉 WaitGroup "我要启动一个 goroutine" go func(id int) { defer wg.Done() // 计数器 -1:当 goroutine 完成时调用(用 defer 保证一定执行) fmt.Printf("Worker %d: 开始工作\n", id) time.Sleep(time.Duration(id) * time.Second) // 模拟不同耗时 fmt.Printf("Worker %d: 工作完成\n", id) }(i) } fmt.Println("Main: 等待所有 worker 完成...") wg.Wait() // 阻塞,直到计数器变为 0 fmt.Println("Main: 所有 worker 都完成了!") }输出:Main: 等待所有 worker 完成... Worker 1: 开始工作 Worker 3: 开始工作 Worker 2: 开始工作 Worker 1: 工作完成 (1秒后) Worker 2: 工作完成 (2秒后) Worker 3: 工作完成 (3秒后) Main: 所有 worker 都完成了!WaitGroup 就像餐厅的"叫号系统":wg.Add(1) → 来了一桌客人,等待人数 +1wg.Done() → 一桌客人吃完走了,等待人数 -1wg.Wait() → 服务员说:"等所有客人都吃完,我再打扫关门"计数器变化: Add(1) → 计数器: 0→1 Add(1) → 计数器: 1→2 Add(1) → 计数器: 2→3 Done() → 计数器: 3→2 Done() → 计数器: 2→1 Done() → 计数器: 1→0 Wait() → 检测到计数器=0,不再阻塞,继续执行4.2 WaitGroup 常见避坑传递指针,而非值: 在函数间传递 WaitGroup 时,必须使用指针。如果你直接传值,函数内部会复制一个新的计数器,导致主线程的 Wait() 永远接收不到归零信号,引发 Deadlock(死锁)。下面还会讲Add 的时机: 永远要在 go func() 之前调用 Add()。如果写在 goroutine 内部,可能会出现主线程已经运行到 Wait(),而子协程还没来得及 Add 的情况,导致程序误以为没有任务直接跳过。计数器不能为负数: 如果 Done() 的调用次数超过了 Add() 的次数,程序会直接 panic。这就是为什么建议使用 defer wg.Done() 来确保即使函数出错也能正常减去计数。4.3 WaitGroup 传递给函数(必须用指针)package main import ( "fmt" "sync" "time" ) // 必须传指针 *sync.WaitGroup func worker(id int, wg *sync.WaitGroup) { defer wg.Done() // 完成时计数器 -1 fmt.Printf("Worker %d: 开始\n", id) time.Sleep(time.Second) fmt.Printf("Worker %d: 完成\n", id) } // 错误!:传值会复制一份,Done() 操作的是副本,原来的计数器不变 → 死锁! // func worker(id int, wg sync.WaitGroup) { ... } func main() { var wg sync.WaitGroup for i := 1; i <= 3; i++ { wg.Add(1) go worker(i, &wg) // 传指针! } wg.Wait() fmt.Println("全部完成") }在 Go 语言中,所有的参数传递都是值传递。如果你传值 (sync.WaitGroup): 这就好比你手里有一个原件计数器,但在交给子协程时,你把它放进复印机,“复印”了一份交给它。子协程在复印件上按下了 Done(),复印件归零了。但主线程(main)手里拿的还是原件,原件的计数器依然是 3。主线程执行 wg.Wait() 时,它只盯着原件看。原件永远不会变成 0,于是主线程永远等下去,程序报错 Deadlock(死锁)。如果你传指针 (*sync.WaitGroup): 这就好比你把计数器的家庭地址告诉了子协程。大家顺着地址找到的是同一个实体。子协程减 1,主线程看到的也是减了 1。在实际开发中,除了传指针,还有一种更“稳妥”且常见的写法:使用闭包。如果你不想显式传递指针,可以直接在 main 函数的作用域内启动协程,这样就不存在传参拷贝的问题了:func main() { var wg sync.WaitGroup for i := 1; i <= 3; i++ { wg.Add(1) // 使用闭包直接引用外部变量 wg go func(id int) { defer wg.Done() fmt.Printf("Worker %d 运行中\n", id) }(i) } wg.Wait() }4.4 实战:并发下载文件package main import ( "fmt" "math/rand" "sync" "time" ) // 模拟下载文件 func downloadFile(filename string, wg *sync.WaitGroup) { defer wg.Done() fmt.Printf("开始下载: %s\n", filename) // 模拟下载耗时(1-3秒随机) duration := time.Duration(rand.Intn(3)+1) * time.Second time.Sleep(duration) fmt.Printf("下载完成: %s (耗时 %v)\n", filename, duration) } func main() { files := []string{ "photo1.jpg", "video.mp4", "document.pdf", "music.mp3", "archive.zip", } var wg sync.WaitGroup start := time.Now() for _, file := range files { wg.Add(1) go downloadFile(file, &wg) } wg.Wait() elapsed := time.Since(start) fmt.Printf("\n 所有文件下载完成!总耗时: %v\n", elapsed) fmt.Println("(如果串行下载,至少需要5秒以上)") }竞态检测(Race Condition Detection)5.1 什么是竞态条件(Race Condition)?你和室友共用一个银行账户(余额 1000 元)【正常情况 —— 串行操作】: 你:查看余额 1000 → 取 500 → 余额更新为 500 室友:查看余额 500 → 取 300 → 余额更新为 200 最终余额:200 【竞态条件 —— 并发操作】: 你:查看余额 1000 → 准备取 500... 室友:查看余额 1000 → 准备取 300...(你还没取完,他看到的还是1000) 你:余额更新为 1000 - 500 = 500 室友:余额更新为 1000 - 300 = 700(覆盖了你的结果!) 最终余额:700 (实际应该是 200!凭空多了 500 元!)这就是竞态条件: 多个 goroutine 同时读写同一个变量,结果取决于执行顺序,导致程序行为不可预测。代码演示:package main import ( "fmt" "sync" ) func main() { counter := 0 var wg sync.WaitGroup // 启动 1000 个 goroutine,每个对 counter +1 for i := 0; i < 1000; i++ { wg.Add(1) go func() { defer wg.Done() counter++ // 竞态!多个 goroutine 同时读写 counter }() } wg.Wait() fmt.Println("期望值: 1000") fmt.Println("实际值:", counter) // 每次运行结果可能不同! }多次运行可能的结果:如果哪次等于1000,就是纯碰巧为什么 counter++ 不是原子操作?counter++ 在底层其实是三步操作:步骤1:从内存读取 counter 的值到寄存器 (READ)步骤2:在寄存器中将值 +1 (ADD)步骤3:将新值写回内存 (WRITE)结果:两次 +1 操作,counter 只变成了 1 而不是 2,丢失了一次更新!5.2 Go 的竞态检测器(Race Detector)Go 内置了一个非常强大的竞态检测工具,编译时加 -race 标志即可使用。5.2.1 基本使用# 运行时检测竞态 go run -race main.go # 测试时检测竞态 go test -race ./... # 编译时嵌入竞态检测(生成的二进制文件运行时会检测) go build -race -o myapp main.go ./myapp5.2.2 检测上面的竞态代码将之前的代码保存为 main.go,执行:go run -race main.go5.3 常见竞态场景与检测5.3.1 场景一:共享变量并发读写package main import ( "fmt" "sync" ) // 有竞态! func raceExample() { data := make(map[string]int) // map 不是并发安全的! var wg sync.WaitGroup // 多个 goroutine 同时写 map for i := 0; i < 10; i++ { wg.Add(1) go func(n int) { defer wg.Done() key := fmt.Sprintf("key-%d", n) data[key] = n // 并发写 map → 竞态!甚至可能 panic! }(i) } wg.Wait() fmt.Println(data) } func main() { raceExample() }输出:fatal error: concurrent map writesmap 的并发写甚至不需要 race detector,直接 panic!5.3.2 场景二:共享 slice 并发 appendpackage main import ( "fmt" "sync" ) func main() { var results []int // slice 不是并发安全的 var wg sync.WaitGroup for i := 0; i < 100; i++ { wg.Add(1) go func(n int) { defer wg.Done() results = append(results, n) // 竞态! }(i) } wg.Wait() fmt.Println("期望长度: 100") fmt.Println("实际长度:", len(results)) // 可能不是 100! }5.3.3 场景三:结构体字段并发访问package main import ( "fmt" "sync" ) type User struct { Name string Balance int } func main() { user := &User{Name: "张三", Balance: 1000} var wg sync.WaitGroup // 多个 goroutine 同时修改同一个结构体字段 for i := 0; i < 100; i++ { wg.Add(1) go func() { defer wg.Done() user.Balance-- // 竞态!并发修改结构体字段 }() } wg.Wait() fmt.Printf("期望: %d, 实际: %d\n", 900, user.Balance) }5.3.4 场景四:隐蔽的竞态 —— 并发读写 bool/interfacepackage main import ( "fmt" "sync" "time" ) // 很多人以为 bool 赋值是原子的 —— 这是错误的! func main() { var running bool = true // 即使是 bool,也有竞态! var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() time.Sleep(1 * time.Second) running = false // 一个 goroutine 写 fmt.Println("已设置 running = false") }() // 另一个 goroutine 读 wg.Add(1) go func() { defer wg.Done() for running { // 另一个 goroutine 读 time.Sleep(100 * time.Millisecond) } fmt.Println("检测到 running 为 false,退出循环") }() wg.Wait() }5.4 修复竞态的四种方法5.4.1 方法一:sync.Mutex(互斥锁)package main import ( "fmt" "sync" ) func main() { counter := 0 var mu sync.Mutex // 互斥锁 var wg sync.WaitGroup for i := 0; i < 1000; i++ { wg.Add(1) go func() { defer wg.Done() mu.Lock() // 🔒 加锁:同一时间只有一个 goroutine 能进入 counter++ // 安全地操作共享变量 mu.Unlock() // 🔓 解锁:让其他 goroutine 可以进入 }() } wg.Wait() fmt.Println("期望值: 1000") fmt.Println("实际值:", counter) // 一定是 1000 }5.4.2 方法二:sync.RWMutex(读写锁)核心规则:读锁之间不互斥 → 多个读操作可并发执行写锁与读锁互斥 → 写时阻止所有读写锁与写锁互斥 → 同时只能一个写操作主要方法sync.RWMutex 提供了以下 4 个核心方法:写操作(独占锁):Lock(): 加写锁。如果已有读锁或写锁,会阻塞直到锁可用。Unlock(): 解写锁。读操作(共享锁):RLock(): 加读锁。只要没有写锁(或写锁请求),就可以立即获得。RUnlock(): 解读锁。代码示例:并发安全的 Map这是 RWMutex 最经典的使用场景。假设我们有一个配置中心,读取配置的请求非常多,但修改配置的操作很少。package main import ( "fmt" "sync" "time" ) // SafeMap 包装了一个普通的 map 和一个 RWMutex type SafeMap struct { mu sync.RWMutex // 读写锁 data map[string]string // 被保护的共享数据 } // Get 读取数据 -> 使用读锁 (RLock) func (m *SafeMap) Get(key string) string { m.mu.RLock() // 加读锁(允许多个读者同时进入) defer m.mu.RUnlock() // 延迟解读锁(函数返回时自动释放) return m.data[key] } // Set 修改数据 -> 使用写锁 (Lock) func (m *SafeMap) Set(key string, value string) { m.mu.Lock() // 加写锁(独占,阻止所有读写) defer m.mu.Unlock() // 确保函数退出时解写锁,即使 panic 也会解锁,防止死锁 m.data[key] = value } func main() { sm := SafeMap{data: make(map[string]string)} // 模拟写入(写少) go func() { for i := 0; i < 5; i++ { sm.Set("config", fmt.Sprintf("v%d", i)) fmt.Println("Write: Data updated") time.Sleep(time.Millisecond * 100) } }() // 模拟并发读取(读多) for i := 0; i < 10; i++ { go func(id int) { for { val := sm.Get("config") // 多个 reader 可以同时打印,不会互相阻塞 fmt.Printf("Reader %d got: %s\n", id, val) time.Sleep(time.Millisecond * 50) } }(i) } time.Sleep(time.Second) }关键实现细节:写锁优先(防止写饥饿)在早期的读写锁设计中,容易出现“写饥饿” (Writer Starvation) 现象:如果读请求源源不断,写者可能永远抢不到锁。Go 语言的 RWMutex 采用了写锁优先的策略来解决这个问题:当一个协程调用 Lock() (申请写锁) 时,它会阻塞等待。关键点: 此时,新的 RLock() (读锁) 请求会被阻塞,即使当前持有锁的也是读者。系统会等待当前已经持有读锁的协程执行完 RUnlock(),然后立即让等待的写者获得锁。这保证了写操作不会因为读流量过大而被无限期推迟。Mutex vs RWMutex 对比:sync.Mutex(互斥锁): Lock() → 独占。不管读还是写,同时只能一个 goroutine。 Unlock() 适用:读写都频繁,或者操作很快sync.RWMutex(读写锁): Lock() / Unlock() → 写锁。写的时候独占,所有读写都等待。 RLock() / RUnlock() → 读锁。读的时候共享,多个读可以并发。 适用:读多写少的场景(如配置、缓存)5.4.3 方法三:sync/atomic(原子操作)原子操作(Atomic Operation)是指不可分割的操作,就像化学中的"原子"一样,是最小的、不可再分的单位。在并发编程中,原子操作保证:要么完全执行要么完全不执行不会被其他 goroutine 打断为什么需要原子操作?问题场景:数据竞争:var counter int = 0 // 非原子操作 // 1000 个 goroutine 同时执行 counter++ for i := 0; i < 1000; i++ { go func() { counter++ // 危险!不是原子的 }() }为什么 counter++ 不安全?因为 counter++ 实际上是 3 个步骤:读取 counter 的值(假设是 5)计算 5 + 1 = 6写回 6 到 counter在并发时可能发生:goroutine A: 读取 counter = 5 goroutine B: 读取 counter = 5 ← 还没来得及写回 goroutine A: 计算 5+1 = 6 goroutine B: 计算 5+1 = 6 goroutine A: 写回 6 goroutine B: 写回 6 ← 丢失了一次累加!结果:预期 1000,实际可能只有 900 多。解决方案:方案 1:加锁(传统方式)var mu sync.Mutex mu.Lock() counter++ mu.Unlock()缺点:性能开销大(涉及操作系统调度)方案 2:原子操作优点:CPU 级别的指令,性能高atomic.AddInt64(&counter, 1) // 一步到位,无需加锁优点:CPU 级别的指令,性能高Go 原子操作详解package main import ( "fmt" "sync" "sync/atomic" ) func main() { var counter int64 = 0 // 注意:atomic 操作需要固定位数的类型 var wg sync.WaitGroup for i := 0; i < 1000; i++ { wg.Add(1) go func() { defer wg.Done() atomic.AddInt64(&counter, 1) // 原子 +1,无需加锁 }() } wg.Wait() // 读取也要用原子操作 finalValue := atomic.LoadInt64(&counter) fmt.Println("期望值: 1000") fmt.Println("实际值:", finalValue) // 一定是 1000 fmt.Println("\n--- 其他原子操作 ---") // 原子存储 var flag int32 atomic.StoreInt32(&flag, 1) fmt.Println("flag:", atomic.LoadInt32(&flag)) // 1 // 原子比较并交换 (CAS - Compare And Swap) // 如果当前值是 1,就换成 2 swapped := atomic.CompareAndSwapInt32(&flag, 1, 2) fmt.Println("CAS 成功:", swapped) // true fmt.Println("flag:", atomic.LoadInt32(&flag)) // 2 // 再试一次,当前值是2不是1,所以不会交换 swapped = atomic.CompareAndSwapInt32(&flag, 1, 3) fmt.Println("CAS 成功:", swapped) // false fmt.Println("flag:", atomic.LoadInt32(&flag)) // 仍然是 2 // Go 1.19+ 新增的泛型原子类型 var atomicBool atomic.Bool atomicBool.Store(true) fmt.Println("atomicBool:", atomicBool.Load()) // true var atomicValue atomic.Value atomicValue.Store("hello") fmt.Println("atomicValue:", atomicValue.Load()) // hello }Context —— 并发控制的"遥控器"6.1 为什么需要 Context?你是项目经理(main goroutine),派了一个团队去做项目:张三负责前端(goroutine 1)李四负责后端(goroutine 2)王五负责数据库(goroutine 3,由李四启动的子goroutine)问题来了:客户突然说"项目取消了",你怎么通知所有人停下?项目有 deadline(截止时间),到期了怎么让所有人自动停下?你想传递一些项目信息(如项目ID)给所有人如果没有 Context:你要给每个人单独打电话通知(每个 goroutine 一个 channel)王五是李四启动的,你甚至联系不到他非常混乱!有了 Context:你有一个"群广播"(context),一键通知所有人支持"到期自动通知"支持"附带信息"子 goroutine 也会自动收到通知(树状传递)6.2 Context 的基本概念Context 是 Go 的上下文管理机制,用于:取消信号传递:通知 goroutine 停止工作超时控制:自动取消超时的操作截止时间:设置任务的最后期限传递请求范围的值:(不推荐滥用)核心接口:type Context interface { // 返回 context 的截止时间(如果有) Deadline() (deadline time.Time, ok bool) // 返回一个 channel,当 context 被取消时关闭 Done() <-chan struct{} // 返回取消的原因 Err() error // 返回与 context 关联的值 Value(key interface{}) interface{} }6.3 Context 的四种创建方式1. Background 和 TODO// Background:根 context,通常用于 main 函数、初始化、测试 ctx := context.Background() // TODO:当不确定用什么 context 时使用(占位符) ctx := context.TODO()它们的底层实现完全一致(都是 emptyCtx),只是语义不同:context.Background():通常作为所有 Context 树的根节点(Root),用于主函数、初始化或测试代码中。context.TODO():用于当你还不确定要用什么 Context,或者代码还在开发中(To Do)时作为一个占位符。它明确告诉阅读代码的人:“这里未来可能会传入一个具体的 Context” 。2. WithCancel:手动取消用于手动控制 Goroutine 的停止。用法: ctx, cancel := context.WithCancel(parent)原理:返回一个 cancel 函数。调用这个函数会关闭内部的 Done Channel。利用 Go 的 Channel 广播机制:关闭 Channel 会通知所有监听该 Channel 的 Goroutine(case <-ctx.Done(): 会立即解除阻塞)。级联取消: WithCancel 会构建父子关系。当父 Context 取消时,它会遍历并取消所有子 Context;但子 Context 取消不会影响父 Context 。ctx, cancel := context.WithCancel(context.Background()) go func() { // 监听取消信号 <-ctx.Done() fmt.Println("任务被取消了:", ctx.Err()) }() // 手动取消 time.Sleep(1 * time.Second) cancel() // 调用 cancel() 会关闭 Done() channel3. WithTimeout:超时自动取消// 3秒后自动取消 ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() // 养成好习惯:总是 defer cancel() go func() { select { case <-time.After(5 * time.Second): fmt.Println("任务完成") case <-ctx.Done(): fmt.Println("超时取消:", ctx.Err()) // context deadline exceeded } }() time.Sleep(4 * time.Second)4. WithDeadline:指定截止时间底层逻辑与 WithCancel 类似,只是多了一个定时器。一旦时间到了,会自动调用 cancel 函数 区分取消原因: 可以通过 ctx.Err() 判断任务是因为手动取消(Canceled)还是因为超时(DeadlineExceeded)结束的// 指定绝对时间点 deadline := time.Now().Add(2 * time.Second) ctx, cancel := context.WithDeadline(context.Background(), deadline) defer cancel() go func() { <-ctx.Done() fmt.Println("截止时间到:", ctx.Err()) }() time.Sleep(3 * time.Second)5. WithValue:传递值(慎用!)// 传递请求ID ctx := context.WithValue(context.Background(), "requestID", "12345") go func(ctx context.Context) { requestID := ctx.Value("requestID").(string) fmt.Println("请求ID:", requestID) }(ctx)警告:不要用 Context 传递函数参数!只用于传递请求范围的元数据(如请求ID、用户身份等)优先使用函数参数,Context 只是补充6.4 Context 的树状传递(父子关系)1. Context 树的构成Context 的树状结构并非一开始就存在,而是通过派生(Derivation)逐步建立的。根节点 (Root): 树的起点通常是 context.Background()(用于 main 函数、初始化)或 context.TODO()(用于占位)。它们是空的 Context,没有值,也不会被取消。子节点 (Children): 通过 WithCancel、WithDeadline、WithTimeout 或 WithValue 函数,基于一个“父 Context”创建一个新的“子 Context”。2. 信号传递:自上而下的取消这是 Context 最重要的特性。取消信号只向叶子节点方向传递,不可逆流。传递规则:父死子必亡: 当父 Context 被取消(或超时)时,它会自动关闭自己的 Done channel。所有基于该父 Context 派生的子 Context(以及孙 Context)都会收到取消信号,它们的 Done channel 也会被关闭。子死父不受影响: 子 Context 被取消时,只会影响它自己以及它的子树,不会影响父 Context 或兄弟 Context。package main import ( "context" "fmt" "time" ) func serviceA(ctx context.Context) { // serviceA 创建子 context,传给 serviceB ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() go serviceB(ctx) select { case <-ctx.Done(): fmt.Println("⛔ ServiceA 停止:", ctx.Err()) } } func serviceB(ctx context.Context) { // serviceB 创建子 context,传给 serviceC ctx = context.WithValue(ctx, "requestID", "req-123") go serviceC(ctx) select { case <-ctx.Done(): fmt.Println("⛔ ServiceB 停止:", ctx.Err()) } } func serviceC(ctx context.Context) { reqID := ctx.Value("requestID") // 可以获取父 context 传递的值 for { select { case <-ctx.Done(): fmt.Printf("⛔ ServiceC 停止 (requestID=%v): %v\n", reqID, ctx.Err()) return default: fmt.Printf("🔄 ServiceC 工作中 (requestID=%v)\n", reqID) time.Sleep(500 * time.Millisecond) } } } func main() { // 顶层 context ctx, cancel := context.WithCancel(context.Background()) go serviceA(ctx) time.Sleep(2 * time.Second) fmt.Println("\n🔴 Main: 取消顶层 context") cancel() // 取消顶层,所有子 context 都会被取消! time.Sleep(1 * time.Second) fmt.Println("✅ 程序结束") }Worker Pool —— 并发任务的"流水线工厂"7.1 为什么需要 Worker Pool?假设你开了一家快递分拣中心:没有 Worker Pool(暴力并发): 今天有 10000 个包裹要分拣 你雇了 10000 个临时工,每人分拣一个 结果:场地挤不下,工人互相碰撞,效率反而更低!有 Worker Pool(流水线模式): 你雇了 10 个固定员工(Worker) 包裹放在传送带上(Job Channel) 每个员工从传送带取一个包裹,分拣完再取下一个 分拣好的包裹放到另一条传送带(Result Channel) 结果:井然有序,效率最高!Worker Pool 解决的核心问题:资源控制:限制并发数量,防止 goroutine 爆炸(OOM)复用 goroutine:避免频繁创建和销毁的开销背压控制:任务太多时自动排队,不会压垮系统有序收集结果:通过 result channel 统一收集处理结果7.2 最简单的 Worker Poolpackage main import ( "fmt" "sync" "time" ) // worker 从 jobs channel 接收任务,处理后将结果发送到 results channel func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) { defer wg.Done() for job := range jobs { // 不断从 jobs 中取任务,直到 jobs 被关闭 fmt.Printf("Worker %d: 开始处理任务 %d\n", id, job) time.Sleep(500 * time.Millisecond) // 模拟耗时操作 result := job * 2 // 模拟处理逻辑 fmt.Printf("Worker %d: 任务 %d 完成,结果 = %d\n", id, job, result) results <- result // 将结果发送到 results channel } } func main() { const numWorkers = 3 // 工人数量 const numJobs = 10 // 任务数量 jobs := make(chan int, numJobs) // 任务队列(带缓冲) results := make(chan int, numJobs) // 结果队列(带缓冲) // ========== 第1步:启动 Worker ========== var wg sync.WaitGroup for w := 1; w <= numWorkers; w++ { wg.Add(1) go worker(w, jobs, results, &wg) } // ========== 第2步:发送任务 ========== for j := 1; j <= numJobs; j++ { jobs <- j } close(jobs) // 所有任务发送完毕,关闭 jobs channel // ========== 第3步:等待所有 Worker 完成,然后关闭 results ========== // 用一个额外的 goroutine 来等待,避免阻塞主 goroutine go func() { wg.Wait() close(results) // 所有 worker 完成后,关闭 results channel }() // ========== 第4步:收集结果 ========== var total int for result := range results { total += result } fmt.Printf("\n所有任务完成!结果总和 = %d\n", total) }输出示例:Worker 3: 开始处理任务 1 Worker 1: 开始处理任务 2 Worker 2: 开始处理任务 3 Worker 3: 任务 1 完成,结果 = 2 Worker 3: 开始处理任务 4 Worker 1: 任务 2 完成,结果 = 4 Worker 1: 开始处理任务 5 Worker 2: 任务 3 完成,结果 = 6 Worker 2: 开始处理任务 6 ... 所有任务完成!结果总和 = 110执行流程图解:时间 →→→→→→→→→→→→→→→→→→→→→→→→→→→→→→→→→ Worker1: [任务1处理中...] [任务4处理中...] [任务7处理中...] [任务10] Worker2: [任务2处理中...] [任务5处理中...] [任务8处理中...] Worker3: [任务3处理中...] [任务6处理中...] [任务9处理中...] 3个 Worker 并发处理 10 个任务,每个 Worker 处理完一个就取下一个7.3 理解 Worker Pool 的关键细节为什么用 for job := range jobs ?func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) { defer wg.Done() for job := range jobs { // 处理任务... } // range 退出 = jobs channel 已关闭且数据已读完 // 此时 worker 自然退出,defer wg.Done() 执行 }range 会持续从 channel 读取数据。当 jobs 被 close() && 缓冲区清空后,range 自动退出循环。这比手动检查 ok 更简洁。为什么要单开一个 goroutine 等待并关闭 results ?// 错误做法:在主 goroutine 中先 Wait 再 range wg.Wait() // 阻塞在这里等 worker 完成 close(results) for r := range results { ... } // 永远执行不到这里!因为 worker 往 results 发送时也会阻塞 // 正确做法:开一个 goroutine 去等待 go func() { wg.Wait() close(results) }() for r := range results { ... } // 主 goroutine 可以一边消费 results 一边等主 goroutine 有两件事要做:消费 results 和 等待 wg 归零。如果串行做,会导致死锁——worker 写 results 时可能阻塞(没人读),而主 goroutine 在 wg.Wait() 等 worker 完成。所以需要并行:主 goroutine 负责消费 results,另一个 goroutine 负责等待并关闭。7.4 带 Context 取消的 Worker Pool在实际场景中,我们往往需要在任务执行过程中取消所有 Worker(比如用户取消请求、超时等)。package main import ( "context" "fmt" "math/rand" "sync" "time" ) // Job 表示一个任务 type Job struct { ID int Payload string } // Result 表示任务的处理结果 type Result struct { Job Job Output string WorkerID int } func worker(ctx context.Context, id int, jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) { defer wg.Done() for { select { case <-ctx.Done(): // 收到取消信号,立即退出 fmt.Printf("Worker %d: 收到取消信号,退出\n", id) return case job, ok := <-jobs: if !ok { // jobs channel 已关闭,没有更多任务了 fmt.Printf("Worker %d: 没有更多任务,退出\n", id) return } // 模拟处理任务(可能耗时) fmt.Printf("Worker %d: 处理任务 %d (%s)\n", id, job.ID, job.Payload) duration := time.Duration(rand.Intn(3)+1) * time.Second // 在处理过程中也要检查取消信号 select { case <-ctx.Done(): fmt.Printf("Worker %d: 任务 %d 处理中被取消\n", id, job.ID) return case <-time.After(duration): // 处理完成 results <- Result{ Job: job, Output: fmt.Sprintf("结果_%s_已处理", job.Payload), WorkerID: id, } } } } } func main() { // 创建可取消的 context,3秒后自动取消 ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() const numWorkers = 3 jobs := make(chan Job, 10) results := make(chan Result, 10) // 启动 Worker var wg sync.WaitGroup for w := 1; w <= numWorkers; w++ { wg.Add(1) go worker(ctx, w, jobs, results, &wg) } // 发送任务(可能发到一半就被取消了) go func() { for i := 1; i <= 20; i++ { select { case <-ctx.Done(): fmt.Println("任务发送被取消,停止发送") close(jobs) return case jobs <- Job{ID: i, Payload: fmt.Sprintf("数据_%d", i)}: } } close(jobs) }() // 等待所有 worker 结束并关闭 results go func() { wg.Wait() close(results) }() // 收集结果 var count int for result := range results { count++ fmt.Printf("收到结果: 任务%d 由Worker%d完成 → %s\n", result.Job.ID, result.WorkerID, result.Output) } fmt.Printf("\n总共完成 %d 个任务(3秒超时)\n", count) }关键改进点:select 双监听:Worker 同时监听 ctx.Done() 和 jobs,取消信号到来时优先退出任务处理中也检查取消:模拟耗时操作时用 select 监听,而不是裸 time.Sleep发送端也检查取消:避免向已经无人消费的 channel 继续发送导致阻塞7.5 动态调整 Worker 数量有时候任务量波动很大,我们希望根据负载动态增加或减少 Worker。package main import ( "context" "fmt" "sync" "sync/atomic" "time" ) // Pool 是一个可动态调整大小的 Worker Pool type Pool struct { jobs chan int // 任务队列 results chan int // 结果队列 workerNum int64 // 当前 worker 数量(原子操作) ctx context.Context cancel context.CancelFunc wg sync.WaitGroup } // NewPool 创建一个新的 Worker Pool func NewPool(bufferSize int) *Pool { ctx, cancel := context.WithCancel(context.Background()) return &Pool{ jobs: make(chan int, bufferSize), results: make(chan int, bufferSize), ctx: ctx, cancel: cancel, } } // AddWorker 添加一个 Worker func (p *Pool) AddWorker() { p.wg.Add(1) id := atomic.AddInt64(&p.workerNum, 1) go func(workerID int64) { defer p.wg.Done() defer atomic.AddInt64(&p.workerNum, -1) fmt.Printf("🟢 Worker %d 上线\n", workerID) for { select { case <-p.ctx.Done(): fmt.Printf("🔴 Worker %d 下线(Pool关闭)\n", workerID) return case job, ok := <-p.jobs: if !ok { fmt.Printf("🔴 Worker %d 下线(无更多任务)\n", workerID) return } // 处理任务 time.Sleep(200 * time.Millisecond) p.results <- job * 10 } } }(id) } // Submit 提交任务 func (p *Pool) Submit(job int) { p.jobs <- job } // Shutdown 优雅关闭 func (p *Pool) Shutdown() { close(p.jobs) // 先关闭任务队列,让 worker 自然退出 p.wg.Wait() // 等待所有 worker 完成 close(p.results) } // ForceShutdown 强制关闭(通过 context 取消) func (p *Pool) ForceShutdown() { p.cancel() // 发送取消信号 p.wg.Wait() // 等待所有 worker 响应取消 close(p.results) } func main() { pool := NewPool(20) // 一开始启动 2 个 Worker fmt.Println("=== 初始:2 个 Worker ===") pool.AddWorker() pool.AddWorker() // 提交一批任务 go func() { for i := 1; i <= 15; i++ { pool.Submit(i) fmt.Printf("提交任务 %d\n", i) } }() time.Sleep(1 * time.Second) // 发现任务堆积,动态增加 Worker fmt.Println("\n=== 负载增加,扩容到 5 个 Worker ===") pool.AddWorker() pool.AddWorker() pool.AddWorker() fmt.Printf("当前 Worker 数: %d\n", atomic.LoadInt64(&pool.workerNum)) // 在另一个 goroutine 收集结果 go func() { for result := range pool.results { fmt.Printf("收到结果: %d\n", result) } }() time.Sleep(3 * time.Second) // 优雅关闭 fmt.Println("\n=== 关闭 Pool ===") pool.Shutdown() fmt.Println(" Pool 已关闭") }7.6 实战:并发爬虫 Worker Pool模拟一个并发 URL 爬取场景,用 Worker Pool 控制并发请求数量。package main import ( "fmt" "math/rand" "sync" "time" ) // CrawlJob 爬取任务 type CrawlJob struct { URL string } // CrawlResult 爬取结果 type CrawlResult struct { URL string StatusCode int Body string Duration time.Duration WorkerID int Err error } // crawlWorker 爬虫 Worker func crawlWorker(id int, jobs <-chan CrawlJob, results chan<- CrawlResult, wg *sync.WaitGroup) { defer wg.Done() for job := range jobs { start := time.Now() // 模拟 HTTP 请求(实际项目中使用 http.Get) time.Sleep(time.Duration(rand.Intn(2000)+500) * time.Millisecond) // 模拟结果 statusCode := 200 if rand.Float32() < 0.1 { // 10% 概率失败 statusCode = 500 } results <- CrawlResult{ URL: job.URL, StatusCode: statusCode, Body: fmt.Sprintf("<html>%s的内容</html>", job.URL), Duration: time.Since(start), WorkerID: id, } } } func main() { // 模拟待爬取的 URL 列表 urls := []string{ "https://example.com/page/1", "https://example.com/page/2", "https://example.com/page/3", "https://example.com/page/4", "https://example.com/page/5", "https://example.com/page/6", "https://example.com/page/7", "https://example.com/page/8", "https://example.com/page/9", "https://example.com/page/10", } const numWorkers = 3 // 限制并发数为 3,避免被目标网站封禁 jobs := make(chan CrawlJob, len(urls)) results := make(chan CrawlResult, len(urls)) // 启动 Worker var wg sync.WaitGroup for w := 1; w <= numWorkers; w++ { wg.Add(1) go crawlWorker(w, jobs, results, &wg) } // 发送任务 start := time.Now() for _, url := range urls { jobs <- CrawlJob{URL: url} } close(jobs) // 等待完成并关闭结果通道 go func() { wg.Wait() close(results) }() // 收集并统计结果 var successCount, failCount int for result := range results { if result.StatusCode == 200 { successCount++ fmt.Printf("✅ [Worker%d] %s → %d (耗时 %v)\n", result.WorkerID, result.URL, result.StatusCode, result.Duration) } else { failCount++ fmt.Printf("❌ [Worker%d] %s → %d (耗时 %v)\n", result.WorkerID, result.URL, result.StatusCode, result.Duration) } } elapsed := time.Since(start) fmt.Printf("\n========== 爬取统计 ==========\n") fmt.Printf("总 URL 数: %d\n", len(urls)) fmt.Printf("成功: %d, 失败: %d\n", successCount, failCount) fmt.Printf("并发数: %d\n", numWorkers) fmt.Printf("总耗时: %v\n", elapsed) fmt.Printf("(串行预估耗时: %v+)\n", time.Duration(len(urls))*time.Second) }输出示例:✅ [Worker2] https://example.com/page/2 → 200 (耗时 732ms) ✅ [Worker1] https://example.com/page/1 → 200 (耗时 1.15s) ✅ [Worker3] https://example.com/page/3 → 200 (耗时 1.85s) ✅ [Worker2] https://example.com/page/4 → 200 (耗时 1.02s) ❌ [Worker1] https://example.com/page/5 → 500 (耗时 580ms) ... ========== 爬取统计 ========== 总 URL 数: 10 成功: 9, 失败: 1 并发数: 3 总耗时: 5.2s (串行预估耗时: 10s+)7.7 Worker Pool 模式总结核心三件套:jobs channel → N 个 Worker goroutine → results channel (任务入口) (并发处理) (结果出口)设计要点:Worker 数量:根据任务类型选择CPU 密集型:Worker 数 ≈ CPU 核心数(runtime.NumCPU())IO 密集型:Worker 数可以更多(10~100),因为大部分时间在等待 IOChannel 缓冲区大小:太小:任务生产者会频繁阻塞太大:占用过多内存经验值:Worker 数量的 2~5 倍优雅关闭顺序:停止发送新任务(close(jobs))等待 Worker 处理完剩余任务(wg.Wait())关闭结果通道(close(results))错误处理:Result 结构体中包含 Err 字段,让调用者决定如何处理与直接创建 goroutine 的对比:
2026年02月13日
7 阅读
0 评论
0 点赞
1
2