Golang中的限速器 time/rate

在高并发的系统中,限流已作为必不可少的功能,而常见的限流算法有:计数器、滑动窗口、令牌桶、漏斗(漏桶)。其中滑动窗口算法、令牌桶和漏斗算法应用最为广泛。

常见限流算法

这里不再对计数器算法和滑动窗口作介绍了,有兴趣的同学可以参考其它相关文章。

漏斗算法

非常很好理解,就像有一个漏斗容器一样,漏斗上面一直往容器里倒水(请求),漏斗下方以固定速率一直流出(消费)。如果漏斗容器满的情况下,再倒入的水就会溢出,此时表示新的请求将被丢弃。可以看到这种算法在应对大的突发流量时,会造成部分请求弃用丢失。

可以看出漏斗算法能强行限制数据的传输速率。

漏斗算法

令牌桶算法

从某种意义上来说,令牌算法是对漏斗算法的一种改进。对于很多应用场景来说,除了要求能够限制数据的平均传输速率外,还要求允许某种程度的突发情况。这时候漏桶算法可能就不合适了,令牌桶算法更为适合。

令牌桶算法是指一个固定大小的桶,可以存放的令牌的最大个数也是固定的。此算法以一种固定速率不断的往桶中存放令牌,而每次请求调用前必须先从桶中获取令牌才可以。否则进行拒绝或等待,直到获取到有效令牌为止。如果桶内的令牌数量已达到桶的最大允许上限的话,则丢弃令牌。

Golang中的限制算法

Golang标准库中的限制算法是基于令牌桶算法(Token Bucket) 实现的,库名为golang.org/x/time/rate

对于限流器的消费方式有三种,分别为 Allow()、 Wait()和 Reserve()。前两种内部调用的都是Reserve() ,每个都对应一个XXXN()的方法。如Allow()是AllowN(t, 1)的简写方式。

结构体

type Limiter struct {
	limit Limit
	burst int

	mu     sync.Mutex
	tokens float64
	// last is the last time the limiter's tokens field was updated
	last time.Time
	// lastEvent is the latest time of a rate-limited event (past or future)
	lastEvent time.Time
}

主要用来限速控制并发事件,采用令牌池算法实现。

创建限速器

使用 NewLimiter(r Limit, b int) 函数创建限速器,令牌桶容量为b。初始化状态下桶是满的,即桶里装有b 个令牌,以后再以每秒往里面填充 r 个令牌。

func NewLimiter(r Limit, b int) *Limiter {
	return &Limiter{
		limit: r,
		burst: b,
	}
}

允许声明容量为0的限速器,此时将以拒绝所有事件操作。

// As a special case, if r == Inf (the infinite rate), b is ignored.
有一种特殊情况,就是 r == Inf 时,此时b参数将被忽略。

// Inf is the infinite rate limit; it allows all events (even if burst is zero).
const Inf = Limit(math.MaxFloat64)

Limiter 提供了三个主要函数 Allow, Reserve, 和 Wait. 大部分时候使用Wait。其中 AllowN, ReserveN 和 WaitN 允许消费n个令牌。

每个方法都可以消费一个令牌,当没有可用令牌时,三个方法的处理方式不一样

  • 如果没有令牌时,Allow 返回 false。
  • 如果没有令牌时,Wait 会阻塞走到有令牌可用或者超时取消(context.Context)。
  • 如果没有令牌时,Reserve 返回一个 reservation,以便token的预订时,调用之前必须等待一段时间。

1. Allow/AllowN

AllowN方法表示,截止在某一时刻,目前桶中数目是否至少为n个。如果条件满足,则从桶中消费n个token,同时返回true。反之不消费Token,返回false。

使用场景:一般用在如果请求速率过快,直接拒绝请求的情况

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	// 初始化一个限速器,每秒产生10个令牌,桶的大小为100个
	// 初始化状态桶是满的
	var limiter = rate.NewLimiter(10, 100)

	for i := 0; i < 20; i++ {
		if limiter.AllowN(time.Now(), 25) {
			fmt.Printf("%03d Ok  %s\n", i, time.Now().Format("2006-01-02 15:04:05.000"))
		} else {
			fmt.Printf("%03d Err %s\n", i, time.Now().Format("2006-01-02 15:04:05.000"))
		}
		time.Sleep(500 * time.Millisecond)
	}

}

