Golang中的sync包提供了以下几种较为常用的异步操作方法,下面将一一进行介绍。

  • Mutex:互斥锁
  • RWMutex:读写锁
  • WaitGroup:等待组
  • Once:仅执行一次
  • Cond:信号量
  • Pool:临时对象池
  • Map:并发map

本文参考了文章:【Go】Golang Sync包

1. Mutex互斥锁

通过给临界资源加锁,以让多个协程互斥访问临界资源。相关函数有:

mutex.Lock()mutex.Unlock()

下面代码是模拟一共有10张票,有4个窗口并发售票的场景,票就是临界资源,在同一时刻只允许有一个对象对其进行访问。

package main

import (
   "fmt"
   "sync"
)

var ticktsNum = 10
var wg sync.WaitGroup
var mutex sync.Mutex

func main() {
   wg.Add(4)
   go saleTickts(1)
   go saleTickts(2)
   go saleTickts(3)
   go saleTickts(4)
   wg.Wait()
}

func saleTickts(window int) {
   defer wg.Done()
   for {
      mutex.Lock()
      if ticktsNum > 0 {
         fmt.Printf("窗口%d售卖,剩余票%d张\n", window, ticktsNum)
         ticktsNum--
         mutex.Unlock()
      } else {
         fmt.Printf("窗口%d售卖,票数不足\n", window)
         mutex.Unlock()
         break
      }
   }
}

2. RWMutex读写锁

Go语言包中的sync包提供了两种锁类型:互斥锁sync.Mutex和读写锁sync.RWMutex。其中RWMutex是基于Mutex实现的,只读锁的实现使用类似引用计数器的功能。读写锁与互斥锁的最大不同是它可以针对读操作或写操作单独加锁,性能更高。读写锁的特点:

  • 多个协程可以同时读
  • 在一个协程写的时候,其他协程读写操作都不能进行

读写锁的相关函数有:

rwMutex.RLock()rwMutex.RUnlock()rwMutex.Lock()rwMutex.Unlock()
package main

import (
   "fmt"
   "sync"
   "time"
)

var wg sync.WaitGroup
var rwMutex sync.RWMutex

func main() {
   wg.Add(3)
   go readData(1)
   go readData(2)
   go readData(3)
   wg.Wait()
   fmt.Println("读锁测试结束\n")
   time.Sleep(3 * time.Second)

   wg.Add(3)
   go writeData(1)
   go readData(2)
   go writeData(3)
   wg.Wait()
   fmt.Println("写锁测试结束")
}

func readData(num int) {
   defer wg.Done()

   fmt.Println(num, "开始读")
   rwMutex.RLock()
   fmt.Println(num, "正在读...")
   time.Sleep(2 * time.Second)
   rwMutex.RUnlock()
   fmt.Println(num, "读结束,释放锁")
}

func writeData(num int) {
   defer wg.Done()

   fmt.Println(num, "开始写")
   rwMutex.Lock()
   fmt.Println(num, "正在写...")
   time.Sleep(2 * time.Second)
   rwMutex.Unlock()
   fmt.Println(num, "写结束,释放锁")
}

3. WaitGroup等待组

WaitGroup通过一个计数器counter来让主协程在还有子协程运行的时候进行等待

wg.Add(num)wg.Wait()wg. Done()
package main

import (
	"fmt"
	"sync"
)

var wg sync.WaitGroup

func main() {
	wg.Add(2)
	go func1()
	go func2()
	fmt.Println("WaitGroup进入阻塞...")
	wg.Wait()
	fmt.Println("WaitGroup结束阻塞...")
}

func func1() {
	for i := 0; i < 10; i++ {
		fmt.Println("func1执行... ", i)
	}
	wg.Done()
}

func func2() {
	defer wg.Done()
	for i := 0; i < 10; i++ {
		fmt.Println("\tfunc2执行... ", i)
	}
}

4. Once仅执行一次

sync.Once可以控制某个函数仅执行一次,可以用它来实现单例模式或系统初始化等。使用主要分为以下两步:

