sync.Cond 条件变量是 Golang 标准库 sync 包中的一个常用类。sync.Cond 往往被用在一个或一组 goroutine 等待某个条件成立后唤醒这样的场景,例如常见的生产者消费者场景。
本文将基于 go-1.13 的源码 分析 sync.Cond 源码,将会涉及以下知识点:
- sync.Cond 的基本用法
- sync.Cond 的底层结构及原理分析
- sync.Cond 的惯用法及使用注意事项
在正式讲 sync.Cond 的原理之前,我们先看下 sync.Cond 是如何使用的。这里我给出了一个非常简单的单生产者多消费者的例子,代码如下:
var mutex = sync.Mutex{}
var cond = sync.NewCond(&mutex)
var queue []int
func producer() {
i := 0
for {
mutex.Lock()
queue = append(queue, i)
i++
mutex.Unlock()
cond.Signal()
time.Sleep(1 * time.Second)
}
}
func consumer(consumerName string) {
for {
mutex.Lock()
for len(queue) == 0 {
cond.Wait()
}
fmt.Println(consumerName, queue[0])
queue = queue[1:]
mutex.Unlock()
}
}
func main() {
// 开启一个 producer
go producer()
// 开启两个 consumer
go consumer("consumer-1")
go consumer("consumer-2")
for {
time.Sleep(1 * time.Minute)
}
}
sync.Mutex
sync.Cond
sync.Cond
sync.NewCond(l Locker)cond.Wait()cond.Wait()cond.Signal()BroadcastSignaBroadcast
sync.Cond
sync.Cond 底层原理分析
底层数据结构
sync.Cond
type Cond struct {
noCopy noCopy
// L is held while observing or changing the condition
L Locker
notify notifyList
checker copyChecker
}
notifyList
type notifyList struct {
wait uint32
notify uint32
// List of parked waiters.
lock mutex
head *sudog
tail *sudog
}
以上代码中,notifyList 包含两类属性:
waitnotifysync.Condheadtailsync.Cond
Wait 操作
sync.CondWait
func (c *Cond) Wait() {
c.checker.check()
// 获取ticket
t := runtime_notifyListAdd(&c.notify)
// 注意这里,必须先解锁,因为 runtime_notifyListWait 要切走 goroutine
// 所以这里要解锁,要不然其他 goroutine 没法获取到锁了
c.L.Unlock()
// 将当前 goroutine 加入到 notifyList 里面,然后切走 goroutine
runtime_notifyListWait(&c.notify, t)
// 这里已经唤醒了,因此需要再度锁上
c.L.Lock()
}
Wait 函数虽然短短几行代码,但里面蕴含了很多重要的逻辑。整个逻辑可以拆分为 4 步:
runtime_notifyListAddnotifyListwait
func notifyListAdd(l *notifyList) uint32 {
return atomic.Xadd(&l.wait, 1) - 1
}
c.L.Unlock()runtime_notifyListWaitgopark
runtime_notifyListWaitnotifyListWait
func notifyListWait(l *notifyList, t uint32) {
lock(&l.lock)
...
s := acquireSudog()
s.g = getg()
s.ticket = t
if l.tail == nil {
l.head = s
} else {
l.tail.next = s
}
l.tail = s
// go park 切走 goroutine
goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
// 注意:这个时候,goroutine 已经切回来了, 释放 sudog
releaseSudog(s)
}
notifyList
SignalBroadcastc.L.Lock()
以上就是 sync.Cond 的 Wait 过程,可以简单用下图表示:
Signal:唤醒最早 Wait 的 goroutine
Signal
我们接下来看下,Signal 是如何唤醒 goroutine 以及如何实现 FIFO 式的唤醒。
代码如下:
func (c *Cond) Signal() {
runtime_notifyListNotifyOne(&c.notify)
}
func notifyListNotifyOne(l *notifyList) {
// 如果二者相等,说明没有需要唤醒的 goroutine
if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
return
}
lock(&l.lock)
t := l.notify
if t == atomic.Load(&l.wait) {
unlock(&l.lock)
return
}
// Update the next notify ticket number.
atomic.Store(&l.notify, t+1)
for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
if s.ticket == t {
n := s.next
if p != nil {
p.next = n
} else {
l.head = n
}
if n == nil {
l.tail = p
}
unlock(&l.lock)
s.next = nil
// 唤醒 goroutine
readyWithTime(s, 4)
return
}
}
unlock(&l.lock)
}
notifyList.waitnotifyList.wait
notifynotifynotifyListticketnotify
notifyList
notifyListticketnotifyListnotifyListnotifyList
Because g’s queue separately from taking numbers, there may be minor reorderings in the list.
ticket
notifyListO(n)
sync.Cond 的惯用法及使用注意事项
sync.Cond 在使用时还是有一些需要注意的地方,否则使用不当将造成代码错误。
panic("sync.Cond is copied")panic("sync: unlock of unlocked mutex")
c.L.Lock()
for !condition() {
c.Wait()
}
... make use of condition ...
c.L.Unlock()
- Wait 调用的条件检查一定要放在 for 循环中,代码如上。这是因为当 Boardcast 唤醒时,有可能其他 goroutine 先于当前 goroutine 唤醒并抢到锁,导致轮到当前 goroutine 抢到锁的时候,条件又不再满足了。因此,需要将条件检查放在 for 循环中。
- Signal 和 Boardcast 两个唤醒操作不需要加锁。