前言

Go 语言提供了协程并发编程模型,那么只要是并发就需要解决协程间同步问题,官方为我们提供了同步工具,sync 包。

锁(Mutex)

两种锁

锁有如下特点,使用时需要注意:

  • 锁在 Go 语言中都是成对出现的,也就是说每一个加锁操作必须要对应一个解锁操作,我们通常会跟一个 defer 语句进行解锁
  • 对已锁的锁再次加锁会阻塞当前加锁协程
  • 对未锁定的锁进行解锁会触发 panic,且无法 recover(在1.8版本之前可以recover,但会因为错误代码导致误解锁的协程永久阻塞)

锁有两种类型,互斥锁和读写锁,分别适用于不同的场景:

  • 互斥锁会阻塞对临界区的访问,并且读写都是互斥的
  • 读写锁同样会阻塞对临界区的访问,但是读操作可以并行,只有写互斥

假如我们需要对一个变量进行并发读写,看一下互斥锁和读写锁的性能对比

package test

import "testing"

func BenchmarkMutex(b *testing.B) {
	numbter = 1

	for i := 0; i <= b.N; i++ {
		go Write(i)
		go Read()
	}
	wg.Wait()
}

互斥锁:

package test

import (
	"sync"
	"time"
)

var wg sync.WaitGroup

var numbter int

var mutex sync.Mutex

func Write(val int) {
	wg.Add(1)
	mutex.Lock()
	defer mutex.Unlock()
	defer wg.Done()
	time.Sleep(time.Nanosecond * 100)
	numbter = val
}

func Read() int {
	wg.Add(1)
	mutex.Lock()
	defer mutex.Unlock()
	defer wg.Done()
	time.Sleep(time.Nanosecond * 100)
	return numbter
}
go test -bench=. -benchtime=5s -benchmem  
goos: darwin
goarch: amd64
BenchmarkMutex-8         1000000              6640 ns/op            1168 B/op          5 allocs/op
PASS
ok      _/Users/why/Desktop/go/test/test        6.968s

读写锁:

package test

import (
	"sync"
	"time"
)

var wg sync.WaitGroup

var numbter int

var mutex sync.RWMutex

func Write(val int) {
	wg.Add(1)
	mutex.Lock()
	defer mutex.Unlock()
	defer wg.Done()
	time.Sleep(time.Nanosecond * 100)
	numbter = val
}

func Read() int {
	wg.Add(1)
	mutex.RLock()
	defer mutex.RUnlock()
	defer wg.Done()
	time.Sleep(time.Nanosecond * 100)
	return numbter
}
go test -bench=. -benchtime=5s -benchmem  
goos: darwin
goarch: amd64
BenchmarkMutex-8         1000000              5818 ns/op            1165 B/op          5 allocs/op
PASS
ok      _/Users/why/Desktop/go/test/test        6.118s

锁条件变量(Cond)

条件变量并不是被用来保护临界区和共享资源的,它是用于协调想要访问共享资源的那些线程的。当共享资源的状态发生变化时,它可以被用来通知被互斥锁阻塞的线程。
使用条件变量的最大优势就是在效率方面的提升,当共享资源的状态不满足条件的时候,想操作它的线程再也不用循环往复的做检查了,只要等待通知就好了

Cond 提供了下面三个方法:

  • Wait:等待通知,需要在它基于的那个互斥锁的保护下进行
  • Signal(单发通知)和Broadcast(广播通知):需要在对应的互斥锁解锁之后再做操作

Cond 提升效率主要在 Wait 方法的实现上,我们看一下方法的实现,主要做了4件事:

func (c *Cond) Wait() {
	c.checker.check()
    //把调用它的goroutine加入到当前条件变量的通知队列中
	t := runtime_notifyListAdd(&c.notify)
    //解锁当前的条件变量基于的那个互斥锁
	c.L.Unlock()
    //让当前的goroutine处于等待状态,等到通知来了再决定是否唤醒它,此时阻塞在调用Wait方法的那行代码上
	runtime_notifyListWait(&c.notify, t)
    //唤醒它之后重新锁定当前条件变量基于的互斥锁
	c.L.Lock()
}

