计时器分 Timer 和 Ticker 两种,它们底层基本是一样的,两差的区别请参考 https://blog.haohtml.com/archives/19859, 这里我们的介绍对象是 Timer 。

golang timer
计时器结构体
 // NewTimer creates a new Timer that will send
 // the current time on its channel after at least duration d.
 func NewTimer(d Duration) *Timer {
     c := make(chan Time, 1)
     t := &Timer{
         C: c,
         r: runtimeTimer{
             when: when(d),
             f:    sendTime,
             arg:  c,
         },
     }
     startTimer(&t.r)
     return t
 }
NewTimer()TimerTimer

注意这里的 runtimeTimer.f 字段是一个函数 sendTime ,其实现如下

func sendTime(c interface{}, seq uintptr) {
	// Non-blocking send of time on c.
	// Used in NewTimer, it cannot block anyway (buffer).
	// Used in NewTicker, dropping sends on the floor is
	// the desired behavior when the reader gets behind,
	// because the sends are periodic.
	select {
	case c.(chan Time) <- Now():
	default:
	}
}

当 sendTime 函数主要用在 newTimer() 时,它以无阻塞的方式将当前时间 Now() 发送到 c 通道里。如果用在 newTicker() 时,如果读取落后,会将发送丢弃,它是周期性的。

我们给出 Timer 的结构体声明。

 type Timer struct {
  C <-chan Time
  r runtimeTimer
 }
runtimeTimer
runtimeTimer
 // Interface to timers implemented in package runtime.
 // Must be in sync with ../runtime/time.go:/^type timer
 type runtimeTimer struct {
  pp       uintptr
  when     int64
  period   int64
  f        func(interface{}, uintptr) // NOTE: must not be closure
  arg      interface{}
  seq      uintptr
  nextwhen int64
  status   uint32
 }
runnerTimertimer

结构体字段说明

0timerModifiedXX status

每次开启一个goroutine 执行 f(arg, now),基中when表示执行的时间,而 when+period 表示下次执行的时间。(这时有点疑问,对调用的函数参数,f的第二个参数是 now, 但后面介绍的时候第二个参数却是 seq)

Timerwhenperiodfargseqtime.runtimeTimerruntime.timeraddtimerstartTimer()addtimerdeltimertime.stopTimerfargseqwhenaddtimeraddtimermodtimerresetTimerwhenresetTimeraddtimerdeltimermodtimerresettimercleantimersadjusttimersruntimeraddtimer/deltimer/modtimer/resettimeradjusttimerruntimerPheaptimerpp

通过注释我们大概知道了一些与 timer 相关的信息,其中提到了一些操作函数,在 runtime 里都有其介绍,下面我们分析看看每个函数的实现来加深一下理解。

计时器的几种状态

与计时器相关的状态一共有 十种

p.timerswhennextwhenwhennextwhen
p.timersremoved

