Golang中的sync包提供了以下几种较为常用的异步操作方法,下面将一一进行介绍。
- Mutex:互斥锁
- RWMutex:读写锁
- WaitGroup:等待组
- Once:仅执行一次
- Cond:信号量
- Pool:临时对象池
- Map:并发map
本文参考了文章:【Go】Golang Sync包
1. Mutex互斥锁
通过给临界资源加锁,以让多个协程互斥访问临界资源。相关函数有:
mutex.Lock()mutex.Unlock()
下面代码是模拟一共有10张票,有4个窗口并发售票的场景,票就是临界资源,在同一时刻只允许有一个对象对其进行访问。
package main
import (
"fmt"
"sync"
)
var ticktsNum = 10
var wg sync.WaitGroup
var mutex sync.Mutex
func main() {
wg.Add(4)
go saleTickts(1)
go saleTickts(2)
go saleTickts(3)
go saleTickts(4)
wg.Wait()
}
func saleTickts(window int) {
defer wg.Done()
for {
mutex.Lock()
if ticktsNum > 0 {
fmt.Printf("窗口%d售卖,剩余票%d张\n", window, ticktsNum)
ticktsNum--
mutex.Unlock()
} else {
fmt.Printf("窗口%d售卖,票数不足\n", window)
mutex.Unlock()
break
}
}
}
2. RWMutex读写锁
Go语言包中的sync包提供了两种锁类型:互斥锁sync.Mutex和读写锁sync.RWMutex。其中RWMutex是基于Mutex实现的,只读锁的实现使用类似引用计数器的功能。读写锁与互斥锁的最大不同是它可以针对读操作或写操作单独加锁,性能更高。读写锁的特点:
- 多个协程可以同时读
- 在一个协程写的时候,其他协程读写操作都不能进行
读写锁的相关函数有:
rwMutex.RLock()rwMutex.RUnlock()rwMutex.Lock()rwMutex.Unlock()
package main
import (
"fmt"
"sync"
"time"
)
var wg sync.WaitGroup
var rwMutex sync.RWMutex
func main() {
wg.Add(3)
go readData(1)
go readData(2)
go readData(3)
wg.Wait()
fmt.Println("读锁测试结束\n")
time.Sleep(3 * time.Second)
wg.Add(3)
go writeData(1)
go readData(2)
go writeData(3)
wg.Wait()
fmt.Println("写锁测试结束")
}
func readData(num int) {
defer wg.Done()
fmt.Println(num, "开始读")
rwMutex.RLock()
fmt.Println(num, "正在读...")
time.Sleep(2 * time.Second)
rwMutex.RUnlock()
fmt.Println(num, "读结束,释放锁")
}
func writeData(num int) {
defer wg.Done()
fmt.Println(num, "开始写")
rwMutex.Lock()
fmt.Println(num, "正在写...")
time.Sleep(2 * time.Second)
rwMutex.Unlock()
fmt.Println(num, "写结束,释放锁")
}
3. WaitGroup等待组
WaitGroup通过一个计数器counter来让主协程在还有子协程运行的时候进行等待
wg.Add(num)wg.Wait()wg. Done()
package main
import (
"fmt"
"sync"
)
var wg sync.WaitGroup
func main() {
wg.Add(2)
go func1()
go func2()
fmt.Println("WaitGroup进入阻塞...")
wg.Wait()
fmt.Println("WaitGroup结束阻塞...")
}
func func1() {
for i := 0; i < 10; i++ {
fmt.Println("func1执行... ", i)
}
wg.Done()
}
func func2() {
defer wg.Done()
for i := 0; i < 10; i++ {
fmt.Println("\tfunc2执行... ", i)
}
}
4. Once仅执行一次
sync.Once可以控制某个函数仅执行一次,可以用它来实现单例模式或系统初始化等。使用主要分为以下两步:
once := &sync.Once{}once.Do(func () {})
package main
import (
"fmt"
"sync"
)
func main() {
once := sync.Once{}
for i := 0; i < 100; i++ {
fmt.Printf("index: %v\n", i)
go func(index int) {
once.Do(func() {
fmt.Printf("hello world! index: %v\n", index)
})
}(i)
}
fmt.Println("sync.Once test end.")
}
5. Cond信号量
sync.Cond指的是同步条件变量,一般需要与互斥锁/读写锁组合使用,它实现了一个条件变量,在 Locker 的基础上增加的一个消息通知的功能,保存了一个通知列表,用来唤醒一个或所有因等待条件变量而阻塞的协程,以此来实现多个协程间的同步。相关方法有:
func NewCond(l Locker) *Condfunc (c *Cond) Broadcast()func (c *Cond) Signal()func (c *Cond) Wait()
package main
import (
"fmt"
"sync"
"time"
)
var locker = new(sync.Mutex)
var cond = sync.NewCond(locker)
func main() {
for i := 0; i < 10; i++ {
go func(x int) {
cond.L.Lock() //获取锁
defer cond.L.Unlock() //释放锁
cond.Wait() //等待通知,阻塞当前goroutine
fmt.Println(x)
}(i)
}
time.Sleep(time.Second * 1) // 睡眠1秒,使所有goroutine进入 Wait 阻塞状态
fmt.Println("Signal...")
cond.Signal() // 1秒后下发一个通知给已经获取锁的goroutine
time.Sleep(time.Second * 1)
fmt.Println("Signal...")
cond.Signal() // 1秒后下发下一个通知给已经获取锁的goroutine
time.Sleep(time.Second * 1)
cond.Broadcast() // 1秒后下发广播给所有等待的goroutine
fmt.Println("Broadcast...")
time.Sleep(time.Second * 1) // 睡眠1秒,等待所有goroutine执行完毕
}
6. Pool临时对象池
sync.Pool可以作为临时对象池来使用,不再自己单独创建对象,而是从临时对象池中获取出一个对象。
在使用时,首先要创建一个pool实例,并且配置一个New方法,声明pool元素的创建方法。如:
bufferpool := &sync.Pool {
New: func() interface {} {
println("Create new instance")
return struct{}{}
}
}
在创建好pool实例后,主要通过以下两个方法进行操作:
func (p *Pool) Get() interface{}func (p *Pool) Put(x interface{})
package main
import (
"fmt"
"sync"
"sync/atomic"
)
// 用来统计实例真正创建的次数
var numCalcsCreated int32
// 创建实例的函数
func createBuffer() interface{} {
// 这里要注意下,非常重要的一点。这里必须使用原子加,不然有并发问题;
atomic.AddInt32(&numCalcsCreated, 1)
buffer := make([]byte, 1024)
return &buffer
}
func main() {
// 创建实例
bufferPool := &sync.Pool{
New: createBuffer,
}
// 多 goroutine 并发测试
numWorkers := 1024 * 1024
var wg sync.WaitGroup
wg.Add(numWorkers)
for i := 0; i < numWorkers; i++ {
go func() {
defer wg.Done()
// 申请一个 buffer 实例
buffer := bufferPool.Get()
_ = buffer.(*[]byte)
// 释放一个 buffer 实例
defer bufferPool.Put(buffer)
}()
}
wg.Wait()
fmt.Printf("%d buffer objects were created.\n", numCalcsCreated)
}
7. Map并发map
Golang中自带的map对象不是并发安全的,Golang1.9版本后新增了sync.Map,它是原生支持并发安全的map。相关的方法有以下几个:
func (m *Map) Load(key interface{}) (value interface{}, ok bool)func (m *Map) Store(key, value interface{})func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool)func (m *Map) Delete(key interface{})func (m *Map) Range(f func(key, value interface{}) bool)