图片.png

1. 深入理解并发和并行

  • 并行的关键是同时可以做很多事情,常见的有多机并行,多核并行;并发是同时管理很多事情,在规定时间内这些事情都能得到执行。
  • 并发重在避免阻塞,使程序不会因为阻塞而停止执行;并行则是依赖硬件和操作系统的资源。
  • 并发在于结构,并行在于执行。应用程序具备好的并发结构,操作系统才能更好地利用硬件并行执行,合理地进行调度,提升CPU利用率。
  • 应用层程序员提升程序并发处理能力的一个重要手段就是为程序设计良好的并发结构。请看下面几张趣图:
图片.png
很多时候,并发都是要比并行好,并发强调的是过程,而并行强调的是动作,但在一些复杂的场景中,并发+并行这种混合模式才是效率最高的。

2. goroutine介绍

(1) 什么是goroutine

goroutine是建立在线程之上的轻量级的抽象。在Go语言中,当一个函数被创建为goroutine时,Go会将其视为一个独立的工作单元,并且能够以非常低的代价实现并发执行多个goroutine。

(2) goroutine与线程的区别

  • 操作系统的线程的切换需要进行系统调用(内核态),在高并发的场景下比较耗时耗资源,如下图所示。而Go语言的goroutine是在用户态下完成调度的,可以看成用户态下的线程,执行效率更高。
图片.png
  • goroutine相比于线程更加轻量化,关键点在于goroutine所占用的内存,均在栈中进行管理,而所占用栈空间的大小又是由runtime按需分配的,初始时只有2kb~8kb,当栈容量不够时会创建一个两倍旧栈大小的新栈,然后把旧栈的内容拷贝进来(连续栈机制, Go1.4之后开始使用)。而以64位环境的JVM为例,会为每个线程分配1MB的栈空间。
  • goroutine创建和销毁的开销更小,由于线程创建时需要向操作系统申请资源,并且在销毁时将资源归还,因此它的创建和销毁的开销比较大。但goroutine的创建和销毁是由go语言在运行时自己管理的,因此开销更低。如下图所示。
图片.png
  • 切换开销更小。这是goroutine于线程的主要区别,也是golang能够实现高并发的主要原因。线程的调度方式是抢占式的,如果一个线程的执行时间超过了分配给它的时间片,就会被其它可执行的线程抢占。在线程切换的过程中需要保存/恢复所有的寄存器信息。而goroutine的调度是协同式的,它不会直接地与操作系统内核打交道。当goroutine进行切换的时候,之后很少量的寄存器需要保存和恢复(PC和SP)。说是协同式的调度,其实只是用户态下一种更加合理的抢占式的调度,因此gouroutine的切换效率更高。

3. goroutine的并发调度模型——MPG

(1) 常见的并发模型

多进程模型

进程被多核CPU并发调度,优点是每个进程都有自己独立的内存空间,隔离性好,健壮性高,进程的切换消耗较大,进程间的通信需要多次在内核区和用户区之间赋值数据。

多线程模型

多线程的优点是通过共享内存进行通信,更快捷,切换代价小;缺点是多个线程共享内存空间,容易导致数据访问混乱,某个线程的误操作内存挂掉可能危及线程组,健壮性不高。

用户级多线程模型
  • 多用户级线程又分为两种,一种是以M:1的方式,M个用户线程对应一个内核进程,这种情况容易因为一个系统阻塞,其它用户线程都会阻塞,不能利用多核的优势;
  • 另一种模式就是M:N的方式,M个用户线程对应N个内核线程,这种模式一般需要语言运行时或库的支持,效率最高。
协程
  • 协程的由来:程序并发处理的要求越来越高,但时不能无限制地增加系统线程数,线程数过多会导致操作系统的调度开销变大,单个线程被分配的运行时间片减少,单个线程的运行速度降低,单靠增加系统线程数不能满足要求。为了不让系统线程无限膨胀,于是就有了协程的概念。
  • 协程是一种用户态轻量级的线程,协程的调度完全由用户态程序控制,协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存,下次切回来后恢复。每个内核线程可以对应多个用户线程,当一个协程执行体被阻塞了,调度器会调度另一个协程执行,最大效率地利用操作系统分给系统线程的时间片。前面提到的用户级线程的M:N模型就是一种高效的协程模型。
  • 协程的优点

    • 控制了系统线程数,保证每个线程的运行时间片充足;
    • 调度层能进行用户态的切换,不会导致单个线程阻塞整个程序的情况,尽量减少上下文切换,提升运行效率;
    • 实质上就是把复杂耗时的调度逻辑从内核态搬到了用户态,让操作系统能够更专注于执行,提高CPU的利用率。如下图所示(这样画不太准确,会意即可)。
