golang中的sync.Pool对象缓存

参考文章

知识点

  • golang1.13版本对Pool进行了优化,结构体添加了两个字段 victim 和 victimSize。
  • 适应于通过复用,降低复杂对象的创建和GC代价的场景
  • 每个sync.Pool的生命周期为两次GC中间时段才有效,因为init()的时候会注册一个PoolCleanup函数,他会在gc时清除掉sync.Pool中的所有的缓存的对象。可以手动进行gc操作 runtime.GC()
  • 由于生命周期受GC的影响,一定不要用于数据库连接池这类的应用场景,它只是一个缓存。
  • 由于要保证协程安全,所以会有锁的开销
  • 每个Pool都有一个私有池(协程安全)和共享池(协程不安全),其中私有池只有存放一个值。
    1. 每次Get()时会先从当前P的私有池private中获取(MPG模型
    2. 如果获取失败,再从当前P的共享池share中获取
    3. 如果仍失败,则从其它P中共享池中拿一个,需要加锁保证协程安全
    4. 如果还失败,则表示所有P中的池(也有可能只是共享池)都为空,则需要New()一个并直接返回(此时不会被放入池中)
    每次取值出来后,会从原来的地方删除掉该值。

golang 的编程模式之“功能选项”

最近在用go重构iot中的一个服务时,发现库 rocketmq-client-go@v2.0.0-rc1 在初始化消费客户端实现时,实现的极其优雅,代码见https://github.com/apache/rocketmq-client-go/blob/v2.0.0-rc1/examples/consumer/simple/main.go#L32

c, _ := rocketmq.NewPushConsumer(
    consumer.WithGroupName("testGroup"),
    consumer.WithNameServer([]string{"127.0.0.1:9876"}),
)
err := c.Subscribe("test", consumer.MessageSelector{}, func(ctx context.Context,
    msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
    for i := range msgs {
        fmt.Printf("subscribe callback: %v \n", msgs[i])
    }

    return consumer.ConsumeSuccess, nil
})

这里创建结构体 rocketmq.NewPushConsumer() 的时候,与我们平时的写法不同并没有写死结构体的字段名和值,而是每个属性都使用了一个函数来实现了,同时也不用考虑属性字段的位置关系,比起以前写kv键值对的方法实在是太灵活了。
我们再看一下其中一个WithGroupName()函数的实现方法

func WithGroupName(group string) Option {
    return func(opts *consumerOptions) {
        if group == "" {
            return
        }
        opts.GroupName = group
    }
}

传递的参数为consumerOptions指针类型,这里用到了一个匿名函数,返回的类型为Option(定义 type Option func(*consumerOptions) )。看到这里大概明白实现原理了吧。
为了确认我们的判断,我们再看一下 rocketmq.NewPushConsumer()函数

func NewPushConsumer(opts ...consumer.Option) (PushConsumer, error) {
	return consumer.NewPushConsumer(opts...)
}

这里直接调用了另一个 consumer包里的 NewPushConsumer() 函数,其内容如下( 为了方便理解,在代码里直接加了注释)

// opts 为不定参数
func NewPushConsumer(opts ...Option) (*pushConsumer, error) {
    // defaultPushConsumerOptions 见 https://github.com/apache/rocketmq-client-go/blob/7308bc94369320195652243059f63c71bfafc74b/consumer/option.go#L109
	defaultOpts := defaultPushConsumerOptions()

    // 实现动态的给 defaultOpts 属性赋值
	for _, apply := range opts {
        // 重点!重点!重点!传递的是一个指针
        // apply 是一个以 WithXxx 开头的函数的返回值即匿名函数,如
        // func WithGroupName(group string) Option{
        //  return func(opts *consumerOptions) {
        //    if group == "" {
        //      return
        //    }
        //    opts.GroupName = group
        //   }
        // }
		apply(&defaultOpts)
	}
	srvs, err := internal.NewNamesrv(defaultOpts.NameServerAddrs)
	if err != nil {
		return nil, errors.Wrap(err, "new Namesrv failed.")
	}
	if !defaultOpts.Credentials.IsEmpty() {
		srvs.SetCredentials(defaultOpts.Credentials)
	}
	defaultOpts.Namesrv = srvs

	if defaultOpts.Namespace != "" {
		defaultOpts.GroupName = defaultOpts.Namespace + "%" + defaultOpts.GroupName
	}

	dc := &defaultConsumer{
		client:         internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil),
		consumerGroup:  defaultOpts.GroupName,
		cType:          _PushConsume,
		state:          int32(internal.StateCreateJust),
		prCh:           make(chan PullRequest, 4),
		model:          defaultOpts.ConsumerModel,
		consumeOrderly: defaultOpts.ConsumeOrderly,
		fromWhere:      defaultOpts.FromWhere,
		allocate:       defaultOpts.Strategy,
		option:         defaultOpts,
		namesrv:        srvs,
	}

	p := &pushConsumer{
		defaultConsumer: dc,
		subscribedTopic: make(map[string]string, 0),
		queueLock:       newQueueLock(),
		done:            make(chan struct{}, 1),
		consumeFunc:     utils.NewSet(),
	}
	dc.mqChanged = p.messageQueueChanged
	if p.consumeOrderly {
		p.submitToConsume = p.consumeMessageOrderly
	} else {
		p.submitToConsume = p.consumeMessageCurrently
	}

	p.interceptor = primitive.ChainInterceptors(p.option.Interceptors...)

	return p, nil
}

