陈哈哈的博客

Go语言重点笔记-深入了解sync.WaitGroup

2018-04-13

Go的WaitGroup与Java的CountDownLatch相似,都是计数同步实现类。用于控制多个线程、Goroutine之间的同步。

一、WaitGroup官方用法

这里是Go官方的使用示例:sync.WaitGroup

1.1 WaitGroup的核心函数 - Add

Add(delta int),增加内部计数器。delta的值可以为正数,也可以负数。
正数,表示增加计数。如果是负数,表示某个Goroutine完成了处理。但如果有初始状态(内部计数器为0)时,设置负值,会导致Pannic。

此外,需要注意调用Add函数时的时机是否正确,如上例代码片段:

for _, url := range urls {
wg.Add(1) /* !!!!注意 */
go func(url string) {
/* ... */
}
/* ... */
}
// ...

go关键字在调度goroutine时需要一定时间。如果将Add(1)函数调用放在goroutine执行体中调用,会导致wg.Wait()的阻塞等待失败。
原因是Add(1)还没有被调用,wg.Wait()就已经执行完成。

1.2 WaitGroup的核心函数 - Done

Done这个函数比较简单,它是 Add(-1)——计数器减少1 的简写。见官方WaitGroup的源码:

/* Done decrements the WaitGroup counter by one. */
func (wg *WaitGroup) Done() {
wg.Add(-1)
}

1.3 WaitGroup的核心函数 - Wait

任何Goroutine在调用Wait函数时,阻塞等待,直到内部计数器变为0时才返回。

二、先造个轮子:20行代码实现WaitGroup

造轮子有助于我们理解WaitGroup的基本原理。

假设一下,如果让我们来实现一个类似WaitGroup的同步计数类,那该要如何实现呢?

同步计数要解决两个问题:

  1. 多Goroutine读写计数器;
  2. 阻塞等待;

对于问题1:我们可以采用int 32和atomic包来处理多Goroutine读写问题;
问题2,我们可以利用 chan 在通道为空时阻塞,关闭后解除阻塞的特性;

先开始造轮子,简单20行代码开搞,如下:

测试看看,效果如何:

输出如下:

-> 9
-> 1
-> 5
-> 7
-> 8
-> 2
-> 3
-> 0
-> 4
-> 6

看样子,这20行代码能达成我们预想的效果。WaitGroup内部实现肯定不会这么简单,它内部实现考虑了性能、内存布局等各个因素。

三、进入WaitGroup内部

先列出我们要关注的几个问题:

  1. 它的结构如何?
  2. 它的Wait阻塞是如何实现的?
  3. 它能提供多大的计数器范围?

WaitGroup内部结构

很简单的三个字段:

注解 noCopy

noCopy may be embedded into structs which must not be copied after the first use.

这是个新奇的东西,意思是在Struct中嵌入此注解,在首次使用后,不可被复制。这是个什么东西?

计数器状态 state1

这是个12字节数组(64bit):高位32位是计数器,低位32位是阻塞等待计数器;
注释中还有段关于编译器内存对齐的说明,这个先不管它。

64-bit atomic operations require 64-bit alignment, but 32-bit
compilers do not ensure it. So we allocate 12 bytes and then use
the aligned 8 bytes in them as state.

翻译如下:

64位原子操作需要对齐64位比特,但32位的编译器并不确保此操作。
因此,我们可以申请一个12字节的数组,然后使用他们的8字节(32位)作为状态值。

由此可知,WaitGroup可以提供的计数器总量为:2^32

信号量 sema

WaitGroup内部使用信号量机制来实现计数同步。

分别在AddWait中使用:

其中,Add的内部代码,删除Race相关代码,删除临界值判断代码,留下核心:

Wait内部代码,删除Race相关代码,删除临界值判断代码,留下核心的:

CAS(CompareAndSwap 比较并交换)算法:

如上例子代码,WaitGroup内部通过一个for循环来重复判断statepstate状态的变更情况。

CAS 算法在CPU硬件级别对于并发操作做支持,它有三个操作数:

  • 内存值 V
  • 预估值 A
  • 更新值 B

算法的逻辑是:

当且仅当 V == A 时, 将B赋值给V; 否则,不执行任何操作。

先来看看信号量的两个函数的意义:

  • runtime_Semacquire 阻塞等待,直到信号量大于0(有可用资源),然后自动减少信号量(消费);
  • runtime_Semrelease 自动增加信号量(生产),通知唤醒被runtime_Semacquire阻塞的Goroutine;

在Goroutine中调用Wait,会增加WaitGroup的阻塞计数器数量,同时阻塞等待信号量大于0;
当每个Goroutine调用Done(即Add(-1))时,减少计数,唤醒阻塞Goroutine。