简介

工作中经常有定时执行某些代码块的需求,如果是php代码,一般写个脚本,然后用Cron实现。

Go里提供了两种定时器:Timer(到达指定时间触发且只触发一次)和 Ticker(间隔特定时间触发)。

Timer和Ticker的实现几乎一样,Ticker相对复杂一些,这里主要讲述一下Ticker是如何实现的。

让我们先来看一下如何使用Ticker

//创建Ticker,设置多长时间触发一次
ticker := time.NewTicker(time.Second * 10)
go func() {
   for range ticker.C { //遍历ticker.C,如果有值,则会执行do someting,否则阻塞
      //do someting
   }
}()

代码很简洁,给开发者提供了巨大的便利。那GoLang是如何实现这个功能的呢?

原理 NewTicker

time/tick.go的NewTicker函数:

调用NewTicker可以生成Ticker,关于这个函数有四点需要说明

  1. NewTicker主要作用之一是初始化
  2. NewTicker中的时间是以纳秒为单位的,when返回的从当前时间+d的纳秒值,d必须为正值
  3. Ticker结构体中包含channel,sendTime是个function,逻辑为用select等待c被赋值
  4. 神秘的startTimer函数,揭示channel、sendTime是如何关联的
// NewTicker returns a new Ticker containing a channel that will send the
// time with a period specified by the duration argument.
// It adjusts the intervals or drops ticks to make up for slow receivers.
// The duration d must be greater than zero; if not, NewTicker will panic.
// Stop the ticker to release associated resources.
func NewTicker(d Duration) *Ticker {
   if d <= 0 {
      panic(errors.New("non-positive interval for NewTicker"))
   }
   // Give the channel a 1-element time buffer.
   // If the client falls behind while reading, we drop ticks
   // on the floor until the client catches up.
   c := make(chan Time, 1)
   t := &Ticker{
      C: c,
      r: runtimeTimer{
         when:   when(d),
         period: int64(d),
         f:      sendTime,
         arg:    c,
      },
   }
   startTimer(&t.r)
   return t
}

time/tick.go的Ticker数据结构

// A Ticker holds a channel that delivers `ticks' of a clock
// at intervals.
type Ticker struct {
   C <-chan Time // The channel on which the ticks are delivered.
   r runtimeTimer
}

time/sleep.go的runtimeTimer

// Interface to timers implemented in package runtime.
// Must be in sync with ../runtime/time.go:/^type timer
type runtimeTimer struct {
   tb uintptr
   i  int
   when   int64
   period int64
   f      func(interface{}, uintptr) // NOTE: must not be closure
   arg    interface{}
   seq    uintptr
}

time/sleep.go的sendTime

func sendTime(c interface{}, seq uintptr) {
   // Non-blocking send of time on c.
   // Used in NewTimer, it cannot block anyway (buffer).
   // Used in NewTicker, dropping sends on the floor is
   // the desired behavior when the reader gets behind,
   // because the sends are periodic.
   select {
   case c.(chan Time) <- Now():
   default:
   }
}

time/sleep.go的startTimer

func startTimer(*runtimeTimer)
func stopTimer(*runtimeTimer) bool
startTimer

看完上面的代码,大家内心是不是能够猜出是怎么实现的?

有一个机制保证时间到了时,sendTime被调用,此时channel会被赋值,调用ticker.C的位置解除阻塞,执行指定的逻辑。

让我们看一下GoLang是不是这样实现的。

追踪代码的时候我们发现在time包里的startTimer,只是一个声明,那真正的实现在哪里?

runtime/time.go的startTimer

此处使用go的隐藏技能go:linkname引导编译器将当前(私有)方法或者变量在编译时链接到指定的位置的方法或者变量。另外timer和runtimeTimer的结构是一致的,所以程序运行正常。

//startTimer将new的timer对象加入timer的堆数据结构中
//startTimer adds t to the timer heap.
//go:linkname startTimer time.startTimer
func startTimer(t *timer) {
   if raceenabled {
      racerelease(unsafe.Pointer(t))
   }
   addtimer(t)
}

runtime/time.go的addtimer

func addtimer(t *timer) {
   tb := t.assignBucket()
   lock(&tb.lock)
   ok := tb.addtimerLocked(t)
   unlock(&tb.lock)
   if !ok {
      badTimer()
   }
}

runtime/time.go的addtimerLocked

