Protobuf协议实现原理

protobuf是Google开源的一款支持跨平台、语言中立的结构化数据描述和高性能序列化协议,此协议完全基于二进制,所以性能要远远高于JSON/XML。由于出色的传输性能所以常见于微服务之间的通讯,其中最为著名的是Google开源的 gRPC 框架。

那么protobuf是如何实现高性能的,又是如何实现数据的编码和解码的呢?

protobuf协议原理

基于128bits的数据存储方式(Base 128 Varints)

Varint 是一种紧凑的表示数字的方法。它用一个或多个字节来表示一个数字,值越小的数字使用越少的字节数。这能减少用来表示数字的字节数。


比如对于 int32 类型的数字,一般需要 4 个 byte 来表示。但是采用 Varint,对于很小的 int32 类型的数字,则可以用 1 个 byte 来表示。当然凡事都有好的也有不好的一面,采用 Varint 表示法,大的数字则需要 5 个 byte 来表示。从统计的角度来说,一般不会所有的消息中的数字都是大数,因此大多数情况下,采用 Varint 后,可以用更少的字节数来表示数字信息。

Varint 中的每个 byte 的最高位 bit 有特殊的含义,如果该位为 1,表示后续的 byte 也是该数字的一部分,如果该位为 0,则结束。其他的 7 个 bit 都用来表示数字。因此小于 128 的数字都可以用一个 byte 表示。大于 128 的数字,比如 300,会用两个字节来表示:1010 1100 0000 0010。

另外如果从数据大小角度来看,这种表示方式比实现的数据多了一个bit, 所以其实际传输大小就多14%(1/7 = 0.142857143)。

数字1表示方式:0000 0001

对于小的数据比较好理解,正常情况下1的二进制是 0000 0001,使用128bits表示的话,首位结束标识位也是0,所以两者结果是一样的 0000 0001。

数字 300 表示方式:1010 1100 0000 0010

300

这个有点不太好理解了,这是因为原本用一个字节(8bit)就可以表示,但由于使用128bits表示方法,需要对每个字节的最高位添加一个结束标识位来表示,所以一个字节已经不够用了,需要占用两个字节来表示,其中两个字节最高位都是结束标识位。

如果正向推算的话,我们知道数字300的二进制值 1 0010 1100,用两个字节表示完整值则为
0000 0001 0010 1100 # 二进制
_000 0010 _010 1100 # 二进制每个字节的最高位向左移动一个位置,放入结束标识位
0000 0010 1010 1100 # 转换为128bits方式,1:结束,0:未结束
1010 1100 0000 0010 # 转换为小端字节序, 低字节在前,高字节在后

注意这里是先添加结束标识符,然后再转为小端字节序。

协议数据结构

消息经过序列化后会成为一个二进制数据流,该流中的数据为一系列的 Key-Value 对。如下图所示:

图 7. Message Buffer
图 7. Message Buffer

采用这种 Key-Pair 结构无需使用分隔符来分割不同的 Field。对于可选的 Field,如果消息中不存在该 field,那么在最终的 Message Buffer 中就没有该 field,这些特性都有助于节约消息本身的大小。

Key 用来标识具体的 field,在解包的时候,客户端创建一个结构对象,Protocol Buffer 从数据流中读取并反序列化数据,并根据 Key 就可以知道相应的 Value 应该对应于结构体中的哪一个 field。

而Key也是由以下两部分组成

Key 的定义如下:

1(field_number << 3) | wire_type
Key的定义

可以看到 Key 由两部分组成。第一部分是 field_number。第二部分为 wire_type。表示 Value 的传输类型。

一个字节的低3位表示数据类型,其它位则表示字段序号。

Wire Type 可能的类型如下表所示:

表 1. Wire Type

Type
MeaningUsed For
0Varintint32, int64, uint32, uint64, sint32, sint64, bool, enum
164-bitfixed64, sfixed64, double
2Length-delimistring, bytes, embedded messages, packed repeated fields
3Start groupGroups (deprecated)
4End groupGroups (deprecated)
532-bitfixed32, sfixed32, float
Wire Type 数据类型
message Test1 {
    required int32 a = 1;
}

在我们的例子当中,field id 所采用的数据类型为 int32,因此对应的 wire type 为 0。细心的读者或许会看到在 Type 0 所能表示的数据类型中有 int32 和 sint32 这两个非常类似的数据类型。Google Protocol Buffer 区别它们的主要意图也是为了减少 encoding 后的字节数。

每个数据头同样采用128bits方式,一般1个字节就足够了,

本例中字段a 的序号是1

如上创建了 Test1 的结构并且把 a 的值设为 2,序列化后的二进制数据为
0000 1000 0000 0010

Key 部分是 0000 1000
value 部分是 0000 0010, 其中字节最高位是结束标识位,即10进制的2,我们在转换的时候统一将符号位转为0即可。

协议规定数据头的低3位表示wire_type, 其它字段表示字段序号field_number,因此
0000 1000
_000 1000 # 去掉结束标识符位
_000 1000 # 000 表示数据类型, 这里是Varint
_000 1000 # 0001 这四位表示字段序号

参考

https://www.ibm.com/developerworks/cn/linux/l-cn-gpb/

Golang开发中中使用GitHub私有仓库

私有仓库地址为

github.com/cfanbo/websocket

一、设置私有环境变量 GOPRIVATE

$ go env -w GOPRIVATE=github.com/cfanbo/websocket

对于为什么需要设置 GOPRIMARY 变量,可以参考这里

对于GOPRIVATE值级别分为仓库级别和账号级别。

如果只有一个仓库,直接设置为仓库地址即可。如果有多个私有仓库的话,使用”,”分开,都在这个账号下,也可以将值设置为账号级别,这样账号下的所有私有仓库都可以正常访问。如 http://github.com/cfanbo

如果不想每次都重新设置,我们也可以利用通配符,例如:

$ go env -w GOPRIVATE="*.example.com"

这样子设置的话,所有模块路径为 example.com 的子域名(例如:git.example.com)都将不经过 Go module proxy 和 Go checksum database,需要注意的是不包括 example.com 本身。

国内用户访问仓库建议设置 GORPOXY为 https://proxy.golang.org,direct

二、设置凭证

