一文看懂Golang 定时器源码

计时器分 Timer 和 Ticker 两种,它们底层基本是一样的,两差的区别请参考 https://blog.haohtml.com/archives/19859, 这里我们的介绍对象是 Timer 。

golang timer

计时器结构体

https://github.com/golang/go/blob/go1.17.6/src/time/sleep.go#L84-L98

 // NewTimer creates a new Timer that will send
 // the current time on its channel after at least duration d.
 func NewTimer(d Duration) *Timer {
     c := make(chan Time, 1)
     t := &Timer{
         C: c,
         r: runtimeTimer{
             when: when(d),
             f:    sendTime,
             arg:  c,
         },
     }
     startTimer(&t.r)
     return t
 }

通过调用 NewTimer() 函数创建一个 Timer,首先创建一个长度为1的有缓冲channel,再创建一个Timer的结构体,并将 channel 置于 Timer 结构体内。

注意这里的 runtimeTimer.f 字段是一个函数 sendTime ,其实现如下

func sendTime(c interface{}, seq uintptr) {
	// Non-blocking send of time on c.
	// Used in NewTimer, it cannot block anyway (buffer).
	// Used in NewTicker, dropping sends on the floor is
	// the desired behavior when the reader gets behind,
	// because the sends are periodic.
	select {
	case c.(chan Time) <- Now():
	default:
	}
}

当 sendTime 函数主要用在 newTimer() 时,它以无阻塞的方式将当前时间 Now() 发送到 c 通道里。如果用在 newTicker() 时,如果读取落后,会将发送丢弃,它是周期性的。

我们给出 Timer 的结构体声明。

 type Timer struct {
  C <-chan Time
  r runtimeTimer
 }

一共两个字段,为了理解方面我们称 runtimeTimer 为 timer 值。

我们再看一下其中的 runtimeTimer 结构体的声明

 // Interface to timers implemented in package runtime.
 // Must be in sync with ../runtime/time.go:/^type timer
 type runtimeTimer struct {
  pp       uintptr
  when     int64
  period   int64
  f        func(interface{}, uintptr) // NOTE: must not be closure
  arg      interface{}
  seq      uintptr
  nextwhen int64
  status   uint32
 }

对于 runnerTimer结构体要与在 runtime/time.go 文件中的 timer 结构体保持同步。

结构体字段说明

  • pp 指针类型,这里指 GPM 中的 P。如果这个计时器 timer 在一个heap 上,它在哪个 P 的堆上
  • when 表示唤醒执行的时间,表示什么时间开始执行
  • period 周期,一定是大于 0; when+period 表示下次唤醒执行的时间
  • f 执行函数,不允许为匿名函数,最好为非阻塞函数
  • arg 上面f函数的参数
  • seq 同 arg,其在 runOneTimer 函数中的调用方式为 f(arg, seq)
  • nextwhen 下次运行的时间,其值只有在 timerModifiedXX status 状态下才设置
  • status 状态,其定义的的可用值有10种,定义在 runtime/time.go,我们下面对这些状态进行了介绍。

每次开启一个goroutine 执行 f(arg, now),基中when表示执行的时间,而 when+period 表示下次执行的时间。(这时有点疑问,对调用的函数参数,f的第二个参数是 now, 但后面介绍的时候第二个参数却是 seq)

通过查看 https://github.com/golang/go/blob/go1.17.6/src/runtime/time.go#L41-L116 可知以下几点:

  1. 在新创建一个 Timer 的时候,可以设置的只有 whenperiodfargseq 这五个字段。
  2. 创建完一个 Timer 后,对于 Timer 的值( time.runtimeTimerruntime.timer ),只能通过 addtimer 传递( 调用 startTimer() 函数),以后不能修改任何字段。
  3. 一个活动计时器(一个已经传递给 addtimer )可能会被传递给 deltimer (time.stopTimer),之后它就不再是一个活动计时器了,称其为非活动计时器。 而在一个非活动周期内,f, arg, 和 seq 三个字段可以被修改,但 when 字段不会被修改。当删除一个非活动计时器,GC会将其自动回收,只有新分配的计时器值可以传递给 addtimer,而不能将非活动计时器传递给 addtimer
  4. 可以将一个活动计时器传递给 modtimer ,它不会修改任何内容,其仍是一个活动计时器。
  5. 一个非活动计时器可以传递给 resetTimer , 通过修改 when 字段,让其变成活动计时器。新创建的定时器值传递给 resetTimer 也是可以的(上面说过也可以传递给 addtimer)。
  6. 计时器的操作函数有 addtimer, deltimer, modtimer, resettimer,cleantimers, adjusttimers, 和 runtimer
  7. 我们不允许同时调用 addtimer/deltimer/modtimer/resettimer,但 adjusttimerruntimer可以与任何一个同时调用。
  8. 活动计时器存在于 Pheap 中, 即 timer 结构体中的 pp 字段,非活动计时器也暂时存储在这里的,不过是临时的,直到被移走。
  9. 计时器状态的流转关系,这里不一一列出,可以直接通过查看注释。

通过注释我们大概知道了一些与 timer 相关的信息,其中提到了一些操作函数,在 runtime 里都有其介绍,下面我们分析看看每个函数的实现来加深一下理解。

计时器的几种状态