图片.png

(2) Go语言MPG模式实现CSP

CSP并发模型介绍
  • CSP(communicating sequential processes)并发模型。不同于传统的多线程通过共享内存来通信,CSP讲究的是“以通信的方式来共享内存”。 “不要以共享内存的方式来通信,相反,要通过通信来共享内存。”
  • 其最基本的思想是:将并发系统抽象为Channel和Process两部分,Channel用来传递消息,Process用于执行,Channel和Process之间相互独立,没有从属关系,消息的发送和接收有严格的时序限制。在Go语言中Channel就是通道,用于通信,Process就是goroutine,用于执行。
MPG模式
  • Go语言的调度模型抽象出三个实体:M,P,G

    • G是Go运行时对goroutine的抽象描述,G中存放并发执行的代码入口地址,上下文,运行栈等执行相关的元信息,G并不是执行体。
    • M时关联了OS内核线程的用户线程,是操作系统层面调度和执行的实体。M仅负责执行G。M启动时进入go运行时(runtime)的管理代码,获取G和P资源,然后执行调度。
    • P代表M运行G所需要的上下文环境,也是处理用户级代码逻辑的处理器。P本地持有G的队列(LRQ)。
  • go运行时调度器(重点!!!)

    • 管理和调度被创建的goroutine,将其分配到空闲P的队列中等待被执行。
    • 同时调度器能将M和P进行绑定,构成一个运行时环境,让G可以被执行。
    • 如果P本地的G队列(LRQ)空了,调度器则会去全局队列(GRQ)"偷取"一部分G给P,如果全局队列也空了,调度器则去其它的P"偷取"一部分G给P,这就是Work Stealing算法的基本原理。调度流程如下图所示。
图片.png
  • 如果正在运行的G需要执行一个阻塞的系统调用,如打开一个文件,此时M和G会从逻辑处理器P上分离,该线程会继续阻塞,等待系统调用的返回。同时,调度器会创建一个新的M并绑定到P上。之后,调度器会从P的本地队列(LRQ)取出另一个G来运行。具体结果如下图所示。
图片.png
  • 如果一个G(goroutine)运行时需要进行网络I/O调用,会比较特殊,此时G会和当前P分离,并转移到网络轮询器的运行时上等待调用,一旦轮询器准备就绪对应的G就可以重新分配到P上来处理了,如下图所示。
图片.png
  • M0和G0:M0是程序启动后的主线程,M0负责执行初始化操作和启动第一个G,之后M0和其它的M就一样了。同时每个M都会有一个自己的管理堆栈G0,G0不指向任何可执行的函数,在没有执行并发程序的过程中,M一直是在G0栈上工作的。有新的G要被执行时,M就跳转到G的程序代码点开始执行。
  • Go程序启动的初始化过程

    • 分配和检查栈空间
    • 初始化参数和环境变量
    • 当前运行线程标记为M0,M0是程序启动的主线程。
    • 用运行时初始化函数runtime.schedinit进行初始化,主要是初始化内存空间分配器,GC,生成空闲P列表
    • 在M0上调度第一个G,这个G运行runtime.main函数。
    • runtime.main会拉起运行时的监控线程(执行调度器的功能),然后调用main包的init()初始化函数,最后执行main函数。
  • 什么时候创建M,P, G

    • 在程序启动过程中会初始化空闲P列表,P被创建出来,同时第一个G也是在初始化的过程中被创建出来。后续在有并发的地方都有可能创建G,由于G不是执行实体,只是一个数据结构(存放的元信息),所以G是可以被复用的,在需要G结构时,首先要去P的空闲G的列表里寻找已经结束的goroutine,其中G会被缓存起来。
    • 每个并发调用都会初始化一个新的G任务,然后唤醒M执行任务。唤醒时不是特定唤醒某个M去工作,而是先尝试获取当前线程M,如果无法获取,则从全局的调度的空闲M列表中获取可用的M,如果没有可用的,则新建M,然后绑定P和G进行运行。所以M和P不是一一对应的,M时按需分配的,但运行时有上限值(默认时1000)
  • “m:n”关系:

    • P的数目默认是CPU逻辑核心的数量,如下图所示,我的笔记本有8个逻辑处理器,可以通过sync.GOMAXPROCS函数设置或查询,M和P的数目差不多,但运行时会根据当前的状态动态地创建M,如上文所言;G的数量与P的数量是m:n的关系,m可以远大于n, 达到百万级别。
    图片.png
    • 因此,我们可以把gorutine的调度模型看成一个线程池,线程池里维护了n个左右的线程M,m个G(groutine)映射到n个M上,goroutine的内存模型如下图所示