输出

000 Ok  2020-03-27 16:17:18.604
001 Ok  2020-03-27 16:17:19.110
002 Ok  2020-03-27 16:17:19.612
003 Ok  2020-03-27 16:17:20.115
004 Err 2020-03-27 16:17:20.620
005 Ok  2020-03-27 16:17:21.121
006 Err 2020-03-27 16:17:21.626
007 Err 2020-03-27 16:17:22.127
008 Err 2020-03-27 16:17:22.632
009 Err 2020-03-27 16:17:23.133
010 Ok  2020-03-27 16:17:23.636
011 Err 2020-03-27 16:17:24.138
012 Err 2020-03-27 16:17:24.642
013 Err 2020-03-27 16:17:25.143
014 Err 2020-03-27 16:17:25.644
015 Ok  2020-03-27 16:17:26.147
016 Err 2020-03-27 16:17:26.649
017 Err 2020-03-27 16:17:27.152
018 Err 2020-03-27 16:17:27.653
019 Err 2020-03-27 16:17:28.156

2. Wait/WaitN

当使用Wait方法消费Token时,如果此时桶内Token数量不足(小于N),那么Wait方法将会阻塞一段时间,直至Token满足条件。否则直接返回。
// 可以看到Wait方法有一个context参数。我们可以设置context的Deadline或者Timeout,来决定此次Wait的最长时间。

func main() {
	// 指定令牌桶大小为5,每秒补充3个令牌
	limiter := rate.NewLimiter(3, 5)

	// 指定超时时间为5秒
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()

	for i := 0; ; i++ {
		fmt.Printf("%03d %s\n", i, time.Now().Format("2006-01-02 15:04:05.000"))

		// 每次消费2个令牌
		err := limiter.WaitN(ctx, 2)
		if err != nil {
			fmt.Printf("timeout: %s\n", err.Error())
			return
		}
	}

	fmt.Println("main")
}

输出

000 2020-03-27 16:53:34.764
001 2020-03-27 16:53:34.764
002 2020-03-27 16:53:34.764
003 2020-03-27 16:53:35.100
004 2020-03-27 16:53:35.766
005 2020-03-27 16:53:36.434
006 2020-03-27 16:53:37.101
007 2020-03-27 16:53:37.770
008 2020-03-27 16:53:38.437
009 2020-03-27 16:53:39.101
timeout: rate: Wait(n=2) would exceed context deadline

3. Reserve/ReserveN

// 此方法有一点复杂,它返回的是一个*Reservation类型,后续操作主要针对的全是这个类型
// 判断限制器是否能够在指定时间提供指定N个请求令牌。
// 如果Reservation.OK()为true,则表示需要等待一段时间才可以提供,其中Reservation.Delay()返回需要的延时时间。
// 如果Reservation.OK()为false,则Delay返回InfDuration, 此时不想等待的话,可以调用 Cancel()取消此次操作并归还使用的token

func main() {
	// 指定令牌桶大小为5,每秒补充3个令牌
	limiter := rate.NewLimiter(3, 5)

	// 指定超时时间为5秒
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()
	for i := 0; ; i++ {
		fmt.Printf("%03d %s\n", i, time.Now().Format("2006-01-02 15:04:05.000"))
		reserve := limiter.Reserve()
		if !reserve.OK() {
			//返回是异常的,不能正常使用
			fmt.Println("Not allowed to act! Did you remember to set lim.burst to be > 0 ?")
			return
		}
		delayD := reserve.Delay()
		fmt.Println("sleep delay ", delayD)
		time.Sleep(delayD)
		select {
		case <-ctx.Done():
			fmt.Println("timeout, quit")
			return
		default:
		}
		//TODO 业务逻辑
	}

	fmt.Println("main")
}

输出