每种状态相关流转状态

 // addtimer:
 //   timerNoStatus   -> timerWaiting
 //   anything else   -> panic: invalid value
 // deltimer:
 //   timerWaiting         -> timerModifying -> timerDeleted
 //   timerModifiedEarlier -> timerModifying -> timerDeleted
 //   timerModifiedLater   -> timerModifying -> timerDeleted
 //   timerNoStatus       -> do nothing
 //   timerDeleted         -> do nothing
 //   timerRemoving       -> do nothing
 //   timerRemoved         -> do nothing
 //   timerRunning         -> wait until status changes
 //   timerMoving         -> wait until status changes
 //   timerModifying       -> wait until status changes
 // modtimer:
 //   timerWaiting   -> timerModifying -> timerModifiedXX
 //   timerModifiedXX -> timerModifying -> timerModifiedYY
 //   timerNoStatus   -> timerModifying -> timerWaiting
 //   timerRemoved   -> timerModifying -> timerWaiting
 //   timerDeleted   -> timerModifying -> timerModifiedXX
 //   timerRunning   -> wait until status changes
 //   timerMoving     -> wait until status changes
 //   timerRemoving   -> wait until status changes
 //   timerModifying -> wait until status changes
 // cleantimers (looks in P's timer heap):
 //   timerDeleted   -> timerRemoving -> timerRemoved
 //   timerModifiedXX -> timerMoving -> timerWaiting
 // adjusttimers (looks in P's timer heap):
 //   timerDeleted   -> timerRemoving -> timerRemoved
 //   timerModifiedXX -> timerMoving -> timerWaiting
 // runtimer (looks in P's timer heap):
 //   timerNoStatus   -> panic: uninitialized timer
 //   timerWaiting   -> timerWaiting or
 //   timerWaiting   -> timerRunning -> timerNoStatus or
 //   timerWaiting   -> timerRunning -> timerWaiting
 //   timerModifying -> wait until status changes
 //   timerModifiedXX -> timerMoving -> timerWaiting
 //   timerDeleted   -> timerRemoving -> timerRemoved
 //   timerRunning   -> panic: concurrent runtimer calls
 //   timerRemoved   -> panic: inconsistent timer heap
 //   timerRemoving   -> panic: inconsistent timer heap
 //   timerMoving     -> panic: inconsistent timer heap
计时器的创建 addtimer
startTimeraddtimer
 // startTimer adds t to the timer heap.
 //go:linkname startTimer time.startTimer
 func startTimer(t *timer) {
  if raceenabled {
  racerelease(unsafe.Pointer(t))
  }
  addtimer(t)
 }

此函数会将计时器添加到 timer heap 中。

addtimer

timer
&Timer{
         C: c,
         r: runtimeTimer{
             when: when(d),
             f:    sendTime,
             arg:  c,
         },
     }
P
 /src/runtime/time.go#L245-L278
 func addtimer(t *timer) {
  // 必要条件检查
  if t.when <= 0 {
  throw("timer when must be positive")
  }
  if t.period < 0 {
  throw("timer period must be non-negative")
  }
  if t.status != timerNoStatus {
  throw("addtimer called with initialized timer")
  }
  t.status = timerWaiting
 ​
  when := t.when
 ​
  // Disable preemption while using pp to avoid changing another P's heap.
  mp := acquirem()
 ​
  pp := getg().m.p.ptr()
  lock(&pp.timersLock)
 
  // 清理 timer heap 中首个定时器,以加快创建(添加)和删除timer的速度,如果保留在堆中速度会慢
  cleantimers(pp)
  doaddtimer(pp, t)
  unlock(&pp.timersLock)
 ​
  wakeNetPoller(when)
 ​
  releasem(mp)
 }

整体流程

status != timerNoStatustimerNoStatus -> timerWaitingmp := acquirem()mm.locks++paddtimerdoaddtimer()

doaddtimer

这里我们看下添加定时器的主要实现函数 doaddtimer

 /src/runtime/time.go#L280-L300
 // doaddtimer adds t to the current P's heap.
 // The caller must have locked the timers for pp.
 func doaddtimer(pp *p, t *timer) {
  // Timers rely on the network poller, so make sure the poller
  // has started.
  if netpollInited == 0 {
  netpollGenericInit()
  }
 ​
  if t.pp != 0 {
  throw("doaddtimer: P already set in timer")
  }
  t.pp.set(pp)
 
  // 新添加的 timer 放置到堆的最后一位,然后向前调整
  i := len(pp.timers)
  pp.timers = append(pp.timers, t)
  siftupTimer(pp.timers, i)
 
  if t == pp.timers[0] {
  atomic.Store64(&pp.timer0When, uint64(t.when))
  }
  atomic.Xadd(&pp.numTimers, 1)
 }
lock(&pp.timersLock)

实现步骤:

addtimeraddtimert.pp.set(pp)pp.timesp.timest.whenpp.timers[0]pp.timer0Whent.when
heap

