1.5.2 原子操作

所谓的原子操作就是并发编程中“最小的且不可并行化”的操作。通常,如果多个并发体对同一个共享资源进行的操作是原子的话,那么同一时刻最多只能有一个并发体对该资源进行操作。从线程角度看,在当前线程修改共享资源期间,其它的线程是不能访问该资源的。原子操作对于多线程并发编程模型来说,不会发生有别于单线程的意外情况,共享资源的完整性可以得到保证。

sync.Mutex
import (
    "sync"
)

var total struct {
    sync.Mutex
    value int
}

func worker(wg *sync.WaitGroup) {
    defer wg.Done()

    for i := 0; i <= 100; i++ {
        total.Lock()
        total.value += i
        total.Unlock()
    }
}

func main() {
    var wg sync.WaitGroup
    wg.Add(2)
    go worker(&wg)
    go worker(&wg)
    wg.Wait()

    fmt.Println(total.value)
}
workertotal.value += isync.Mutextotal
sync/atomic
import (
    "sync"
    "sync/atomic"
)

var total uint64

func worker(wg *sync.WaitGroup) {
    defer wg.Done()

    var i uint64
    for i = 0; i <= 100; i++ {
        atomic.AddUint64(&total, i)
    }
}

func main() {
    var wg sync.WaitGroup
    wg.Add(2)

    go worker(&wg)
    go worker(&wg)
    wg.Wait()
}
atomic.AddUint64total

原子操作配合互斥锁可以实现非常高效的单件模式。互斥锁的代价比普通整数的原子读写高很多,在性能敏感的地方可以增加一个数字型的标志位,通过原子检测标志位状态降低互斥锁的使用次数来提高性能。

type singleton struct {}

var (
    instance    *singleton
    initialized uint32
    mu          sync.Mutex
)

func Instance() *singleton {
    if atomic.LoadUint32(&initialized) == 1 {
        return instance
    }

    mu.Lock()
    defer mu.Unlock()

    if instance == nil {
        defer atomic.StoreUint32(&initialized, 1)
        instance = &singleton{}
    }
    return instance
}
sync.Once
type Once struct {
    m    Mutex
    done uint32
}

func (o *Once) Do(f func()) {
    if atomic.LoadUint32(&o.done) == 1 {
        return
    }

    o.m.Lock()
    defer o.m.Unlock()

    if o.done == 0 {
        defer atomic.StoreUint32(&o.done, 1)
        f()
    }
}
sync.Once
var (
    instance *singleton
    once     sync.Once
)

func Instance() *singleton {
    once.Do(func() {
        instance = &singleton{}
    })
    return instance
}
sync/atomicatomic.ValueLoadStoreinterface{}
var config atomic.Value // 保存当前配置信息

// 初始化配置信息
config.Store(loadConfig())

// 启动一个后台线程, 加载更新后的配置信息
go func() {
    for {
        time.Sleep(time.Second)
        config.Store(loadConfig())
    }
}()

// 用于处理请求的工作者线程始终采用最新的配置信息
for i := 0; i < 10; i++ {
    go func() {
        for r := range requests() {
            c := config.Load()
            // ...
        }
    }()
}

这是一个简化的生产者消费者模型:后台线程生成最新的配置信息;前台多个工作者线程获取最新的配置信息。所有线程共享配置信息资源。

1.5.3 顺序一致性内存模型

如果只是想简单地在线程之间进行数据同步的话,原子操作已经为编程人员提供了一些同步保障。不过这种保障有一个前提:顺序一致性的内存模型。要了解顺序一致性,我们先看看一个简单的例子:

var a string
var done bool

func setup() {
    a = "hello, world"
    done = true
}

func main() {
    go setup()
    for !done {}
    print(a)
}
setupadonetruemainfor !done {}donetrue
maindoneasetupdonemainmain

