channel
锁的作用都是为了解决并发情况下共享数据的原子操作和最终一致性问题,在系统介绍 sync 包提供的各种锁之前,我们先来聊聊什么情况下需要用到锁。
什么时候需要用到锁:竞态条件与同步机制一旦数据被多个线程共享,那么就很可能会产生争用和冲突的情况,这种情况也被称为竞态条件(race condition),这往往会破坏共享数据的一致性。举个例子,同时有多个线程连续向同一个缓冲区写入数据块,如果没有一个机制去协调这些线程的写入操作的话,那么被写入的数据块就很可能会出现错乱。从而出现和预期结果不一致的现象。
在这种情况下,我们就需要采取一些措施来协调它们对共享数据的修改,这通常就会涉及到同步操作。一般来说,同步的用途有两个,一个是避免多个线程在同一时刻操作同一个数据块,另一个是协调多个线程避免它们在同一时刻执行同一个代码块。但是目的是一致的,那就是保证共享数据原子操作和一致性。
由于这样的数据块和代码块的背后都隐含着一种或多种资源(比如存储资源、计算资源、I/O 资源、网络资源等等),所以我们可以把它们看做是共享资源。我们所说的同步其实就是在控制多个线程对共享资源的访问:一个线程在想要访问某一个共享资源的时候,需要先申请对该资源的访问权限,并且只有在申请成功之后,访问才能真正开始;而当线程对共享资源的访问结束时,它还必须归还对该资源的访问权限,若要再次访问仍需申请。
你可以把这里所说的访问权限想象成一块令牌,线程一旦拿到了令牌,就可以进入指定的区域,从而访问到资源,而一旦线程要离开这个区域了,就需要把令牌还回去,绝不能把令牌带走。或者我们把共享资源看作是有锁的资源,当某个线程获取到共享资源的访问权限后,给资源上锁,这样,其他线程就不能访问它,直到该线程执行完毕,释放锁,这样其他线程才能通过竞争获取对资源的访问权限,依次类推。
这样一来,我们就可以保证多个并发运行的线程对这个共享资源的访问是完全串行的,只要一个代码片段需要实现对共享资源的串行化访问,就可以被视为一个临界区(critical section),也就是我刚刚说的,由于要访问到资源而必须进入的那个区域。
临界区总是需要通过同步机制进行保护的,否则就会产生竞态条件,导致数据不一致。
sync包互斥锁: sync.Mutex
一个互斥锁可以被用来保护一个临界区,我们可以通过它来保证在同一时刻只有一个 goroutine 处于该临界区之内(同一个时刻只有一个线程能够拿到锁)
先通过一个并发读写的例子演示一下,当多线程同时访问全局变量时,结果会怎样?
package main
import ("fmt")
var count int
func main() {
for i := 0; i < 2; i++ {
go func() {
for i := 1000000; i > 0; i-- {
count ++
}
fmt.Println(count)
}()
}
fmt.Scanf("\n") //等待子线程全部结束
}
运行结果:
980117
1011352 //最后的结果基本不可能是我们想看到的:200000
修改代码,在累加的地方添加互斥锁,就能保证我们每次得到的结果都是想要的值
package main
import ("fmt"
"sync"
)
var (
count int
lock sync.Mutex
)
func main() {
for i := 0; i < 2; i++ {
go func() {
for i := 1000000; i > 0; i-- {
lock.Lock()
count ++
lock.Unlock()
}
fmt.Println(count)
}()
}
fmt.Scanf("\n") //等待子线程全部结束
}
运行结果:
1952533
2000000 //最后的线程打印输出
每当有 goroutine 想进入临界区时,都需要先对它进行锁定,并且,每个 goroutine 离开临界区时,都要及时地对它进行解锁,锁定和解锁操作分别通过互斥锁 sync.Mutex 的 Lock 和 Unlock 方法实现。使用互斥锁的时候有以下注意事项:
- 不要重复锁定互斥锁;
- 不要忘记解锁互斥锁,必要时使用 defer 语句;
- 不要对尚未锁定或者已解锁的互斥锁解锁;
- 不要在多个函数之间直接传递互斥锁。【??????】
Mutex 是最简单的一种锁类型,同时也比较暴力,当一个 goroutine 获得了 Mutex 后,其他 goroutine 就只能乖乖等到这个 goroutine 释放该 Mutex,不管是读操作还是写操作都会阻塞,但其实我们知道为了提升性能,读操作往往是不需要阻塞的,因此 sync 包提供了 RWMutex 类型,即读/写互斥锁,简称读写锁,这是一个是单写多读模型。
读写锁:sync.RWMutex
在读多写少的环境中,可以优先使用读写互斥锁(sync.RWMutex),它比互斥锁更加高效。sync 包中的 RWMutex 提供了读写互斥锁的封装
读写锁分为:读锁和写锁
- 在读锁占用的情况下,会不允许写,但是可以让其他goroutine 读。也就是多个 goroutine 可同时获取读锁
- 当有一个goroutine获取到了写锁,那么就不允许其他任何goroutine 读或者写当前临界区。整个锁相当于由该 goroutine 独占,和 Mutex 一样
通过设置写锁,同样可以实现数据的一致性:
- 读锁调用 RLock() 方法开启,通过 RUnlock 方法释放
- 写锁通过 Lock 方法启用,通过 Unlock 方法释放
package main
import ("fmt"
"sync"
)
var (
count int
rwLock sync.RWMutex
)
func main() {
for i := 0; i < 2; i++ {
go func() {
for i := 1000000; i > 0; i-- {
rwLock.Lock()
count ++
rwLock.Unlock()
}
fmt.Println(count)
}()
}
fmt.Scanf("\n") //等待子线程全部结束
}
运行结果:
1968637
2000000
同样,使用 RWMutex 时,任何一个 Lock() 或 RLock() 均需要保证有对应的 Unlock() 或 RUnlock() 调用与之对应,否则可能导致等待该锁的所有 goroutine 处于阻塞状态,甚至可能导致死锁
条件变量:sync.Cond
sync 包还提供了一个条件变量类型 sync.Cond,它可以和互斥锁或读写锁(以下统称互斥锁)组合使用,用来协调想要访问共享资源的线程。
sync.Cond
sync.Cond
type Cond struct {
noCopy noCopy
// L is held while observing or changing the condition
L Locker
notify notifyList
checker copyChecker
}
提供了三个方法:
// 等待通知
func (c *Cond) Wait() {
c.checker.check()
t := runtime_notifyListAdd(&c.notify)
c.L.Unlock()
runtime_notifyListWait(&c.notify, t)
c.L.Lock()
}
// 单发通知
func (c *Cond) Signal() {
c.checker.check()
runtime_notifyListNotifyOne(&c.notify)
}
// 广播通知
func (c *Cond) Broadcast() {
c.checker.check()
runtime_notifyListNotifyAll(&c.notify)
}
我们可以通过 sync.NewCond 返回对应的条件变量实例,初始化的时候需要传入互斥锁,该互斥锁实例会赋值给 sync.Cond 的 L 属性:
locker := &sync.Mutex{}
cond := sync.NewCond(locker)
sync.Cond 主要实现一个条件变量,假设 goroutine A 执行前需要等待另外一个 goroutine B 的通知,那么处于等待状态的 goroutine A 会保存在一个通知列表,也就是说需要某种变量状态的 goroutine A 将会等待(Wait)在那里,当某个时刻变量状态改变时,负责通知的 goroutine B 会通过对条件变量通知的方式(Broadcast/Signal)来通知处于等待条件变量的 goroutine A,这样就可以在共享内存中实现类似「消息通知」的同步机制。
示例:
假设我们有一个读取器和一个写入器,读取器必须依赖写入器对缓冲区进行数据写入后,才可以从缓冲区中读取数据,写入器每次完成写入数据后,都需要通过某种通知机制通知处于阻塞状态的读取器,告诉它可以对数据进行访问,这种场景正好可以通过条件变量来实现:
package main
import (
"bytes"
"fmt"
"io"
"sync"
"time"
)
type DataBucket struct {
buffer *bytes.Buffer // 缓冲区
mutex *sync.RWMutex // 互斥锁
cond *sync.Cond // 条件变量
}
func NewDataBucket() *DataBucket {
buf := make([]byte, 0)
db := &DataBucket{
buffer: bytes.NewBuffer(buf),
mutex: new(sync.RWMutex),
cond: nil,
}
db.cond = sync.NewCond(db.mutex.RLocker())
return db
}
// 读取器
func (db *DataBucket) Read(i int) {
db.mutex.RLock() // 打开读锁
defer db.mutex.RUnlock() // 结束后释放读锁
var data []byte
var d byte
var err error
for{
//每次读取一个字节
d, err = db.buffer.ReadByte()
if err == io.EOF { // 缓冲区数据为空时执行
fmt.Printf("reader-%d: %s\n", i, data)
db.cond.Wait() // 缓冲区为空,通过 Wait 方法等待通知,进入阻塞状态
data = data[:0] // 将 data 清空
continue
}
data = append(data, d) // 将读取到的数据添加到data中
}
}
// 写入器
func (db *DataBucket) Put(d []byte) (int, error) {
db.mutex.Lock() // 打开写锁
defer db.mutex.Unlock() // 结束后释放写锁
//写入一个数据块
n, err := db.buffer.Write(d)
db.cond.Signal() // 写入数据后通过 Signal 通知处于阻塞状态的读取器
return n, err
}
func main() {
db := NewDataBucket()
go db.Read(1) // 开启读取器协程
for j := 0; j < 10; j++ { // 启动多个写入器
go func(i int) {
d := fmt.Sprintf("data-%d", i)
db.Put([]byte(d)) // 写入数据到缓冲区
}(j)
time.Sleep(100 * time.Millisecond) // 每次启动一个写入器暂停100ms,让读取器阻塞
}
fmt.Scanf("\n")
}
通知单个阻塞线程用 Signal 方法,通知多个阻塞线程需要使用 Broadcast 方法
package main
import (
"bytes"
"fmt"
"io"
"sync"
"time"
)
type DataBucket struct {
buffer *bytes.Buffer // 缓冲区
mutex *sync.RWMutex // 互斥锁
cond *sync.Cond // 条件变量
}
func NewDataBucket() *DataBucket {
buf := make([]byte, 0)
db := &DataBucket{
buffer: bytes.NewBuffer(buf),
mutex: new(sync.RWMutex),
cond: nil,
}
db.cond = sync.NewCond(db.mutex.RLocker())
return db
}
// 读取器
func (db *DataBucket) Read(i int) {
db.mutex.RLock() // 打开读锁
defer db.mutex.RUnlock() // 结束后释放读锁
var data []byte
var d byte
var err error
for{
//每次读取一个字节
d, err = db.buffer.ReadByte()
if err == io.EOF { // 缓冲区数据为空时执行
fmt.Printf("reader-%d: %s\n", i, data)
db.cond.Wait() // 缓冲区为空,通过 Wait 方法等待通知,进入阻塞状态
data = data[:0] // 将 data 清空
continue
}
data = append(data, d) // 将读取到的数据添加到data中
}
}
// 写入器
func (db *DataBucket) Put(d []byte) (int, error) {
db.mutex.Lock() // 打开写锁
defer db.mutex.Unlock() // 结束后释放写锁
//写入一个数据块
n, err := db.buffer.Write(d)
db.cond.Broadcast() // 写入数据后通过 Broadcast 通知处于阻塞状态的读取器
return n, err
}
func main() {
db := NewDataBucket()
for i := 1; i < 3; i++ { // 启动多个读取器
go db.Read(i)
}
for j := 0; j < 10; j++ { // 启动多个写入器
go func(i int) {
d := fmt.Sprintf("data-%d", i)
db.Put([]byte(d)) // 写入数据到缓冲区
}(j)
time.Sleep(100 * time.Millisecond) // 每次启动一个写入器暂停100ms,让读取器阻塞
}
fmt.Scanf("\n")
}
可以看到,通过互斥锁+条件变量,我们可以非常方便的实现多个 Go 协程之间的通信,但是这个还是比不上 channel,因为 channel 还可以实现数据传递,条件变量只是发送信号,唤醒被阻塞的协程继续执行,另外 channel 还有超时机制,不会出现协程等不到信号一直阻塞造成内存堆积问题,换句话说,channel 可以让程序更可控。
原子操作
通过对互斥锁的合理使用,我们可以使一个 Go 协程在执行临界区中的代码时,不被其他的协程打扰,实现串行执行,不过,虽然不会被打扰,但是它仍然可能会被中断(interruption)。
所谓中断其实是 CPU 和操作系统级别的术语,并发执行的协程并不是真的并行执行,而是通过 CPU 的调度不断从运行状态切换到非运行状态,或者从非运行状态切换到运行状态,在用户看来,好像是「同时」在执行。我们把代码从运行状态切换到非运行状态称之为中断。中断的时机很多,比如任何两条语句执行的间隙,甚至在某条语句执行的过程中都是可以的,即使这些语句在临界区内也是如此。所以我们说互斥锁只能保证临界区代码的串行执行,不能保证这些代码执行的原子性,因为原子操作不能被中断。
原子操作通常是 CPU 和操作系统提供支持的,由于执行过程中不会中断,所以可以完全消除竞态条件,从而绝对保证并发安全性,此外,由于不会中断,所以原子操作本身要求也很高,既要简单,又要快速。Go 语言的原子操作也是基于 CPU 和操作系统的,由于简单和快速的要求,只针对少数数据类型的值提供了原子操作函数,这些函数都位于标准库代码包 sync/atomic 中。这些原子操作包括加法(Add)、比较并交换(Compare And Swap,简称 CAS)、加载(Load)、存储(Store)和交换(Swap)。
下面我们简单介绍下这些原子操作。
加减法
我们可以通过 atomic 包提供的下列函数实现加减法的原子操作,第一个参数是操作数对应的指针,第二个参数是加/减值:
虽然这些函数都是以 Add 前缀开头,但是对于减法可以通过传递负数实现,不过对于后三个函数,由于操作数类型是无符号的,所以无法显式传递负数来实现减法。比如我们测试下 AddInt32 函数:
var i int32 = 1
atomic.AddInt32(&i, 1)
fmt.Println("i = i + 1 =", i)
atomic.AddInt32(&i, -1)
fmt.Println("i = i - 1 =", i)
比较并交换
比较并交换相关的原子函数如下,第一个参数是操作数对应的指针,第二、三个参数是待比较和交换的旧值和新值:
这些函数会在交换之前先判断 old 和 new 对应的值是否相等,如果不相等才会交换:
var a int32 = 1
var b int32 = 2
var c int32 = 2
atomic.CompareAndSwapInt32(&a, a, b)
atomic.CompareAndSwapInt32(&b, b, c)
fmt.Println("a, b, c:", a, b, c)
加载
加载相关的原子操作函数如下,这些操作函数仅传递一个参数,即待操作数对应的指针,并且有一个返回值,返回传入指针指向的值:
这里的「原子性」指的是当读取该指针指向的值时,CPU 不会执行任何其它针对此值的读写操作。例如,我们可以这样调用 LoadInt32 函数:
var x int32 = 100
y := atomic.LoadInt32(&x)
fmt.Println("x, y:", x, y)
存储
存储相关的原子函数如下所示,第一个参数表示待操作变量对应的指针,第二个参数表示要存储到待操作变量的数值:
该操作可以看作是加载操作的逆向操作,一个用于读取,一个用于写入,通过上述原子函数存储数值的时候,不会出现存储流程进行到一半被中断的情况,比如我们可以通过 StoreInt32 函数改写上述设置 y 变量的操作代码:
var x int32 = 100
var y int32
atomic.StoreInt32(&y, atomic.LoadInt32(&x))
fmt.Println("x, y:", x, y)
交换
交换和比较并交换看起来有点类似,但是交换不关心待操作数的旧值,不管旧值和新值是否相等,都会通过新值替换旧值,不过,交换函数有一个返回值,会返回旧值:
var j int32 = 1
var k int32 = 2
j_old := atomic.SwapInt32(&j, k)
fmt.Println("old,new:", j_old, j)
原子类型
为了扩大原子操作的适用范围,Go 语言在 1.4 版本发布的时候向 sync/atomic 包中添加了一个新的类型 Value,此类型的值相当于一个容器,可以被用来「原子地」存储和加载任意的值:
type Value struct {
v interface{}
}
atomic.Value 类型是开箱即用的,我们声明一个该类型的变量(以下简称原子变量)之后就可以直接使用了。这个类型使用起来很简单,它只有 Store 和 Load 两个指针方法,这两个方法都是原子操作:
var v atomic.Value
v.Store(100)
fmt.Println("v:", v.Load())
不过,虽然简单,但还是有一些需要注意的地方。首先,存储值不能是 nil;其次,我们向原子类型存储的第一个值,决定了它今后能且只能存储该类型的值。如果违背这两条,编译时会抛出 panic。
sync.WaitGroup 和 sync.Once
在介绍通道的时候,如果启用了多个子协程,我们是这样实现主协程等待子协程执行完毕并退出的:声明一个和子协程数量一致的通道数组,然后为每个子协程分配一个通道元素,在子协程执行完毕时向对应的通道发送数据;然后在主协程中,我们依次读取这些通道接收子协程发送的数据,只有所有通道都接收到数据才会退出主协程。
代码看起来是这样的:
chs := make([]chan int, 10)
for i := 0; i < 10; i++ {
chs[i] = make(chan int)
go add(1, i, chs[i])
}
for _, ch := range chs {
<- ch
}
那有没有更好的实现呢?这就要引入我们今天要讨论的主题:sync 包提供的 sync.WaitGroup 类型。
sync.WaitGroup
sync.WaitGroup
AddWaitGroupAddDoneDoneWaitGroupWaitWaitGroup
package main
import (
"fmt"
"sync"
)
func add_num(a, b int, deferFunc func()) {
defer func() {
deferFunc()
}()
c := a + b
fmt.Printf("%d + %d = %d\n", a, b, c)
}
func main() {
var wg sync.WaitGroup
wg.Add(10)
for i := 0; i < 10; i++ {
go add_num(i, 1, wg.Done)
}
wg.Wait()
}
以上就是 sync.WaitGroup 类型的典型使用场景,通过它我们可以轻松实现一主多子的协程协作。需要注意的是,该类型计数器不能小于0,否则会抛出如下 panic:
panic: sync: negative WaitGroup counter
sync.Once
sync.Once
//sync.Once 还提供了一个 uint32 类型的 done 字段,它的作用是记录 Do 传入函数被调用次数,显然,其对应的值只能是 0 和 1
func (o *Once) Do(f func()) {
if atomic.LoadUint32(&o.done) == 1 { // 如果 done 字段的值已经是 1 了, 表示该函数已经调用过
return
}
// 调用 sync.Once 提供的互斥锁阻塞其它代码对该类型的访问,然后通过原子操作将 done 的值设置为 1,并调用传入函数。
o.m.Lock()
defer o.m.Unlock()
if o.done == 0 {
defer atomic.StoreUint32(&o.done, 1)
f()
}
}
示例:
package main
import (
"fmt"
"sync"
)
func dosomething(o *sync.Once) {
fmt.Println("start:")
o.Do(func() {
fmt.Println("Do something...")
})
fmt.Println("Finished")
}
func main() {
o := &sync.Once{}
go dosomething(o)
go dosomething(o)
go dosomething(o)
go dosomething(o)
fmt.Scanf("\n")
}
sync.Once.Do
通过 context 包提供的函数实现多协程之间的协作
在 sync.WaitGroup 时必须直到子协程的总量,如果不知道怎么办?
一种解决方案是通过 sync.WaitGroup 分批启动子协程,具体实现代码如下:
package main
import (
"fmt"
"sync"
)
func addNum(a, b int, deferFunc func()) {
defer func() {
deferFunc()
}()
c := a + b
fmt.Printf("%d + %d = %d\n", a, b, c)
}
func main() {
total := 10
step := 2
fmt.Println("启动子协程...")
var wg sync.WaitGroup
for i := 0; i < total; i = i + step {
wg.Add(step)
for j := 0; j < step; j++ {
go addNum(i + j, 1, wg.Done)
}
wg.Wait()
}
fmt.Println("所有子协程执行完毕.")
}
context