其中 defaultPushConsumerOptions 定义如下

func defaultPushConsumerOptions() consumerOptions {
	opts := consumerOptions{
        // ClientOptions 重点字段
		ClientOptions:              internal.DefaultClientOptions(),

		Strategy:                   AllocateByAveragely,
		MaxTimeConsumeContinuously: time.Duration(60 * time.Second),
		RebalanceLockInterval:      20 * time.Second,
		MaxReconsumeTimes:          -1,
		ConsumerModel:              Clustering,
		AutoCommit:                 true,
	}

    // 这里只对 GroupName 属性进行了初始化,未指定的则使用结构体 ClientOptions 字段类型的默认值
	opts.ClientOptions.GroupName = "DEFAULT_CONSUMER"
	return opts
}

同时他有诸多以 WithXxx 开头的方法体,如 WithGroupName()、WithNameServer()、WithInstance()。

我们再找到 consumerOptions 结构体的定义,最终找到定义如下

type ClientOptions struct {
	GroupName         string
	NameServerAddrs   primitive.NamesrvAddr
	NameServerDomain  string
	Namesrv           *namesrvs
	ClientIP          string
	InstanceName      string
	UnitMode          bool
	UnitName          string
	VIPChannelEnabled bool
	RetryTimes        int
	Interceptors      []primitive.Interceptor
	Credentials       primitive.Credentials
	Namespace         string
}

发现这些才是我们平时使用的属性字段。

这里的实现方法可能还不太好容易理解,强烈推荐阅读 Uber Go 语言编码规范

总结:

动态灵活的实现结构体的属性配置,是通过将每个属性分离出来,重构为一个独立的函数,一般以WithXxx开头,将实现委托给了返回的匿名函数来实现,原理伪代码如下

func WithOptionName(*options Options, optionValue interface{}) {
    options.OptionName = optionValue
}

推荐阅读

Uber Go 语言编码规范:https://github.com/xxjwxc/uber_go_guide_cn#%E7%BC%96%E7%A8%8B%E6%A8%A1%E5%BC%8F

MySQL中的 InnoDB Buffer Pool

一、InnoDB Buffer Pool简介

Buffer Pool是InnoDB引擎内存中的一块区域,主要用来缓存表和索引数据使用。我们知道从内存读取数据要比磁盘读取效率要高的多,这也正是buffer pool发挥的主要作用。一般配置值都比较大,在专用数据库服务器上,大小为物理内存的80%左右。

二、Buffer Pool LRU 算法