使用私有仓库一定要绕不开权限设置这一步。访问仓库来常见的有两种方式,分别为SSH和 Https 。对于私有仓库来说,ssh可以设置rsa私钥来访问,https这种则可以使用用户名和密码,一般通过命令行访问的时候,会自动提示用户输入这些信息。

对于权限控制这一块可参考官方文档 。其实在官方文档里还提供了第三种访问仓库的方式,那就是 Personal access token,简称 PAT, 这种 Token 是专门为api调用提供的,常见于自动化工作流中,如 CICD场景。

这里我们就利用PAT 来实现

  1. 在Github.com 网站生成 Personal access tokens,新手可参考官方教程文档
  2. 本地配置token凭证
$ git config --global url."https://${username}:${access_token}@github.com".insteadOf / "https://github.com"

如果你在使用Github Actions部署时,遇到无法读取版本号问题,需要改写成 git config –global url.”https://${username}:${access_token}@github.com”.insteadOf “https://github.com

命令验证

go get github.com/cfanbo/websocket

到这里基本配置基本完成了。

其它场景

如果要用在docker环境中的话,也要记得设置上面的几个环境变量值。

以下为一个docker示例

# Start from the latest golang base image
FROM golang:alpine

RUN GOCACHE=OFF

# 设置环境变量
RUN go env -w GOPRIVATE=github.com/ereshzealous

# Set the Current Working Directory inside the container
WORKDIR /app

# Copy everything from the current directory to the Working Directory inside the container
COPY . .

RUN apk add git

# 设置访问仓库凭证
RUN git config --global url."https://user-name:<access-token>@github.com".insteadOf "https://github.com"

# Build the Go app
RUN go build -o main .

# Expose port 8080 to the outside world
EXPOSE 8080

#ENTRYPOINT ["/app"]

# Command to run the executable
CMD ["./main"]

利用jenkins+github实现应用的自动部署及回滚

对于jenkins的介绍这里不再详细写了,此教程只是为了让大家对部署和回滚原理有所了解。

一、创建项目

点击左侧的“New Item”,输入项目名称,如 rollback-demo。

选中 ” 丢弃旧的构建(Discard old builds)”项,在“策略(Strategy” 选择”Log Rotation“, 并输入保留的最大构建个数。

二、常规配置

设置参数,点击”Add Parameter“,依次选择 “Choice Parameter” 和 “String Parameter“这两,填写如下

这里的Name 项为参数名称,用户在操作的时候,会在deploy 和 rollback 两个值中选择一项。

三、源码管理

我们这里选择Git.并填写github.com上的项目地址,记得设置认证 Credentials。构建分支直接使用默认的 */master 即可以了。查看代码浏览器选择 githubweb,并填写项目的github地址。

四、构建触发事件

选择 “GitHub hook trigger for GITScm polling”,表示使用github webhook来触发构建操作,要实现引功能,需要在项目地址github.com里的“setting”里添加一个webhook的url地址,一般地址为http://jenkins.com/github-webhook/

同时为了防止网络通讯不稳定的情况,同时选择 “Poll SCM”, 在调度Schedule 杠中填写 H/5 * * * *,表示5分钟自动从github上拉取数据一次,如果有变化就进行构建。

五、构建环境

我们演示为了简单,使用了php项目,这里不进行任何操作。如果java、NodeJS或者Golang的话,可能需要进行一些操作, 有时为了方便会把这些操作放在下一步shell脚本里进行。

六、构建配置

1.添加构建步骤,点击Add build step,选择“Execute shell”,填写内容如下

#!/bin/bash
case $deploy_env in
deploy)
	echo "deploy $deploy_env"
    ;;
rollback)
	echo "rollback $deploy_env version=$version"
    cp -R ${JENKINS_HOME}/jobs/rollback-demo/builds/${version}/archive/*.* ./
    pwd &amp;&amp; ls
    ;;
    *)
    exit
    ;;
esac    

可以看到当rollback的时候,是从原来构建归档路径里把文件复制出来。

这里正常情况下应该有一些单元测试之类的脚本,这里省略不写了。

2.添加构建后的操作。点击“Add post-build action” -> “Archive the artifacts” 配置用于归档的文件为“**/*“,表示所有文件。这点十分重要,只有每次构建完归档了才有东西回滚,另外时间长了,归档的内容越来越多,所以上面设置了最大归档个数。
3.再次添加”Send build artifacts over SSH”,配置内容如下

注意 shell脚本里的路径比 Remote directory 的路径里多一个/data 目录,这是由于在配置 ssh server 的时候指定了一个根目录为 /data.

如果在 Add post-build action 中找不到send build artifacts over ssh ,则说明需要安装一下插件,左侧点击“Manage Jenkins”-> “Manage Plugis”, 搜索“Publish Over SSH”安装即可。

到这里为了基本配置完成了。

测试配置

这里我们首次手动构建一次,点击项目页面左侧菜单的“build with Parameter”,显示如下

在正常deploy的时候,version字段时忽略掉即可。如果要加滚的话,则需要选择”rollback”,同时填写 version字段号,这个字段号为页面左下角build history的编号,就是以#开始的那些数字

golang中几种对goroutine的控制方法

我们先看一个代码片断

func listen() {
	ticker := time.NewTicker(time.Second)
	for {
		select {
		case <-ticker.C:
			fmt.Println(time.Now())
		}
	}
}
func main() {
	go listen()
	time.Sleep(time.Second * 5)
	fmt.Println("main exit")
}

非常简单的一个goroutine用法,想必每个go新手都看过的。

不过在实际生产中,我们几乎看不到这种用法的的身影,原因很简单,我们无法实现对goroutine的控制,而一般业务中我们需要根据不同情况对goroutine进行各种操作。

要实现对goroutine的控制,一般有以下两种。

一、手动发送goroutine控制信号

这里我们发送一个退出goroutine的信号。

// listen 利用只读chan控制goroutine的退出
func listen(ch <-chan bool) {
	ticker := time.NewTicker(time.Second)
	for {
		select {
		case <-ticker.C:
			fmt.Println(time.Now())
		case <-ch:
			fmt.Println("goroutine exit")
			return
		}
	}
}