000 2020-03-27 16:57:23.135
sleep delay  0s
001 2020-03-27 16:57:23.135
sleep delay  0s
002 2020-03-27 16:57:23.135
sleep delay  0s
003 2020-03-27 16:57:23.135
sleep delay  0s
004 2020-03-27 16:57:23.135
sleep delay  0s
005 2020-03-27 16:57:23.135
sleep delay  333.292866ms
006 2020-03-27 16:57:23.474
sleep delay  328.197741ms
007 2020-03-27 16:57:23.804
sleep delay  331.211817ms
008 2020-03-27 16:57:24.136
sleep delay  332.779335ms
009 2020-03-27 16:57:24.473
sleep delay  328.952586ms
010 2020-03-27 16:57:24.806
sleep delay  329.620588ms
011 2020-03-27 16:57:25.136
sleep delay  332.404798ms
012 2020-03-27 16:57:25.474
sleep delay  328.456103ms
013 2020-03-27 16:57:25.803
sleep delay  331.34754ms
014 2020-03-27 16:57:26.136
sleep delay  332.285545ms
015 2020-03-27 16:57:26.473
sleep delay  328.673618ms
016 2020-03-27 16:57:26.803
sleep delay  332.296438ms
017 2020-03-27 16:57:27.137
sleep delay  332.201646ms
018 2020-03-27 16:57:27.474
sleep delay  328.312813ms
019 2020-03-27 16:57:27.803
sleep delay  332.210098ms
020 2020-03-27 16:57:28.136
sleep delay  332.854719ms
timeout, quit

参考资料

https://www.cyhone.com/articles/analisys-of-golang-rate/
https://zhuanlan.zhihu.com/p/100594314
https://www.jianshu.com/p/1ecb513f7632
https://studygolang.com/articles/10148

Golang中的两个定时器 ticker 和 timer

Golang中time包有两个定时器,分别为ticker 和 timer。两者都可以实现定时功能,但各自都有自己的使用场景。

区别

  • ticker定时器表示每隔一段时间就执行一次,一般可执行多次。
  • timer定时器表示在一段时间后执行,默认情况下只执行一次,如果想再次执行的话,每次都需要调用 time.Reset()方法,此时效果类似ticker定时器。同时也可以调用stop()方法取消定时器
  • timer定时器比ticker定时器多一个Reset()方法,两者都有Stop()方法,表示停止定时器,底层都调用了stopTimer()函数。

Ticker定时器

package main

import (
	"fmt"
	"time"
)

func main() {
    // Ticker 包含一个通道字段C,每隔时间段 d 就向该通道发送当时系统时间。
    // 它会调整时间间隔或者丢弃 tick 信息以适应反应慢的接收者。
    // 如果d <= 0会触发panic。关闭该 Ticker 可以释放相关资源。

	ticker1 := time.NewTicker(5 * time.Second)
	// 一定要调用Stop(),回收资源
	defer ticker1.Stop()
	go func(t *time.Ticker) {
		for {
			// 每5秒中从chan t.C 中读取一次
			<-t.C
			fmt.Println("Ticker:", time.Now().Format("2006-01-02 15:04:05"))
		}
	}(ticker1)

	time.Sleep(30 * time.Second)
	fmt.Println("ok")
}

执行结果

开始时间: 2020-03-19 17:49:41
Ticker: 2020-03-19 17:49:46
Ticker: 2020-03-19 17:49:51
Ticker: 2020-03-19 17:49:56
Ticker: 2020-03-19 17:50:01
Ticker: 2020-03-19 17:50:06
结束时间: 2020-03-19 17:50:11
ok

可以看到每次执行的时间间隔都是一样的。

Timer定时器

package main

import (
	"fmt"
	"time"
)

func main() {

	// NewTimer 创建一个 Timer,它会在最少过去时间段 d 后到期,向其自身的 C 字段发送当时的时间
	timer1 := time.NewTimer(5 * time.Second)

	fmt.Println("开始时间:", time.Now().Format("2006-01-02 15:04:05"))
	go func(t *time.Timer) {
		times := 0
		for {
			<-t.C
			fmt.Println("timer", time.Now().Format("2006-01-02 15:04:05"))

			// 从t.C中获取数据,此时time.Timer定时器结束。如果想再次调用定时器,只能通过调用 Reset() 函数来执行
			// Reset 使 t 重新开始计时,(本方法返回后再)等待时间段 d 过去后到期。
			// 如果调用时 t 还在等待中会返回真;如果 t已经到期或者被停止了会返回假。
			times++
			// 调用 reset 重发数据到chan C
			fmt.Println("调用 reset 重新设置一次timer定时器,并将时间修改为2秒")
			t.Reset(2 * time.Second)
			if times > 3 {
				fmt.Println("调用 stop 停止定时器")
				t.Stop()
			}
		}
	}(timer1)

	time.Sleep(30 * time.Second)
	fmt.Println("结束时间:", time.Now().Format("2006-01-02 15:04:05"))
	fmt.Println("ok")
}