Buffer Pool 链表使用优化改良后LRU(最近最少使用)算法进行管理。

整个LRU链表可分为两个子链表,一个是New Sublist,也称为Young列表或新生代,另一个是Old Sublist ,称为Old 列表或老生代。每个子链表都有一个Head和Tail,中间部分是存储Page数据的地方。

当新的Page放入 Buffer Pool 缓存池的时候,会交其Page插入就是两个子链表的交界处,称为midpoint,同时就会有旧的Page被淘汰,整个操作过程都需要对链接进行维护。

Continue reading

使用Dockerfile 多阶段构建Golang 应用

docker在开发和运维中使用的场景越来越多,作为开发人员非常有必要了解一些docker的基本知识,而离我们工作中最近的也就是对应用的docker部署编排了,小到一个dockerfile, docker-compse文件的编写,大到k8s的管理。这里我们以 golang应用为例讲解一些Dockerfile的基本用法,在ci/cd中经常用到这些知识。

前提

项目清单:

drwxr-xr-x   9 sxf  staff   288 12 31 16:13 .
drwx------@ 17 sxf  staff   544 12 31 14:59 ..
-rw-r--r--   1 sxf  staff    14 12 31 16:09 .dockerignore
drwxr-xr-x  14 sxf  staff   448 12 31 16:21 .git
-rw-r--r--   1 sxf  staff   467 12 31 16:08 Dockerfile
-rw-r--r--   1 sxf  staff    11 12 31 15:01 README.md
-rw-r--r--   1 sxf  staff    84 12 31 15:51 go.mod
-rw-r--r--   1 sxf  staff  3433 12 31 15:51 go.sum
-rw-r--r--   1 sxf  staff   191 12 31 16:02 main.go
文件说明:
.dockerignore 看名字就知道他的作用是用为忽略一些文件的,它的使用主要是在Dockerfile中使用COPY/ADD 指令时发挥作用。以行为单位,这里共两行,行内容分别是.git 和 README.md
.git 这个是项目Git仓库
Dockerfile 我们文章的重点
go.mod Golang启用了模块管理功能
go.sum 启用模块管理时,会在此文件中记录依赖的三方库
main.go 我们的主要go程序文件,一个简单的webserver应用

项目仓库地址:github.com/cfanbo/democice

Continue reading

Golang中的goroutine泄漏问题

goroutine作为Go中开发语言中的一大利器,在高并发中发挥着无法忽略的作用。但东西虽好,真正做到用好还是有一些要注意的地方,特别是对于刚刚接触这门开发语言的新手来说,稍有不慎,就极有可能导致goroutine 泄漏。

什么是goroutine Leak

goroutine leak 的意思是go协程泄漏,那么什么又是协程泄漏呢?我们知道每次使用go关键字开启一个gorountine任务,经过一段时间的运行,最终是会结束,从而进行系统资源的释放回收。而如果由于操作不当导致一些goroutine一直处于阻塞状态或者永远运行中,永远也不会结束,这就必定会一直占用系统资源。最球的情况下是随着系统运行,一直在创建此类goroutine,那么最终结果就是程序崩溃或者系统崩溃。这种情况我们一般称为goroutine leak。

出现的问题

先看一段代码:

package main

import (  
    "fmt"  
    "math/rand"  
    "runtime"  
    "time"  
)

func query() int {  
    n := rand.Intn(100)  
    time.Sleep(time.Duration(n) * time.Millisecond)  
    return n  
}

// 每次执行此函数,都会导致有两个goroutine处于阻塞状态
func queryAll() int {  
    ch := make(chan int)  
    go func() { ch <- query() }()  
    go func() { ch <- query() }()  
    go func() { ch <- query() }()  
	// <-ch
	// <-ch
    return <-ch  
}

func main() {  
    // 每次循环都会泄漏两个goroutine
    for i := 0; i < 4; i++ {  
        queryAll()  
        // main()也是一个主groutine
        fmt.Printf("#goroutines: %d\n", runtime.NumGoroutine())  
    }  
}

