一、并发和并行
- 并发(concurrency)
- 两个或多个事件在同一时间间隔发生
- 并行(parallellism)
- 两个或多个事件在同一时刻发生
二、协程
2.1、进程
- 分配系统资源(CPU时间、内存等)基本单位
- 有独立的内存空间,切换开销大(可能多个进程映射到同一个物理内存空间地址,所以当做进程切换时,开销就比较大)
2.2、线程
- 同一进程中的多个线程共享内存空间,线程切换代价小
- 多线程通信方便
- 从内核层面来看线程其实也是一个特殊的进程,它跟父进程共享了打开的文件和文件系统信息,共享了地址空间和信号处理函数
从内核层面,进程和线程没有太大区别,还是fork出来的一个进程,只不过这个进程共享了打开的文件和文件系统等等这些信息
进程和协程都是内核感知的,都是task_struct
2.3、协程
- Go语言中的轻量级线程实现(用户态)
- Golang在runtime、系统调用等多个方面对goroutine调度进行了封装和处理,当遇到长时间执行或者进行系统调用时,会主动把当前goroutine的CPU(p)转让出去,让其他goroutine能被调度并执行,也就是Golang从语言层面支持了协程
三、基于Communicating Sequential Process模型做多线程通信
3.1、CSP
- 描述两个独立的并发实体通过共享的通讯channel进行通信和并发模型
3.2、Go协程goroutine
- 是一种轻量线程,它不是操作系统的线程,而是将一个操作系统线程分段使用,通过调度器实现协作式调度
- 是一种绿色线程,微线程,它与Coroutine协程也有区别,能够在发现堵塞后启动新的微线程
3.3、通道channel
- 类似Unix的Pipe,用于协程之间通讯和同步。协程之间虽然解耦,但是他们和channel有着耦合
四、线程和协程的差异
4.1、每个goroutine(协程)默认占用内存远比Java、C的线程少
- goroutine:2KB
- 线程:8MB
4.2、线程goroutine切换开销方面,goroutine远比线程小
- 线程:涉及模式切换(从用户态切换到内核态)、16个寄存器、PC、SP...等寄存器的刷新
- goroutine:只有三个寄存器的值修改- PC/SP/DX
4.3、GOMAXPROCS
- 控制并行线程数量
4.4、协程示例
go functionName()
for i:=0;i<10;i++{
go fmt.Pringln(i)
}
time.Sleep(time.Second)
五、channel-多线程通信
5.1、channel是多个协程之间通讯的管道
- 一端发送数据,一端接收数据
- 同一时间只有一个协程可以访问数据,无共享内存模式可能出现的内存竞争
- 协调协程的执行顺序
5.2、声明方式
var identifier chan datatype
操作符<-
5.3、示例
ch := make(chan int)
go func(){
fmt.Println("hello from goroutine")
ch<-0 //数据写入channel
}()
i:=<-ch //从channel中取数据并赋值
六、通道缓冲
- 基于channel的通信时同步的
- 当缓冲区满时,数据的发送是阻塞的
- 通过make关键字创建通道时定义缓冲区容量,默认缓冲区容量为0
ch := make(chan int)
ch := make(chan int,1)
七、遍历通道缓冲区
ch := make(chan int, 10)
go func() {
for i := 0; i < 10; i++ {
rand.Seed(time.Now().UnixNano())
n := rand.Intn(10) // n will be between 0 and 10
fmt.Println("putting: ", n)
ch <- n
}
close(ch)
}()
fmt.Println("hello from main")
for v := range ch {
fmt.Println("receiving: ", v)
}
八、单向通道
- 只发送通道
var sendOnly chan <- int
- 只接受通道
var readOnly <- chan int
- Isto webhook controller
func (w *WebhookCertPatcher) runWebhookController(stopChan <-chan struct{}){}
- 如何使用:双向通道转换
var c=make(chan int)
go prod(c)
go consume(c)
func prod(ch chan<-int){
for {ch <- 1}
}
func consume(ch<-chan int){
for {<-ch}
}
九、关闭通道
- 通道无需每次关闭
- 关闭的作用是告诉接收者该通道再无新数据发送
- 只有发送方需要关闭通道
ch := make(chan int)
defer close(ch)
if v,notClosed := <-ch;notClosed{
fmt.Println(v)
}
十、select
- 当多个协程同时运行时,可通过select轮询多个通道
- 如果所有通道都阻塞则等待,如定义了default则执行default
- 如多个通道就绪则随机选择
select {
case v:=<-ch1:
...
case v:=<-ch2:
...
default:
...
}
十一、定时器Timer
time.Ticker
timer := time.NewTimer(time.Second)
select {
// check normal channel
case <- ch:
fmt.Println("received form ch")
case <- timer.C:
fmt.Println("timeout waiting from channel ch")
}
十二、上下文Context
- 超时、取消操作或者一些异常情况,往往需要进行抢占操作或者中断后续操作
- Context是设置截止日期、同步信号,传递请求相关的结构体
type Context interface{
Deadline() (deadline time.Time, ok bool)
Done() <-chan struct{}
Err() error
Value(key interface{}) interface{}
}
- 用法
context.Background
- Background通常被用于主函数、初始化以及测试中,作为一个顶层的Context,也就是说一般我们创建的context都是基于Background
context.TODO
- TODO是在不确定使用什么context的时候才会使用
context.WithDeadline
- 超时时间
context.WithValue
- 向context添加键值对
context.WithCancel
- 创建一个可取消的context
- 示例
package main
import (
"context"
"fmt"
"time"
)
func main() {
baseCtx := context.Background()
ctx := context.WithValue(baseCtx, "a", "b")
go func(c context.Context) {
fmt.Println(c.Value("a"))
}(ctx)
timeoutCtx, cancel := context.WithTimeout(baseCtx, time.Second)
defer cancel()
go func(ctx context.Context) {
ticker := time.NewTicker(1 * time.Second)
for _ = range ticker.C {
select {
case <-ctx.Done():
fmt.Println("child process interrupt...")
return
default:
fmt.Println("enter default")
}
}
}(timeoutCtx)
select {
case <-timeoutCtx.Done():
time.Sleep(1 * time.Second)
fmt.Println("main process exit!")
}
// time.Sleep(time.Second * 5)
}
十三、如何停止一个子协程
done := make(chan bool)
go func() {
for {
select {
case <-done:
fmt.Println("done channel is triggerred, exit child go routine")
return
}
}
}()
close(done)
十四、基于context停止子协程
- context是Go语言对go routine和timer的封装
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
go process(ctx, 100*time.Millisecond)
<-ctx.Done()
fmt.Println("main:", ctx.Err())
- 示例
package main
import (
"fmt"
"time"
)
func main() {
messages := make(chan int, 10)
done := make(chan bool)
defer close(messages)
// consumer
go func() {
ticker := time.NewTicker(1 * time.Second)
for _ = range ticker.C {
select {
case <-done:
fmt.Println("child process interrupt...")
return
default:
fmt.Printf("send message: %d\n", <-messages)
}
}
}()
// producer
for i := 0; i < 10; i++ {
messages <- i
}
time.Sleep(5 * time.Second)
close(done)
time.Sleep(1 * time.Second)
fmt.Println("main process exit!")
}
十五、理解线程安全
15.1、为什么会有线程不安全的情况?
其实跟CPU多核的架构相关,假定有一个程序,该程序跑了一些线程,对于传统语言而言,如Java,共享内存是比较常用的一种多线程通信机制,假设内存里保存了一个键值对,key:value1,此时有两个线程同时访问这块内存,在多核体系架构里,这两个线程可能从内核层面会被调度到多个CPU上。众所周知,在计算机里访问内存虽然比较快,但是针对CPU速度来说,内存的速度是远远不及的,如果每次内存的读取都去物理内存里面读,那么整个程序的效率就不会高。
15.2、该如何解决这个问题呢?
计算机里面在每一个CPU的内部都有一段缓存,比如说两个线程,线程1和线程2同时去访问这一组变量,那么这组变量在被第一次读取的时候都是去物理内存读取,读取完后,这一个变量会被缓存到每一个CPU的cache里面,这个时候就会出现问题了,假设说我第一个线程把当前的键值对做了一下修改,因为修改会先修改本地缓存,这个缓存不会立即把这个修改写回到内存(因为这样又意味着低效),那么在线程2里面它能看到的值依然是原值value1,所以此时两个线程对同一个变量,读取到的值就不一样了。如果我们做多线程代码编写,第一个线程set一个值,第二个线程去读的时候就应该是新值,但事实上并不是,因为你的缓存很可能并没有同步,这样就导致程序会出现bug,如何解决问题呢?最重要的就是加锁。
十六、锁-
什么是加锁呢?就是在修改内存的时候,如上图,当多个线程要访问同一块共享内存的时候,需要加一个锁,当第一个线程去访问这段内存的时候它加锁,在锁的内部,如果它需要去写这份数据,那这份数据会被同步回主内存里面,那么线程2要去访问这块内存的时候也需要加锁,而且这是一个互斥锁,所以当线程1对这块内存加锁的时候,线程2是没办法访问这块内存的,只有当线程1把线程锁释放掉,此时线程2才可以获取锁访问这块内存。那么通过这种方式来实现多线程的安全访问,确保一个线程修改数据,另一个线程确保读取到的是最新的数据。
sync包提供了锁的基本原语
sync.Mutex
Lock()Unlock()
sync.RWMutex
- 不限制并发读,只限制并发写和并发读写
sync.WaitGroup
- 等待一组goroutine返回
sync.Once
- 保证某段代码只执行一次
sync.Cond
- 让一组goroutine在满足特定条件时被唤醒
十七、Mutex示例
kubernetes 中的informer factory
informer首先主要目的是用来监听kubernetes的对象,informer本身有一个start,这里可以看到它有一个锁,这个地方加锁是因为接下来要去遍历这个informer,这个informer是一个map,map本身是不能并发读写的,如果你针对map一个线程在修改里面的值,另外一个map在读。或者说你两个线程同时修改map里的同一个key,那么这个时候就会发生冲突,整个程序会被panic掉,因为此时程序会认为多个人在改同一个map,它不知道怎么处理了,为了效率map本身它的原语是没有线程安全保证的。
// Start initializes all requested informers.
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
f.lock.Lock()
defer f.lock.Unlock()
for informerType, informer := range f.informers {
if !f.startedInformers[informerType] {
go informer.Run(stopCh)
f.startedInformers[informerType] = true
}
}
}
- 示例
package main
import (
"fmt"
"sync"
"time"
)
func main() {
go rLock()
go wLock()
go lock()
time.Sleep(5 * time.Second)
}
func lock() {
lock := sync.Mutex{}
for i := 0; i < 3; i++ {
lock.Lock()
// defer 是整个函数执行完后执行
defer lock.Unlock()
fmt.Println("lock:", i)
}
}
func rLock() {
lock := sync.RWMutex{}
for i := 0; i < 3; i++ {
lock.RLock()
defer lock.RUnlock()
fmt.Println("rLock:", i)
}
}
func wLock() {
lock := sync.RWMutex{}
// 错误的写法,第二个循环就无法继续,无法获取锁
for i := 0; i < 3; i++ {
lock.Lock()
defer lock.Unlock()
fmt.Println("wLock:", i)
}
}
十八、 WaitGroup示例
协调主线程与子线程之间的调用关系
// CreateBatch create a batch of pods. All pods are created before
waiting.
func (c *PodClient) CreateBatch(pods []*v1.Pod) []*v1.Pod {
ps := make([]*v1.Pod, len(pods))
var wg sync.WaitGroup
for i, pod := range pods {
wg.Add(1)
go func(i int, pod *v1.Pod) {
defer wg.Done()
defer GinkgoRecover()
ps[i] = c.CreateSync(pod)
}(i, pod)
}
wg.Wait()
return ps
}
waitgroup是什么场景下使用呢?
就是一个主的线程,要启多个子线程,但是我希望多个子线程都处理完成以后主线程才继续,在kubernetes代码里就有这样的例子,上述例子是一个它的端到端测试案例里的一个代码案例,这里面有个函数叫CreaterBatch,然后传入了一堆的pod,就是kubernetes里面的一个对象,接下来就是定义了一个waitgroup,然后遍历pod数组,就传入了一个pod数组,去遍历,每次遍历到一个pod就waitgroup加1,然后就执行自己的逻辑,接下来waitgroup在这里wait,这里的wait就意味着当你这里加1。比如说这里有三个pod,循环运行三次,那么Add(1)就会变成执行三次,那么waitgroup就有3。所以它会等三个线程一起结束之后再去继续往下走。
- 示例
package main
import (
"fmt"
"sync"
"time"
)
func main(){
// waitBySleep()
waitByChannel()
}
func waitBySleep() {
/*sleep不能确保业务逻辑是要多久执行完
很难正确评估,所有具有局限性
*/
for i:=0;i<100;i++{
go fmt.Println(i)
}
time.Sleep(time.Second)
}
func waitByChannel() {
/*channel是父子线程相互通信的逻辑
第一可以用来传输数据
第二可以用来协调过个线程的执行顺序
*/
c := make(chan bool,100)
for i:=0;i<100;i++{
fmt.Println(i)
c<-true
}
for i:=0;i<100;i++{
<-c
}
}
/*针对多线程协调最正确的做法*/
func waitByWG() {
wg := sync.WaitGroup{}
// 初始化有100个线程
wg.Add(100)
for i:=0;i<100;i++{
go func(i int) {
fmt.Println(i)
wg.Done()
}(i)
}
wg.Wait()
}
十九、Cond示例
kubernetes 中的队列,标准的生产者消费者模式
sync.NewCond(&sync.Mutex{})
// Add marks item as needing processing.
func (q *Type) Add(item interface{}) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
if q.shuttingDown {
return
}
if q.dirty.has(item) {
return
}
q.metrics.add(item)
q.dirty.insert(item)
if q.processing.has(item) {
return
}
q.queue = append(q.queue, item)
q.cond.Signal()
}
// Get blocks until it can return an item to be processed. If shutdown = true,
// the caller should end their goroutine. You must call Done with item when you
// have finished processing it.
func (q *Type) Get() (item interface{}, shutdown bool) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
for len(q.queue) == 0 && !q.shuttingDown {
q.cond.Wait()
}
if len(q.queue) == 0 {
// We must be shutting down.
return nil, true
}
item, q.queue = q.queue[0], q.queue[1:]
q.metrics.get(item)
q.processing.insert(item)
q.dirty.delete(item)
return item, false
}
kubernetes任何控制器都是生产者消费者模型,生产者和消费者之间就是通过这样一个queue去通信的。
二十、示例
- 基于 Channel 编写一个简单的单线程生产者消费者模型
- 队列:队列长度10,队列元素类型为 int
- 生产者:每1秒往队列中放入一个类型为 int 的元素,队列满时生产者可以阻塞
- 消费者:每一秒从队列中获取一个元素并打印,队列为空时消费者阻塞
package main
import (
"fmt"
"time"
)
/*
队列:
队列长度10,队列元素类型为int
生产者:
每 1 秒往队列中放入一个类型为 int 的元素,队列满时生产者可以阻塞
消费者:
每一秒从队列中获取一个元素并打印,队列为空时消费者阻塞
*/
func main() {
ch := make(chan int, 10)
go producer(ch)
consumer(ch)
}
// 生产者:只写通道
func producer(out chan<- int) {
defer close(out)
for i := 0; i < 10; i++ {
out <- i
// fmt.Printf("生产者: 生产了%v\n",i)
time.Sleep(time.Second)
}
}
// 消费者:只读通道
func consumer(in <-chan int) {
for res := range in{
fmt.Printf("消费者: 消费了%v\n", res)
time.Sleep(time.Second)
}
}