与计时器相关的状态一共有 十种

  • timerNoStatus 未知状态。只有通过 NewTimer 函数创建的才处理这种状态
  • timerWaiting 等待计时器启动,timer 处于堆中
  • timerRunning 正在运行,此状态比较短暂
  • timerDeleted 已删除,等待被从堆 p.timers 中 Remove 移除
  • timerRemoving 正在移除,短暂状态,从timerDeleted 到 timerRemoved 中的中间过滤状态
  • timerRemoved 已被移除,当前已不在堆中
  • timerModifying 修改中状态,短暂状态
  • ttimerModifiedEarlier 被修改为更早的状态,新的 when 值在 nextwhen 字段中,表示下次立即执行,在堆中或者错误的位置
  • timerModifiedLater 被修改为相同或更晚的状态,新的 when 值在 nextwhen 字段中,表示下次立即执行,在堆中或者错误的位置
  • timerMoving 已修改并在移动,短暂状态

活动计时器们于P的 heap 中,即 p.timers 字段;非活动的计时器也可能短暂的存在heap里,直到它们被 removed ;

每种状态相关流转状态

 // addtimer:
 //   timerNoStatus   -> timerWaiting
 //   anything else   -> panic: invalid value
 // deltimer:
 //   timerWaiting         -> timerModifying -> timerDeleted
 //   timerModifiedEarlier -> timerModifying -> timerDeleted
 //   timerModifiedLater   -> timerModifying -> timerDeleted
 //   timerNoStatus       -> do nothing
 //   timerDeleted         -> do nothing
 //   timerRemoving       -> do nothing
 //   timerRemoved         -> do nothing
 //   timerRunning         -> wait until status changes
 //   timerMoving         -> wait until status changes
 //   timerModifying       -> wait until status changes
 // modtimer:
 //   timerWaiting   -> timerModifying -> timerModifiedXX
 //   timerModifiedXX -> timerModifying -> timerModifiedYY
 //   timerNoStatus   -> timerModifying -> timerWaiting
 //   timerRemoved   -> timerModifying -> timerWaiting
 //   timerDeleted   -> timerModifying -> timerModifiedXX
 //   timerRunning   -> wait until status changes
 //   timerMoving     -> wait until status changes
 //   timerRemoving   -> wait until status changes
 //   timerModifying -> wait until status changes
 // cleantimers (looks in P's timer heap):
 //   timerDeleted   -> timerRemoving -> timerRemoved
 //   timerModifiedXX -> timerMoving -> timerWaiting
 // adjusttimers (looks in P's timer heap):
 //   timerDeleted   -> timerRemoving -> timerRemoved
 //   timerModifiedXX -> timerMoving -> timerWaiting
 // runtimer (looks in P's timer heap):
 //   timerNoStatus   -> panic: uninitialized timer
 //   timerWaiting   -> timerWaiting or
 //   timerWaiting   -> timerRunning -> timerNoStatus or
 //   timerWaiting   -> timerRunning -> timerWaiting
 //   timerModifying -> wait until status changes
 //   timerModifiedXX -> timerMoving -> timerWaiting
 //   timerDeleted   -> timerRemoving -> timerRemoved
 //   timerRunning   -> panic: concurrent runtimer calls
 //   timerRemoved   -> panic: inconsistent timer heap
 //   timerRemoving   -> panic: inconsistent timer heap
 //   timerMoving     -> panic: inconsistent timer heap

计时器的创建 addtimer

文章开头提到了定时器的创建,我们先看一下首次调用的函数 startTimer中的 addtimer子函数。

 // startTimer adds t to the timer heap.
 //go:linkname startTimer time.startTimer
 func startTimer(t *timer) {
  if raceenabled {
  racerelease(unsafe.Pointer(t))
  }
  addtimer(t)
 }

此函数会将计时器添加到 timer heap 中。

addtimer

此函数功能的是实现将一个 timer

&Timer{
         C: c,
         r: runtimeTimer{
             when: when(d),
             f:    sendTime,
             arg:  c,
         },
     }

添加到当前的 P 的 p.timer 字段里,这个字段是一个切片类型,后面有时间我们将其称为 timer queue,其使用四叉堆算法维护 。只能在新创建一个 Timer 时,通过调用 startTimer() 函数来调用。

 /src/runtime/time.go#L245-L278
 func addtimer(t *timer) {
  // 必要条件检查
  if t.when <= 0 {
  throw("timer when must be positive")
  }
  if t.period < 0 {
  throw("timer period must be non-negative")
  }
  if t.status != timerNoStatus {
  throw("addtimer called with initialized timer")
  }
  t.status = timerWaiting
 ​
  when := t.when
 ​
  // Disable preemption while using pp to avoid changing another P's heap.
  mp := acquirem()
 ​
  pp := getg().m.p.ptr()
  lock(&pp.timersLock)
 
  // 清理 timer heap 中首个定时器,以加快创建(添加)和删除timer的速度,如果保留在堆中速度会慢
  cleantimers(pp)
  doaddtimer(pp, t)
  unlock(&pp.timersLock)
 ​
  wakeNetPoller(when)
 ​
  releasem(mp)
 }

