Golang 的map实现原理

go Version 1.15.6

map作为一种常见的 key-value 数据结构,不同语言的实现原理基本差不多。首先在系统里分配一段连接的内存地址作为数组,然后通过对map键进行hash算法(最终将键转换成了一个整型数字)定位到不同的桶bucket(数组的索引位置),然后将值存储到对应的bucket里

d2b5ca33bd970f64a6301fa75ae2eb22

理想的情况下是一个bucket存储一个值,即数组的形式,时间复杂度为O(1)。

如果存在键值碰撞的话,可以通过 链表法 或者 开放寻址法 来解决。

链表法

d2b5ca33bd970f64a6301fa75ae2eb22-1

开放寻址法

对于开放寻址法朋多种算法,常见的有线性探测法,线性补偿探测法,随机探测法等。这里不再介绍,有兴趣的话可以自行查阅。

map基本数据结构

hmap结构体

map的核心数据结构是 /runtime/map.go

/ A header for a Go map.
type hmap struct {
	// Note: the format of the hmap is also encoded in cmd/compile/internal/gc/reflect.go.
	// Make sure this stays in sync with the compiler's definition.
	count     int // # live cells == size of map.  Must be first (used by len() builtin)
	flags     uint8
	B         uint8  // log_2 of # of buckets (can hold up to loadFactor * 2^B items)
	noverflow uint16 // approximate number of overflow buckets; see incrnoverflow for details
	hash0     uint32 // hash seed

	buckets    unsafe.Pointer // array of 2^B Buckets. may be nil if count==0.
	oldbuckets unsafe.Pointer // previous bucket array of half the size, non-nil only when growing
	nevacuate  uintptr        // progress counter for evacuation (buckets less than this have been evacuated)

	extra *mapextra // optional fields
}

在源码中表示map 的结构体是hmap,即hash map的缩写。

count 指的是当前map的大小,即当前存放的元素个数,必须放在第一个位置,因为通过len()函数时,会通过unsafe.Poiner从这里读取此值并返回
flags 可以理解为一个标记位,几种值的定义(这里),在多个地方需要使用此值,如mapaccess1()
B 是一个对数,表示当前map持有的 buckets 数量(2^B)。但需要考虑一个重要因素装载因子,所以真正可以使用的桶只有loadFactor * 2^B 个,如果超出此值将触发扩容,默认装载因子是6.5
noverflow 溢出buckets数量
hash0 hash种子,在操作键值的时候,需要引入此值加入随机性
buckets 底层数组指针,指向当前map的底层数组指针
oldbuckets 同是底层数组指针,一般在进行map迁移的时候,用来指向原来旧数组。只有迁移过程中此字段才有意义,此字段数组大小只有buckets 的一半
nevacuate 进度计算器,表示扩容进度,小于此地址的 buckets 迁移完成
extra 可选字段,溢出桶专用,只要有桶装满就会使用

mapextra结构体

// mapextra holds fields that are not present on all maps.
type mapextra struct {
	// If both key and elem do not contain pointers and are inline, then we mark bucket
	// type as containing no pointers. This avoids scanning such maps.
	// However, bmap.overflow is a pointer. In order to keep overflow buckets
	// alive, we store pointers to all overflow buckets in hmap.extra.overflow and hmap.extra.oldoverflow.
	// overflow and oldoverflow are only used if key and elem do not contain pointers.
	// overflow contains overflow buckets for hmap.buckets.
	// oldoverflow contains overflow buckets for hmap.oldbuckets.
	// The indirection allows to store a pointer to the slice in hiter.
	overflow    *[]*bmap
	oldoverflow *[]*bmap

	// nextOverflow holds a pointer to a free overflow bucket.
	nextOverflow *bmap
}

当 map 的 key 和 value 都不是指针且内联时,将会把 bmap 标记为不含指针,这样可以避免 gc 时扫描整个 hmap。

但是,我们看 bmap 其实有一个 overflow 的字段,是指针类型的,破坏了 bmap 不含指针的设想,这时会把 overflow 移动到 extra 字段来。

overflowoldoverflow 只有当键和值不包含指针才使用,其中overflow存储hmap.buckets的溢出桶,而oldoverflow存储hmap.oldbuckets的溢出桶。

间接允许将指向切片的指针存储在hiter中

nextOverlflow 指向下一个空的溢出桶的指针

bmap结构体

// A bucket for a Go map.
type bmap struct {
	// tophash generally contains the top byte of the hash value
	// for each key in this bucket. If tophash[0] < minTopHash,
	// tophash[0] is a bucket evacuation state instead.
	tophash [bucketCnt]uint8
	// Followed by bucketCnt keys and then bucketCnt elems.
	// NOTE: packing all the keys together and then all the elems together makes the
	// code a bit more complicated than alternating key/elem/key/elem/... but it allows
	// us to eliminate padding which would be needed for, e.g., map[int64]int8.
	// Followed by an overflow pointer.
}

bmap即bucket map的缩写。

这里只有一个tophash字段,而现实我们在使用中值的类型是不固定的,甚至可以是一个自定义结构体的指针类型。这个结构点可能有点让人费解,其实编译器在编译期间通过函数  cmd/compile/internal/gc.bmap 函数会动态创建一个新的同名数据结构,如下

type bmap struct {
    topbits  [8]uint8
    keys     [8]keytype
    values   [8]valuetype
    pad      uintptr
    overflow uintptr
}

桶内的每个元素在存放时,都是先经过hash计算后会得出一个整数,再根据hash值的高8位来决定存储在桶内的那个位置。从0~7,共八个位置,因此每个桶可以存储8个元素,其中将所有的键放在一起,按先后位置存放在keys中,再将所有的值放在一起再按先后位置存放在values中,并没有将每个键值单独存储在一个字段中,主要是为了解决内存对齐带来的浪费问题。

例如 map[int64]int8 ,如果按key/value/key/value/… 这种方式存储的话,则在每一个key/value之后需要padding7个字节才行,而如果按上面key/key/key/… 和 value/value/value/…这种方式的话,只需要最最后一个值后面进行一次padding就可以了,大大节省了内存开支。

如果当前bucket已满的话,则会创建新的桶,并使用 overflow 字段进行链接指向链接,实现链表功能。

d2b5ca33bd970f64a6301fa75ae2eb22-2

map结构体关系图

ef5391134513ceb61e8066544e73e742-38
来源网络

key的定位算法

在map中要存储一个键值对,必须先找到所在的位置,一般采用hash算法和取模的结果来实现,这里我们主要介绍一下map中是如何实现的。

上面我们介绍 bmap 结构体的时候,有一个 B 字段,这个字段决定了map对应的底层数组的大小,它的大小决定了可以容纳的bucket的个数。如果B=5的话,则可以bucket的数组元素值个数为 2^5=32个,但golang中需要考虑到装载分子6.5这个因素,所以真正使用的并没有这么多。每当存储元素个数百分比达到65%的时候,就会触发map的扩容操作。

如果想知道一个key要放在哪个bucket个, 需要先计算出一个hash值,然后再除以 32取余数即可。golang 也是如此实现的,只不过是为了性能考虑,使用了位操作而已。如5 *2 换作用位操作的话,就是将5左移1位即可。每左移1位一次就是*2,左移两位就是*4,同理右移则是相除。

如 hash(key) 计算出的结果为

10010111 | 000011110110110010001111001010100010010110010101010 │ 00110

根据上面说的h.B = 5,则取后面的500110,值为6,则需要将key:value存放在第6号的bucket中。

找到了桶的位置,还需要找到放在桶的什么位置,每个位置我们可以称之为slot。

// src/runtime/map.go

const (
	// Maximum number of key/elem pairs a bucket can hold.
	bucketCntBits = 3
	bucketCnt     = 1 << bucketCntBits
        ......
}

这里定义了一个bucket最多存放元素个数的相关常量,在go里一个bucket最多可以存放 2^3=8个,再多的话就需要使用到溢出桶overflow。

golang map

slot位置计算公式为取hash的高8位,计算函数为tophash()。这里高8位是 10010111,十进制的值为151, 于是在第6号bucket的中遍历每个slot,直到找到bmap.tophash值为151的slot,当前slot在bucket中的索引位置就是要找的键和值的索引位置。然后根据当前位置索引值分别从bmap.keysbmap.values中计算出对应的值。

如果当前bucket中没有找到,就去当前bukcet 中的溢出桶overflow中查找,如果overflow为nil,则直接返回空值即可。

总结

要想实现元素的存储定位需要三个步骤:

  1. 根据h.B 来取计算后hash结果的最后 b.H位数字,定位到bucket
  2. 根据hash结果的高8位,实现在bucket定位到指定的位置
  3. 根据位置分别从 bmap.key和bmap.values中读取存储的值内容

map创建

package main

func main() {
	m := make(map[int]string)
	m[0] = "a"
	_ = m[0]
	_, _ = m[0]
	delete(m, 0)
}

我们先看一下与map想着的runtime有哪些

$ go tool compile -S main.go | grep runtime 
        0x0080 00128 (main.go:4)        CALL    runtime.fastrand(SB)
        0x00aa 00170 (main.go:5)        CALL    runtime.mapassign_fast64(SB)
        0x00bc 00188 (main.go:5)        CMPL    runtime.writeBarrier(SB), $0
        0x00f1 00241 (main.go:6)        CALL    runtime.mapaccess1_fast64(SB)
        0x0114 00276 (main.go:7)        CALL    runtime.mapaccess2_fast64(SB)
        0x0137 00311 (main.go:8)        CALL    runtime.mapdelete_fast64(SB)
        0x0153 00339 (main.go:5)        CALL    runtime.gcWriteBarrier(SB)
        0x0160 00352 (main.go:3)        CALL    runtime.morestack_noctxt(SB)
        ......

可以看到每个对map操作基本都有相应的同名调用函数。不过对map的初始化操作函数不是fastrand()函数,而是runtime.makemap()函数(还有一个runtime.makemap_small函数)。fastrand()函数只是用来产生随机hash种子的。

