Golang 的调度策略之G的窃取
By admin
- 6 minutes read - 1255 words我们上篇文章( Golang 的底层引导流程/启动顺序)介绍了一个golang程序的启动流程,在文章的最后对于最重要的一点“调度
“ (函数 [schedule()](https://github.com/golang/go/blob/go1.15.6/src/runtime/proc.go#L2607-L2723)
) 并没有展开来讲,今天我们继续从源码来分析一下它的调度机制。
在此之前我们要明白golang中的调度主要指的是什么?在 src/runtime/proc.go
文件里有一段注释这样写到
// Goroutine scheduler
// The scheduler’s job is to distribute ready-to-run goroutines over worker threads.
这里指如何找一个已准备好运行的 G 关联到PM 让其执行。对于G 的调度可以围绕三个方面来理解:
- 时机:什么时候关联(调度)。对于调度时机一般是指有空闲P的时候都会去找G执行
- 对象:选择哪个G进行调度。这是我们本篇要讲的内容
- 机制:如何调度。
execute()
函数
理解了这三个问题,基本也就明白了它的调度策略了,本篇主要对G的获取。
源文件 [src/runtime/proc.go](https://github.com/golang/go/blob/go1.15.6/src/runtime/proc.go)
, go version 1.15.6
获取G流程
下面我们看一下 schedule()
函数
// One round of scheduler: find a runnable goroutine and execute it.
// Never returns.
func schedule() {}
从注释我们可得知以下几点:
- 调度是一轮一轮执行的,并不是只执行一次
- 调度的工作就是找到一个
runnable
状态的G,对于G的选择可直接理解为调度(实际上这只是调试中的前半部分,找到G 还要把它放在一个位置去执行),至少大部分场景下我们谈论的是指G的调度 - 当前这个调度并无返回值,也就是说在当函数执行结束时就代表当前一轮的调度已结束(不严谨)。剩下的执行此函数后面的程序
func schedule() {
// 获取当前 g0
_g_ := getg()
if _g_.m.locks != 0 {
throw("schedule: holding locks")
}
if _g_.m.lockedg != 0 {
stoplockedm()
execute(_g_.m.lockedg.ptr(), false) // Never returns.
}
// We should not schedule away from a g that is executing a cgo call,
// since the cgo call is using the m's g0 stack.
// 不允许cgo正在使用g0栈
if _g_.m.incgo {
throw("schedule: in cgo")
}
top:
pp := _g_.m.p.ptr()
pp.preempt = false
if sched.gcwaiting != 0 {
gcstopm()
goto top
}
if pp.runSafePointFn != 0 {
runSafePointFn()
}
// Sanity check: if we are spinning, the run queue should be empty.
// 健全性检查:如果有m正在spinning 的话,则g的运行队列应该是空的才对,这个应该很好理解
// Check this before calling checkTimers, as that might call
// goready to put a ready goroutine on the local run queue.
// 健全性检查必须要在调用 checkTimers() 之前进行检查,这是因为有可能goready()在本地运行队列放了一个就绪的goroutine
if _g_.m.spinning && (pp.runnext != 0 || pp.runqhead != pp.runqtail) {
throw("schedule: spinning with local work")
}
// 重点关注 定时器处理
checkTimers(pp, 0)
var gp *g
var inheritTime bool
// Normal goroutines will check for need to wakeP in ready,
// but GCworkers and tracereaders will not, so the check must
// be done here instead.
tryWakeP := false
if trace.enabled || trace.shutdown {
gp = traceReader()
if gp != nil {
casgstatus(gp, _Gwaiting, _Grunnable)
traceGoUnpark(gp, 0)
tryWakeP = true
}
}
if gp == nil && gcBlackenEnabled != 0 {
gp = gcController.findRunnableGCWorker(_g_.m.p.ptr())
tryWakeP = tryWakeP || gp != nil
}
if gp == nil {
// Check the global runnable queue once in a while to ensure fairness.
// Otherwise two goroutines can completely occupy the local runqueue
// by constantly respawning each other.
// 为了确保公平性,调度器会每经过61次调度就直接从全局g运行队列获取1个G,否则直接从本地g运行队列获取
if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
lock(&sched.lock)
gp = globrunqget(_g_.m.p.ptr(), 1)
unlock(&sched.lock)
}
}
// 从与当前m关联的p本地g队列获取,优先读取p.runnext的G,如果为nil再从队列里取
if gp == nil {
gp, inheritTime = runqget(_g_.m.p.ptr())
// We can see gp != nil here even if the M is spinning,
// if checkTimers added a local goroutine via goready.
}
// 以阻塞方式获取一个g, 重点关注函数 finrunnable() 函数
if gp == nil {
gp, inheritTime = findrunnable() // blocks until work is available
}
// This thread is going to run a goroutine and is not spinning anymore,
// so if it was marked as spinning we need to reset it now and potentially
// start a new spinning M.
// 如果当前m处于 spinning 状态,则进行重置,因为它要执行当前获取到的G
if _g_.m.spinning {
resetspinning()
}
if sched.disable.user && !schedEnabled(gp) {
// Scheduling of this goroutine is disabled. Put it on
// the list of pending runnable goroutines for when we
// re-enable user scheduling and look again.
lock(&sched.lock)
if schedEnabled(gp) {
// Something re-enabled scheduling while we
// were acquiring the lock.
unlock(&sched.lock)
} else {
sched.disable.runnable.pushBack(gp)
sched.disable.n++
unlock(&sched.lock)
goto top
}
}
// If about to schedule a not-normal goroutine (a GCworker or tracereader),
// wake a P if there is one.
// 如果调度的是一个gcworker或traceeader,则唤醒一个P (gc也是并发的)
if tryWakeP {
wakep()
}
if gp.lockedm != 0 {
// Hands off own p to the locked m,
// then blocks waiting for a new p.
startlockedm(gp)
goto top
}
// 执行G(将G运行在一个m上)
execute(gp, inheritTime)
}
可以看到,整个 schedue()
函数主要工作就是获取一个 _Grunnable
状态的G,大概获取流程如下
- 调度器每调度
61
次,则调用[globrunqget](https://github.com/golang/go/blob/go1.15.6/src/runtime/proc.go#L5065-L5092)()
函数从全局G队列里获取一个G, 主要是为了确保公平性,也就是说有1/61的机会从优先从全局队列里获取G。这一步操作很少有开发者提起。 - 如果获取不到,则通过
runqget()
函数从P中获取G。获取时优先使用p.runnext
的G(亲和性),如果没有G,再从P的运行队列里获取 - 如果仍未获取,则最后通过调用
[finrunnable()](https://github.com/golang/go/blob/go1.15.6/src/runtime/proc.go#L2187-L2487)
函数以阻塞形式获取一个G
找到一个G后,然后调用 [execute()](https://github.com/golang/go/blob/9b955d2d3fcff6a5bc8bce7bafdc4c634a28e95b/src/runtime/proc.go#L2145-L2185)
函数在 m
上运行G, 整个调度流程如图所示
对于函数 [checkTimers()](https://github.com/golang/go/blob/9b955d2d3fcff6a5bc8bce7bafdc4c634a28e95b/src/runtime/proc.go#L2739-L2802)
这里不再介绍。
全局队列获取(1/61 机会)
为了公平性考虑,调度器每调度61次会从全局队列里获取一个G执行,前提是队列里有G可用。实现函数 globrunqget()
, 这里指定了获取1个G。
// Try get a batch of G's from the global runnable queue.
// Sched must be locked.
func globrunqget(_p_ *p, max int32) *g {
if sched.runqsize == 0 {
return nil
}
// 根据P数量和全局队列g的总量,计算每个P可分到的数量
n := sched.runqsize/gomaxprocs + 1
if n > sched.runqsize {
n = sched.runqsize
}
// 平均每个P可分得数量 > 所要获取的数量,则直接使用用户指定的数量个数
if max > 0 && n > max {
n = max
}
// 如果获取数量>P本地队列数量的1/2,则最多获取一半。 runq 是一个长度为256的数组
if n > int32(len(_p_.runq))/2 {
n = int32(len(_p_.runq)) / 2
}
// 更新全局队列G数量
sched.runqsize -= n
gp := sched.runq.pop()
n--
// 批量将G 从全局队列里转移到p的本地队列
for ; n > 0; n-- {
gp1 := sched.runq.pop()
runqput(_p_, gp1, false)
}
return gp
}
可以看到当用户从全局队列里获取G的时候,同时也会根据当前 p.runq
数量情况将全局队列里的G放在P的本地队列里(runqput
)。
本地队列获取
// Get g from local runnable queue.
// If inheritTime is true, gp should inherit the remaining time in the
// current time slice. Otherwise, it should start a new time slice.
// Executed only by the owner P.
func runqget(_p_ *p) (gp *g, inheritTime bool) {
// If there's a runnext, it's the next G to run.
// 优先从 runnext 字段获取下一个运行的G
// 利用 for 进行自旋获取
for {
next := _p_.runnext
if next == 0 {
break
}
if _p_.runnext.cas(next, 0) {
return next.ptr(), true
}
}
// 利用循环获取本地队列G
for {
h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with other consumers
t := _p_.runqtail
if t == h {
return nil, false
}
gp := _p_.runq[h%uint32(len(_p_.runq))].ptr()
if atomic.CasRel(&_p_.runqhead, h, h+1) { // cas-release, commits consume
return gp, false
}
}
}
这里优先从p._runnext
字段获取G,如果字段为空的话,则再从本地列队里获取。从本地队列获取时使用了for+cas来实现,这是因为有可能多个线程同时读取。这里使用了 atomic.LoadAcq()
和 atomic.CasRel()
来实现了原子操作语义,函数基于汇编实现,函数位置视用户平台架构不同而异。
- 优先从 p._runnext 字段获取
- 从 p 的本地运行队列 _runq 字段获取一个
阻塞式获取
// Finds a runnable goroutine to execute.
// Tries to steal from other P's, get g from local or global queue, poll network.
func findrunnable() (gp *g, inheritTime bool) {
_g_ := getg()
......
// local runq
if gp, inheritTime := runqget(_p_); gp != nil {
return gp, inheritTime
}
// global runq
// 全局 runq 的头部获取一个G
if sched.runqsize != 0 {
lock(&sched.lock)
gp := globrunqget(_p_, 0)
unlock(&sched.lock)
if gp != nil {
return gp, false
}
}
// Poll network.
// This netpoll is only an optimization before we resort to stealing.
// We can safely skip it if there are no waiters or a thread is blocked
// in netpoll already. If there is any kind of logical race with that
// blocked thread (e.g. it has already returned from netpoll, but does
// not set lastpoll yet), this thread will do blocking netpoll below
// anyway.
if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
if list := netpoll(0); !list.empty() { // non-blocking
gp := list.pop()
injectglist(&list)
casgstatus(gp, _Gwaiting, _Grunnable)
if trace.enabled {
traceGoUnpark(gp, 0)
}
return gp, false
}
}
// Steal work from other P's.
procs := uint32(gomaxprocs)
ranTimer := false
// If number of spinning M's >= number of busy P's, block.
// This is necessary to prevent excessive CPU consumption
// when GOMAXPROCS>>1 but the program parallelism is low.
// 当spinning的 M>P 的数量时,为了节省CPU资源则会直接阻塞
if !_g_.m.spinning && 2*atomic.Load(&sched.nmspinning) >= procs-atomic.Load(&sched.npidle) {
goto stop
}
if !_g_.m.spinning {
_g_.m.spinning = true
atomic.Xadd(&sched.nmspinning, 1)
}
// 如果经过一轮窃取失败了,则再试一轮,最多重试四轮
// 选择P的算法,重点
for i := 0; i < 4; i++ {
// 遍历一轮所有P,每次遍历开始时随机选择一个位置,直到所有P都遍历完
for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
if sched.gcwaiting != 0 {
goto top
}
// 前三次未获取到G, 当最后一次获取时,stealRunNextG 为true,表示直接从p 的 runnext 字段获取G
stealRunNextG := i > 2 // first look for ready queues with more than 1 g
// 随机获取一个P
p2 := allp[enum.position()]
if _p_ == p2 {
continue
}
// 从P中窃取G
if gp := runqsteal(_p_, p2, stealRunNextG); gp != nil {
return gp, false
}
.....
}
}
.....
}
对于阻塞式获取方式又分以下顺序
- 调用
runqget()
从当前P运行队列首部获取G - 调用
globrunqget()
从全局队列首部获取G - 从网络轮询器获取
- 从其它P窃取,具体从哪个P获取是基于一个算法来实现的
可以看到当前方式最多从四个地方获取G。
窃取算法:为了保证公平性,遍历allp时并不是固定的从allp[0]开始,而是从随机位置上的p开始,而且遍历的顺序也随机化了,并不是现在访问了第i
个p下一次就访问第i+1
个p,而是使用了一种伪随机的方式遍历allp中的每个p,防止每次遍历时使用同样的顺序访问allp中的元素。
下面举例说明一下上述算法过程,现假设nprocs为8,也就是一共有8个p。
如果第一次随机选择的 offset = 6,coprime = 3
(3与8互质,满足算法要求)的话,则从allp切片中偷取的下标顺序为6, 1, 4, 7, 2, 5, 0, 3,计算过程:
6,(6+3)%8=1,(1+3)%8=4, (4+3)%8=7, (7+3)%8=2, (2+3)%8=5, (5+3)%8=0, (0+3)%8=3
如果第二次随机选择的 offset = 4,coprime = 5
的话,则从allp切片中偷取的下标顺序为1, 6, 3, 0, 5, 2, 7, 4,计算过程:
1,(1+5)%8=6,(6+5)%8=3, (3+5)%8=0, (0+5)%8=5, (5+5)%8=2, (2+5)%8=7, (7+5)%8=4
可以看到这种随机算法,偷取p的顺序也是随机的,但结果却仍然保证所有的p都有被有选择的机会。不管nprocs是多少,这个算法都可以保证经过nprocs次循环,每个p都可以得到访问。
切记:对于调度这一块不仅仅是获取一个G就结束了,剩下的还有获取P和M的过程,只是相对来说获取G的工作要多一些而已。
参考
- https://mp.weixin.qq.com/s/WZqiGvIMyR7QYmGgSWg60g
- https://blog.haohtml.com/archives/21010
- https://mp.weixin.qq.com/s/o2UmxmFFOwUAkgE-Ao9MsA