用 Goalng 开发 OPA 策略

Open Policy Agent 简称OPA是一个开源的通用策略引擎,可在整个堆栈中实现统一的、上下文感知的策略实施。OPA 已经成为了云原生计算基金会 (CNCF) 领域的毕业项目,已经在 Kubernetes / Istio 等多个知名项目里使用 。

OPA的核心思想就是策略即代码。

它使用Rego语言开发,Rego 的灵感来自 Datalog,它是一种易于理解、已有数十年的历史的查询语言。Rego 扩展了 Datalog 以支持 JSON 等文档模型。对于它的详细介绍请参考官方文档 https://www.openpolicyagent.org/docs/latest/policy-language/#what-is-rego,这里不再介绍,本方主要介绍如何使用Golang 来开发一个opa策略。

概述

OPA 将策略决策与策略执行分离。当您的软件需要做出策略决策时,它会查询 OPA 并提供结构化数据(例如 JSON)作为输入。 OPA 接受任意结构化数据作为输入。

对于它的输入,一般称为 input 可以为任何类型,输出也一样可以为任意类型,即可以输出布尔值truefalse,也可以输出一个 JSON 字符串对象。

Continue reading

一文看懂Golang 定时器源码

计时器分 Timer 和 Ticker 两种,它们底层基本是一样的,两差的区别请参考 https://blog.haohtml.com/archives/19859, 这里我们的介绍对象是 Timer 。

golang timer

计时器结构体

https://github.com/golang/go/blob/go1.17.6/src/time/sleep.go#L84-L98

 // NewTimer creates a new Timer that will send
 // the current time on its channel after at least duration d.
 func NewTimer(d Duration) *Timer {
     c := make(chan Time, 1)
     t := &Timer{
         C: c,
         r: runtimeTimer{
             when: when(d),
             f:    sendTime,
             arg:  c,
         },
     }
     startTimer(&t.r)
     return t
 }

通过调用 NewTimer() 函数创建一个 Timer,首先创建一个长度为1的有缓冲channel,再创建一个Timer的结构体,并将 channel 置于 Timer 结构体内。

注意这里的 runtimeTimer.f 字段是一个函数 sendTime ,其实现如下

func sendTime(c interface{}, seq uintptr) {
	// Non-blocking send of time on c.
	// Used in NewTimer, it cannot block anyway (buffer).
	// Used in NewTicker, dropping sends on the floor is
	// the desired behavior when the reader gets behind,
	// because the sends are periodic.
	select {
	case c.(chan Time) <- Now():
	default:
	}
}

当 sendTime 函数主要用在 newTimer() 时,它以无阻塞的方式将当前时间 Now() 发送到 c 通道里。如果用在 newTicker() 时,如果读取落后,会将发送丢弃,它是周期性的。

我们给出 Timer 的结构体声明。

 type Timer struct {
  C <-chan Time
  r runtimeTimer
 }

一共两个字段,为了理解方面我们称 runtimeTimer 为 timer 值。

我们再看一下其中的 runtimeTimer 结构体的声明

 // Interface to timers implemented in package runtime.
 // Must be in sync with ../runtime/time.go:/^type timer
 type runtimeTimer struct {
  pp       uintptr
  when     int64
  period   int64
  f        func(interface{}, uintptr) // NOTE: must not be closure
  arg      interface{}
  seq      uintptr
  nextwhen int64
  status   uint32
 }

对于 runnerTimer结构体要与在 runtime/time.go 文件中的 timer 结构体保持同步。

结构体字段说明

  • pp 指针类型,这里指 GPM 中的 P。如果这个计时器 timer 在一个heap 上,它在哪个 P 的堆上
  • when 表示唤醒执行的时间,表示什么时间开始执行
  • period 周期,一定是大于 0; when+period 表示下次唤醒执行的时间
  • f 执行函数,不允许为匿名函数,最好为非阻塞函数
  • arg 上面f函数的参数
  • seq 同 arg,其在 runOneTimer 函数中的调用方式为 f(arg, seq)
  • nextwhen 下次运行的时间,其值只有在 timerModifiedXX status 状态下才设置
  • status 状态,其定义的的可用值有10种,定义在 runtime/time.go,我们下面对这些状态进行了介绍。