执行结果

开始时间: 2020-03-19 17:41:59
timer 2020-03-19 17:42:04
调用 reset 重新设置一次timer定时器,并将时间修改为2秒
timer 2020-03-19 17:42:06
调用 reset 重新设置一次timer定时器,并将时间修改为2秒
timer 2020-03-19 17:42:08
调用 reset 重新设置一次timer定时器,并将时间修改为2秒
timer 2020-03-19 17:42:10
调用 reset 重新设置一次timer定时器,并将时间修改为2秒
调用 stop 停止定时器
结束时间: 2020-03-19 17:42:29
ok

可以看到,第一次执行时间为5秒以后。然后通过调用 time.Reset() 方法再次激活定时器,定时时间为2秒,最后通过调用time.Stop()把前面的定时器取消掉

注意事项

1. 这里需要注意的时,如果在调用 time.Reset() 或time.Stop() 的时候,timer已经过期或者停止了,则会返回false。


func main() {
	// timer 过期
	timer := time.NewTimer(2 * time.Second)
	time.Sleep(3 * time.Second)
	ret := timer.Reset(2 * time.Second)
	fmt.Println(ret)

	// timer 停止
	timer = time.NewTimer(2 * time.Second)
	timer.Stop()
	ret = timer.Reset(2 * time.Second)
	fmt.Println(ret)

	fmt.Println("ok")
}

执行结果

false
false
ok

2. 如果调用 time.Stop() 时,timer已过期或已stop,则并不会关闭通道。

3. 使用time.NewTicker() 定时器时,需要使用Stop()方法进行资源释放,否则会产生内存泄漏,(Stop the ticker to release associated resources.)

认识虚拟内存

什么是虚拟内存

虚拟内存是计算机系统内存管理的一种技术。它使得应用程序认为它拥有连续的可用的内存(一个连续完整的地址空间),而实际上,它通常是被分隔成多个物理内存碎片,还有部分暂时存储在外部磁盘存储器上,在需要时进行数据交换。目前,大多数操作系统都使用了虚拟内存,如Windows家族的“虚拟内存”;Linux的“交换空间”等。

为什么需要虚拟内存

我们知道程序执行指令的时候,程序计数器是顺序地一条一条指令执行下去,这一条条指令就需要连续地存储在一起,所以就需要这块内存是连续的。物理内存是有限的,如果多个程序同时运行的话,访问同一个物理地址的话,就有可能内存地址冲突,怎么办呢?

这时就需要虚拟内存发挥的作用了,程序里有指令和各种内存地址,系统从物理内存申请一段地址,与这个程序指令里用到的内存地址建立映射关系,这样实际程序指令执行的时候,会通过虚拟内存地址,找到对应的物理内存地址执行。对于任何一个程序来说,它看到的都是同样的内存地址。我们只需要维护一个虚拟内存到物理内存的映射表即可。

这种从物理内存申请一段地址建立映射的方法,我们称其为内存分段

看似解决了上面的问题,但这里又引起了新的问题,即内存碎片。由于物理内存每次申请一段地址的时候,都是连接的,如果有三个程序分别执行,中间的程序执行完后,内存也释放回了可用物理内存,此时又来第四个程序,有可能出现内存不足的情况,因为物理内存已经没有了连续的可用地址,如下图所求,共1G内存的情况。

假如程序X需要内存为256MB的话,可以看到两个128MB内存是非连续的,程序无法运行,白白浪费掉了中间的128MB空间,这种情况就是内存碎片

怎么解决呢,只能将Python程序运行信息先存储到硬盘上,先释放最下面的256MB空间,然后再重新载入,这样会腾出256MB的空间。

