sync.Cond 条件变量是 Golang 标准库 sync 包中的一个常用类。sync.Cond 往往被用在一个或一组 goroutine 等待某个条件成立后唤醒这样的场景,例如常见的生产者消费者场景。

本文将基于 go-1.13 的源码 分析 sync.Cond 源码,将会涉及以下知识点:

  • sync.Cond 的基本用法
  • sync.Cond 的底层结构及原理分析
  • sync.Cond 的惯用法及使用注意事项
sync.Cond 的基本用法

在正式讲 sync.Cond 的原理之前,我们先看下 sync.Cond 是如何使用的。这里我给出了一个非常简单的单生产者多消费者的例子,代码如下:

var mutex = sync.Mutex{}
var cond = sync.NewCond(&mutex)

var queue []int

func producer() {
	i := 0
	for {
		mutex.Lock()
		queue = append(queue, i)
		i++
		mutex.Unlock()

		cond.Signal()
		time.Sleep(1 * time.Second)
	}
}

func consumer(consumerName string) {
	for {
		mutex.Lock()
		for len(queue) == 0 {
			cond.Wait()
		}

		fmt.Println(consumerName, queue[0])
		queue = queue[1:]
		mutex.Unlock()
	}
}

func main() {
	// 开启一个 producer
	go producer()

	// 开启两个 consumer
	go consumer("consumer-1")
	go consumer("consumer-2")

	for {
		time.Sleep(1 * time.Minute)
	}
}
sync.Mutex
sync.Cond
sync.Cond
sync.NewCond(l Locker)cond.Wait()cond.Wait()cond.Signal()BroadcastSignaBroadcast
sync.Cond
sync.Cond 底层原理分析

底层数据结构

sync.Cond
type Cond struct {
	noCopy noCopy

	// L is held while observing or changing the condition
	L Locker

	notify  notifyList
	checker copyChecker
}
notifyList
type notifyList struct {
    wait uint32
	notify uint32

	// List of parked waiters.
	lock mutex
	head *sudog
	tail *sudog
}

以上代码中,notifyList 包含两类属性:

waitnotifysync.Condheadtailsync.Cond

sync.Cond notifyList 结构

Wait 操作

sync.CondWait
func (c *Cond) Wait() {
	c.checker.check()
	// 获取ticket
	t := runtime_notifyListAdd(&c.notify)
	// 注意这里,必须先解锁,因为 runtime_notifyListWait 要切走 goroutine
	// 所以这里要解锁,要不然其他 goroutine 没法获取到锁了
	c.L.Unlock()
	// 将当前 goroutine 加入到 notifyList 里面,然后切走 goroutine
	runtime_notifyListWait(&c.notify, t)
	// 这里已经唤醒了,因此需要再度锁上
	c.L.Lock()
}

Wait 函数虽然短短几行代码,但里面蕴含了很多重要的逻辑。整个逻辑可以拆分为 4 步:

runtime_notifyListAddnotifyListwait
func notifyListAdd(l *notifyList) uint32 {
	return atomic.Xadd(&l.wait, 1) - 1
}
c.L.Unlock()runtime_notifyListWaitgopark
runtime_notifyListWaitnotifyListWait
func notifyListWait(l *notifyList, t uint32) {
	lock(&l.lock)

	...

	s := acquireSudog()
	s.g = getg()
	s.ticket = t

	if l.tail == nil {
		l.head = s
	} else {
		l.tail.next = s
	}
	l.tail = s

	// go park 切走 goroutine
	goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)

	// 注意:这个时候,goroutine 已经切回来了, 释放 sudog
	releaseSudog(s)
}
notifyList
SignalBroadcastc.L.Lock()

以上就是 sync.Cond 的 Wait 过程,可以简单用下图表示:

sync.Cond wait 过程

Signal:唤醒最早 Wait 的 goroutine

Signal

我们接下来看下,Signal 是如何唤醒 goroutine 以及如何实现 FIFO 式的唤醒。

代码如下:

func (c *Cond) Signal() {
	runtime_notifyListNotifyOne(&c.notify)
}

func notifyListNotifyOne(l *notifyList) {
	// 如果二者相等,说明没有需要唤醒的 goroutine
	if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
		return
	}

	lock(&l.lock)

	t := l.notify
	if t == atomic.Load(&l.wait) {
		unlock(&l.lock)
		return
	}

	// Update the next notify ticket number.
	atomic.Store(&l.notify, t+1)

	for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
		if s.ticket == t {
			n := s.next
			if p != nil {
				p.next = n
			} else {
				l.head = n
			}
			if n == nil {
				l.tail = p
			}
			unlock(&l.lock)
			s.next = nil

			// 唤醒 goroutine
			readyWithTime(s, 4)
			return
		}
	}
	unlock(&l.lock)
}
notifyList.waitnotifyList.wait
notifynotifynotifyListticketnotify
notifyList
notifyListticketnotifyListnotifyListnotifyList

Because g’s queue separately from taking numbers, there may be minor reorderings in the list.

ticket
notifyListO(n)
sync.Cond 的惯用法及使用注意事项

sync.Cond 在使用时还是有一些需要注意的地方,否则使用不当将造成代码错误。

panic("sync.Cond is copied")panic("sync: unlock of unlocked mutex")
c.L.Lock()
for !condition() {
       c.Wait()
}
... make use of condition ...
c.L.Unlock()
  1. Wait 调用的条件检查一定要放在 for 循环中,代码如上。这是因为当 Boardcast 唤醒时,有可能其他 goroutine 先于当前 goroutine 唤醒并抢到锁,导致轮到当前 goroutine 抢到锁的时候,条件又不再满足了。因此,需要将条件检查放在 for 循环中。
  2. Signal 和 Boardcast 两个唤醒操作不需要加锁。