每次开启一个goroutine 执行 f(arg, now),基中when表示执行的时间,而 when+period 表示下次执行的时间。(这时有点疑问,对调用的函数参数,f的第二个参数是 now, 但后面介绍的时候第二个参数却是 seq)

通过查看 https://github.com/golang/go/blob/go1.17.6/src/runtime/time.go#L41-L116 可知以下几点:

Continue reading

Golang常见编译参数

在执行 go build 命令的时候,经常需要添加一些参数,或许是为了调试,也或许是为了生成最终部署。

如果只在编译特定包时需要传递参数,格式应遵守“包名=参数列表”,如

go build -gcflags -gcflags='log=-N -l' main.go

-gcflags

go build 可以用 -gcflagsgo编译器传入参数,也就是传给 go tool compile 的参数,因此可以用 go tool compile –help 查看所有可用的参数。

其中 -m 可以检查代码的编译优化情况,包括逃逸情况和函数是否内联。

-ldflags

go build用 -ldflags 给go链接器传入参数,实际是给go tool link的参数,可以用go tool link –help查看可用的参数。

常用-X来指定版本号等编译时才决定的参数值。例如代码中定义var buildVer string,然后在编译时用go build -ldflags “-X main.buildVer=1.0” … 来赋值。注意-X只能给string类型变量赋值。


Golang中的 CGO_ENABLED 环境变量

Golang中的编译参数

开发中经常使用 go build 命令来编译我们的程序源码,然后将生成二进制文件直接部署,极其方便。

对于 go build 有一些参数,对于针对程序源码进行一些编译优化,下面我们对经常使用的一些参数来介绍一下。

环境变量

环境变量需要在go命令前面设置,如果多个变量的话,中间需要用“空格”分隔。下面我们介绍一个非常常见到的一些环境变量

$ CGO_ENABLED=1 GOARCH=amd64 GOOS=linux go build -o myserver main.go

除了这里给出的这几个变量外,还有一些其它变量,如 GODEBUG、GOFLAGS、GOPROXY 等,所有支持环境变量都可以在 https://github.com/golang/go/blob/a88575d662a7e8e4fbb31bf139bcffc063e2a734/src/cmd/go/internal/help/helpdoc.go#L485 里找到,有兴趣的话可以看看他们的作用。

这里重点介绍一下 CGO_ENABLED 环境变量对我们程序的影响。 CGO_ENABLED是用来控制golang 编译期间是否支持调用 cgo 命令的开关,其值为1或0,默认情况下值为1,可以用 go env 查看默认值。

如果你的程序里调用了cgo 命令,此参数必须设置为1,否则将编译时出错。这里直接用文档 https://go.dev/blog/cgo 中的一个例子验证。

package main

// #include <stdio.h>
// #include <stdlib.h>
//
// static void myprint(char* s) {
//   printf("%sn", s);
// }
import "C"
import "unsafe"

func main() {
	cs := C.CString("Hello from stdio")
	C.myprint(cs)
	C.free(unsafe.Pointer(cs))
}

然后我们执行一下 启用 CGO_ENABLED=1 的情况

root@ubuntu:/home/sxf/gotest# CGO_ENABLED=1 go run main.go
root@ubuntu:/home/sxf/gotest# ./main
Hello from stdio

可以看到输出正常,这里也可以省略不写变量,因为默认情况为启用CGO状态

启用 CGO_ENABLED=0 的情况

root@ubuntu:/home/sxf/gotest# CGO_ENABLED=0 go build main.go
go: no Go source files