这种将页面写入磁盘的行为我们称为换出,从磁盘载入内存称为换入。

虽然解决了内存碎片的总是,但由于内存和磁盘两者的速度相差实在太大了,看来这种办法效率太差了,还有其它好的办法没有呢?

内存分页

从上面的介绍可以看出主要问题是因为内存碎片内存交换粒度太大,导致大量的内存交换成本过高,如果我们把这个交换粒度变小一些是不是就好多了呢。

我们将物理内存和虚拟内存都按固定大小进行分页,从虚拟内存到物理内存的映射由原来的段映射调整为了按页映射。

由于内存空间都是预先划分好的,也就没有了不能使用的碎片,而只有被释放出来的很多 4KB 的页。即使内存空间不够,需要让现有的、正在运行的其他程序,通过内存交换释放出一些内存的页出来,一次性写入磁盘的也只有少数的一个页或者几个页,不会花太多时间,让整个机器被内存交换的过程给卡住。

内存映射

上面我们讲过程序运行时,需要将虚拟内存地址即程序指令中用到的地址通过页表来转换成对应的物理内存地址,那么两者之间又是如何实现转换的呢?

每个进程都对应自己的虚拟地址,虚拟地址可分为“虚拟页号”和“偏移量”两部分。

我们的程序看到的内存地址,都是虚拟内存地址。

程序运行时通过三步来实现数据读取

  1. 读取虚拟内存地址的“虚拟页号”值
  2. 在“页表”中根据虚拟页号查找对应的物理地址页号(每个页表,保存每个物理内存页的起始地址)
  3. 将物理内存页基地址+虚拟内存地址的偏移量得到真正的物理内存地址

在加载程序的时候,不再需要一次性都把程序加载到物理内存中。我们完全可以在进行虚拟内存和物理内存的页之间的映射之后,并不真的把页加载到物理内存里,而是只在程序运行中,需要用到对应虚拟内存页里面的指令和数据时,再加载到物理内存里面去。

如果发现尚未为虚拟内存分配物理内存,则申请新的内存,此现象称为“缺页”,等使用完后再释放内存。

对于物理内存,操作系统把它分成一块一块大小相同的页,这样更方便管理,例如有的内存页面长时间不用了,可以暂时写到硬盘上,称为换出。一旦需要的时候,再加载进来,叫作换入。这样可以扩大可用物理内存的大小,提高物理内存的利用率。其中换入与换出是以页为单元,每页大小一般为4KB。

每个进程都有操作系统为自己分配的一块地址连接的虚拟内存,同时也有自己的页表。页表中所有页表项必须提前建好,并且要求是连续的。如果不连续,就没有办法通过虚拟地址里面的页号找到对应的页表项了。

为了节省内存,则使用一种类似于多叉树的“多级页表”数据结构来存储虚拟内存和物理内存的对应关系。

多级页表将虚拟内存的“虚拟页号“分成了4段,从高到低,分成 4 级到 1 级这样 4 个页表索引。

每级页表包含多个条目,每级条目又都保存了下级条目的地址,按4-1级顺序查找虚拟内存对应的物理页基地址。

参考

https://time.geekbang.org/column/article/95209

https://time.geekbang.org/column/article/110474

https://time.geekbang.org/column/article/95223

https://www.cnblogs.com/zhenbianshu/p/10300769.html

Golang中关于defer语句理解的一道题

示例

我们先看一下源代码

package main

import "fmt"

func f(n int) (r int) {
	defer func() {
		r += n
		recover()
	}()

	var fc func()
	defer fc()
	fc = func() {
		r += 2
	}

	return n + 1
}

func main() {
	fmt.Println(f(3))
}

大家感觉着打印的值是多少呢?5、9还是7?执行完以后发现是7。好像与多数理解的有些出入,为什么是7,而不是9呢。下面我们来分析一下。

问题分析

对于defer执行的顺序是FIFO这一点都很清楚,我们只需要看搞懂f()函数的执行顺序就行了。

