Golang 的调度策略

我们上篇文章(Golang 的底层引导流程/启动顺序)介绍了一个golang程序的启动流程,在文章的最后对于最重要的一点“调度“ (函数  schedule()) 并没有展开来讲,今天我们继续从源码来分析一下它的调度机制。

在此之前我们要明白golang中的调度主要指的是什么?在 src/runtime/proc.go 文件里有一段注释这样写到

// Goroutine scheduler
// The scheduler’s job is to distribute ready-to-run goroutines over worker threads.

这里指如何找一个已准备好运行的 G 关联到PM 让其执行。对于G 的调度可以围绕三个方面来理解:

  • 时机:什么时候关联(调度)。对于调度时机一般是指有空闲P的时候都会去找G执行
  • 对象:选择哪个G进行调度。这是我们本篇要讲的内容
  • 机制:如何调度。execute() 函数

理解了这三个问题,基本也就明白了它的调度策略了,本篇主要对G的获取。

源文件 src/runtime/proc.go , go version 1.15.6

获取G流程

下面我们看一下 schedule() 函数

// One round of scheduler: find a runnable goroutine and execute it.
// Never returns.
func schedule() {}

从注释我们可得知以下几点:

  1. 调度是一轮一轮执行的,并不是只执行一次
  2. 调度的工作就是找到一个 runnable 状态的G,对于G的选择可直接理解为调度,至少大部分场景下我们谈论的是指G的调度
  3. 当前这个调度并无返回值,也就是说在当函数执行结束时就代表当前一轮的调度已结束(不严谨)。剩下的执行此函数后面的程序
func schedule() {
	// 获取当前 g0
	_g_ := getg()

	if _g_.m.locks != 0 {
		throw("schedule: holding locks")
	}

	if _g_.m.lockedg != 0 {
		stoplockedm()
		execute(_g_.m.lockedg.ptr(), false) // Never returns.
	}

	// We should not schedule away from a g that is executing a cgo call,
	// since the cgo call is using the m's g0 stack.
	// 不允许cgo正在使用g0栈
	if _g_.m.incgo {
		throw("schedule: in cgo")
	}

top:
	pp := _g_.m.p.ptr()
	pp.preempt = false

	if sched.gcwaiting != 0 {
		gcstopm()
		goto top
	}
	if pp.runSafePointFn != 0 {
		runSafePointFn()
	}

	// Sanity check: if we are spinning, the run queue should be empty. 
	// 健全性检查:如果有m正在spinning 的话,则g的运行队列应该是空的才对,这个应该很好理解
	// Check this before calling checkTimers, as that might call
	// goready to put a ready goroutine on the local run queue.
	// 健全性检查必须要在调用 checkTimers() 之前进行检查,这是因为有可能goready()在本地运行队列放了一个就绪的goroutine
	if _g_.m.spinning && (pp.runnext != 0 || pp.runqhead != pp.runqtail) {
		throw("schedule: spinning with local work")
	}

	// 重点关注 定时器处理
	checkTimers(pp, 0)

	var gp *g
	var inheritTime bool

	// Normal goroutines will check for need to wakeP in ready,
	// but GCworkers and tracereaders will not, so the check must
	// be done here instead.
	tryWakeP := false
	if trace.enabled || trace.shutdown {
		gp = traceReader()
		if gp != nil {
			casgstatus(gp, _Gwaiting, _Grunnable)
			traceGoUnpark(gp, 0)
			tryWakeP = true
		}
	}
	if gp == nil && gcBlackenEnabled != 0 {
		gp = gcController.findRunnableGCWorker(_g_.m.p.ptr())
		tryWakeP = tryWakeP || gp != nil
	}
	if gp == nil {
		// Check the global runnable queue once in a while to ensure fairness.
		// Otherwise two goroutines can completely occupy the local runqueue
		// by constantly respawning each other.
		// 为了确保公平性,调度器会每经过61次调度就直接从全局g运行队列获取1个G,否则直接从本地g运行队列获取
		if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
			lock(&sched.lock)
			gp = globrunqget(_g_.m.p.ptr(), 1)
			unlock(&sched.lock)
		}
	}

	// 从与当前m关联的p本地g队列获取,优先读取p.runnext的G,如果为nil再从队列里取
	if gp == nil {
		gp, inheritTime = runqget(_g_.m.p.ptr())
		// We can see gp != nil here even if the M is spinning,
		// if checkTimers added a local goroutine via goready.
	}

	// 以阻塞方式获取一个g, 重点关注函数 finrunnable() 函数
	if gp == nil {
		gp, inheritTime = findrunnable() // blocks until work is available
	}

	// This thread is going to run a goroutine and is not spinning anymore,
	// so if it was marked as spinning we need to reset it now and potentially
	// start a new spinning M.
	// 如果当前m处于 spinning 状态,则进行重置,因为它要执行当前获取到的G
	if _g_.m.spinning {
		resetspinning()
	}

	if sched.disable.user && !schedEnabled(gp) {
		// Scheduling of this goroutine is disabled. Put it on
		// the list of pending runnable goroutines for when we
		// re-enable user scheduling and look again.
		lock(&sched.lock)
		if schedEnabled(gp) {
			// Something re-enabled scheduling while we
			// were acquiring the lock.
			unlock(&sched.lock)
		} else {
			sched.disable.runnable.pushBack(gp)
			sched.disable.n++
			unlock(&sched.lock)
			goto top
		}
	}

	// If about to schedule a not-normal goroutine (a GCworker or tracereader),
	// wake a P if there is one.
	// 如果调度的是一个gcworker或traceeader,则唤醒一个P (gc也是并发的)
	if tryWakeP {
		wakep()
	}
	if gp.lockedm != 0 {
		// Hands off own p to the locked m,
		// then blocks waiting for a new p.
		startlockedm(gp)
		goto top
	}

	// 执行G(将G运行在一个m上)
	execute(gp, inheritTime)
}