可以看到实现中主要使用了四叉小顶堆算法,将新添加的定时器放在合适的位置,同时在 p 中对定时器进行了维护。

清理堆顶 cleantimers

cleantimers 清理定时器队列的头部。 这加快了创建和删除计时器的程序; 将它们留在堆中会减慢 addtimer。

 /src/runtime/time.go#L542-L592
 // cleantimers cleans up the head of the timer queue. This speeds up
 // programs that create and delete timers; leaving them in the heap
 // slows down addtimer. Reports whether no timer problems were found.
 // The caller must have locked the timers for pp.
 func cleantimers(pp *p) {
  gp := getg()
  for {
  if len(pp.timers) == 0 {
  return
  }
 ​
  // This loop can theoretically run for a while, and because
  // it is holding timersLock it cannot be preempted.
  // If someone is trying to preempt us, just return.
  // We can clean the timers later.
  if gp.preemptStop {
  return
  }
 ​
  // 当前定时器所属的 p 不是当前的p,则抛出异常
  t := pp.timers[0]
  if t.pp.ptr() != pp {
  throw("cleantimers: bad p")
  }
 
  // 根据定时器的状态分别处理
  switch s := atomic.Load(&t.status); s {
  case timerDeleted:
  // 状态 timerDeleted 变为 timerRemoving
  if !atomic.Cas(&t.status, s, timerRemoving) {
  continue
  }
 
  // 删除堆的首个元素,从堆中删除
  dodeltimer0(pp)
 
  // 再把状态从 timerRemoving 变为 timerRemoved
  if !atomic.Cas(&t.status, timerRemoving, timerRemoved) {
  badTimer()
  }
 
  // 更新当前 p 中删除定时器的统计个数
  atomic.Xadd(&pp.deletedTimers, -1)
 
  case timerModifiedEarlier, timerModifiedLater:
  if !atomic.Cas(&t.status, s, timerMoving) {
  continue
  }
 
  // 修改原来计划执行的时间
  t.when = t.nextwhen
 ​
  dodeltimer0(pp)
  doaddtimer(pp, t)
  if !atomic.Cas(&t.status, timerMoving, timerWaiting) {
  badTimer()
  }
 
  default:
  // Head of timers does not need adjustment.
  return
  }
  }
 }
t.status
timerDeleted -> timerRemoving -> timerRemovedp.deletedTimers
timerMovingt.when = t.nextwhendodeltimer0t.ppdoaddtimertimerMovingtimerWaiting
t.whent.ppdodeltimer0()timerModifiedEarlier/timerModifiedLater -> timerMoving -> timerWaitingtimerWaiting

其它状态:不做任何操作

这里用到了 cas 操作,如果一次失败,则通过 for 语句再试,直到修改成功。

删除堆顶元素 dodeltimer0

dodeltimer0()
// dodeltimer0 removes timer 0 from the current P's heap.
// We are locked on the P when this is called.
// It reports whether it saw no problems due to races.
// The caller must have locked the timers for pp.
func dodeltimer0(pp *p) {
if t := pp.timers[0]; t.pp.ptr() != pp {
throw("dodeltimer0: wrong P")
} else {
t.pp = 0
}
last := len(pp.timers) - 1

// 说明至少有两个定时器元素
if last > 0 {
pp.timers[0] = pp.timers[last]
}

// 赋值nil, 以便GC
pp.timers[last] = nil
pp.timers = pp.timers[:last]
if last > 0 {
siftdownTimer(pp.timers, 0)
}
updateTimer0When(pp)
atomic.Xadd(&pp.numTimers, -1)
}
t.pp = 0siftdownTimer()p.timer0Whenwhen0p.numTimers
计时器的停止/删除 deltimer

当我们不再需要定时器的时候,调用 timer.Stop() 方法即可,下面我们看看这一块的实现原理。