执行顺序为:

  1. 注册第1个defer 函数, 这里为匿名函数,函数体为 “func() { r += n recover() }()”,内部对应一个函数指针。这里延时函数所有相关的操作一步完成。
  2. 注册第2个defer函数,函数名为fc(),无函数体, 函数指针为nil(也有可能指针不会空,但指针指向的内容非函数体类型)。由于只是注册操作还未执行,所以并不会产生错误,继续执行。
  3. 对上面声明的函数进行函数体定义
  4. 执行return 语句
  5. 处理defer语句,根据FIFO原则,首先执行第二个函数fc(),发现函数指针为nil,此时会抛出一个恐慌,并继续操作。
  6. 执行第一个defer函数,对r值进行操作,同时处理恐慌。由于是最后一个defer语句,所以直接将r的值真正返回

可以看到上面第2、3步骤,是先注册的defer函数(函数不存在,所以指针为nil),再进行的函数体定义,导致第二个defer延时函数执行时产生恐慌,后面对函数体的单独定义没有任何意义,大家可以将此函数删除再次运行会发生没有任何问题,直到第一个defer函数对此处理并返回r值结束。

如果打印恐慌错误信息的话,会输出“runtime error: invalid memory address or nil pointer dereference”。

如果我们将 defer fc()函数函数体定义的下方,则完全不会产生恐慌,此时两个defer都会正常执行,最后的结果为9。

修正后的代码

package main

import "fmt"

func f(n int) (r int) {
	defer func() {
		r += n
		// recover()
		if err := recover(); err != nil {
			fmt.Println(err)
		}
	}()

	var fc func()
	// defer fc()
	fc = func() {
		r += 2
	}
	defer fc()

	return n + 1
}

func main() {
	fmt.Println(f(3))
}

总结:

  • defer延时函数最好使用匿名函数来处理,越简单越好
  • defer语句只执行的时候才会产生恐慌,定义时不会产生。
  • 另外如果在注册defer函数的时候,存在非固定的值,则需要先计算出来值,再进行延时函数注册,如 defer sum(1, sum(10, 20)),自己动手试一下值是多少。

开发者必知redis知识点


剖析Redis常用数据类型对应的数据结构

redis常用有哪些数据类型及每种数据类型的使用场景有哪些

如果存储一个JSON数据时,选择hash还是string 存储数据?


redis与memcache的区别


redis支持多CPU吗?如何发挥多cpu?


redis为什么这么快?底层设计原理说明


redis有哪几种持久化方式?分别有什么不同?

redis为单线程还是多线程,为什么这样设计?

redis中用到哪些数据结构和算法?

redis中用到的IO多路复用机制如何理解?

redis的过期策略有哪些?


redis高可用方案有哪些?


redis中的分布式锁如何理解


reids中的RedLock了解过吗?介绍一下


redis的分区

高可用性中的一致性算法原理

golang中有关select的几个知识点

golang中的select语句格式如下

select { 
    case <-ch1:
        // 如果从 ch1 信道成功接收数据,则执行该分支代码
    case ch2 <- 1:
        // 如果成功向 ch2 信道成功发送数据,则执行该分支代码 
    default:
        // 如果上面都没有成功,则进入 default 分支处理流程 
}

可以看到select的语法结构有点类似于switch,但又有些不同。

select里的case后面并不带判断条件,而是一个信道的操作,不同于switch里的case,对于从其它语言转过来的开发者来说有些需要特别注意的地方。

golang 的 select 就是监听 IO 操作,当 IO 操作发生时,触发相应的动作每个case语句里必须是一个IO操作,确切的说,应该是一个面向channel的IO操作。

注:Go 语言的 select 语句借鉴自 Unix 的 select() 函数,在 Unix 中,可以通过调用 select() 函数来监控一系列的文件句柄,一旦其中一个文件句柄发生了 IO 动作,该 select() 调用就会被返回(C 语言中就是这么做的),后来该机制也被用于实现高并发的 Socket 服务器程序。Go 语言直接在语言级别支持 select关键字,用于处理并发编程中通道之间异步 IO 通信问题。

注意:如果 ch1 或者 ch2 信道都阻塞的话,就会立即进入 default 分支,并不会阻塞。但是如果没有 default 语句,则会阻塞直到某个信道操作成功为止。

