sync.Mutexsema.gosleepwakeup
type semaRoot struct {
 lock  mutex
 treap *sudog // root of balanced tree of unique waiters.
 nwait uint32 // Number of waiters. Read w/o the lock.
}
semaRootmutexmutex

基本使用

sema.gosemacquire1mutex
  // Easy case.
 if cansemacquire(addr) {
  return
 }
... ...
lockWithRank(&root.lock, lockRankRoot)
... ...
  if cansemacquire(addr) {
....
   unlock(&root.lock)
   break
  }
... ...
  goparkunlock(&root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4+skipframes)
// goparkunlock里的钩子里藏着unlock方法调用
func parkunlock_c(gp *g, lock unsafe.Pointer) bool {
 unlock((*mutex)(lock))
 return true
}

上述逻辑正如之前所说:

CASCASsync.Mutexsema.gomutexCASgopark

那么Go的运行时排他锁是如何实现的呢?先来看看其数据结构

数据结构

mutexlockRankStructuintpteratomic.Cas
// 互斥锁。
// 零值mutex是未上锁状态(不需要初始化锁)
// 初始化有利于静态锁定级 但是不是必须的
type mutex struct {
 // 如果禁用了锁定级则为空结构体否则包括锁定等级
 lockRankStruct
  // 基于futex的实现将其看作uint32的key,
  // 而基于sema的实现将其看作M* waitm
  // 过去曾经是一个union,但是unions破坏了精确的GC
 key uintptr
}

GOEXPERIMENT=staticlockrankinglockRanStructlockWithRank()lock()
mutexLockUnlock
dragonfly freebsd linux
// 从原子上讲,如果 *addr == val { sleep },可能发生虚假唤醒,但是这是被允许的
// 休眠时间不超过ns,ns<0表示永远休眠
futexsleep(addr *uint32, val uint32, ns int64)
//如果有任何进程在addr上休眠,则最多唤醒cnt
futexwakeup(addr *uint32, cnt uint32)
aix darwin nacl netbsd openbsd plan9 solaris windows
// 如果尚未创建mp信号量,则为其创建一个
func semacreate(mp *m)
// 如果ns<0,则获取m的信号量并返回0
// 如果ns>=0,则在ns纳秒内尝试获取m的信号量
// 如果获取了信号量返回0 如果中断或超时返回-1
func semasleep(ns int64) int32
//唤醒已经或即将在其信号量上休眠的mp
func semawakeup(mp *m)
js,wasm
futex

Lock

// +build !goexperiment.staticlockrankingstaticlockranking// +build dragonfly freebsd linux
//lock_futex.go
func lock(l *mutex) {
 lockWithRank(l, getLockRank(l))
}
//go:nosplit
func lockWithRank(l *mutex, rank lockRank) {
 lock2(l)
}
func lock2(l *mutex) {
 gp := getg()

 if gp.m.locks < 0 {
  throw("runtime·lock: lock count")
 }
  //g绑定的m的lock属性自增加一
 gp.m.locks++

 // 投机抢占锁 运气好抢到直接返回 不需要进行内核调用
 v := atomic.Xchg(key32(&l.key), mutex_locked)
 if v == mutex_unlocked {
  return
 }

 // wait为MUTEX_LOCKED还是MUTEX_SLEEPING取决于此互斥对象mutex是否存在线程在休眠
  // 如果我们曾经将l->key从MUTEX_SLEEPING更改为其他值,则在返回之前必须小心将其更改回MUTEX_SLEEPING,
  // 以确保休眠线程得到其唤醒调用
 wait := v

 // 单处理器上 无自旋
 // 多处理器上,自旋进行ACTIVE_SPIN尝试
 spin := 0
 if ncpu > 1 {
  spin = active_spin
 }
 for {
  // 尝试锁定 自旋
  for i := 0; i < spin; i++ {
   for l.key == mutex_unlocked {
    if atomic.Cas(key32(&l.key), mutex_unlocked, wait) {
     return
    }
   }
   procyield(active_spin_cnt)
  }

  // 尝试锁定 重新调度
  for i := 0; i < passive_spin; i++ {
   for l.key == mutex_unlocked {
    if atomic.Cas(key32(&l.key), mutex_unlocked, wait) {
     return
    }
   }
   osyield()
  }

  // 休眠
  v = atomic.Xchg(key32(&l.key), mutex_sleeping)
  if v == mutex_unlocked {
   return
  }
  wait = mutex_sleeping
  futexsleep(key32(&l.key), mutex_sleeping, -1)
 }
}
procyieldPAUSE
active_spin_cnt = 30
procyield(active_spin_cnt)

