go源码阅读-sync.waitgroup
前言
sync.waitgroup
是一种用于同步的工具,它允许一组goroutine
在继续执行之前等待其他goroutine
完成。WaitGroup.Done
方法和Wait
方法共同工作来实现这种同步。
WaitGroup.Add
方法用于初始化计数器,表示需要等待的goroutine
的数量。- 每个
goroutine
完成时调用WaitGroup.Done
方法,减少计数器。 WaitGroup.Wait
方法阻塞调用它的goroutine
,直到WaitGroup
维护的计数器为零。(state
字段)
官方文档中有这样一段注释:In the terminology of the Go memory model, a call to WaitGroup.Done “synchronizes before” the return of any Wait call that it unblocks.
synchronizes before
这个术语是Go内存模型(Go Memory Model, GMM
)的一部分,它描述了程序中不同部分之间的内存可见性。
在Go内存模型中,如果一个操作synchronizes before
另一个操作,那么第一个操作对内存的修改对第二个操作是可见的。这意味着,如果一个goroutine
执行了Done方法,那么任何被这个Done调用解除阻塞的Wait调用都能看到Done之前的所有内存修改。
也即: 当你在WaitGroup
上调用Done
时,WaitGroup
计数器减一。如果WaitGroup的
计数器因此变为零,那么所有因为WaitGroup
计数器不为零而阻塞在Wait
调用上的goroutine
将被解除阻塞。根据Go内存模型,这些goroutine
在Wait
返回后能够看到Done
调用之前的所有内存状态,包括对共享变量的修改。
使用
源码解析
sync.WaitGroup
的定义如下,提供了Add|Wait|Done
三个接口。
1 | type WaitGroup struct { |
WaitGroup
中包含三个字段:
noCopy
用以标识WaitGroup
被使用后不允许被copystate
是64位无符号整形数,其中高位32表示计数器,低32位表示等待的个数。后续代码中可以看到这样的操作v := int32(state >> 32)
和w := uint32(state)
;其中>>
表示算数右移。以此获取到state
的高32位,也即计数器值。而低w
则获取其低32位表示阻塞的协程数量。
当counter计数器为0时,wait
操作不会增加等待的协程数量。sema
: 32位无符号数
WaitGroup.Add
WaitGroup.Add
用来增加或者减少计数器,delta
可以是整数也可以是负数。如果调用Add
后,计数器变为零,那么所有因为 Wait
方法而阻塞的 goroutine
将被释放.
如果Add
方法使得计数器为0,则panic
。
调用时机:
- 当计数器为零时,如果
delta
是正数,那么这个Add
调用必须在调用Wait
方法之前发生。因为如果计数器为0时,Wait
方法会立即返回。 - 当计数器大于0,或者
delta
为负数,则可以在任意时刻调用Add
所以,通常而言,Add
方法的调用应该在创建goroutine
或等待其他事件之前执行。这样可以确保WaitGroup
正确地跟踪并发操作的数量.
WaitGroup
可以被重用,但是前提是所有wait
返回后才能被新的Add
调用。
如下是WaitGroup.Add
的源码:
1 | func (wg *WaitGroup) Add(delta int) { |
1-8行,判断当前程序是否允许竞争,如果允许且当前传入的delta
小于0,则调用race.ReleaseMerge
传入当前WaitGroup
的指针。ReleaseMerge
的作用是告知race detector
,当前的减少计数器操作(Release)与 Wait 方法中的等待操作是相关的,需要同步。这样可以确保race detector
能够正确地检测到可能的数据竞争
第6行,关闭数据竞争。
第7行,在函数返回时重新启用数据竞争
第9行, 使用 wg.state.Add(uint64(delta) << 32)
增加计数器的值
第10-11行,分别用v和w
描述计数器和等待的协程数。
第12-14行,如果开启了数据竞争,且delta
大于0,且当前计数器的值等于delta
,则调用race.Read
并传入WaitGroup
实例sema
字段的指针。sema
是用于同步的信号量。race.Read
函数用于告诉race detector
,当前的增加计数器操作(Add 方法)需要与Wait
方法中的等待操作同步。这是因为可能有多个goroutine
同时尝试将WaitGroup
的计数器从0
增加到正数,这种情况下需要确保内存的可见性。
第15-21行,如果 计数器小于0,则panic;如果等待协程数量不为0,且delta>0
并且计数器的值和delta
相同,则panic。
第22行,如果计数器大于0,但是等待协程数为0,则返回。
第23行 检查waitGroup.state
的状态是否符合预期,如果不符合预期,说明有其他goroutine
正在并发地调用Add
方法,这会导致数据竞争将会panic
; ;
第24行将WaitGroup
的状态重置为 0。Store
方法用于安全地更新atomic.Value
类型的变量。此时所有等待的goroutine
都将被释放。
低25-27行 使用 runtime_Semrelease
函数释放等待的goroutine
。runtime_Semrelease
是runtime
包中用于释放信号量的函数。&wg.sema
是WaitGroup
的信号量字段的地址,false 表示这是一个正常的释放操作,0 是一个标志,表示释放的数量。循环会一直执行,直到 w 减到 0 为止。
WaitGroup.Done
通常在使用的时候都是调用Done()减少计数器,其源码如下,实际上就是调用Add
方法,传入delta=-1
。
1 | func (wg *WaitGroup) Done() { |
WaitGroup.Wait
此方法的调用者将会一直阻塞直至WaitGroup
的计数器为0
1 | func (wg *WaitGroup) Wait() { |
源代码和Add
非常类似,使用位运算操作,获取WaitGroup
的计数器和等待的协程个数。如果当前计数器为0,且开启数据竞争,则开启当前线程的数据竞争检测
使用race.Acquire
标记WaitGroup
对象的获取。如果此时计数器的值为0,则直接返回。
否则,尝试将等待计数器的个数加一,如果开启了数据竞争检测,且当前协程是第一个等待协程吗,则使用race.Write
标记信号量sema
的写入,以同步第一个添加操作.
调用runtime
包的Semacquire
函数,阻塞当前goroutine直到信号量sema可用.
检查WaitGroup
的计数器个数是否为0,如果不为0,则表示当前WaitGroup
被重用,抛出panic
最后,如果启用了数据竞争检测,则重新开启数据竞争,再次使用race.Acquire
标记WaitGroup
对象的获取。