知识点

  1. select语句只能用于信道的读写操作
  2. select中的case条件(非阻塞)是并发执行的,select会选择先操作成功的那个case条件去执行,如果多个同时返回,则随机选择一个执行,此时将无法保证执行顺序。对于阻塞的case语句会直到其中有信道可以操作,如果有多个信道可操作,会随机选择其中一个 case 执行
  3. 对于case条件语句中,如果存在信道值为nil的读写操作,则该分支将被忽略,可以理解为从select语句中删除了这个case语句
  4. 如果有超时条件语句,判断逻辑为如果在这个时间段内一直没有满足条件的case,则执行这个超时case。如果此段时间内出现了可操作的case,则直接执行这个case。一般用超时语句代替了default语句
  5. 对于空的select{},会引起死锁

下面列出每种情况的示例代码

Continue reading

golang中的sync.Pool对象缓存

参考文章

知识点

  • golang1.13版本对Pool进行了优化,结构体添加了两个字段 victim 和 victimSize。
  • 适应于通过复用,降低复杂对象的创建和GC代价的场景
  • 每个sync.Pool的生命周期为两次GC中间时段才有效,因为init()的时候会注册一个PoolCleanup函数,他会在gc时清除掉sync.Pool中的所有的缓存的对象。可以手动进行gc操作 runtime.GC()
  • 由于生命周期受GC的影响,一定不要用于数据库连接池这类的应用场景,它只是一个缓存。
  • 由于要保证协程安全,所以会有锁的开销
  • 每个Pool都有一个私有池(协程安全)和共享池(协程不安全),其中私有池只有存放一个值。
    1. 每次Get()时会先从当前P的私有池private中获取(MPG模型
    2. 如果获取失败,再从当前P的共享池share中获取
    3. 如果仍失败,则从其它P中共享池中拿一个,需要加锁保证协程安全
    4. 如果还失败,则表示所有P中的池(也有可能只是共享池)都为空,则需要New()一个并直接返回(此时不会被放入池中)
    每次取值出来后,会从原来的地方删除掉该值。

golang 的编程模式之“功能选项”

最近在用go重构iot中的一个服务时,发现库 rocketmq-client-go@v2.0.0-rc1 在初始化消费客户端实现时,实现的极其优雅,代码见https://github.com/apache/rocketmq-client-go/blob/v2.0.0-rc1/examples/consumer/simple/main.go#L32

c, _ := rocketmq.NewPushConsumer(
    consumer.WithGroupName("testGroup"),
    consumer.WithNameServer([]string{"127.0.0.1:9876"}),
)
err := c.Subscribe("test", consumer.MessageSelector{}, func(ctx context.Context,
    msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
    for i := range msgs {
        fmt.Printf("subscribe callback: %v \n", msgs[i])
    }

    return consumer.ConsumeSuccess, nil
})

这里创建结构体 rocketmq.NewPushConsumer() 的时候,与我们平时的写法不同并没有写死结构体的字段名和值,而是每个属性都使用了一个函数来实现了,同时也不用考虑属性字段的位置关系,比起以前写kv键值对的方法实在是太灵活了。
我们再看一下其中一个WithGroupName()函数的实现方法

func WithGroupName(group string) Option {
    return func(opts *consumerOptions) {
        if group == "" {
            return
        }
        opts.GroupName = group
    }
}

传递的参数为consumerOptions指针类型,这里用到了一个匿名函数,返回的类型为Option(定义 type Option func(*consumerOptions) )。看到这里大概明白实现原理了吧。
为了确认我们的判断,我们再看一下 rocketmq.NewPushConsumer()函数

func NewPushConsumer(opts ...consumer.Option) (PushConsumer, error) {
	return consumer.NewPushConsumer(opts...)
}

这里直接调用了另一个 consumer包里的 NewPushConsumer() 函数,其内容如下( 为了方便理解,在代码里直接加了注释)

// opts 为不定参数
func NewPushConsumer(opts ...Option) (*pushConsumer, error) {
    // defaultPushConsumerOptions 见 https://github.com/apache/rocketmq-client-go/blob/7308bc94369320195652243059f63c71bfafc74b/consumer/option.go#L109
	defaultOpts := defaultPushConsumerOptions()

    // 实现动态的给 defaultOpts 属性赋值
	for _, apply := range opts {
        // 重点!重点!重点!传递的是一个指针
        // apply 是一个以 WithXxx 开头的函数的返回值即匿名函数,如
        // func WithGroupName(group string) Option{
        //  return func(opts *consumerOptions) {
        //    if group == "" {
        //      return
        //    }
        //    opts.GroupName = group
        //   }
        // }
		apply(&defaultOpts)
	}
	srvs, err := internal.NewNamesrv(defaultOpts.NameServerAddrs)
	if err != nil {
		return nil, errors.Wrap(err, "new Namesrv failed.")
	}
	if !defaultOpts.Credentials.IsEmpty() {
		srvs.SetCredentials(defaultOpts.Credentials)
	}
	defaultOpts.Namesrv = srvs

	if defaultOpts.Namespace != "" {
		defaultOpts.GroupName = defaultOpts.Namespace + "%" + defaultOpts.GroupName
	}

	dc := &defaultConsumer{
		client:         internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil),
		consumerGroup:  defaultOpts.GroupName,
		cType:          _PushConsume,
		state:          int32(internal.StateCreateJust),
		prCh:           make(chan PullRequest, 4),
		model:          defaultOpts.ConsumerModel,
		consumeOrderly: defaultOpts.ConsumeOrderly,
		fromWhere:      defaultOpts.FromWhere,
		allocate:       defaultOpts.Strategy,
		option:         defaultOpts,
		namesrv:        srvs,
	}

	p := &pushConsumer{
		defaultConsumer: dc,
		subscribedTopic: make(map[string]string, 0),
		queueLock:       newQueueLock(),
		done:            make(chan struct{}, 1),
		consumeFunc:     utils.NewSet(),
	}
	dc.mqChanged = p.messageQueueChanged
	if p.consumeOrderly {
		p.submitToConsume = p.consumeMessageOrderly
	} else {
		p.submitToConsume = p.consumeMessageCurrently
	}

	p.interceptor = primitive.ChainInterceptors(p.option.Interceptors...)

	return p, nil
}