// asm_amd64.s
TEXT runtime·procyield(SB),NOSPLIT,$0-0
	MOVL	cycles+0(FP), AX // 将30放入AX寄存器
again:
	PAUSE
	SUBL	$1, AX // 每次减一
	JNZ	again
	RET
osyield()
osyield()

//sys_linux_amd64.s
TEXT runtime·osyield(SB),NOSPLIT,$0
	MOVL	$SYS_sched_yield, AX
	SYSCALL
	RET
Lock
atomic.Xchgmutex.keymutex_lockedmutex_unlockedatomic.CasPAUSEosyield()futexsleep()
Lockfutexsleep()
futexsleep()
//src/runtime/os_linux.go
// Atomically,
// if(*addr == val) sleep
// Might be woken up spuriously; that's allowed.
// Don't sleep longer than ns; ns < 0 means forever.
//go:nosplit
func futexsleep(addr *uint32, val uint32, ns int64) {
 // Some Linux kernels have a bug where futex of
 // FUTEX_WAIT returns an internal error code
 // as an errno. Libpthread ignores the return value
 // here, and so can we: as it says a few lines up,
 // spurious wakeups are allowed.
 if ns < 0 {
  futex(unsafe.Pointer(addr), _FUTEX_WAIT_PRIVATE, val, nil, nil, 0)
  return
 }

 var ts timespec
 ts.setNsec(ns)
 futex(unsafe.Pointer(addr), _FUTEX_WAIT_PRIVATE, val, unsafe.Pointer(&ts), nil, 0)
}

//go:noescape
func futex(addr unsafe.Pointer, op int32, val uint32, ts, addr2 unsafe.Pointer, val3 uint32) int32
futex_sleep()futex()
// src/runtime/sys_linux_amd64.s
// int64 futex(int32 *uaddr, int32 op, int32 val,
//	struct timespec *timeout, int32 *uaddr2, int32 val2);
TEXT runtime·futex(SB),NOSPLIT,$0
	MOVQ	addr+0(FP), DI
	MOVL	op+8(FP), SI
	MOVL	val+12(FP), DX
	MOVQ	ts+16(FP), R10
	MOVQ	addr2+24(FP), R8
	MOVL	val3+32(FP), R9
	MOVL	$SYS_futex, AX
	SYSCALL
	MOVL	AX, ret+40(FP)
	RET
runtime.futex$SYS_futexAXSYSCALLfutex
linuxfutex

从Linux内核看futex

啥是futex?

Fast Userspace muTEX

futex出现之前,linux的同步机制主要分为两类:

  • 用户态的同步机制 利用原子指令实现自旋锁,但不适用于临界区执行时间过长的场景
  • 内核同步机制 如semaphore等,使用自旋+等待的形式,lock和unlock都是系统调用,即使没有冲突也要通过系统调用进入内核之后才能识别。

理想状态下,在无冲突的情况下在用户空间通过自旋锁解决,如果仍未拿到锁资源需要等待时再通过系统调用完成sleep和wake的语义。那么自旋失败的情况下,将当前线程挂起,然后是不是由持有锁的线程释放锁的时候再将其唤醒呢?带着这个疑问我们一起粗略看看linux内核如何实现futex的吧

数据结构