整体流程

  1. 对 计时器值的一些字段进行检查,其中 when 值必须大于0, period 字段不能小于0; 通过 status != timerNoStatus 检查当前计时器值是不是刚刚创建的,上面已提到过这一点。
  2. 修改计时器值状态 timerNoStatus -> timerWaiting
  3. 调用mp := acquirem() 获取当前 m, 通过操作 m.locks++ 实现
  4. 获取当前的p, 同时加锁 &pp.timersLock,这是必须也是重要的一步。
  5. 首先清理计时器队列的头部 cleantimers(),以加快创建和删除定时器值的速度,如果将它们保留在heap中会减慢 addtimer 的速度;
  6. 调用 doaddtimer() 实现添加定时器值,这才是真正添加 timer 的实现。下面我们单独介绍它的实现。
  7. 进行解锁
  8. 它唤醒一个空闲的 P 来服务定时器和网络轮询器 wakeNetPoller(), 注意函数里有唤醒P的函数 wakep() 的调用
  9. 释放换取的m

doaddtimer

这里我们看下添加定时器的主要实现函数 doaddtimer

 /src/runtime/time.go#L280-L300
 // doaddtimer adds t to the current P's heap.
 // The caller must have locked the timers for pp.
 func doaddtimer(pp *p, t *timer) {
  // Timers rely on the network poller, so make sure the poller
  // has started.
  if netpollInited == 0 {
  netpollGenericInit()
  }
 ​
  if t.pp != 0 {
  throw("doaddtimer: P already set in timer")
  }
  t.pp.set(pp)
 
  // 新添加的 timer 放置到堆的最后一位,然后向前调整
  i := len(pp.timers)
  pp.timers = append(pp.timers, t)
  siftupTimer(pp.timers, i)
 
  if t == pp.timers[0] {
  atomic.Store64(&pp.timer0When, uint64(t.when))
  }
  atomic.Xadd(&pp.numTimers, 1)
 }

调用此函数时,需要对 P 加锁,即 lock(&pp.timersLock)

实现步骤:

  1. 由于计时器依赖于网络轮询器,因此请确保轮询器已经启动
  2. 检查当前定时器值是否已经被 addtimer 添加过,否则将抛出异常。这点在上面已提到过,只允许刚创建的定时器( t.pp 字段为类型默认值0)才允许通过 addtimer 添加到定时器队列。
  3. 将定时器 t.pp.set(pp) 绑定到 P
  4. 首先添加前,先获取定时器队列pp.times (切片类型)的大小,然后再新的定时器添加到 p.times
  5. 再调用 siftupTimer(pp.timers, i) 实现堆排序(四叉小顶堆),同时还有相反的 siftdownTimer() 实现。实现原理:每次添加元素都是往切片的最后面添加元素,然后调用此函数根据 t.when 字段向前移动。
  6. 如果 timer heap 的第一个定时器 pp.timers[0] 正好是刚刚添加的那个定时器(说明当前定时器需要最先执行),则修改 pp.timer0When 的值为当前定时器的时间t.when,表示下次最先执行,具体原理参考 checkTimes();如果 timer0When 字段值为 0, 则表示 timer heap 为空。
  7. 增加 pp.numTimers 的值,表示当前p中的 定时器 的个数,此语句原子操作
heap

可以看到实现中主要使用了四叉小顶堆算法,将新添加的定时器放在合适的位置,同时在 p 中对定时器进行了维护。

清理堆顶 cleantimers

cleantimers 清理定时器队列的头部。 这加快了创建和删除计时器的程序; 将它们留在堆中会减慢 addtimer。

 /src/runtime/time.go#L542-L592
 // cleantimers cleans up the head of the timer queue. This speeds up
 // programs that create and delete timers; leaving them in the heap
 // slows down addtimer. Reports whether no timer problems were found.
 // The caller must have locked the timers for pp.
 func cleantimers(pp *p) {
  gp := getg()
  for {
  if len(pp.timers) == 0 {
  return
  }
 ​
  // This loop can theoretically run for a while, and because
  // it is holding timersLock it cannot be preempted.
  // If someone is trying to preempt us, just return.
  // We can clean the timers later.
  if gp.preemptStop {
  return
  }
 ​
  // 当前定时器所属的 p 不是当前的p,则抛出异常
  t := pp.timers[0]
  if t.pp.ptr() != pp {
  throw("cleantimers: bad p")
  }
 
  // 根据定时器的状态分别处理
  switch s := atomic.Load(&t.status); s {
  case timerDeleted:
  // 状态 timerDeleted 变为 timerRemoving
  if !atomic.Cas(&t.status, s, timerRemoving) {
  continue
  }
 
  // 删除堆的首个元素,从堆中删除
  dodeltimer0(pp)
 
  // 再把状态从 timerRemoving 变为 timerRemoved
  if !atomic.Cas(&t.status, timerRemoving, timerRemoved) {
  badTimer()
  }
 
  // 更新当前 p 中删除定时器的统计个数
  atomic.Xadd(&pp.deletedTimers, -1)
 
  case timerModifiedEarlier, timerModifiedLater:
  if !atomic.Cas(&t.status, s, timerMoving) {
  continue
  }
 
  // 修改原来计划执行的时间
  t.when = t.nextwhen
 ​
  dodeltimer0(pp)
  doaddtimer(pp, t)
  if !atomic.Cas(&t.status, timerMoving, timerWaiting) {
  badTimer()
  }
 
  default:
  // Head of timers does not need adjustment.
  return
  }
  }
 }

这里主要是根据 t.status 进行了不同的处理,以下操作只对 堆的首个定时器 执行。