// 如果停止计时器成功,则返回true, 如果计时器已经过期或已经停止,则返回false。说明多次调用 Stop() 方法并不会抛出异常。
// Stop() 方法并不会关闭 channel, 以防止从通道读取错误成功。
//
// 为确保调用 Stop() 后, channel是空,请检查返回值并清空channel。
// 例如,假设程序还没有从 t.C 接收到
//
// if !t.Stop() {
// <-t.C
// }
//
// 这不能与来自 Timer channel 的其他接收 或 对 Timer 的 Stop 方法的其他调用 同时完成。
//
// 通过 AfterFunc(d, f) 创建一个Timer, 如果 t.Stop 返回false, 那么 Timer 已经过期且函数f 已经在它自己的 goroutine 启动;
// Stop 并不会等待函数f 执行完成才返回
// 如果调用方需要知道函数f 是否执行完成,它必须与函数f 明确协调才可以。
//
func (t *Timer) Stop() bool {
if t.r.f == nil {
panic("time: Stop called on uninitialized Timer")
}
return stopTimer(&t.r)
}

这里的实现主要是调用 stopTimer 来实现的,我们再看下它的实现

// stopTimer stops a timer.
// It reports whether t was stopped before being run.
//go:linkname stopTimer time.stopTimer
func stopTimer(t *timer) bool {
return deltimer(t)
}

它只是对 deltimer 函数进行了一次封装,再看下其实现方法

// deltimer deletes the timer t. It may be on some other P, so we can't
// actually remove it from the timers heap. We can only mark it as deleted.
// It will be removed in due course by the P whose heap it is on.
// Reports whether the timer was removed before it was run.
func deltimer(t *timer) bool {
for {
switch s := atomic.Load(&t.status); s {
case timerWaiting, timerModifiedLater:

mp := acquirem()
if atomic.Cas(&t.status, s, timerModifying) {

tpp := t.pp.ptr()
if !atomic.Cas(&t.status, timerModifying, timerDeleted) {
badTimer()
}
releasem(mp)
atomic.Xadd(&tpp.deletedTimers, 1)

return true
} else {
releasem(mp)
}
case timerModifiedEarlier:

mp := acquirem()
if atomic.Cas(&t.status, s, timerModifying) {

tpp := t.pp.ptr()
if !atomic.Cas(&t.status, timerModifying, timerDeleted) {
badTimer()
}
releasem(mp)
atomic.Xadd(&tpp.deletedTimers, 1)

return true
} else {
releasem(mp)
}
case timerDeleted, timerRemoving, timerRemoved:
// Timer was already run.
return false
case timerRunning, timerMoving:

osyield()
case timerNoStatus:

return false
case timerModifying:

osyield()
default:
badTimer()
}
}
}

从函数的注释及实现原理得知此函数并不是真正的将计时器从当前P堆里删除,由于计时器可能工作在任何一个P上,所以只能对定时器的状态作删除标记,删除工作由定时器所在的P在合适的时机( dodeltimerdodeltimer0 )进行真正的删除。

addtimert.statustimerDeleted

可以看到 time.Stop() 的实现对应的是 deltimer() 函数,

可以看到 deltimer 函数也是对各种状态做不同的处理。

计时器的修改 modtimer
TimerTicker
func (t *Timer) Reset(d Duration) bool {
if t.r.f == nil {
panic("time: Reset called on uninitialized Timer")
}
w := when(d)
return resetTimer(&t.r, w)
}
Timerd Duration

通过阅读官网对此函数的注释得知以下信息