注意:我们这里没有指定创建map的长度,runtime会根据是否指定这个字段做一些情况考虑。

// src/runtime/map.go

// makemap implements Go map creation for make(map[k]v, hint).
// If the compiler has determined that the map or the first bucket
// can be created on the stack, h and/or bucket may be non-nil.
// If h != nil, the map can be created directly in h.
// If h.buckets != nil, bucket pointed to can be used as the first bucket.
func makemap(t *maptype, hint int, h *hmap) *hmap {
	mem, overflow := math.MulUintptr(uintptr(hint), t.bucket.size)
	if overflow || mem > maxAlloc {
		hint = 0
	}

	// initialize Hmap
	if h == nil {
		h = new(hmap)
	}
	h.hash0 = fastrand()

	// Find the size parameter B which will hold the requested # of elements.
	// For hint < 0 overLoadFactor returns false since hint < bucketCnt.
	B := uint8(0)
	for overLoadFactor(hint, B) {
		B++
	}
	h.B = B

	// allocate initial hash table
	// if B == 0, the buckets field is allocated lazily later (in mapassign)
	// If hint is large zeroing this memory could take a while.
	if h.B != 0 {
		var nextOverflow *bmap
		h.buckets, nextOverflow = makeBucketArray(t, h.B, nil)
		if nextOverflow != nil {
			h.extra = new(mapextra)
			h.extra.nextOverflow = nextOverflow
		}
	}

	return h
}

步骤:

  1. 计算指定的大小所需要的内容是否超出出系统允许的最大分配大小
  2. 初始化hmap,并指定随机种子
  3. 通过overLoadFactor(hint, B)函数找到一个能装下指定map大小个元素个数的最小B,从小到大开始循环B。刚开始map越小越好
  4. 开始对hash table进行初始化。如果B==0则buckets 进行延时初始化操作(赋值的时候才进行初始化),如果B值特别大,则初始化需要一段时间,主要通过makeBucketArray()函数实现

注意 makemap()函数返回的是一个指针,而makeslice()返回的是一个新的结构体,在参数传递的时候,是值复制,两者有差异,有些是引用的是同一个数组,有些不是。

// src/runtime/map.go

// makeBucketArray initializes a backing array for map buckets.
// 1<<b is the minimum number of buckets to allocate.
// dirtyalloc should either be nil or a bucket array previously
// allocated by makeBucketArray with the same t and b parameters.
// If dirtyalloc is nil a new backing array will be alloced and
// otherwise dirtyalloc will be cleared and reused as backing array.
func makeBucketArray(t *maptype, b uint8, dirtyalloc unsafe.Pointer) (buckets unsafe.Pointer, nextOverflow *bmap) {
	base := bucketShift(b)
	nbuckets := base
	// For small b, overflow buckets are unlikely.
	// Avoid the overhead of the calculation.
	if b >= 4 {
		// Add on the estimated number of overflow buckets
		// required to insert the median number of elements
		// used with this value of b.
		nbuckets += bucketShift(b - 4)
		sz := t.bucket.size * nbuckets
		up := roundupsize(sz)
		if up != sz {
			nbuckets = up / t.bucket.size
		}
	}

	if dirtyalloc == nil {
		buckets = newarray(t.bucket, int(nbuckets))
	} else {
		// dirtyalloc was previously generated by
		// the above newarray(t.bucket, int(nbuckets))
		// but may not be empty.
		buckets = dirtyalloc
		size := t.bucket.size * nbuckets
		if t.bucket.ptrdata != 0 {
			memclrHasPointers(buckets, size)
		} else {
			memclrNoHeapPointers(buckets, size)
		}
	}

	if base != nbuckets {
		// We preallocated some overflow buckets.
		// To keep the overhead of tracking these overflow buckets to a minimum,
		// we use the convention that if a preallocated overflow bucket's overflow
		// pointer is nil, then there are more available by bumping the pointer.
		// We need a safe non-nil pointer for the last overflow bucket; just use buckets.
		nextOverflow = (*bmap)(add(buckets, base*uintptr(t.bucketsize)))
		last := (*bmap)(add(buckets, (nbuckets-1)*uintptr(t.bucketsize)))
		last.setoverflow(t, (*bmap)(buckets))
	}
	return buckets, nextOverflow
}

初始化bucket

  1. 对于b<4的话(桶的数量< 2^4),基本不需要溢出桶,这样就可以避免开销。对于b>4(桶的数量> 2^4)的话,则需要创建2^(b-4)个溢出桶。
  2. 数组分配通过函数 newarray()实现,第一个参数是元素类型,每二个参数是数组长度,返回的是数组内存首地址

map读取

对于map中的赋值与修改原理是基本一样的,先找到位置,不管原来位置有没有数据直接存储新数据就可以了。

对于map的赋值操作,是由函数 runtime.mapaccess1()runtime.mapaccess2() 来实现的。两者的唯一区别就是返回值不一样,runtime.mapaccess1() 返回的是一个值,runtime.mapaccess2() 返回的是两个值,第二个值决定了key是否在map中存在。这点可以通过上面我们的 compile 结果中看出他们的区别。这里我们只介绍runtime.mapaccess1()

// src/runtime/map.go

// mapaccess1 returns a pointer to h[key].  Never returns nil, instead
// it will return a reference to the zero object for the elem type if
// the key is not in the map.
// NOTE: The returned pointer may keep the whole map live, so don't
// hold onto it for very long.
func mapaccess1(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
	if raceenabled &amp;&amp; h != nil {
		callerpc := getcallerpc()
		pc := funcPC(mapaccess1)
		racereadpc(unsafe.Pointer(h), callerpc, pc)
		raceReadObjectPC(t.key, key, callerpc, pc)
	}
	if msanenabled &amp;&amp; h != nil {
		msanread(key, t.key.size)
	}
	if h == nil || h.count == 0 {
		if t.hashMightPanic() {
			t.hasher(key, 0) // see issue 23734
		}
		return unsafe.Pointer(&amp;zeroVal[0])
	}
	if h.flags&amp;hashWriting != 0 {
		throw("concurrent map read and map write")
	}
	hash := t.hasher(key, uintptr(h.hash0))
	m := bucketMask(h.B)
	b := (*bmap)(add(h.buckets, (hash&amp;m)*uintptr(t.bucketsize)))
	if c := h.oldbuckets; c != nil {
		if !h.sameSizeGrow() {
			// There used to be half as many buckets; mask down one more power of two.
			m >>= 1
		}
		oldb := (*bmap)(add(c, (hash&amp;m)*uintptr(t.bucketsize)))
		if !evacuated(oldb) {
			b = oldb
		}
	}
	top := tophash(hash)
bucketloop:
	for ; b != nil; b = b.overflow(t) {
		for i := uintptr(0); i < bucketCnt; i++ {
			if b.tophash[i] != top {
				if b.tophash[i] == emptyRest {
					break bucketloop
				}
				continue
			}
			k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
			if t.indirectkey() {
				k = *((*unsafe.Pointer)(k))
			}
			if t.key.equal(key, k) {
				e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
				if t.indirectelem() {
					e = *((*unsafe.Pointer)(e))
				}
				return e
			}
		}
	}
	return unsafe.Pointer(&amp;zeroVal[0])
}

runtime.mapaccess1()函数返回一个指定h[key]的指针,如果key不存在,则返回当前map值类型的zero值,注意永远也不会返回一个nil值。

返回的指针在map中是永远有效的,尽量读取完尽快使用并释放,不要长期持久。

步骤

  1. 判断h是否为nil或者h.count值是否为0,如果h为nil则表示未初始化,则可能panic,如果h.count=0,则表示map为空,则直接返回一个zero值。
  2. 考虑是否处于并发读写状态,否则产生panic
  3. 根据键key计算hash值
  4. 计算低B位的掩码bucketMask(h.B),比如 B=5,那 m 就是31,低五位二进制是全1
  5. 计算当前bucket的地址
  6. 根据h.oldbuckets是否为nil来判断是否处于扩容中,如果不是nil则表示当前map正处于扩容状态中。将m减少一半,重新计算当前key在 oldbuckets中的位置,如果oldbuckets没有全部迁移到新bucket的话(bmap.tophash的意义所在),则在oldbuckets中查找。
  7. 计算tophash,即高八位
  8. 真正开始查找key,外层for是循环bucket及溢出桶overflow,内层for是循环桶内的8个slot
    • 如果 b.tophash[i] != top 则表示当前bucket的第i个位置的tophash 与当前key的tophash不同。这时分两种情况:
      一种是当前的标识是 emptyRest 表示剩下的所有slot全是空,则直接结束当前bucket查找,继续下一下bucket重复此步骤(b = b.overflow(t));
      另一种则是继续查找下一个slot,直到8个slot全部查找完毕,然后再查找下一个bucket
    • 如果b.tophash[i] == top,则根据键bucket的首地址+类型计算出字节长度*slot索引值 计算得出key的位置
    • 判断同时bucket中的key与请求的key是否相等,如果相等的话,按取键的方法取出值,并返回,否则继续
  9. 如果在最后仍未找到key,则直接返回zero 值

key的定位公式

k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))

value 的定位公式

e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))

其中 dataOffset 表示key相对于bmap的偏移量,结构体如下

// src/runtime/map.go

// data offset should be the size of the bmap struct, but needs to be
// aligned correctly. For amd64p32 this means 64-bit alignment
// even though pointers are 32 bit.
dataOffset = unsafe.Offsetof(struct {
	b bmap
	v int64
}{}.v)

所以键的地址=当前bucket的起始位置(unsafe.Pointer(b)) + key的偏移量(dataOffset)+当前slot索引值(i) * 每个键的大小(uintptr(t.keysize))

而对于值来说,由于bmap.values在b.map.keys后面,所以要先将8个键的地址全部计算上才行,同样值类型也有自己的大小 t.elemsize

对于bucket的几种状态码如下