timerDeleted:删除掉定时器,并把状态依次变更 timerDeleted -> timerRemoving -> timerRemoved ,最后原子更新 p.deletedTimers

timerModifiedEarlier, timerModifiedLater: 首先变更状态为 timerMoving,再更新定时器字段 t.when = t.nextwhen, 其次再调用 dodeltimer0 删除定时器, 将定时器的 t.pp 字段置为 0(和初始化定时器时的状态一样),接着调用 doaddtimer 重新添加到堆中。最后再将状态由 timerMoving 变更为 timerWaiting

这里定时器修改 t.whent.pp 两个字段,其中 t.pp 的修改是在 dodeltimer0() 中实现的。状态变更依次为 timerModifiedEarlier/timerModifiedLater -> timerMoving -> timerWaiting,只要是 timerWaiting 状态就可以重新开始排队执行

其它状态:不做任何操作

这里用到了 cas 操作,如果一次失败,则通过 for 语句再试,直到修改成功。

删除堆顶元素 dodeltimer0

我们看下dodeltimer0()的实现,它可以实现将定时器从堆中删除

// dodeltimer0 removes timer 0 from the current P's heap.
// We are locked on the P when this is called.
// It reports whether it saw no problems due to races.
// The caller must have locked the timers for pp.
func dodeltimer0(pp *p) {
if t := pp.timers[0]; t.pp.ptr() != pp {
throw("dodeltimer0: wrong P")
} else {
t.pp = 0
}
last := len(pp.timers) - 1

// 说明至少有两个定时器元素
if last > 0 {
pp.timers[0] = pp.timers[last]
}

// 赋值nil, 以便GC
pp.timers[last] = nil
pp.timers = pp.timers[:last]
if last > 0 {
siftdownTimer(pp.timers, 0)
}
updateTimer0When(pp)
atomic.Xadd(&pp.numTimers, -1)
}
  1. 将堆首定时器绑定的 p 解绑t.pp = 0,以后此定时器会被GC回收掉
  2. 将最后的定时器移动到首位置(并释放其位置的资源),并调用 siftdownTimer() 函数实现向后调整位置
  3. 更新 p.timer0When 字段为首位置的 when 字段值,如果堆为空的话,则赋值为0
  4. 更新 p.numTimers 记录定时器个数的值

计时器的停止/删除 deltimer

当我们不再需要定时器的时候,调用 timer.Stop() 方法即可,下面我们看看这一块的实现原理。

// 如果停止计时器成功,则返回true, 如果计时器已经过期或已经停止,则返回false。说明多次调用 Stop() 方法并不会抛出异常。
// Stop() 方法并不会关闭 channel, 以防止从通道读取错误成功。
//
// 为确保调用 Stop() 后, channel是空,请检查返回值并清空channel。
// 例如,假设程序还没有从 t.C 接收到
//
// if !t.Stop() {
// <-t.C
// }
//
// 这不能与来自 Timer channel 的其他接收 或 对 Timer 的 Stop 方法的其他调用 同时完成。
//
// 通过 AfterFunc(d, f) 创建一个Timer, 如果 t.Stop 返回false, 那么 Timer 已经过期且函数f 已经在它自己的 goroutine 启动;
// Stop 并不会等待函数f 执行完成才返回
// 如果调用方需要知道函数f 是否执行完成,它必须与函数f 明确协调才可以。
//
func (t *Timer) Stop() bool {
if t.r.f == nil {
panic("time: Stop called on uninitialized Timer")
}
return stopTimer(&t.r)
}

这里的实现主要是调用 stopTimer 来实现的,我们再看下它的实现

// stopTimer stops a timer.
// It reports whether t was stopped before being run.
//go:linkname stopTimer time.stopTimer
func stopTimer(t *timer) bool {
return deltimer(t)
}

它只是对 deltimer 函数进行了一次封装,再看下其实现方法

// deltimer deletes the timer t. It may be on some other P, so we can't
// actually remove it from the timers heap. We can only mark it as deleted.
// It will be removed in due course by the P whose heap it is on.
// Reports whether the timer was removed before it was run.
func deltimer(t *timer) bool {
for {
switch s := atomic.Load(&t.status); s {
case timerWaiting, timerModifiedLater:

mp := acquirem()
if atomic.Cas(&t.status, s, timerModifying) {

tpp := t.pp.ptr()
if !atomic.Cas(&t.status, timerModifying, timerDeleted) {
badTimer()
}
releasem(mp)
atomic.Xadd(&tpp.deletedTimers, 1)

return true
} else {
releasem(mp)
}
case timerModifiedEarlier:

mp := acquirem()
if atomic.Cas(&t.status, s, timerModifying) {

tpp := t.pp.ptr()
if !atomic.Cas(&t.status, timerModifying, timerDeleted) {
badTimer()
}
releasem(mp)
atomic.Xadd(&tpp.deletedTimers, 1)

return true
} else {
releasem(mp)
}
case timerDeleted, timerRemoving, timerRemoved:
// Timer was already run.
return false
case timerRunning, timerMoving:

osyield()
case timerNoStatus:

return false
case timerModifying:

osyield()
default:
badTimer()
}
}
}

从函数的注释及实现原理得知此函数并不是真正的将计时器从当前P堆里删除,由于计时器可能工作在任何一个P上,所以只能对定时器的状态作删除标记,删除工作由定时器所在的P在合适的时机( dodeltimer dodeltimer0 )进行真正的删除。