NewTimerTimert.CStop()
resetTimer
// resettimer resets the time when a timer should fire.
// If used for an inactive timer, the timer will become active.
// This should be called instead of addtimer if the timer value has been,
// or may have been, used previously.
// Reports whether the timer was modified before it was run.
func resettimer(t *timer, when int64) bool {
return modtimer(t, when, t.period, t.f, t.arg, t.seq)
}
modtimer
// modtimer modifies an existing timer.
// This is called by the netpoll code or time.Ticker.Reset or time.Timer.Reset.
// Reports whether the timer was modified before it was run.
func modtimer(t *timer, when, period int64, f func(interface{}, uintptr), arg interface{}, seq uintptr) bool {
if when <= 0 {
throw("timer when must be positive")
}
if period < 0 {
throw("timer period must be non-negative")
}

status := uint32(timerNoStatus)
wasRemoved := false
var pending bool
var mp *m
loop:
for {
switch status = atomic.Load(&t.status); status {
case timerWaiting, timerModifiedEarlier, timerModifiedLater:
// Prevent preemption while the timer is in timerModifying.
// This could lead to a self-deadlock. See #38070.
mp = acquirem()
if atomic.Cas(&t.status, status, timerModifying) {
pending = true // timer not yet run
break loop
}
releasem(mp)
case timerNoStatus, timerRemoved:
// Prevent preemption while the timer is in timerModifying.
// This could lead to a self-deadlock. See #38070.
mp = acquirem()

// Timer was already run and t is no longer in a heap.
// Act like addtimer.
if atomic.Cas(&t.status, status, timerModifying) {
wasRemoved = true
pending = false // timer already run or stopped
break loop
}
releasem(mp)
case timerDeleted:
// Prevent preemption while the timer is in timerModifying.
// This could lead to a self-deadlock. See #38070.
mp = acquirem()
if atomic.Cas(&t.status, status, timerModifying) {
atomic.Xadd(&t.pp.ptr().deletedTimers, -1)
pending = false // timer already stopped
break loop
}
releasem(mp)
case timerRunning, timerRemoving, timerMoving:
// The timer is being run or moved, by a different P.
// Wait for it to complete.
osyield()
case timerModifying:
// Multiple simultaneous calls to modtimer.
// Wait for the other call to complete.
osyield()
default:
badTimer()
}
}

t.period = period
t.f = f
t.arg = arg
t.seq = seq

if wasRemoved {
t.when = when
pp := getg().m.p.ptr()
lock(&pp.timersLock)
doaddtimer(pp, t)
unlock(&pp.timersLock)
if !atomic.Cas(&t.status, timerModifying, timerWaiting) {
badTimer()
}
releasem(mp)
wakeNetPoller(when)
} else {
// The timer is in some other P's heap, so we can't change
// the when field. If we did, the other P's heap would
// be out of order. So we put the new when value in the
// nextwhen field, and let the other P set the when field
// when it is prepared to resort the heap.
t.nextwhen = when

newStatus := uint32(timerModifiedLater)
if when < t.when {
newStatus = timerModifiedEarlier
}

tpp := t.pp.ptr()

if newStatus == timerModifiedEarlier {
updateTimerModifiedEarliest(tpp, when)
}

// Set the new status of the timer.
if !atomic.Cas(&t.status, timerModifying, newStatus) {
badTimer()
}
releasem(mp)

// If the new status is earlier, wake up the poller.
if newStatus == timerModifiedEarlier {
wakeNetPoller(when)
}
}

return pending
}
计时器的执行 runtimer
p.timersLock
func runtimer(pp *p, now int64) int64 {
for {
// 堆顶元素
t := pp.timers[0]
if t.pp.ptr() != pp {
throw("runtimer: bad p")
}
switch s := atomic.Load(&t.status); s {
case timerWaiting:
if t.when > now {
// Not ready to run.
return t.when
}

if !atomic.Cas(&t.status, s, timerRunning) {
continue
}
// Note that runOneTimer may temporarily unlock
// pp.timersLock.

// 真正的执行函数
runOneTimer(pp, t, now)
return 0

case timerDeleted:
if !atomic.Cas(&t.status, s, timerRemoving) {
continue
}
dodeltimer0(pp)
if !atomic.Cas(&t.status, timerRemoving, timerRemoved) {
badTimer()
}
atomic.Xadd(&pp.deletedTimers, -1)
if len(pp.timers) == 0 {
return -1
}

case timerModifiedEarlier, timerModifiedLater:
if !atomic.Cas(&t.status, s, timerMoving) {
continue
}
t.when = t.nextwhen
dodeltimer0(pp)
doaddtimer(pp, t)
if !atomic.Cas(&t.status, timerMoving, timerWaiting) {
badTimer()
}

case timerModifying:
// Wait for modification to complete.
osyield()

case timerNoStatus, timerRemoved:
// Should not see a new or inactive timer on the heap.
badTimer()
case timerRunning, timerRemoving, timerMoving:
// These should only be set when timers are locked,
// and we didn't do it.
badTimer()
default:
badTimer()
}
}
}
p.timerstimers[0]timer[0]p.timers[0]0-1

