go并发编程-WaitGroup

go并发编程-WaitGroup

使用场景

sync.WaitGroup可以等待一组Goroutine的返回,主goroutine调用Add设置需要等待线程的数量,每个goroutine执行结束后调用Done标志任务已完成,主线程调用Wait阻塞当前线程直到所有goroutine执行完成。 一个常见的使用场景是批量发出RPC或者HTTP请求。

requests := []*Request{...}
wg := &sync.WaitGroup{} // 声明一个WaitGroup变量
wg.Add(len(requests))   // 通过Add设置计数器的值

for _, request := range requests {
    go func(r *Request) {
        defer wg.Done() // 任务执行完成后,调用Done减少计数器的值
    }(request)
}
wg.Wait()
  1. 变量声明
var wg sync.WaitGroup
// wg := &sync.WaitGroup{}

  1. Add

WaitGroup是一个结构体类型,内部通过一个计数器来存储需要等待执行完成的goroutine的数量。首次声明时,这个计数器默认为0,然后我们可以通过Add方法增加或者减少该计数器的值,注意计数器的值不能小于0,否则会触发panic。

Add方法的入参可以是正数,也可以是负数,为正数时增加计数器的值,为负数时则减少计数器的值。

wg.Add(3)
wg.Add(-1)
  1. Done

WaitGroup的Done方法内部实际是调用了WaitGroup的Add(-1)。

wg.Done()
// 等价于 wg.Add(-1)
  1. Wait

当某个线程调用WaitGroup的Wait方法时,它会去检查该WaitGroup的计数器的值,如果计数器为0,则直接返回;如果计数器的值不为0,那么该线程会被阻塞,直到计数器的值变为0时才会唤醒所有被阻塞的线程。

wg.Wait()

实现原理

type WaitGroup struct {
    noCopy noCopy     // sync.WaitGroup不能通过值拷贝的方式传递
    state1 [3]uint32  // 存储状态和信号量
}

noCopy机制

sync.noCopy是一个特殊私有结构体,实现了sync.Locker接口,分析器会在编译期间检查被拷贝的变量中是否包含sync.noCopy或者实现了sync.Locker接口,如果包含该结构体或者实现了sync.Locker就会报错。

type noCopy struct{}
func (*noCopy) Lock{}
func (*noCopy) Unlock()

state1字段处理

WaitGroup分配了12个字节,用来保存当前状态。其中8个字节作为状态位,高32位记录waiter的数量,低32位记录counter的数量,其余四个字节用于存储信号量。

  • counter: 计数器,保存当前waitGroup实际等待gourontine完成的数量;
  • waiter: 当前等待waitGroup任务结束的等待者的数量,也就是调用wait()的次数,一般是1.
  • sema: 信号量,用来唤醒wait()函数。

go并发编程-WaitGroup

我们可以看到,在不同操作系统上,state获取的方式也不同。64位原子操作需要64位对齐,但32位编译器不保证这一点,于是32位机器空出第一个32位,也就保证了后面64位天然满足64位对齐,第一个32位刚好用于存储sema。

func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
   if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
      // 如果地址是64bit对齐的,数组前两个元素做state,后一个元素做信号量
      return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
   } else {
      // 如果地址是32bit对齐的,数组后两个元素用来做state,它可以用来做64bit的原子操作,第一个元素32bit用来做信号量
      return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
   }
}

Add

ADD主要用于增加需要等待的gouroutine个数。WaitGroup.ADD使用时需要注意:

  1. 不能同时调用Add和Wait;
  2. 当需要等待的gouroutine数量为0时,不能再添加新的waiter。
func (wg *WaitGroup) Add(delta int) {
   statep, semap := wg.state()
   // delta左移32位,将这个值加到counter上.
   state := atomic.AddUint64(statep, uint64(delta)<<32)
   v := int32(state >> 32) // 获取counter的值
   w := uint32(state)      // 获取waiter的值
   if v < 0 {
      panic("sync: negative WaitGroup counter")
   }
   // waiter数量不为0,且累加后的counter值和delta相等
   // 说明Wait()和Add()同时调用了,直接panic。
   if w != 0 && delta > 0 && v == int32(delta) {
      panic("sync: WaitGroup misuse: Add called concurrently with Wait")
   }
   if v > 0 || w == 0 {
      return
   }
   // counter=0,waiter>0
   // 此时不应该有同时发生的状态突变,1. add和wait不能同时调用;2. 如果counter为0,不能再增加waiter
   if *statep != state {
      panic("sync: WaitGroup misuse: Add called concurrently with Wait")
   }
   // 所有的状态位清0
   *statep = 0
   // 当counter为0时,唤醒所有waiter
   for ; w != 0; w-- {
      runtime_Semrelease(semap, false, 0)
   }
}

Done

//Done方法其实就是Add(-1)
func (wg *WaitGroup) Done() {
    wg.Add(-1)
}

Wait

Wait方法会在counter>0&&waiter>0时会调用runtime_Semacquire进入睡眠状态,直到最后一个goroutine调用Done方法唤醒wait的goroutine。

func (wg *WaitGroup) Wait() {
   statep, semap := wg.state()
   for {
      state := atomic.LoadUint64(statep)
      v := int32(state >> 32) // 获取waiter当前值
      if v == 0 {
         return
      }
      // 当有多个goroutine同时调用Wait时,可能会产生竞争,for循环重新处理即可.
      if atomic.CompareAndSwapUint64(statep, state, state+1) {
         runtime_Semacquire(semap)
         if *statep != 0 {
            panic("sync: WaitGroup is reused before previous Wait has returned")
         }
         return
      }
   }
}

作者:Koffee
链接:https://juejin.cn/post/7135434070976102414
来源:稀土掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。