想必大家还记得上面在添加定时器时调用的 addtimer, 其实现会调用 cleantimers 函数,此函数如何检查到 t.status 状态为 timerDeleted,则会将其删除。

可以看到 time.Stop() 的实现对应的是 deltimer() 函数,

可以看到 deltimer 函数也是对各种状态做不同的处理。

计时器的修改 modtimer

默认情况下 Timer 只执行一次,可能通过 timer.Reset() 方法实现再执行一次。多次使用的效果有点类似于 Ticker

func (t *Timer) Reset(d Duration) bool {
if t.r.f == nil {
panic("time: Reset called on uninitialized Timer")
}
w := when(d)
return resetTimer(&t.r, w)
}

此函数实现了将 Timer 设为过 d Duration 时间后再过期。

通过阅读官网对此函数的注释得知以下信息

  1. 如果当前的 Timer 已经是活动计时器,则 Reset 函数返回true;如果已过期或已停止则返回 false(过期和停止的区别?)
  2. 对于使用 NewTimer 创建的 Timer ,Reset 应仅在具有耗尽channel的停止或过期计时器上调用
  3. 如果程序已经从t.C 接收到一个值,则 Timer 是知道计时器已过期并且channel 已消费完,因此 Reset 可直接使用;如果一个程序还没有从 t.C 中接收值,如果想重置Timer,必须显示的 Stop() 停止并消费完channel才可以。if !t.Stop() {
    <-t.C
    }
  4. 不应该与来自定时器通道的其他接收同时进行
  5. 其它

这里调用了 resetTimer 函数,我们再看下它的实现

// resettimer resets the time when a timer should fire.
// If used for an inactive timer, the timer will become active.
// This should be called instead of addtimer if the timer value has been,
// or may have been, used previously.
// Reports whether the timer was modified before it was run.
func resettimer(t *timer, when int64) bool {
return modtimer(t, when, t.period, t.f, t.arg, t.seq)
}

这里又调用了 modtimer 函数对其值进行了修改,其实现

// modtimer modifies an existing timer.
// This is called by the netpoll code or time.Ticker.Reset or time.Timer.Reset.
// Reports whether the timer was modified before it was run.
func modtimer(t *timer, when, period int64, f func(interface{}, uintptr), arg interface{}, seq uintptr) bool {
if when <= 0 {
throw("timer when must be positive")
}
if period < 0 {
throw("timer period must be non-negative")
}

status := uint32(timerNoStatus)
wasRemoved := false
var pending bool
var mp *m
loop:
for {
switch status = atomic.Load(&t.status); status {
case timerWaiting, timerModifiedEarlier, timerModifiedLater:
// Prevent preemption while the timer is in timerModifying.
// This could lead to a self-deadlock. See #38070.
mp = acquirem()
if atomic.Cas(&t.status, status, timerModifying) {
pending = true // timer not yet run
break loop
}
releasem(mp)
case timerNoStatus, timerRemoved:
// Prevent preemption while the timer is in timerModifying.
// This could lead to a self-deadlock. See #38070.
mp = acquirem()

// Timer was already run and t is no longer in a heap.
// Act like addtimer.
if atomic.Cas(&t.status, status, timerModifying) {
wasRemoved = true
pending = false // timer already run or stopped
break loop
}
releasem(mp)
case timerDeleted:
// Prevent preemption while the timer is in timerModifying.
// This could lead to a self-deadlock. See #38070.
mp = acquirem()
if atomic.Cas(&t.status, status, timerModifying) {
atomic.Xadd(&t.pp.ptr().deletedTimers, -1)
pending = false // timer already stopped
break loop
}
releasem(mp)
case timerRunning, timerRemoving, timerMoving:
// The timer is being run or moved, by a different P.
// Wait for it to complete.
osyield()
case timerModifying:
// Multiple simultaneous calls to modtimer.
// Wait for the other call to complete.
osyield()
default:
badTimer()
}
}

t.period = period
t.f = f
t.arg = arg
t.seq = seq

if wasRemoved {
t.when = when
pp := getg().m.p.ptr()
lock(&pp.timersLock)
doaddtimer(pp, t)
unlock(&pp.timersLock)
if !atomic.Cas(&t.status, timerModifying, timerWaiting) {
badTimer()
}
releasem(mp)
wakeNetPoller(when)
} else {
// The timer is in some other P's heap, so we can't change
// the when field. If we did, the other P's heap would
// be out of order. So we put the new when value in the
// nextwhen field, and let the other P set the when field
// when it is prepared to resort the heap.
t.nextwhen = when

newStatus := uint32(timerModifiedLater)
if when < t.when {
newStatus = timerModifiedEarlier
}

tpp := t.pp.ptr()

if newStatus == timerModifiedEarlier {
updateTimerModifiedEarliest(tpp, when)
}

// Set the new status of the timer.
if !atomic.Cas(&t.status, timerModifying, newStatus) {
badTimer()
}
releasem(mp)

// If the new status is earlier, wake up the poller.
if newStatus == timerModifiedEarlier {
wakeNetPoller(when)
}
}

return pending
}

计时器的执行 runtimer

对于计时器的执行是通过调用 runtimer() 函数来实现的,执行的仅仅是首个计时器元素,并不是所有的计时器。切记此函数调用前需要进行加 p.timersLock 锁。此函数一共有两个参数,一个返回值