可以看到编译失败,验证了我们上面说的情况。

这里提示找不到go源文件,不清楚底层是如何判断这一情况的。

那么,如果我们一个程序里未调用cgo,在编译时指定 CGO_ENABLED 不同值话,又会发生什么呢?编译的二进制有何区别呢?

root@ubuntu:/home/sxf/gotest# CGO_ENABLED=1 go build -o cgo_main main.go
root@ubuntu:/home/sxf/gotest# CGO_ENABLED=0 go build -o no_cgo_main main.go
root@ubuntu:/home/sxf/gotest# ls -al
total 3804
drwxrwxr-x  2 sxf  sxf     4096 Nov 25 10:22 .
drwxr-x--- 47 sxf  sxf     4096 Nov 19 12:48 ..
-rwxr-xr-x  1 root root 1937311 Nov 25 10:22 cgo_main
-rw-rw-r--  1 sxf  sxf      119 Oct 29 09:07 go.mod
-rw-rw-r--  1 sxf  sxf      241 Oct 29 09:07 go.sum
-rw-rw-r--  1 sxf  sxf       72 Nov 25 10:21 main.go
-rwxr-xr-x  1 root root 1937311 Nov 25 10:22 no_cgo_main

可以看到两种情况都可能编译成功,且两者生成的二进制文件是完全一样的。由此总结出可以看出如果程序里未调用 cgo 的话,此变量值并没有影响的。

那么问题来了,为什么这么多项目里编译的时候都明确指定了此环境变量的值呢,主要是编译器在编译时会根据不同的情况使用不同的编译方法。 当CGO_ENABLED=1,进行编译时会将文件中引用libc的库(比如常用的net包),以动态链接的方式生成目标文件。 当CGO_ENABLED=0,进行编译时则会把在目标文件中未定义的符号(外部函数)一起链接到可执行文件中。

不论哪种方式,都可以使用静态连接编译

参考

k8s安装负载均衡器:Metallb

在使用kubenetes的过程中,如何将服务开放到集群外部访问是一个重要的问题。当使用云平台(阿里云、腾讯云、AWS等)的容器服务时,我们可以通过配置 service 为 LoadBalancer 模式来绑定云平台的负载均衡器,从而实现外网的访问。但是,如果对于自建的 kubernetes裸机集群,这个问题则要麻烦的多。

祼机集群不支持负载均衡的方式,可用的不外乎NodePort、HostNetwork、ExternalIPs等方式来实现外部访问。但这些方式并不完美,他们或多或少都存在的一些缺点,这使得裸机集群成为Kubernetes生态系统中的二等公民。

MetalLB 旨在通过提供与标准网络设备集成的Network LB实施来解决这个痛点,从而使裸机群集上的外部服务也尽可能“正常运行”,减少运维上的管理成本。它是一种纯软件的解决方案,参考 https://kubernetes.github.io/ingress-nginx/deploy/baremetal/

Continue reading

Golang中的runtime.LockOSThread 和 runtime.UnlockOSThread

在runtime中有 runtime.LockOSThreadruntime.UnlockOSThread 两个函数,这两个函数有什么作用呢?我们看一下标准库中对它们的解释。

runtime.LockOSThread

// LockOSThread wires the calling goroutine to its current operating system thread.
// The calling goroutine will always execute in that thread,
// and no other goroutine will execute in it,
// until the calling goroutine has made as many calls to
// UnlockOSThread as to LockOSThread.
// If the calling goroutine exits without unlocking the thread,
// the thread will be terminated.
//
// All init functions are run on the startup thread. Calling LockOSThread
// from an init function will cause the main function to be invoked on
// that thread.
//
// A goroutine should call LockOSThread before calling OS services or
// non-Go library functions that depend on per-thread state.