once := &sync.Once{}once.Do(func () {})
package main

import (
   "fmt"
   "sync"
)

func main() {
	once := sync.Once{}
	for i := 0; i < 100; i++ {
		fmt.Printf("index: %v\n", i)
		go func(index int) {
			once.Do(func() {
				fmt.Printf("hello world! index: %v\n", index)
			})
		}(i)
	}
	fmt.Println("sync.Once test end.")
}

5. Cond信号量

sync.Cond指的是同步条件变量,一般需要与互斥锁/读写锁组合使用,它实现了一个条件变量,在 Locker 的基础上增加的一个消息通知的功能,保存了一个通知列表,用来唤醒一个或所有因等待条件变量而阻塞的协程,以此来实现多个协程间的同步。相关方法有:

func NewCond(l Locker) *Condfunc (c *Cond) Broadcast()func (c *Cond) Signal()func (c *Cond) Wait()
package main

import (
    "fmt"
    "sync"
    "time"
)

var locker = new(sync.Mutex)
var cond = sync.NewCond(locker)

func main() {
    for i := 0; i < 10; i++ {
        go func(x int) {
            cond.L.Lock()         //获取锁
            defer cond.L.Unlock() //释放锁
            cond.Wait()           //等待通知,阻塞当前goroutine
            fmt.Println(x)
        }(i)
    }   
    time.Sleep(time.Second * 1)	// 睡眠1秒,使所有goroutine进入 Wait 阻塞状态
    fmt.Println("Signal...")
    cond.Signal()               // 1秒后下发一个通知给已经获取锁的goroutine
    time.Sleep(time.Second * 1)
    fmt.Println("Signal...")
    cond.Signal()               // 1秒后下发下一个通知给已经获取锁的goroutine
    time.Sleep(time.Second * 1)
    cond.Broadcast()            // 1秒后下发广播给所有等待的goroutine
    fmt.Println("Broadcast...")
    time.Sleep(time.Second * 1)	// 睡眠1秒,等待所有goroutine执行完毕
}

6. Pool临时对象池

sync.Pool可以作为临时对象池来使用,不再自己单独创建对象,而是从临时对象池中获取出一个对象。

在使用时,首先要创建一个pool实例,并且配置一个New方法,声明pool元素的创建方法。如:

bufferpool := &sync.Pool {
    New: func() interface {} {
        println("Create new instance")
        return struct{}{}
    }
}

在创建好pool实例后,主要通过以下两个方法进行操作:

func (p *Pool) Get() interface{}func (p *Pool) Put(x interface{})
package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

// 用来统计实例真正创建的次数
var numCalcsCreated int32

// 创建实例的函数
func createBuffer() interface{} {
    // 这里要注意下,非常重要的一点。这里必须使用原子加,不然有并发问题;
    atomic.AddInt32(&numCalcsCreated, 1)
    buffer := make([]byte, 1024)
    return &buffer
}

func main() {
    // 创建实例
    bufferPool := &sync.Pool{
        New: createBuffer,
    }

    // 多 goroutine 并发测试
    numWorkers := 1024 * 1024
    var wg sync.WaitGroup
    wg.Add(numWorkers)

    for i := 0; i < numWorkers; i++ {
        go func() {
            defer wg.Done()
            // 申请一个 buffer 实例
            buffer := bufferPool.Get()
            _ = buffer.(*[]byte)
            // 释放一个 buffer 实例
            defer bufferPool.Put(buffer)
        }()
    }
    wg.Wait()
    fmt.Printf("%d buffer objects were created.\n", numCalcsCreated)
}

7. Map并发map

Golang中自带的map对象不是并发安全的,Golang1.9版本后新增了sync.Map,它是原生支持并发安全的map。相关的方法有以下几个:

func (m *Map) Load(key interface{}) (value interface{}, ok bool)func (m *Map) Store(key, value interface{})func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool)func (m *Map) Delete(key interface{})func (m *Map) Range(f func(key, value interface{}) bool)