// src/runtime/map.go

// Possible tophash values. We reserve a few possibilities for special marks.
// Each bucket (including its overflow buckets, if any) will have either all or none of its
// entries in the evacuated* states (except during the evacuate() method, which only happens
// during map writes and thus no one else can observe the map during that time).
emptyRest      = 0 // this cell is empty, and there are no more non-empty cells at higher indexes or overflows.
emptyOne       = 1 // this cell is empty
evacuatedX     = 2 // key/elem is valid.  Entry has been evacuated to first half of larger table.
evacuatedY     = 3 // same as above, but evacuated to second half of larger table.
evacuatedEmpty = 4 // cell is empty, bucket is evacuated.
minTopHash     = 5 // minimum tophash for a normal filled cell.

emptyRest 表示当前cell为空,并且它后面的所有cell也为空,包括溢出桶overflow。
emptyOne 表示当前cell为空。
evacuatedX 扩容相关,当前bucket的slot值迁移到了新的X部分,由于每次扩容时,都是两倍扩容,所以原来bucket中的8个slot值,有的被迁移到了新bucket的X部分,有的是Y部分(原来的1个桶变成2个桶,分别称作X和Y)。
evacuatedY 扩容相关,第二部分迁移完毕。
evacuatedEmpty 当前cell为空,且迁移完成。