调用 LockOSThread绑定 当前 goroutine 到当前 操作系统线程,此 goroutine 将始终在此线程执行,其它 goroutine 则无法在此线程中得到执行,直到当前调用线程执行了 UnlockOSThread 为止(也就是说 LockOSThread 可以指定一个goroutine 独占 一个系统线程);

Continue reading

认识无锁队列

无锁队列lock-free 中最基本的数据结构,一般应用在需要一款高性能队列的场景下。

对于多线程用户来说,无锁队列的入队和出队操作是线程安全的,不用再加锁控制

什么是无锁队列

队列每个开发者都知道,那么什么又是无锁队列呢?字面理解起来就是一个无锁状态的队列,多个线程(消费者)同时操作数据的时候不需要加锁,因为加/解锁都是一个很消耗资源的动作。

实现原理

我们先看一下无锁队列的底层实现数据结构。

数据结构

无锁队列底层的数据结构实现方式主要有两种:数组链接

Continue reading

Runtime: goroutine的暂停和恢复源码剖析

上一节《GC 对根对象扫描实现的源码分析》中,我们提到过在GC的时候,在对一些goroutine 栈进行扫描时,会在其扫描前触发 G 的暂停(suspendG)和恢复(resumeG)。

// markroot scans the i'th root.
//
// Preemption must be disabled (because this uses a gcWork).
//
// nowritebarrier is only advisory here.
//
//go:nowritebarrier
func markroot(gcw *gcWork, i uint32) {
	baseFlushCache := uint32(fixedRootCount)
	baseData := baseFlushCache + uint32(work.nFlushCacheRoots)
	baseBSS := baseData + uint32(work.nDataRoots)
	baseSpans := baseBSS + uint32(work.nBSSRoots)
	baseStacks := baseSpans + uint32(work.nSpanRoots)
	end := baseStacks + uint32(work.nStackRoots)

	// Note: if you add a case here, please also update heapdump.go:dumproots.
	switch {
		......

	default:
		var gp *g
		if baseStacks <= i && i < end {
			gp = allgs[i-baseStacks]
		} else {
			throw("markroot: bad index")
		}

		status := readgstatus(gp) // We are not in a scan state
		if (status == _Gwaiting || status == _Gsyscall) && gp.waitsince == 0 {
			gp.waitsince = work.tstart
		}

		// scanstack must be done on the system stack in case
		// we're trying to scan our own stack.
		systemstack(func() {
			userG := getg().m.curg
			selfScan := gp == userG && readgstatus(userG) == _Grunning
			if selfScan {
				casgstatus(userG, _Grunning, _Gwaiting)
				userG.waitreason = waitReasonGarbageCollectionScan
			}

			// TODO: suspendG blocks (and spins) until gp
			// stops, which may take a while for
			// running goroutines. Consider doing this in
			// two phases where the first is non-blocking:
			// we scan the stacks we can and ask running
			// goroutines to scan themselves; and the
			// second blocks.
			stopped := suspendG(gp)
			if stopped.dead {
				gp.gcscandone = true
				return
			}
			if gp.gcscandone {
				throw("g already scanned")
			}
			scanstack(gp, gcw)
			gp.gcscandone = true
			resumeG(stopped)

			if selfScan {
				casgstatus(userG, _Gwaiting, _Grunning)
			}
		})
	}
}

那么它在暂停和恢复一个goroutine时都做了些什么工作呢,今天我们通过源码来详细看一下。 go version 1.16.2

G的抢占

一个G可以在任何 安全点(safe-point) 被抢占,目前安全点可以分为以下几类:

  1. 阻塞安全点出现在 goroutine 被取消调度、同步阻塞或系统调用期间;
  2. 同步安全点出现在运行goroutine检查抢占请求时;
  3. 异步安全点出现在用户代码中的任何指令上,其中G可以安全的暂停且可以保证堆栈和寄存器扫描找到 stack root(这个很重要,GC扫描开始的地方)。runtime 可以通过一个信号在一个异步安全点暂停一个G。

