Golang 基于信号的异步抢占与处理

在Go1.14版本开始实现了 基于信号的协程抢占调度 模式,在此版本以前执行以下代码是永远也无法执行完成。

本文基于go version 1.16

package main

import (
    "runtime"
    "time"
)

func main() {
    runtime.GOMAXPROCS(1)
    go func() {
        for {
        }
    }()

    time.Sleep(time.Millisecond)
    println("OK")
}

原因很简单:在main函数里只有一个CPU,从上到下执行到 time.Sleep() 函数的时候,会将 main goroutine 放出入运行队列,出让了P,开始执行匿名函数,但匿名函数是一个for循环,没有任何IO语句,也就无法引起对G的调度,所以当前仅有的一个P永远被其占用,导致无法打印OK。

这个问题在1.14版本开始有所改变,主要是因为引入了基于信号的抢占模式。在程序启动时,初始化信号,并在 runtime.sighandler 函数注册了 SIGURG 信号的处理函数 runtime.doSigPreempt,然后在触发垃圾回收的栈扫描时或执行 sysmon 监控线程,调用函数挂起goroutine,并向M发送信号,M收到信号后,会让当前goroutine陷入休眠继续执行其他的goroutine。

本篇从发送与接收信息并处理两方面来看一下它是如何实现的。

发送信号

在上篇文章(认识sysmon监控线程)介绍 sysmon 的时候,我们知道监控线程会在无P的情况下一直运行,定期扫描所有的P,将长时间运行的G 进行解除。

// Always runs without a P, so write barriers are not allowed.
//
//go:nowritebarrierrec
func sysmon() {
	......

	for {
		if idle == 0 { // start with 20us sleep...
			delay = 20
		} else if idle > 50 { // start doubling the sleep after 1ms...
			delay *= 2
		}
		if delay > 10*1000 { // up to 10ms
			delay = 10 * 1000
		}
		usleep(delay)


		......


		// retake P's blocked in syscalls
		// and preempt long running G's
		if retake(now) != 0 {
			idle = 0
		} else {
			idle++
		}
	}

	......

}

通过 retake() 函数对所有 P 进行检查。

// forcePreemptNS is the time slice given to a G before it is
// preempted.
const forcePreemptNS = 10 * 1000 * 1000 // 10ms

func retake(now int64) uint32 {
	for i := 0; i < len(allp); i++ {
		_p_ := allp[i]
		if _p_ == nil {
			// This can happen if procresize has grown
			// allp but not yet created new Ps.
			continue
		}
		pd := &_p_.sysmontick
		s := _p_.status
		sysretake := false

		if s == _Prunning || s == _Psyscall {
			// Preempt G if it's running for too long.
			// 如果 P 运行得太久, 则抢占 G
			t := int64(_p_.schedtick)
			if int64(pd.schedtick) != t {
				pd.schedtick = uint32(t)
				pd.schedwhen = now
			} else if pd.schedwhen+forcePreemptNS <= now {

				// 如果超过了10ms就需要进行抢占了
				preemptone(_p_)

				// In case of syscall, preemptone() doesn't
				// work, because there is no M wired to P.
				sysretake = true
			}
		}

		......
	}


}

如果一个 Ppd.schedwhen+forcePreemptNS <= now ,则说明P上的G运行的时间太长,则需要通过函数 preemptone() 进行抢占。

// src/runtime/proc.go

// Tell the goroutine running on processor P to stop.
// This function is purely best-effort. It can incorrectly fail to inform the
// goroutine. It can send inform the wrong goroutine. Even if it informs the
// correct goroutine, that goroutine might ignore the request if it is
// simultaneously executing newstack.
// No lock needs to be held.
// Returns true if preemption request was issued.
// The actual preemption will happen at some point in the future
// and will be indicated by the gp->status no longer being
// Grunning
func preemptone(_p_ *p) bool {
	mp := _p_.m.ptr()
	if mp == nil || mp == getg().m {
		return false
	}

	// 被抢占的 goroutine
	gp := mp.curg
	if gp == nil || gp == mp.g0 {
		return false
	}

	// 设置g的抢占标识
	gp.preempt = true


	// Every call in a go routine checks for stack overflow by
	// comparing the current stack pointer to gp->stackguard0.
	// Setting gp->stackguard0 to StackPreempt folds
	// preemption into the normal stack overflow check.
	// 设置栈抢占 stackPreempt,这是一个很大的值比任何栈都大,
	// 在 goroutine 内部的每次调用都会比较栈顶指针和 g.stackguard0,用以判断是否发生了栈溢出。
	gp.stackguard0 = stackPreempt

	// Request an async preemption of this P.
	// 对P发一个异步抢占请示
	if preemptMSupported && debug.asyncpreemptoff == 0 {
		_p_.preempt = true
		preemptM(mp)
	}

	return true
}