func main() {
	// 声明一个控制goroutine退出的chan
	ch := make(chan bool, 1)
	go listen(ch)

	// 只写chan
	func(ch chan<- bool) {
		time.Sleep(time.Second * 3)
		fmt.Println("发送退出chan信号")
		ch <- true
		close(ch)
	}(ch)

	time.Sleep(time.Second * 5)
	fmt.Println("main exit")
}

我们在main函数里发送一个控制goroutine的退出信息,在goroutine里我们会select这个通道,如果收到此信息,则直接退出goroutine。这里我们使用到了单向chan。

二、利用 context 包来控制goroutine

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
	defer cancel()

	go func() {
		ticker := time.NewTicker(time.Second)
		for {
			select {
			case <-ticker.C:
				fmt.Println(time.Now())
			case <-ctx.Done():
				// 3秒后会收到信号
				fmt.Println("goroutine exit")
				return
			}
		}
	}()

	time.Sleep(time.Second * 5)
	fmt.Println("main exit")
}

这里主要用到了上下文包context来实现,如果你看过包源码的话会发现 ctx.Done() 其实也是chan的读取操作,其原理和第一种方法是完全一样。

Golang遍历切片删除元素引起恐慌问题

删除一个切片的一些元素,https://github.com/golang/go/wiki/SliceTricks告知切片操作:Golang遍历切片恐慌时删除元素

a = append(a[:i], a[i+1:]...)

然后我下面的编码:

package main 

import (
    "fmt" 
) 

func main() { 
    slice := []int{1, 2, 3, 4, 5, 6, 7, 8, 9} 
    for i, value := range slice { 
     if value%3 == 0 { // remove 3, 6, 9 
      slice = append(slice[:i], slice[i+1:]...) 
     } 
    } 
    fmt.Printf("%vn", slice) 
} 

go run hello.go,它恐慌:

panic: runtime error: slice bounds out of range 

goroutine 1 [running]: 
panic(0x4ef680, 0xc082002040) 
    D:/Go/src/runtime/panic.go:464 +0x3f4 
main.main() 
    E:/Code/go/test/slice.go:11 +0x395 
exit status 2 

我该如何更改此代码才能正确使用?

想到的有几下几种方法

1、使用goto和标签

func main() { 
    slice := []int{1, 2, 3, 4, 5, 6, 7, 8, 9} 
Label: 
    for i, n := range slice { 
     if n%3 == 0 { 
      slice = append(slice[:i], slice[i+1:]...) 
      goto Label 
     } 
    } 
    fmt.Printf("%vn", slice) 
} 

它的工作原理很简单,就是不停的迭代,每次删除一个元素后,都需要从切片的开头重新迭代,太过于效率低下了,特别是当一个切片很大的情况下。

2,使用另一个变量临时存储想要的元素

func main() { 
    slice := []int{1, 2, 3, 4, 5, 6, 7, 8, 9} 
    dest := slice[:0] 
    for _, n := range slice { 
     if n%3 != 0 { // filter 
      dest = append(dest, n) 
     } 
    } 
    slice = dest 
    fmt.Printf("%vn", slice) 
} 

这种方法有些浪费内存,一是申请变量占用内存,另外切片扩大时,底层会重新申请内存空间,并将数据迁移过去。

3,从Remove elements in slice,与len操作:

func main() { 
    slice := []int{1, 2, 3, 4, 5, 6, 7, 8, 9} 
    for i := 0; i < len(slice); i++ { 
     if slice[i]%3 == 0 { 
      slice = append(slice[:i], slice[i+1:]...) 
      i-- // should I decrease index here? 
     } 
    } 
    fmt.Printf("%vn", slice) 
} 

要选哪一个,这里看一下基准测试

与基准:

func BenchmarkRemoveSliceElementsBySlice(b *testing.B) { 
    for i := 0; i < b.N; i++ { 
     slice := []int{1, 2, 3, 4, 5, 6, 7, 8, 9} 
     dest := slice[:0] 
     for _, n := range slice { 
      if n%3 != 0 { 
       dest = append(dest, n) 
      } 
     } 
    } 
} 

func BenchmarkRemoveSliceElementByLen(b *testing.B) { 
    for i := 0; i < b.N; i++ { 
     slice := []int{1, 2, 3, 4, 5, 6, 7, 8, 9} 
     for i := 0; i < len(slice); i++ { 
      if slice[i]%3 == 0 { 
       slice = append(slice[:i], slice[i+1:]...) 
      } 
     } 
    } 
} 


$ go test -v -bench=".*" 
testing: warning: no tests to run 
PASS 
BenchmarkRemoveSliceElementsBySlice-4 50000000    26.6 ns/op 
BenchmarkRemoveSliceElementByLen-4  50000000    32.0 ns/op 

从结果上看来使用第2种方法好像好一些的。还哪更好的解决办法没有了呢,网友给出了一种更合理的解决方案,也是使用迭代。

最佳方案:

package main 

import "fmt" 

func main() { 
    slice := []int{1, 2, 3, 4, 5, 6, 7, 8, 9} 

    k := 0 
    for _, n := range slice { 
     if n%3 != 0 { // filter 
      slice[k] = n 
      k++ 
     } 
    } 
    slice = slice[:k] 

    fmt.Println(slice) //[1 2 4 5 7 8] 
} 

实现原理就是将要保留的数据向左边存储,参考:https://play.golang.org/p/eMZltc_gEB,不得不说这个方法真是让人脑洞大开。

来源: https://stackoverflow.com/questions/38387633/golang-remove-elements-when-iterating-over-slice-panics/38387701#38387701

package main 

import "fmt" 

func main() { 
    slice := []int{1, 2, 3, 4, 5, 6, 7, 8, 9} 

    k := 0 
    for i, n := range slice { 
     if n%3 != 0 { // filter 
      if i != k { 
       slice[k] = n 
      } 
      k++ 
     } 
    } 
    slice = slice[:k] 

    fmt.Println(slice) //[1 2 4 5 7 8] 
} 

如果您需要新片保留旧切片

package main 

import "fmt" 

func main() { 
    slice := []int{1, 2, 3, 4, 5, 6, 7, 8, 9} 

    s2 := make([]int, len(slice)) 
    k := 0 
    for _, n := range slice { 
     if n%3 != 0 { // filter 
      s2[k] = n 
      k++ 
     } 
    } 
    s2 = s2[:k] 

    fmt.Println(s2) //[1 2 4 5 7 8] 
} 