minTopHash tophash最小值,如果在调用 tophash(hash)时,计算出的值小于此值,则会加上此值(代码

map赋值

与赋值相关的函数是mapassign,根据key的不同底层调用相同类型的函数 ,常见的有

key 类型插入
uint32mapassign_fast32(t *maptype, h *hmap, key uint32) unsafe.Pointer
uint64mapassign_fast64(t *maptype, h *hmap, key uint64) unsafe.Pointer
stringmapassign_faststr(t *maptype, h *hmap, ky string) unsafe.Pointer

我们这里只主要介绍常用的mapassign函数。

函数原型

// src/runtime/map.go

// Like mapaccess, but allocates a slot for the key if it is not present in the map.
func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {}

步骤

  1. 检查是否处于并发模式,否则产生panic
  2. 进行 hash 值计算(同上)
  3. 检查bucket是否进行过初始化,否则调用 newobject(t.buckt) 函数进行初始化,这点我们上面介绍过
  4. 检查map 是否处于扩容状态(h.growing()),如果是则调用函数growWork(t,h,bucket)进行扩容工作
  5. 计算当前bucket的地址
  6. 计算tophash,开始在bucket内通过 for 遍历出相应的位置,这点同上面的方法一样,双层for查找bucket,如果找到了再去overflow查找。找到则进行typedmemmove(t.key, k, key)更新并直接结束整个逻辑
  7. 如果没有map里没有找到key,则判断如果map元素个数+1的话会不会触发扩容
    扩容所必须的两个条件:
    1. 达到了最大装载因子
    2. 溢出桶是否过多
    只要这两个其中的一个条件满足,则先进行扩容然后再重头开始。
  8. 到目前为止才算正式开始追加新的kv。要写入前必须要知道要写入的位置,有两个重要的变量,一个是inserti,另一个是insertk。如果此时inserti 为 nil的话,说明在上面第6步骤的时候遍历了所有的bucket和overflow也没有找到要插入的位置(bucket或overflow满了),则创建新的overflow,然后在新的overflow里计算出key和vlaue要存放的内存位置(代码
  9. 根据key和value的地址,写入新的值,并将当前map的元素个数加1。(需要特别注意的在这里并没有写入value,因为函数参数并没有传入值,所以返回的是一个存放v的内存地址,有了这个地址赋值就解决了)

总结:

触发map扩容的两个条件,一个是装载因子,另一个是overflow过多,对于overflow多少算多呢见下方说明。

map删除

与删除有关的函数为 mapdelete,原型

// src/runtime/map.go

func mapdelete(t *maptype, h *hmap, key unsafe.Pointer) {}

根据 key 类型的不同,删除操作会被优化成更具体的函数:

key 类型删除
uint32mapdelete_fast32(t *maptype, h *hmap, key uint32)
uint64mapdelete_fast64(t *maptype, h *hmap, key uint64)
stringmapdelete_faststr(t *maptype, h *hmap, ky string)
golang mapdelete

步骤

  1. 判断map是否为nil或者根本就没有任何元素
  2. 检查是否并发模式
  3. 计算hash,双层for循环遍历查找key,找到位置后进行以下工作
    将key位置写入nil或置零值
    将value位置写入nil或置零值
    将tophash位置置为 emptyOne 状态
    判断当前key是不是bucket中的最后一个元素,如果是最后一个元素,且当前bucket还有overflow,而overflow的首个tophash[0] != emptyRest 直接结束逻辑。否则判断当前key的下一个 tophash是不是emptyRest,如果不是的话,直接结束逻辑。后面利用for循环重置tophash 的状态为最合适的状态,以方便后期循环使用

map扩容

随着map元素的添加,有可能出现bucket的个数不够,导致overflow桶越来越多,最后变成了一个单向链表了,查找效率越来越差,最差的情况下可能会变成O(n)。这与我们期待的O(1)相反,所以golang为了解决这个问题,就需要对map中的元素进行扩容重新整理,以达到最优的效果。那么什么时候才需要扩容呢?

// src/runtime/map.go

// Did not find mapping for key. Allocate new cell &amp; add entry.

// If we hit the max load factor or we have too many overflow buckets,
// and we're not already in the middle of growing, start growing.
if !h.growing() &amp;&amp; (overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) {
	hashGrow(t, h)
	goto again // Growing the table invalidates everything, so try again
}

这里有两个条件,只要满足其中一项就会进行扩容

  1. 当装载因子越过6.5的时候,这个是内部固定的一个值(查看说明)(代码)
  2. 当overflow 过多的时候
    a. 当 B 小于 15,也就是 bucket 总数 2^B 小于 2^15 时,如果 overflow 的 bucket 数量超过 2^B;
    b. 当 B >= 15,也就是 bucket 总数 2^B 大于等于 2^15,如果 overflow 的 bucket 数量超过 2^15。
    主要分界点就是15(代码

    针对第一个条件,就是说元素太多,桶装不下了,立即进行扩容,添加一倍的桶,保证元素个数最多只能达到桶总容量的65%;
    针对第二种情况一般是先是大量的添加元素,后来又删除了,再添加导致出现了太多的空的overflow,这样查找key的话,就需要遍历太多的桶了,效率大大下降,需要重新对元素进行位置调整,类似于windows系统中的磁盘碎片整理一样,将元素向前面的bucket中迁移即可,当然这种情况在扩容时底层有些特殊,需要注意一下。
// overLoadFactor reports whether count items placed in 1<<B buckets is over loadFactor.
// 装载因子>6.5
func overLoadFactor(count int, B uint8) bool {
	return count > bucketCnt &amp;&amp; uintptr(count) > loadFactorNum*(bucketShift(B)/loadFactorDen)
}

// tooManyOverflowBuckets reports whether noverflow buckets is too many for a map with 1<<B buckets.
// Note that most of these overflow buckets must be in sparse use;
// if use was dense, then we'd have already triggered regular map growth.
// 溢出桶过多
func tooManyOverflowBuckets(noverflow uint16, B uint8) bool {
	// If the threshold is too low, we do extraneous work.
	// If the threshold is too high, maps that grow and shrink can hold on to lots of unused memory.
	// "too many" means (approximately) as many overflow buckets as regular buckets.
	// See incrnoverflow for more details.
	if B > 15 {
		B = 15
	}
	// The compiler doesn't see here that B < 16; mask B to generate shorter shift code.
	return noverflow >= uint16(1)<<(B&amp;15)
}

虽然这里调用了 hashGrow() 函数,但它并不有真正的进行扩容,我们先看下它的源码

// src/runtime/map.go

func hashGrow(t *maptype, h *hmap) {
	// If we've hit the load factor, get bigger.
	// Otherwise, there are too many overflow buckets,
	// so keep the same number of buckets and "grow" laterally.
	// 申请2倍空间
	bigger := uint8(1)

	// 判断装载因子>6.5这个条件,否则就是由于overflow过多,此时申请的空间和原来的一样
	if !overLoadFactor(h.count+1, h.B) {
		bigger = 0
		h.flags |= sameSizeGrow
	}

	// 调整hmap中的桶的指针指向,将原来桶变为旧桶,新桶为新申请的数组
	oldbuckets := h.buckets
	newbuckets, nextOverflow := makeBucketArray(t, h.B+bigger, nil)

	flags := h.flags &amp;^ (iterator | oldIterator)
	if h.flags&amp;iterator != 0 {
		flags |= oldIterator
	}
	// commit the grow (atomic wrt gc)
	// 提交grow操作
	h.B += bigger
	h.flags = flags
	h.oldbuckets = oldbuckets
	h.buckets = newbuckets
	h.nevacuate = 0 // 迁移进度
	h.noverflow = 0 // 溢出桶数量为0

	// 将扩展举出桶指令同样和新旧桶一样进行指针指向变更
	if h.extra != nil &amp;&amp; h.extra.overflow != nil {
		// Promote current overflow buckets to the old generation.
		if h.extra.oldoverflow != nil {
			throw("oldoverflow is not nil")
		}
		h.extra.oldoverflow = h.extra.overflow
		h.extra.overflow = nil
	}
	if nextOverflow != nil {
		if h.extra == nil {
			h.extra = new(mapextra)
		}
		h.extra.nextOverflow = nextOverflow
	}

	// the actual copying of the hash table data is done incrementally
	// by growWork() and evacuate().
}

其中 hashGrow() 只是为扩容做的一些准备工作,而真正进行扩容的是 growWork()evacuate()函数。

函数 growWork() 只有在赋值删除的过程中会出现,所以也只有这两个动作才会触发扩容动作。

我们再看看growWrok()源码

// src/runtime/map.go

func growWork(t *maptype, h *hmap, bucket uintptr) {
	// make sure we evacuate the oldbucket corresponding
	// to the bucket we're about to use
	// 确认迁移的老的bucket是我们正在使用的bucket
	evacuate(t, h, bucket&amp;h.oldbucketmask())

	// evacuate one more oldbucket to make progress on growing
	// 如果还是在迁移中,注再迁移一个bucket
	if h.growing() {
		evacuate(t, h, h.nevacuate)
	}
}

从代码里可以看到,先是迁移我们正在使用的bucket,如果还没有迁移完的话,就再迁移一个bucket,一次最多迁移两个bucket。

这里 h.oldbucketmask()是计算oldbucket的位置的,实现原理我们上面介绍过的,计算hash值的最低B位来决定用哪个bucket。

// oldbucketmask provides a mask that can be applied to calculate n % noldbuckets().func (h *hmap) oldbucketmask() uintptr { return h.noldbuckets() – 1}

下面我们再看一下evacuate()函数,最复杂的工作就是由它来完成的。

// src/runtime/map.go

func evacuate(t *maptype, h *hmap, oldbucket uintptr) {
	b := (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.bucketsize)))
	newbit := h.noldbuckets()
	if !evacuated(b) {
		// TODO: reuse overflow buckets instead of using new ones, if there
		// is no iterator using the old buckets.  (If !oldIterator.)

		// xy contains the x and y (low and high) evacuation destinations.
		var xy [2]evacDst
		x := &amp;xy[0]
		x.b = (*bmap)(add(h.buckets, oldbucket*uintptr(t.bucketsize)))
		x.k = add(unsafe.Pointer(x.b), dataOffset)
		x.e = add(x.k, bucketCnt*uintptr(t.keysize))

		if !h.sameSizeGrow() {
			// Only calculate y pointers if we're growing bigger.
			// Otherwise GC can see bad pointers.
			y := &amp;xy[1]
			y.b = (*bmap)(add(h.buckets, (oldbucket+newbit)*uintptr(t.bucketsize)))
			y.k = add(unsafe.Pointer(y.b), dataOffset)
			y.e = add(y.k, bucketCnt*uintptr(t.keysize))
		}

		for ; b != nil; b = b.overflow(t) {
			k := add(unsafe.Pointer(b), dataOffset)
			e := add(k, bucketCnt*uintptr(t.keysize))
			for i := 0; i < bucketCnt; i, k, e = i+1, add(k, uintptr(t.keysize)), add(e, uintptr(t.elemsize)) {
				top := b.tophash[i]
				if isEmpty(top) {
					b.tophash[i] = evacuatedEmpty
					continue
				}
				if top < minTopHash {
					throw("bad map state")
				}
				k2 := k
				if t.indirectkey() {
					k2 = *((*unsafe.Pointer)(k2))
				}
				var useY uint8
				if !h.sameSizeGrow() {
					// Compute hash to make our evacuation decision (whether we need
					// to send this key/elem to bucket x or bucket y).
					hash := t.hasher(k2, uintptr(h.hash0))
					if h.flags&amp;iterator != 0 &amp;&amp; !t.reflexivekey() &amp;&amp; !t.key.equal(k2, k2) {
						// If key != key (NaNs), then the hash could be (and probably
						// will be) entirely different from the old hash. Moreover,
						// it isn't reproducible. Reproducibility is required in the
						// presence of iterators, as our evacuation decision must
						// match whatever decision the iterator made.
						// Fortunately, we have the freedom to send these keys either
						// way. Also, tophash is meaningless for these kinds of keys.
						// We let the low bit of tophash drive the evacuation decision.
						// We recompute a new random tophash for the next level so
						// these keys will get evenly distributed across all buckets
						// after multiple grows.
						useY = top &amp; 1
						top = tophash(hash)
					} else {
						if hash&amp;newbit != 0 {
							useY = 1
						}
					}
				}

				if evacuatedX+1 != evacuatedY || evacuatedX^1 != evacuatedY {
					throw("bad evacuatedN")
				}

				b.tophash[i] = evacuatedX + useY // evacuatedX + 1 == evacuatedY
				dst := &amp;xy[useY]                 // evacuation destination

				if dst.i == bucketCnt {
					dst.b = h.newoverflow(t, dst.b)
					dst.i = 0
					dst.k = add(unsafe.Pointer(dst.b), dataOffset)
					dst.e = add(dst.k, bucketCnt*uintptr(t.keysize))
				}
				dst.b.tophash[dst.i&amp;(bucketCnt-1)] = top // mask dst.i as an optimization, to avoid a bounds check
				if t.indirectkey() {
					*(*unsafe.Pointer)(dst.k) = k2 // copy pointer
				} else {
					typedmemmove(t.key, dst.k, k) // copy elem
				}
				if t.indirectelem() {
					*(*unsafe.Pointer)(dst.e) = *(*unsafe.Pointer)(e)
				} else {
					typedmemmove(t.elem, dst.e, e)
				}
				dst.i++
				// These updates might push these pointers past the end of the
				// key or elem arrays.  That's ok, as we have the overflow pointer
				// at the end of the bucket to protect against pointing past the
				// end of the bucket.
				dst.k = add(dst.k, uintptr(t.keysize))
				dst.e = add(dst.e, uintptr(t.elemsize))
			}
		}
		// Unlink the overflow buckets &amp; clear key/elem to help GC.
		if h.flags&amp;oldIterator == 0 &amp;&amp; t.bucket.ptrdata != 0 {
			b := add(h.oldbuckets, oldbucket*uintptr(t.bucketsize))
			// Preserve b.tophash because the evacuation
			// state is maintained there.
			ptr := add(b, dataOffset)
			n := uintptr(t.bucketsize) - dataOffset
			memclrHasPointers(ptr, n)
		}
	}

	if oldbucket == h.nevacuate {
		advanceEvacuationMark(h, t, newbit)
	}
}

总结

//TODO

Golang并发模式之扇入FAN-IN和扇出FAN-OUT


在现实世界中,经常有一些工作是属于流水线类型的,它们每一个步骤都是紧密关联的,第一步先做什么,再做做么,最后做什么。特别是制造业这个行业,基本全是流水线生产车间。在我们开发中也经常遇到这类的业务场景。

假如我们有个流水线共分三个步骤,分别是 job1、job2和job3。代码:https://play.golang.org/p/e7ZlP9ofXB3

package main

import (
	"fmt"
	"time"
)

func job1(count int) <-chan int {
	outCh := make(chan int, 2)

	go func() {
		defer close(outCh)
		for i := 0; i < count; i++ {
			time.Sleep(time.Second)
			fmt.Println("job1 finish:", 1)
			outCh <- 1
		}
	}()

	return outCh
}

func job2(inCh <-chan int) <-chan int {
	outCh := make(chan int, 2)

	go func() {
		defer close(outCh)
		for val := range inCh {
			// 耗时2秒
			time.Sleep(time.Second * 2)
			val++
			fmt.Println("job2 finish:", val)
			outCh <- val
		}
	}()

	return outCh
}

func job3(inCh <-chan int) <-chan int {
	outCh := make(chan int, 2)

	go func() {
		defer close(outCh)
		for val := range inCh {
			val++
			fmt.Println("job3 finish:", val)
			outCh <- val
		}
	}()

	return outCh
}

func main() {
	t := time.Now()

	firstResult := job1(10)
	secondResult := job2(firstResult)
	thirdResult := job3(secondResult)

	for v := range thirdResult {
		fmt.Println(v)
	}

	fmt.Println("all finish")
	fmt.Println("duration:", time.Since(t).String())
}

输出结果为

job1 finish: 1
job1 finish: 1
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job2 finish: 2
job3 finish: 3
3
job2 finish: 2
job3 finish: 3
3
job2 finish: 2
job3 finish: 3
3
all finish
duration: 21s

共计计算21秒。主要是因为job2中的耗时太久导致,现在我们的主要任务就是解决掉这个问题了。
这里只用了一个job2来处理job1的结果,如果我们能多开启几个goroutine job2并行处理会不会提升性能呢?

现在我们改进下代码,解决job2耗时的问题,需要注意一下,这里对ch的关闭也要作一下调整,由于启用了多个job2的goroutine,所以在job2内部进行关闭了。代码https://play.golang.org/p/qebQ00v973C

package main

import (
	"fmt"
	"sync"
	"time"
)

func job1(count int) <-chan int {
	outCh := make(chan int, 2)

	go func() {
		defer close(outCh)
		for i := 0; i < count; i++ {
			time.Sleep(time.Second)
			fmt.Println("job1 finish:", 1)
			outCh <- 1
		}
	}()

	return outCh
}

func job2(inCh <-chan int) <-chan int {
	outCh := make(chan int, 2)

	go func() {
		defer close(outCh)
		for val := range inCh {
			// 耗时2秒
			time.Sleep(time.Second * 2)
			val++
			fmt.Println("job2 finish:", val)
			outCh <- val
		}
	}()

	return outCh
}

func job3(inCh <-chan int) <-chan int {
	outCh := make(chan int, 2)

	go func() {
		defer close(outCh)
		for val := range inCh {
			val++
			fmt.Println("job3 finish:", val)
			outCh <- val
		}
	}()

	return outCh
}

func merge(inCh ...<-chan int) <-chan int {
	outCh := make(chan int, 2)

	var wg sync.WaitGroup
	for _, ch := range inCh {
		wg.Add(1)
		go func(wg *sync.WaitGroup, in <-chan int) {
			defer wg.Done()
			for val := range in {
				outCh <- val
			}
		}(&amp;wg, ch)
	}

	// 重要注意,wg.Wait() 一定要在goroutine里运行,否则会引起deadlock
	go func() {
		wg.Wait()
		close(outCh)
	}()

	return outCh
}

func main() {
	t := time.Now()

	firstResult := job1(10)

	// 拆分成三个job2,即3个goroutine (扇出)
	secondResult1 := job2(firstResult)
	secondResult2 := job2(firstResult)
	secondResult3 := job2(firstResult)

	// 合并结果(扇入)
	secondResult := merge(secondResult1, secondResult2, secondResult3)

	thirdResult := job3(secondResult)

	for v := range thirdResult {
		fmt.Println(v)
	}

	fmt.Println("all finish")
	fmt.Println("duration:", time.Since(t).String())
}

输出结果

job1 finish: 1
job1 finish: 1
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job2 finish: 2
job3 finish: 3
3
job2 finish: 2
job3 finish: 3
3
all finish
duration: 12s

可以看到,性能提升了90%,由原来的22s减少到12s。上面代码中为了演示效果,使用的缓冲chan很小,如果调大的话,性能更明显。

FAN-OUT模式:多个goroutine从同一个通道读取数据,直到该通道关闭。OUT是一种张开的模式(1对多),所以又被称为扇出,可以用来分发任务。

FAN-IN模式:1个goroutine从多个通道读取数据,直到这些通道关闭。IN是一种收敛的模式(多对1),所以又被称为扇入,用来收集处理的结果。

是不是很像扇子的状态, 先展开(扇出)再全并(扇入)。

总结:在类似流水线这类的逻辑中,我们可以使用FAN-IN和FAN-OUT模式来提升程序性能。

参考

重新认识Golang中的空结构体

  1. 认识空结构体
  2. 低层实现原理
  3. 空结构体之内存对齐
  4. 应用场景

在golang中,如果我们想实现一个set集合的话,一般会使用map来实现,其中将set的值作为map的键,对于map的值一般使用一个空结构体来实现,当然对map值也可以使用一个bool类型或者数字类型等,只要符合一个键值对应关系即可。但我们一般推荐使用struct{}来实现,为什么呢?

package main

import "fmt"

func main() {
	m := make(map[int]struct{})
	m[1] = struct{}{}
	m[2] = struct{}{}
	
	if _, ok := m[1]; ok {
		fmt.Println("exists")
	}
	
}

上面这段代码是一个很简单的使用map实现的set功能,这里是采用空结构体struct{}来实现。

在分析为什么使用struct{}以前,我看先认识一个struct。

认识空结构体 struct{}

我们先看一个这段代码

package main

import (
	"fmt"
	"unsafe"
)

type emptyStruct struct{}

func main() {
	a := struct{}{}
	b := struct{}{}

	c := emptyStruct{}

	fmt.Println(a)
	fmt.Printf("%pn", &amp;a)
	fmt.Printf("%pn", &amp;b)
	fmt.Printf("%pn", &amp;c)

	fmt.Println(a == b)

	fmt.Println(unsafe.Sizeof(a))
}


{} // 值
0x586a00 // a 内存地址
0x586a00 // b 内存地址, 同a一样
0x586a00 // c 内存地址,别名类型变量,同a一样
true // 丙个结构体是否相等,很显示,上面打印的是同一个内存地址
0 // 占用内存大小 

从打印结果里我们可以得出以下结论
1. 空结构体是一个无内容值的值,即空值
2. 空结构体占用0大小的内存,即不分配内存
3. 凡是空结构体他们都是一样的,即底层指向的是同一个内存地址,如何实现的呢?

对于高性能并发应用来说,内存占用大小一般都是我们关注重点对象,使用空struct{}根本不占用内存大小,相比使用其它类型的值,如bool(占用两个字节)int64(8个字节)性能要好的多,毕竟不用分配和回收内存了。

当然有人可能会说开发的应用map值不多的话,这点内存可以忽略不计。是的,确实是这样的,但这会带来一个另一个语义理解问题。如:

package main

import "fmt"

func main() {
	m := make(map[int]bool)
	m[1] = true
	m[2] = true

	if _, ok := m[1]; ok {
		fmt.Println("exists")
	}

}

我们用bool代替了空结构体,至于值是 true 还是 false 是没有任何影响的,都是bool数据类型占用的内存大小也一样。那么如果另一位开发的同事查看review源码的时候,如果这个map出现在一个大型应用的时候,会大多处出现,就很有可能带来疑惑,对于值所表达的意图就有所担心怀疑,提高了理解代码的门槛。心里如果值为true 的话,会执行一个逻辑,为false的话会执行另一个逻辑。而相比使用一个空结构体strcut{}来理解起来容易提高心智,别人一看空结构体struct{}就知道要表达的意思是不需要关心值是什么,只需要关心键值即可。

要记住一点,我们写的代码虽然是让机器运行的,但却是让人看的,能让人一眼看明白就不要看两眼,这点不正是符合了golang 这门开发语言的特性吗,只需要记住25个关键字,简单易理解,性能还高效。

底层原理

那么在底层一个空结构体又是怎么一回事呢? 为什么多个空结构体会指向同一个内存地址呢? 这和一个很重要的 zerobase 变量有关(在runtime里多次使用到了这个变量),而zerobase 变量是一个 uintptr 的全局变量,占用8个字节 (https://github.com/golang/go/blob/master/src/runtime/malloc.go#L840-L841),只要你将struct{} 赋值给一个或者多个变量,它都返回这个 zerobase 的地址,这点我们上面已经证实过这一点了。

在golang中大量的地方使用到了这个 zerobase 变量,只要分配的内存为0,就返回这个变量地址。

内存分配函数 https://github.com/golang/go/blob/master/src/runtime/malloc.go#L902-L1171

// Allocate an object of size bytes.
// Small objects are allocated from the per-P cache's free lists.
// Large objects (> 32 kB) are allocated straight from the heap.
func mallocgc(size uintptr, typ *_type, needzero bool) unsafe.Pointer {
	if gcphase == _GCmarktermination {
		throw("mallocgc called with gcphase == _GCmarktermination")
	}

	if size == 0 {
		return unsafe.Pointer(&amp;zerobase)
	}
       
        .......
}

结论:只要分配的内存大小为 0 字节,就返回 &zerobase 的指针,不仅仅是空结构体。

内存对齐

如果您对内存对齐不太了解的话,可以参考这篇Memory Layouts,注意下32位和64位系统之间的差异。有一点需要保证在 32 位系统上想要原子操作 64 位字(如 uint64)的话,需要由调用方保证其数据地址是 64 位对齐的,否则原子访问会有异常。

还拿uint64来说,大小为 8bytes,32 位系统上按 4bytes 对齐,64 位系统上按 8bytes 对齐。

在 64 位系统上,8bytes 刚好和其字长相同,所以可以一次完成原子的访问,不被其他操作影响或打断。

而 32 位系统,4byte 对齐,字长也为 4bytes,可能出现uint64的数据分布在两个数据块中,需要两次操作才能完成访问。

如果两次操作中间有可能别其他操作修改,不能保证原子性。

这样的访问方式也是不安全的。

这一点issue-6404[5]中也有提到:

This is because the int64 is not aligned following the bool. It is 32-bit aligned but not 64-bit aligned, because we’re on a 32-bit system so it’s really just two 32-bit values side by side.

For the numeric types, the following sizes are guaranteed:

type                                 size in bytes

byte, uint8, int8                     1
uint16, int16                         2
uint32, int32, float32                4
uint64, int64, float64, complex64     8
complex128                           16

空结构体作为一个占用0字节的数据类型与其它基本类型对比的话,确实有些特殊。

代码 https://play.golang.org/p/HDOXko5iLNO

package main

import (
	"fmt"
	"unsafe"
)

// 放在第1个字段,共需要24字节
type T0 struct {
	s  struct{} // 0
	f2 int32    // 4
	f3 int32    // 4
	f1 bool     // 1
	f4 int64    // 8
}

// 放在第1个字段,共需要16字节
// f2为 int16 类型
type T1 struct {
	s  struct{} // 0
	f1 bool     // 1
	f2 int16    // 2
	f3 int32    // 4
	f4 int64    // 8
}

// 中间字段,共需要16字节
type T2 struct {
	f1 bool     // 1
	s  struct{} // 0
	f2 int16    // 2
	f3 int32    // 4
	f4 int64    // 8
}

// 最后一个字段, 共需要24字节
type T3 struct {
	f1 bool     // 1
	f2 int16    // 2
	f3 int32    // 4
	f4 int64    // 8
	s  struct{} // 0
}

// 最后一个字段, 共需要24字节
type T4 struct {
	f1 bool     // 1
	f2 int16    // 2
	f4 int64    // 8
	f3 int32    // 4
	s  struct{} // 0
}

// 最后一个字段, 共需要24字节
// 这里f1数据类型由bool变成了int16
type T5 struct {
	f4 int64    // 8
	f3 int32    // 4
	f1 int16    // 2
	f2 int16    // 2
	s  struct{} // 0
}

func main() {
	bit := 32 << (^uint(0) >> 63)
	fmt.Printf("当前系统为 %d 位n", bit)

	var t0 T0
	var t1 T1
	var t2 T2
	var t3 T3
	var t4 T4
	var t5 T5

	fmt.Println("t0:", unsafe.Sizeof(t0)) // (0+4)+(4)+ (8) 共占用24字节对齐
	fmt.Println("t1:", unsafe.Sizeof(t1)) // (0+1+2+4)+ (8) 共占用8字节对齐

	fmt.Println("t2:", unsafe.Sizeof(t2)) // (1+0+2+4)+ (8) 共占用8字节对齐

	fmt.Println("t3:", unsafe.Sizeof(t3)) // (1+2+4)+(8)+(0)共占用24字节
	fmt.Println("t4:", unsafe.Sizeof(t4)) // (1+2)+(8)+(4+0)共占用24字节
	fmt.Println("t5:", unsafe.Sizeof(t5)) // (8)+(4+2+2)+ (0) 共占用24字节
}


结果

当前系统为 64 位
t0: 24
t1: 16
t2: 16
t3: 24
t4: 24
t5: 24

这里运行环境为64位系统,所以按8字节对齐。

T0T1是将struct{}放在首字段的情况;
T2是放在中间位置的情况;
T3T4T5是作为最后一个字段的情况;

当作为最后一个字段的时候,如果它前面的字段放在一起的话等于8的话,则空结构体单独占用8字节(T5),否则可以与前面的共占8字节(T4)。

总结:上面结果我们可以看出,虽然空结构体占用0字节,但在进行内存对齐的时候是需要考虑这个字段,可以理解为将其视为需占用1个字节,不然这个结构体就没有任何意义了 。

对于Golang为什么要内存对齐, 有没有必要使用,可以看看这篇文章 Golang 是否有必要内存对齐?

空结构体struct{}使用场景

一般我们用在用户不关注值内容的情况下,只是作为一个信号或一个占位符来使用。

  1. 基于map实现集合功能
    就是我们上面提到的情况
  2. 与channel组合使用,实现一个信号
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	conLimit := make(chan struct{}, 2)

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		conLimit <- struct{}{}
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			// doing...

			fmt.Println(i)
			time.Sleep(time.Second)
			<-conLimit
		}(i)
	}

	wg.Wait()
	close(conLimit)

	fmt.Println("ok")

}