func runtimer(pp *p, now int64) int64 {
for {
// 堆顶元素
t := pp.timers[0]
if t.pp.ptr() != pp {
throw("runtimer: bad p")
}
switch s := atomic.Load(&t.status); s {
case timerWaiting:
if t.when > now {
// Not ready to run.
return t.when
}

if !atomic.Cas(&t.status, s, timerRunning) {
continue
}
// Note that runOneTimer may temporarily unlock
// pp.timersLock.

// 真正的执行函数
runOneTimer(pp, t, now)
return 0

case timerDeleted:
if !atomic.Cas(&t.status, s, timerRemoving) {
continue
}
dodeltimer0(pp)
if !atomic.Cas(&t.status, timerRemoving, timerRemoved) {
badTimer()
}
atomic.Xadd(&pp.deletedTimers, -1)
if len(pp.timers) == 0 {
return -1
}

case timerModifiedEarlier, timerModifiedLater:
if !atomic.Cas(&t.status, s, timerMoving) {
continue
}
t.when = t.nextwhen
dodeltimer0(pp)
doaddtimer(pp, t)
if !atomic.Cas(&t.status, timerMoving, timerWaiting) {
badTimer()
}

case timerModifying:
// Wait for modification to complete.
osyield()

case timerNoStatus, timerRemoved:
// Should not see a new or inactive timer on the heap.
badTimer()
case timerRunning, timerRemoving, timerMoving:
// These should only be set when timers are locked,
// and we didn't do it.
badTimer()
default:
badTimer()
}
}
}
  • *p 执行的是哪个P上面的计时器,对应的是 p.timers 字段,是一个切片类型。由于其切片值的实现是用四小堆来实现的,堆顶的元素 timers[0] 表示最先执行的,所以只需要对 timer[0]字段做检查即可。
  • now 当前的时间。在执行的时候,会对当前的时间进行对比以此来判断是否需要执行
  • 返回值:如果运行了当前 p.timers[0] 则返回0,如果当前P已经没有 timer 了则返回-1,否则返回首个计时器的要运行时间

函数的实现原理是根据当前定时器元素的不同状态分别处理.。

  • timerWaiting 如果计时器指定的执行的时间晚于当前时间now,则直接返回计时器的指定时间 t.when,否则修改其状态为timerRunning,然后调用runOneTimer() 函数正式执行定时器。
  • timerDeleted 如果当前计时器已经处于被删除的状态,则调用 dodeltimer0() 函数将其元素从当前 p.timers 字段里彻底移除,同时更新 p.DeletedTimers 字段。对于这点我们前面的 deltimer已经介绍过,一个p只能删除将当前p上面处于 timerDeleted 状态的元素彻底从堆中删除,其它p只能对其对 timerDeleted 标记.
  • timerModifiedEarlier/timerModifiedLater 首先变更状态为 timerMoving,再更新定时器字段 t.when = t.nextwhen, 其次再调用 dodeltimer0 删除定时器, 将定时器的 t.pp 字段置为 0(和初始化定时器时的状态一样),接着调用 doaddtimer 重新添加到堆中。最后再将状态由 timerMoving 变更为 timerWaiting。这里定时器修改 t.whent.pp 两个字段,其中 t.pp 的修改是在 dodeltimer0() 中实现的。状态变更依次为 timerModifiedEarlier/timerModifiedLater -> timerMoving -> timerWaiting, 只要是 timerWaiting 状态就可以重新开始排队执行这个情况下的处理 与上面介绍的 cleantimer 是完全一样的。
  • timerModifying 等待完成
  • 其它 非法状态

我们再看下 runOneTimer 的实现

// runOneTimer runs a single timer.
// The caller must have locked the timers for pp.
// This will temporarily unlock the timers while running the timer function.
//go:systemstack
func runOneTimer(pp *p, t *timer, now int64) {
// 函数名
f := t.f
// 函数参数
arg := t.arg
seq := t.seq

if t.period > 0 {
// 调整 t.when 字段
// Leave in heap but adjust next time to fire.
delta := t.when - now
t.when += t.period * (1 + -delta/t.period)

// 溢出处理
if t.when < 0 { // check for overflow.
t.when = maxWhen
}
siftdownTimer(pp.timers, 0)
if !atomic.Cas(&t.status, timerRunning, timerWaiting) {
badTimer()
}
updateTimer0When(pp)
} else {
// Remove from heap.
// 彻底从堆中删除
dodeltimer0(pp)
if !atomic.Cas(&t.status, timerRunning, timerNoStatus) {
badTimer()
}
}

...

// 先解锁,再执行,最后再加上原来的锁
unlock(&pp.timersLock)
f(arg, seq) // 函数调用
lock(&pp.timersLock)

...
}

计时器的获取 timeSleepUntil

sysmon 监控线程里会有几个主要任务,其中一项就是对 timer 的管理。我们看下函数 timeSleepUntil 的实现。