图片.png

4. goroutine的使用

(1) goroutine的创建(1个逻辑处理器)

package main

import (
    "fmt"
    "runtime"
    "sync"
)

// goroutine的创建
func main() {
    // 分配一个逻辑处理器给调度器使用
    runtime.GOMAXPROCS(1)
    // wg用来等待程序完成
    // 计数加2,表示要等待两个goroutine
    var wg sync.WaitGroup
    wg.Add(2)
    fmt.Println("Start goroutines:")

    // 以匿名函数的形式创建goroutine
    go func() {
        // 在函数退出时调用Done来通知main函数已经完成
        defer wg.Done()  // defer 注册一个延迟调用
        // 显示小写字母表10次
        for i:=0; i<10; i++ {
            for ch := 'a'; ch < 'a' + 26; ch++ {
                fmt.Printf("%c ", ch)
            }
        }
    }()

    // 以匿名函数的形式创建另外一个goroutine
    go func() {
        // 在函数退出时调用Done来通知main函数已经完成
        defer wg.Done()  // defer 注册一个延迟调用
        // 显示大写字母表10次
        for i:=0; i<10; i++ {
            for ch := 'A'; ch < 'A' + 26; ch++ {
                fmt.Printf("%c ", ch)
            }
        }
    }()
    // 等待所有goroutine结束
    wg.Wait()
    fmt.Println("\nProgram terminate.")
}

编译上述代码运行结果如下图:
图片.png
从代码的运行结果可以看出,当只有一个逻辑处理器时,两个goroutine只能排队,一个一个执行,但需要说明的是,当一个goroutine执行时间超过了调度器分配给它的时间片时,即使当前的goroutine没有执行完毕,也会切换执行下一个goroutine,上面代码的执行结果说明在时间片内,一个goroutine已经执行完毕,无需切换,执行流程如下:
图片.png
但如果一个goroutine之一个时间片内执行不完,即使只有一个一个逻辑处理器也会切换goroutine,我们对上面的代码做如下修改,其它代码均不变。
图片.png
再次编译运行代码,部分结果如下图:
图片.png
可以看到,由于每个goroutine打印1000次字母表,比较耗时,此时两个goroutine依次切换执行,直到执行结束,如下图所示:
图片.png

(2) goroutine的创建(多个逻辑处理器)

继续对上面的代码进行修改,修改如下,其余代码不变:
图片.png
编译运行修改后的代码,结果如下:
图片.png
此时两个goroutine同时执行,如下图所示:
图片.png

(3) goroutine的竞争状态

如果两个或者多个goroutine在没有互相同步的情况下,访问每个共享的资源,并试图同时读和写这个资源,就处于相互竞争的状态(race condition)。竞争状态的存在会让程序变得复杂。对一个共享资源的读写操作必须保证线程安全的,看如下代码:

package main

import (
    "fmt"
    "runtime"
    "sync"
)

// goroutine的竞争状态

var (
    counter int // 所有goroutine的共享变量
    wg      sync.WaitGroup
)

// 改变counter的函数
func add(id int) {
    // 在函数退出时调用Done来通知main函数工作已经完成
    defer wg.Done()

    for i := 0; i < 2; i++ {
        // 捕获counter的值
        value := counter
        // 当前goroutine从线程中退出,重新放回到队列,切换其它线程
        runtime.Gosched()

        // 增加本地value的值
        value++
        // 将该值保存到counter
        counter = value
    }

}

func main() {
    // 计数加2,表示要等待两个goroutine
    wg.Add(2)

    // 创建两个goroutine
    go add(1)
    go add(2)

    // 等待goroutine执行结束
    wg.Wait()
    fmt.Println("Final counter:", counter)
}

编译以上代码运行结果如下:

D:\projects\goproject\src\go_code\Args\exe03>main.exe
Final counter: 2