http://cn.voidcc.com/question/p-mkbvfagj-hy.html

Golang中select用法导致CPU占用100%的问题分析

上一节(golang中有关select的几个知识点)中介绍了一些对于select{}的一些用法,今天介绍一下有关select在for语句中由于使用不当引起的CPU占用100% 的案例。

先看代码

package main

import (
	"fmt"
	"time"
)

func main() {
	ch := make(chan int, 10)

	// 读取chan
	go func() {
		for {
			select {
			case i := <-ch:
				// 只读取15次chan
				fmt.Println(i)
			default:
				// 读取15次chan以后的操作一直在这个空语句无任何IO操作的default条件里死循环,无法出让P,以保证一个GPM关系。
				// 而如果无default条件的话,则系统当读取完15次chan后,当前goroutine会发会chan IO阻塞, Go调度器根据GPM的调度关系,会将当前执行关系中的G切换出去,再从LRQ队列中取一个新的G,重新组成一个GPM继续执行,以实现合理利用计算机资源,提高GO的高并发性能
			}
		}
	}()

	// 写入10个值到chan
	for i := 0; i < 15; i++ {
		ch <- i
	}

	// 模拟程序效果使用
	time.Sleep(time.Minute)
}

实现功能

通过操作chan来实现消费者逻辑。

问题现象

但在运行的时候,即发现CPU占用率100%,下面我们分析一下什么原因引起的。

问题分析

程序运行时,先使用go关键字创建一个 goroutine,里面是一个for循环语句。for语句里面通过select{}来监听是否有chan的IO操作,当ch中有可以读取的数据时,则将值打印出来。没有的话则执行default语句,而这里default语句为空,所以继续下一次for语句,for{}是一个死循环语句。

当读取15次ch后,由于ch会永远处于阻塞状态,所以会一直执行default条件,然后再执行for循环。此时这段逻辑基本演变成了一个空的 for{} 语句,所以会导致CPU占用100%。

解决办法

既然我们知道这个goroutine会一直在死循环的执行空的语句,导致一直占用着cpu不放,我们只需要让当前goroutine出让CPU控制权给其它goroutine即可。根据GPM调度原理,这里我们只需要让操作chan的IO语句进行阻塞即可,这样Go Seched 就会发现goroutine发生阻塞,于是将当前G切换出去,重新调度一个新的G过来,开始一个新的GPM关系继续运行。
这里最简单的实现方法就注释掉 default 语句即可。将当前goroutine死循环状态变成阻塞状态。

注意这里的阻塞和执行空的 default是两码事,阻塞是执行到这里主停止不再继续执行了,而这里有空的default, 表示的是无执行代码,但本次循环是可以结束的,继续下一次循环,就是一个死循环而已。

对于Go中的阻塞需要了解一下有哪些场景会发生,可以参考上面提到的GPM文章。
常见的阻塞一般发生在像网络请求、系统调用进行磁盘IO操作、执行Sleep函数等,而针对每一种阻塞的处理方式也不一样。
如果是网络导致的阻塞的话,则直接将G切换到网络轮询器NetPoller继续执行, 为PM重新调度过来一个新的G继续执行,等网络请求完成后,再将G追加到LRQ的队列尾部等待再次执行。
而对于系统调用产生的IO阻塞这种情况,则Go Seched则直接将M和G同时切换出去,并保持MG继续执行IO操作,出让出来的P再分配一个新的MG继续执行。等原来IO操作完成后,将原来的G放入LRQ队列,等待P的再次执行,而原来的M放在一旁等待被重复调用使用。可以看到原来的G和M关系到此结束了,下次与G执行的M不一定是原来的M了, 所以G再次执行就需要知道运行状态,堆栈之类的信息,而这些正是存储在G的结构体中了。

对于GPM关系中的几个要点

Go 调度器模型我们通常叫做G-P-M 模型,他包括 4 个重要结构,分别是G、P、M、Sched

  • G 并非执行体,每个 G 需要绑定到 P 才能被调度执行。
  • P 的数量决定了系统内最大可并行的 G 的数量(前提:物理 CPU 核数  >= P 的数量)。
  • M 并不保留 G 状态,这是 G 可以跨 M 调度的基础。M的数量是由Go Runtime来调整,目前默认最大限制为10000个。
  • Sched:Go 调度器,它维护有存储 M 和 G 的队列以及调度器的一些状态信息等。
  • Go 调度器中有两个不同的运行队列:全局运行队列(GRQ)和本地运行队列(LRQ)。
  • 多个 Goroutine 通过用户级别(用户态)的上下文切换来共享内核线程 M 的计算资源(内核态),但对于操作系统来说并没有线程上下文切换产生的性能损耗。

调度器循环的机制大致是从各种队列、P 的本地队列中获取 G,切换到 G 的执行栈上并执行 G 的函数,调用 Goexit 做清理工作并回到 M,如此反复。

推荐 https://mp.weixin.qq.com/s/ihJFa5Wir4ohhZUXVSBvMQ

基于 GitHub Actions 实现 Golang 项目的自动构建部署

前几天 GitHub官网宣布 GitHub 的所有核心功能对所有人都免费开放,不得不说自从微软收购了GitHub后,确实带来了一些很大的改变。

以前有些项目考虑到协作关系的原因,虽然放在github上面,但对于一些项目的持续构建和部署一般是通过自行抢建Travis CI、jenkins等系统来实现。虽然去年推出了Actions用来代替它类三方系统,但感觉着还是不方便,必须有些核心功能无法使用,此消息的发布很有可能将这种格局打破。

本篇教程将介绍使用github的系列产品来实现项目的发布,构建,测试和部署,当然这仅仅是一个非常小的示例,有些地方后期可能会有更好的瞿恩方案。

GitHub Actions 是一款持续集成工具,包括clone代码,代码构建,程序测试和项目发布等一系列操作。更多内容参考:http://www.ruanyifeng.com/blog/2019/09/getting-started-with-github-actions.html

如果你对CI/CD不了解的话,建议先找些文档看看。

项目源文件见 https://github.com/cfanbo/github-actions-demo

GitHub Actions 术语