函数的实现原理是根据当前定时器元素的不同状态分别处理.。

nowt.whentimerRunningrunOneTimer()dodeltimer0()p.timersp.DeletedTimersdeltimertimerDeletedtimerDeletedtimerMovingt.when = t.nextwhendodeltimer0t.ppdoaddtimertimerMovingtimerWaitingt.whent.ppdodeltimer0()timerModifiedEarlier/timerModifiedLater -> timerMoving -> timerWaiting
runOneTimer
// runOneTimer runs a single timer.
// The caller must have locked the timers for pp.
// This will temporarily unlock the timers while running the timer function.
//go:systemstack
func runOneTimer(pp *p, t *timer, now int64) {
// 函数名
f := t.f
// 函数参数
arg := t.arg
seq := t.seq

if t.period > 0 {
// 调整 t.when 字段
// Leave in heap but adjust next time to fire.
delta := t.when - now
t.when += t.period * (1 + -delta/t.period)

// 溢出处理
if t.when < 0 { // check for overflow.
t.when = maxWhen
}
siftdownTimer(pp.timers, 0)
if !atomic.Cas(&t.status, timerRunning, timerWaiting) {
badTimer()
}
updateTimer0When(pp)
} else {
// Remove from heap.
// 彻底从堆中删除
dodeltimer0(pp)
if !atomic.Cas(&t.status, timerRunning, timerNoStatus) {
badTimer()
}
}

...

// 先解锁,再执行,最后再加上原来的锁
unlock(&pp.timersLock)
f(arg, seq) // 函数调用
lock(&pp.timersLock)

...
}

计时器的获取 timeSleepUntil
timeSleepUntil
// timeSleepUntil returns the time when the next timer should fire,
// and the P that holds the timer heap that that timer is on.
// This is only called by sysmon and checkdead.
func timeSleepUntil() (int64, *p) {
// 默认一个系统允许的最大值
next := int64(maxWhen)
var pret *p

// Prevent allp slice changes. This is like retake.
lock(&allpLock)
for _, pp := range allp {
if pp == nil {
// This can happen if procresize has grown
// allp but not yet created new Ps.
continue
}

// 如果 timer0When 字段值为0, 则表示当前P没有定时器
w := int64(atomic.Load64(&pp.timer0When))
if w != 0 && w < next {
next = w
pret = pp
}

w = int64(atomic.Load64(&pp.timerModifiedEarliest))
if w != 0 && w < next {
next = w
pret = pp
}
}
unlock(&allpLock)

return next, pret
}
P
allpLockPpp.timer0Whenpp.timerModifiedEarliestpp.timer0Whenpp.timers[0]whensysmon
pp.timers[0]
定时器的管理 checkTimers

检查指定P上面所有timers 是否已准备就绪。

