atomic包提供了对内存的原子操作,可以方便地实现各种同步算法。提供的原子操作有:
- swap
- compare-and-swap
- load
- store
- add
支持的数据类型
操作 | 支持的数据类型 |
swap | int32、int64、unit32、uint64、uintptr、unsafe.Pointer |
cas(compare-and-swap) | int32、int64、unit32、uint64、uintptr、unsafe.Pointer |
add | int32、int64、unit32、uint64、uintptr |
load | int32、int64、unit32、uint64、uintptr、unsafe.Pointer |
store | int32、int64、unit32、uint64、uintptr、unsafe.Pointer |
对这些类型的数据的操作,使用对应函数就可以了。比如要不加锁地将 int32 值在并发环境中 +10:
atomic.AddInt32(&a, 10)
atomic包提供了更省心的方式:使用包装过的类型 atomic.Int32:
type Int32 struct {
_ noCopy
v int32
}
func (x *Int32) Load() int32 { return LoadInt32(&x.v) }
func (x *Int32) Store(val int32) { StoreInt32(&x.v, val) }
func (x *Int32) Swap(new int32) (old int32) { return SwapInt32(&x.v, new) }
func (x *Int32) CompareAndSwap(old, new int32) (swapped bool) {
return CompareAndSwapInt32(&x.v, old, new)
}
func (x *Int32) Add(delta int32) (new int32) { return AddInt32(&x.v, delta) }
可以看出,atomic.Int32支持所有5种原子操作,每种操作内部还是用对应原子函数实现的,atomic.Int32 实现了对int32的包装。
包装类型
atomic包支持的
数据类型 | 包装类型 | 支持的操作 |
---|---|---|
int32 | atomic.Int32 | Add、Load、Store、Swap、CompareAndSwap |
int64 | atomic.Int64 | Add、Load、Store、Swap、CompareAndSwap |
uint32 | atomic.Uint32 | Add、Load、Store、Swap、CompareAndSwap |
uint64 | atomic.Uint64 | Add、Load、Store、Swap、CompareAndSwap |
uintptr | atomic.Uintptr | Add、Load、Store、Swap、CompareAndSwap |
unsafe.Pointer | atomic. Pointer[T any] | Load、Store、Swap、CompareAndSwap |
atomic.Bool | Load、Store、Swap、CompareAndSwap | |
atomic.Value | Load、Store、Swap、CompareAndSwap |
实现原理
包装类型是对原子类型的包装,内部也是调用原子函数实现的。那么原子函数是如何实现的?
func SwapUint64(addr *uint64, new uint64) (old uint64)
在代码中,只看到了函数的声明,并没有实现。
在 asm.s 中,我们发现了对应的汇编代码:
TEXT ·SwapUint64(SB),NOSPLIT,$0
JMP runtime∕internal∕atomic·Xchg64(SB)
以amd64架构为例,在文件 runtime/internal/atomic/atomic_amd64.s中:
TEXT ·Xchg64(SB), NOSPLIT, $0-24
// 先把原值的地址放入BX,新值放入AX
MOVQ ptr+0(FP), BX
MOVQ new+8(FP), AX
// 执行 XCHGQ指令,将 AX 和 0(BX)的值交换,这时AX中的值是旧值
XCHGQ AX, 0(BX)
// 把旧值(在AX中)移动到 内存 16(FP)的地方
MOVQ AX, ret+16(FP)
RET
TEXT 说明这是一个函数,NOSPLIT表明不做栈溢出检查,$0-24 说明栈帧大小为0, 24为参数和返回值占用的内存。在Golang中,参数和返回值位于栈上、连续存放,起始指针放于 FP中。2个入参和1个返回值,总大小为24字节。
XCHGQ指令完成两个操作数的交换。
用到的其他指令(runtime/internal/atomic/atomic_amd64.s)
- CMPXCHGL CX, 0(BX)
- CMPXCHGQ CX, 0(BX)
- XADDL AX, 0(BX)
- XADDQ AX, 0(BX)
- XCHGL AX, 0(BX)
- XCHGQ AX, 0(BX)
- XCHGB AX, 0(BX)
即CMPXCHGx 、 XCHGx、XADDx 这三类。CMPXCHGx 这类可以完成比较并交换,XCHGx 这类完成交换,XADDx这类完成加法。
这些指令都要操作一个内存地址( 0(BX) ),依然不是安全的,因为可能存在其他CPU核心对这个内存地址执行操作。0(BX)可能在内存中,也可能在CPU cache 中。不管那种情况,依然不能保证指令是 排他写的。(原理可以参考CPU cache 和MESI协议)
要保证排他性,需要使用 LOCK 指令。
TEXT ·Cas64(SB), NOSPLIT, $0-25
MOVQ ptr+0(FP), BX
MOVQ old+8(FP), AX
MOVQ new+16(FP), CX
LOCK
CMPXCHGQ CX, 0(BX)
SETEQ ret+24(FP)
RET
Cas64使用 CMPXCHGQ 完成了比较并交换操作,为了保证执行这个指令期间,对数据的访问是排他的,使用了LOCK。LOCK实际上不是指令,他的作用是对内存或CPU cache加锁:
- 如果 0(BX) 数据在内存中,LOCK的作用是锁内存。这个影响是巨大的,加锁期间,其它CPU也无法访问内存。
- 如果 0(BX) 数据在 CPU cache中,LOCK就锁cache line
具体实现需要理解 MESI协议。
使用场景
1.在runtime中实现mutex
type mutex struct {
// Empty struct if lock ranking is disabled, otherwise includes the lock rank
lockRankStruct
// Futex-based impl treats it as uint32 key,
// while sema-based impl as M* waitm.
// Used to be a union, but unions break precise GC.
key uintptr
}
func lock(l *mutex) {
lockWithRank(l, getLockRank(l))
}
func lockWithRank(l *mutex, rank lockRank) {
lock2(l)
}
func lock2(l *mutex) {
gp := getg()
if gp.m.locks < 0 {
throw("runtime·lock: lock count")
}
gp.m.locks++
// Speculative grab for lock.
if atomic.Casuintptr(&l.key, 0, locked) {
return
}
semacreate(gp.m)
// On uniprocessor's, no point spinning.
// On multiprocessors, spin for ACTIVE_SPIN attempts.
spin := 0
if ncpu > 1 {
spin = active_spin
}
Loop:
for i := 0; ; i++ {
v := atomic.Loaduintptr(&l.key)
if v&locked == 0 {
// Unlocked. Try to lock.
if atomic.Casuintptr(&l.key, v, v|locked) {
return
}
i = 0
}
if i < spin {
procyield(active_spin_cnt)
} else if i < spin+passive_spin {
osyield()
} else {
// Someone else has it.
// l->waitm points to a linked list of M's waiting
// for this lock, chained through m->nextwaitm.
// Queue this M.
for {
gp.m.nextwaitm = muintptr(v &^ locked)
if atomic.Casuintptr(&l.key, v, uintptr(unsafe.Pointer(gp.m))|locked) {
break
}
v = atomic.Loaduintptr(&l.key)
if v&locked == 0 {
continue Loop
}
}
if v&locked != 0 {
// Queued. Wait.
semasleep(-1)
i = 0
}
}
}
}