Goroutine

Goroutine 是 Golang 提供的一种轻量级线程,我们通常称之为「协程」,相比较线程,创建一个协程的成本是很低的。所以你会经常看到 Golang 开发的应用出现上千个协程并发的场景。

Goroutine 的优势:

  • 与线程相比,Goroutines 成本很低。 它们的堆栈大小只有几 kb,堆栈可以根据应用程序的需要增长和缩小,context switch 也很快,而在线程的情况下,堆栈大小必须指定并固定。

  • Goroutine 被多路复用到更少数量的 OS 线程。 一个包含数千个 Goroutine 的程序中可能只有一个线程。如果该线程中的任何 Goroutine 阻塞等待用户输入,则创建另一个 OS 线程并将剩余的 Goroutine 移动到新的 OS 线程。所有这些都由运行时处理,作为开发者无需耗费心力关心,这也使得我们有很干净的 API 来支持并发。

  • Goroutines 使用 channel 进行通信。 channel 的设计有效防止了在使用 Goroutine 访问共享内存时发生竞争条件(race conditions) 。channel 可以被认为是 Goroutine 进行通信的管道。

下文中我们会以「协程」来代指 Goroutine。

协程池

在高并发场景下,我们可能会启动大量的协程来处理业务逻辑。协程池是一种利用池化技术,复用对象,减少内存分配的频率以及协程创建开销,从而提高协程执行效率的技术。

gopoolKitexgopool
gopool
gopoolgo
gopoolgo func(){...}gopool.Go(func(){...})
gopool

old:

go func() {
	// do your job
}()
复制代码

new:

import (
    "github.com/bytedance/gopkg/util/gopool"
)

gopool.Go(func(){
	/// do your job
})
复制代码

核心实现

gopool

Pool

Pool
type Pool interface {
	// 池子的名称
	Name() string
        
	// 设置池子内Goroutine的容量
	SetCap(cap int32)
        
	// 执行 f 函数
	Go(f func())
        
	// 带 ctx,执行 f 函数
	CtxGo(ctx context.Context, f func())
        
	// 设置发生panic时调用的函数
	SetPanicHandler(f func(context.Context, interface{}))
}
复制代码
gopoolpool
Kitex
type pool struct {
	// 池子名称
	name string

	// 池子的容量, 即最大并发工作的 goroutine 的数量
	cap int32
        
	// 池子配置
	config *Config
        
	// task 链表
	taskHead  *task
	taskTail  *task
	taskLock  sync.Mutex
	taskCount int32

	// 记录当前正在运行的 worker 的数量
	workerCount int32

	// 当 worker 出现panic时被调用
	panicHandler func(context.Context, interface{})
}

// NewPool 创建一个新的协程池,初始化名称,容量,配置
func NewPool(name string, cap int32, config *Config) Pool {
	p := &pool{
		name:   name,
		cap:    cap,
		config: config,
	}
	return p
}
复制代码
NewPoolPoolpool

Task

type task struct {
	ctx context.Context
	f   func()

	next *task
}
复制代码
taskftask
poolpooltask
pooltaskHeadtaskTailtaskCounttaskLock

Worker

type worker struct {
	pool *pool
}
复制代码
workerpoolworkergoroutinepooltask
func (w *worker) run() {
	go func() {
		for {
                        // 声明即将执行的 task
			var t *task
                        
                        // 操作 pool 中的 task 链表,加锁
			w.pool.taskLock.Lock()
			if w.pool.taskHead != nil {
                                // 拿到 taskHead 准备执行
				t = w.pool.taskHead
                                
                                // 更新链表的 head 以及数量
				w.pool.taskHead = w.pool.taskHead.next
				atomic.AddInt32(&w.pool.taskCount, -1)
			}
                        // 如果前一步拿到的 taskHead 为空,说明无任务需要执行,清理后返回
			if t == nil {
				w.close()
				w.pool.taskLock.Unlock()
				w.Recycle()
				return
			}
			w.pool.taskLock.Unlock()
                        
                        // 执行任务,针对 panic 会recover,并调用配置的 handler
			func() {
				defer func() {
					if r := recover(); r != nil {
						msg := fmt.Sprintf("GOPOOL: panic in pool: %s: %v: %s", w.pool.name, r, debug.Stack())
						logger.CtxErrorf(t.ctx, msg)
						if w.pool.panicHandler != nil {
							w.pool.panicHandler(t.ctx, r)
						}
					}
				}()
				t.f()
			}()
			t.Recycle()
		}
	}()
}
复制代码

整体来看

CtxGo(context.Context, f func())

func Go(f func()) {
	CtxGo(context.Background(), f)
}

func CtxGo(ctx context.Context, f func()) {
	defaultPool.CtxGo(ctx, f)
}

func (p *pool) CtxGo(ctx context.Context, f func()) {

        // 创建一个 task 对象,将 ctx 和待执行的函数赋值
	t := taskPool.Get().(*task)
	t.ctx = ctx
	t.f = f
        
        // 将 task 插入 pool 的链表的尾部,更新链表数量
	p.taskLock.Lock()
	if p.taskHead == nil {
		p.taskHead = t
		p.taskTail = t
	} else {
		p.taskTail.next = t
		p.taskTail = t
	}
	p.taskLock.Unlock()
	atomic.AddInt32(&p.taskCount, 1)
        
        
	// 以下两个条件满足时,创建新的 worker 并唤起执行:
	// 1. task的数量超过了配置的限制 
	// 2. 当前运行的worker数量小于上限(或无worker运行)
	if (atomic.LoadInt32(&p.taskCount) >= p.config.ScaleThreshold && p.WorkerCount() < atomic.LoadInt32(&p.cap)) || p.WorkerCount() == 0 {
        
                // worker数量+1
		p.incWorkerCount()
                
                // 创建一个新的worker,并把当前 pool 赋值
		w := workerPool.Get().(*worker)
		w.pool = p
                
                // 唤起worker执行
		w.run()
	}
}
复制代码

相信看了代码注释,大家就能理解发生了什么。

gopooldefaultPoolpoolgopool.CtxGo()defaultPool

func init() {
	defaultPool = NewPool("gopool.DefaultPool", 10000, NewConfig())
}

const (
	defaultScalaThreshold = 1
)

// Config is used to config pool.
type Config struct {
	// 控制扩容的门槛,一旦待执行的 task 超过此值,且 worker 数量未达到上限,就开始启动新的 worker
	ScaleThreshold int32
}

// NewConfig creates a default Config.
func NewConfig() *Config {
	c := &Config{
		ScaleThreshold: defaultScalaThreshold,
	}
	return c
}

复制代码
defaultPoolgopool.DefaultPool
CtxGogopoolworker
workerworkergoroutineworkerworker.run()workerpool

三个角色的定位

taskworkergoroutinetaskpooltasktaskworker

使用 sync.Pool 进行性能优化

gopoolgopoolsync.Pool

这里建议大家直接看源码,其实在上面的代码中已经有所涉及。

  • task 池化
var taskPool sync.Pool

func init() {
	taskPool.New = newTask
}

func newTask() interface{} {
	return &task{}
}

func (t *task) Recycle() {
	t.zero()
	taskPool.Put(t)
}
复制代码
  • worker 池化

var workerPool sync.Pool

func init() {
	workerPool.New = newWorker
}

func newWorker() interface{} {
	return &worker{}
}

func (w *worker) Recycle() {
	w.zero()
	workerPool.Put(w)
}