可以看到,整个 schedue() 函数主要工作就是获取一个 _Grunnable 状态的G,大概获取流程如下

  • 调度器每调度 61 次,则调用 globrunqget() 函数从全局G队列里获取一个G, 主要是为了确保公平性,也就是说有1/61的机会从优先从全局队列里获取G。这一步操作很少有开发者提起。
  • 如果获取不到,则通过 runqget() 函数从P中获取G。获取时优先使用 p.runnext 的G(亲和性),如果没有G,再从P的运行队列里获取
  • 如果仍未获取,则最后通过调用 finrunnable() 函数以阻塞形式获取一个G

找到一个G后,然后调用 execute() 函数在 m 上运行G, 整个调度流程如图所示

对于函数 checkTimers() 这里不再介绍。

全局队列获取(1/61 机会)

为了公平性考虑,调度器每调度61次会从全局队列里获取一个G执行,前提是队列里有G可用。实现函数 globrunqget(), 这里指定了获取1个G。

// Try get a batch of G's from the global runnable queue.
// Sched must be locked.
func globrunqget(_p_ *p, max int32) *g {
	if sched.runqsize == 0 {
		return nil
	}

	// 根据P数量和全局队列g的总量,计算每个P可分到的数量
	n := sched.runqsize/gomaxprocs + 1
	if n > sched.runqsize {
		n = sched.runqsize
	}

	// 平均每个P可分得数量 > 所要获取的数量,则直接使用用户指定的数量个数
	if max > 0 && n > max {
		n = max
	}

	// 如果获取数量>P本地队列数量的1/2,则最多获取一半。 runq 是一个长度为256的数组
	if n > int32(len(_p_.runq))/2 {
		n = int32(len(_p_.runq)) / 2
	}

	// 更新全局队列G数量
	sched.runqsize -= n

	gp := sched.runq.pop()
	n--

	// 批量将G 从全局队列里转移到p的本地队列
	for ; n > 0; n-- {
		gp1 := sched.runq.pop()
		runqput(_p_, gp1, false)
	}
	return gp
}

可以看到当用户从全局队列里获取G的时候,同时也会根据当前 p.runq 数量情况将全局队列里的G放在P的本地队列里(runqput)。

本地队列获取

// Get g from local runnable queue.
// If inheritTime is true, gp should inherit the remaining time in the
// current time slice. Otherwise, it should start a new time slice.
// Executed only by the owner P.
func runqget(_p_ *p) (gp *g, inheritTime bool) {
	// If there's a runnext, it's the next G to run.
	// 优先从 runnext 字段获取下一个运行的G
	// 利用 for 进行自旋获取
	for {
		next := _p_.runnext
		if next == 0 {
			break
		}
		if _p_.runnext.cas(next, 0) {
			return next.ptr(), true
		}
	}

	// 利用循环获取本地队列G
	for {
		h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with other consumers
		t := _p_.runqtail
		if t == h {
			return nil, false
		}
		gp := _p_.runq[h%uint32(len(_p_.runq))].ptr()
		if atomic.CasRel(&_p_.runqhead, h, h+1) { // cas-release, commits consume
			return gp, false
		}
	}
}

这里优先从p._runnext字段获取G,如果字段为空的话,则再从本地列队里获取。从本地队列获取时使用了for+cas来实现,这是因为有可能多个线程同时读取。这里使用了 atomic.LoadAcq() atomic.CasRel() 来实现了原子操作语义,函数基于汇编实现,函数位置视用户平台架构不同而异。

  • 优先从 p._runnext 字段获取
  • 从 p 的本地运行队列 _runq 字段获取一个