GitHub Actions 相关的术语。

(1)workflow (工作流程):持续集成一次运行的过程,就是一个 workflow。

(2)job (任务):一个 workflow 由一个或多个 jobs 构成,含义是一次持续集成的运行,可以完成多个任务。

(3)step(步骤):每个 job 由多个 step 构成,一步步完成。

(4)action (动作):每个 step 可以依次执行一个或多个命令(action)。

一、创建workflow 文件

在项目里创建一个workflow文件,文件格式为yaml类型。文件名可以随意起,文件后缀可以为yml 或 .yaml, 这里我们创建文件 .github/workflows/deploy.yaml,注意这里的路径。

如果你的仓库中有项目文件的话,当你点击“Actions”时,系统会自动根据你的开发语言推荐一个常用的actions,在页面的右侧也会推荐一些相应的actions.

二、在部署服务器上生成部署用户密钥

部署时需要用到用户的私钥,所以先登录到部署服务器获取私钥,这里为了方便,单独创建了一对公钥和公钥,

$ cd ~/.ssh
$ ssh-keygen
Generating public/private rsa key pair.
Enter file in which to save the key (/root/.ssh/id_rsa): /root/.ssh/id_rsa_actions
Enter passphrase (empty for no passphrase):
Enter same passphrase again:
Your identification has been saved in /root/.ssh/id_rsa_actions.****
Your public key has been saved in /root/.ssh/id_rsa_actions.pub.
The key fingerprint is:
SHA256:QPg26V17/pnRdHZrDqysG6jFgdTEysbz+aCuXusyO/Q root@iZbp1acq02ar70gdvppgh6Z
The key’s randomart image is:
+—[RSA 2048]—-+
| .o. |
| ..o. |****
| oooo |
| .**. . |
| .+o+S. . =|
| . o++ . o ++|
| . …+o. o o.o.|
| +.E+ .o o ++ |
| .+O= ooo .+. |
+—-[SHA256]—–+

这里为部署单独生成了一对公钥和私钥,私钥路径为 /root/.ssh/id_rsa_actions

将公钥保存到 authorized_keys 文件中
$ cat id_rsa_actions.pub >> authorized_keys

查看私钥内容,后面需要用到,先将内容保存起来
$ cat id_rsa_actions

三、准备工作

对于服务的部署所以这里选择了 ssh-deploy 这一个actioins,官方网址 https://github.com/marketplace/actions/ssh-deploy。

下面开始添加deploy.yaml文件中用到了一些变量。

在项目首页右上角点击 Seetings->Secets, 找到 Add a new secret,分别添加以下变量

SERVER_SSH_KEY 登录私钥,就是上面保存的私钥内容
REMOTE_HOST 服务器地址,如202.102.224.68
REMOTE_PORT (可选项)服务器ssh端口,一般默认为22
REMOTE_USER 登录服务器用户名,这时指密钥所属的用户
SOURCE (可选项),默认为‘’, 构建服务器路径,这里为相应 $GITHUB_WORKSPACE 根目录而言的相对路径, 例如 dist/
REMOTE_TARGET 服务器部署路径,如 /data/ghactions
ARGS (可选项)默认值为 -rltgoDzvO

四、上传代码到github远程仓库

我们这里定义了当master分支发生push操作时就触发一系列workflow操作。
$git push origin master
这时我们可以在 https://github.com/cfanbo/github-actions-demo/actions 页面看到当前项目的构建情况。

四、测试

这里我们登录到远程服务器,可以发现一个server可执行文件,表示已经成功部署到生产服务器了

五、其它

有关 workflow 的一些环境变量可参考 https://help.github.com/en/actions/configuring-and-managing-workflows/using-environment-variables

如果你有过CI/CD的经历,会发现本教程虽然实现了最为初级的功能,离真正线上使用还有一定的距离。主要存在以下问题:
1. 教程里只用了一个job,如果你的项目允许的话,完全可以利用多个job来同时异步构建。
2. 这里构建、测试和部署同时放在了一个job里,也不是太优雅
3. 这里的部署只是将最终生成的二进制文件上传到了生产服务器,并没有对服务进行启动操作或者说没有进行服务的热更新,这在一般场景下是不允许的
由于本篇文章只是一个简单介绍actions基本用法的教程,所以如果想真正用到工作中的话,可能还需要对李篇内容再完善完善才行。

参考

  • https://help.github.com/en/articles/workflow-syntax-for-github-actions
  • https://help.github.com/en/actions/configuring-and-managing-workflows/using-environment-variables
  • https://github.com/marketplace/actions/checkout
  • https://github.com/marketplace/actions/setup-go-for-use-with-actions
  • https://github.com/marketplace/actions/ssh-deploy
  • http://www.ruanyifeng.com/blog/2019/09/getting-started-with-github-actions.html

Golang中的限速器 time/rate

在高并发的系统中,限流已作为必不可少的功能,而常见的限流算法有:计数器、滑动窗口、令牌桶、漏斗(漏桶)。其中滑动窗口算法、令牌桶和漏斗算法应用最为广泛。

常见限流算法

这里不再对计数器算法和滑动窗口作介绍了,有兴趣的同学可以参考其它相关文章。

漏斗算法

非常很好理解,就像有一个漏斗容器一样,漏斗上面一直往容器里倒水(请求),漏斗下方以固定速率一直流出(消费)。如果漏斗容器满的情况下,再倒入的水就会溢出,此时表示新的请求将被丢弃。可以看到这种算法在应对大的突发流量时,会造成部分请求弃用丢失。

可以看出漏斗算法能强行限制数据的传输速率。

漏斗算法

令牌桶算法

从某种意义上来说,令牌算法是对漏斗算法的一种改进。对于很多应用场景来说,除了要求能够限制数据的平均传输速率外,还要求允许某种程度的突发情况。这时候漏桶算法可能就不合适了,令牌桶算法更为适合。

令牌桶算法是指一个固定大小的桶,可以存放的令牌的最大个数也是固定的。此算法以一种固定速率不断的往桶中存放令牌,而每次请求调用前必须先从桶中获取令牌才可以。否则进行拒绝或等待,直到获取到有效令牌为止。如果桶内的令牌数量已达到桶的最大允许上限的话,则丢弃令牌。

