一、runner
runner包用于展示如何使用通道来监视程序的执行时间,如果程序运行时间过长,也可以使用runner包来终止程序。当开发需要调度后台处理任务的程序的时候,这种模式会很有用。下面是runner包里面的runner.go。
其展示了依据调度运行的无人值守的面向任务的程序,及其所使用的并发模式:
1、程序可以在分配的时间内完成工作,正常终止
2、程序没有及时完成工作,自杀
3、接收到操作系统发送的中断事件,程序立刻试图清理状态并停止工作
package runner
import (
"errors"
"os"
"os/signal"
"times"
)
//Runner在给定的超时时间内执行一组任务
//并且在操作系统发送中断信号时结束这些任务
type Runner struct {
//interrupt通道 从操作系统发送的信号
interrupt chan os.Signal
//complete通道 报告处理任务已经完成
complete chan error
// timeout报告处理任务已经超时
time <-chan time.Time
//tasks持有一组以索引顺序依次执行的函数
tasks []func(int)
}
//ErrTimeout会在任务超时时返回
var ErrTimeout = errors.New("received timeout")
//ErrInterrupt会在接收到操作系统的事件时返回
var ErrInterrupt = errors.New("received interrupt")
//New返回一个新的准备使用的Runner
func New(d time.Duration) *Runner {
return &Runner{
interrupt: make(chan os.Signal, 1),
complete: make(chan error),
timeout: timr.After(d),
}
}
//Add将一个任务附加到Runner上。这个任务是一个接收一个int类型的ID作为参数的函数
func (r *Runner) Add(tasks ...func(int)) {
r.tasks = append(r.tasks, tasks...)
}
func (r *Runner) Start() error {
//我们希望接收所有中断信号
signal.Notify(r.interrupt, os.interrupt)
//用不同的goroutine执行不同的任务
go func() {
r.complete <- r.run()
}()
select {
//当任务处理完成时发出的信号
case err := <-r.complete :
return err
//当任务处理程序运行超时时发出的信号
case <-r.timeout:
return ErrTimeout
}
}
//run执行每一个已注册的任务
func (r *Runner) run() error {
for id, task := range r.tasks {
if r.gotInterrupt() {
return ErrInterrupt
}
//执行已注册的任务
task(id)
}
return nil
}
//验证是否接收到了中断信号
func (r *Runner) gotInterrupt() bool {
select {
//中断事件被触发时发出的信号
case <-r.interrupt:
//停止接收后续的任何信号
signal.Stop(r.interrupt)
return true
//继续正常运行
default:
return false
}
}
下面演示一下如何使用通道来监视程序的运行时间以在程序运行时间过长时终止程序
package main
import (
"log"
"time"
"github.com/goinaction/code/chapter7/patterns/runner"
)
//timeout规定了必须在多少秒内处理完成
const timeout = 3 * time.Second
func main() {
log.Println("Starting work.")
//为本次执行分配超时时间
r := runner.New(timeout)
//加入要执行的任务
r.Add(createTask(), createTask(), createTask())
//执行任务并处理结果
if err := r.Start(); err := nil {
switch err {
case runner.ErrTimeout:
log.Println("Termination due to timeout.")
os.Exit(1)
case runner.ErrInterrupt:
log.Println("Termination due to interrupt.")
os.Exit(2)
}
}
log.Println("Process ended.")
}
//createTask返回一个根据id休眠指定秒数的示例任务
func createTask() func(int) {
return func(id int) {
log.Printf("Processor - Task #%d.", id)
time.Sleep(time.Duration(id) * time.Second)
}
}
二、pool
pool包用于展示如何使用有缓冲的通道实现资源池,来管理可以在任意数量的goroutine之间共享及独立使用的资源。这种模式在需要共享一组静态资源的情况非常有用。如果goroutine需要从池里得到这些资源中的一个,它可以从池里申请,使用完后归还到资源池里。
package pool
import (
"errors"
"log"
"io"
"sync"
)
//pool管理一组可以安全地在多个goroutine间共享的资源。被管理的资源必须实现io.Closer接口
type pool struct {
m sync.Mutex
resources chan io.Closer
factory func() (io.Closer, error)
closed bool
}
//表示请求了一个已经关闭的池
var ErrPoolClosed = errors.New("Pool has been closed.")
//创建一个用来管理资源的池
//这个池需要一个可以分配新资源的函数并规定池的大小
func New(fn func() (io.Closer, error), size uint) (*Pool, error) {
if size <= 0 {
return nil, errors.New("Size value too small.")
}
return &Pool{
factory: fn,
resources: make(chan io.Closer, size),
}, nil
}
//Acquire从池中获取一个资源
func (p *Pool) Acquire() (io.Closer, error) {
select {
//检查是否有空闲的资源
case r, ok := <-p.resources:
log.Println("Acquire:", "Shared Resource")
if !ok {
return nil, ErrPoolClosed
}
return r, nil
//没有空闲资源可用,所以提供一个新资源
default:
log.Println("Acquire:", "New Resource")
return p.factory()
}
}
//Release将一个使用后的资源放回池里
func (p *Pool) Release(r io.Closer) {
//保证本操作和Close操作的安全
P.m.Lock()
defer p.m.Unlock()
//池被关闭则销毁
if p.closed {
r.Close()
return
}
select {
//试图将这个资源放入队列
case p.resources <- r:
log.Println("Realse:", "In Queue")
//如果队列已满则关闭这个资源
default:
log.Println("Realse:", "Closing")
r.Close()
}
}
func (p *Pool) Close() {
//保证本操作与release操作的安全
p.m.Lock()
defer p.m.Unlock()
//如果pool已经被关闭,什么也不做
if p.closed {
return
}
p.closed = true
//清空通道资源之前将通道关闭,否则会发生死锁
close(p.resources)
//关闭资源
for r := range p.resources {
r.Close()
}
}
下面的代码展示了如何使用pool包来共享一组模拟的数据库连接
package main
import (
"log"
"io"
"math/rand"
"sync"
"sync/atomic"
"time"
"github.com/goinaction/code/chapter7/patterns/pool"
)
const (
maxGoroutines = 25 //要使用的goroutine的数量
pooledResources = 2 //池中的资源的数量
)
//模拟要共享的资源
type dbConnection struct {
ID int32
}
//Close实现了io.Closer接口,以便dbConnection可以被池管理
//Close用来完成任意资源的释放管理
func (dbConn *dbConnection) Close() error {
log.Println("Close: Connection", dbConn.ID)
return nil
}
//idCounter用来给每个连接分配一个独一无二的id
var idCounter int32
//这是一个工厂函数,当需要一个新连接时,资源池会调用这个函数
func createConnection() (io.Closer, error) {
id := atomic.AddInt32(&idCounter, 1)
log.Println("Create: New Connection", id)
return &dbConnection{id}, nil
}
func main() {
var wg sync.WaitGroup
wg.Add(maxGoroutines)
//创建用来管理连接的池
p, err := pool.New(createConnection, pooledResources)
if err != nil {
log.Println(err)
}
//使用池里的连接来完成查询
for query := 0; query < maxGoroutines; query++ {
//每个goroutine需要自己复制一份要查询值的副本,不然所有的查询会共享同一个查询变量
go func(q int) {
performQueries(q, p)
wg.Done()
}(query)
}
//等待goroutine结束
wg.Wait()
//关闭池
log.Println("Shutdown Program.")
p.close()
}
//用来测试连接池里的资源
func performQueries(query int, p *pool.Pool) {
//从池里请求一个连接
conn, err := p.Acquire()
if err != nil {
log.Println(err)
return
}
//将该连接释放回池里
defer p.release(conn)
//用等待来模拟查询响应
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
log.Printf("QID[%d] CID[%d]\n", query, conn.(*dbConnection).ID)
}
三、work
work包的目的是展示如何使用无缓冲的通道来创建一个goroutine池,这些goroutine执行并控制一组工作,让其并发执行。
work.go
package work
import "sync"
//Work必须满足接口类型才能使用工作池
type Worker interface {
Task()
}
//Pool提供一个goroutine池
//这个池可以完成任何已提交的Worker任务
type Pool struct {
work chan Worker
wg sync.WaitGroup
}
func New(maxGoroutines int) *pool {
p := Pool{
work: make(chan Worker),
}
p.wg.Add(maxGoroutines)
for i := 0; i < maxGoroutines; i++ {
go func() {
for w := range p.work {
w.Task()
}
p.wg.Done()
}()
}
return &p
}
//Run提交工作到工作池
func (p *Pool) Run(w Worker) {
p.work <- w
}
//Shutdown等待所有goroutine停止工作
func (p *Pool) Shutdown() {
close(p.work)
p.wg.Wait()
}
main.go
package main
import (
"log"
"sync"
"time"
"github.com/goinaction/code/chapter7/patterns/work"
)
var names = []string {
"steve",
"bob",
"mary",
"therese",
"jason",
}
//namePrinter使用特定方式打印名字
type namePrinter struct {
name string
}
//Task实现Worker接口
func (m *namePrinter) Task() {
log.Println(m.name)
time.Sleep(time.Second)
}
func main() {
//使用两个goroutine来创建工作池
p := work.New(2)
var wg sync.WaitGroup
wg.Add(100 * len(names))
for i := 0; i < 100; i++ {
for _, name := range names {
np := namePrinter{
name: name,
}
go func() {
//将任务提交执行,当run返回我们就已经知道任务已经完成
p.Run(&np)
wg.Done()
}()
}
}
wg.Wait()
//让工作池停止,等待现有工作完成
p.Shutdown()
}