sync.Mutexsema.gosleepwakeup
type semaRoot struct {
lock mutex
treap *sudog // root of balanced tree of unique waiters.
nwait uint32 // Number of waiters. Read w/o the lock.
}
semaRootmutexmutex
基本使用
sema.gosemacquire1mutex
// Easy case.
if cansemacquire(addr) {
return
}
... ...
lockWithRank(&root.lock, lockRankRoot)
... ...
if cansemacquire(addr) {
....
unlock(&root.lock)
break
}
... ...
goparkunlock(&root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4+skipframes)
// goparkunlock里的钩子里藏着unlock方法调用
func parkunlock_c(gp *g, lock unsafe.Pointer) bool {
unlock((*mutex)(lock))
return true
}
上述逻辑正如之前所说:
CASCASsync.Mutexsema.gomutexCASgopark
那么Go的运行时排他锁是如何实现的呢?先来看看其数据结构
数据结构
mutexlockRankStructuintpteratomic.Cas
// 互斥锁。
// 零值mutex是未上锁状态(不需要初始化锁)
// 初始化有利于静态锁定级 但是不是必须的
type mutex struct {
// 如果禁用了锁定级则为空结构体否则包括锁定等级
lockRankStruct
// 基于futex的实现将其看作uint32的key,
// 而基于sema的实现将其看作M* waitm
// 过去曾经是一个union,但是unions破坏了精确的GC
key uintptr
}
GOEXPERIMENT=staticlockrankinglockRanStructlockWithRank()lock()
mutexLockUnlock
dragonfly freebsd linux
// 从原子上讲,如果 *addr == val { sleep },可能发生虚假唤醒,但是这是被允许的
// 休眠时间不超过ns,ns<0表示永远休眠
futexsleep(addr *uint32, val uint32, ns int64)
//如果有任何进程在addr上休眠,则最多唤醒cnt
futexwakeup(addr *uint32, cnt uint32)
aix darwin nacl netbsd openbsd plan9 solaris windows
// 如果尚未创建mp信号量,则为其创建一个
func semacreate(mp *m)
// 如果ns<0,则获取m的信号量并返回0
// 如果ns>=0,则在ns纳秒内尝试获取m的信号量
// 如果获取了信号量返回0 如果中断或超时返回-1
func semasleep(ns int64) int32
//唤醒已经或即将在其信号量上休眠的mp
func semawakeup(mp *m)
js,wasm
futex
Lock
// +build !goexperiment.staticlockrankingstaticlockranking// +build dragonfly freebsd linux
//lock_futex.go
func lock(l *mutex) {
lockWithRank(l, getLockRank(l))
}
//go:nosplit
func lockWithRank(l *mutex, rank lockRank) {
lock2(l)
}
func lock2(l *mutex) {
gp := getg()
if gp.m.locks < 0 {
throw("runtime·lock: lock count")
}
//g绑定的m的lock属性自增加一
gp.m.locks++
// 投机抢占锁 运气好抢到直接返回 不需要进行内核调用
v := atomic.Xchg(key32(&l.key), mutex_locked)
if v == mutex_unlocked {
return
}
// wait为MUTEX_LOCKED还是MUTEX_SLEEPING取决于此互斥对象mutex是否存在线程在休眠
// 如果我们曾经将l->key从MUTEX_SLEEPING更改为其他值,则在返回之前必须小心将其更改回MUTEX_SLEEPING,
// 以确保休眠线程得到其唤醒调用
wait := v
// 单处理器上 无自旋
// 多处理器上,自旋进行ACTIVE_SPIN尝试
spin := 0
if ncpu > 1 {
spin = active_spin
}
for {
// 尝试锁定 自旋
for i := 0; i < spin; i++ {
for l.key == mutex_unlocked {
if atomic.Cas(key32(&l.key), mutex_unlocked, wait) {
return
}
}
procyield(active_spin_cnt)
}
// 尝试锁定 重新调度
for i := 0; i < passive_spin; i++ {
for l.key == mutex_unlocked {
if atomic.Cas(key32(&l.key), mutex_unlocked, wait) {
return
}
}
osyield()
}
// 休眠
v = atomic.Xchg(key32(&l.key), mutex_sleeping)
if v == mutex_unlocked {
return
}
wait = mutex_sleeping
futexsleep(key32(&l.key), mutex_sleeping, -1)
}
}
procyieldPAUSE
active_spin_cnt = 30
procyield(active_spin_cnt)
// asm_amd64.s
TEXT runtime·procyield(SB),NOSPLIT,$0-0
MOVL cycles+0(FP), AX // 将30放入AX寄存器
again:
PAUSE
SUBL $1, AX // 每次减一
JNZ again
RET
osyield()
osyield()
//sys_linux_amd64.s
TEXT runtime·osyield(SB),NOSPLIT,$0
MOVL $SYS_sched_yield, AX
SYSCALL
RET
Lock
atomic.Xchgmutex.keymutex_lockedmutex_unlockedatomic.CasPAUSEosyield()futexsleep()
Lockfutexsleep()
futexsleep()
//src/runtime/os_linux.go
// Atomically,
// if(*addr == val) sleep
// Might be woken up spuriously; that's allowed.
// Don't sleep longer than ns; ns < 0 means forever.
//go:nosplit
func futexsleep(addr *uint32, val uint32, ns int64) {
// Some Linux kernels have a bug where futex of
// FUTEX_WAIT returns an internal error code
// as an errno. Libpthread ignores the return value
// here, and so can we: as it says a few lines up,
// spurious wakeups are allowed.
if ns < 0 {
futex(unsafe.Pointer(addr), _FUTEX_WAIT_PRIVATE, val, nil, nil, 0)
return
}
var ts timespec
ts.setNsec(ns)
futex(unsafe.Pointer(addr), _FUTEX_WAIT_PRIVATE, val, unsafe.Pointer(&ts), nil, 0)
}
//go:noescape
func futex(addr unsafe.Pointer, op int32, val uint32, ts, addr2 unsafe.Pointer, val3 uint32) int32
futex_sleep()futex()
// src/runtime/sys_linux_amd64.s
// int64 futex(int32 *uaddr, int32 op, int32 val,
// struct timespec *timeout, int32 *uaddr2, int32 val2);
TEXT runtime·futex(SB),NOSPLIT,$0
MOVQ addr+0(FP), DI
MOVL op+8(FP), SI
MOVL val+12(FP), DX
MOVQ ts+16(FP), R10
MOVQ addr2+24(FP), R8
MOVL val3+32(FP), R9
MOVL $SYS_futex, AX
SYSCALL
MOVL AX, ret+40(FP)
RET
runtime.futex$SYS_futexAXSYSCALLfutex
linuxfutex
从Linux内核看futex
啥是futex?
Fast Userspace muTEX
futex出现之前,linux的同步机制主要分为两类:
- 用户态的同步机制 利用原子指令实现自旋锁,但不适用于临界区执行时间过长的场景
- 内核同步机制 如semaphore等,使用自旋+等待的形式,lock和unlock都是系统调用,即使没有冲突也要通过系统调用进入内核之后才能识别。
理想状态下,在无冲突的情况下在用户空间通过自旋锁解决,如果仍未拿到锁资源需要等待时再通过系统调用完成sleep和wake的语义。那么自旋失败的情况下,将当前线程挂起,然后是不是由持有锁的线程释放锁的时候再将其唤醒呢?带着这个疑问我们一起粗略看看linux内核如何实现futex的吧
数据结构
kernel/futex.cinclude/linux/futex.hfutexJavafutex_hash_bucketfutex_q
/**
* struct futex_q - futex等待队列实体,每个等待任务分配一个
* @list: 等待在futex上带优先级链表
* @task: 等待在futex上的任务
* @lock_ptr: 哈希桶自旋锁指针
* @key: the key the futex is hashed on
* @pi_state: optional priority inheritance state
* @rt_waiter: rt_waiter storage for use with requeue_pi
* @requeue_pi_key: the requeue_pi target futex key
* @bitset: bitset for the optional bitmasked wakeup
*
* We use this hashed waitqueue, instead of a normal wait_queue_t, so
* we can wake only the relevant ones (hashed queues may be shared).
*
* A futex_q has a woken state, just like tasks have TASK_RUNNING.
* It is considered woken when plist_node_empty(&q->list) || q->lock_ptr == 0.
* The order of wakeup is always to make the first condition true, then
* the second.
*
* PI futexes are typically woken before they are removed from the hash list via
* the rt_mutex code. See unqueue_me_pi().
*/
struct futex_q {
struct plist_node list;// 等待在futex上按照优先级排序的任务列表
struct task_struct *task; //等待在futex上的任务
spinlock_t *lock_ptr; //哈希桶自旋锁指针
union futex_key key; //futex进行hash的key
struct futex_pi_state *pi_state; //可选的 优先级继承状态 控制优先级继承
struct rt_mutex_waiter *rt_waiter;//
union futex_key *requeue_pi_key;
u32 bitset; //掩码匹配
};
/*
* Hash buckets are shared by all the futex_keys that hash to the same
* location. Each key may have multiple futex_q structures, one for each task
* waiting on a futex.
*/
struct futex_hash_bucket {
spinlock_t lock;
struct plist_head chain;
};
static struct futex_hash_bucket futex_queues[1<<FUTEX_HASHBITS];
futex
外部调用入口
mutexfutexfutex
#include <linux/futex.h>
#include <stdint.h>
#include <sys/time.h>
long futex(uint32_t *uaddr, int futex_op, uint32_t val,
const struct timespec *timeout, /* or: uint32_t val2 */
uint32_t *uaddr2, uint32_t val3);
futex*uaddrfutexvalflags*timeout*uaddr2val3futex_opFUTEX_WAITFUTEX_WAKE
kernel/futex.cdo_futexdo_futexopcmd
//kernel/futex.c
SYSCALL_DEFINE6(futex, u32 __user *, uaddr, int, op, u32, val,
struct timespec __user *, utime, u32 __user *, uaddr2,
u32, val3)
{
struct timespec ts;
ktime_t t, *tp = NULL;
u32 val2 = 0;
int cmd = op & FUTEX_CMD_MASK;
if (utime && (cmd == FUTEX_WAIT || cmd == FUTEX_LOCK_PI ||
cmd == FUTEX_WAIT_BITSET ||
cmd == FUTEX_WAIT_REQUEUE_PI)) {
if (copy_from_user(&ts, utime, sizeof(ts)) != 0)
return -EFAULT;
if (!timespec_valid(&ts))
return -EINVAL;
t = timespec_to_ktime(ts);
if (cmd == FUTEX_WAIT)
t = ktime_add_safe(ktime_get(), t);
tp = &t;
}
/*
* requeue parameter in 'utime' if cmd == FUTEX_*_REQUEUE_*.
* number of waiters to wake in 'utime' if cmd == FUTEX_WAKE_OP.
*/
if (cmd == FUTEX_REQUEUE || cmd == FUTEX_CMP_REQUEUE ||
cmd == FUTEX_CMP_REQUEUE_PI || cmd == FUTEX_WAKE_OP)
val2 = (u32) (unsigned long) utime;
return do_futex(uaddr, op, val, tp, uaddr2, val2, val3);
}
long do_futex(u32 __user *uaddr, int op, u32 val, ktime_t *timeout,
u32 __user *uaddr2, u32 val2, u32 val3)
{
int cmd = op & FUTEX_CMD_MASK;
unsigned int flags = 0;
if (!(op & FUTEX_PRIVATE_FLAG))
flags |= FLAGS_SHARED;
if (op & FUTEX_CLOCK_REALTIME) {
flags |= FLAGS_CLOCKRT;
if (cmd != FUTEX_WAIT_BITSET && cmd != FUTEX_WAIT_REQUEUE_PI)
return -ENOSYS;
}
switch (cmd) {
case FUTEX_LOCK_PI:
case FUTEX_UNLOCK_PI:
case FUTEX_TRYLOCK_PI:
case FUTEX_WAIT_REQUEUE_PI:
case FUTEX_CMP_REQUEUE_PI:
if (!futex_cmpxchg_enabled)
return -ENOSYS;
}
switch (cmd) {
case FUTEX_WAIT:
val3 = FUTEX_BITSET_MATCH_ANY;
case FUTEX_WAIT_BITSET:
// 若*uaddr==val则阻塞当前进程/线程 放入阻塞队列
return futex_wait(uaddr, flags, val, timeout, val3);
case FUTEX_WAKE:
val3 = FUTEX_BITSET_MATCH_ANY;
case FUTEX_WAKE_BITSET:
// 最多唤醒val个在uaddr等待的线程/进程
return futex_wake(uaddr, flags, val, val3);
case FUTEX_REQUEUE:
return futex_requeue(uaddr, flags, uaddr2, val, val2, NULL, 0);
case FUTEX_CMP_REQUEUE:
return futex_requeue(uaddr, flags, uaddr2, val, val2, &val3, 0);
case FUTEX_WAKE_OP:
return futex_wake_op(uaddr, flags, uaddr2, val, val2, val3);
case FUTEX_LOCK_PI:
return futex_lock_pi(uaddr, flags, val, timeout, 0);
case FUTEX_UNLOCK_PI:
return futex_unlock_pi(uaddr, flags);
case FUTEX_TRYLOCK_PI:
return futex_lock_pi(uaddr, flags, 0, timeout, 1);
case FUTEX_WAIT_REQUEUE_PI:
val3 = FUTEX_BITSET_MATCH_ANY;
return futex_wait_requeue_pi(uaddr, flags, val, timeout, val3,
uaddr2);
case FUTEX_CMP_REQUEUE_PI:
return futex_requeue(uaddr, flags, uaddr2, val, val2, &val3, 1);
}
return -ENOSYS;
}
futexsleep()futexwakeup()
// 原子上,如果 *addr == val { sleep } ;虚假唤醒是被允许的;不会阻塞超过ns,ns<0表示永远休眠
futexsleep(addr *uint32, val uint32, ns int64)
//如果任何线程阻塞在addr上,则唤醒至少cnt个阻塞的任务
futexwakeup(addr *uint32, cnt uint32)
futex()futex_opFUTEX_WAITFUTEX_WAKE
futex_wait
futex()futex_opFUTEX_WAIT方法。主要逻辑为判断指向的值和值若相等,则将当前线程/进程阻塞在所指向的
static int futex_wait(u32 __user *uaddr, unsigned int flags, u32 val,
ktime_t *abs_time, u32 bitset)
{
struct hrtimer_sleeper timeout, *to = NULL;
struct restart_block *restart;
struct futex_hash_bucket *hb;
struct futex_q q = futex_q_init;
int ret;
...
// 设置定时任务,超时abs_time后线程仍未被唤醒,则由定时任务唤醒它
if (abs_time) {
to = &timeout;
hrtimer_init_on_stack(&to->timer, (flags & FLAGS_CLOCKRT) ?
CLOCK_REALTIME : CLOCK_MONOTONIC,
HRTIMER_MODE_ABS);
hrtimer_init_sleeper(to, current);
hrtimer_set_expires_range_ns(&to->timer, *abs_time,
current->timer_slack_ns);
}
retry:
// 准备等待在uaddr上 如果成功则持有hash_bucket的锁
// 比较uaddr和期望值val是否相等 并初始化q.key
ret = futex_wait_setup(uaddr, val, flags, &q, &hb);
if (ret)//val被改变直接返回
goto out;
// 插入futex等待队列,等待超时或通过信号来唤醒
futex_wait_queue_me(hb, &q, to);
/* If we were woken (and unqueued), we succeeded, whatever. */
ret = 0;
/* unqueue_me() drops q.key ref */
// 若unqueue_me成功,则说明是超时触发(因为futex_wake唤醒时,会将该进程移除等待队列,所以这里会失败)
if (!unqueue_me(&q))
goto out;
ret = -ETIMEDOUT;
if (to && !to->task)
goto out;
/*
* We expect signal_pending(current), but we might be the
* victim of a spurious wakeup as well.
*/
if (!signal_pending(current))
goto retry;
ret = -ERESTARTSYS;
if (!abs_time)
goto out;
restart = ¤t_thread_info()->restart_block;
restart->fn = futex_wait_restart;
restart->futex.uaddr = uaddr;
restart->futex.val = val;
restart->futex.time = abs_time->tv64;
restart->futex.bitset = bitset;
restart->futex.flags = flags | FLAGS_HAS_TIMEOUT;
ret = -ERESTART_RESTARTBLOCK;
out:
if (to) {
// 取消定时任务
hrtimer_cancel(&to->timer);
destroy_hrtimer_on_stack(&to->timer);
}
return ret;
}
futex_wait()
futex_wait_setup(uaddr, val, flags, &q, &hb)uaddrfutex_hash_bucketfutex_q.keyfutex_wait_queue_me(hb, &q, to)
/**
* futex_wait_setup() - 准备等待在一个futex变量上
* @uaddr: futex用户空间地址
* @val: 期望的值
* @flags: futex标识 (FLAGS_SHARED, etc.)
* @q: 关联的futex_q
* @hb: hash_bucket的指针 返回给调用者
*
* Setup the futex_q and locate the hash_bucket. Get the futex value and
* compare it with the expected value. Handle atomic faults internally.
* Return with the hb lock held and a q.key reference on success, and unlocked
* with no q.key reference on failure.
*
* Return:
* 0 - uaddr contains val and hb has been locked;
* <1 - -EFAULT or -EWOULDBLOCK (uaddr does not contain val) and hb is unlocked
*/
static int futex_wait_setup(u32 __user *uaddr, u32 val, unsigned int flags,
struct futex_q *q, struct futex_hash_bucket **hb)
{
u32 uval;
int ret;
/*
* Access the page AFTER the hash-bucket is locked.
* Order is important:
*
* Userspace waiter: val = var; if (cond(val)) futex_wait(&var, val);
* Userspace waker: if (cond(var)) { var = new; futex_wake(&var); }
*
* The basic logical guarantee of a futex is that it blocks ONLY
* if cond(var) is known to be true at the time of blocking, for
* any cond. If we locked the hash-bucket after testing *uaddr, that
* would open a race condition where we could block indefinitely with
* cond(var) false, which would violate the guarantee.
*
* On the other hand, we insert q and release the hash-bucket only
* after testing *uaddr. This guarantees that futex_wait() will NOT
* absorb a wakeup if *uaddr does not match the desired values
* while the syscall executes.
*/
retry:
//初始化futex_q 填充q->key
ret = get_futex_key(uaddr, flags & FLAGS_SHARED, &q->key, VERIFY_READ);
if (unlikely(ret != 0))
return ret;
retry_private:
//1.对q->key进行hash 然后通过&futex_queues[hash & ((1 << FUTEX_HASHBITS)-1)]找到futex_hash_bucket
//2. spin_lock(&hb->lock)获得自旋锁
*hb = queue_lock(q);
//原子的将uaddr的值拷贝到uval中
ret = get_futex_value_locked(&uval, uaddr);
if (ret) { //拷贝操作失败 重试
...
goto retry;
}
// 如果uval的值(即uaddr)不等于期望值val 则表明其他线程在修改
// 直接返回无需等待
if (uval != val) {
queue_unlock(q, *hb);
ret = -EWOULDBLOCK;
}
out:
if (ret)
put_futex_key(&q->key);
return ret;
}
futex_wait_setup()
futex_qfutex_q.keyfutex_q.key&futex_queues[hash & ((1 << FUTEX_HASHBITS)-1)]futex_hash_bucket*uaddrval-EWOULDBLOCK\#define EWOULDBLOCK 246 /* Operation would block (Linux returns EAGAIN) */
/**
* futex_wait_queue_me() - queue_me() and wait for wakeup, timeout, or signal
* @hb: the futex hash bucket, must be locked by the caller
* @q: the futex_q to queue up on
* @timeout: the prepared hrtimer_sleeper, or null for no timeout
*/
static void futex_wait_queue_me(struct futex_hash_bucket *hb, struct futex_q *q,
struct hrtimer_sleeper *timeout)
{
// task状态保证在另一个task唤醒它之前被设置,set_current_state利用set_mb()实现
// 设置task状态为TASK_INTERRUPTIBLE CPU只会调度状态为TASK_RUNNING的任务
set_current_state(TASK_INTERRUPTIBLE);
//将q插入到等待队列中然后释放自旋锁
queue_me(q, hb);
//启动定时器
if (timeout) {
hrtimer_start_expires(&timeout->timer, HRTIMER_MODE_ABS);
if (!hrtimer_active(&timeout->timer))
timeout->task = NULL;
}
/*
* If we have been removed from the hash list, then another task
* has tried to wake us, and we can skip the call to schedule().
*/
if (likely(!plist_node_empty(&q->list))) {
/*
* If the timer has already expired, current will already be
* flagged for rescheduling. Only call schedule if there
* is no timeout, or if it has yet to expire.
*/
// 未设置过期时间 或过期时间还未到期才进行调度
if (!timeout || timeout->task)
//系统重新进行调度,此时CPU会去执行其他任务,当前任务会被阻塞
schedule();
}
// 走到这里说明当前任务被CPU选中执行
__set_current_state(TASK_RUNNING);
}
futex_wait_queue_me
TASK_INTERRUPTIBLEqueueme()futex_qTASK_RUNNING
futex_waitfutex_wait_setup()futex_q.keyfutex_hash_bucketspin_lock(futex_hash_bucket.lock)futex_wait_queue_me()TASK_INTERRUPTIBLEqueue_me()futex_qspin_unlock(&hb->lock);*uaddr
futex_wake
futex_wake()*uaddr
/*
* Wake up waiters matching bitset queued on this futex (uaddr).
*/
static int
futex_wake(u32 __user *uaddr, unsigned int flags, int nr_wake, u32 bitset)
{
struct futex_hash_bucket *hb;
struct futex_q *this, *next;
struct plist_head *head;
union futex_key key = FUTEX_KEY_INIT;
int ret;
if (!bitset)
return -EINVAL;
//根据uaddr的值填充&key的内容
ret = get_futex_key(uaddr, flags & FLAGS_SHARED, &key, VERIFY_READ);
if (unlikely(ret != 0))
goto out;
//根据&key获取对应uaddr所在的futex_hash_bucket
hb = hash_futex(&key);
//对该hb加自旋锁
spin_lock(&hb->lock);
head = &hb->chain;
//遍历该hb的链表 注意链表中存储的节点时plist_node类型,而这里的this却是futex_q类型
//这种类型转换是通过c中的container_of机制实现的
plist_for_each_entry_safe(this, next, head, list) {
if (match_futex (&this->key, &key)) {
if (this->pi_state || this->rt_waiter) {
ret = -EINVAL;
break;
}
/* Check if one of the bits is set in both bitsets */
if (!(this->bitset & bitset))
continue;
//唤醒对应进程
wake_futex(this);
if (++ret >= nr_wake)
break;
}
}
//释放自旋锁
spin_unlock(&hb->lock);
put_futex_key(&key);
out:
return ret;
}
wake_futexwake_futexfutex_hash_bucketfutex_q
static void wake_futex(struct futex_q *q)
{
struct task_struct *p = q->task;
if (WARN(q->pi_state || q->rt_waiter, "refusing to wake PI futex\n"))
return;
// 在唤醒任务之前将q->lock_ptr设置为NULL
// 如果另一个CPU发生了非futex方式的唤醒,则任务可能退出。p解引用则是一个不存在的task结构体
// 为避免这种case,需要持有p引用来进行唤醒
get_task_struct(p);
//将q出队列
__unqueue_futex(q);
/*
* 只要写入q-> lock_ptr = NULL,等待的任务就可以释放futex_q,而无需获取任何锁。
* 这里需要一个内存屏障,以防止lock_ptr的后续存储超越plist_del。
*/
smp_wmb();
q->lock_ptr = NULL;
// 将TASK_INTERRUPTIBLE | TASK_UNINTERRUPTIBLE状态的task唤醒
wake_up_state(p, TASK_NORMAL);
// task释放futex_q
put_task_struct(p);
}
futex_wake()
*uaddrfutex_q->keyhash_futex(&key);futex_hash_bucketspin_lock(&hb->lock);futex_hash_bucketwake_futexTASK_RUNNINGfutex_q
futex
mmap
Unlock
Lock
func unlock(l *mutex) {
unlockWithRank(l)
}
//go:nosplit
func unlockWithRank(l *mutex) {
unlock2(l)
}
func unlock2(l *mutex) {
v := atomic.Xchg(key32(&l.key), mutex_unlocked)
if v == mutex_unlocked {
throw("unlock of unlocked lock")
}
if v == mutex_sleeping {
futexwakeup(key32(&l.key), 1)
}
gp := getg()
gp.m.locks--
if gp.m.locks < 0 {
throw("runtime·unlock: lock count")
}
if gp.m.locks == 0 && gp.preempt { // restore the preemption request in case we've cleared it in newstack
gp.stackguard0 = stackPreempt
}
}
Lock
atomic.Xchgmutex.keymutex_unlockedmutex_sleepingfutexwakeup()
futexwakeup()futex()
//src/runtime/os_linux.go
// If any procs are sleeping on addr, wake up at most cnt.
//go:nosplit
func futexwakeup(addr *uint32, cnt uint32) {
ret := futex(unsafe.Pointer(addr), _FUTEX_WAKE_PRIVATE, cnt, nil, nil, 0)
if ret >= 0 {
return
}
// I don't know that futex wakeup can return
// EAGAIN or EINTR, but if it does, it would be
// safe to loop and call futex again.
systemstack(func() {
print("futexwakeup addr=", addr, " returned ", ret, "\n")
})
*(*int32)(unsafe.Pointer(uintptr(0x1006))) = 0x1006
}
futexwakeupfutex
总结
futex
刚刚不是用户态的自旋锁已经解决不了问题了吗,你可能会想我们可以这么玩:
func lock() {
for !tryLock(state) {
wait() //系统调用 进入休眠
}
}
func tryLock(state) bool {
i:=0
for !compareAndSwap(state, 0, 1) {
i++
if i>3 {
return false
}
}
return true
}
func unlock() {
compareAndSet(state,1,0)
wakeup() //系统调用唤醒
}
tryLockwaitlock()wait()unlock()wait()futex
如有异议 请轻拍 欢迎交流。