在Go语言中,同一个Goroutine线程内部,顺序一致性内存模型是得到保证的。但是不同的Goroutine之间,并不满足顺序一致性内存模型,需要通过明确定义的同步事件来作为同步的参考。如果两个事件不可排序,那么就说这两个事件是并发的。为了最大化并行,Go语言的编译器和处理器在不影响上述规定的前提下可能会对执行语句重新排序(CPU也会对一些指令进行乱序执行)。

a = 1; b = 2;a = 1;b = 2;b = 2;a = 1;a = 1; b = 2;
func main() {
    go println("你好, 世界")
}
mainmain

用前面的原子操作并不能解决问题,因为我们无法确定两个原子操作之间的顺序。解决问题的办法就是通过同步原语来给两个事件明确排序:

func main() {
    done := make(chan int)

    go func(){
        println("你好, 世界")
        done <- 1
    }()

    <-done
}
<-donedone <- 1done <- 1println("你好, 世界")
sync.Mutex
func main() {
    var mu sync.Mutex

    mu.Lock()
    go func(){
        println("你好, 世界")
        mu.Unlock()
    }()

    mu.Lock()
}
mu.Unlock()println("你好, 世界")mainmu.Lock()mu.Unlock()sync.Mutex

1.5.4 初始化顺序

前面函数章节中我们已经简单介绍过程序的初始化顺序,这是属于Go语言面向并发的内存模型的基础规范。

main.mainmainmaininitinitinitinitmaininitmain.main

图 1-12 包初始化流程

main.maininitmain.main
initmain

1.5.5 Goroutine的创建

go
var a string

func f() {
    print(a)
}

func hello() {
    a = "hello, world"
    go f()
}
go f()hellohellof()hellohello"hello, world"hello

1.5.6 基于Channel的通信

Channel通信是在Goroutine之间进行同步的主要方法。在无缓存的Channel上的每一次发送操作都有与其对应的接收操作相配对,发送和接收操作通常发生在不同的Goroutine上(在同一个Goroutine上执行2个操作很容易导致死锁)。无缓存的Channel上的发送操作总在对应的接收操作完成前发生.

var done = make(chan bool)
var msg string

func aGoroutine() {
    msg = "你好, 世界"
    done <- true
}

func main() {
    go aGoroutine()
    <-done
    println(msg)
}
msgdonedoneprintln
close(c)done <- false
var done = make(chan bool)
var msg string

func aGoroutine() {
    msg = "你好, 世界"
    close(done)
}

func main() {
    go aGoroutine()
    <-done
    println(msg)
}

对于从无缓冲Channel进行的接收,发生在对该Channel进行的发送完成之前。

基于上面这个规则可知,交换两个Goroutine中的接收和发送操作也是可以的(但是很危险):

var done = make(chan bool)
var msg string

func aGoroutine() {
    msg = "hello, world"
    <-done
}
func main() {
    go aGoroutine()
    done <- true
    println(msg)
}
maindone <- true<-donemsg = "hello, world"println(msg)msgdonemaindoneprintlndone = make(chan bool, 1)maindone <- true<-done
KK+CCC

我们可以根据控制Channel的缓存大小来控制并发执行的Goroutine的最大数目, 例如:

var limit = make(chan int, 3)

func main() {
    for _, w := range work {
        go func() {
            limit <- 1
            w()
            <-limit
        }()
    }
    select{}
}
select{}mainfor{}<-make(chan int)mainos.Exit(0)

1.5.7 不靠谱的同步

前面我们已经分析过,下面代码无法保证正常打印结果。实际的运行效果也是大概率不能正常输出结果。

func main() {
    go println("你好, 世界")
}

刚接触Go语言的话,可能希望通过加入一个随机的休眠时间来保证正常的输出:

func main() {
    go println("hello, world")
    time.Sleep(time.Second)
}
mainprintlnmainmainmain
sync

解决同步问题的思路是相同的:使用显式的同步。