这里将安全点分为 阻塞安全点同步安全点异步安全点,每种安全点都出现在不同的场景。

阻塞安全点和同步安全点,一个G的CPU状态是最小的(无法理解这里最小的意思)。垃圾回收器拥有整个stack的完整信息。这样就有可能使用最小的空间重新调度G,并精确的扫描G的 栈。

Continue reading

goroutine栈的申请与释放

对于提高对 stack 的使用效率,避免重复从heap中分配与释放,对其使用了 pool 的概念,runtime 里为共提供了两个pool, 分别为 stackpool ,另一个为 stackLarge

stack pool

stackpool: 16b~32k 对应通用的大小的stack。获取时通过调用 stackpoolalloc(), 释放时调用 stackpoolfree()

stackLarge:对应 > 32K 的 stack

在程序全局调度器初始化时会通过调用 stackinit() 实现对 stack 初始化。

当我们执行一个 go func() 语句的时候,runtime 会通过调用 newproc() 函数来创建G。而内部真正创建G的函数为 newproc1(),在没有G可以复用的情况下,会通过 newg = malg(_StackMin) 语句创建一个包含stack的G。

// Allocate a new g, with a stack big enough for stacksize bytes.
func malg(stacksize int32) *g {
	newg := new(g)
	if stacksize >= 0 {
		stacksize = round2(_StackSystem + stacksize)
		systemstack(func() {
			newg.stack = stackalloc(uint32(stacksize))
		})
		newg.stackguard0 = newg.stack.lo + _StackGuard
		newg.stackguard1 = ^uintptr(0)
		// Clear the bottom word of the stack. We record g
		// there on gsignal stack during VDSO on ARM and ARM64.
		*(*uintptr)(unsafe.Pointer(newg.stack.lo)) = 0
	}
	return newg
}

对于新创建的g,需要通过调用 stackalloc() 函数为其分配 stacksize 大小stack,那么分配操作它又是如何工作的呢?

stack的申请

根据申请stack的大小分两种情况,一种是 small stack,另一种是 large stack,两者采用不同的申请策略。主要涉及了内存申请策略,如果对golang 的内存管理比较了解的话,这块理解起来就显的太过于简单了。建议先阅读一下这篇文章《Golang 内存组件之mspan、mcache、mcentral 和 mheap 数据结构》。

func stackalloc(n uint32) stack {
	...

	var v unsafe.Pointer
	if n < _FixedStack<<_NumStackOrders && n < _StackCacheSize {
		order := uint8(0)
		n2 := n
		for n2 > _FixedStack {
			order++
			n2 >>= 1
		}
		var x gclinkptr
		if stackNoCache != 0 || thisg.m.p == 0 || thisg.m.preemptoff != "" {
			// thisg.m.p == 0 can happen in the guts of exitsyscall
			// or procresize. Just get a stack from the global pool.
			// Also don't touch stackcache during gc
			// as it's flushed concurrently.
			lock(&stackpool[order].item.mu)
			x = stackpoolalloc(order)
			unlock(&stackpool[order].item.mu)
		} else {
			c := thisg.m.p.ptr().mcache
			x = c.stackcache[order].list
			if x.ptr() == nil {
				stackcacherefill(c, order)
				x = c.stackcache[order].list
			}
			c.stackcache[order].list = x.ptr().next
			c.stackcache[order].size -= uintptr(n)
		}
		v = unsafe.Pointer(x)
	} else {
		...
	}
	return stack{uintptr(v), uintptr(v) + uintptr(n)}
}

对于 small stack 以可以分两种情况:

  • P:会直接通过从当前 G 绑定的P中的 mcache 字段申请,这个字段可以理解为内存资源中心,里面包含有多种不同规格大小的内存块,根据申请大小找到一个可以满足其大小的最小规格的内存区域。
  • P:调用 stackpoolalloc() 会直接从全局 stack pool 中获取
Continue reading