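Go has two kinds of timers, Timer and Ticker. Their underlying implementation is largely the same; for the differences between the two, see https://blog.haohtml.com/archives/19859. This article focuses on Timer.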
The timer struct
// NewTimer creates a new Timer that will send
// the current time on its channel after at least duration d.
func NewTimer(d Duration) *Timer {
	c := make(chan Time, 1)
	t := &Timer{
		C: c,
		r: runtimeTimer{
			when: when(d),
			f:    sendTime,
			arg:  c,
		},
	}
	startTimer(&t.r)
	return t
}
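NewTimer() creates a Timer and returns a pointer to the Timer struct.
Note that the runtimeTimer.f field is set to the function sendTime, which is implemented as follows: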
func sendTime(c interface{}, seq uintptr) {
	// Non-blocking send of time on c.
	// Used in NewTimer, it cannot block anyway (buffer).
	// Used in NewTicker, dropping sends on the floor is
	// the desired behavior when the reader gets behind,
	// because the sends are periodic.
	select {
	case c.(chan Time) <- Now():
	default:
	}
}
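When sendTime is used by NewTimer(), it sends the current time Now() on channel c without blocking; the channel has a buffer of one, so the send cannot block anyway. When it is used by NewTicker(), the send is dropped if the reader has fallen behind, which is acceptable because the sends are periodic.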
Here is the declaration of the Timer struct.
type Timer struct {
C <-chan Time
r runtimeTimer
}
runtimeTimer
The r field has type runtimeTimer, which is declared as follows:
// Interface to timers implemented in package runtime.
// Must be in sync with ../runtime/time.go:/^type timer
type runtimeTimer struct {
pp uintptr
when int64
period int64
f func(interface{}, uintptr) // NOTE: must not be closure
arg interface{}
seq uintptr
nextwhen int64
status uint32
}
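runtimeTimer must stay in sync with the timer struct in the runtime package.
Field descriptions
- pp: the P whose heap the timer is on
- nextwhen: the value that when will be set to while the timer is in a timerModifiedXX status
- status: the current state of the timer
The timer fires at when and, if period > 0, again at when+period and so on, each time calling f(arg, now). (One open question: the comment describes the second argument of f as now, yet the code shown later passes seq as the second argument.)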
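The comments in the runtime source set out rules for code that uses a timer value:
1. Code that creates a new Timer may set the when, period, f, arg, and seq fields.
2. A newly created timer (time.runtimeTimer, that is runtime.timer) may be passed to addtimer, which startTimer() calls. After that, none of its fields may be touched.
3. An active timer (one that has been passed to addtimer) may be passed to deltimer (time.stopTimer), after which it is inactive. In an inactive timer the period, f, arg, and seq fields may be modified, but not when.
4. An inactive timer must not be passed to addtimer; only newly allocated timers may be passed to addtimer.
5. An active timer may be passed to modtimer; it remains active.
6. An inactive timer may be passed to resetTimer, which makes it active again with an updated when. A newly allocated timer may also be passed to resetTimer.
7. The timer operations are addtimer, deltimer, modtimer, resettimer, cleantimers, adjusttimers, and runtimer.
8. addtimer/deltimer/modtimer/resettimer must not be called simultaneously on the same timer, but adjusttimers and runtimer may run at the same time as any of them.
9. Active timers live in a heap attached to a P; inactive timers stay in that heap temporarily until they are removed. The pp field records the owning P.
These comments give a rough picture of how timers work. They mention several operations, all of which are implemented in the runtime; the sections below walk through each of them.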
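Timer states
There are ten timer states in total:
- timerNoStatus: no status has been set yet
- timerWaiting: waiting to fire; the timer is in some P's heap (p.timers)
- timerRunning: the timer function is running; held only briefly
- timerDeleted: the timer has been deleted and should be removed; it must not run, but it is still in some P's heap
- timerRemoving: the timer is being removed; held only briefly
- timerRemoved: the timer has been stopped and removed; it is not in any P's heap
- timerModifying: the timer is being modified; held only briefly
- timerModifiedEarlier: the timer was modified to an earlier time; the new when value is in nextwhen, and the timer may be in the wrong place in p.timers
- timerModifiedLater: the timer was modified to the same or a later time; the new when value is in nextwhen, and the timer may be in the wrong place in p.timers
- timerMoving: the timer was modified and is being moved; held only briefly
State transitions for each operation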
// addtimer:
// timerNoStatus -> timerWaiting
// anything else -> panic: invalid value
// deltimer:
// timerWaiting -> timerModifying -> timerDeleted
// timerModifiedEarlier -> timerModifying -> timerDeleted
// timerModifiedLater -> timerModifying -> timerDeleted
// timerNoStatus -> do nothing
// timerDeleted -> do nothing
// timerRemoving -> do nothing
// timerRemoved -> do nothing
// timerRunning -> wait until status changes
// timerMoving -> wait until status changes
// timerModifying -> wait until status changes
// modtimer:
// timerWaiting -> timerModifying -> timerModifiedXX
// timerModifiedXX -> timerModifying -> timerModifiedYY
// timerNoStatus -> timerModifying -> timerWaiting
// timerRemoved -> timerModifying -> timerWaiting
// timerDeleted -> timerModifying -> timerModifiedXX
// timerRunning -> wait until status changes
// timerMoving -> wait until status changes
// timerRemoving -> wait until status changes
// timerModifying -> wait until status changes
// cleantimers (looks in P's timer heap):
// timerDeleted -> timerRemoving -> timerRemoved
// timerModifiedXX -> timerMoving -> timerWaiting
// adjusttimers (looks in P's timer heap):
// timerDeleted -> timerRemoving -> timerRemoved
// timerModifiedXX -> timerMoving -> timerWaiting
// runtimer (looks in P's timer heap):
// timerNoStatus -> panic: uninitialized timer
// timerWaiting -> timerWaiting or
// timerWaiting -> timerRunning -> timerNoStatus or
// timerWaiting -> timerRunning -> timerWaiting
// timerModifying -> wait until status changes
// timerModifiedXX -> timerMoving -> timerWaiting
// timerDeleted -> timerRemoving -> timerRemoved
// timerRunning -> panic: concurrent runtimer calls
// timerRemoved -> panic: inconsistent timer heap
// timerRemoving -> panic: inconsistent timer heap
// timerMoving -> panic: inconsistent timer heap
Creating a timer: addtimer
NewTimer calls startTimer, which in turn calls addtimer:
// startTimer adds t to the timer heap.
//go:linkname startTimer time.startTimer
func startTimer(t *timer) {
if raceenabled {
racerelease(unsafe.Pointer(t))
}
addtimer(t)
}
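This function adds the timer to the timer heap.
addtimer
The timer passed in is the r field of the value built in NewTimer:
&Timer{
	C: c,
	r: runtimeTimer{
		when: when(d),
		f:    sendTime,
		arg:  c,
	},
}
addtimer adds it to the heap of the current P.
/src/runtime/time.go#L245-L278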
func addtimer(t *timer) {
// Precondition checks
if t.when <= 0 {
throw("timer when must be positive")
}
if t.period < 0 {
throw("timer period must be non-negative")
}
if t.status != timerNoStatus {
throw("addtimer called with initialized timer")
}
t.status = timerWaiting
when := t.when
// Disable preemption while using pp to avoid changing another P's heap.
mp := acquirem()
pp := getg().m.p.ptr()
lock(&pp.timersLock)
// Clean up the head of the timer heap first. This speeds up creating and deleting timers; stale timers left in the heap would slow addtimer down.
cleantimers(pp)
doaddtimer(pp, t)
unlock(&pp.timersLock)
wakeNetPoller(when)
releasem(mp)
}
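Overall flow
1. Check the preconditions: when must be positive, period must be non-negative, and the function throws if status != timerNoStatus.
2. Change the status: timerNoStatus -> timerWaiting.
3. Call mp := acquirem() to disable preemption (it increments m.locks), so the goroutine stays on the same P while that P's heap is being used.
4. Lock the timers of the current P, clean up the heap head with cleantimers, and add the timer with doaddtimer().
5. Unlock, wake the network poller if needed with wakeNetPoller, and call releasem.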
doaddtimer
doaddtimer is the function that actually adds the timer:
/src/runtime/time.go#L280-L300
// doaddtimer adds t to the current P's heap.
// The caller must have locked the timers for pp.
func doaddtimer(pp *p, t *timer) {
// Timers rely on the network poller, so make sure the poller
// has started.
if netpollInited == 0 {
netpollGenericInit()
}
if t.pp != 0 {
throw("doaddtimer: P already set in timer")
}
t.pp.set(pp)
// Append the new timer at the end of the heap, then sift it up toward the root
i := len(pp.timers)
pp.timers = append(pp.timers, t)
siftupTimer(pp.timers, i)
if t == pp.timers[0] {
atomic.Store64(&pp.timer0When, uint64(t.when))
}
atomic.Xadd(&pp.numTimers, 1)
}
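The caller must already hold the lock, taken with lock(&pp.timersLock).
Steps:
1. Make sure the network poller has been initialized, because timers rely on it.
2. Check that the timer is not already bound to a P (addtimer is only ever given new timers), then bind it to the current P with t.pp.set(pp).
3. Append the timer to pp.timers and sift it up to the position determined by t.when.
4. If the new timer ends up as pp.timers[0], store t.when in pp.timer0When.
5. Increment pp.numTimers.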
The implementation relies on a 4-ary min-heap to place the new timer at the right position, and the P keeps bookkeeping information about its timers.
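To make the sift-up step concrete, here is a simplified sketch of insertion into a 4-ary min-heap. It is an illustration under stated assumptions rather than the runtime code: the heap stores bare when values instead of *timer, and siftUp and push are names chosen for this example.

```go
package main

import "fmt"

// siftUp moves h[i] toward the root of a 4-ary min-heap until its parent is
// no larger, and returns the final index. Simplified: the heap holds bare
// when values rather than *timer.
func siftUp(h []int64, i int) int {
	v := h[i]
	for i > 0 {
		p := (i - 1) / 4 // parent index in a 4-ary heap
		if v >= h[p] {
			break
		}
		h[i] = h[p] // pull the parent down one level
		i = p
	}
	h[i] = v
	return i
}

// push appends when at the end and restores heap order, which is the same
// append-then-sift-up sequence that doaddtimer performs.
func push(h []int64, when int64) []int64 {
	h = append(h, when)
	siftUp(h, len(h)-1)
	return h
}

func main() {
	var h []int64
	for _, w := range []int64{50, 30, 70, 10, 60, 20} {
		h = push(h, w)
	}
	fmt.Println(h[0]) // 10: the earliest deadline is always at index 0
}
```

Because each node has four children, the tree is shallower than a binary heap of the same size, so an insertion compares against fewer ancestors.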
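Cleaning the heap top: cleantimers
cleantimers cleans up the head of the timer queue. This speeds up programs that create and delete timers; leaving such timers in the heap would slow down addtimer.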
/src/runtime/time.go#L542-L592
// cleantimers cleans up the head of the timer queue. This speeds up
// programs that create and delete timers; leaving them in the heap
// slows down addtimer. Reports whether no timer problems were found.
// The caller must have locked the timers for pp.
func cleantimers(pp *p) {
gp := getg()
for {
if len(pp.timers) == 0 {
return
}
// This loop can theoretically run for a while, and because
// it is holding timersLock it cannot be preempted.
// If someone is trying to preempt us, just return.
// We can clean the timers later.
if gp.preemptStop {
return
}
// Throw if the timer at the top of the heap does not belong to this P
t := pp.timers[0]
if t.pp.ptr() != pp {
throw("cleantimers: bad p")
}
// Handle the timer according to its status
switch s := atomic.Load(&t.status); s {
case timerDeleted:
// Change the status from timerDeleted to timerRemoving
if !atomic.Cas(&t.status, s, timerRemoving) {
continue
}
// Remove the first element from the heap
dodeltimer0(pp)
// Then change the status from timerRemoving to timerRemoved
if !atomic.Cas(&t.status, timerRemoving, timerRemoved) {
badTimer()
}
// Decrement this P's count of deleted timers
atomic.Xadd(&pp.deletedTimers, -1)
case timerModifiedEarlier, timerModifiedLater:
if !atomic.Cas(&t.status, s, timerMoving) {
continue
}
// Apply the new scheduled time
t.when = t.nextwhen
dodeltimer0(pp)
doaddtimer(pp, t)
if !atomic.Cas(&t.status, timerMoving, timerWaiting) {
badTimer()
}
default:
// Head of timers does not need adjustment.
return
}
}
}
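Each iteration inspects t.status of the timer at the top of the heap:
- timerDeleted: timerDeleted -> timerRemoving -> timerRemoved. The timer is removed from the heap with dodeltimer0 and p.deletedTimers is decremented.
- timerModifiedEarlier / timerModifiedLater: the status becomes timerMoving, t.when = t.nextwhen is applied, dodeltimer0 removes the timer (clearing t.pp), doaddtimer inserts it again at the position that matches the new t.when, and the status goes from timerMoving to timerWaiting. In short: timerModifiedEarlier/timerModifiedLater -> timerMoving -> timerWaiting.
- Any other status: nothing is done and the function returns.
The status changes use CAS operations. If the first CAS of a transition fails, the status was changed concurrently, so the for loop reloads it and tries again. If the second CAS fails, the state machine is inconsistent and badTimer() is called.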
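The load, CAS, retry pattern can be sketched outside the runtime with sync/atomic. The status constants and removeIfDeleted below are hypothetical names for this illustration; the runtime uses its own internal atomic package and the timer states listed earlier.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Hypothetical status values, used by this sketch only.
const (
	statusWaiting uint32 = iota
	statusDeleted
	statusRemoving
	statusRemoved
)

// removeIfDeleted retries until it either claims a deleted entry
// (Deleted -> Removing -> Removed) or observes some other status.
func removeIfDeleted(status *uint32) bool {
	for {
		s := atomic.LoadUint32(status)
		if s != statusDeleted {
			return false // nothing to clean up
		}
		if !atomic.CompareAndSwapUint32(status, s, statusRemoving) {
			continue // lost a race with another writer: reload and retry
		}
		// The entry is ours now; the real code removes it from the heap here.
		if !atomic.CompareAndSwapUint32(status, statusRemoving, statusRemoved) {
			panic("status changed while we owned the entry")
		}
		return true
	}
}

func main() {
	s := statusDeleted
	fmt.Println(removeIfDeleted(&s), s == statusRemoved) // true true

	w := statusWaiting
	fmt.Println(removeIfDeleted(&w), w == statusWaiting) // false true
}
```

The intermediate Removing status acts as a short-lived ownership marker: once the first CAS succeeds, no other party is supposed to touch the entry, which is why a failure of the second CAS is treated as fatal.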
Removing the heap top: dodeltimer0
dodeltimer0() removes the first timer from the current P's heap:
// dodeltimer0 removes timer 0 from the current P's heap.
// We are locked on the P when this is called.
// It reports whether it saw no problems due to races.
// The caller must have locked the timers for pp.
func dodeltimer0(pp *p) {
if t := pp.timers[0]; t.pp.ptr() != pp {
throw("dodeltimer0: wrong P")
} else {
t.pp = 0
}
last := len(pp.timers) - 1
// There are at least two timers in the heap
if last > 0 {
pp.timers[0] = pp.timers[last]
}
// Set the slot to nil so the GC can reclaim the timer
pp.timers[last] = nil
pp.timers = pp.timers[:last]
if last > 0 {
siftdownTimer(pp.timers, 0)
}
updateTimer0When(pp)
atomic.Xadd(&pp.numTimers, -1)
}
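Steps:
1. Unbind the timer from its P with t.pp = 0.
2. Move the last timer to the top, shrink the slice, and restore heap order with siftdownTimer().
3. Update p.timer0When to the when of the new top timer, or to 0 if the heap is now empty.
4. Decrement p.numTimers.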
Stopping/deleting a timer: deltimer
When a timer is no longer needed, call its Stop() method. The implementation behind it is as follows.
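// Stop returns true if the call stops the timer, and false if the timer has already expired or been stopped. Calling Stop() more than once therefore does not panic.
// Stop() does not close the channel, to prevent a read from the channel succeeding incorrectly.
//
// To ensure the channel is empty after a call to Stop(), check the return value and drain the channel.
// For example, assuming the program has not received from t.C already: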
//
// if !t.Stop() {
// <-t.C
// }
//
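// This cannot be done concurrently with other receives from the Timer's channel or with other calls to the Timer's Stop method.
//
// For a Timer created with AfterFunc(d, f), if t.Stop returns false, the Timer has already expired and the function f has been started in its own goroutine;
// Stop does not wait for f to complete before returning.
// If the caller needs to know whether f has completed, it must coordinate with f explicitly.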
//
func (t *Timer) Stop() bool {
if t.r.f == nil {
panic("time: Stop called on uninitialized Timer")
}
return stopTimer(&t.r)
}
Stop delegates to stopTimer, shown next:
// stopTimer stops a timer.
// It reports whether t was stopped before being run.
//go:linkname stopTimer time.stopTimer
func stopTimer(t *timer) bool {
return deltimer(t)
}
stopTimer is only a thin wrapper around deltimer, which is implemented as follows:
// deltimer deletes the timer t. It may be on some other P, so we can't
// actually remove it from the timers heap. We can only mark it as deleted.
// It will be removed in due course by the P whose heap it is on.
// Reports whether the timer was removed before it was run.
func deltimer(t *timer) bool {
for {
switch s := atomic.Load(&t.status); s {
case timerWaiting, timerModifiedLater:
mp := acquirem()
if atomic.Cas(&t.status, s, timerModifying) {
tpp := t.pp.ptr()
if !atomic.Cas(&t.status, timerModifying, timerDeleted) {
badTimer()
}
releasem(mp)
atomic.Xadd(&tpp.deletedTimers, 1)
return true
} else {
releasem(mp)
}
case timerModifiedEarlier:
mp := acquirem()
if atomic.Cas(&t.status, s, timerModifying) {
tpp := t.pp.ptr()
if !atomic.Cas(&t.status, timerModifying, timerDeleted) {
badTimer()
}
releasem(mp)
atomic.Xadd(&tpp.deletedTimers, 1)
return true
} else {
releasem(mp)
}
case timerDeleted, timerRemoving, timerRemoved:
// Timer was already run.
return false
case timerRunning, timerMoving:
osyield()
case timerNoStatus:
return false
case timerModifying:
osyield()
default:
badTimer()
}
}
}
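As the comment and the code show, deltimer does not actually remove the timer from a heap. The timer may be on any P, so deltimer can only mark it as deleted; the real removal is done later by the P that owns the timer, at a suitable moment, through dodeltimer or dodeltimer0.
Unlike addtimer, deltimer only changes t.status, setting it to timerDeleted.
Timer.Stop() is therefore implemented by deltimer(), which, like the other operations, handles each status differently.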
Modifying a timer: modtimer
Both Timer and Ticker provide a Reset method. Timer's version is:
func (t *Timer) Reset(d Duration) bool {
if t.r.f == nil {
panic("time: Reset called on uninitialized Timer")
}
w := when(d)
return resetTimer(&t.r, w)
}
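Reset changes the Timer so that it expires after d Duration.
The official documentation for this function tells us the following:
For a Timer created with NewTimer, Reset should only be called on a stopped or expired Timer whose channel t.C has been drained, so call Stop() first and drain the channel if necessary.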
resetTimer in turn calls the runtime function resettimer:
// resettimer resets the time when a timer should fire.
// If used for an inactive timer, the timer will become active.
// This should be called instead of addtimer if the timer value has been,
// or may have been, used previously.
// Reports whether the timer was modified before it was run.
func resettimer(t *timer, when int64) bool {
return modtimer(t, when, t.period, t.f, t.arg, t.seq)
}
resettimer simply delegates to modtimer:
// modtimer modifies an existing timer.
// This is called by the netpoll code or time.Ticker.Reset or time.Timer.Reset.
// Reports whether the timer was modified before it was run.
func modtimer(t *timer, when, period int64, f func(interface{}, uintptr), arg interface{}, seq uintptr) bool {
if when <= 0 {
throw("timer when must be positive")
}
if period < 0 {
throw("timer period must be non-negative")
}
status := uint32(timerNoStatus)
wasRemoved := false
var pending bool
var mp *m
loop:
for {
switch status = atomic.Load(&t.status); status {
case timerWaiting, timerModifiedEarlier, timerModifiedLater:
// Prevent preemption while the timer is in timerModifying.
// This could lead to a self-deadlock. See #38070.
mp = acquirem()
if atomic.Cas(&t.status, status, timerModifying) {
pending = true // timer not yet run
break loop
}
releasem(mp)
case timerNoStatus, timerRemoved:
// Prevent preemption while the timer is in timerModifying.
// This could lead to a self-deadlock. See #38070.
mp = acquirem()
// Timer was already run and t is no longer in a heap.
// Act like addtimer.
if atomic.Cas(&t.status, status, timerModifying) {
wasRemoved = true
pending = false // timer already run or stopped
break loop
}
releasem(mp)
case timerDeleted:
// Prevent preemption while the timer is in timerModifying.
// This could lead to a self-deadlock. See #38070.
mp = acquirem()
if atomic.Cas(&t.status, status, timerModifying) {
atomic.Xadd(&t.pp.ptr().deletedTimers, -1)
pending = false // timer already stopped
break loop
}
releasem(mp)
case timerRunning, timerRemoving, timerMoving:
// The timer is being run or moved, by a different P.
// Wait for it to complete.
osyield()
case timerModifying:
// Multiple simultaneous calls to modtimer.
// Wait for the other call to complete.
osyield()
default:
badTimer()
}
}
t.period = period
t.f = f
t.arg = arg
t.seq = seq
if wasRemoved {
t.when = when
pp := getg().m.p.ptr()
lock(&pp.timersLock)
doaddtimer(pp, t)
unlock(&pp.timersLock)
if !atomic.Cas(&t.status, timerModifying, timerWaiting) {
badTimer()
}
releasem(mp)
wakeNetPoller(when)
} else {
// The timer is in some other P's heap, so we can't change
// the when field. If we did, the other P's heap would
// be out of order. So we put the new when value in the
// nextwhen field, and let the other P set the when field
// when it is prepared to resort the heap.
t.nextwhen = when
newStatus := uint32(timerModifiedLater)
if when < t.when {
newStatus = timerModifiedEarlier
}
tpp := t.pp.ptr()
if newStatus == timerModifiedEarlier {
updateTimerModifiedEarliest(tpp, when)
}
// Set the new status of the timer.
if !atomic.Cas(&t.status, timerModifying, newStatus) {
badTimer()
}
releasem(mp)
// If the new status is earlier, wake up the poller.
if newStatus == timerModifiedEarlier {
wakeNetPoller(when)
}
}
return pending
}
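Running a timer: runtimer
runtimer examines the first timer in the heap and runs it if it is ready. The caller must hold p.timersLock.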
func runtimer(pp *p, now int64) int64 {
for {
// The timer at the top of the heap
t := pp.timers[0]
if t.pp.ptr() != pp {
throw("runtimer: bad p")
}
switch s := atomic.Load(&t.status); s {
case timerWaiting:
if t.when > now {
// Not ready to run.
return t.when
}
if !atomic.Cas(&t.status, s, timerRunning) {
continue
}
// Note that runOneTimer may temporarily unlock
// pp.timersLock.
// This is where the timer is actually run
runOneTimer(pp, t, now)
return 0
case timerDeleted:
if !atomic.Cas(&t.status, s, timerRemoving) {
continue
}
dodeltimer0(pp)
if !atomic.Cas(&t.status, timerRemoving, timerRemoved) {
badTimer()
}
atomic.Xadd(&pp.deletedTimers, -1)
if len(pp.timers) == 0 {
return -1
}
case timerModifiedEarlier, timerModifiedLater:
if !atomic.Cas(&t.status, s, timerMoving) {
continue
}
t.when = t.nextwhen
dodeltimer0(pp)
doaddtimer(pp, t)
if !atomic.Cas(&t.status, timerMoving, timerWaiting) {
badTimer()
}
case timerModifying:
// Wait for modification to complete.
osyield()
case timerNoStatus, timerRemoved:
// Should not see a new or inactive timer on the heap.
badTimer()
case timerRunning, timerRemoving, timerMoving:
// These should only be set when timers are locked,
// and we didn't do it.
badTimer()
default:
badTimer()
}
}
}
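runtimer only ever looks at the first timer in p.timers, timers[0]. If that timer is not ready yet, it returns the time at which p.timers[0] should run; it returns 0 if it ran a timer and -1 if there are no timers left.
The function handles the timer according to its current status:
- timerWaiting: if now is earlier than t.when, the timer is not ready and t.when is returned. Otherwise the status is changed to timerRunning and runOneTimer() is called.
- timerDeleted: dodeltimer0() removes the timer from p.timers and p.deletedTimers is decremented. This is where a timer that deltimer only marked as timerDeleted is really removed.
- timerModifiedEarlier / timerModifiedLater: the status becomes timerMoving, t.when = t.nextwhen is applied, dodeltimer0 removes the timer (clearing t.pp), doaddtimer inserts it again, and the status goes from timerMoving to timerWaiting. In short: timerModifiedEarlier/timerModifiedLater -> timerMoving -> timerWaiting.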
runOneTimer
// runOneTimer runs a single timer.
// The caller must have locked the timers for pp.
// This will temporarily unlock the timers while running the timer function.
//go:systemstack
func runOneTimer(pp *p, t *timer, now int64) {
// The function to call
f := t.f
// The function's arguments
arg := t.arg
seq := t.seq
if t.period > 0 {
// Periodic timer: advance the t.when field to the next firing time
// Leave in heap but adjust next time to fire.
delta := t.when - now
t.when += t.period * (1 + -delta/t.period)
// Overflow handling
if t.when < 0 { // check for overflow.
t.when = maxWhen
}
siftdownTimer(pp.timers, 0)
if !atomic.Cas(&t.status, timerRunning, timerWaiting) {
badTimer()
}
updateTimer0When(pp)
} else {
// Remove from heap.
// One-shot timer: remove it from the heap for good
dodeltimer0(pp)
if !atomic.Cas(&t.status, timerRunning, timerNoStatus) {
badTimer()
}
}
...
// Unlock first, run the function, then take the lock again
unlock(&pp.timersLock)
f(arg, seq) // call the timer function
lock(&pp.timersLock)
...
}
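The expression that advances t.when for a periodic timer is compact, so a worked example may help. The sketch below isolates that arithmetic in a hypothetical function nextWhen; overflow handling is left out.

```go
package main

import "fmt"

// nextWhen returns the next firing time of a periodic timer whose deadline
// when has been reached at time now (so when <= now). It uses the same
// arithmetic as runOneTimer, without the overflow check.
func nextWhen(when, now, period int64) int64 {
	delta := when - now // zero or negative: how late the timer is running
	return when + period*(1+-delta/period)
}

func main() {
	fmt.Println(nextWhen(100, 100, 10)) // 110: on time, next tick one period later
	fmt.Println(nextWhen(100, 125, 10)) // 130: the missed ticks at 110 and 120 are skipped
}
```

The integer division counts how many whole periods have already elapsed, so a late timer jumps straight to the first tick after now instead of firing once for every tick it missed.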
Finding the next timer: timeSleepUntil
// timeSleepUntil returns the time when the next timer should fire,
// and the P that holds the timer heap that that timer is on.
// This is only called by sysmon and checkdead.
func timeSleepUntil() (int64, *p) {
// Start from the largest allowed value
next := int64(maxWhen)
var pret *p
// Prevent allp slice changes. This is like retake.
lock(&allpLock)
for _, pp := range allp {
if pp == nil {
// This can happen if procresize has grown
// allp but not yet created new Ps.
continue
}
// A timer0When of 0 means this P has no timers
w := int64(atomic.Load64(&pp.timer0When))
if w != 0 && w < next {
next = w
pret = pp
}
w = int64(atomic.Load64(&pp.timerModifiedEarliest))
if w != 0 && w < next {
next = w
pret = pp
}
}
unlock(&allpLock)
return next, pret
}
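timeSleepUntil returns the time at which the next timer should fire, together with the P whose heap holds that timer.
It holds allpLock while iterating over every P. For each P it compares pp.timer0When and pp.timerModifiedEarliest with the smallest value seen so far and keeps the earlier one. pp.timer0When is the when of pp.timers[0]. The function is only called by sysmon and checkdead.
Reading these two fields avoids having to lock each P and inspect pp.timers[0] directly.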
Managing timers: checkTimers
checkTimers runs the timers on the given P that are ready.
func checkTimers(pp *p, now int64) (rnow, pollUntil int64, ran bool) {}
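If now is not 0, it is taken as the current time. If now is 0, the function reads the current time itself. Either way, the time it used is returned as rnow.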
func checkTimers(pp *p, now int64) (rnow, pollUntil int64, ran bool) {
// If it is not yet time for the first timer, or for the first adjusted timer, there is nothing to do
next := int64(atomic.Load64(&pp.timer0When))
nextAdj := int64(atomic.Load64(&pp.timerModifiedEarliest))
if next == 0 || (nextAdj != 0 && nextAdj < next) {
next = nextAdj
}
if next == 0 {
// No timers at all: return immediately
return now, 0, false
}
if now == 0 {
now = nanotime()
}
if now < next {
// Next timer is not ready to run, but keep going
// if we would clear deleted timers.
// This corresponds to the condition below where
// we decide whether to call clearDeletedTimers.
if pp != getg().m.p.ptr() || int(atomic.Load(&pp.deletedTimers)) <= int(atomic.Load(&pp.numTimers)/4) {
return now, next, false
}
}
lock(&pp.timersLock)
// Adjust the timers, then run those that are ready
if len(pp.timers) > 0 {
adjusttimers(pp, now)
for len(pp.timers) > 0 {
// Note that runtimer may temporarily unlock
// pp.timersLock.
if tw := runtimer(pp, now); tw != 0 {
if tw > 0 {
pollUntil = tw
}
break
}
ran = true
}
}
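// If the P being checked is the current P and more than a quarter of its timers are marked deleted, call clearDeletedTimers to remove them from the P's heap.
// This is done only for the local P, which reduces contention on timersLock.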
if pp == getg().m.p.ptr() && int(atomic.Load(&pp.deletedTimers)) > len(pp.timers)/4 {
clearDeletedTimers(pp)
}
unlock(&pp.timersLock)
return now, pollUntil, ran
}
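Adjusting timers: adjusttimers
adjusttimers looks through the current P's heap for timers that have been modified to run earlier and puts them in the right place in the heap.
While looking for those timers, it also moves timers that have been modified to run later and removes deleted timers. The caller must have locked the timers for pp.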
/src/runtime/time.go#L653
func adjusttimers(pp *p, now int64) {
first := atomic.Load64(&pp.timerModifiedEarliest)
if first == 0 || int64(first) > now {
if verifyTimers {
verifyTimerHeap(pp)
}
return
}
// We are going to clear all timerModifiedEarlier timers.
atomic.Store64(&pp.timerModifiedEarliest, 0)
var moved []*timer
for i := 0; i < len(pp.timers); i++ {
t := pp.timers[i]
if t.pp.ptr() != pp {
throw("adjusttimers: bad p")
}
switch s := atomic.Load(&t.status); s {
case timerDeleted:
if atomic.Cas(&t.status, s, timerRemoving) {
changed := dodeltimer(pp, i)
if !atomic.Cas(&t.status, timerRemoving, timerRemoved) {
badTimer()
}
atomic.Xadd(&pp.deletedTimers, -1)
// Go back to the earliest changed heap entry.
// "- 1" because the loop will add 1.
i = changed - 1
}
case timerModifiedEarlier, timerModifiedLater:
if atomic.Cas(&t.status, s, timerMoving) {
// Now we can change the when field.
t.when = t.nextwhen
// Take t off the heap, and hold onto it.
// We don't add it back yet because the
// heap manipulation could cause our
// loop to skip some other timer.
changed := dodeltimer(pp, i)
moved = append(moved, t)
// Go back to the earliest changed heap entry.
// "- 1" because the loop will add 1.
i = changed - 1
}
case timerNoStatus, timerRunning, timerRemoving, timerRemoved, timerMoving:
badTimer()
case timerWaiting:
// OK, nothing to do.
case timerModifying:
// Check again after modification is complete.
osyield()
i--
default:
badTimer()
}
}
if len(moved) > 0 {
addAdjustedTimers(pp, moved)
}
if verifyTimers {
verifyTimerHeap(pp)
}
}
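The function walks p.timers and handles each timer according to its status:
- timerDeleted: the status becomes timerRemoving, the timer is removed from the heap, the status becomes timerRemoved, and p.deletedTimers is decremented.
- timerModifiedEarlier / timerModifiedLater: the status becomes timerMoving, t.when=t.nextwhen is applied, and the timer is taken off the heap and collected in moved. After the loop, the collected timers are added back with addAdjustedTimers.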
Clearing deleted timers: clearDeletedTimers
Apart from moveTimers, which runs only during STW, this is the only function that walks the entire timer heap. The caller must hold timersLock.
func clearDeletedTimers(pp *p) {
// We are going to clear all timerModifiedEarlier timers on this P.
// Do this now in case new ones show up while we are looping.
atomic.Store64(&pp.timerModifiedEarliest, 0)
// Number of timers removed
cdel := int32(0)
// Write index into the slice: where the next surviving timer will be stored
to := 0
changedHeap := false
timers := pp.timers
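// Walk every timer on this P and handle each according to its status; surviving timers are compacted toward the front, leaving the freed slots at the end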
nextTimer:
for _, t := range timers {
for {
switch s := atomic.Load(&t.status); s {
case timerWaiting:
if changedHeap {
timers[to] = t
siftupTimer(timers, to)
}
to++
continue nextTimer
case timerModifiedEarlier, timerModifiedLater:
if atomic.Cas(&t.status, s, timerMoving) {
t.when = t.nextwhen
timers[to] = t
siftupTimer(timers, to)
to++
changedHeap = true
if !atomic.Cas(&t.status, timerMoving, timerWaiting) {
badTimer()
}
continue nextTimer
}
case timerDeleted:
// The actual removal happens here
if atomic.Cas(&t.status, s, timerRemoving) {
t.pp = 0
cdel++
if !atomic.Cas(&t.status, timerRemoving, timerRemoved) {
badTimer()
}
changedHeap = true
continue nextTimer
}
case timerModifying:
// Loop until modification complete.
osyield()
case timerNoStatus, timerRemoved:
// We should not see these status values in a timer heap.
badTimer()
case timerRunning, timerRemoving, timerMoving:
// Some other P thinks it owns this timer,
// which should not happen.
badTimer()
default:
badTimer()
}
}
}
// Set the leftover slots to nil so the GC can reclaim the timers
for i := to; i < len(timers); i++ {
timers[i] = nil
}
// Update the counters
atomic.Xadd(&pp.deletedTimers, -cdel)
atomic.Xadd(&pp.numTimers, -cdel)
// Update the p.timers field
timers = timers[:to]
pp.timers = timers
updateTimer0When(pp)
// Verify the state of the heap
if verifyTimers {
verifyTimerHeap(pp)
}
}
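After the loop, the slots past the last surviving timer are set to nil, p.deletedTimers and p.numTimers are reduced by the number of removed timers, p.timers is truncated, and updateTimer0When refreshes the cached time of the heap top.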
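The single-pass compaction can be sketched on a simplified heap. The entry type, its deleted flag, and the compact function are assumptions made for this illustration; the runtime works on *timer values and atomic status transitions instead.

```go
package main

import "fmt"

// entry is a simplified timer: a deadline plus a deleted mark.
type entry struct {
	when    int64
	deleted bool
}

// siftUp restores 4-ary min-heap order for h[i] within the prefix h[:i+1].
func siftUp(h []*entry, i int) {
	e := h[i]
	for i > 0 {
		p := (i - 1) / 4
		if e.when >= h[p].when {
			break
		}
		h[i] = h[p]
		i = p
	}
	h[i] = e
}

// compact drops deleted entries in one pass. Survivors are written at index
// to, which never runs ahead of the read position, and h[:to] is kept a
// valid heap the whole time.
func compact(h []*entry) []*entry {
	to := 0
	changed := false
	for _, e := range h {
		if e.deleted {
			changed = true
			continue
		}
		if changed {
			h[to] = e
			siftUp(h, to)
		}
		to++
	}
	for i := to; i < len(h); i++ {
		h[i] = nil // let the GC reclaim removed entries
	}
	return h[:to]
}

func main() {
	h := []*entry{{when: 10}, {when: 20, deleted: true}, {when: 30}, {when: 40, deleted: true}, {when: 50}}
	h = compact(h)
	var whens []int64
	for _, e := range h {
		whens = append(whens, e.when)
	}
	fmt.Println(whens) // [10 30 50]
}
```

Until the first deleted entry is met, nothing moves, which is why the sift-up is skipped while changed is false: a prefix of a valid heap is itself a valid heap.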
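Removing a timer: dodeltimer
We have already covered dodeltimer0(), which removes the timer at the top of the heap. dodeltimer does the same job for an arbitrary position:
it removes the timer at index i.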
// dodeltimer removes timer i from the current P's heap.
// We are locked on the P when this is called.
// It returns the smallest changed index in pp.timers.
// The caller must have locked the timers for pp.
func dodeltimer(pp *p, i int) int {
if t := pp.timers[i]; t.pp.ptr() != pp {
throw("dodeltimer: wrong P")
} else {
// Unbind the timer from its P
t.pp = 0
}
last := len(pp.timers) - 1
if i != last {
pp.timers[i] = pp.timers[last]
}
pp.timers[last] = nil
pp.timers = pp.timers[:last]
smallestChanged := i
if i != last {
// Moving to i may have moved the last timer to a new parent,
// so sift up to preserve the heap guarantee.
// Restore heap order around position i
smallestChanged = siftupTimer(pp.timers, i)
siftdownTimer(pp.timers, i)
}
if i == 0 {
// The timer at the top of the heap was removed, so refresh the timer0When field
updateTimer0When(pp)
}
atomic.Xadd(&pp.numTimers, -1)
return smallestChanged
}
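Steps
1. Unbind the timer at index i from its P.
2. Move the last timer into position i, set the last slot to nil, and shrink the slice.
3. Restore heap order by calling siftupTimer and then siftdownTimer on position i.
4. If i is 0, refresh timer0When; finally decrement numTimers.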
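Removal at an arbitrary index can be sketched on a heap of plain int64 deadlines. The helper names below are chosen for this example; the element moved into position i may need to travel in either direction, which is why both sift operations are applied.

```go
package main

import "fmt"

// siftUp moves h[i] toward the root of a 4-ary min-heap.
func siftUp(h []int64, i int) {
	v := h[i]
	for i > 0 {
		p := (i - 1) / 4
		if v >= h[p] {
			break
		}
		h[i] = h[p]
		i = p
	}
	h[i] = v
}

// siftDown moves h[i] toward the leaves, swapping with its smallest child.
func siftDown(h []int64, i int) {
	v := h[i]
	for {
		first := i*4 + 1 // first of up to four children
		if first >= len(h) {
			break
		}
		smallest := first
		for c := first + 1; c < first+4 && c < len(h); c++ {
			if h[c] < h[smallest] {
				smallest = c
			}
		}
		if h[smallest] >= v {
			break
		}
		h[i] = h[smallest]
		i = smallest
	}
	h[i] = v
}

// removeAt deletes h[i] by moving the last element into its place and then
// restoring heap order in both directions.
func removeAt(h []int64, i int) []int64 {
	last := len(h) - 1
	if i != last {
		h[i] = h[last]
	}
	h = h[:last]
	if i != last {
		siftUp(h, i)
		siftDown(h, i)
	}
	return h
}

func main() {
	h := []int64{10, 20, 70, 30, 60, 50} // a valid 4-ary min-heap
	h = removeAt(h, 1)
	fmt.Println(h) // [10 50 70 30 60]
	h = removeAt(h, 0)
	fmt.Println(h) // [30 50 70 60]
}
```

Only one of the two sift calls actually moves the element in any given removal; the other returns immediately.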