atomic包提供了对内存的原子操作,可以方便地实现各种同步算法。提供的原子操作有:

  • swap
  • compare-and-swap
  • load
  • store
  • add

支持的数据类型

操作支持的数据类型
swapint32、int64、unit32、uint64、uintptr、unsafe.Pointer
cas(compare-and-swap)int32、int64、unit32、uint64、uintptr、unsafe.Pointer
addint32、int64、unit32、uint64、uintptr
loadint32、int64、unit32、uint64、uintptr、unsafe.Pointer
storeint32、int64、unit32、uint64、uintptr、unsafe.Pointer

对这些类型的数据的操作,使用对应函数就可以了。比如要不加锁地将 int32 值在并发环境中 +10:

atomic.AddInt32(&a, 10) 

atomic包提供了更省心的方式:使用包装过的类型 atomic.Int32:

type Int32 struct {
    _ noCopy
    v int32
}

func (x *Int32) Load() int32 { return LoadInt32(&x.v) }
func (x *Int32) Store(val int32) { StoreInt32(&x.v, val) }
func (x *Int32) Swap(new int32) (old int32) { return SwapInt32(&x.v, new) }
func (x *Int32) CompareAndSwap(old, new int32) (swapped bool) {
	return CompareAndSwapInt32(&x.v, old, new)
}
func (x *Int32) Add(delta int32) (new int32) { return AddInt32(&x.v, delta) }

可以看出,atomic.Int32支持所有5种原子操作,每种操作内部还是用对应原子函数实现的,atomic.Int32 实现了对int32的包装。

包装类型

atomic包支持的

数据类型包装类型支持的操作
int32atomic.Int32Add、Load、Store、Swap、CompareAndSwap
int64atomic.Int64Add、Load、Store、Swap、CompareAndSwap
uint32atomic.Uint32Add、Load、Store、Swap、CompareAndSwap
uint64atomic.Uint64Add、Load、Store、Swap、CompareAndSwap
uintptratomic.UintptrAdd、Load、Store、Swap、CompareAndSwap
unsafe.Pointeratomic. Pointer[T any]Load、Store、Swap、CompareAndSwap
atomic.BoolLoad、Store、Swap、CompareAndSwap
atomic.ValueLoad、Store、Swap、CompareAndSwap

实现原理

包装类型是对原子类型的包装,内部也是调用原子函数实现的。那么原子函数是如何实现的?

func SwapUint64(addr *uint64, new uint64) (old uint64) 

在代码中,只看到了函数的声明,并没有实现。

在 asm.s 中,我们发现了对应的汇编代码:

TEXT ·SwapUint64(SB),NOSPLIT,$0     
    JMP runtime∕internal∕atomic·Xchg64(SB)

以amd64架构为例,在文件 runtime/internal/atomic/atomic_amd64.s中:

TEXT ·Xchg64(SB), NOSPLIT, $0-24
    // 先把原值的地址放入BX,新值放入AX     
    MOVQ    ptr+0(FP), BX     
    MOVQ    new+8(FP), AX
    // 执行 XCHGQ指令,将 AX 和 0(BX)的值交换,这时AX中的值是旧值     
    XCHGQ   AX, 0(BX)  
    // 把旧值(在AX中)移动到 内存 16(FP)的地方   
    MOVQ    AX, ret+16(FP)     
    RET

TEXT 说明这是一个函数,NOSPLIT表明不做栈溢出检查,$0-24 说明栈帧大小为0, 24为参数和返回值占用的内存。在Golang中,参数和返回值位于栈上、连续存放,起始指针放于 FP中。2个入参和1个返回值,总大小为24字节。

XCHGQ指令完成两个操作数的交换。

用到的其他指令(runtime/internal/atomic/atomic_amd64.s)

  • CMPXCHGL CX, 0(BX)
  • CMPXCHGQ CX, 0(BX)
  • XADDL AX, 0(BX)
  • XADDQ AX, 0(BX)
  • XCHGL AX, 0(BX)
  • XCHGQ AX, 0(BX)
  • XCHGB AX, 0(BX)

即CMPXCHGx 、 XCHGx、XADDx 这三类。CMPXCHGx 这类可以完成比较并交换,XCHGx 这类完成交换,XADDx这类完成加法。

这些指令都要操作一个内存地址( 0(BX) ),依然不是安全的,因为可能存在其他CPU核心对这个内存地址执行操作。0(BX)可能在内存中,也可能在CPU cache 中。不管那种情况,依然不能保证指令是 排他写的。(原理可以参考CPU cache 和MESI协议)

要保证排他性,需要使用 LOCK 指令。

TEXT ·Cas64(SB), NOSPLIT, $0-25
    MOVQ    ptr+0(FP), BX
    MOVQ    old+8(FP), AX
    MOVQ    new+16(FP), CX
    LOCK
    CMPXCHGQ    CX, 0(BX)
    SETEQ   ret+24(FP)
    RET

Cas64使用 CMPXCHGQ 完成了比较并交换操作,为了保证执行这个指令期间,对数据的访问是排他的,使用了LOCK。LOCK实际上不是指令,他的作用是对内存或CPU cache加锁:

  • 如果 0(BX) 数据在内存中,LOCK的作用是锁内存。这个影响是巨大的,加锁期间,其它CPU也无法访问内存。
  • 如果 0(BX) 数据在 CPU cache中,LOCK就锁cache line

具体实现需要理解 MESI协议。

使用场景

1.在runtime中实现mutex

type mutex struct {
 // Empty struct if lock ranking is disabled, otherwise includes the lock rank
    lockRankStruct
 // Futex-based impl treats it as uint32 key,
 // while sema-based impl as M* waitm.
 // Used to be a union, but unions break precise GC.
    key uintptr
}

func lock(l *mutex) {
	lockWithRank(l, getLockRank(l))
}

func lockWithRank(l *mutex, rank lockRank) {
	lock2(l)
}

func lock2(l *mutex) {
	gp := getg()
	if gp.m.locks < 0 {
		throw("runtime·lock: lock count")
	}
	gp.m.locks++

	// Speculative grab for lock.
	if atomic.Casuintptr(&l.key, 0, locked) {
		return
	}
	semacreate(gp.m)

	// On uniprocessor's, no point spinning.
	// On multiprocessors, spin for ACTIVE_SPIN attempts.
	spin := 0
	if ncpu > 1 {
		spin = active_spin
	}
Loop:
	for i := 0; ; i++ {
		v := atomic.Loaduintptr(&l.key)
		if v&locked == 0 {
			// Unlocked. Try to lock.
			if atomic.Casuintptr(&l.key, v, v|locked) {
				return
			}
			i = 0
		}
		if i < spin {
			procyield(active_spin_cnt)
		} else if i < spin+passive_spin {
			osyield()
		} else {
			// Someone else has it.
			// l->waitm points to a linked list of M's waiting
			// for this lock, chained through m->nextwaitm.
			// Queue this M.
			for {
				gp.m.nextwaitm = muintptr(v &^ locked)
				if atomic.Casuintptr(&l.key, v, uintptr(unsafe.Pointer(gp.m))|locked) {
					break
				}
				v = atomic.Loaduintptr(&l.key)
				if v&locked == 0 {
					continue Loop
				}
			}
			if v&locked != 0 {
				// Queued. Wait.
				semasleep(-1)
				i = 0
			}
		}
	}
}

2.在sync包中实现各种同步算法