kernel/futex.cinclude/linux/futex.hfutexJavafutex_hash_bucketfutex_q
/**
 * struct futex_q - futex等待队列实体,每个等待任务分配一个
 * @list:  等待在futex上带优先级链表
 * @task:  等待在futex上的任务
 * @lock_ptr:  哈希桶自旋锁指针
 * @key:  the key the futex is hashed on
 * @pi_state:  optional priority inheritance state
 * @rt_waiter:  rt_waiter storage for use with requeue_pi
 * @requeue_pi_key: the requeue_pi target futex key
 * @bitset:  bitset for the optional bitmasked wakeup
 *
 * We use this hashed waitqueue, instead of a normal wait_queue_t, so
 * we can wake only the relevant ones (hashed queues may be shared).
 *
 * A futex_q has a woken state, just like tasks have TASK_RUNNING.
 * It is considered woken when plist_node_empty(&q->list) || q->lock_ptr == 0.
 * The order of wakeup is always to make the first condition true, then
 * the second.
 *
 * PI futexes are typically woken before they are removed from the hash list via
 * the rt_mutex code. See unqueue_me_pi().
 */
struct futex_q {
 struct plist_node list;// 等待在futex上按照优先级排序的任务列表

 struct task_struct *task; //等待在futex上的任务
 spinlock_t *lock_ptr; //哈希桶自旋锁指针
 union futex_key key; //futex进行hash的key
 struct futex_pi_state *pi_state; //可选的 优先级继承状态 控制优先级继承
 struct rt_mutex_waiter *rt_waiter;//
 union futex_key *requeue_pi_key;
 u32 bitset; //掩码匹配
};
/*
 * Hash buckets are shared by all the futex_keys that hash to the same
 * location.  Each key may have multiple futex_q structures, one for each task
 * waiting on a futex.
 */
struct futex_hash_bucket {
 spinlock_t lock;
 struct plist_head chain;
};
static struct futex_hash_bucket futex_queues[1<<FUTEX_HASHBITS];

futex

外部调用入口

mutexfutexfutex
#include <linux/futex.h>
#include <stdint.h>
#include <sys/time.h>

long futex(uint32_t *uaddr, int futex_op, uint32_t val,
           const struct timespec *timeout,   /* or: uint32_t val2 */
           uint32_t *uaddr2, uint32_t val3);
futex*uaddrfutexvalflags*timeout*uaddr2val3futex_opFUTEX_WAITFUTEX_WAKE
kernel/futex.cdo_futexdo_futexopcmd
//kernel/futex.c
SYSCALL_DEFINE6(futex, u32 __user *, uaddr, int, op, u32, val,
  struct timespec __user *, utime, u32 __user *, uaddr2,
  u32, val3)
{
 struct timespec ts;
 ktime_t t, *tp = NULL;
 u32 val2 = 0;
 int cmd = op & FUTEX_CMD_MASK;

 if (utime && (cmd == FUTEX_WAIT || cmd == FUTEX_LOCK_PI ||
        cmd == FUTEX_WAIT_BITSET ||
        cmd == FUTEX_WAIT_REQUEUE_PI)) {
  if (copy_from_user(&ts, utime, sizeof(ts)) != 0)
   return -EFAULT;
  if (!timespec_valid(&ts))
   return -EINVAL;

  t = timespec_to_ktime(ts);
  if (cmd == FUTEX_WAIT)
   t = ktime_add_safe(ktime_get(), t);
  tp = &t;
 }
 /*
  * requeue parameter in 'utime' if cmd == FUTEX_*_REQUEUE_*.
  * number of waiters to wake in 'utime' if cmd == FUTEX_WAKE_OP.
  */
 if (cmd == FUTEX_REQUEUE || cmd == FUTEX_CMP_REQUEUE ||
     cmd == FUTEX_CMP_REQUEUE_PI || cmd == FUTEX_WAKE_OP)
  val2 = (u32) (unsigned long) utime;

 return do_futex(uaddr, op, val, tp, uaddr2, val2, val3);
}