基于有缓冲的channel是实现并发限速。

参考

https://golang.org/ref/spec#Size_and_alignment_guarantees

Memory Layouts

Dig101:Go之聊聊struct的内存对齐

Golang 是否有必要内存对齐?

认识Golang内存对齐

Golang中的内存重排(Memory Reordering)

什么是内存重排

内存重排指的是内存的读/写指令重排。— Xargin

为什么要内存重排

为了提升程序执行效率,减少一些IO操作,一些硬件或者编译器会对程序进行一些指令优化,优化后的结果可能会导致程序编码时的顺序与代码编译后的先后顺序不一致。

就拿做饭场景来说吧,是先蒸米还是先炒菜,这两者是没有冲突的,编译器在编译时有可能与你要求的顺序不一样。

编译器重排

如下面这段代码

X = 0
for i in range(100):
    X = 1
    print X

要实现打印100次1,很显示在for里面每次都执行X=1语句有些浪费资源,如果将初始变量值修改为1,是不是要快的多。编译器也分析到了这一点,于是在编译时对代码做了以下优化

X = 1
for i in range(100):
    print X

最终输出结果是一样的,两段代码功能也一样。

但是如果此时有另一个线程里执行了一个 X=0 的赋值语句的话(两个线程同时运行),那么输出结果就可能与我们想要的不一样了。

