Runtime: goroutine的暂停和恢复源码剖析

上一节《GC 对根对象扫描实现的源码分析》中,我们提到过在GC的时候,在对一些goroutine 栈进行扫描时,会在其扫描前台触发 G 的暂停(suspendG)和恢复(resumeG)。

// markroot scans the i'th root.
//
// Preemption must be disabled (because this uses a gcWork).
//
// nowritebarrier is only advisory here.
//
//go:nowritebarrier
func markroot(gcw *gcWork, i uint32) {
	baseFlushCache := uint32(fixedRootCount)
	baseData := baseFlushCache + uint32(work.nFlushCacheRoots)
	baseBSS := baseData + uint32(work.nDataRoots)
	baseSpans := baseBSS + uint32(work.nBSSRoots)
	baseStacks := baseSpans + uint32(work.nSpanRoots)
	end := baseStacks + uint32(work.nStackRoots)

	// Note: if you add a case here, please also update heapdump.go:dumproots.
	switch {
		......

	default:
		var gp *g
		if baseStacks <= i && i < end {
			gp = allgs[i-baseStacks]
		} else {
			throw("markroot: bad index")
		}

		status := readgstatus(gp) // We are not in a scan state
		if (status == _Gwaiting || status == _Gsyscall) && gp.waitsince == 0 {
			gp.waitsince = work.tstart
		}

		// scanstack must be done on the system stack in case
		// we're trying to scan our own stack.
		systemstack(func() {
			userG := getg().m.curg
			selfScan := gp == userG && readgstatus(userG) == _Grunning
			if selfScan {
				casgstatus(userG, _Grunning, _Gwaiting)
				userG.waitreason = waitReasonGarbageCollectionScan
			}

			// TODO: suspendG blocks (and spins) until gp
			// stops, which may take a while for
			// running goroutines. Consider doing this in
			// two phases where the first is non-blocking:
			// we scan the stacks we can and ask running
			// goroutines to scan themselves; and the
			// second blocks.
			stopped := suspendG(gp)
			if stopped.dead {
				gp.gcscandone = true
				return
			}
			if gp.gcscandone {
				throw("g already scanned")
			}
			scanstack(gp, gcw)
			gp.gcscandone = true
			resumeG(stopped)

			if selfScan {
				casgstatus(userG, _Gwaiting, _Grunning)
			}
		})
	}
}

那么它在暂停和恢复一个goroutine时都做了些什么工作呢,今天我们通过源码来详细看一下。 go version 1.16.2

G的抢占

一个G可以在任何 安全点(safe-point) 被抢占,目前安全点可以分为以下几类:

  1. 阻塞安全点出现在 goroutine 被取消调度、同步阻塞或系统调用期间;
  2. 同步安全点出现在运行goroutine检查抢占请求时;
  3. 异步安全点出现在用户代码中的任何指令上,其中G可以安全的暂停且可以保证堆栈和寄存器扫描找到 stack root(这个很重要,GC扫描开始的地方)。runtime 可以通过一个信号在一个异步安全点暂停一个G。

这里将安全点分为 阻塞安全点同步安全点异步安全点,每种安全点都出现在不同的场景。

阻塞安全点和同步安全点,一个G的CPU状态是最小的(无法理解这里最小的意思)。垃圾回收器拥有整个stack的完整信息。这样就有可能使用最小的空间重新调度G,并精确的扫描G的 栈。

同步安全点是通过在重载函数序言中stack bound check(栈边界检查)实现的。在下一个同步安全点抢占G,runtime 在G的 stack绑定一个值,该值将导致下一个 stack bound check 失败,从而进入栈的增涨实现,此实现将检测到它实际上是抢占并重写向到抢占处理逻辑。

异步安全点抢占是通过操作系统(如:信号)挂起一个线程并检查它的状态以确定G是否处于一个异步安全点。由于挂起线程本身是异步的,它将检查运行的G是否需要被抢占,这将引起一些改变。如果所有条件都满足,它将调整信号上下文,使其看起来像刚刚发起调用的asyncPreempt(异步抢占)信号线程并恢复此线程。asyncPreempt溢出所有寄存器并进入调度程序。

(另一种方法是抢占信号处理程序本身。这将允许操作系统保存和恢复寄存器状态,运行时只需要知道如何从信号上下文中提取可能包含指针的寄存器。但是,这将为每个抢占的G消耗一个M,并且调度器本身并不是设计为从信号处理程序运行的,因为它倾向于在抢占路径中分配内存和启动线程)

暂停状态

在G的暂停状态没有使用一个单独的变量来表示,而是通过一个 suspendGState 的结构体来表示。

type suspendGState struct {
	g *g
	dead bool
	stopped bool
}

字段意义:

  • g 表示当前暂停的G,将其放在状态结构体中,这样直需要什么一个结构体就可以了,不需要再单独占用一个参数来表示暂停的哪个G;
  • dead 表示当前G并没有暂停,而是处于 _Gdead 状态。这个 G 可以以后被复用,因为调用者不能一直认为它是 _Gdead 状态,见G的状态流转图;
  • stopped 表示通过 g.preemptStop 将G转换为 _Gwaiting 状态,因此负责在完成时做好准备

暂停G (suspendG)

在安全点暂停G将返回一个 suspendGState 结构体的状态值,调用者在此期间将一直拥有此G的读权限,直到恢复 resumeG 为止。

