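When writing Go code we often use sync.Mutex to lock and unlock. I was curious how the Lock method actually blocks and wakes up goroutines, so I read the source of Lock and Unlock in the sync package, along with the runtime semaphore code they call into. These are my notes.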

var mu = new(sync.Mutex)
func test() {
	mu.Lock()
	defer mu.Unlock()
	// ... critical section ...
}

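This snippet locks and unlocks a mutex. When multiple goroutines execute it concurrently, the mutex guarantees that only one of them is inside the critical section at a time, so the critical-section code runs serially. A newly arriving goroutine may acquire the lock immediately, or it may be blocked. Here I want to explore how a goroutine gets blocked and later woken up. The mutex implementation is complex and touches a lot of other machinery, so I'll focus only on this part and ignore the rest for now.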
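This guarantee is easy to check with a small runnable program. The names incrementAll and counter are made up for the demo. Several goroutines increment a shared counter inside the Lock/Unlock pair, and because the increments are serialized, none of them is lost.

```go
package main

import (
	"fmt"
	"sync"
)

var (
	mu      = new(sync.Mutex)
	counter int // shared state, only touched while holding mu
)

// incrementAll (a demo helper) starts n goroutines that each increment
// counter `per` times while holding mu, then waits for all of them.
func incrementAll(n, per int) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < per; j++ {
				mu.Lock()
				counter++ // critical section
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
}

func main() {
	incrementAll(10, 1000)
	fmt.Println(counter) // 10000: no increment is lost
}
```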

The Mutex struct

// A Mutex is a mutual exclusion lock.
// The zero value for a Mutex is an unlocked mutex.
//
// A Mutex must not be copied after first use.
type Mutex struct {
	state int32
	sema  uint32
}
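  • state is the lock's state: locked, woken, starvation mode, and so on. These flags are stored as bits of state. The low three bits are mutexLocked, mutexWoken and mutexStarving, and the remaining high bits, starting at mutexWaiterShift, hold the number of waiters. Each state is handled differently; I won't go into that here.
  • sema is a semaphore. Blocking and waking goroutines is implemented through this field.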
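For reference, here is one way to decode the bits of state. The constant names and values below are copied from the unexported constants in the sync package (Go 1.9 and later, as far as I know). describeState is a hypothetical helper and not part of any API.

```go
package main

import "fmt"

// Copies of the unexported bit constants from package sync (not imported).
const (
	mutexLocked      = 1 << iota // 1: the mutex is locked
	mutexWoken                   // 2: a waiter has been woken
	mutexStarving                // 4: the mutex is in starvation mode
	mutexWaiterShift = iota      // 3: the waiter count starts at this bit
)

// describeState is a hypothetical helper that splits a state word into
// its flags and the number of waiting goroutines.
func describeState(state int32) (locked, woken, starving bool, waiters int32) {
	locked = state&mutexLocked != 0
	woken = state&mutexWoken != 0
	starving = state&mutexStarving != 0
	waiters = state >> mutexWaiterShift
	return
}

func main() {
	// Locked, a waiter already woken, two goroutines waiting.
	l, w, s, n := describeState(mutexLocked | mutexWoken | 2<<mutexWaiterShift)
	fmt.Println(l, w, s, n) // true true false 2
}
```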

The Lock() method

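Lock() is fairly complex. It makes heavy use of CAS (compare-and-swap), spinning, state transitions and the semaphore. When contention is light, Lock may spin to acquire the lock. Spinning is efficient because no goroutine switch or scheduling is involved, but the conditions for it are strict: for example, it requires a multicore machine, allows only a few spin iterations, and the mutex must not be in starvation mode. When the goroutine cannot spin, or spinning fails to get the lock, it blocks in Lock(). After another goroutine calls Unlock, it may be woken up. Here is the key blocking call in Lock():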

runtime_SemacquireMutex(&m.sema, queueLifo)

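This function is implemented in the runtime package and linked into sync through a go:linkname directive. Here is its implementation in the runtime: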

//go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex
func sync_runtime_SemacquireMutex(addr *uint32, lifo bool) {
	semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile)
}
func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags) {
	gp := getg()
	if gp != gp.m.curg {
		throw("semacquire not on the G stack")
	}

	// Easy case.
	if cansemacquire(addr) {
		return
	}
	// A sudog represents a g on a wait queue (sudogs are linked into doubly linked lists); get one for the current g.
	s := acquireSudog()
	// Look up the semaRoot for addr (the Mutex's sema field); its waiters live in a global balanced binary tree (a treap).
	root := semroot(addr)
	t0 := int64(0)
	s.releasetime = 0
	s.acquiretime = 0
	s.ticket = 0
	if profile&semaBlockProfile != 0 && blockprofilerate > 0 {
		t0 = cputicks()
		s.releasetime = -1
	}
	if profile&semaMutexProfile != 0 && mutexprofilerate > 0 {
		if t0 == 0 {
			t0 = cputicks()
		}
		s.acquiretime = t0
	}
	for {
		lock(&root.lock)
		// Add ourselves to nwait to disable "easy case" in semrelease.
		atomic.Xadd(&root.nwait, 1)
		// Check cansemacquire to avoid missed wakeup.
		if cansemacquire(addr) {
			atomic.Xadd(&root.nwait, -1)
			unlock(&root.lock)
			break
		}
		// Any semrelease after the cansemacquire knows we're waiting
		// (we set nwait above), so go to sleep.
		// Enqueue the sudog. This "queue" is really a balanced binary tree (treap); every g blocked on our lock is stored here first.
		root.queue(addr, s, lifo)
		// gopark detaches the g from its m: the g no longer has an OS thread to run on, so it is blocked, while the m goes on to run other gs.
		goparkunlock(&root.lock, "semacquire", traceEvGoBlockSync, 4)
		if s.ticket != 0 || cansemacquire(addr) {
			break
		}
	}
	if s.releasetime > 0 {
		blockevent(s.releasetime-t0, 3)
	}
	releaseSudog(s)
}

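As we can see, our goroutine is wrapped in a sudog. The sudog is added, keyed by addr (the mutex's sema field), to a global balanced binary tree, and then gopark is called to block the goroutine. gopark is an important function. Through mcall(park_m) it switches to the g0 stack, where park_m marks the g as waiting and calls dropg. dropg removes the association between the g and its m; see Go's GPM scheduler model for background. gopark's implementation is fairly involved, so look it up if you are interested. An m (machine) is an OS thread. Once the association is removed, the g has no thread to run on, which is exactly how our goroutine appears to block, while the m goes back to the scheduler and runs other goroutines.
The key gopark function: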
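"Global" needs a little more precision here. semroot does not consult one single tree. It hashes the semaphore address into a fixed-size table, and each bucket's semaRoot holds a treap keyed by address, where each node carries the list of waiters for that address. The sketch below restates only the bucket computation. The values semTabSize = 251 and the `>>3` shift are what I recall from runtime/sema.go in the versions I read, and bucketOf is a hypothetical name, so treat this as an illustration rather than a runtime API.

```go
package main

import (
	"fmt"
	"unsafe"
)

// semTabSize mirrors the runtime constant of the same name (251 in the
// runtime/sema.go versions I read); it is a copy, not an import.
const semTabSize = 251

// bucketOf is a hypothetical restatement of semroot's bucket choice:
// shift the semaphore's address right by 3, then reduce modulo semTabSize.
func bucketOf(addr *uint32) uintptr {
	return (uintptr(unsafe.Pointer(addr)) >> 3) % semTabSize
}

func main() {
	// Stand-ins for the sema fields of several different mutexes.
	var semas [4]uint32
	for i := range semas {
		fmt.Printf("sema %d at %p -> bucket %d\n", i, &semas[i], bucketOf(&semas[i]))
	}
}
```

Because unrelated addresses can land in the same bucket (for instance, two addresses exactly 8*251 bytes apart), each bucket keeps a tree keyed by address. It is also why semrelease1 has to check that dequeue really returned a sudog (`s != nil`).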

// Puts the current goroutine into a waiting state and calls unlockf.
// If unlockf returns false, the goroutine is resumed.
// unlockf must not access this G's stack, as it may be moved between
// the call to gopark and the call to unlockf.
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason string, traceEv byte, traceskip int) {
	mp := acquirem()
	gp := mp.curg
	status := readgstatus(gp)
	if status != _Grunning && status != _Gscanrunning {
		throw("gopark: bad g status")
	}
	mp.waitlock = lock
	mp.waitunlockf = *(*unsafe.Pointer)(unsafe.Pointer(&unlockf))
	gp.waitreason = reason
	mp.waittraceev = traceEv
	mp.waittraceskip = traceskip
	releasem(mp)
	// can't do anything that might move the G between Ms here.
	mcall(park_m)
}

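The first line of the comment states what gopark does. gopark finishes with mcall(park_m), and park_m calls dropg, which removes the association between the m and the g (see the first comment line below):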

// dropg removes the association between m and the current goroutine m->curg (gp for short).
// Typically a caller sets gp's status away from Grunning and then
// immediately calls dropg to finish the job. The caller is also responsible
// for arranging that gp will be restarted using ready at an
// appropriate time. After calling dropg and arranging for gp to be
// readied later, the caller can do other work but eventually should
// call schedule to restart the scheduling of goroutines on this m.
func dropg() {
	_g_ := getg()

	setMNoWB(&_g_.m.curg.m, nil)
	setGNoWB(&_g_.m.curg, nil)
}

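We now have the overall picture of how Lock blocks. Many details were skipped, which is acceptable here. If the mutex is already locked by another goroutine, and our goroutine either cannot spin or spins without getting the lock, Go puts our goroutine to sleep.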
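The overall flow just summarized (CAS fast path, a short spin, then sleep on a semaphore) can be shown with a much simpler lock. The sketch below is hypothetical and is NOT how sync.Mutex is implemented. It has no woken bit and no starvation mode, and key simply counts the goroutines that hold or want the lock. A buffered channel stands in for runtime_SemacquireMutex/runtime_Semrelease, and runtime.Gosched stands in for the runtime's busy-wait spin.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
	"sync/atomic"
)

// toyMutex is a hypothetical, simplified lock for illustration; it is NOT sync.Mutex.
// key counts goroutines that hold or want the lock; sema carries wake-up tokens.
type toyMutex struct {
	key  int32
	sema chan struct{}
}

func newToyMutex() *toyMutex {
	// At most one wake-up token is ever pending, so capacity 1 is enough.
	return &toyMutex{sema: make(chan struct{}, 1)}
}

func (m *toyMutex) Lock() {
	// Fast path plus a short, bounded spin: try to move key from 0 to 1.
	for i := 0; i < 4; i++ {
		if atomic.CompareAndSwapInt32(&m.key, 0, 1) {
			return
		}
		runtime.Gosched() // stand-in for the runtime's busy-wait spin
	}
	// Slow path: announce ourselves. If key became 1, the lock was free after all.
	if atomic.AddInt32(&m.key, 1) == 1 {
		return
	}
	<-m.sema // sleep until an Unlock passes the lock to us
}

func (m *toyMutex) Unlock() {
	if atomic.AddInt32(&m.key, -1) == 0 {
		return // no waiters: nobody to wake
	}
	m.sema <- struct{}{} // wake exactly one waiter, which now owns the lock
}

// runCounter increments a shared counter under a toyMutex from several goroutines.
func runCounter(goroutines, iters int) int {
	m := newToyMutex()
	n := 0
	var wg sync.WaitGroup
	for i := 0; i < goroutines; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < iters; j++ {
				m.Lock()
				n++
				m.Unlock()
			}
		}()
	}
	wg.Wait()
	return n
}

func main() {
	fmt.Println(runCounter(8, 1000)) // 8000
}
```

Like the real Lock, the cheap paths come first and a goroutine only sleeps when it has to. Unlike the real one, a woken waiter here owns the lock directly and never competes with newcomers.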

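How is the g put to sleep? The current g is running, which means it is bound to an m, so that association must be removed. gopark does this. Once the g is blocked, we also have to save its information so that it can resume later when it is woken. Where is it saved? The code above shows that the g is wrapped in a sudog node. A sudog represents a g on a wait queue, and sudogs are linked into doubly linked lists. The sudog is stored in a global balanced binary tree, keyed by the mutex's sema field.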

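When, and how, is this g woken up? A blocked g cannot wake itself; only another g can. Lock can return only after the g is woken, and when does Lock() return? Naturally, after some other g releases the lock by calling Unlock. In other words, the g that calls Unlock is responsible for waking our blocked g.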
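Here is a small runnable check of that claim using the real sync.Mutex. waitForUnlock is a hypothetical name for the demo. The mutex is locked up front, a waiter goroutine blocks in Lock, and a third goroutine, not the one that locked, calls Unlock. The Unlock doc comment below explicitly allows this. Under the Go memory model, the write made before Unlock is visible once the waiter's Lock returns.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// waitForUnlock (hypothetical demo) reports whether a goroutine blocked in
// Lock observes a write made by a different goroutine just before Unlock.
func waitForUnlock() bool {
	var mu sync.Mutex
	released := false
	done := make(chan bool)

	mu.Lock() // locked by the calling goroutine for now

	go func() {
		mu.Lock() // blocks until some goroutine calls Unlock
		done <- released
		mu.Unlock()
	}()

	go func() {
		time.Sleep(50 * time.Millisecond) // make it likely the waiter is parked
		released = true
		mu.Unlock() // a different goroutine unlocks: allowed for sync.Mutex
	}()

	return <-done
}

func main() {
	fmt.Println(waitForUnlock()) // true
}
```

The time.Sleep only makes it likely that the waiter is actually parked in Lock before the unlock happens. The result does not depend on it.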

Let's look at the implementation of Unlock:

// Unlock unlocks m.
// It is a run-time error if m is not locked on entry to Unlock.
//
// A locked Mutex is not associated with a particular goroutine.
// It is allowed for one goroutine to lock a Mutex and then
// arrange for another goroutine to unlock it.
func (m *Mutex) Unlock() {
	if race.Enabled {
		_ = m.state
		race.Release(unsafe.Pointer(m))
	}

	// Fast path: drop lock bit.
	new := atomic.AddInt32(&m.state, -mutexLocked)
	if (new+mutexLocked)&mutexLocked == 0 {
		throw("sync: unlock of unlocked mutex")
	}
	if new&mutexStarving == 0 {
		old := new
		for {
			// If there are no waiters or a goroutine has already
			// been woken or grabbed the lock, no need to wake anyone.
			// In starvation mode ownership is directly handed off from unlocking
			// goroutine to the next waiter. We are not part of this chain,
			// since we did not observe mutexStarving when we unlocked the mutex above.
			// So get off the way.
			if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
				return
			}
			// Grab the right to wake someone.
			new = (old - 1<<mutexWaiterShift) | mutexWoken
			if atomic.CompareAndSwapInt32(&m.state, old, new) {
				runtime_Semrelease(&m.sema, false)
				return
			}
			old = m.state
		}
	} else {
		// Starving mode: handoff mutex ownership to the next waiter.
		// Note: mutexLocked is not set, the waiter will set it after wakeup.
		// But mutex is still considered locked if mutexStarving is set,
		// so new coming goroutines won't acquire it.
		runtime_Semrelease(&m.sema, true)
	}
}
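The normal-mode branch of Unlock boils down to a small decision on the state word. The hypothetical helper below performs one iteration of that loop as a pure function, with no CAS and no retry. Its bit constants are copied from the sync package. Given the state after the locked bit has been dropped, it returns the state Unlock would try to install and whether it would call runtime_Semrelease.

```go
package main

import "fmt"

// Copies of the unexported bit constants from package sync (not imported).
const (
	mutexLocked      = 1 << iota
	mutexWoken
	mutexStarving
	mutexWaiterShift = iota
)

// unlockStep is a hypothetical single iteration of Unlock's normal-mode loop.
// old is the state after the locked bit has already been dropped.
func unlockStep(old int32) (newState int32, wake bool) {
	// No waiters, or the mutex is already locked/woken/starving: nothing to do.
	if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
		return old, false
	}
	// Take one waiter off the count and set the woken flag.
	return (old - 1<<mutexWaiterShift) | mutexWoken, true
}

func main() {
	fmt.Println(unlockStep(2 << mutexWaiterShift))            // 10 true
	fmt.Println(unlockStep(0))                                // 0 false
	fmt.Println(unlockStep(1<<mutexWaiterShift | mutexWoken)) // 10 false
}
```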

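runtime_Semrelease is responsible for waking a blocked g. It receives the sema field and uses it to find, in the global balanced binary tree described above, a sudog blocked on the current mutex, and through it the blocked g. In the runtime, runtime_Semrelease is linked to sync_runtime_Semrelease, which calls semrelease1. Here is the runtime implementation: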

func semrelease1(addr *uint32, handoff bool) {
	// Find the root of the global balanced binary tree for the given sema.
	root := semroot(addr)
	atomic.Xadd(addr, 1)

	// Easy case: no waiters?
	// This check must happen after the xadd, to avoid a missed wakeup
	// (see loop in semacquire).
	if atomic.Load(&root.nwait) == 0 {
		return
	}

	// Harder case: search for a waiter and wake it.
	lock(&root.lock)
	if atomic.Load(&root.nwait) == 0 {
		// The count is already consumed by another goroutine,
		// so no need to wake up another goroutine.
		unlock(&root.lock)
		return
	}
	// Lock enqueued with queue; here dequeue takes the sudog back out of the tree.
	s, t0 := root.dequeue(addr)
	if s != nil {
		atomic.Xadd(&root.nwait, -1)
	}
	unlock(&root.lock)
	if s != nil { // May be slow, so unlock first
		acquiretime := s.acquiretime
		if acquiretime != 0 {
			mutexevent(t0-acquiretime, 3)
		}
		if s.ticket != 0 {
			throw("corrupted semaphore ticket")
		}
		if handoff && cansemacquire(addr) {
			s.ticket = 1
		}
		readyWithTime(s, 5) // calls goready internally
	}
}
func readyWithTime(s *sudog, traceskip int) {
	if s.releasetime != 0 {
		s.releasetime = cputicks()
	}
	goready(s.g, traceskip)
}

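As we can see, readyWithTime calls goready. Like gopark, goready is an important function, but it is simpler. Its main job is to wake a goroutine by switching it to the runnable state and putting it on the local run queue of the current P, where it waits to be scheduled. Because ready calls runqput with next set to true, the woken g actually goes into the P's runnext slot and becomes the next goroutine that P runs.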

func goready(gp *g, traceskip int) {
	systemstack(func() {
		ready(gp, traceskip, true)
	})
}
// Mark gp ready to run.
func ready(gp *g, traceskip int, next bool) {
	if trace.enabled {
		traceGoUnpark(gp, traceskip)
	}

	status := readgstatus(gp)

	// Mark runnable.
	_g_ := getg()
	_g_.m.locks++ // disable preemption because it can be holding p in a local var
	if status&^_Gscan != _Gwaiting {
		dumpgstatus(gp)
		throw("bad g->status in ready")
	}

	// status is Gwaiting or Gscanwaiting, make Grunnable and put on runq
	casgstatus(gp, _Gwaiting, _Grunnable)
	runqput(_g_.m.p.ptr(), gp, next) // put on the current P's run queue (next == true: the runnext slot) to wait for scheduling
	if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 {
		wakep()
	}
	_g_.m.locks--
	if _g_.m.locks == 0 && _g_.preempt { // restore the preemption request in Case we've cleared it in newstack
		_g_.stackguard0 = stackPreempt
	}
}
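The effect of next == true is easy to miss in the code above. The woken goroutine does not go to the tail of runq; it takes the P's runnext slot, and whatever was in runnext is moved to the tail. Below is a hypothetical single-threaded model that represents goroutines as strings. The real runq is a lock-free 256-entry ring that can spill into the global queue, and none of that is modeled here.

```go
package main

import "fmt"

// toyP is a hypothetical model of a P's local run queue: a runnext slot
// plus a FIFO slice. "" means "no goroutine".
type toyP struct {
	runnext string
	runq    []string
}

// runqput follows the shape of runtime.runqput: with next set, gp takes
// the runnext slot and the goroutine it displaces goes to the tail of runq.
func (p *toyP) runqput(gp string, next bool) {
	if next {
		old := p.runnext
		p.runnext = gp
		if old == "" {
			return
		}
		gp = old
	}
	p.runq = append(p.runq, gp)
}

// runqget serves runnext first, then the head of runq.
func (p *toyP) runqget() string {
	if gp := p.runnext; gp != "" {
		p.runnext = ""
		return gp
	}
	if len(p.runq) == 0 {
		return ""
	}
	gp := p.runq[0]
	p.runq = p.runq[1:]
	return gp
}

func main() {
	p := &toyP{}
	p.runqput("g1", false)
	p.runqput("g2", false)
	p.runqput("woken1", true) // what ready() does: next == true
	p.runqput("woken2", true) // displaces woken1 to the tail
	for gp := p.runqget(); gp != ""; gp = p.runqget() {
		fmt.Println(gp) // woken2, g1, g2, woken1 (one per line)
	}
}
```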

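The blocked g is placed on the P's run queue, and the P, running on an M, will schedule it. Once the g is bound to an M again, the blocked goroutine has been woken up. It returns from goparkunlock in semacquire1, and back in Lock it may have to compete for the lock again with newly arriving goroutines. When it acquires the lock, Lock returns.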
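The whole round trip (semacquire1 parking a waiter, semrelease1 finding and readying it) hinges on the handshake around nwait. The waiter increments nwait and re-checks the count under root.lock, and the releaser increments the count before reading nwait, so no release can slip unnoticed between the waiter's check and its sleep. Below is a hypothetical user-space model of that handshake under these assumptions: one global root instead of a hash table, a FIFO slice per address instead of the treap, a channel per waiter instead of gopark/goready, and no handoff ticket or profiling. toySemaRoot, runDemo and the lowercase semacquire/semrelease are my own names. cansemacquire mirrors the runtime helper of the same name but is reimplemented here.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// toySemaRoot is a hypothetical stand-in for runtime.semaRoot: a lock, a waiter
// count, and per-address FIFO queues; each waiter blocks on its own channel.
type toySemaRoot struct {
	lock  sync.Mutex
	nwait int32
	queue map[*uint32][]chan struct{}
}

// One global root; the runtime hashes addr into a table of roots instead.
var root = &toySemaRoot{queue: map[*uint32][]chan struct{}{}}

// cansemacquire atomically decrements *addr if it is positive.
func cansemacquire(addr *uint32) bool {
	for {
		v := atomic.LoadUint32(addr)
		if v == 0 {
			return false
		}
		if atomic.CompareAndSwapUint32(addr, v, v-1) {
			return true
		}
	}
}

func semacquire(addr *uint32) {
	if cansemacquire(addr) { // easy case
		return
	}
	for {
		root.lock.Lock()
		// Register as a waiter first, which disables semrelease's easy case...
		atomic.AddInt32(&root.nwait, 1)
		// ...then re-check, so a release that ran before registering is not missed.
		if cansemacquire(addr) {
			atomic.AddInt32(&root.nwait, -1)
			root.lock.Unlock()
			return
		}
		wake := make(chan struct{}, 1)
		root.queue[addr] = append(root.queue[addr], wake) // like root.queue
		root.lock.Unlock()
		<-wake // like goparkunlock: sleep until a release readies us
		if cansemacquire(addr) {
			return
		}
		// Another goroutine took the count first: wait again.
	}
}

func semrelease(addr *uint32) {
	atomic.AddUint32(addr, 1)
	if atomic.LoadInt32(&root.nwait) == 0 { // easy case: no waiters
		return
	}
	root.lock.Lock()
	var wake chan struct{}
	if q := root.queue[addr]; len(q) > 0 { // like root.dequeue(addr)
		wake, root.queue[addr] = q[0], q[1:]
		atomic.AddInt32(&root.nwait, -1)
	}
	root.lock.Unlock()
	if wake != nil {
		wake <- struct{}{} // like readyWithTime/goready
	}
}

// runDemo blocks n goroutines on a zero semaphore, releases it n times,
// waits for all of them, and returns the final count.
func runDemo(n int) uint32 {
	var sema uint32 // like Mutex.sema, starts at 0
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			semacquire(&sema)
		}()
	}
	for i := 0; i < n; i++ {
		semrelease(&sema)
	}
	wg.Wait()
	return atomic.LoadUint32(&sema)
}

func main() {
	fmt.Println("final sema:", runDemo(3)) // final sema: 0
}
```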

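In fact, the mutex divides its states quite finely into starvation mode and normal mode. Each mode has its own strategies, such as the order in which waiters are inserted into the wait queue and how Unlock wakes waiters. Digging into the Go runtime like this, we are only picking up shells on the beach. Every data structure and function in the runtime is a huge iceberg, we have seen only its tip, and even of that tip we have skipped a lot.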

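That concludes this first look at how the mutex blocks and wakes goroutines. If anything here is inaccurate or wrong, please point it out in the comments. Thanks!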