运行结果

#goroutines: 3
#goroutines: 5
#goroutines: 7
#goroutines: 9

这里发现goroutine的数量一直在增涨,按理说这里的值应该一直是 1 才对的呀(只有一个Main 函数的主goroutine)。其实这里发生了goroutine泄漏的问题。

主要问题发生在 queryAll() 函数里,这个函数在goroutine里往ch里连续三次写入了值,由于这里是无缓冲的ch,所以在写入值的时候,要有在ch有接收者时才可以写入成功,也就是说在从接收者从ch中获取值之前, 前面三个ch<-query() 一直处于阻塞的状态。当执行到queryAll()函数的 return语句 时,ch接收者获取一个值(意思是说三个ch<-query() 中执行最快的那个goroutine写值到ch成功了,还剩下两个执行慢的 ch<-query() 处于阻塞)并返回给调用主函数时,仍有两个ch处于浪费的状态。

在Main函数中对于for循环
第一次:goroutine的总数量为 1个主goroutine + 2个浪费的goroutine = 3
第二次:3 + 再个浪费的2个goroutine = 5
第三次:5 + 再个浪费的2个goroutine = 7
第三次:7 + 再个浪费的2个goroutine = 9

正好是程序的输出结果。

好了,问题我们知道怎么回事了,剩下的就是怎么解决了

解决方案:

可以看到,主要是ch写入值次数与读取的值的次数不一致导致的有ch一直处于阻塞浪费的状态,我们所以我们只要保存写与读的次数完全一样就可以了。

这里我们把上面queryAll() 函数代码注释掉的 <-ch 两行取消掉,再执行就正常了,输出内容如下:

#goroutines: 1
#goroutines: 1
#goroutines: 1
#goroutines: 1