在以上代码中,共享变量counter会被4次读和写操作,每个goroutine执行两次,但是程序结束后,counter的值却为2.原因如下图所示:
图片.png
由上图可知,每个goroutine都会覆盖另一个goroutine的工作。这种覆盖发生在goroutine切换的时候。每个goroutine获取一个counter变量的副本后,我们就调用runtime.Gosched()强制切换goroutine,当这个goroutine再次运行的时候,counter的值已经发生改变了,它会继续用它旧的counter的值递增,来更新已经改变的counter,就导致结果覆盖了另一个goroutine已经完成的工作。
用竞争检测器来编译并运行以上代码,会检测代码中是否有goroutine竞争存在,结果如下所示:

D:\projects\goproject\src\go_code\Args\exe03>go build -race main.go

D:\projects\goproject\src\go_code\Args\exe03>main.exe
==================
WARNING: DATA RACE
Read at 0x000000619d08 by goroutine 8:
  main.add()
      D:/projects/goproject/src/go_code/Args/exe03/main.go:23 +0x80

Previous write at 0x000000619d08 by goroutine 7:
  main.add()
      D:/projects/goproject/src/go_code/Args/exe03/main.go:30 +0xa1

Goroutine 8 (running) created at:
  main.main()
      D:/projects/goproject/src/go_code/Args/exe03/main.go:41 +0x90

Goroutine 7 (finished) created at:
  main.main()
      D:/projects/goproject/src/go_code/Args/exe03/main.go:40 +0x6f
==================
Final counter: 4
Found 1 data race(s)

Go语言提供了传统的同步机制,对共享资源加锁,包括原子操作,互斥锁,读写锁在atomic和sync包里都有相应的实现。

互斥锁解决方案
package main

import (
    "fmt"
    "runtime"
    "sync"
)

// goroutine的竞争状态

var (
    counter int // 所有goroutine的共享变量
    wg      sync.WaitGroup

    mutex sync.Mutex  // 定义互斥锁
)

// 改变counter的函数
func add(id int) {
    // 在函数退出时调用Done来通知main函数工作已经完成
    defer wg.Done()

    for i := 0; i < 2; i++ {
        // 捕获counter的值
        mutex.Lock()
        {
            value := counter
            // 当前goroutine从线程中退出,重新放回到队列,切换到其它goroutine
            runtime.Gosched()

            // 增加本地value的值
            value++
            // 将该值保存到counter
            counter = value
        }
        mutex.Unlock()
    }

}

func main() {
    // 计数加2,表示要等待两个goroutine
    wg.Add(2)

    // 创建两个goroutine
    go add(1)
    go add(2)

    // 等待goroutine执行结束
    wg.Wait()
    fmt.Println("Final counter:", counter)
}

用竞争检查编译运行,结果如下所示:正确,不存在竞争

D:\projects\goproject\src\go_code\Args\exe03>go build -race main.go

D:\projects\goproject\src\go_code\Args\exe03>main.exe
Final counter: 4
原子操作解决方案
package main

import (
    "fmt"
    "runtime"
    "sync"
    "sync/atomic"
)

// goroutine的竞争状态

var (
    counter int64 // 所有goroutine的共享变量
    wg      sync.WaitGroup

)

// 改变counter的函数
func add(id int) {
    // 在函数退出时调用Done来通知main函数工作已经完成
    defer wg.Done()

    for i := 0; i < 2; i++ {
        atomic.AddInt64(&counter, 1)
        // 当前goroutine从线程中退出,重新放回到队列,切换到其它goroutine
        runtime.Gosched()

    }

}

func main() {
    // 计数加2,表示要等待两个goroutine
    wg.Add(2)

    // 创建两个goroutine
    go add(1)
    go add(2)

    // 等待goroutine执行结束
    wg.Wait()
    fmt.Println("Final counter:", counter)
}

用竞争检查编译运行,结果如下所示:正确,不存在竞争

D:\projects\goproject\src\go_code\Args\exe03>go build -race main.go

D:\projects\goproject\src\go_code\Args\exe03>main.exe
Final counter: 4

参考文献

  1. 李文塔《Go语言核心编程》第5章相关内容
  2. William Kennedy《Go语言实战》第6章相关内容

这篇文章陆陆续续写了3天,但对细节方面很有很多不满意,限于篇幅,有很多点还是没有讲清楚,其中可能还有很多错误或不当之处,欢迎各位大佬能批评指正,本菜鸡会及时更新。

我是lioney,年轻的后端攻城狮一枚,爱钻研,爱技术,爱分享。
个人笔记,整理不易,感谢阅读、点赞和收藏。
文章有任何问题欢迎大家指出,也欢迎大家一起交流后端各种问题!