优化前情况:第一个线程执行到了第3次print X 后,第二个线程执行了X=0,把X 的值进行了修改,结果就有可能是1110 1111(在线程执行第5次时,重新执行了X=1,而且之后一直都是 1)。这就有问题了。

优化后情况:按上面的逻辑来执行的话,结果就是11100000…, 后面全是0,再也没有机会将X贬值为1了

由此我们可以得出一个结果,在多线程下,是无法保证编译前后的代码功能是“待价”的。

所以要开发时,这一点我们一定要小心。搞不好在单线程运行没有问题的程序在多线程下会出现各种各样的问题。

每当我们提到内存重排的时候,其实真实在讨论的主题是 内存屏障Memory barrier。指令重排无法逾越内存屏障。

CPU 重排

参考:https://www.w3xue.com/exp/article/20196/40758.html

CPU 架构

Figure 1. CPU Architecture
Figure 2. Store Buffer
Figure 3. MESI Protocol

MESI 可参考 https://mp.weixin.qq.com/s/vnm9yztpfYA4w-IM6XqyIA

参考

从 Memory Reordering 说起
CPU指令重排/内存乱序
谈谈Go语言中的内存重排

相关话题

锁、内存屏障与缓存一致性 https://gocode.cc/project/9/article/128

memory barrier

[译] 什么是缓存 false sharing 以及如何解决(Golang 示例) https://juejin.cn/post/6844903866270482445

CPU缓存体系对Go程序的影响 https://mp.weixin.qq.com/s/vnm9yztpfYA4w-IM6XqyIA

Golang中的并发原语 Singleflight

在Golang中有一个并发原语是Singleflight,好像知道的开发者并不多。其中著名的 https://github.com/golang/groupcache 就用到了这个并发原语。

Golang版本

go1.15.5

相关知识点

map、Mutex、channel、

使用场景

一般用在对指定资源频繁操作的情况下,如高并发下的“缓存击穿”问题。

缓存击穿:一个存在的key,在缓存过期的瞬间,同时有大量的请求过来,造成所有请求都去DB读取数据,这些请求都会击穿缓存到DB,造成瞬时DB请求量大、压力瞬间骤增,导致数据库负载过高,影响整个系统正常运行。(缓存击穿不同于 缓存雪崩 和 缓存穿透)

缓存击穿

怎么理解这个原语呢,简单的讲就是将对同一个资源的多个请求合并为一个请求。

举例说明,假如当有10万个请求来获取同一个key的值的时候,正常情况下会执行10万次get操作。而使用singleflight并发语后,只需要首次的地个请求执行一次get操作就可以了,其它请求再过来时,只需要只需要等待即可。待执行结果返回后,再把结果分别返回给等待中的请求,每个请求再返回给客户端,由此看看,在一定的高并发场景下可以大大减少系统的负载,节省大量的资源。

注意这个与 sync.Once是不一样的,sync.Once 是全局只能有一个,但本并发原语则是根据key来划分的,并且可以根据需求来决定什么情况下共用一个。

实现原理

主要使用 Mutext 和 Map 来实现,以 key 为键,值为 *call。每个*call中存储有一个请求 chans 字段,用来存储所有请求此key的客户端,等有返回结果的时候,再从chans字段中读取出来,分别写入即可。

源文件为 /src/internal/singleflight/singleflight.go

Singleflight 数据结构如下

  1. Do()
    这个方法是一个执行函数并返回执行结果
    参数
    key 要请求的key,多个请求可能请求的是同一个key,同时也只有一个函数在执行,
    fn key对应的执行函数,此函数有三个返回值v, err, shared。其中shared表示当前返回结果值是只有一个请求还是由于多个请求发生返回的
  2. DoChan()
    类型与Do()方法,但返回的是个 ch 类型,等函数 fn 执行后,可以通过读取返回的ch来获取函数结果
  3. Forget()
    在官方库internal/singleflight/singleflight.go中这个名字是ForgetUnshared, 告诉 Group 忘记请求的这个key,下次再请求时,直接当作新的key来处理就可以了。其实就是将这个key从map中删除,后续再有这个key的操作的话,即视为新一轮的处理逻辑。

其中Do() 和 DoChan() 的功能一样,只是获取数据的方式有所区别,开发者可以根据自己的情况来选择使用哪一个。另外还包含一个由Do()方法调用的私有方法 doCall(),直接执行key处理方法的函数

func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {...}

实现数据结构

实现内部主要的数据结构一共三个,分别是 call、Group 和 Result。

// 定义 map[key] 的值 call 类型
// 在call 内部存储对应key相关的所有请求客户端信息
type call struct {
	wg sync.WaitGroup

	// These fields are written once before the WaitGroup is done
	// and are only read after the WaitGroup is done.
        // 请求返回结果,会在sync.WaitGroup为Done的时候执行
	val interface{}
	err error

	// These fields are read and written with the singleflight
	// mutex held before the WaitGroup is done, and are read but
	// not written after the WaitGroup is done.
        // 可以理解为请求key的个数,每增加一个请求,则值加1 
	dups  int

        // 存储所有key对应的 Result{}, 一个请求对应一个 Result
	chans []chan<- Result
}

// Group represents a class of work and forms a namespace in
// which units of work can be executed with duplicate suppression.
// SingleFlight 的主结构体
type Group struct {
	mu sync.Mutex       // protects m
	m  map[string]*call // lazily initialized
}

// Result holds the results of Do, so they can be passed
// on a channel.
// 定义请求结果数据结构
type Result struct {
	Val    interface{}
	Err    error
	Shared bool
}

call 数据结构是用来记录key相关数据,则请求当前key的个数和请求结果
Group 是并发原语 SingleFlight 的主数据结构, m 字段用来存储key与请求的关系,而mu则是Mutex锁
Result 是请求结果的数据结构,其中Val和Err 是实现请求后返回的结果,而 Shared 字段则是用来表示当前结果是否进行了合并处理,如果当前key只有一个请求在使用的话,则返回false, 否则返回 true

用法

package main

import (
	"fmt"
	"golang.org/x/sync/singleflight"
	"log"
	"sync"
	"time"
)

func getDataFromDB(key string) (string, error) {
	defer func() {
		log.Println("db query end")
	}()

	log.Println("db query begin")
	time.Sleep(2 * time.Second)
	result := key + "abcxyz"

	return result, nil
}

func main() {
	var singleRequest singleflight.Group

	getData := func(requestID int, key string) (string, error) {
		log.Printf("request %v start request ...", requestID)

		// 合并请求
		value, _, _ := singleRequest.Do(key, func() (ret interface{}, err error) {
			log.Printf("request %v is begin...", requestID)
			ret, err = getDataFromDB(key)
			log.Printf("request %v end!", requestID)
			return
		})

		return value.(string), nil
	}

	var wg sync.WaitGroup
	key := "orderID"
	for i := 1; i < 10; i++ {
		wg.Add(1)
		go func(wg *sync.WaitGroup, requestID int) {
			defer wg.Done()
			value, _ := getData(requestID, key)
			log.Printf("request %v get value: %v", requestID, value)
		}(&amp;wg, i)
	}
	wg.Wait()

	fmt.Println("main end")
}

输出结果