多个调用者在同一时间试图suspend同一个G时,它是安全的。goroutine 可以在后续成功挂起操作之间执行。当前实现授予对G的独占访问权限,所以多个调用者将会序列化。但是,其目的是授予共享read权限,所以不要依赖独占访问。

suspend操作必须在系统栈执行,并且在M(如果有的话)上的用户goroutine必须处于一个可抢占的状态。这样可以防止两个goroutine试图互相挂起并且都处于非抢占状态时出现死锁。有其它的方式来解决这个死锁,但看起来非常的简单。

// go:systemstack
func suspendG(gp *g) suspendGState {
	// 当前暂停的G正是自己,且自己还处于_Grunning,直接抛出异常
	if mp := getg().m; mp.curg != nil && readgstatus(mp.curg) == _Grunning {
		throw("suspendG from non-preemptible goroutine")
	}

	const yieldDelay = 10 * 1000
	var nextYield int64

	stopped := false
	var asyncM *m
	var asyncGen uint32
	var nextPreemptM int64
	for i := 0; ; i++ {
		switch s := readgstatus(gp); s {
		default:
			if s&_Gscan != 0 {
				break
			}

			dumpgstatus(gp)
			throw("invalid g status")
		case _Gdead:
			return suspendGState{dead: true}
		case _Gcopystack:

		case _Gpreempted:
			if !casGFromPreempted(gp, _Gpreempted, _Gwaiting) {
				break
			}

			stopped = true

			s = _Gwaiting
			fallthrough
		case _Grunnable, _Gsyscall, _Gwaiting:
			if !castogscanstatus(gp, s, s|_Gscan) {
				break
			}

			gp.preemptStop = false
			gp.preempt = false
			gp.stackguard0 = gp.stack.lo + _StackGuard
			return suspendGState{g: gp, stopped: stopped}
		case _Grunning:
			if gp.preemptStop && gp.preempt && gp.stackguard0 == stackPreempt && asyncM == gp.m && atomic.Load(&asyncM.preemptGen) == asyncGen {
				break
			}

			// Temporarily block state transitions.
			if !castogscanstatus(gp, _Grunning, _Gscanrunning) {
				break
			}

			// Request synchronous preemption.
			gp.preemptStop = true
			gp.preempt = true
			gp.stackguard0 = stackPreempt

			// Prepare for asynchronous preemption.
			asyncM2 := gp.m
			asyncGen2 := atomic.Load(&asyncM2.preemptGen)
			needAsync := asyncM != asyncM2 || asyncGen != asyncGen2
			asyncM = asyncM2
			asyncGen = asyncGen2

			casfrom_Gscanstatus(gp, _Gscanrunning, _Grunning)

			if preemptMSupported && debug.asyncpreemptoff == 0 && needAsync {
				now := nanotime()
				if now >= nextPreemptM {
					nextPreemptM = now + yieldDelay/2
					preemptM(asyncM)
				}
			}
		}

		if i == 0 {
			nextYield = nanotime() + yieldDelay
		}
		if nanotime() < nextYield {
			procyield(10)
		} else {
			osyield()
			nextYield = nanotime() + yieldDelay/2
		}
	}

整体流程是通过一个 for 方法,不断的检查G的状态并在合适的机会返回suspendGState。

  • _Gdead 已处于 dead状态,直接返回 suspendGState{dead: true},注意这时没有g;
  • _Gcopystack 处于复制stack状态,当前处于栈的扩容或缩减,继续等待直到完成;
  • _Gpreempted 可抢占状态;将G变为 _Gwaiting 状态,同时设置变量 stopped=true。继续等待;
  • _Grunnable, _Gsyscall, _Gwaiting : 标记为扫描状态;取消抢占请求等,返回 suspendGState{g: gp, stopped: true};
  • _Grunning 这里指非当前G的运行状态; 先将 _Grunning 变为 _Gscanrunning;设置同步抢占标记并做一些抢占准备,再恢复 _Grunning 状态;最后再发送异步抢占

这里提到过几个与转换G状态的函数,如casfrom_Gscanstatus()castogscanstatus()casGFromPreempted()

恢复G (resumeG)

所谓恢复G就是指暂停的撤销,允许暂停的G从当前 安全点(safe-point) 继续执行。

func resumeG(state suspendGState) {
	if state.dead {
		// We didn't actually stop anything.
		return
	}

	gp := state.g
	switch s := readgstatus(gp); s {
	default:
		dumpgstatus(gp)
		throw("unexpected g status")

	case _Grunnable | _Gscan,
		_Gwaiting | _Gscan,
		_Gsyscall | _Gscan:
		casfrom_Gscanstatus(gp, s, s&^_Gscan)
	}

	if state.stopped {
		// We stopped it, so we need to re-schedule it.
		ready(gp, 0, true)
	}
}

主要是最后一句,调用 ready() ,将其G设置为运行 _Grunnable 状态,这样 G 就可以在下次被立即执行。

总结

可以看到对G的暂停和恢复,其实是对G 的状态进行改变。对于suspend操作只会在安全点才会发生,它会一直重试尝试着修改G的状态,同时会对一些抢占标记做一些修改直到修改成功为止。

参考资料

由于个人能力有限,文章中若有错误,可以联系本人指证。