这里主要是设置抢占标识位,然后调用 preemptM() 函数发送一个抢占请求到m。

// src/runtime/signal_unix.go

const preemptMSupported = true

// preemptM sends a preemption request to mp. This request may be
// handled asynchronously and may be coalesced with other requests to
// the M. When the request is received, if the running G or P are
// marked for preemption and the goroutine is at an asynchronous
// safe-point, it will preempt the goroutine. It always atomically
// increments mp.preemptGen after handling a preemption request.
func preemptM(mp *m) {
	......
	if atomic.Cas(&mp.signalPending, 0, 1) {
		if GOOS == "darwin" || GOOS == "ios" {
			atomic.Xadd(&pendingPreemptSignals, 1)
		}

		// If multiple threads are preempting the same M, it may send many
		// signals to the same M such that it hardly make progress, causing
		// live-lock problem. Apparently this could happen on darwin. See
		// issue #37741.
		// Only send a signal if there isn't already one pending.
		signalM(mp, sigPreempt)
	}
	......
}

这里又调用了 signalM() 函数。

// src/runtime/os_darwin.go

func signalM(mp *m, sig int) {
	pthread_kill(pthread(mp.procid), uint32(sig))
}

对于 pthread_kill() 函数我们不再详细介绍。

以上就是发送抢占信号的基本流程,相应的也就应该有处理抢占信号的逻辑。

处理信号

给m发送的信息是 sigPreempt ,它是一个常量

const sigPreempt = _SIGURG

对于它的详细说明,可以参考官方注释文档。

程序在开始运行的时候,

// Initialize signals.
// Called by libpreinit so runtime may not be initialized.
//go:nosplit
//go:nowritebarrierrec
func initsig(preinit bool) {
	if !preinit {
		// It's now OK for signal handlers to run.
		signalsOK = true
	}

	// For c-archive/c-shared this is called by libpreinit with
	// preinit == true.
	if (isarchive || islibrary) && !preinit {
		return
	}

	for i := uint32(0); i < _NSIG; i++ {
		t := &sigtable[i]
		if t.flags == 0 || t.flags&_SigDefault != 0 {
			continue
		}

		// We don't need to use atomic operations here because
		// there shouldn't be any other goroutines running yet.
		fwdSig[i] = getsig(i)

		if !sigInstallGoHandler(i) {
			// Even if we are not installing a signal handler,
			// set SA_ONSTACK if necessary.
			if fwdSig[i] != _SIG_DFL && fwdSig[i] != _SIG_IGN {
				setsigstack(i)
			} else if fwdSig[i] == _SIG_IGN {
				sigInitIgnored(i)
			}
			continue
		}

		handlingSig[i] = 1
		setsig(i, funcPC(sighandler)) // 注册信号对应的回调方法
	}
}

go 在启动的时候会把所有的信号都注册一次。

再通过函数 sighandler() 进行 _SIGURG信号回调机制的注册。

func sighandler(sig uint32, info *siginfo, ctxt unsafe.Pointer, gp *g) {
	......
	// 如果是抢占信号
	if sig == sigPreempt && debug.asyncpreemptoff == 0 {
		// Might be a preemption signal.
		doSigPreempt(gp, c)
		// Even if this was definitely a preemption signal, it
		// may have been coalesced with another signal, so we
		// still let it through to the application.
	}
	......
}

然后是执行抢占信号事件

// doSigPreempt handles a preemption signal on gp.
func doSigPreempt(gp *g, ctxt *sigctxt) {
	// Check if this G wants to be preempted and is safe to
	// preempt.
	if wantAsyncPreempt(gp) {
		if ok, newpc := isAsyncSafePoint(gp, ctxt.sigpc(), ctxt.sigsp(), ctxt.siglr()); ok {
			// Adjust the PC and inject a call to asyncPreempt.

			// 执行抢占
			ctxt.pushCall(funcPC(asyncPreempt), newpc)
		}
	}

	// Acknowledge the preemption.
	atomic.Xadd(&gp.m.preemptGen, 1)
	atomic.Store(&gp.m.signalPending, 0)

	if GOOS == "darwin" || GOOS == "ios" {
		atomic.Xadd(&pendingPreemptSignals, -1)
	}
}

会先判断异步安全点 isAsyncSafePoint(), 然后返回一个可以插入调用 asyncPreempt() 的PC。

参考资料