func checkTimers(pp *p, now int64) (rnow, pollUntil int64, ran bool) {}
now0rnow
func checkTimers(pp *p, now int64) (rnow, pollUntil int64, ran bool) {
// 如果当前p 没有任何定时器或第一次调整计时器,则什么也不做
next := int64(atomic.Load64(&pp.timer0When))
nextAdj := int64(atomic.Load64(&pp.timerModifiedEarliest))
if next == 0 || (nextAdj != 0 && nextAdj < next) {
next = nextAdj
}

if next == 0 {
// 直接返回
return now, 0, false
}

if now == 0 {
now = nanotime()
}
if now < next {
// Next timer is not ready to run, but keep going
// if we would clear deleted timers.
// This corresponds to the condition below where
// we decide whether to call clearDeletedTimers.
if pp != getg().m.p.ptr() || int(atomic.Load(&pp.deletedTimers)) <= int(atomic.Load(&pp.numTimers)/4) {
return now, next, false
}
}

lock(&pp.timersLock)

// 调整所有定时器
if len(pp.timers) > 0 {
adjusttimers(pp, now)
for len(pp.timers) > 0 {
// Note that runtimer may temporarily unlock
// pp.timersLock.
if tw := runtimer(pp, now); tw != 0 {
if tw > 0 {
pollUntil = tw
}
break
}
ran = true
}
}

// 如果当前所检查的P正是当前使用的P,则调用 clearDeleteTimers 函数清理掉所有被标记为删除的timer,将其从P的heap中删除,以减少对 timersLock 锁的使用。
// 再次说明了每个P只能清理自己的timer
if pp == getg().m.p.ptr() && int(atomic.Load(&pp.deletedTimers)) > len(pp.timers)/4 {
clearDeletedTimers(pp)
}

unlock(&pp.timersLock)

return now, pollUntil, ran
}

调整计时器 adjusttimers

在当前 P 的堆中查找任何已修改为更早运行的定时器,并将它们放在堆中的正确位置。

在查找这些计时器时,它还会移动已修改为稍后运行的计时器,并删除已删除的计时器。 调用者必须锁定 pp 的计时器。

/src/runtime/time.go#L653
func adjusttimers(pp *p, now int64) {
first := atomic.Load64(&pp.timerModifiedEarliest)
if first == 0 || int64(first) > now {
if verifyTimers {
verifyTimerHeap(pp)
}
return
}

// We are going to clear all timerModifiedEarlier timers.
atomic.Store64(&pp.timerModifiedEarliest, 0)

var moved []*timer
for i := 0; i < len(pp.timers); i++ {
t := pp.timers[i]
if t.pp.ptr() != pp {
throw("adjusttimers: bad p")
}
switch s := atomic.Load(&t.status); s {
case timerDeleted:
if atomic.Cas(&t.status, s, timerRemoving) {
changed := dodeltimer(pp, i)
if !atomic.Cas(&t.status, timerRemoving, timerRemoved) {
badTimer()
}
atomic.Xadd(&pp.deletedTimers, -1)
// Go back to the earliest changed heap entry.
// "- 1" because the loop will add 1.
i = changed - 1
}
case timerModifiedEarlier, timerModifiedLater:
if atomic.Cas(&t.status, s, timerMoving) {
// Now we can change the when field.
t.when = t.nextwhen
// Take t off the heap, and hold onto it.
// We don't add it back yet because the
// heap manipulation could cause our
// loop to skip some other timer.
changed := dodeltimer(pp, i)
moved = append(moved, t)
// Go back to the earliest changed heap entry.
// "- 1" because the loop will add 1.
i = changed - 1
}
case timerNoStatus, timerRunning, timerRemoving, timerRemoved, timerMoving:
badTimer()
case timerWaiting:
// OK, nothing to do.
case timerModifying:
// Check again after modification is complete.
osyield()
i--
default:
badTimer()
}
}

if len(moved) > 0 {
addAdjustedTimers(pp, moved)
}

if verifyTimers {
verifyTimerHeap(pp)
}
}

遍历 p.timers 字段,根据每个计时器状态分别处理