long do_futex(u32 __user *uaddr, int op, u32 val, ktime_t *timeout,
  u32 __user *uaddr2, u32 val2, u32 val3)
{
 int cmd = op & FUTEX_CMD_MASK;
 unsigned int flags = 0;

 if (!(op & FUTEX_PRIVATE_FLAG))
  flags |= FLAGS_SHARED;

 if (op & FUTEX_CLOCK_REALTIME) {
  flags |= FLAGS_CLOCKRT;
  if (cmd != FUTEX_WAIT_BITSET && cmd != FUTEX_WAIT_REQUEUE_PI)
   return -ENOSYS;
 }

 switch (cmd) {
 case FUTEX_LOCK_PI:
 case FUTEX_UNLOCK_PI:
 case FUTEX_TRYLOCK_PI:
 case FUTEX_WAIT_REQUEUE_PI:
 case FUTEX_CMP_REQUEUE_PI:
  if (!futex_cmpxchg_enabled)
   return -ENOSYS;
 }

 switch (cmd) {
 case FUTEX_WAIT:
  val3 = FUTEX_BITSET_MATCH_ANY;
 case FUTEX_WAIT_BITSET:
      // 若*uaddr==val则阻塞当前进程/线程 放入阻塞队列
  return futex_wait(uaddr, flags, val, timeout, val3);
 case FUTEX_WAKE:
  val3 = FUTEX_BITSET_MATCH_ANY;
 case FUTEX_WAKE_BITSET:
      // 最多唤醒val个在uaddr等待的线程/进程
  return futex_wake(uaddr, flags, val, val3);
 case FUTEX_REQUEUE:
  return futex_requeue(uaddr, flags, uaddr2, val, val2, NULL, 0);
 case FUTEX_CMP_REQUEUE:
  return futex_requeue(uaddr, flags, uaddr2, val, val2, &val3, 0);
 case FUTEX_WAKE_OP:
  return futex_wake_op(uaddr, flags, uaddr2, val, val2, val3);
 case FUTEX_LOCK_PI:
  return futex_lock_pi(uaddr, flags, val, timeout, 0);
 case FUTEX_UNLOCK_PI:
  return futex_unlock_pi(uaddr, flags);
 case FUTEX_TRYLOCK_PI:
  return futex_lock_pi(uaddr, flags, 0, timeout, 1);
 case FUTEX_WAIT_REQUEUE_PI:
  val3 = FUTEX_BITSET_MATCH_ANY;
  return futex_wait_requeue_pi(uaddr, flags, val, timeout, val3,
          uaddr2);
 case FUTEX_CMP_REQUEUE_PI:
  return futex_requeue(uaddr, flags, uaddr2, val, val2, &val3, 1);
 }
 return -ENOSYS;
}
futexsleep()futexwakeup()
// 原子上,如果 *addr == val { sleep } ;虚假唤醒是被允许的;不会阻塞超过ns,ns<0表示永远休眠
futexsleep(addr *uint32, val uint32, ns int64)
//如果任何线程阻塞在addr上,则唤醒至少cnt个阻塞的任务
futexwakeup(addr *uint32, cnt uint32) 
futex()futex_opFUTEX_WAITFUTEX_WAKE

futex_wait

