同步机制

由于并发编程没法直接保证协程的执行顺序,因此需要一个同步的机制来进行同步通信,以确保各个协程中的任务处于特定的状态再进行特定的后续操作。

channelsync.WaitGroup

Channel同步

使用无缓冲channel:

func main() {
    done := make(chan int)

    go func(){
        println("你好, 世界")
        done <- 1
    }()

    <-done
}

无缓存的Channel上的发送操作总在对应的接收操作完成前发生。

因此无缓冲区的channel又叫同步channel,它会保证读写是个同步操作。

使用有缓冲channel的写法:

func main() {
    done := make(chan int, 1)

    go func(){
        println("你好, 世界")
        done <- 1
    }()

    <-done
}

使用有缓冲的channel时必须注意空channel只会阻塞读操作,不会阻塞写操作,比如下面的写法就是无效的:

func main() {
    done := make(chan int)

    go func(){
        println("你好, 世界")
        <-done
    }()

    done <- 1
}

sync.WaitGroup同步

另一种常用方式是使用sync包提供的WaitGroup对象,如下:

package main

import "sync"

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            println("你好, 世界")
            wg.Done()
        }()
    }
    wg.Wait()
}

生产者-消费者模式

并发编程中最常见的例子就是生产者消费者模式,该模式主要通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度,一个多生产者和消费者的例子:

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

var wg sync.WaitGroup
var producerWg sync.WaitGroup
var mu sync.Mutex

type Info struct {
    count int32
}

func countNums() map[int]*Info {
    ch := make(chan int, 1000)
    res := map[int]*Info{}
    for i := 0; i < 100; i++ {
        producerWg.Add(1)
        go func(out chan int, n int) {
            defer producerWg.Done()
            for i := 0; i < n; i++ {
                out <- rand.Int() % 10
            }
        }(ch, 10000)
    }

    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(in chan int) {
            defer wg.Done()
            for key := range in {
                mu.Lock()
                if _, ok := res[key]; ok {
                    res[key].count++
                } else {
                    res[key] = &Info{
                        count: 1,
                    }
                }
                mu.Unlock()
            } 
        }(ch)
    }
    producerWg.Wait()
    close(ch)
    wg.Wait()
    return res
}

func main() {
    start := time.Now()
    res := countNums()
    fmt.Println(time.Since(start))
    for k, v := range res {
        fmt.Printf("%d: %d\n", k, v.count)
    }
}

安全的退出

Ctrl+C
// Ctrl+C 退出
    sig := make(chan os.Signal, 1)
    signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
    fmt.Printf("quit (%v)\n", <-sig)

线程安全

先看一个线程不安全的累加器实现:

package main

import (
    "fmt"
    "sync"
    "time"
)

var x int32
var wg sync.WaitGroup

func UnsafeAdd(n int) {
    for i := 0; i < n; i++ {
        x += 1
    }
    wg.Done()
}

func AddWrapper(adder func(int)) {
    start := time.Now()
    wg.Add(2)
    go adder(1000000)
    go adder(1000000)
    wg.Wait()
    fmt.Println(x)
    fmt.Println(time.Since(start))
}

func main() {
    AddWrapper(UnsafeAdd)
}

-race
go run -race .\play
==================
WARNING: DATA RACE
Read at 0x00000065111c by goroutine 8:
  main.UnsafeAdd()
      E:/Dev/gogogo/play/play.go:14 +0x4e       

Previous write at 0x00000065111c by goroutine 7:
  main.UnsafeAdd()
      E:/Dev/gogogo/play/play.go:14 +0x68       

Goroutine 8 (running) created at:
  main.AddWrapper()
      E:/Dev/gogogo/play/play.go:30 +0x41

Goroutine 7 (running) created at:
  main.AddWrapper()
      E:/Dev/gogogo/play/play.go:22 +0x95
  main.main()
      E:/Dev/gogogo/play/play.go:30 +0x41
==================

意思并发存在数据竞争,不是线程安全的实现,那么要实现线程安全,我们有什么办法呢?

sync.Mutex锁机制

sync.MutexRWMutexMutex
func MutexAdd(n int) {
    for i := 0; i < n; i++ {
        mu.Lock()
        x += 1
        mu.Unlock()
    }
    wg.Done()
}

sync/atomic包的原子操作

上面的线程安全都是通过加锁的机制来保证的,实际上对于简单的数据的读写,Go还提供了一种原子操作包来保证线程安全,比如上述例子的实现可以这样:

func AtomicAdd(n int) {
    for i := 0; i < n; i++ {
        atomic.AddInt32(&x, 1)
    }
    wg.Done()
}

所谓的原子操作就是并发编程中“最小的且不可并行化”的操作。从线程角度看,在当前线程修改共享资源期间,其它的线程是不能访问该资源的。原子操作对于多线程并发编程模型来说,不会发生有别于单线程的意外情况,共享资源的完整性可以得到保证。并且,由于原子操作低层是通过CPU指令的硬件层面实现,相比使用锁实现的线程安全机制开销更低。

比较两种实现的运行时间,输出如下,可以看到原子操作确实较锁实现更加高效:

func main() {
    // AddWrapper(UnsafeAdd)
    AddWrapper(MutexAdd)
    AddWrapper(AtomicAdd)
}

// output
2000000
36.0276ms
4000000
24.003ms

附完整代码:

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)

var x int32
var wg sync.WaitGroup
var mu sync.Mutex

func UnsafeAdd(n int) {
    for i := 0; i < n; i++ {
        x += 1
    }
    wg.Done()
}

func MutexAdd(n int) {
    for i := 0; i < n; i++ {
        mu.Lock()
        x += 1
        mu.Unlock()
    }
    wg.Done()
}

func AtomicAdd(n int) {
    for i := 0; i < n; i++ {
        atomic.AddInt32(&x, 1)
    }
    wg.Done()
}

func AddWrapper(adder func(int)) {
    start := time.Now()
    wg.Add(2)
    go adder(1000000)
    go adder(1000000)
    wg.Wait()
    fmt.Println(x)
    fmt.Println(time.Since(start))
}

func main() {
    // AddWrapper(UnsafeAdd)
    AddWrapper(MutexAdd)
    AddWrapper(AtomicAdd)
}