Golang中的限制算法

Golang标准库中的限制算法是基于令牌桶算法(Token Bucket) 实现的,库名为golang.org/x/time/rate

对于限流器的消费方式有三种,分别为 Allow()、 Wait()和 Reserve()。前两种内部调用的都是Reserve() ,每个都对应一个XXXN()的方法。如Allow()是AllowN(t, 1)的简写方式。

结构体

type Limiter struct {
	limit Limit
	burst int

	mu     sync.Mutex
	tokens float64
	// last is the last time the limiter's tokens field was updated
	last time.Time
	// lastEvent is the latest time of a rate-limited event (past or future)
	lastEvent time.Time
}

主要用来限速控制并发事件,采用令牌池算法实现。

创建限速器

使用 NewLimiter(r Limit, b int) 函数创建限速器,令牌桶容量为b。初始化状态下桶是满的,即桶里装有b 个令牌,以后再以每秒往里面填充 r 个令牌。

func NewLimiter(r Limit, b int) *Limiter {
	return &amp;Limiter{
		limit: r,
		burst: b,
	}
}

允许声明容量为0的限速器,此时将以拒绝所有事件操作。

// As a special case, if r == Inf (the infinite rate), b is ignored.
有一种特殊情况,就是 r == Inf 时,此时b参数将被忽略。

// Inf is the infinite rate limit; it allows all events (even if burst is zero).
const Inf = Limit(math.MaxFloat64)

Limiter 提供了三个主要函数 Allow, Reserve, 和 Wait. 大部分时候使用Wait。其中 AllowN, ReserveN 和 WaitN 允许消费n个令牌。

每个方法都可以消费一个令牌,当没有可用令牌时,三个方法的处理方式不一样

  • 如果没有令牌时,Allow 返回 false。
  • 如果没有令牌时,Wait 会阻塞走到有令牌可用或者超时取消(context.Context)。
  • 如果没有令牌时,Reserve 返回一个 reservation,以便token的预订时,调用之前必须等待一段时间。

1. Allow/AllowN

AllowN方法表示,截止在某一时刻,目前桶中数目是否至少为n个。如果条件满足,则从桶中消费n个token,同时返回true。反之不消费Token,返回false。

使用场景:一般用在如果请求速率过快,直接拒绝请求的情况

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	// 初始化一个限速器,每秒产生10个令牌,桶的大小为100个
	// 初始化状态桶是满的
	var limiter = rate.NewLimiter(10, 100)

	for i := 0; i < 20; i++ {
		if limiter.AllowN(time.Now(), 25) {
			fmt.Printf("%03d Ok  %s\n", i, time.Now().Format("2006-01-02 15:04:05.000"))
		} else {
			fmt.Printf("%03d Err %s\n", i, time.Now().Format("2006-01-02 15:04:05.000"))
		}
		time.Sleep(500 * time.Millisecond)
	}

}

输出

000 Ok  2020-03-27 16:17:18.604
001 Ok  2020-03-27 16:17:19.110
002 Ok  2020-03-27 16:17:19.612
003 Ok  2020-03-27 16:17:20.115
004 Err 2020-03-27 16:17:20.620
005 Ok  2020-03-27 16:17:21.121
006 Err 2020-03-27 16:17:21.626
007 Err 2020-03-27 16:17:22.127
008 Err 2020-03-27 16:17:22.632
009 Err 2020-03-27 16:17:23.133
010 Ok  2020-03-27 16:17:23.636
011 Err 2020-03-27 16:17:24.138
012 Err 2020-03-27 16:17:24.642
013 Err 2020-03-27 16:17:25.143
014 Err 2020-03-27 16:17:25.644
015 Ok  2020-03-27 16:17:26.147
016 Err 2020-03-27 16:17:26.649
017 Err 2020-03-27 16:17:27.152
018 Err 2020-03-27 16:17:27.653
019 Err 2020-03-27 16:17:28.156

2. Wait/WaitN

当使用Wait方法消费Token时,如果此时桶内Token数量不足(小于N),那么Wait方法将会阻塞一段时间,直至Token满足条件。否则直接返回。
// 可以看到Wait方法有一个context参数。我们可以设置context的Deadline或者Timeout,来决定此次Wait的最长时间。

func main() {
	// 指定令牌桶大小为5,每秒补充3个令牌
	limiter := rate.NewLimiter(3, 5)

	// 指定超时时间为5秒
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()

	for i := 0; ; i++ {
		fmt.Printf("%03d %s\n", i, time.Now().Format("2006-01-02 15:04:05.000"))

		// 每次消费2个令牌
		err := limiter.WaitN(ctx, 2)
		if err != nil {
			fmt.Printf("timeout: %s\n", err.Error())
			return
		}
	}

	fmt.Println("main")
}

输出

000 2020-03-27 16:53:34.764
001 2020-03-27 16:53:34.764
002 2020-03-27 16:53:34.764
003 2020-03-27 16:53:35.100
004 2020-03-27 16:53:35.766
005 2020-03-27 16:53:36.434
006 2020-03-27 16:53:37.101
007 2020-03-27 16:53:37.770
008 2020-03-27 16:53:38.437
009 2020-03-27 16:53:39.101
timeout: rate: Wait(n=2) would exceed context deadline

3. Reserve/ReserveN

// 此方法有一点复杂,它返回的是一个*Reservation类型,后续操作主要针对的全是这个类型
// 判断限制器是否能够在指定时间提供指定N个请求令牌。
// 如果Reservation.OK()为true,则表示需要等待一段时间才可以提供,其中Reservation.Delay()返回需要的延时时间。
// 如果Reservation.OK()为false,则Delay返回InfDuration, 此时不想等待的话,可以调用 Cancel()取消此次操作并归还使用的token

func main() {
	// 指定令牌桶大小为5,每秒补充3个令牌
	limiter := rate.NewLimiter(3, 5)

	// 指定超时时间为5秒
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()
	for i := 0; ; i++ {
		fmt.Printf("%03d %s\n", i, time.Now().Format("2006-01-02 15:04:05.000"))
		reserve := limiter.Reserve()
		if !reserve.OK() {
			//返回是异常的,不能正常使用
			fmt.Println("Not allowed to act! Did you remember to set lim.burst to be > 0 ?")
			return
		}
		delayD := reserve.Delay()
		fmt.Println("sleep delay ", delayD)
		time.Sleep(delayD)
		select {
		case <-ctx.Done():
			fmt.Println("timeout, quit")
			return
		default:
		}
		//TODO 业务逻辑
	}

	fmt.Println("main")
}