futex()futex_opFUTEX_WAIT方法。主要逻辑为判断指向的值和值若相等,则将当前线程/进程阻塞在所指向的
static int futex_wait(u32 __user *uaddr, unsigned int flags, u32 val,
        ktime_t *abs_time, u32 bitset)
{
 struct hrtimer_sleeper timeout, *to = NULL;
 struct restart_block *restart;
 struct futex_hash_bucket *hb;
 struct futex_q q = futex_q_init;
 int ret;
...
 // 设置定时任务,超时abs_time后线程仍未被唤醒,则由定时任务唤醒它
 if (abs_time) {
  to = &timeout;
  hrtimer_init_on_stack(&to->timer, (flags & FLAGS_CLOCKRT) ?
          CLOCK_REALTIME : CLOCK_MONOTONIC,
          HRTIMER_MODE_ABS);
  hrtimer_init_sleeper(to, current);
  hrtimer_set_expires_range_ns(&to->timer, *abs_time,
          current->timer_slack_ns);
 }

retry:
  // 准备等待在uaddr上 如果成功则持有hash_bucket的锁
  // 比较uaddr和期望值val是否相等 并初始化q.key
 ret = futex_wait_setup(uaddr, val, flags, &q, &hb);
 if (ret)//val被改变直接返回
  goto out;
  // 插入futex等待队列,等待超时或通过信号来唤醒
 futex_wait_queue_me(hb, &q, to);

 /* If we were woken (and unqueued), we succeeded, whatever. */
 ret = 0;
 /* unqueue_me() drops q.key ref */
  // 若unqueue_me成功,则说明是超时触发(因为futex_wake唤醒时,会将该进程移除等待队列,所以这里会失败)
 if (!unqueue_me(&q))
  goto out;
 ret = -ETIMEDOUT;
 if (to && !to->task)
  goto out;

 /*
  * We expect signal_pending(current), but we might be the
  * victim of a spurious wakeup as well.
  */
 if (!signal_pending(current))
  goto retry;

 ret = -ERESTARTSYS;
 if (!abs_time)
  goto out;

 restart = &current_thread_info()->restart_block;
 restart->fn = futex_wait_restart;
 restart->futex.uaddr = uaddr;
 restart->futex.val = val;
 restart->futex.time = abs_time->tv64;
 restart->futex.bitset = bitset;
 restart->futex.flags = flags | FLAGS_HAS_TIMEOUT;

 ret = -ERESTART_RESTARTBLOCK;

out:
 if (to) {
    // 取消定时任务
  hrtimer_cancel(&to->timer);
  destroy_hrtimer_on_stack(&to->timer);
 }
 return ret;
}


futex_wait()
futex_wait_setup(uaddr, val, flags, &q, &hb)uaddrfutex_hash_bucketfutex_q.keyfutex_wait_queue_me(hb, &q, to)
/**
 * futex_wait_setup() - 准备等待在一个futex变量上
 * @uaddr: futex用户空间地址
 * @val: 期望的值
 * @flags: futex标识 (FLAGS_SHARED, etc.)
 * @q:  关联的futex_q
 * @hb:  hash_bucket的指针 返回给调用者
 *
 * Setup the futex_q and locate the hash_bucket.  Get the futex value and
 * compare it with the expected value.  Handle atomic faults internally.
 * Return with the hb lock held and a q.key reference on success, and unlocked
 * with no q.key reference on failure.
 *
 * Return:
 *  0 - uaddr contains val and hb has been locked;
 * <1 - -EFAULT or -EWOULDBLOCK (uaddr does not contain val) and hb is unlocked
 */
static int futex_wait_setup(u32 __user *uaddr, u32 val, unsigned int flags,
      struct futex_q *q, struct futex_hash_bucket **hb)
{
 u32 uval;
 int ret;

 /*
  * Access the page AFTER the hash-bucket is locked.
  * Order is important:
  *
  *   Userspace waiter: val = var; if (cond(val)) futex_wait(&var, val);
  *   Userspace waker:  if (cond(var)) { var = new; futex_wake(&var); }
  *
  * The basic logical guarantee of a futex is that it blocks ONLY
  * if cond(var) is known to be true at the time of blocking, for
  * any cond.  If we locked the hash-bucket after testing *uaddr, that
  * would open a race condition where we could block indefinitely with
  * cond(var) false, which would violate the guarantee.
  *
  * On the other hand, we insert q and release the hash-bucket only
  * after testing *uaddr.  This guarantees that futex_wait() will NOT
  * absorb a wakeup if *uaddr does not match the desired values
  * while the syscall executes.
  */
retry:
   //初始化futex_q 填充q->key
 ret = get_futex_key(uaddr, flags & FLAGS_SHARED, &q->key, VERIFY_READ);
 if (unlikely(ret != 0))
  return ret;

retry_private:
  //1.对q->key进行hash 然后通过&futex_queues[hash & ((1 << FUTEX_HASHBITS)-1)]找到futex_hash_bucket
  //2. spin_lock(&hb->lock)获得自旋锁
 *hb = queue_lock(q);
 //原子的将uaddr的值拷贝到uval中
 ret = get_futex_value_locked(&uval, uaddr);
 if (ret) { //拷贝操作失败 重试
    ...
  goto retry;
 }
 // 如果uval的值(即uaddr)不等于期望值val 则表明其他线程在修改
  // 直接返回无需等待
 if (uval != val) {
  queue_unlock(q, *hb);
  ret = -EWOULDBLOCK;
 }

out:
 if (ret)
  put_futex_key(&q->key);
 return ret;
}
futex_wait_setup()
futex_qfutex_q.keyfutex_q.key&futex_queues[hash & ((1 << FUTEX_HASHBITS)-1)]futex_hash_bucket*uaddrval-EWOULDBLOCK\#define EWOULDBLOCK 246 /* Operation would block (Linux returns EAGAIN) */
/**
 * futex_wait_queue_me() - queue_me() and wait for wakeup, timeout, or signal
 * @hb:  the futex hash bucket, must be locked by the caller
 * @q:  the futex_q to queue up on
 * @timeout: the prepared hrtimer_sleeper, or null for no timeout
 */
