锁CAS原子操作

如要对一个变量进行计数统计,两种实现方式分别为

package main

import (
	"fmt"
	"sync"
)

// 锁实现方式
func main() {
	var count int64
	var wg sync.WaitGroup
	var mu sync.Mutex

	for i := 0; i < 10000; i++ {
		wg.Add(1)
		go func(wg *sync.WaitGroup) {
			defer wg.Done()
			mu.Lock()
			count = count + 1
			mu.Unlock()
		}(&wg)
	}
	wg.Wait()

	// count = 10000
	fmt.Println("count = ", count)
}

package main

import (
	"fmt"
	"sync"

	"sync/atomic"
)

// atomic CAS 原子操作
func main() {
	var count int64
	var wg sync.WaitGroup

	for i := 0; i < 10000; i++ {
		wg.Add(1)
		go func(wg *sync.WaitGroup) {
			defer wg.Done()
			// 失败一直重试
			for {
				old := atomic.LoadInt64(&count)
				if atomic.CompareAndSwapInt64(&count, old, old+1) {
					break
				}
			}

		}(&wg)
	}
	wg.Wait()

	// count = 10000
	fmt.Println("count = ", count)
}

可以看到两种用法的执行结果是一样的,我们再看一下两者的性能区别。

性能差别
package main_test

import (
	"sync"
	"sync/atomic"
	"testing"
)

func lockTest() {
	var count int64
	var wg sync.WaitGroup
	var mu sync.Mutex

	for i := 0; i < 10000; i++ {
		wg.Add(1)
		go func(wg *sync.WaitGroup) {
			defer wg.Done()
			mu.Lock()
			count = count + 1
			mu.Unlock()
		}(&wg)
	}
	wg.Wait()
}
func atomicTest() {
	var count int64
	var wg sync.WaitGroup

	for i := 0; i < 10000; i++ {
		wg.Add(1)
		go func(wg *sync.WaitGroup) {
			defer wg.Done()
			// 通过
			for {
				old := atomic.LoadInt64(&count)
				if atomic.CompareAndSwapInt64(&count, old, old+1) {
					break
				}
			}

		}(&wg)
	}
	wg.Wait()
}

func BenchmarkLock(b *testing.B) {
	for i := 0; i < b.N; i++ {
		lockTest()
	}

}
func BenchmarkAtomic(b *testing.B) {
	for i := 0; i < b.N; i++ {
		atomicTest()
	}
}

下面分别用一个、两个、四个CPU来测试他们两者的性能差别

一个CPU

➜  gotest go test -bench=".*" -v -benchmem -cpu=1
goos: darwin
goarch: amd64
pkg: gotest
cpu: Intel(R) Core(TM) i5-5257U CPU @ 2.70GHz
BenchmarkLock
BenchmarkLock                194           6312940 ns/op              32 B/op          3 allocs/op
BenchmarkAtomic
BenchmarkAtomic              189           6342975 ns/op              24 B/op          2 allocs/op
PASS
ok      gotest  3.296s

在只用一个CPU的情况下,看着差别不是太大,每次 op 的内存使用 atomic 比 lock 少了1/3。

两个CPU

➜  gotest go test -bench=".*" -v -benchmem -cpu=2
goos: darwin
goarch: amd64
pkg: gotest
cpu: Intel(R) Core(TM) i5-5257U CPU @ 2.70GHz
BenchmarkLock
BenchmarkLock-2              350           3178037 ns/op            2071 B/op          7 allocs/op
BenchmarkAtomic
BenchmarkAtomic-2            364           3035188 ns/op             439 B/op          3 allocs/op
PASS
ok      gotest  3.370s

可以看到相比一个CPU来说,每个 op 的时间基于快了一倍,分配的内存也大大减小了。

四个CPU

➜  gotest go test -bench=".*" -v -benchmem -cpu=4
goos: darwin
goarch: amd64
pkg: gotest
cpu: Intel(R) Core(TM) i5-5257U CPU @ 2.70GHz
BenchmarkLock
BenchmarkLock-4              349           3462298 ns/op            3613 B/op         17 allocs/op
BenchmarkAtomic
BenchmarkAtomic-4            330           3382266 ns/op              24 B/op          2 allocs/op
PASS
ok      gotest  4.221s
opopallocs

总结

atomiclock
atomiclock
实现原理
硬件底层

但原子也有一定的弊端,在被操作值频繁变更的情况下,很可能失败,需要一直重试直到成功为止,这种重试行为也可以理解为自旋spinning,长时间处于spinning将浪费CPU,参考https://www.bilibili.com/video/BV1Sf4y1s7Np/。

原子操作

从硬件的层面实现原子操作,有两种方式:

LOCK#
X86CMPXCHG

CAS 在Golang中是以共享内存的方式来实现的一种同步机制,它是一个原子操作,一般格式如下

fun addValue(delta int32){
    for{
        oldValue := atomic.LoadInt32(&addr)
        if atomic.CompareAndSwapInt32(&addr, oldValue, oldValue+delta){
            break;
        }
    }
}
&addratomic.CompareAndSwapInt32
atomic.CompareAndSwapIntxx()
atomic.CompareAndSwapInt32atomic.CompareAndSwapUint32runtime∕internal∕atomic·Cas
// bool Cas(int32 *val, int32 old, int32 new)
// Atomically:
//	if(*val == old){
//		*val = new;
//		return 1;
//	} else
//		return 0;
TEXT runtime∕internal∕atomic·Cas(SB),NOSPLIT,$0-17
	MOVQ	ptr+0(FP), BX
	MOVL	old+8(FP), AX
	MOVL	new+12(FP), CX
	LOCK
	CMPXCHGL	CX, 0(BX)
	SETEQ	ret+16(FP)
	RET
atomic.CompareAndSwapInt64atomic.CompareAndSwapUint64runtime∕internal∕atomic·Cas64
// bool	runtime∕internal∕atomic·Cas64(uint64 *val, uint64 old, uint64 new)
// Atomically:
//	if(*val == *old){
//		*val = new;
//		return 1;
//	} else {
//		return 0;
//	}
TEXT runtime∕internal∕atomic·Cas64(SB), NOSPLIT, $0-25
	MOVQ	ptr+0(FP), BX
	MOVQ	old+8(FP), AX
	MOVQ	new+16(FP), CX
	LOCK
	CMPXCHGQ	CX, 0(BX)
	SETEQ	ret+24(FP)
	RET
LOCKCMPXCHGQ
LOCK#LOCK#
CMPXCHG

上面我们提到过锁的实现是由操作系统来实现的,所以它的锁粒度要保证最小越好。对锁的理解很简单,这里就不再详细介绍了。

参考资料