所以我们在使用 Wait 时要注意以下几点:

  • 一定要在调用之前锁定与之关联的读锁,否则在 Unlock 时会触发 panic
  • 一定不要忘记在处理完逻辑后及时解锁与 cond 变量关联的读锁,因为 Wait 方法最终会重新锁定与之关联的那个读锁

示例代码:

package main

import (
	"fmt"
	"sync"
	"time"
)

type msgBox struct {
	message  string
	isEmpty  bool
	sendCond *sync.Cond
	recvCond *sync.Cond
}

func main() {

	var lock sync.RWMutex
	msgBox := msgBox{
		isEmpty:  true,
		sendCond: sync.NewCond(&lock),
		recvCond: sync.NewCond(lock.RLocker()),
	}

	done := make(chan struct{})
	max := 5

	// 写操作的goroutine
	go func(max int) {
		defer func() {
			done <- struct{}{}
		}()
		for i := 0; i < max; i++ {
			time.Sleep(time.Millisecond * 200)
			// 先进行保护
			lock.Lock()
			// 再等待通知
			for !msgBox.isEmpty {
				msgBox.sendCond.Wait()
			}
			msgBox.isEmpty = false
			msg := fmt.Sprintf("第 %d 条消息", i)
			msgBox.message = msg
			fmt.Printf("发送消息[%d]: %s\n", i, msg)
			// 先解锁
			lock.Unlock()
			// 再发送通知
			msgBox.recvCond.Signal()
		}
	}(max)

	// 读操作的goroutine
	go func(max int) {
		defer func() {
			done <- struct{}{}
		}()
		for j := 0; j < max; j++ {
			time.Sleep(time.Millisecond * 500)
			lock.RLock()
			for msgBox.isEmpty {
				msgBox.recvCond.Wait()
			}
			msgBox.isEmpty = true
			msg := msgBox.message
			fmt.Printf("接收消息[%d]: %s\n", j, msg)
			lock.RUnlock()
			msgBox.sendCond.Signal()
		}
	}(max)
	<-done
	<-done
	fmt.Println("Over")
}
原子操作

原子操作即执行过程不能被终端的操作,在针对某个值的原子操作执行的过程中,CPU 绝不会再去执行其他针对该值的操作,无论这些其他操作是否为原子操作,所以原子操作参数都是传入变量的指针,CPU 去判断地址上是否有操作。

增或减

用于实现被操作值的增大或减小,sync/atomic 包中提供了以下几个方法,具体的例子可以移步标准库文档:

如果需要对值递增,第二个参数传一个负值即可,但是对于 AddUint32 和 AddUint64 来说,第二个参数只能传无符号整数,可以通过二进制补码的特性实现:

一个负整数的补码可以通过对它按位取反后+1得到(正整数-1后取反即为它的负值)

package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	var val uint64
	val = 10
	fmt.Println(val)
	n := 5
	atomic.AddUint64(&val, ^uint64(n-1))
	fmt.Println(val)
}

比较并交换(CAS)

在不创建互斥量和不形成临界区的情况下,完成并发安全的值替换操作,这可以大大地减少同步对程序性能的损耗(但在频繁修改的场景,CAS会频繁失败,或许会造成性能的损耗),提供下面几个方法:

func AddInt64Cas(ptr *int64, val,add int64){
    for {
        if atomic. CompareAndSwapInt64(ptr, val, (val + add)){
            break
        }
    }
}

载入

原子地读取变量的值,并把它赋值给另一个变量,这样可以保证在当前计算机中任何 CPU 都不会进行其他针对此值的读写操作,这样的约束受到底层硬件支持。

func LoadInt64(ptr *int64) int64 {
    return atomic.LoadInt64(ptr)
}

存储

原子地写入变量的值,过程中任何 CPU 都不会进行针对同一个值的读写操作。

func StoreInt64(ptr *int64 val int64) {
    atomic.StoreInt64(ptr, val)
}

交换

不关心操作的旧值,直接设置新值,比原子存储操作多做了一步,会返回被操作值的旧值。

func SwapInt64(ptr *int64 val int64) int64 {
    return atomic.SwapInt64(ptr, val)
}

原子值

用于存储需要原子读写的值,提供2个方法:

  • Load:加载需要读取的值
  • Store:写入值,第一个参数不能为 nil,类型以第一次写入的值为准