timerDeletedtimerRemovingtimerRemovedp.deletedTimerstimerMovingt.when=t.nextwhenmoved

清理计时器 clearDeletedTimers

此函数是除了STW期间运行 moveTimers 之外的唯一一个遍历整个timer heap 的函数。此函数的执行一定要持有 timerLock 锁

func clearDeletedTimers(pp *p) {
// 清理掉P中的 timerModifiedEarlier 计时器
// Do this now in case new ones show up while we are looping.
atomic.Store64(&pp.timerModifiedEarliest, 0)

// 删除的数量
cdel := int32(0)
// 切片索引位置,首个要删除的索引位置
to := 0
changedHeap := false
timers := pp.timers

// 遍历当前P所有的计时器,针对不同状态作相应的处理,最张将要清除的定时器移到最后面
nextTimer:
for _, t := range timers {
for {
switch s := atomic.Load(&t.status); s {
case timerWaiting:
if changedHeap {
timers[to] = t
siftupTimer(timers, to)
}
to++
continue nextTimer
case timerModifiedEarlier, timerModifiedLater:
if atomic.Cas(&t.status, s, timerMoving) {
t.when = t.nextwhen
timers[to] = t
siftupTimer(timers, to)
to++
changedHeap = true
if !atomic.Cas(&t.status, timerMoving, timerWaiting) {
badTimer()
}
continue nextTimer
}
case timerDeleted:
// 真正的删除操作
if atomic.Cas(&t.status, s, timerRemoving) {
t.pp = 0
cdel++
if !atomic.Cas(&t.status, timerRemoving, timerRemoved) {
badTimer()
}
changedHeap = true
continue nextTimer
}
case timerModifying:
// Loop until modification complete.
osyield()
case timerNoStatus, timerRemoved:
// We should not see these status values in a timer heap.
badTimer()
case timerRunning, timerRemoving, timerMoving:
// Some other P thinks it owns this timer,
// which should not happen.
badTimer()
default:
badTimer()
}
}
}


// 清理定时器,以便GC回收,同时更新统计数据
for i := to; i < len(timers); i++ {
timers[i] = nil
}

// 更新统计数据
atomic.Xadd(&pp.deletedTimers, -cdel)
atomic.Xadd(&pp.numTimers, -cdel)

// 更新 p.timers 字段
timers = timers[:to]
pp.timers = timers
updateTimer0When(pp)

// 检查堆的状态
if verifyTimers {
verifyTimerHeap(pp)
}
}
nilp.deleteTimersp.numTimersupdateTimer0When

移除计时器 dodeltimer

我们上面已介绍过另一个移除计时器的函数 dodeltimer0() ,它实现移除堆顶计时器。这里我们介绍的是另一个相同功能的函数,可以实现自定义移除指定索引位置的元素。

i
// dodeltimer removes timer i from the current P's heap.
// We are locked on the P when this is called.
// It returns the smallest changed index in pp.timers.
// The caller must have locked the timers for pp.
func dodeltimer(pp *p, i int) int {
if t := pp.timers[i]; t.pp.ptr() != pp {
throw("dodeltimer: wrong P")
} else {
// 解除计时器与 P 的绑定关系
t.pp = 0
}
last := len(pp.timers) - 1
if i != last {
pp.timers[i] = pp.timers[last]
}
pp.timers[last] = nil
pp.timers = pp.timers[:last]
smallestChanged := i
if i != last {
// Moving to i may have moved the last timer to a new parent,
// so sift up to preserve the heap guarantee.
// 重新调整位置
smallestChanged = siftupTimer(pp.timers, i)
siftdownTimer(pp.timers, i)
}
if i == 0 {
// 删除的是当前仅有的一个timer,同重置 time0When 字段
updateTimer0When(pp)
}
atomic.Xadd(&pp.numTimers, -1)
return smallestChanged
}

操作流程

nilsiftupTimersiftdownTimer