static void futex_wait_queue_me(struct futex_hash_bucket *hb, struct futex_q *q,
    struct hrtimer_sleeper *timeout)
{
  // task状态保证在另一个task唤醒它之前被设置,set_current_state利用set_mb()实现
  // 设置task状态为TASK_INTERRUPTIBLE CPU只会调度状态为TASK_RUNNING的任务
 set_current_state(TASK_INTERRUPTIBLE);
  //将q插入到等待队列中然后释放自旋锁
 queue_me(q, hb);
 //启动定时器
 if (timeout) {
  hrtimer_start_expires(&timeout->timer, HRTIMER_MODE_ABS);
  if (!hrtimer_active(&timeout->timer))
   timeout->task = NULL;
 }

 /*
  * If we have been removed from the hash list, then another task
  * has tried to wake us, and we can skip the call to schedule().
  */
 if (likely(!plist_node_empty(&q->list))) {
  /*
   * If the timer has already expired, current will already be
   * flagged for rescheduling. Only call schedule if there
   * is no timeout, or if it has yet to expire.
   */
    // 未设置过期时间 或过期时间还未到期才进行调度
  if (!timeout || timeout->task)
      //系统重新进行调度,此时CPU会去执行其他任务,当前任务会被阻塞
   schedule();
 }
  // 走到这里说明当前任务被CPU选中执行
 __set_current_state(TASK_RUNNING);
}
futex_wait_queue_me
TASK_INTERRUPTIBLEqueueme()futex_qTASK_RUNNING
futex_waitfutex_wait_setup()futex_q.keyfutex_hash_bucketspin_lock(futex_hash_bucket.lock)futex_wait_queue_me()TASK_INTERRUPTIBLEqueue_me()futex_qspin_unlock(&hb->lock);*uaddr

futex_wake

futex_wake()*uaddr
/*
 * Wake up waiters matching bitset queued on this futex (uaddr).
 */
