有时候我们会需要去管理一些后台任务或者定时任务的执行状态或者生命周期等等,又或者后台任务执行超时后如何退出,或者异常情况下应该如何管理。本文通过系统中断简单模拟异常中断。

Runner包的代码如下:

package runner

import (
	"errors"
	"os"
	"os/signal"
	"time"
)

// runner包用于展示如何通过通道监视程序的执行时间, 如果程序运行时间太长
// 也可以用runner包来终止程序。当开发需要调度后台处理任务的程序时,这种模式会很有用。
// 这个程序可能会作为cron作业执行, 或者基于定时任务的云环境里执行

type Runner struct {
	// interrupt通道报告从操作系统发送的信号
	interrupt chan os.Signal

	// complete通道报告处理任务已经完成
	complete chan error

	// timeout 报告处理任务已经超时
	timeout <-chan time.Time

	// tasks持有一组以索引顺序依次执行的函数
	tasks 	[]func(int)
}

// ErrTimeout会在任务执行超时时返回
var ErrTimeout = errors.New("received timeout")

// ErrInterrupt会在接收到操作系统的事件时返回
var ErrInterrupt = errors.New("received interrupt")

// New返回一个新的准备使用的Runner
func New(d time.Duration) *Runner {
	return &Runner{
		interrupt: make(chan os.Signal, 1),
		complete:  make(chan error),
		timeout:   time.After(d),
	}
}

// Add将一个任务附加到Runner上。这个任务是一个
// 接受一个int类型的ID作为参数的函数
func (r *Runner) Add(tasks ...func(int)) {
	r.tasks = append(r.tasks, tasks...)
}

// Start执行所有任务, 并监视通道事件
func (r *Runner) Start() error {
	// 我们希望接收所有中断信号
	signal.Notify(r.interrupt, os.Interrupt)

	// 启动一个goroutine执行任务数组
	go func() {
		r.complete <- r.run()
	}()

	select {
	// 当任务完成时发出的信号
	case err := <-r.complete:
		return err
	// 当任务处理超时时发出的信号
	case <-r.timeout:
		return ErrTimeout
	}
}

// run执行每一个已注册的任务
func (r *Runner) run() error {
	for id, task := range r.tasks {
		// 检测操作系统的中断信号
		if r.gotInterrupt() {
			return ErrInterrupt
		}
		// 执行已注册的任务
		task(id)
	}
	return nil
}

// 验证是否接收到了中断信号
func (r *Runner) gotInterrupt() bool {
	select {
	    // 当中断事件被触发时发出的信号
		case <-r.interrupt:
			// 停止接收后续的任何信号
			signal.Stop(r.interrupt)
			return true
		//	继续正常运行
		default:
			return false
	}
}

然后一起来看看main包中如何使用Runner包来管理后台任务的生命周期:

package main

import (
	"GoPratice/runner"
	"log"
	"os"
	"time"
)

// 这个示例程序演示如何使用通道来监视
// 程序运行的事件, 以及在程序运行时间过长时如何终止程序

// timeout规定了必须在多少秒内完成处理
const timeout = 10 * time.Second

func init() {
	log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile)
}

func main() {
	log.Println("Starting work.")

	// 本次执行分配超时时间
	r := runner.New(timeout)

	// 加入要执行的任务
	r.Add(createTask(), createTask(), createTask())

	// 执行任务并处理结果
	if err := r.Start(); err != nil {
		switch err {
		case runner.ErrTimeout:
			log.Println("Termination due to timeout")
			os.Exit(1)
		case runner.ErrInterrupt:
			log.Println("Termination due to interrupt")
			os.Exit(2)
		}
	}

	log.Println("Process ended.")
}

func createTask() func(int) {
	return func(id int) {
		log.Println("Processor - Task #%d.", id)
		time.Sleep(time.Duration(id) * time.Second)
	}
}