0. 前言
  • 最近使用 Golang 写一个并发执行的测试脚本
  • 之前习惯使用 Java,习惯性想先建一个线程池。然后意识到 Golang 没有封装好的线程池
  • 结合之前学习的 Goroutine 原理和 Golang 大道至简的设计思想,可能 Goroutine 的开销和切换代价比较低,不需要对并发数有过多限制
  • 但是 Goroutine 启动数量过多的话总感觉不太好,于是利用锁和通道实现了简单的线程池做并发控制,欢迎大家点评
1. 相关接口
  • 接口仿照 Java 的 ExecutorService 和 Runnable 接口定义:
// Task represents an in-process Goroutine task.
type Task interface {
    // Run method corresponds to Run method of Java's Runnable interface.
    Run()
}

// Executor defines the actions associated with the Goroutine pool.
type Executor interface {
    // Execute method corresponds to Execute method of Java's ExecutorService interface.
    Execute(task Task)
    // Wait waits for all the tasks to complete.
    Wait()
    // Done returns a channel which is closed if all the tasks completed.
    Done() chan struct{}
}
  • 接口调用:
func TestNewExecutor(t *testing.T) {
    t.Log(t.Name())
    ex := NewExecutor(4)

    for _, domain := range domains {
        ex.Execute(&TestTask{
            fmt.Sprintf("ping %s -c 10", domain),
        })
    }
    ex.Wait()
}
  • 首先定义一个 Executor,然后通过 Execute 传入 Task 对象,调用 Wait 方法等待所有任务结束
2. 具体实现
  • 主要利用 sync.Mutex 和 []channel struct{} 维护一个等待执行的任务队列
  • 任务传入时,等待一个 startCh 通道信号
  • 对于符合执行条件(未设定并发数或者当前执行任务小于并发数时)关闭 startCh 信号,解除阻塞
  • 任务执行完毕后,关闭 stopCh 信号,允许等待队列里的任务继续执行
  • 对于所有任务执行完毕(正在执行数和等待执行数均为 0),关闭 done 信号,解除整个 Executor 的阻塞,表示所有任务执行完毕
  • 部分代码如下:
package pkg

import (
    "sync"
)

// Task represents an in-process Goroutine task.
type Task interface {
    // Run method corresponds to Run method of Java's Runnable interface.
    Run()
}

// Executor defines the actions associated with the Goroutine pool.
type Executor interface {
    // Execute method corresponds to Execute method of Java's ExecutorService interface.
    Execute(task Task)
    // Wait waits for all the tasks to complete.
    Wait()
    // Done returns a channel which is closed if all the tasks completed.
    Done() chan struct{}
}

type executor struct {
    lock             sync.Mutex
    waitingTasks     []chan struct{}
    activeTasks      int64
    concurrencyLimit int64
    done           chan struct{}
}

func (ex *executor) Execute(task Task) {
    ex.start(task)
}

func (ex *executor) Wait() {
    <-ex.done
}

func (ex *executor) Done() chan struct{} {
    return ex.done
}

func (ex *executor) start(task Task) {
    startCh := make(chan struct{})
    stopCh := make(chan struct{})

    go startTask(startCh, stopCh, task)
    ex.enqueue(startCh)
    go ex.waitDone(stopCh)

}

// NewExecutor returns a new Executor.
func NewExecutor(concurrencyLimit int64) Executor {
    ex := &executor{
        waitingTasks:     make([]chan struct{}, 0),
        activeTasks:      0,
        concurrencyLimit: concurrencyLimit,
        done:           make(chan struct{}),
    }
    return ex
}

func startTask(startCh, stopCh chan struct{}, task Task) {
    defer close(stopCh)

    <-startCh
    task.Run()
}

func (ex *executor) enqueue(startCh chan struct{}) {
    ex.lock.Lock()
    defer ex.lock.Unlock()

    if ex.concurrencyLimit == 0 || ex.activeTasks < ex.concurrencyLimit {
        close(startCh)
        ex.activeTasks++
    } else {
        ex.waitingTasks = append(ex.waitingTasks, startCh)
    }
}

func (ex *executor) waitDone(stopCh chan struct{}) {
    <-stopCh

    ex.lock.Lock()
    defer ex.lock.Unlock()

    if len(ex.waitingTasks) == 0 {
        ex.activeTasks--
        if ex.activeTasks == 0 {
            close(ex.done)
        }
    } else {
        close(ex.waitingTasks[0])
        ex.waitingTasks = ex.waitingTasks[1:]
    }
}
  • 通过 Executor 传入的任务首先开始执行 start 方法
  • start 方法里定义了该任务的 startCh 和 stopCh 信号
  • 各启动一个 Goroutine 等待任务开始和任务结束
  • 同时把表示任务的 startCh 加入等待队列中表示,队列需要靠 sync.Mutex 保护
  • 当一个任务结束时,解除 waitDone 方法的阻塞,启动队首的任务,解除 startTask 里的阻塞
  • 所有任务结束后,解除 Wait 方法里的阻塞
  • 完整代码参见上述链接