底层还是调用的上面提供的 Compare 和 Store 方法:

// Load returns the value set by the most recent Store.
// It returns nil if there has been no call to Store for this Value.
func (v *Value) Load() (x interface{}) {
	vp := (*ifaceWords)(unsafe.Pointer(v))
	typ := LoadPointer(&vp.typ)
	if typ == nil || uintptr(typ) == ^uintptr(0) {
		// First store not yet completed.
		return nil
	}
	data := LoadPointer(&vp.data)
	xp := (*ifaceWords)(unsafe.Pointer(&x))
	xp.typ = typ
	xp.data = data
	return
}

// Store sets the value of the Value to x.
// All calls to Store for a given Value must use values of the same concrete type.
// Store of an inconsistent type panics, as does Store(nil).
func (v *Value) Store(x interface{}) {
	if x == nil {
		panic("sync/atomic: store of nil value into Value")
	}
	vp := (*ifaceWords)(unsafe.Pointer(v))
	xp := (*ifaceWords)(unsafe.Pointer(&x))
	for {
		typ := LoadPointer(&vp.typ)
		if typ == nil {
			// Attempt to start first store.
			// Disable preemption so that other goroutines can use
			// active spin wait to wait for completion; and so that
			// GC does not see the fake type accidentally.
			runtime_procPin()
			if !CompareAndSwapPointer(&vp.typ, nil, unsafe.Pointer(^uintptr(0))) {
				runtime_procUnpin()
				continue
			}
			// Complete first store.
			StorePointer(&vp.data, xp.data)
			StorePointer(&vp.typ, xp.typ)
			runtime_procUnpin()
			return
		}
		if uintptr(typ) == ^uintptr(0) {
			// First store in progress. Wait.
			// Since we disable preemption around the first store,
			// we can wait with active spinning.
			continue
		}
		// First store completed. Check type and overwrite data.
		if typ != xp.typ {
			panic("sync/atomic: store of inconsistently typed value into Value")
		}
		StorePointer(&vp.data, xp.data)
		return
	}
}

我们都知道 Go 提供的 map 和 slice 都不是并发安全的,map 有官方提供的 sync.Map,但是切片没有,我们可以用 value 实现一个并发安全的切片,代码参考:https://github.com/why444216978/go-util/blob/master/sync/int_slice.go

临时对象池(sync.Pool)

sync.Pool 作为存放临时值的容器,是多协程共享的,自动伸缩,而且是并发安全的,提供 Get 和 Put 两个公开方法,有以下几个特点:

  • 分为当前 Goroutine 绑定的 P 持有的本地 localPool 和全局共享 Pool
  • Put 会先放到当前 P 本地(private),如果没有则放入全局区(shared)
  • Get 获取到的值是任意的
  • Get 优先获取 private,如果获取不到则加锁从 shared 获取,如果还是获取不到会尝试从其他 private 偷取,若还是获取不到则会调用 New 方法新建
  • 假如 Get 从池中获取到值,返回之前一定会从池中删除
  • 垃圾回收时会将池清空

下面看个例子:

package main

import (
	"fmt"
	"runtime"
	"runtime/debug"
	"sync"
	"sync/atomic"
)

func main() {
	// 禁用GC,并保证在main函数执行结束前恢复GC
	defer debug.SetGCPercent(debug.SetGCPercent(-1))

	var count int32
	newFunc := func() interface{} {
		return atomic.AddInt32(&count, 1)
	}
	pool := sync.Pool{New: newFunc}

	// 没有值时调用 New 返回
	v1 := pool.Get()
	fmt.Printf("Value 1: %v\n", v1)

	// 临时对象池的存入
	pool.Put(10)
	v2 := pool.Get()
	fmt.Printf("Value 2: %v\n", v2)

	// 手动触发 GC,再次 Get 将调用 New 获取
	runtime.GC()
	v3 := pool.Get()
	fmt.Printf("Value 3: %v\n", v3)
	pool.New = nil

	// 当被 New 被赋值为 nil 后,再获取一律为 nil
	v4 := pool.Get()
	fmt.Printf("Value 4: %v\n", v4)
}