// timeSleepUntil returns the time when the next timer should fire,
// and the P that holds the timer heap that that timer is on.
// This is only called by sysmon and checkdead.
func timeSleepUntil() (int64, *p) {
// 默认一个系统允许的最大值
next := int64(maxWhen)
var pret *p

// Prevent allp slice changes. This is like retake.
lock(&allpLock)
for _, pp := range allp {
if pp == nil {
// This can happen if procresize has grown
// allp but not yet created new Ps.
continue
}

// 如果 timer0When 字段值为0, 则表示当前P没有定时器
w := int64(atomic.Load64(&pp.timer0When))
if w != 0 && w < next {
next = w
pret = pp
}

w = int64(atomic.Load64(&pp.timerModifiedEarliest))
if w != 0 && w < next {
next = w
pret = pp
}
}
unlock(&allpLock)

return next, pret
}

调用 此函数可以返回所有P中的需要执行的下一个计时器的触发时间及其所在P, 当前函数只能在 sysmon监控线程和 checkdead 检查死锁函数中调用 。

此操作需要添加全局 allpLock 锁。通过遍历所有P 中的 pp.timer0Whenpp.timerModifiedEarliest 字段来找出最小值,其中 pp.timer0Whenpp.timers[0] 元素的 when 值。如果当前系统还没有计时器的话,则会返回默认的最大值,则 sysmon 会直接忽略处理。

可以看到这里如果直接通过直接判断字段 pp.timers[0] 的话,由于其数据类型为切片,所以需要加锁,开销有些大。

定时器的管理 checkTimers

检查指定P上面所有timers 是否已准备就绪。

func checkTimers(pp *p, now int64) (rnow, pollUntil int64, ran bool) {}

如果参数 now值为非 0,则表示当前时间;如果为0,则返回的值 rnow可能为当前时间或0,且返回下一个计时器应该运行的时间,如果没有下一个计时器,则为0,并报告(参数ran )计时器是否在运行;如果下一个计时器运行的时间为非0,它的返回值总是大于返回的时间;

func checkTimers(pp *p, now int64) (rnow, pollUntil int64, ran bool) {
// 如果当前p 没有任何定时器或第一次调整计时器,则什么也不做
next := int64(atomic.Load64(&pp.timer0When))
nextAdj := int64(atomic.Load64(&pp.timerModifiedEarliest))
if next == 0 || (nextAdj != 0 && nextAdj < next) {
next = nextAdj
}

if next == 0 {
// 直接返回
return now, 0, false
}

if now == 0 {
now = nanotime()
}
if now < next {
// Next timer is not ready to run, but keep going
// if we would clear deleted timers.
// This corresponds to the condition below where
// we decide whether to call clearDeletedTimers.
if pp != getg().m.p.ptr() || int(atomic.Load(&pp.deletedTimers)) <= int(atomic.Load(&pp.numTimers)/4) {
return now, next, false
}
}

lock(&pp.timersLock)

// 调整所有定时器
if len(pp.timers) > 0 {
adjusttimers(pp, now)
for len(pp.timers) > 0 {
// Note that runtimer may temporarily unlock
// pp.timersLock.
if tw := runtimer(pp, now); tw != 0 {
if tw > 0 {
pollUntil = tw
}
break
}
ran = true
}
}

// 如果当前所检查的P正是当前使用的P,则调用 clearDeleteTimers 函数清理掉所有被标记为删除的timer,将其从P的heap中删除,以减少对 timersLock 锁的使用。
// 再次说明了每个P只能清理自己的timer
if pp == getg().m.p.ptr() && int(atomic.Load(&pp.deletedTimers)) > len(pp.timers)/4 {
clearDeletedTimers(pp)
}

unlock(&pp.timersLock)

return now, pollUntil, ran
}

调整计时器 adjusttimers

在当前 P 的堆中查找任何已修改为更早运行的定时器,并将它们放在堆中的正确位置。

在查找这些计时器时,它还会移动已修改为稍后运行的计时器,并删除已删除的计时器。 调用者必须锁定 pp 的计时器。

/src/runtime/time.go#L653
func adjusttimers(pp *p, now int64) {
first := atomic.Load64(&pp.timerModifiedEarliest)
if first == 0 || int64(first) > now {
if verifyTimers {
verifyTimerHeap(pp)
}
return
}

// We are going to clear all timerModifiedEarlier timers.
atomic.Store64(&pp.timerModifiedEarliest, 0)

var moved []*timer
for i := 0; i < len(pp.timers); i++ {
t := pp.timers[i]
if t.pp.ptr() != pp {
throw("adjusttimers: bad p")
}
switch s := atomic.Load(&t.status); s {
case timerDeleted:
if atomic.Cas(&t.status, s, timerRemoving) {
changed := dodeltimer(pp, i)
if !atomic.Cas(&t.status, timerRemoving, timerRemoved) {
badTimer()
}
atomic.Xadd(&pp.deletedTimers, -1)
// Go back to the earliest changed heap entry.
// "- 1" because the loop will add 1.
i = changed - 1
}
case timerModifiedEarlier, timerModifiedLater:
if atomic.Cas(&t.status, s, timerMoving) {
// Now we can change the when field.
t.when = t.nextwhen
// Take t off the heap, and hold onto it.
// We don't add it back yet because the
// heap manipulation could cause our
// loop to skip some other timer.
changed := dodeltimer(pp, i)
moved = append(moved, t)
// Go back to the earliest changed heap entry.
// "- 1" because the loop will add 1.
i = changed - 1
}
case timerNoStatus, timerRunning, timerRemoving, timerRemoved, timerMoving:
badTimer()
case timerWaiting:
// OK, nothing to do.
case timerModifying:
// Check again after modification is complete.
osyield()
i--
default:
badTimer()
}
}

if len(moved) > 0 {
addAdjustedTimers(pp, moved)
}

if verifyTimers {
verifyTimerHeap(pp)
}
}