2020/11/20 12:11:44 request 6 start request …
2020/11/20 12:11:44 request 6 is begin…
2020/11/20 12:11:44 request 2 start request …
2020/11/20 12:11:44 request 9 start request …
2020/11/20 12:11:44 request 7 start request …
2020/11/20 12:11:44 request 3 start request …
2020/11/20 12:11:44 request 4 start request …
2020/11/20 12:11:44 request 1 start request …
2020/11/20 12:11:44 request 5 start request …
2020/11/20 12:11:44 request 8 start request …
2020/11/20 12:11:44 db query begin
2020/11/20 12:11:46 db query end
2020/11/20 12:11:46 request 6 end!
2020/11/20 12:11:46 request 6 get value: orderIDabcxyz
2020/11/20 12:11:46 request 8 get value: orderIDabcxyz
2020/11/20 12:11:46 request 2 get value: orderIDabcxyz
2020/11/20 12:11:46 request 9 get value: orderIDabcxyz
2020/11/20 12:11:46 request 7 get value: orderIDabcxyz
2020/11/20 12:11:46 request 3 get value: orderIDabcxyz
2020/11/20 12:11:46 request 4 get value: orderIDabcxyz
2020/11/20 12:11:46 request 1 get value: orderIDabcxyz
2020/11/20 12:11:46 request 5 get value: orderIDabcxyz
main end

从上面的输出内容可以看到,我们同时发起了10个gorourtine来查询同一个订单信息,真正在DB 层查询操作只有一次,大大减少了DB的压力。

总结

使用SingleFlight时,在高并发下且对少量key频繁读取的话,可以大大减少服务器负载。在一定场景下特别的有用,如 CDN。

Protobuf协议实现原理

protobuf是Google开源的一款支持跨平台、语言中立的结构化数据描述和高性能序列化协议,此协议完全基于二进制,所以性能要远远高于JSON/XML。由于出色的传输性能所以常见于微服务之间的通讯,其中最为著名的是Google开源的 gRPC 框架。

那么protobuf是如何实现高性能的,又是如何实现数据的编码和解码的呢?

protobuf协议原理

基于128bits的数据存储方式(Base 128 Varints)

Varint 是一种紧凑的表示数字的方法。它用一个或多个字节来表示一个数字,值越小的数字使用越少的字节数。这能减少用来表示数字的字节数。


比如对于 int32 类型的数字,一般需要 4 个 byte 来表示。但是采用 Varint,对于很小的 int32 类型的数字,则可以用 1 个 byte 来表示。当然凡事都有好的也有不好的一面,采用 Varint 表示法,大的数字则需要 5 个 byte 来表示。从统计的角度来说,一般不会所有的消息中的数字都是大数,因此大多数情况下,采用 Varint 后,可以用更少的字节数来表示数字信息。

Varint 中的每个 byte 的最高位 bit 有特殊的含义,如果该位为 1,表示后续的 byte 也是该数字的一部分,如果该位为 0,则结束。其他的 7 个 bit 都用来表示数字。因此小于 128 的数字都可以用一个 byte 表示。大于 128 的数字,比如 300,会用两个字节来表示:1010 1100 0000 0010。

另外如果从数据大小角度来看,这种表示方式比实现的数据多了一个bit, 所以其实际传输大小就多14%(1/7 = 0.142857143)。

数字1表示方式:0000 0001

对于小的数据比较好理解,正常情况下1的二进制是 0000 0001,使用128bits表示的话,首位结束标识位也是0,所以两者结果是一样的 0000 0001。

数字 300 表示方式:1010 1100 0000 0010

300

这个有点不太好理解了,这是因为原本用一个字节(8bit)就可以表示,但由于使用128bits表示方法,需要对每个字节的最高位添加一个结束标识位来表示,所以一个字节已经不够用了,需要占用两个字节来表示,其中两个字节最高位都是结束标识位。

如果正向推算的话,我们知道数字300的二进制值 1 0010 1100,用两个字节表示完整值则为
0000 0001 0010 1100 # 二进制
_000 0010 _010 1100 # 二进制每个字节的最高位向左移动一个位置,放入结束标识位
0000 0010 1010 1100 # 转换为128bits方式,1:结束,0:未结束
1010 1100 0000 0010 # 转换为小端字节序, 低字节在前,高字节在后

注意这里是先添加结束标识符,然后再转为小端字节序。

协议数据结构

消息经过序列化后会成为一个二进制数据流,该流中的数据为一系列的 Key-Value 对。如下图所示:

图 7. Message Buffer
图 7. Message Buffer

采用这种 Key-Pair 结构无需使用分隔符来分割不同的 Field。对于可选的 Field,如果消息中不存在该 field,那么在最终的 Message Buffer 中就没有该 field,这些特性都有助于节约消息本身的大小。

Key 用来标识具体的 field,在解包的时候,客户端创建一个结构对象,Protocol Buffer 从数据流中读取并反序列化数据,并根据 Key 就可以知道相应的 Value 应该对应于结构体中的哪一个 field。

而Key也是由以下两部分组成

Key 的定义如下:

1(field_number << 3) | wire_type
Key的定义

可以看到 Key 由两部分组成。第一部分是 field_number。第二部分为 wire_type。表示 Value 的传输类型。

一个字节的低3位表示数据类型,其它位则表示字段序号。

Wire Type 可能的类型如下表所示:

表 1. Wire Type

Type
MeaningUsed For
0Varintint32, int64, uint32, uint64, sint32, sint64, bool, enum
164-bitfixed64, sfixed64, double
2Length-delimistring, bytes, embedded messages, packed repeated fields
3Start groupGroups (deprecated)
4End groupGroups (deprecated)
532-bitfixed32, sfixed32, float
Wire Type 数据类型
message Test1 {
    required int32 a = 1;
}

在我们的例子当中,field id 所采用的数据类型为 int32,因此对应的 wire type 为 0。细心的读者或许会看到在 Type 0 所能表示的数据类型中有 int32 和 sint32 这两个非常类似的数据类型。Google Protocol Buffer 区别它们的主要意图也是为了减少 encoding 后的字节数。

每个数据头同样采用128bits方式,一般1个字节就足够了,

本例中字段a 的序号是1

如上创建了 Test1 的结构并且把 a 的值设为 2,序列化后的二进制数据为
0000 1000 0000 0010

Key 部分是 0000 1000
value 部分是 0000 0010, 其中字节最高位是结束标识位,即10进制的2,我们在转换的时候统一将符号位转为0即可。

协议规定数据头的低3位表示wire_type, 其它字段表示字段序号field_number,因此
0000 1000
_000 1000 # 去掉结束标识符位
_000 1000 # 000 表示数据类型, 这里是Varint
_000 1000 # 0001 这四位表示字段序号

参考

https://www.ibm.com/developerworks/cn/linux/l-cn-gpb/

Golang开发中中使用GitHub私有仓库

私有仓库地址为

github.com/cfanbo/websocket

一、设置私有环境变量 GOPRIVATE

$ go env -w GOPRIVATE=github.com/cfanbo/websocket

对于为什么需要设置 GOPRIMARY 变量,可以参考这里

对于GOPRIVATE值级别分为仓库级别和账号级别。

如果只有一个仓库,直接设置为仓库地址即可。如果有多个私有仓库的话,使用”,”分开,都在这个账号下,也可以将值设置为账号级别,这样账号下的所有私有仓库都可以正常访问。如 http://github.com/cfanbo

如果不想每次都重新设置,我们也可以利用通配符,例如:

$ go env -w GOPRIVATE="*.example.com"

这样子设置的话,所有模块路径为 example.com 的子域名(例如:git.example.com)都将不经过 Go module proxy 和 Go checksum database,需要注意的是不包括 example.com 本身。

国内用户访问仓库建议设置 GORPOXY为 https://proxy.golang.org,direct

二、设置凭证

使用私有仓库一定要绕不开权限设置这一步。访问仓库来常见的有两种方式,分别为SSH和 Https 。对于私有仓库来说,ssh可以设置rsa私钥来访问,https这种则可以使用用户名和密码,一般通过命令行访问的时候,会自动提示用户输入这些信息。

对于权限控制这一块可参考官方文档 。其实在官方文档里还提供了第三种访问仓库的方式,那就是 Personal access token,简称 PAT, 这种 Token 是专门为api调用提供的,常见于自动化工作流中,如 CICD场景。

这里我们就利用PAT 来实现

  1. 在Github.com 网站生成 Personal access tokens,新手可参考官方教程文档
  2. 本地配置token凭证
$ git config --global url."https://${username}:${access_token}@github.com".insteadOf / "https://github.com"

如果你在使用Github Actions部署时,遇到无法读取版本号问题,需要改写成 git config –global url.”https://${username}:${access_token}@github.com”.insteadOf “https://github.com

命令验证

go get github.com/cfanbo/websocket

到这里基本配置基本完成了。

其它场景

如果要用在docker环境中的话,也要记得设置上面的几个环境变量值。

以下为一个docker示例

# Start from the latest golang base image
FROM golang:alpine

RUN GOCACHE=OFF

# 设置环境变量
RUN go env -w GOPRIVATE=github.com/ereshzealous

# Set the Current Working Directory inside the container
WORKDIR /app

# Copy everything from the current directory to the Working Directory inside the container
COPY . .

RUN apk add git

# 设置访问仓库凭证
RUN git config --global url."https://user-name:<access-token>@github.com".insteadOf "https://github.com"

# Build the Go app
RUN go build -o main .

# Expose port 8080 to the outside world
EXPOSE 8080

#ENTRYPOINT ["/app"]

# Command to run the executable
CMD ["./main"]

利用jenkins+github实现应用的自动部署及回滚

对于jenkins的介绍这里不再详细写了,此教程只是为了让大家对部署和回滚原理有所了解。

一、创建项目

点击左侧的“New Item”,输入项目名称,如 rollback-demo。