// Add a timer to the heap and start or kick timerproc if the new timer is
// earlier than any of the others.
// Timers are locked.
// Returns whether all is well: false if the data structure is corrupt
// due to user-level races.
func (tb *timersBucket) addtimerLocked(t *timer) bool {
   // when must never be negative; otherwise timerproc will overflow
   // during its delta calculation and never expire other runtime timers.
   if t.when < 0 {
      t.when = 1<<63 - 1
   }
   t.i = len(tb.t)
   tb.t = Append(tb.t, t)
   if !siftupTimer(tb.t, t.i) {
      return false
   }
   if t.i == 0 {
      // siftup moved to top: new earliest deadline.
      if tb.sleeping && tb.sleepUntil > t.when {
         tb.sleeping = false
         notewakeup(&tb.waitnote)
      }
      if tb.rescheduling {
         tb.rescheduling = false
         goready(tb.gp, 0)
      }
      if !tb.created {
         tb.created = true
         go timerproc(tb)
      }
   }
   return true
}

runtime/time.go的timerproc

func timerproc(tb *timersBucket) {
    tb.gp = getg()
    for {
        lock(&tb.lock)
        tb.sleeping = false
        now := nanotime()
        delta := int64(-1)
        for {
            if len(tb.t) == 0 { //无timer的情况
                delta = -1
                break
            }
            t := tb.t[0] //拿到堆顶的timer
            delta = t.when - now
            if delta > 0 { // 所有timer的时间都没有到期
                break
            }
            if t.period > 0 { // t[0] 是ticker类型,调整其到期时间并调整timer堆结构
                // leave in heap but adjust next time to fire
                t.when += t.period * (1 + -delta/t.period)
                siftdownTimer(tb.t, 0)
            } else {
                //Timer类型的定时器是单次的,所以这里需要将其从堆里面删除
                // remove from heap
                last := len(tb.t) - 1
                if last > 0 {
                    tb.t[0] = tb.t[last]
                    tb.t[0].i = 0
                }
                tb.t[last] = nil
                tb.t = tb.t[:last]
                if last > 0 {
                    siftdownTimer(tb.t, 0)
                }
                t.i = -1 // mark as removed
            }
            f := t.f
            arg := t.arg
            seq := t.seq
            unlock(&tb.lock)
            if raceenabled {
                raceacquire(unsafe.Pointer(t))
            }
            f(arg, seq) //sendTimer被调用的位置 ---------------------------------------
            lock(&tb.lock)
        }
        if delta < 0 || faketime > 0 {
            // No timers left - put goroutine to sleep.
            tb.rescheduling = true
            goparkunlock(&tb.lock, "timer goroutine (idle)", traceEvGoBlock, 1)
            continue
        }
        // At least one timer pending. Sleep until then.
        tb.sleeping = true
        tb.sleepUntil = now + delta
        noteclear(&tb.waitnote)
        unlock(&tb.lock)
        notetsleepg(&tb.waitnote, delta)
    }
}

追踪了一圈,最终追踪到timerproc,发现了sendTimer被调用位置f(arg, seq) ,而且可以看到将channel c传到了sendTimer中。

上面的这堆代码逻辑是什么意思呢?

  1. 所有timer统一使用一个最小堆结构去维护,按照timer的when(到期时间)比较大小;
  2. for循环过程中,如果delta = t.when - now的时间大于0,则break,直到有到时间的timer才进行操作;
  3. timer处理线程从堆顶开始处理每个timer,对于到期的timer,如果其period>0,则表明该timer 属于Ticker类型,调整其下次到期时间并调整其在堆中的位置,否则从堆中移除该timer;
  4. 调用该timer的处理函数以及其他相关工作;
总结

读完这篇文章,有没有奇怪的知识又增加了一些的感觉。写这些源码的大神们,对Go的理解很深刻,编码的功能也很深厚。

本质上GoLang用channel和堆实现了定时器功能,让我们来mock一下,伪代码如下:

func cronMock() {
   for {
      //从堆中获取时间最近的定时器
      t := getNearestTime()
      //如果时间还没到,则continue
      t.delta > 0 {
         continue
      }else{
         //时间到了,将当前的定时器再加一个钟
         t.when += t.duration
         //将堆重新排序
         siftdownTimer()
         //执行当前定时器指定的函数,即sendTimer
         t.sendTimer()
      }
   }
}
资料
  1. golang进阶(八)——隐藏技能go:linkname https://blog.csdn.net/lastsweetop/article/details/78830772
  2. 从99.9%CPU浅谈Golang的定时器实现原理https://www.jianshu.com/p/c9e8aaa13415
最后

大家如果喜欢我的文章,可以关注我的公众号(程序员麻辣烫)