对于goroutine的数量只有一个,也必须有一个,因为Main()函数也是一个goroutine。(http://docs.studygolang.com/src/runtime/proc.go?s=102731:102737#L3607 https://www.kancloud.cn/mutouzhang/go/596824

当然对于解决goroutine的方法不是仅仅这一种,也可以利用context来解决,参考:https://www.cnblogs.com/chenqionghe/p/9769351.html

使用 uber-go/goleak 工具检测goleak

上面我们是手动通过获取 groutine数量来判断是否存在泄漏的,下面我们使用 uber-go/goleak工具来检测是否存在泄漏问题

func TestGoleak(t *testing.T) {
        // 重要的一条检测goroutine泄漏语句
	defer goleak.VerifyNone(t)

	// chan 长度为0
	ch := make(chan int)
	go func() {
		for i := 0; i < 4; i++ {
			ch <- i
		}
	}()
	go func() {
		for i := 0; i < 3; i++ {
			<-ch
		}
	}()

	time.Sleep(time.Second * 2)

	// 会打印2,因为这里为阻塞chan, 第2个go func(){}只消费了三个,所以当第一个go func(){} 生产第4个数据时会一直牌阻塞状态
	// 此时会有一个goroutine一直处于阻塞运行状态,再加上一个Main()函数的主goroutine,正好是一1个
	fmt.Printf("#goroutines: %d\n", runtime.NumGoroutine())
}

工具使用方法只需要在单元测试方法里添加一条“defer goleak.VerifyNone(t)” 语句即可。

go test -run TestGoleak
#goroutines: 3
--- FAIL: TestGoleak (2.46s)
    leaks.go:78: found unexpected goroutines:
        [Goroutine 8 in state chan send, with iot/pkg.TestGoleak.func1 on top of the stack:
        goroutine 8 [chan send]:
        iot/pkg.TestGoleak.func1(0xc000022300)
                /Users/sxf/iot-server/pkg/a_test.go:39 +0x43
        created by iot/pkg.TestGoleak
                /Users/sxf/iot-server/pkg/a_test.go:37 +0xcd
        ]
FAIL
exit status 1
FAIL    iot/pkg 3.342s

从输出内容“ound unexpected goroutines”可以看出存在泄漏的goroutine。

如果想一次性执行多个单元测试的话,则需要另外一种方法,就是定义一个 TestMain() 函数,如

func TestMain(m *testing.M) {
	goleak.VerifyTestMain(m)
}

使用此方法的话,就不需要在每个单元测试方法里写 “defer goleak.VerifyNone(t)
”了。TestMain()函数会打印出来每个单元测试方法的检查结果。

上面我们使用了阻塞chan方法,如果你修改为非阻塞,长度为1的话,就不会发生goroutine泄漏的问题了,

另外也可以使用golang/gops 工具,参考https://studygolang.com/articles/12495,更多用法可以通过 gops –help 命令查看

提醒:

垃圾收集器不会收集以下形式的goroutines:

go func() {
// <操作会在这里永久阻塞>
}()
// Do work

这个goroutine将一直存在,直到整个程序退出,属于一种高级特性。是否属于goroutine leak 还需要看如何使用了,如https://www.jianshu.com/p/b524c6762662。如果处理不好,如for{} 根本就不可能结束,就算泄漏,所以我们写程序时,至少要保证他们结束的条件,且一定可以结束才算正常。

产生goroutine leak的原因

  • goroutine由于channel的读/写端退出而一直阻塞,导致goroutine一直占用资源,而无法退出,如只有写入,没有接收,反之一样
  • goroutine进入死循环中,导致资源一直无法释放

goroutine终止的场景

  • goroutine完成它的工作
  • 由于发生了没有处理的错误
  • 收到结束信号,直接终止任务

检测工具

参考

MySQL8.0中的跳跃范围扫描优化Skip Scan Range Access Method介绍

在MySQL8.0以前,索引使用规则有一项是索引左前缀,假如说有一个索引idx_abc(a,b,c),能用到索引的情况只有查询条件为a、ab、abc、ac这四种,对于只有字段b的where条件是无法用到这个idx_abcf索引的。这里再强调一下,这里的顺序并不是在where中字段出现的顺序,where b=2 and 1=1 也是可以利用到索引的,只是用到了(a,b)这两个字段

针对这一点, 从MySQL 8.0.13开始引入了一种新的优化方案,叫做 Skip Scan Range,翻译过来的话是跳跃范围扫描。如何理解这个概念呢?我们可以拿官方的SQL示例具体讲一下(https://dev.mysql.com/doc/refman/8.0/en/range-optimization.html

CREATE TABLE t1 (f1 INT NOT NULL, f2 INT NOT NULL, PRIMARY KEY(f1, f2));
INSERT INTO t1 VALUES
  (1,1), (1,2), (1,3), (1,4), (1,5),
  (2,1), (2,2), (2,3), (2,4), (2,5);
INSERT INTO t1 SELECT f1, f2 + 5 FROM t1;
INSERT INTO t1 SELECT f1, f2 + 10 FROM t1;
INSERT INTO t1 SELECT f1, f2 + 20 FROM t1;
INSERT INTO t1 SELECT f1, f2 + 40 FROM t1;
ANALYZE TABLE t1;

EXPLAIN SELECT f1, f2 FROM t1 WHERE f2 > 40;

我们这里创建了一个t1表,其中主键为(f1,f2),这里是两个字段。执行完这个sql语句后表里有160条记录,执行计划为

mysql> EXPLAIN SELECT f1, f2 FROM t1 WHERE f2 > 40;
+----+-------------+-------+------------+-------+---------------+---------+---------+------+------+----------+----------------------------------------+
| id | select_type | table | partitions | type  | possible_keys | key     | key_len | ref  | rows | filtered | Extra                                  |
+----+-------------+-------+------------+-------+---------------+---------+---------+------+------+----------+----------------------------------------+
|  1 | SIMPLE      | t1    | NULL       | range | PRIMARY       | PRIMARY | 8       | NULL |   53 |   100.00 | Using where; Using index for skip scan |
+----+-------------+-------+------------+-------+---------------+---------+---------+------+------+----------+----------------------------------------+
1 row in set, 1 warning (0.00 sec)

这里可以看到 type 为 rang,说明用到了范围查询,key为 PRIMARY, Extra中 Using where; Using index for skip scan

说明确实用到了新特性 skip scan。

那么在MySQL内部这个 skip scan 它又是如何执行的呢,我们可以理解以下几步

  1. 先统计一下索引前缀字段 f1 字段值有几个唯一值,这里一共有1 和2
  2. 对其余索引部分上的f2> 40条件的每个不同的前缀值执行子范围扫描

对于详细的执行流程如下:

  1. 获取f1的第一个唯一值(f1=1)
  2. 组合能用到索引的sql语句(f1=1 AND f2>40)
  3. 执行组合后的sql语句,进行范围扫描,并将结果放入记录集
  4. 重复上面的步骤,获取f1的第二个唯一值(f1=2)
  5. 组合能用到索引的sql语句(f1=2 AND f2>40)
  6. 执行组合后的sql语句,进行范围扫描,并将结果放入记录集
  7. 全部执行完毕,返回记录集给客户端

不错,原理很简单,就是将f1字段拆分成不同的值,将每个值带入到适合左前缀索引的SQL语句中,最后再合并记录集并返回即可,类似UNION操作。够简单吧!

但有同学可能会问,是所有的查询都不会执行这个优化吗?答案是否定的,主要还要看左前缀有字段值的分散情况,如果值过多的话,性能还是比较差的。系统会进行全表扫描,这里就需要单独为这个字段创建一个单独的索引。

skip scan特性虽好,但也有一些使用条件。

skip scan触发条件

(1)必须是联合索引

(2)只能是一个表

(3)不能使用distinct或group by ;

(4)SQL不能回表,即select列和where条件列都要包含在一个索引中

(5)默认optimizer_switch=’skip_scan=on’开启;

一致性哈希算法及其在分布式系统中的应用(推荐)

摘要

本文将会从实际应用场景出发,介绍一致性哈希算法(Consistent Hashing)及其在分布式系统中的应用。首先本文会描述一个在日常开发中经常会遇到的问题场景,借此介绍一致性哈希算法以及这个算法如何解决此问题;接下来会对这个算法进行相对详细的描述,并讨论一些如虚拟节点等与此算法应用相关的话题。

分布式缓存问题

假设我们有一个网站,最近发现随着流量增加,服务器压力越来越大,之前直接读写数据库的方式不太给力了,于是我们想引入Memcached作为缓存机制。现在我们一共有三台机器可以作为Memcached服务器,如下图所示。

很显然,最简单的策略是将每一次Memcached请求随机发送到一台Memcached服务器,但是这种策略可能会带来两个问题:一是同一份数据可能被存在不同的机器上而造成数据冗余,二是有可能某数据已经被缓存但是访问却没有命中,因为无法保证对相同key的所有访问都被发送到相同的服务器。因此,随机策略无论是时间效率还是空间效率都非常不好。

Continue reading

一文理解MySQL中的page页

在介绍InnoDB中的页的时候,很有必要先让大家了解一下InnoDB中的存储结构

从InnoDB存储引擎的逻辑结构看,所有数据都被逻辑地存放在一个空间内,称为表空间(tablespace),而表空间由段(sengment)、区(extent)、页(page)组成。 在一些文档中extend又称块(block)。

一、表空间(table space)

表空间(Tablespace)是一个逻辑容器,表空间存储的对象是段,在一个表空间中可以有一个或多个段,但是一个段只能属于一个表空间。数据库由一个或多个表空间组成,表空间从管理上可以划分为系统表空间、用户表空间、撤销表空间、临时表空间等。

在 InnoDB 中存在两种表空间的类型:共享表空间和独立表空间。如果是共享表空间就意味着多张表共用一个表空间。如果是独立表空间,就意味着每张表有一个独立的表空间,也就是数据和索引信息都会保存在自己的表空间中。独立的表空间可以在不同的数据库之间进行迁移。可通过命令

Continue reading