输出

000 2020-03-27 16:57:23.135
sleep delay  0s
001 2020-03-27 16:57:23.135
sleep delay  0s
002 2020-03-27 16:57:23.135
sleep delay  0s
003 2020-03-27 16:57:23.135
sleep delay  0s
004 2020-03-27 16:57:23.135
sleep delay  0s
005 2020-03-27 16:57:23.135
sleep delay  333.292866ms
006 2020-03-27 16:57:23.474
sleep delay  328.197741ms
007 2020-03-27 16:57:23.804
sleep delay  331.211817ms
008 2020-03-27 16:57:24.136
sleep delay  332.779335ms
009 2020-03-27 16:57:24.473
sleep delay  328.952586ms
010 2020-03-27 16:57:24.806
sleep delay  329.620588ms
011 2020-03-27 16:57:25.136
sleep delay  332.404798ms
012 2020-03-27 16:57:25.474
sleep delay  328.456103ms
013 2020-03-27 16:57:25.803
sleep delay  331.34754ms
014 2020-03-27 16:57:26.136
sleep delay  332.285545ms
015 2020-03-27 16:57:26.473
sleep delay  328.673618ms
016 2020-03-27 16:57:26.803
sleep delay  332.296438ms
017 2020-03-27 16:57:27.137
sleep delay  332.201646ms
018 2020-03-27 16:57:27.474
sleep delay  328.312813ms
019 2020-03-27 16:57:27.803
sleep delay  332.210098ms
020 2020-03-27 16:57:28.136
sleep delay  332.854719ms
timeout, quit

参考资料

https://www.cyhone.com/articles/analisys-of-golang-rate/
https://zhuanlan.zhihu.com/p/100594314
https://www.jianshu.com/p/1ecb513f7632
https://studygolang.com/articles/10148

Golang中的两个定时器 ticker 和 timer

Golang中time包有两个定时器,分别为ticker 和 timer。两者都可以实现定时功能,但各自都有自己的使用场景。

区别

  • ticker定时器表示每隔一段时间就执行一次,一般可执行多次。
  • timer定时器表示在一段时间后执行,默认情况下只执行一次,如果想再次执行的话,每次都需要调用 time.Reset()方法,此时效果类似ticker定时器。同时也可以调用stop()方法取消定时器
  • timer定时器比ticker定时器多一个Reset()方法,两者都有Stop()方法,表示停止定时器,底层都调用了stopTimer()函数。

Ticker定时器

package main

import (
	"fmt"
	"time"
)

func main() {
    // Ticker 包含一个通道字段C,每隔时间段 d 就向该通道发送当时系统时间。
    // 它会调整时间间隔或者丢弃 tick 信息以适应反应慢的接收者。
    // 如果d <= 0会触发panic。关闭该 Ticker 可以释放相关资源。

	ticker1 := time.NewTicker(5 * time.Second)
	// 一定要调用Stop(),回收资源
	defer ticker1.Stop()
	go func(t *time.Ticker) {
		for {
			// 每5秒中从chan t.C 中读取一次
			<-t.C
			fmt.Println("Ticker:", time.Now().Format("2006-01-02 15:04:05"))
		}
	}(ticker1)

	time.Sleep(30 * time.Second)
	fmt.Println("ok")
}

执行结果

开始时间: 2020-03-19 17:49:41
Ticker: 2020-03-19 17:49:46
Ticker: 2020-03-19 17:49:51
Ticker: 2020-03-19 17:49:56
Ticker: 2020-03-19 17:50:01
Ticker: 2020-03-19 17:50:06
结束时间: 2020-03-19 17:50:11
ok

可以看到每次执行的时间间隔都是一样的。

Timer定时器

package main

import (
	"fmt"
	"time"
)

func main() {

	// NewTimer 创建一个 Timer,它会在最少过去时间段 d 后到期,向其自身的 C 字段发送当时的时间
	timer1 := time.NewTimer(5 * time.Second)

	fmt.Println("开始时间:", time.Now().Format("2006-01-02 15:04:05"))
	go func(t *time.Timer) {
		times := 0
		for {
			<-t.C
			fmt.Println("timer", time.Now().Format("2006-01-02 15:04:05"))

			// 从t.C中获取数据,此时time.Timer定时器结束。如果想再次调用定时器,只能通过调用 Reset() 函数来执行
			// Reset 使 t 重新开始计时,(本方法返回后再)等待时间段 d 过去后到期。
			// 如果调用时 t 还在等待中会返回真;如果 t已经到期或者被停止了会返回假。
			times++
			// 调用 reset 重发数据到chan C
			fmt.Println("调用 reset 重新设置一次timer定时器,并将时间修改为2秒")
			t.Reset(2 * time.Second)
			if times > 3 {
				fmt.Println("调用 stop 停止定时器")
				t.Stop()
			}
		}
	}(timer1)

	time.Sleep(30 * time.Second)
	fmt.Println("结束时间:", time.Now().Format("2006-01-02 15:04:05"))
	fmt.Println("ok")
}

执行结果

开始时间: 2020-03-19 17:41:59
timer 2020-03-19 17:42:04
调用 reset 重新设置一次timer定时器,并将时间修改为2秒
timer 2020-03-19 17:42:06
调用 reset 重新设置一次timer定时器,并将时间修改为2秒
timer 2020-03-19 17:42:08
调用 reset 重新设置一次timer定时器,并将时间修改为2秒
timer 2020-03-19 17:42:10
调用 reset 重新设置一次timer定时器,并将时间修改为2秒
调用 stop 停止定时器
结束时间: 2020-03-19 17:42:29
ok

可以看到,第一次执行时间为5秒以后。然后通过调用 time.Reset() 方法再次激活定时器,定时时间为2秒,最后通过调用time.Stop()把前面的定时器取消掉

注意事项

1. 这里需要注意的时,如果在调用 time.Reset() 或time.Stop() 的时候,timer已经过期或者停止了,则会返回false。