遍历 p.timers 字段,根据每个计时器状态分别处理

  • timerDeleted:状态转换。timerDeleted -> timerRemoving -> timerRemoved,更新删除定时器统计 p.deletedTimers
  • timerModifiedEarlier, timerModifiedLater:转换为 timerMoving 状态,同步t.when字(t.when=t.nextwhen)。将删除的定时器放在 moved 切片中最后将 moved 中的所有 timer 重新添加到 timer heap中。// addAdjustedTimers adds any timers we adjusted in adjusttimers
    // back to the timer heap.
    func addAdjustedTimers(pp *p, moved []*timer) {
    for _, t := range moved {
    doaddtimer(pp, t)
    if !atomic.Cas(&t.status, timerMoving, timerWaiting) {
    badTimer()
    }
    }
    }

清理计时器 clearDeletedTimers

此函数是除了STW期间运行 moveTimers 之外的唯一一个遍历整个timer heap 的函数。此函数的执行一定要持有 timerLock 锁

func clearDeletedTimers(pp *p) {
// 清理掉P中的 timerModifiedEarlier 计时器
// Do this now in case new ones show up while we are looping.
atomic.Store64(&pp.timerModifiedEarliest, 0)

// 删除的数量
cdel := int32(0)
// 切片索引位置,首个要删除的索引位置
to := 0
changedHeap := false
timers := pp.timers

// 遍历当前P所有的计时器,针对不同状态作相应的处理,最张将要清除的定时器移到最后面
nextTimer:
for _, t := range timers {
for {
switch s := atomic.Load(&t.status); s {
case timerWaiting:
if changedHeap {
timers[to] = t
siftupTimer(timers, to)
}
to++
continue nextTimer
case timerModifiedEarlier, timerModifiedLater:
if atomic.Cas(&t.status, s, timerMoving) {
t.when = t.nextwhen
timers[to] = t
siftupTimer(timers, to)
to++
changedHeap = true
if !atomic.Cas(&t.status, timerMoving, timerWaiting) {
badTimer()
}
continue nextTimer
}
case timerDeleted:
// 真正的删除操作
if atomic.Cas(&t.status, s, timerRemoving) {
t.pp = 0
cdel++
if !atomic.Cas(&t.status, timerRemoving, timerRemoved) {
badTimer()
}
changedHeap = true
continue nextTimer
}
case timerModifying:
// Loop until modification complete.
osyield()
case timerNoStatus, timerRemoved:
// We should not see these status values in a timer heap.
badTimer()
case timerRunning, timerRemoving, timerMoving:
// Some other P thinks it owns this timer,
// which should not happen.
badTimer()
default:
badTimer()
}
}
}


// 清理定时器,以便GC回收,同时更新统计数据
for i := to; i < len(timers); i++ {
timers[i] = nil
}

// 更新统计数据
atomic.Xadd(&pp.deletedTimers, -cdel)
atomic.Xadd(&pp.numTimers, -cdel)

// 更新 p.timers 字段
timers = timers[:to]
pp.timers = timers
updateTimer0When(pp)

// 检查堆的状态
if verifyTimers {
verifyTimerHeap(pp)
}
}

可以看到在清理过程中,会对堆元素进行位置的调整,将所有要删除的定时器放在切片的尾部,最后通过索引位置进行 nil 操作,以便GC回收,剩下的就是一些更新统计 p.deleteTimersp.numTimers字段和重置 updateTimer0When 操作。

移除计时器 dodeltimer

我们上面已介绍过另一个移除计时器的函数 dodeltimer0() ,它实现移除堆顶计时器。这里我们介绍的是另一个相同功能的函数,可以实现自定义移除指定索引位置的元素。

dodeltimer 实现从p的 timer heap 中移除索引值为 i 的计时器,并返回最小的调整索引大小值。

// dodeltimer removes timer i from the current P's heap.
// We are locked on the P when this is called.
// It returns the smallest changed index in pp.timers.
// The caller must have locked the timers for pp.
func dodeltimer(pp *p, i int) int {
if t := pp.timers[i]; t.pp.ptr() != pp {
throw("dodeltimer: wrong P")
} else {
// 解除计时器与 P 的绑定关系
t.pp = 0
}
last := len(pp.timers) - 1
if i != last {
pp.timers[i] = pp.timers[last]
}
pp.timers[last] = nil
pp.timers = pp.timers[:last]
smallestChanged := i
if i != last {
// Moving to i may have moved the last timer to a new parent,
// so sift up to preserve the heap guarantee.
// 重新调整位置
smallestChanged = siftupTimer(pp.timers, i)
siftdownTimer(pp.timers, i)
}
if i == 0 {
// 删除的是当前仅有的一个timer,同重置 time0When 字段
updateTimer0When(pp)
}
atomic.Xadd(&pp.numTimers, -1)
return smallestChanged
}

操作流程

  1. 解除索引值为i 位置的计时器与P的绑定关系
  2. 将最后一个元素赋值到索引 i 位置
  3. 将最后一个元素的位置置为 nil
  4. 重新调整堆中 i 位置的索引,分别调用 siftupTimersiftdownTimer