阻塞式获取

// Finds a runnable goroutine to execute.
// Tries to steal from other P's, get g from local or global queue, poll network.
func findrunnable() (gp *g, inheritTime bool) {
	_g_ := getg()

	......

	// local runq
	if gp, inheritTime := runqget(_p_); gp != nil {
		return gp, inheritTime
	}

	// global runq
	if sched.runqsize != 0 {
		lock(&sched.lock)
		gp := globrunqget(_p_, 0)
		unlock(&sched.lock)
		if gp != nil {
			return gp, false
		}
	}

	// Poll network.
	// This netpoll is only an optimization before we resort to stealing.
	// We can safely skip it if there are no waiters or a thread is blocked
	// in netpoll already. If there is any kind of logical race with that
	// blocked thread (e.g. it has already returned from netpoll, but does
	// not set lastpoll yet), this thread will do blocking netpoll below
	// anyway.
	if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
		if list := netpoll(0); !list.empty() { // non-blocking
			gp := list.pop()
			injectglist(&list)
			casgstatus(gp, _Gwaiting, _Grunnable)
			if trace.enabled {
				traceGoUnpark(gp, 0)
			}
			return gp, false
		}
	}

	// Steal work from other P's.
	procs := uint32(gomaxprocs)
	ranTimer := false
	// If number of spinning M's >= number of busy P's, block.
	// This is necessary to prevent excessive CPU consumption
	// when GOMAXPROCS>>1 but the program parallelism is low.
	// 当spinning的 M>P 的数量时,为了节省CPU资源则会直接阻塞
	if !_g_.m.spinning && 2*atomic.Load(&sched.nmspinning) >= procs-atomic.Load(&sched.npidle) {
		goto stop
	}
	if !_g_.m.spinning {
		_g_.m.spinning = true
		atomic.Xadd(&sched.nmspinning, 1)
	}

	// 如果经过一轮窃取失败了,则再试一轮,最多重试四轮
	// 选择P的算法,重点
	for i := 0; i < 4; i++ {
		// 遍历一轮所有P,每次遍历开始时随机选择一个位置,直到所有P都遍历完
		for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
			if sched.gcwaiting != 0 {
				goto top
			}

			// 前三次未获取到G, 当最后一次获取时,stealRunNextG 为true,表示直接从p 的 runnext 字段获取G
			stealRunNextG := i > 2 // first look for ready queues with more than 1 g

			// 随机获取一个P
			p2 := allp[enum.position()]
			if _p_ == p2 {
				continue
			}

			// 从P中窃取G
			if gp := runqsteal(_p_, p2, stealRunNextG); gp != nil {
				return gp, false
			}

			.....
		}
	}
	.....

}

对于阻塞式获取方式又分以下顺序

  1. 调用 runqget() 从当前P运行队列获取
  2. 调用 globrunqget() 从全局队列获取
  3. 从网络轮询器获取
  4. 从其它P窃取,具体从哪个P获取是基于一个算法来实现的

可以看到当前方式最多从四个地方获取G。

窃取算法:为了保证公平性,遍历allp时并不是固定的从allp[0]开始,而是从随机位置上的p开始,而且遍历的顺序也随机化了,并不是现在访问了第i个p下一次就访问第i+1个p,而是使用了一种伪随机的方式遍历allp中的每个p,防止每次遍历时使用同样的顺序访问allp中的元素。

下面举例说明一下上述算法过程,现假设nprocs为8,也就是一共有8个p。

如果第一次随机选择的 offset = 6,coprime = 3(3与8互质,满足算法要求)的话,则从allp切片中偷取的下标顺序为6, 1, 4, 7, 2, 5, 0, 3,计算过程:

6,(6+3)%8=1,(1+3)%8=4, (4+3)%8=7, (7+3)%8=2, (2+3)%8=5, (5+3)%8=0, (0+3)%8=3

如果第二次随机选择的 offset = 4,coprime = 5 的话,则从allp切片中偷取的下标顺序为1, 6, 3, 0, 5, 2, 7, 4,计算过程:

1,(1+5)%8=6,(6+5)%8=3, (3+5)%8=0, (0+5)%8=5, (5+5)%8=2, (2+5)%8=7, (7+5)%8=4

可以看到这种随机算法,偷取p的顺序也是随机的,但结果却仍然保证所有的p都有被有选择的机会。不管nprocs是多少,这个算法都可以保证经过nprocs次循环,每个p都可以得到访问。

切记:对于调度这一块不仅仅是获取一个G就结束了,剩下的还有获取P和M的过程,只是相对来说获取G的工作要多一些而已。

参考