选中 ” 丢弃旧的构建(Discard old builds)”项,在“策略(Strategy” 选择”Log Rotation“, 并输入保留的最大构建个数。

二、常规配置

设置参数,点击”Add Parameter“,依次选择 “Choice Parameter” 和 “String Parameter“这两,填写如下

这里的Name 项为参数名称,用户在操作的时候,会在deploy 和 rollback 两个值中选择一项。

三、源码管理

我们这里选择Git.并填写github.com上的项目地址,记得设置认证 Credentials。构建分支直接使用默认的 */master 即可以了。查看代码浏览器选择 githubweb,并填写项目的github地址。

四、构建触发事件

选择 “GitHub hook trigger for GITScm polling”,表示使用github webhook来触发构建操作,要实现引功能,需要在项目地址github.com里的“setting”里添加一个webhook的url地址,一般地址为http://jenkins.com/github-webhook/

同时为了防止网络通讯不稳定的情况,同时选择 “Poll SCM”, 在调度Schedule 杠中填写 H/5 * * * *,表示5分钟自动从github上拉取数据一次,如果有变化就进行构建。

五、构建环境

我们演示为了简单,使用了php项目,这里不进行任何操作。如果java、NodeJS或者Golang的话,可能需要进行一些操作, 有时为了方便会把这些操作放在下一步shell脚本里进行。

六、构建配置

1.添加构建步骤,点击Add build step,选择“Execute shell”,填写内容如下

#!/bin/bash
case $deploy_env in
deploy)
	echo "deploy $deploy_env"
    ;;
rollback)
	echo "rollback $deploy_env version=$version"
    cp -R ${JENKINS_HOME}/jobs/rollback-demo/builds/${version}/archive/*.* ./
    pwd &amp;&amp; ls
    ;;
    *)
    exit
    ;;
esac    

可以看到当rollback的时候,是从原来构建归档路径里把文件复制出来。

这里正常情况下应该有一些单元测试之类的脚本,这里省略不写了。

2.添加构建后的操作。点击“Add post-build action” -> “Archive the artifacts” 配置用于归档的文件为“**/*“,表示所有文件。这点十分重要,只有每次构建完归档了才有东西回滚,另外时间长了,归档的内容越来越多,所以上面设置了最大归档个数。
3.再次添加”Send build artifacts over SSH”,配置内容如下

注意 shell脚本里的路径比 Remote directory 的路径里多一个/data 目录,这是由于在配置 ssh server 的时候指定了一个根目录为 /data.

如果在 Add post-build action 中找不到send build artifacts over ssh ,则说明需要安装一下插件,左侧点击“Manage Jenkins”-> “Manage Plugis”, 搜索“Publish Over SSH”安装即可。

到这里为了基本配置完成了。

测试配置

这里我们首次手动构建一次,点击项目页面左侧菜单的“build with Parameter”,显示如下

在正常deploy的时候,version字段时忽略掉即可。如果要加滚的话,则需要选择”rollback”,同时填写 version字段号,这个字段号为页面左下角build history的编号,就是以#开始的那些数字

golang中几种对goroutine的控制方法

我们先看一个代码片断

func listen() {
	ticker := time.NewTicker(time.Second)
	for {
		select {
		case <-ticker.C:
			fmt.Println(time.Now())
		}
	}
}
func main() {
	go listen()
	time.Sleep(time.Second * 5)
	fmt.Println("main exit")
}

非常简单的一个goroutine用法,想必每个gopher都看过的。

不过在实际生产中,我们几乎看不到这种用法的的身影,原因很简单,我们无法实现对goroutine的控制,而一般业务中我们需要根据不同情况对goroutine进行各种操作。

要实现对goroutine的控制,一般有以下两种。

一、手动发送goroutine控制信号

这里我们发送一个退出goroutine的信号。

// listen 利用只读chan控制goroutine的退出
func listen(ch <-chan bool) {
	ticker := time.NewTicker(time.Second)
	for {
		select {
		case <-ticker.C:
			fmt.Println(time.Now())
		case <-ch:
			fmt.Println("goroutine exit")
			return
		}
	}
}

func main() {
	// 声明一个控制goroutine退出的chan
	ch := make(chan bool, 1)
	go listen(ch)

	// 只写chan
	func(ch chan<- bool) {
		time.Sleep(time.Second * 3)
		fmt.Println("发送退出chan信号")
		ch <- true
		close(ch)
	}(ch)

	time.Sleep(time.Second * 5)
	fmt.Println("main exit")
}

我们在main函数里发送一个控制goroutine的退出信息,在goroutine里我们会select这个通道,如果收到此信息,则直接退出goroutine。这里我们使用到了单向chan。

二、利用 context 包来控制goroutine

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
	defer cancel()

	go func() {
		ticker := time.NewTicker(time.Second)
		for {
			select {
			case <-ticker.C:
				fmt.Println(time.Now())
			case <-ctx.Done():
				// 3秒后会收到信号
				fmt.Println("goroutine exit")
				return
			}
		}
	}()

	time.Sleep(time.Second * 5)
	fmt.Println("main exit")
}

这里主要用到了上下文包context来实现,如果你看过包源码的话会发现 ctx.Done() 其实也是chan的读取操作,其原理和第一种方法是完全一样。

Golang遍历切片删除元素引起恐慌问题

删除一个切片的一些元素,https://github.com/golang/go/wiki/SliceTricks告知切片操作:Golang遍历切片恐慌时删除元素

a = append(a[:i], a[i+1:]...)

然后我下面的编码:

package main 

import (
    "fmt" 
) 

func main() { 
    slice := []int{1, 2, 3, 4, 5, 6, 7, 8, 9} 
    for i, value := range slice { 
     if value%3 == 0 { // remove 3, 6, 9 
      slice = append(slice[:i], slice[i+1:]...) 
     } 
    } 
    fmt.Printf("%vn", slice) 
} 

go run hello.go,它恐慌:

panic: runtime error: slice bounds out of range 

goroutine 1 [running]: 
panic(0x4ef680, 0xc082002040) 
    D:/Go/src/runtime/panic.go:464 +0x3f4 
main.main() 
    E:/Code/go/test/slice.go:11 +0x395 
exit status 2 

我该如何更改此代码才能正确使用?

想到的有几下几种方法

1、使用goto和标签

func main() { 
    slice := []int{1, 2, 3, 4, 5, 6, 7, 8, 9} 
Label: 
    for i, n := range slice { 
     if n%3 == 0 { 
      slice = append(slice[:i], slice[i+1:]...) 
      goto Label 
     } 
    } 
    fmt.Printf("%vn", slice) 
} 

它的工作原理很简单,就是不停的迭代,每次删除一个元素后,都需要从切片的开头重新迭代,太过于效率低下了,特别是当一个切片很大的情况下。

2,使用另一个变量临时存储想要的元素

func main() { 
    slice := []int{1, 2, 3, 4, 5, 6, 7, 8, 9} 
    dest := slice[:0] 
    for _, n := range slice { 
     if n%3 != 0 { // filter 
      dest = append(dest, n) 
     } 
    } 
    slice = dest 
    fmt.Printf("%vn", slice) 
} 

这种方法有些浪费内存,一是申请变量占用内存,另外切片扩大时,底层会重新申请内存空间,并将数据迁移过去。

3,从Remove elements in slice,与len操作:

func main() { 
    slice := []int{1, 2, 3, 4, 5, 6, 7, 8, 9} 
    for i := 0; i < len(slice); i++ { 
     if slice[i]%3 == 0 { 
      slice = append(slice[:i], slice[i+1:]...) 
      i-- // should I decrease index here? 
     } 
    } 
    fmt.Printf("%vn", slice) 
} 

要选哪一个,这里看一下基准测试

与基准:

func BenchmarkRemoveSliceElementsBySlice(b *testing.B) { 
    for i := 0; i < b.N; i++ { 
     slice := []int{1, 2, 3, 4, 5, 6, 7, 8, 9} 
     dest := slice[:0] 
     for _, n := range slice { 
      if n%3 != 0 { 
       dest = append(dest, n) 
      } 
     } 
    } 
} 

func BenchmarkRemoveSliceElementByLen(b *testing.B) { 
    for i := 0; i < b.N; i++ { 
     slice := []int{1, 2, 3, 4, 5, 6, 7, 8, 9} 
     for i := 0; i < len(slice); i++ { 
      if slice[i]%3 == 0 { 
       slice = append(slice[:i], slice[i+1:]...) 
      } 
     } 
    } 
} 


$ go test -v -bench=".*" 
testing: warning: no tests to run 
PASS 
BenchmarkRemoveSliceElementsBySlice-4 50000000    26.6 ns/op 
BenchmarkRemoveSliceElementByLen-4  50000000    32.0 ns/op 

从结果上看来使用第2种方法好像好一些的。还哪更好的解决办法没有了呢,网友给出了一种更合理的解决方案,也是使用迭代。

最佳方案:

package main 

import "fmt" 

func main() { 
    slice := []int{1, 2, 3, 4, 5, 6, 7, 8, 9} 

    k := 0 
    for _, n := range slice { 
     if n%3 != 0 { // filter 
      slice[k] = n 
      k++ 
     } 
    } 
    slice = slice[:k] 

    fmt.Println(slice) //[1 2 4 5 7 8] 
} 

实现原理就是将要保留的数据向左边存储,参考:https://play.golang.org/p/eMZltc_gEB,不得不说这个方法真是让人脑洞大开。

来源: https://stackoverflow.com/questions/38387633/golang-remove-elements-when-iterating-over-slice-panics/38387701#38387701

package main 

import "fmt" 

func main() { 
    slice := []int{1, 2, 3, 4, 5, 6, 7, 8, 9} 

    k := 0 
    for i, n := range slice { 
     if n%3 != 0 { // filter 
      if i != k { 
       slice[k] = n 
      } 
      k++ 
     } 
    } 
    slice = slice[:k] 

    fmt.Println(slice) //[1 2 4 5 7 8] 
} 

如果您需要新片保留旧切片

package main 

import "fmt" 

func main() { 
    slice := []int{1, 2, 3, 4, 5, 6, 7, 8, 9} 

    s2 := make([]int, len(slice)) 
    k := 0 
    for _, n := range slice { 
     if n%3 != 0 { // filter 
      s2[k] = n 
      k++ 
     } 
    } 
    s2 = s2[:k] 

    fmt.Println(s2) //[1 2 4 5 7 8] 
} 

http://cn.voidcc.com/question/p-mkbvfagj-hy.html