static int
futex_wake(u32 __user *uaddr, unsigned int flags, int nr_wake, u32 bitset)
{
 struct futex_hash_bucket *hb;
 struct futex_q *this, *next;
 struct plist_head *head;
 union futex_key key = FUTEX_KEY_INIT;
 int ret;

 if (!bitset)
  return -EINVAL;
  //根据uaddr的值填充&key的内容
 ret = get_futex_key(uaddr, flags & FLAGS_SHARED, &key, VERIFY_READ);
 if (unlikely(ret != 0))
  goto out;
 //根据&key获取对应uaddr所在的futex_hash_bucket
 hb = hash_futex(&key);
  //对该hb加自旋锁
 spin_lock(&hb->lock);
 head = &hb->chain;
 //遍历该hb的链表 注意链表中存储的节点时plist_node类型,而这里的this却是futex_q类型
  //这种类型转换是通过c中的container_of机制实现的
 plist_for_each_entry_safe(this, next, head, list) {
  if (match_futex (&this->key, &key)) {
   if (this->pi_state || this->rt_waiter) {
    ret = -EINVAL;
    break;
   }

   /* Check if one of the bits is set in both bitsets */
   if (!(this->bitset & bitset))
    continue;
   //唤醒对应进程
   wake_futex(this);
   if (++ret >= nr_wake)
    break;
  }
 }
 //释放自旋锁
 spin_unlock(&hb->lock);
 put_futex_key(&key);
out:
 return ret;
}
wake_futexwake_futexfutex_hash_bucketfutex_q
static void wake_futex(struct futex_q *q)
{
 struct task_struct *p = q->task;

 if (WARN(q->pi_state || q->rt_waiter, "refusing to wake PI futex\n"))
  return;

  // 在唤醒任务之前将q->lock_ptr设置为NULL
  // 如果另一个CPU发生了非futex方式的唤醒,则任务可能退出。p解引用则是一个不存在的task结构体
  // 为避免这种case,需要持有p引用来进行唤醒
 get_task_struct(p);
 //将q出队列
 __unqueue_futex(q);
 /*
  * 只要写入q-> lock_ptr = NULL,等待的任务就可以释放futex_q,而无需获取任何锁。
   * 这里需要一个内存屏障,以防止lock_ptr的后续存储超越plist_del。
  */
 smp_wmb();
 q->lock_ptr = NULL;
 // 将TASK_INTERRUPTIBLE | TASK_UNINTERRUPTIBLE状态的task唤醒
 wake_up_state(p, TASK_NORMAL);
  // task释放futex_q
 put_task_struct(p);
}

futex_wake()
*uaddrfutex_q->keyhash_futex(&key);futex_hash_bucketspin_lock(&hb->lock);futex_hash_bucketwake_futexTASK_RUNNINGfutex_q
futex
mmap

Unlock

Lock
func unlock(l *mutex) {
 unlockWithRank(l)
}
//go:nosplit
func unlockWithRank(l *mutex) {
 unlock2(l)
}
func unlock2(l *mutex) {
 v := atomic.Xchg(key32(&l.key), mutex_unlocked)
 if v == mutex_unlocked {
  throw("unlock of unlocked lock")
 }
 if v == mutex_sleeping {
  futexwakeup(key32(&l.key), 1)
 }

 gp := getg()
 gp.m.locks--
 if gp.m.locks < 0 {
  throw("runtime·unlock: lock count")
 }
 if gp.m.locks == 0 && gp.preempt { // restore the preemption request in case we've cleared it in newstack
  gp.stackguard0 = stackPreempt
 }
}
Lock
atomic.Xchgmutex.keymutex_unlockedmutex_sleepingfutexwakeup()
futexwakeup()futex()
//src/runtime/os_linux.go
// If any procs are sleeping on addr, wake up at most cnt.
//go:nosplit
func futexwakeup(addr *uint32, cnt uint32) {
 ret := futex(unsafe.Pointer(addr), _FUTEX_WAKE_PRIVATE, cnt, nil, nil, 0)
 if ret >= 0 {
  return
 }

 // I don't know that futex wakeup can return
 // EAGAIN or EINTR, but if it does, it would be
 // safe to loop and call futex again.
 systemstack(func() {
  print("futexwakeup addr=", addr, " returned ", ret, "\n")
 })

 *(*int32)(unsafe.Pointer(uintptr(0x1006))) = 0x1006
}

futexwakeupfutex

总结

futex

刚刚不是用户态的自旋锁已经解决不了问题了吗,你可能会想我们可以这么玩:

func lock() {
 for !tryLock(state) {
  wait() //系统调用 进入休眠
 }
}

func tryLock(state) bool {
 i:=0
 for !compareAndSwap(state, 0, 1) {
  i++
  if i>3 {
   return false
  }
 }
 return true
}
func unlock() {
 compareAndSet(state,1,0)
 wakeup() //系统调用唤醒
}
tryLockwaitlock()wait()unlock()wait()futex

如有异议 请轻拍 欢迎交流。