一文看懂Golang 定时器源码
By admin
- 18 minutes read - 3709 words计时器分 Timer 和 Ticker 两种,它们底层基本是一样的,两差的区别请参考 , 这里我们的介绍对象是 Timer 。golang timer
计时器结构体
// NewTimer creates a new Timer that will send
// the current time on its channel after at least duration d.
func NewTimer(d Duration) *Timer {
c := make(chan Time, 1)
t := &Timer{
C: c,
r: runtimeTimer{
when: when(d),
f: sendTime,
arg: c,
},
}
startTimer(&t.r)
return t
}
通过调用 NewTimer()
函数创建一个 Timer
,首先创建一个长度为1的有缓冲channel,再创建一个Timer
的结构体,并将 channel 置于 Timer 结构体内。
注意这里的 runtimeTimer.f 字段是一个函数 sendTime ,其实现如下
func sendTime(c interface{}, seq uintptr) {
// Non-blocking send of time on c.
// Used in NewTimer, it cannot block anyway (buffer).
// Used in NewTicker, dropping sends on the floor is
// the desired behavior when the reader gets behind,
// because the sends are periodic.
select {
case c.(chan Time) <- Now():
default:
}
}
当 sendTime 函数主要用在 newTimer() 时,它以无阻塞的方式将当前时间 Now() 发送到 c 通道里。当用在 newTicker()
时,如果读取落后,会将发送丢弃,它是周期性的。
我们给出 Timer 的结构体声明。
type Timer struct {
C <-chan Time
r runtimeTimer
}
一共两个字段,为了理解方便我们称 runtimeTimer
为 timer
值。
我们再看一下其中的 runtimeTimer
结构体声明
// Interface to timers implemented in package runtime.
// Must be in sync with ../runtime/time.go:/^type timer
type runtimeTimer struct {
pp uintptr
when int64
period int64
f func(interface{}, uintptr) // NOTE: must not be closure
arg interface{}
seq uintptr
nextwhen int64
status uint32
}
对于 runnerTimer
结构体要与在 runtime/time.go 文件中的 timer
结构体保持同步。
结构体字段说明
- pp 指针类型,这里指 GPM 中的 P。如果这个计时器 timer 在一个heap 上,它在哪个 P 的堆上
- when 表示唤醒执行的时间,表示什么时间开始执行
- period 周期,一定是大于 ``;
when+period
表示下次唤醒执行的时间 - f 执行函数,不允许为匿名函数,最好为非阻塞函数
- arg 上面f函数的参数
- seq 同 arg,其在 runOneTimer 函数中的调用方式为 f(arg, seq)
- nextwhen 下次运行的时间,其值只有在
timerModifiedXX status
状态下才设置 - status 状态,其定义的的可用值有10种,定义在 runtime/time.go,我们下面对这些状态进行了介绍。
每次开启一个goroutine 执行 f(arg, now),基中when表示执行的时间,而 when+period 表示下次执行的时间。(这时有点疑问,对调用的函数参数,f的第二个参数是 now, 但后面介绍的时候第二个参数却是 seq)
通过查看 可知以下几点:
- 在新创建一个
Timer
的时候,可以设置的只有when
、period
、f
、arg
、seq
这五个字段。 - 创建完一个 Timer 后,对于 Timer 的值(
time.runtimeTimer
或runtime.timer
),只能通过addtimer
传递( 调用startTimer()
函数),以后不能修改任何字段。 - 一个活动计时器(一个已经传递给
addtimer
)可能会被传递给deltimer
(time.stopTimer
),之后它就不再是一个活动计时器了,称其为非活动计时器。 而在一个非活动周期内,f
,arg
, 和seq
三个字段可以被修改,但when
字段不会被修改。当删除一个非活动计时器,GC会将其自动回收,只有新分配的计时器值可以传递给addtimer
,而不能将非活动计时器传递给addtimer
。 - 可以将一个活动计时器传递给
modtimer
,它不会修改任何内容,其仍是一个活动计时器。 - 一个非活动计时器可以传递给
resetTimer
, 通过修改when
字段,让其变成活动计时器。新创建的定时器值传递给resetTimer
也是可以的(上面说过也可以传递给 addtimer)。 - 计时器的操作函数有
addtimer
,deltimer
,modtimer
,resettimer
,cleantimers
,adjusttimers
, 和runtimer
- 我们不允许同时调用
addtimer/deltimer/modtimer/resettimer
,但adjusttimer
和runtimer
可以与任何一个同时调用。 - 活动计时器存在于
P
的heap
中, 即timer
结构体中的pp
字段,非活动计时器也暂时存储在这里的,不过是临时的,直到被移走。 - 计时器状态的流转关系,这里不一一列出,可以直接通过查看注释。
通过注释我们大概知道了一些与 timer 相关的信息,其中提到了一些操作函数,在 runtime 里都有其介绍,下面我们分析看看每个函数的实现来加深一下理解。
计时器的几种状态
与计时器相关的状态一共有 10
种
- timerNoStatus 未知状态。只有通过 NewTimer 函数创建的才处理这种状态
- timerWaiting 等待计时器启动,timer 处于堆中
- timerRunning 正在运行,此状态比较短暂
- timerDeleted 已删除,等待被从堆
p.timers
中 Remove 移除 - timerRemoving 正在移除,短暂状态,从timerDeleted 到 timerRemoved 中的中间过滤状态
- timerRemoved 已被移除,当前已不在堆中
- timerModifying 修改中状态,短暂状态
- ttimerModifiedEarlier 被修改为更早的状态,新的
when
值在nextwhen
字段中,表示下次立即执行,在堆中或者错误的位置 - timerModifiedLater 被修改为相同或更晚的状态,新的
when
值在nextwhen
字段中,表示下次立即执行,在堆中或者错误的位置 - timerMoving 已修改并在移动,短暂状态
活动计时器位于 P
的 heap
中,即 p.timers
字段;非活动的计时器也可能短暂的存在heap里,直到它们被 removed
;
每种状态相关流转状态
// addtimer:
// timerNoStatus -> timerWaiting
// anything else -> panic: invalid value
// deltimer:
// timerWaiting -> timerModifying -> timerDeleted
// timerModifiedEarlier -> timerModifying -> timerDeleted
// timerModifiedLater -> timerModifying -> timerDeleted
// timerNoStatus -> do nothing
// timerDeleted -> do nothing
// timerRemoving -> do nothing
// timerRemoved -> do nothing
// timerRunning -> wait until status changes
// timerMoving -> wait until status changes
// timerModifying -> wait until status changes
// modtimer:
// timerWaiting -> timerModifying -> timerModifiedXX
// timerModifiedXX -> timerModifying -> timerModifiedYY
// timerNoStatus -> timerModifying -> timerWaiting
// timerRemoved -> timerModifying -> timerWaiting
// timerDeleted -> timerModifying -> timerModifiedXX
// timerRunning -> wait until status changes
// timerMoving -> wait until status changes
// timerRemoving -> wait until status changes
// timerModifying -> wait until status changes
// cleantimers (looks in P's timer heap):
// timerDeleted -> timerRemoving -> timerRemoved
// timerModifiedXX -> timerMoving -> timerWaiting
// adjusttimers (looks in P's timer heap):
// timerDeleted -> timerRemoving -> timerRemoved
// timerModifiedXX -> timerMoving -> timerWaiting
// runtimer (looks in P's timer heap):
// timerNoStatus -> panic: uninitialized timer
// timerWaiting -> timerWaiting or
// timerWaiting -> timerRunning -> timerNoStatus or
// timerWaiting -> timerRunning -> timerWaiting
// timerModifying -> wait until status changes
// timerModifiedXX -> timerMoving -> timerWaiting
// timerDeleted -> timerRemoving -> timerRemoved
// timerRunning -> panic: concurrent runtimer calls
// timerRemoved -> panic: inconsistent timer heap
// timerRemoving -> panic: inconsistent timer heap
// timerMoving -> panic: inconsistent timer heap
计时器的创建 addtimer
文章开头提到了定时器的创建,我们先看一下首次调用的函数 startTimer
中的 addtimer
子函数。
// startTimer adds t to the timer heap.
//go:linkname startTimer time.startTimer
func startTimer(t *timer) {
if raceenabled {
racerelease(unsafe.Pointer(t))
}
addtimer(t)
}
此函数会将计时器添加到 timer heap 中。
addtimer
此函数功能的是实现将一个 timer
&Timer{
C: c,
r: runtimeTimer{
when: when(d),
f: sendTime,
arg: c,
},
}
添加到当前 P
的 p.timer
字段里,这个字段是一个切片类型,后面有时候我们将其称为 timer queue
,其使用四叉堆算法维护 。只能在新创建一个 Timer
时,通过调用 startTimer()
函数来调用。
// src/runtime/time.go#L245-L278
func addtimer(t *timer) {
// 必要条件检查
if t.when <= 0 {
throw("timer when must be positive")
}
if t.period < 0 {
throw("timer period must be non-negative")
}
if t.status != timerNoStatus {
throw("addtimer called with initialized timer")
}
t.status = timerWaiting
when := t.when
// Disable preemption while using pp to avoid changing another P's heap.
mp := acquirem()
pp := getg().m.p.ptr()
lock(&pp.timersLock)
// 清理 timer heap 中首个定时器,以加快创建(添加)和删除timer的速度,如果保留在堆中速度会慢
cleantimers(pp)
doaddtimer(pp, t)
unlock(&pp.timersLock)
wakeNetPoller(when)
releasem(mp)
}
整体流程
- 对 计时器值的一些字段进行检查,其中 when 值必须大于0, period 字段不能小于0; 通过
status != timerNoStatus
检查当前计时器值是不是刚刚创建的,上面已提到过这一点。 - 修改计时器值状态
timerNoStatus -> timerWaiting
- 调用
mp := acquirem()
获取当前m
, 通过操作m.locks++
实现 - 获取当前的
p
, 同时加锁 &pp.timersLock,这是必须也是重要的一步。 - 首先清理计时器队列的头部 cleantimers(),以加快创建和删除定时器值的速度,如果将它们保留在heap中会减慢
addtimer
的速度; - 调用
doaddtimer()
实现添加定时器值,这才是真正添加 timer 的实现。下面我们单独介绍它的实现。 - 进行解锁
- 它唤醒一个空闲的 P 来服务定时器和网络轮询器 wakeNetPoller(), 注意函数里有唤醒P的函数 wakep() 的调用
- 释放换取的m
doaddtimer
这里我们看下添加定时器的主要实现函数 doaddtimer
/src/runtime/time.go#L280-L300
// doaddtimer adds t to the current P's heap.
// The caller must have locked the timers for pp.
func doaddtimer(pp *p, t *timer) {
// Timers rely on the network poller, so make sure the poller
// has started.
if netpollInited == 0 {
netpollGenericInit()
}
if t.pp != 0 {
throw("doaddtimer: P already set in timer")
}
t.pp.set(pp)
// 新添加的 timer 放置到堆的最后一位,然后向前调整
i := len(pp.timers)
pp.timers = append(pp.timers, t)
siftupTimer(pp.timers, i)
if t == pp.timers[0] {
atomic.Store64(&pp.timer0When, uint64(t.when))
}
atomic.Xadd(&pp.numTimers, 1)
}
调用此函数时,需要对 P 加锁,即 lock(&pp.timersLock)
。
实现步骤:
- 由于计时器依赖于网络轮询器,因此请确保轮询器已经启动
- 检查当前定时器值是否已经被
addtimer
添加过,否则将抛出异常。这点在上面已提到过,只允许刚创建的定时器( t.pp 字段为类型默认值0)才允许通过addtimer
添加到定时器队列。 - 将定时器
t.pp.set(pp)
绑定到 P - 首先添加前,先获取定时器队列
pp.times
(切片类型)的大小,然后再新的定时器添加到p.times
中 - 再调用 siftupTimer(pp.timers, i) 实现堆排序(四叉小顶堆),同时还有相反的 siftdownTimer() 实现。实现原理:每次添加元素都是往切片的最后面添加元素,然后调用此函数根据
t.when
字段向前移动。 - 如果 timer heap 的第一个定时器
pp.timers[0]
正好是刚刚添加的那个定时器(说明当前定时器需要最先执行),则修改pp.timer0When
的值为当前定时器的时间t.when
,表示下次最先执行,具体原理参考 checkTimes();如果 timer0When 字段值为 0, 则表示 timer heap 为空。 - 增加 pp.numTimers 的值,表示当前p中的 定时器 的个数,此语句原子操作heap
可以看到实现中主要使用了四叉小顶堆算法,将新添加的定时器放在合适的位置,同时在 p 中对定时器进行了维护。
清理堆顶 cleantimers
cleantimers 清理定时器队列的头部。 这加快了创建和删除计时器的程序; 将它们留在堆中会减慢 addtimer。
/src/runtime/time.go#L542-L592
// cleantimers cleans up the head of the timer queue. This speeds up
// programs that create and delete timers; leaving them in the heap
// slows down addtimer. Reports whether no timer problems were found.
// The caller must have locked the timers for pp.
func cleantimers(pp *p) {
gp := getg()
for {
if len(pp.timers) == 0 {
return
}
// This loop can theoretically run for a while, and because
// it is holding timersLock it cannot be preempted.
// If someone is trying to preempt us, just return.
// We can clean the timers later.
if gp.preemptStop {
return
}
// 当前定时器所属的 p 不是当前的p,则抛出异常
t := pp.timers[0]
if t.pp.ptr() != pp {
throw("cleantimers: bad p")
}
// 根据定时器的状态分别处理
switch s := atomic.Load(&t.status); s {
case timerDeleted:
// 状态 timerDeleted 变为 timerRemoving
if !atomic.Cas(&t.status, s, timerRemoving) {
continue
}
// 删除堆的首个元素,从堆中删除
dodeltimer0(pp)
// 再把状态从 timerRemoving 变为 timerRemoved
if !atomic.Cas(&t.status, timerRemoving, timerRemoved) {
badTimer()
}
// 更新当前 p 中删除定时器的统计个数
atomic.Xadd(&pp.deletedTimers, -1)
case timerModifiedEarlier, timerModifiedLater:
if !atomic.Cas(&t.status, s, timerMoving) {
continue
}
// 修改原来计划执行的时间
t.when = t.nextwhen
dodeltimer0(pp)
doaddtimer(pp, t)
if !atomic.Cas(&t.status, timerMoving, timerWaiting) {
badTimer()
}
default:
// Head of timers does not need adjustment.
return
}
}
}
这里主要是根据 t.status
进行了不同的处理,以下操作只对 堆的首个定时器 执行。
timerDeleted:删除掉定时器,并把状态依次变更 timerDeleted -> timerRemoving -> timerRemoved
,最后原子更新 p.deletedTimers
timerModifiedEarlier, timerModifiedLater: 首先变更状态为 timerMoving
,再更新定时器字段 t.when = t.nextwhen
, 其次再调用 dodeltimer0
删除定时器, 将定时器的 t.pp
字段置为 0(和初始化定时器时的状态一样),接着调用 doaddtimer
重新添加到堆中。最后再将状态由 timerMoving
变更为 timerWaiting
。
这里定时器修改 t.when
和 t.pp
两个字段,其中 t.pp 的修改是在 dodeltimer0()
中实现的。状态变更依次为 timerModifiedEarlier/timerModifiedLater -> timerMoving -> timerWaiting
,只要是 timerWaiting
状态就可以重新开始排队执行
其它状态:不做任何操作
这里用到了 cas 操作,如果一次失败,则通过 for 语句再试,直到修改成功。
删除堆顶元素 dodeltimer0
我们看下dodeltimer0()
的实现,它可以实现将定时器从堆中删除
// dodeltimer0 removes timer 0 from the current P's heap.
// We are locked on the P when this is called.
// It reports whether it saw no problems due to races.
// The caller must have locked the timers for pp.
func dodeltimer0(pp *p) {
if t := pp.timers[0]; t.pp.ptr() != pp {
throw("dodeltimer0: wrong P")
} else {
t.pp = 0
}
last := len(pp.timers) - 1
// 说明至少有两个定时器元素
if last > 0 {
pp.timers[0] = pp.timers[last]
}
// 赋值nil, 以便GC
pp.timers[last] = nil
pp.timers = pp.timers[:last]
if last > 0 {
siftdownTimer(pp.timers, 0)
}
updateTimer0When(pp)
atomic.Xadd(&pp.numTimers, -1)
}
- 将堆首定时器绑定的 p 解绑
t.pp = 0
,以后此定时器会被GC回收掉 - 将最后的定时器移动到首位置(并释放其位置的资源),并调用
siftdownTimer()
函数实现向后调整位置 - 更新
p.timer0When
字段为首位置的when
字段值,如果堆为空的话,则赋值为`` - 更新
p.numTimers
记录定时器个数的值
计时器的停止/删除 deltimer
当我们不再需要定时器的时候,调用 timer.Stop() 方法即可,下面我们看看这一块的实现原理。
// 如果停止计时器成功,则返回true, 如果计时器已经过期或已经停止,则返回false。说明多次调用 Stop() 方法并不会抛出异常。
// Stop() 方法并不会关闭 channel, 以防止从通道读取错误成功。
//
// 为确保调用 Stop() 后, channel是空,请检查返回值并清空channel。
// 例如,假设程序还没有从 t.C 接收到
//
// if !t.Stop() {
// <-t.C
// }
//
// 这不能与来自 Timer channel 的其他接收 或 对 Timer 的 Stop 方法的其他调用 同时完成。
//
// 通过 AfterFunc(d, f) 创建一个Timer, 如果 t.Stop 返回false, 那么 Timer 已经过期且函数f 已经在它自己的 goroutine 启动;
// Stop 并不会等待函数f 执行完成才返回
// 如果调用方需要知道函数f 是否执行完成,它必须与函数f 明确协调才可以。
//
func (t *Timer) Stop() bool {
if t.r.f == nil {
panic("time: Stop called on uninitialized Timer")
}
return stopTimer(&t.r)
}
这里的实现主要是调用 stopTimer 来实现的,我们再看下它的实现
// stopTimer stops a timer.
// It reports whether t was stopped before being run.
//go:linkname stopTimer time.stopTimer
func stopTimer(t *timer) bool {
return deltimer(t)
}
它只是对 deltimer 函数进行了一次封装,再看下其实现方法
// deltimer deletes the timer t. It may be on some other P, so we can't
// actually remove it from the timers heap. We can only mark it as deleted.
// It will be removed in due course by the P whose heap it is on.
// Reports whether the timer was removed before it was run.
func deltimer(t *timer) bool {
for {
switch s := atomic.Load(&t.status); s {
case timerWaiting, timerModifiedLater:
mp := acquirem()
if atomic.Cas(&t.status, s, timerModifying) {
tpp := t.pp.ptr()
if !atomic.Cas(&t.status, timerModifying, timerDeleted) {
badTimer()
}
releasem(mp)
atomic.Xadd(&tpp.deletedTimers, 1)
return true
} else {
releasem(mp)
}
case timerModifiedEarlier:
mp := acquirem()
if atomic.Cas(&t.status, s, timerModifying) {
tpp := t.pp.ptr()
if !atomic.Cas(&t.status, timerModifying, timerDeleted) {
badTimer()
}
releasem(mp)
atomic.Xadd(&tpp.deletedTimers, 1)
return true
} else {
releasem(mp)
}
case timerDeleted, timerRemoving, timerRemoved:
// Timer was already run.
return false
case timerRunning, timerMoving:
osyield()
case timerNoStatus:
return false
case timerModifying:
osyield()
default:
badTimer()
}
}
}
从函数的注释及实现原理得知此函数并不是真正的将计时器从当前P堆里删除,由于计时器可能工作在任何一个P上,所以只能对定时器的状态作删除标记,删除工作由定时器所在的P在合适的时机( dodeltimer 或 dodeltimer0)进行真正的删除。
想必大家还记得上面在添加定时器时调用的 addtimer
, 其实现会调用 cleantimers 函数,此函数如何检查到 t.status
状态为 timerDeleted
,则会将其删除。
可以看到 time.Stop() 的实现对应的是 deltimer() 函数,
可以看到 deltimer 函数也是对各种状态做不同的处理。
计时器的修改 modtimer
默认情况下 Timer
只执行一次,可能通过 timer.Reset() 方法实现再执行一次。多次使用的效果有点类似于 Ticker
func (t *Timer) Reset(d Duration) bool {
if t.r.f == nil {
panic("time: Reset called on uninitialized Timer")
}
w := when(d)
return resetTimer(&t.r, w)
}
此函数实现了将 Timer
设为过 d Duration
时间后再过期。
通过阅读官网对此函数的注释得知以下信息
- 如果当前的 Timer 已经是活动计时器,则 Reset 函数返回true;如果已过期或已停止则返回 false(过期和停止的区别?)
- 对于使用
NewTimer
创建的Timer
,Reset 应仅在具有耗尽channel的停止或过期计时器上调用 - 如果程序已经从
t.C
接收到一个值,则 Timer 是知道计时器已过期并且channel 已消费完,因此 Reset 可直接使用;如果一个程序还没有从 t.C 中接收值,如果想重置Timer,必须显示的Stop()
停止并消费完channel才可以。if !t.Stop() { <-t.C } - 不应该与来自定时器通道的其他接收同时进行
- 其它
这里调用了 resetTimer
函数,我们再看下它的实现
// resettimer resets the time when a timer should fire.
// If used for an inactive timer, the timer will become active.
// This should be called instead of addtimer if the timer value has been,
// or may have been, used previously.
// Reports whether the timer was modified before it was run.
func resettimer(t *timer, when int64) bool {
return modtimer(t, when, t.period, t.f, t.arg, t.seq)
}
这里又调用了 modtimer
函数对其值进行了修改,其实现
// modtimer modifies an existing timer.
// This is called by the netpoll code or time.Ticker.Reset or time.Timer.Reset.
// Reports whether the timer was modified before it was run.
func modtimer(t *timer, when, period int64, f func(interface{}, uintptr), arg interface{}, seq uintptr) bool {
if when <= 0 {
throw("timer when must be positive")
}
if period < 0 {
throw("timer period must be non-negative")
}
status := uint32(timerNoStatus)
wasRemoved := false
var pending bool
var mp *m
loop:
for {
switch status = atomic.Load(&t.status); status {
case timerWaiting, timerModifiedEarlier, timerModifiedLater:
// Prevent preemption while the timer is in timerModifying.
// This could lead to a self-deadlock. See #38070.
mp = acquirem()
if atomic.Cas(&t.status, status, timerModifying) {
pending = true // timer not yet run
break loop
}
releasem(mp)
case timerNoStatus, timerRemoved:
// Prevent preemption while the timer is in timerModifying.
// This could lead to a self-deadlock. See #38070.
mp = acquirem()
// Timer was already run and t is no longer in a heap.
// Act like addtimer.
if atomic.Cas(&t.status, status, timerModifying) {
wasRemoved = true
pending = false // timer already run or stopped
break loop
}
releasem(mp)
case timerDeleted:
// Prevent preemption while the timer is in timerModifying.
// This could lead to a self-deadlock. See #38070.
mp = acquirem()
if atomic.Cas(&t.status, status, timerModifying) {
atomic.Xadd(&t.pp.ptr().deletedTimers, -1)
pending = false // timer already stopped
break loop
}
releasem(mp)
case timerRunning, timerRemoving, timerMoving:
// The timer is being run or moved, by a different P.
// Wait for it to complete.
osyield()
case timerModifying:
// Multiple simultaneous calls to modtimer.
// Wait for the other call to complete.
osyield()
default:
badTimer()
}
}
t.period = period
t.f = f
t.arg = arg
t.seq = seq
if wasRemoved {
t.when = when
pp := getg().m.p.ptr()
lock(&pp.timersLock)
doaddtimer(pp, t)
unlock(&pp.timersLock)
if !atomic.Cas(&t.status, timerModifying, timerWaiting) {
badTimer()
}
releasem(mp)
wakeNetPoller(when)
} else {
// The timer is in some other P's heap, so we can't change
// the when field. If we did, the other P's heap would
// be out of order. So we put the new when value in the
// nextwhen field, and let the other P set the when field
// when it is prepared to resort the heap.
t.nextwhen = when
newStatus := uint32(timerModifiedLater)
if when < t.when {
newStatus = timerModifiedEarlier
}
tpp := t.pp.ptr()
if newStatus == timerModifiedEarlier {
updateTimerModifiedEarliest(tpp, when)
}
// Set the new status of the timer.
if !atomic.Cas(&t.status, timerModifying, newStatus) {
badTimer()
}
releasem(mp)
// If the new status is earlier, wake up the poller.
if newStatus == timerModifiedEarlier {
wakeNetPoller(when)
}
}
return pending
}
计时器的执行 runtimer
对于计时器的执行是通过调用 runtimer() 函数来实现的,执行的仅仅是首个计时器元素,并不是所有的计时器。切记此函数调用前需要进行加 p.timersLock
锁。此函数一共有两个参数,一个返回值
func runtimer(pp *p, now int64) int64 {
for {
// 堆顶元素
t := pp.timers[0]
if t.pp.ptr() != pp {
throw("runtimer: bad p")
}
switch s := atomic.Load(&t.status); s {
case timerWaiting:
if t.when > now {
// Not ready to run.
return t.when
}
if !atomic.Cas(&t.status, s, timerRunning) {
continue
}
// Note that runOneTimer may temporarily unlock
// pp.timersLock.
// 真正的执行函数
runOneTimer(pp, t, now)
return 0
case timerDeleted:
if !atomic.Cas(&t.status, s, timerRemoving) {
continue
}
dodeltimer0(pp)
if !atomic.Cas(&t.status, timerRemoving, timerRemoved) {
badTimer()
}
atomic.Xadd(&pp.deletedTimers, -1)
if len(pp.timers) == 0 {
return -1
}
case timerModifiedEarlier, timerModifiedLater:
if !atomic.Cas(&t.status, s, timerMoving) {
continue
}
t.when = t.nextwhen
dodeltimer0(pp)
doaddtimer(pp, t)
if !atomic.Cas(&t.status, timerMoving, timerWaiting) {
badTimer()
}
case timerModifying:
// Wait for modification to complete.
osyield()
case timerNoStatus, timerRemoved:
// Should not see a new or inactive timer on the heap.
badTimer()
case timerRunning, timerRemoving, timerMoving:
// These should only be set when timers are locked,
// and we didn't do it.
badTimer()
default:
badTimer()
}
}
}
- *p 执行的是哪个P上面的计时器,对应的是
p.timers
字段,是一个切片类型。由于其切片值的实现是用四小堆来实现的,堆顶的元素timers[0]
表示最先执行的,所以只需要对timer[0]
字段做检查即可。 - now 当前的时间。在执行的时候,会对当前的时间进行对比以此来判断是否需要执行
- 返回值:如果运行了当前
p.timers[0]
则返回``,如果当前P已经没有 timer 了则返回-1
,否则返回首个计时器的要运行时间
函数的实现原理是根据当前定时器元素的不同状态分别处理.。
- timerWaiting 如果计时器指定的执行的时间晚于当前时间
now
,则直接返回计时器的指定时间t.when
,否则修改其状态为timerRunning
,然后调用runOneTimer()
函数正式执行定时器。 - timerDeleted 如果当前计时器已经处于被删除的状态,则调用
dodeltimer0()
函数将其元素从当前p.timers
字段里彻底移除,同时更新p.DeletedTimers
字段。对于这点我们前面的deltimer
已经介绍过,一个p只能删除将当前p上面处于timerDeleted
状态的元素彻底从堆中删除,其它p只能对其对timerDeleted
标记. - timerModifiedEarlier/timerModifiedLater 首先变更状态为
timerMoving
,再更新定时器字段t.when = t.nextwhen
, 其次再调用dodeltimer0
删除定时器, 将定时器的t.pp
字段置为 0(和初始化定时器时的状态一样),接着调用doaddtimer
重新添加到堆中。最后再将状态由timerMoving
变更为timerWaiting
。这里定时器修改t.when
和t.pp
两个字段,其中 t.pp 的修改是在dodeltimer0()
中实现的。状态变更依次为timerModifiedEarlier/timerModifiedLater -> timerMoving -> timerWaiting
, 只要是 timerWaiting 状态就可以重新开始排队执行这个情况下的处理 与上面介绍的 cleantimer 是完全一样的。 - timerModifying 等待完成
- 其它 非法状态
我们再看下 runOneTimer
的实现
// runOneTimer runs a single timer.
// The caller must have locked the timers for pp.
// This will temporarily unlock the timers while running the timer function.
//go:systemstack
func runOneTimer(pp *p, t *timer, now int64) {
// 函数名
f := t.f
// 函数参数
arg := t.arg
seq := t.seq
if t.period > 0 {
// 调整 t.when 字段
// Leave in heap but adjust next time to fire.
delta := t.when - now
t.when += t.period * (1 + -delta/t.period)
// 溢出处理
if t.when < 0 { // check for overflow.
t.when = maxWhen
}
siftdownTimer(pp.timers, 0)
if !atomic.Cas(&t.status, timerRunning, timerWaiting) {
badTimer()
}
updateTimer0When(pp)
} else {
// Remove from heap.
// 彻底从堆中删除
dodeltimer0(pp)
if !atomic.Cas(&t.status, timerRunning, timerNoStatus) {
badTimer()
}
}
...
// 先解锁,再执行,最后再加上原来的锁
unlock(&pp.timersLock)
f(arg, seq) // 函数调用
lock(&pp.timersLock)
...
}
计时器的获取 timeSleepUntil
在 sysmon 监控线程里会有几个主要任务,其中一项就是对 timer 的管理。我们看下函数 timeSleepUntil
的实现。
// timeSleepUntil returns the time when the next timer should fire,
// and the P that holds the timer heap that that timer is on.
// This is only called by sysmon and checkdead.
func timeSleepUntil() (int64, *p) {
// 默认一个系统允许的最大值
next := int64(maxWhen)
var pret *p
// Prevent allp slice changes. This is like retake.
lock(&allpLock)
for _, pp := range allp {
if pp == nil {
// This can happen if procresize has grown
// allp but not yet created new Ps.
continue
}
// 如果 timer0When 字段值为0, 则表示当前P没有定时器
w := int64(atomic.Load64(&pp.timer0When))
if w != 0 && w < next {
next = w
pret = pp
}
w = int64(atomic.Load64(&pp.timerModifiedEarliest))
if w != 0 && w < next {
next = w
pret = pp
}
}
unlock(&allpLock)
return next, pret
}
调用 此函数可以返回所有P中的需要执行的下一个计时器的触发时间及其所在P
, 当前函数只能在 sysmon监控线程和 checkdead 检查死锁函数中调用 。
此操作需要添加全局 allpLock
锁。通过遍历所有P
中的 pp.timer0When
和 pp.timerModifiedEarliest
字段来找出最小值,其中 pp.timer0When
即 pp.timers[0]
元素的 when
值。如果当前系统还没有计时器的话,则会返回默认的最大值,则 sysmon
会直接忽略处理。
可以看到这里如果直接通过直接判断字段 pp.timers[0]
的话,由于其数据类型为切片,所以需要加锁,开销有些大。
定时器的管理 checkTimers
检查指定P上面所有timers 是否已准备就绪。
func checkTimers(pp *p, now int64) (rnow, pollUntil int64, ran bool) {}
如果参数 now
值为非 ``,则表示当前时间;如果为0,则返回的值 rnow
可能为当前时间或0,且返回下一个计时器应该运行的时间,如果没有下一个计时器,则为0,并报告(参数ran )计时器是否在运行;如果下一个计时器运行的时间为非0,它的返回值总是大于返回的时间;
func checkTimers(pp *p, now int64) (rnow, pollUntil int64, ran bool) {
// 如果当前p 没有任何定时器或第一次调整计时器,则什么也不做
next := int64(atomic.Load64(&pp.timer0When))
nextAdj := int64(atomic.Load64(&pp.timerModifiedEarliest))
if next == 0 || (nextAdj != 0 && nextAdj < next) {
next = nextAdj
}
if next == 0 {
// 直接返回
return now, 0, false
}
if now == 0 {
now = nanotime()
}
if now < next {
// Next timer is not ready to run, but keep going
// if we would clear deleted timers.
// This corresponds to the condition below where
// we decide whether to call clearDeletedTimers.
if pp != getg().m.p.ptr() || int(atomic.Load(&pp.deletedTimers)) <= int(atomic.Load(&pp.numTimers)/4) {
return now, next, false
}
}
lock(&pp.timersLock)
// 调整所有定时器
if len(pp.timers) > 0 {
adjusttimers(pp, now)
for len(pp.timers) > 0 {
// Note that runtimer may temporarily unlock
// pp.timersLock.
if tw := runtimer(pp, now); tw != 0 {
if tw > 0 {
pollUntil = tw
}
break
}
ran = true
}
}
// 如果当前所检查的P正是当前使用的P,则调用 clearDeleteTimers 函数清理掉所有被标记为删除的timer,将其从P的heap中删除,以减少对 timersLock 锁的使用。
// 再次说明了每个P只能清理自己的timer
if pp == getg().m.p.ptr() && int(atomic.Load(&pp.deletedTimers)) > len(pp.timers)/4 {
clearDeletedTimers(pp)
}
unlock(&pp.timersLock)
return now, pollUntil, ran
}
调整计时器 adjusttimers
在当前 P 的堆中查找任何已修改为更早运行的定时器,并将它们放在堆中的正确位置。
在查找这些计时器时,它还会移动已修改为稍后运行的计时器,并删除已删除的计时器。 调用者必须锁定 pp 的计时器。
/src/runtime/time.go#L653
func adjusttimers(pp *p, now int64) {
first := atomic.Load64(&pp.timerModifiedEarliest)
if first == 0 || int64(first) > now {
if verifyTimers {
verifyTimerHeap(pp)
}
return
}
// We are going to clear all timerModifiedEarlier timers.
atomic.Store64(&pp.timerModifiedEarliest, 0)
var moved []*timer
for i := 0; i < len(pp.timers); i++ {
t := pp.timers[i]
if t.pp.ptr() != pp {
throw("adjusttimers: bad p")
}
switch s := atomic.Load(&t.status); s {
case timerDeleted:
if atomic.Cas(&t.status, s, timerRemoving) {
changed := dodeltimer(pp, i)
if !atomic.Cas(&t.status, timerRemoving, timerRemoved) {
badTimer()
}
atomic.Xadd(&pp.deletedTimers, -1)
// Go back to the earliest changed heap entry.
// "- 1" because the loop will add 1.
i = changed - 1
}
case timerModifiedEarlier, timerModifiedLater:
if atomic.Cas(&t.status, s, timerMoving) {
// Now we can change the when field.
t.when = t.nextwhen
// Take t off the heap, and hold onto it.
// We don't add it back yet because the
// heap manipulation could cause our
// loop to skip some other timer.
changed := dodeltimer(pp, i)
moved = append(moved, t)
// Go back to the earliest changed heap entry.
// "- 1" because the loop will add 1.
i = changed - 1
}
case timerNoStatus, timerRunning, timerRemoving, timerRemoved, timerMoving:
badTimer()
case timerWaiting:
// OK, nothing to do.
case timerModifying:
// Check again after modification is complete.
osyield()
i--
default:
badTimer()
}
}
if len(moved) > 0 {
addAdjustedTimers(pp, moved)
}
if verifyTimers {
verifyTimerHeap(pp)
}
}
遍历 p.timers 字段,根据每个计时器状态分别处理
- timerDeleted:状态转换。
timerDeleted
->timerRemoving
->timerRemoved
,更新删除定时器统计p.deletedTimers
值 - timerModifiedEarlier, timerModifiedLater:转换为
timerMoving
状态,同步t.when字(t.when=t.nextwhen
)。将删除的定时器放在moved
切片中最后将 moved 中的所有 timer 重新添加到 timer heap中。// addAdjustedTimers adds any timers we adjusted in adjusttimers // back to the timer heap. func addAdjustedTimers(pp *p, moved []*timer) { for _, t := range moved { doaddtimer(pp, t) if !atomic.Cas(&t.status, timerMoving, timerWaiting) { badTimer() } } }
清理计时器 clearDeletedTimers
此函数是除了STW期间运行 moveTimers 之外的唯一一个遍历整个timer heap 的函数。此函数的执行一定要持有 timerLock 锁
func clearDeletedTimers(pp *p) {
// 清理掉P中的 timerModifiedEarlier 计时器
// Do this now in case new ones show up while we are looping.
atomic.Store64(&pp.timerModifiedEarliest, 0)
// 删除的数量
cdel := int32(0)
// 切片索引位置,首个要删除的索引位置
to := 0
changedHeap := false
timers := pp.timers
// 遍历当前P所有的计时器,针对不同状态作相应的处理,最张将要清除的定时器移到最后面
nextTimer:
for _, t := range timers {
for {
switch s := atomic.Load(&t.status); s {
case timerWaiting:
if changedHeap {
timers[to] = t
siftupTimer(timers, to)
}
to++
continue nextTimer
case timerModifiedEarlier, timerModifiedLater:
if atomic.Cas(&t.status, s, timerMoving) {
t.when = t.nextwhen
timers[to] = t
siftupTimer(timers, to)
to++
changedHeap = true
if !atomic.Cas(&t.status, timerMoving, timerWaiting) {
badTimer()
}
continue nextTimer
}
case timerDeleted:
// 真正的删除操作
if atomic.Cas(&t.status, s, timerRemoving) {
t.pp = 0
cdel++
if !atomic.Cas(&t.status, timerRemoving, timerRemoved) {
badTimer()
}
changedHeap = true
continue nextTimer
}
case timerModifying:
// Loop until modification complete.
osyield()
case timerNoStatus, timerRemoved:
// We should not see these status values in a timer heap.
badTimer()
case timerRunning, timerRemoving, timerMoving:
// Some other P thinks it owns this timer,
// which should not happen.
badTimer()
default:
badTimer()
}
}
}
// 清理定时器,以便GC回收,同时更新统计数据
for i := to; i < len(timers); i++ {
timers[i] = nil
}
// 更新统计数据
atomic.Xadd(&pp.deletedTimers, -cdel)
atomic.Xadd(&pp.numTimers, -cdel)
// 更新 p.timers 字段
timers = timers[:to]
pp.timers = timers
updateTimer0When(pp)
// 检查堆的状态
if verifyTimers {
verifyTimerHeap(pp)
}
}
可以看到在清理过程中,会对堆元素进行位置的调整,将所有要删除的定时器放在切片的尾部,最后通过索引位置进行 nil
操作,以便GC回收,剩下的就是一些更新统计 p.deleteTimers
、p.numTimers
字段和重置 updateTimer0When
操作。
移除计时器 dodeltimer
我们上面已介绍过另一个移除计时器的函数 dodeltimer0() ,它实现移除堆顶计时器。这里我们介绍的是另一个相同功能的函数,可以实现自定义移除指定索引位置的元素。
dodeltimer 实现从p的 timer heap 中移除索引值为 i
的计时器,并返回最小的调整索引大小值。
// dodeltimer removes timer i from the current P's heap.
// We are locked on the P when this is called.
// It returns the smallest changed index in pp.timers.
// The caller must have locked the timers for pp.
func dodeltimer(pp *p, i int) int {
if t := pp.timers[i]; t.pp.ptr() != pp {
throw("dodeltimer: wrong P")
} else {
// 解除计时器与 P 的绑定关系
t.pp = 0
}
last := len(pp.timers) - 1
if i != last {
pp.timers[i] = pp.timers[last]
}
pp.timers[last] = nil
pp.timers = pp.timers[:last]
smallestChanged := i
if i != last {
// Moving to i may have moved the last timer to a new parent,
// so sift up to preserve the heap guarantee.
// 重新调整位置
smallestChanged = siftupTimer(pp.timers, i)
siftdownTimer(pp.timers, i)
}
if i == 0 {
// 删除的是当前仅有的一个timer,同重置 time0When 字段
updateTimer0When(pp)
}
atomic.Xadd(&pp.numTimers, -1)
return smallestChanged
}
操作流程
- 解除索引值为i 位置的计时器与P的绑定关系
- 将最后一个元素赋值到索引 i 位置
- 将最后一个元素的位置置为
nil
- 重新调整堆中 i 位置的索引,分别调用
siftupTimer
和siftdownTimer