func main() {
	// timer 过期
	timer := time.NewTimer(2 * time.Second)
	time.Sleep(3 * time.Second)
	ret := timer.Reset(2 * time.Second)
	fmt.Println(ret)

	// timer 停止
	timer = time.NewTimer(2 * time.Second)
	timer.Stop()
	ret = timer.Reset(2 * time.Second)
	fmt.Println(ret)

	fmt.Println("ok")
}

执行结果

false
false
ok

2. 如果调用 time.Stop() 时,timer已过期或已stop,则并不会关闭通道。

3. 使用time.NewTicker() 定时器时,需要使用Stop()方法进行资源释放,否则会产生内存泄漏,(Stop the ticker to release associated resources.)

认识虚拟内存

什么是虚拟内存

虚拟内存是计算机系统内存管理的一种技术。它使得应用程序认为它拥有连续的可用的内存(一个连续完整的地址空间),而实际上,它通常是被分隔成多个物理内存碎片,还有部分暂时存储在外部磁盘存储器上,在需要时进行数据交换。目前,大多数操作系统都使用了虚拟内存,如Windows家族的“虚拟内存”;Linux的“交换空间”等。

为什么需要虚拟内存

我们知道程序执行指令的时候,程序计数器是顺序地一条一条指令执行下去,这一条条指令就需要连续地存储在一起,所以就需要这块内存是连续的。物理内存是有限的,如果多个程序同时运行的话,访问同一个物理地址的话,就有可能内存地址冲突,怎么办呢?

这时就需要虚拟内存发挥的作用了,程序里有指令和各种内存地址,系统从物理内存申请一段地址,与这个程序指令里用到的内存地址建立映射关系,这样实际程序指令执行的时候,会通过虚拟内存地址,找到对应的物理内存地址执行。对于任何一个程序来说,它看到的都是同样的内存地址。我们只需要维护一个虚拟内存到物理内存的映射表即可。

这种从物理内存申请一段地址建立映射的方法,我们称其为内存分段

看似解决了上面的问题,但这里又引起了新的问题,即内存碎片。由于物理内存每次申请一段地址的时候,都是连接的,如果有三个程序分别执行,中间的程序执行完后,内存也释放回了可用物理内存,此时又来第四个程序,有可能出现内存不足的情况,因为物理内存已经没有了连续的可用地址,如下图所求,共1G内存的情况。

57211af3053ed621aeb903433c6c10d1

假如程序X需要内存为256MB的话,可以看到两个128MB内存是非连续的,程序无法运行,白白浪费掉了中间的128MB空间,这种情况就是内存碎片

怎么解决呢,只能将Python程序运行信息先存储到硬盘上,先释放最下面的256MB空间,然后再重新载入,这样会腾出256MB的空间。

这种将页面写入磁盘的行为我们称为换出,从磁盘载入内存称为换入。

虽然解决了内存碎片的总是,但由于内存和磁盘两者的速度相差实在太大了,看来这种办法效率太差了,还有其它好的办法没有呢?

内存分页

从上面的介绍可以看出主要问题是因为内存碎片内存交换粒度太大,导致大量的内存交换成本过高,如果我们把这个交换粒度变小一些是不是就好多了呢。

我们将物理内存和虚拟内存都按固定大小进行分页,从虚拟内存到物理内存的映射由原来的段映射调整为了按页映射。

由于内存空间都是预先划分好的,也就没有了不能使用的碎片,而只有被释放出来的很多 4KB 的页。即使内存空间不够,需要让现有的、正在运行的其他程序,通过内存交换释放出一些内存的页出来,一次性写入磁盘的也只有少数的一个页或者几个页,不会花太多时间,让整个机器被内存交换的过程给卡住。

内存映射

上面我们讲过程序运行时,需要将虚拟内存地址即程序指令中用到的地址通过页表来转换成对应的物理内存地址,那么两者之间又是如何实现转换的呢?

每个进程都对应自己的虚拟地址,虚拟地址可分为“虚拟页号”和“偏移量”两部分。

我们的程序看到的内存地址,都是虚拟内存地址。

程序运行时通过三步来实现数据读取

  1. 读取虚拟内存地址的“虚拟页号”值
  2. 在“页表”中根据虚拟页号查找对应的物理地址页号(每个页表,保存每个物理内存页的起始地址)
  3. 将物理内存页基地址+虚拟内存地址的偏移量得到真正的物理内存地址
07cd4c3344690055240f215404a286dd

在加载程序的时候,不再需要一次性都把程序加载到物理内存中。我们完全可以在进行虚拟内存和物理内存的页之间的映射之后,并不真的把页加载到物理内存里,而是只在程序运行中,需要用到对应虚拟内存页里面的指令和数据时,再加载到物理内存里面去。

如果发现尚未为虚拟内存分配物理内存,则申请新的内存,此现象称为“缺页”,等使用完后再释放内存。

对于物理内存,操作系统把它分成一块一块大小相同的页,这样更方便管理,例如有的内存页面长时间不用了,可以暂时写到硬盘上,称为换出。一旦需要的时候,再加载进来,叫作换入。这样可以扩大可用物理内存的大小,提高物理内存的利用率。其中换入与换出是以页为单元,每页大小一般为4KB。

0cf2f08e1ceda473df71189334857cf0

每个进程都有操作系统为自己分配的一块地址连接的虚拟内存,同时也有自己的页表。页表中所有页表项必须提前建好,并且要求是连续的。如果不连续,就没有办法通过虚拟地址里面的页号找到对应的页表项了。

为了节省内存,则使用一种类似于多叉树的“多级页表”数据结构来存储虚拟内存和物理内存的对应关系。

多级页表将虚拟内存的“虚拟页号“分成了4段,从高到低,分成 4 级到 1 级这样 4 个页表索引。

614034116a840ef565feda078d73cb76
5ba17a3ecf3f9ce4a65546de480fcc4e

每级页表包含多个条目,每级条目又都保存了下级条目的地址,按4-1级顺序查找虚拟内存对应的物理页基地址。

参考

https://time.geekbang.org/column/article/95209

https://time.geekbang.org/column/article/110474

https://time.geekbang.org/column/article/95223

https://www.cnblogs.com/zhenbianshu/p/10300769.html