前言
Go 语言提供了协程并发编程模型,那么只要是并发就需要解决协程间同步问题,官方为我们提供了同步工具,sync 包。
锁(Mutex)两种锁
锁有如下特点,使用时需要注意:
- 锁在 Go 语言中都是成对出现的,也就是说每一个加锁操作必须要对应一个解锁操作,我们通常会跟一个 defer 语句进行解锁
- 对已锁的锁再次加锁会阻塞当前加锁协程
- 对未锁定的锁进行解锁会触发 panic,且无法 recover(在1.8版本之前可以recover,但会因为错误代码导致误解锁的协程永久阻塞)
锁有两种类型,互斥锁和读写锁,分别适用于不同的场景:
- 互斥锁会阻塞对临界区的访问,并且读写都是互斥的
- 读写锁同样会阻塞对临界区的访问,但是读操作可以并行,只有写互斥
假如我们需要对一个变量进行并发读写,看一下互斥锁和读写锁的性能对比
package test
import "testing"
func BenchmarkMutex(b *testing.B) {
numbter = 1
for i := 0; i <= b.N; i++ {
go Write(i)
go Read()
}
wg.Wait()
}
互斥锁:
package test
import (
"sync"
"time"
)
var wg sync.WaitGroup
var numbter int
var mutex sync.Mutex
func Write(val int) {
wg.Add(1)
mutex.Lock()
defer mutex.Unlock()
defer wg.Done()
time.Sleep(time.Nanosecond * 100)
numbter = val
}
func Read() int {
wg.Add(1)
mutex.Lock()
defer mutex.Unlock()
defer wg.Done()
time.Sleep(time.Nanosecond * 100)
return numbter
}
go test -bench=. -benchtime=5s -benchmem
goos: darwin
goarch: amd64
BenchmarkMutex-8 1000000 6640 ns/op 1168 B/op 5 allocs/op
PASS
ok _/Users/why/Desktop/go/test/test 6.968s
读写锁:
package test
import (
"sync"
"time"
)
var wg sync.WaitGroup
var numbter int
var mutex sync.RWMutex
func Write(val int) {
wg.Add(1)
mutex.Lock()
defer mutex.Unlock()
defer wg.Done()
time.Sleep(time.Nanosecond * 100)
numbter = val
}
func Read() int {
wg.Add(1)
mutex.RLock()
defer mutex.RUnlock()
defer wg.Done()
time.Sleep(time.Nanosecond * 100)
return numbter
}
go test -bench=. -benchtime=5s -benchmem
goos: darwin
goarch: amd64
BenchmarkMutex-8 1000000 5818 ns/op 1165 B/op 5 allocs/op
PASS
ok _/Users/why/Desktop/go/test/test 6.118s
锁条件变量(Cond)
条件变量并不是被用来保护临界区和共享资源的,它是用于协调想要访问共享资源的那些线程的。当共享资源的状态发生变化时,它可以被用来通知被互斥锁阻塞的线程。
使用条件变量的最大优势就是在效率方面的提升,当共享资源的状态不满足条件的时候,想操作它的线程再也不用循环往复的做检查了,只要等待通知就好了。
Cond 提供了下面三个方法:
- Wait:等待通知,需要在它基于的那个互斥锁的保护下进行
- Signal(单发通知)和Broadcast(广播通知):需要在对应的互斥锁解锁之后再做操作
Cond 提升效率主要在 Wait 方法的实现上,我们看一下方法的实现,主要做了4件事:
func (c *Cond) Wait() {
c.checker.check()
//把调用它的goroutine加入到当前条件变量的通知队列中
t := runtime_notifyListAdd(&c.notify)
//解锁当前的条件变量基于的那个互斥锁
c.L.Unlock()
//让当前的goroutine处于等待状态,等到通知来了再决定是否唤醒它,此时阻塞在调用Wait方法的那行代码上
runtime_notifyListWait(&c.notify, t)
//唤醒它之后重新锁定当前条件变量基于的互斥锁
c.L.Lock()
}
所以我们在使用 Wait 时要注意以下几点:
- 一定要在调用之前锁定与之关联的读锁,否则在 Unlock 时会触发 panic
- 一定不要忘记在处理完逻辑后及时解锁与 cond 变量关联的读锁,因为 Wait 方法最终会重新锁定与之关联的那个读锁
示例代码:
package main
import (
"fmt"
"sync"
"time"
)
type msgBox struct {
message string
isEmpty bool
sendCond *sync.Cond
recvCond *sync.Cond
}
func main() {
var lock sync.RWMutex
msgBox := msgBox{
isEmpty: true,
sendCond: sync.NewCond(&lock),
recvCond: sync.NewCond(lock.RLocker()),
}
done := make(chan struct{})
max := 5
// 写操作的goroutine
go func(max int) {
defer func() {
done <- struct{}{}
}()
for i := 0; i < max; i++ {
time.Sleep(time.Millisecond * 200)
// 先进行保护
lock.Lock()
// 再等待通知
for !msgBox.isEmpty {
msgBox.sendCond.Wait()
}
msgBox.isEmpty = false
msg := fmt.Sprintf("第 %d 条消息", i)
msgBox.message = msg
fmt.Printf("发送消息[%d]: %s\n", i, msg)
// 先解锁
lock.Unlock()
// 再发送通知
msgBox.recvCond.Signal()
}
}(max)
// 读操作的goroutine
go func(max int) {
defer func() {
done <- struct{}{}
}()
for j := 0; j < max; j++ {
time.Sleep(time.Millisecond * 500)
lock.RLock()
for msgBox.isEmpty {
msgBox.recvCond.Wait()
}
msgBox.isEmpty = true
msg := msgBox.message
fmt.Printf("接收消息[%d]: %s\n", j, msg)
lock.RUnlock()
msgBox.sendCond.Signal()
}
}(max)
<-done
<-done
fmt.Println("Over")
}
原子操作
原子操作即执行过程不能被终端的操作,在针对某个值的原子操作执行的过程中,CPU 绝不会再去执行其他针对该值的操作,无论这些其他操作是否为原子操作,所以原子操作参数都是传入变量的指针,CPU 去判断地址上是否有操作。
增或减
用于实现被操作值的增大或减小,sync/atomic 包中提供了以下几个方法,具体的例子可以移步标准库文档:
如果需要对值递增,第二个参数传一个负值即可,但是对于 AddUint32 和 AddUint64 来说,第二个参数只能传无符号整数,可以通过二进制补码的特性实现:
一个负整数的补码可以通过对它按位取反后+1得到(正整数-1后取反即为它的负值)
package main
import (
"fmt"
"sync/atomic"
)
func main() {
var val uint64
val = 10
fmt.Println(val)
n := 5
atomic.AddUint64(&val, ^uint64(n-1))
fmt.Println(val)
}
比较并交换(CAS)
在不创建互斥量和不形成临界区的情况下,完成并发安全的值替换操作,这可以大大地减少同步对程序性能的损耗(但在频繁修改的场景,CAS会频繁失败,或许会造成性能的损耗),提供下面几个方法:
func AddInt64Cas(ptr *int64, val,add int64){
for {
if atomic. CompareAndSwapInt64(ptr, val, (val + add)){
break
}
}
}
载入
原子地读取变量的值,并把它赋值给另一个变量,这样可以保证在当前计算机中任何 CPU 都不会进行其他针对此值的读写操作,这样的约束受到底层硬件支持。
func LoadInt64(ptr *int64) int64 {
return atomic.LoadInt64(ptr)
}
存储
原子地写入变量的值,过程中任何 CPU 都不会进行针对同一个值的读写操作。
func StoreInt64(ptr *int64 val int64) {
atomic.StoreInt64(ptr, val)
}
交换
不关心操作的旧值,直接设置新值,比原子存储操作多做了一步,会返回被操作值的旧值。
func SwapInt64(ptr *int64 val int64) int64 {
return atomic.SwapInt64(ptr, val)
}
原子值
用于存储需要原子读写的值,提供2个方法:
- Load:加载需要读取的值
- Store:写入值,第一个参数不能为 nil,类型以第一次写入的值为准
底层还是调用的上面提供的 Compare 和 Store 方法:
// Load returns the value set by the most recent Store.
// It returns nil if there has been no call to Store for this Value.
func (v *Value) Load() (x interface{}) {
vp := (*ifaceWords)(unsafe.Pointer(v))
typ := LoadPointer(&vp.typ)
if typ == nil || uintptr(typ) == ^uintptr(0) {
// First store not yet completed.
return nil
}
data := LoadPointer(&vp.data)
xp := (*ifaceWords)(unsafe.Pointer(&x))
xp.typ = typ
xp.data = data
return
}
// Store sets the value of the Value to x.
// All calls to Store for a given Value must use values of the same concrete type.
// Store of an inconsistent type panics, as does Store(nil).
func (v *Value) Store(x interface{}) {
if x == nil {
panic("sync/atomic: store of nil value into Value")
}
vp := (*ifaceWords)(unsafe.Pointer(v))
xp := (*ifaceWords)(unsafe.Pointer(&x))
for {
typ := LoadPointer(&vp.typ)
if typ == nil {
// Attempt to start first store.
// Disable preemption so that other goroutines can use
// active spin wait to wait for completion; and so that
// GC does not see the fake type accidentally.
runtime_procPin()
if !CompareAndSwapPointer(&vp.typ, nil, unsafe.Pointer(^uintptr(0))) {
runtime_procUnpin()
continue
}
// Complete first store.
StorePointer(&vp.data, xp.data)
StorePointer(&vp.typ, xp.typ)
runtime_procUnpin()
return
}
if uintptr(typ) == ^uintptr(0) {
// First store in progress. Wait.
// Since we disable preemption around the first store,
// we can wait with active spinning.
continue
}
// First store completed. Check type and overwrite data.
if typ != xp.typ {
panic("sync/atomic: store of inconsistently typed value into Value")
}
StorePointer(&vp.data, xp.data)
return
}
}
我们都知道 Go 提供的 map 和 slice 都不是并发安全的,map 有官方提供的 sync.Map,但是切片没有,我们可以用 value 实现一个并发安全的切片,代码参考:https://github.com/why444216978/go-util/blob/master/sync/int_slice.go
临时对象池(sync.Pool)sync.Pool 作为存放临时值的容器,是多协程共享的,自动伸缩,而且是并发安全的,提供 Get 和 Put 两个公开方法,有以下几个特点:
- 分为当前 Goroutine 绑定的 P 持有的本地 localPool 和全局共享 Pool
- Put 会先放到当前 P 本地(private),如果没有则放入全局区(shared)
- Get 获取到的值是任意的
- Get 优先获取 private,如果获取不到则加锁从 shared 获取,如果还是获取不到会尝试从其他 private 偷取,若还是获取不到则会调用 New 方法新建
- 假如 Get 从池中获取到值,返回之前一定会从池中删除
- 垃圾回收时会将池清空
下面看个例子:
package main
import (
"fmt"
"runtime"
"runtime/debug"
"sync"
"sync/atomic"
)
func main() {
// 禁用GC,并保证在main函数执行结束前恢复GC
defer debug.SetGCPercent(debug.SetGCPercent(-1))
var count int32
newFunc := func() interface{} {
return atomic.AddInt32(&count, 1)
}
pool := sync.Pool{New: newFunc}
// 没有值时调用 New 返回
v1 := pool.Get()
fmt.Printf("Value 1: %v\n", v1)
// 临时对象池的存入
pool.Put(10)
v2 := pool.Get()
fmt.Printf("Value 2: %v\n", v2)
// 手动触发 GC,再次 Get 将调用 New 获取
runtime.GC()
v3 := pool.Get()
fmt.Printf("Value 3: %v\n", v3)
pool.New = nil
// 当被 New 被赋值为 nil 后,再获取一律为 nil
v4 := pool.Get()
fmt.Printf("Value 4: %v\n", v4)
}