其中 defaultPushConsumerOptions 定义如下

func defaultPushConsumerOptions() consumerOptions {
	opts := consumerOptions{
        // ClientOptions 重点字段
		ClientOptions:              internal.DefaultClientOptions(),

		Strategy:                   AllocateByAveragely,
		MaxTimeConsumeContinuously: time.Duration(60 * time.Second),
		RebalanceLockInterval:      20 * time.Second,
		MaxReconsumeTimes:          -1,
		ConsumerModel:              Clustering,
		AutoCommit:                 true,
	}

    // 这里只对 GroupName 属性进行了初始化,未指定的则使用结构体 ClientOptions 字段类型的默认值
	opts.ClientOptions.GroupName = "DEFAULT_CONSUMER"
	return opts
}

同时他有诸多以 WithXxx 开头的方法体,如 WithGroupName()、WithNameServer()、WithInstance()。

我们再找到 consumerOptions 结构体的定义,最终找到定义如下

type ClientOptions struct {
	GroupName         string
	NameServerAddrs   primitive.NamesrvAddr
	NameServerDomain  string
	Namesrv           *namesrvs
	ClientIP          string
	InstanceName      string
	UnitMode          bool
	UnitName          string
	VIPChannelEnabled bool
	RetryTimes        int
	Interceptors      []primitive.Interceptor
	Credentials       primitive.Credentials
	Namespace         string
}

发现这些才是我们平时使用的属性字段。

这里的实现方法可能还不太好容易理解,强烈推荐阅读 Uber Go 语言编码规范

总结:

动态灵活的实现结构体的属性配置,是通过将每个属性分离出来,重构为一个独立的函数,一般以WithXxx开头,将实现委托给了返回的匿名函数来实现,原理伪代码如下

func WithOptionName(*options Options, optionValue interface{}) {
    options.OptionName = optionValue
}

推荐阅读

Uber Go 语言编码规范:https://github.com/xxjwxc/uber_go_guide_cn#%E7%BC%96%E7%A8%8B%E6%A8%A1%E5%BC%8F