golang的channel实现原理

channel是golang中特有的一种数据结构,通常与goroutine一起使用,下面我们就介绍一下这种数据结构。

channel数据结构

channel最重要的一个结构体就是hchan,我们创建一个channel的时候,实际上是创建了一个下面结构体的实例。

hchan结构体

// src/runtime/chan.go

type hchan struct {
	qcount   uint           // total data in the queue
	dataqsiz uint           // size of the circular queue
	buf      unsafe.Pointer // points to an array of dataqsiz elements
	elemsize uint16
	closed   uint32
	elemtype *_type // element type
	sendx    uint   // send index
	recvx    uint   // receive index
	recvq    waitq  // list of recv waiters
	sendq    waitq  // list of send waiters

	// lock protects all fields in hchan, as well as several
	// fields in sudogs blocked on this channel.
	//
	// Do not change another G's status while holding this lock
	// (in particular, do not ready a G), as this can deadlock
	// with stack shrinking.
	lock mutex
}

字段说明

  • qcount 当前channel中的元素数量
  • dataqsiz 环形队列的大小
  • buf 指向dataqsize的数组指针,只有缓冲chan有效
  • closed 当前channel关闭状态
  • elemsize 存储元素的大小
  • elemtype 存储元素的数据类型
  • sendx 发送操作处理到的索引位置,最大值为数组buf的最大下标值
  • recvx 接收操作处理到的索引位置,最大值为数组buf的最大下标值
  • recvq 接收队列,双向链表,阻塞元素
  • sendq 发送列队,双向链表,阻塞元素
  • lock 锁,,用来保护sudog里的所的字段
hchan struct

其中elemsizeelemtype 表示存储数据的大小和类型;sendxrecvx是指向底层数据的索引位置,表示当前处理的进度位置;recvqsendq 是一个由双向链表实现的队列,它存储的内容是由于队列dataqsize过小,而阻塞的数据。

每次进行发送数据和读取数据时都需要加锁。

waitq结构体

// src/runtime/chan.go

type waitq struct {
	first *sudog
	last  *sudog
}

sudog结构体

// src/runtime/runtime2.go

// sudog represents a g in a wait list, such as for sending/receiving
// on a channel.
//
// sudog is necessary because the g ↔ synchronization object relation
// is many-to-many. A g can be on many wait lists, so there may be
// many sudogs for one g; and many gs may be waiting on the same
// synchronization object, so there may be many sudogs for one object.
//
// sudogs are allocated from a special pool. Use acquireSudog and
// releaseSudog to allocate and free them.
type sudog struct {
	// The following fields are protected by the hchan.lock of the
	// channel this sudog is blocking on. shrinkstack depends on
	// this for sudogs involved in channel ops.

	g *g

	next *sudog
	prev *sudog
	elem unsafe.Pointer // data element (may point to stack)

	// The following fields are never accessed concurrently.
	// For channels, waitlink is only accessed by g.
	// For semaphores, all fields (including the ones above)
	// are only accessed when holding a semaRoot lock.

	acquiretime int64
	releasetime int64
	ticket      uint32

	// isSelect indicates g is participating in a select, so
	// g.selectDone must be CAS'd to win the wake-up race.
	isSelect bool

	parent   *sudog // semaRoot binary tree
	waitlink *sudog // g.waiting list or semaRoot
	waittail *sudog // semaRoot
	c        *hchan // channel
}

这里 sudog 实际上是对 goroutine 的一个封装,在等待列表中如sendqrecvq,一个sudog 就是一个goroutine。

sudogs 是通过一个特殊的池来分配的,通过acquireSudog() releaseSudog()进行获取和释放。

sudog里的字段是由 hchan.lock 锁来进行保护。

channel 整体结构图

hchan 结构图(来源:互联网技术窝)

创建

// 无缓冲通道
ch1 := make(chan int)
// 有缓冲通道
ch2 := make(chan int, 10)

通过编译可以发现channel的创建是由makechan()函数来完成的。源码

// src/runtime/chan.go

func makechan(t *chantype, size int) *hchan {
	elem := t.elem

	// compiler checks this but be safe.
	if elem.size >= 1<<16 {
		throw("makechan: invalid channel element type")
	}
	if hchanSize%maxAlign != 0 || elem.align > maxAlign {
		throw("makechan: bad alignment")
	}

	mem, overflow := math.MulUintptr(elem.size, uintptr(size))
	if overflow || mem > maxAlloc-hchanSize || size < 0 {
		panic(plainError("makechan: size out of range"))
	}

	// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
	// buf points into the same allocation, elemtype is persistent.
	// SudoG's are referenced from their owning thread so they can't be collected.
	// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
	var c *hchan
	switch {
	case mem == 0:
		// Queue or element size is zero.
		c = (*hchan)(mallocgc(hchanSize, nil, true))
		// Race detector uses this location for synchronization.
		c.buf = c.raceaddr()
	case elem.ptrdata == 0:
		// Elements do not contain pointers.
		// Allocate hchan and buf in one call.
		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
		c.buf = add(unsafe.Pointer(c), hchanSize)
	default:
		// Elements contain pointers.
		c = new(hchan)
		c.buf = mallocgc(mem, elem, true)
	}

	c.elemsize = uint16(elem.size)
	c.elemtype = elem
	c.dataqsiz = uint(size)
	lockInit(&c.lock, lockRankHchan)

	if debugChan {
		print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "n")
	}
	return c
}

函数返回的是一个指针类型,因此我们可以在函数中通过参数直接传递,不需要再转为指针使传递。

步骤

  1. 数据合法性检查,包括发送数据的类型和大小
  2. 根据不同场景分配内存,主要针对buf字段
    a. 内存大小为0,注意这时c.buf 的值为c.raceaddr()
    b. 元素不包含指针,一次性分配一段内存地址
    c. 元素包含指针,分配内存
  3. 初始化其它字段

第一个参数 *chantype 结构定义

// src/runtime/type.go

type chantype struct {
	typ  _type
	elem *_type
	dir  uintptr
}

实际上创建一个channel, 只是对一个hchan结构体进行了一些初始化操作,并返回其指针。因此我们在函数传递时,不需要传递指针,直接使用就可以了,因为它本身就是一个指针的类型。

注意:对于chan内存是在heap上分配的。

发送数据

对于channel的写操作是由chansend() 函数来实现的。

对于分送数据chan有三种情况,分别是直接发送,缓存区发送阻塞发送,其中阻塞发送涉及到GMP 的调度,理解起来有些吃力。

在发送数据前需要进行加锁操作,发送完再解锁,保证原子性操作。

直接发送

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	......

	// 直接发送
	// 如果接收队列中有接收者,则直接将数据发给接收者,重点在send()函数,并在函数里进行解锁
	if sg := c.recvq.dequeue(); sg != nil {
		// Found a waiting receiver. We pass the value we want to send
		// directly to the receiver, bypassing the channel buffer (if any).
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}

	......
}

如果接收队列中有接收者,则优化从接收者从队列中取出来sg(sg := c.recvq.dequeue()),然后再通过调用 send() 函数将数据发送给接收者即可。

channel send

在send()函数里,会执行一个回调函数主要用来进行解锁c.lock。真正的发送操作是函数 sendDirect(),通过memmove(dst, src, t.size) 将数据复制过去。

缓冲区发送

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	......

	// 缓冲区发送
	// 接收者队列中没有接收者goroutine
	// 当前channel中的元素<队列的大小,有缓冲buffer未满的情况
	// 将数据存放在sendx在buf数组中的索引位置,然后再将sendx索引+1
	// 由于是一个循环数组,所以如果达到了dataqsize,则从0开始,同时个数+1
	if c.qcount < c.dataqsiz {
		// Space is available in the channel buffer. Enqueue the element to send.
		qp := chanbuf(c, c.sendx)
		if raceenabled {
			raceacquire(qp)
			racerelease(qp)
		}
		typedmemmove(c.elemtype, qp, ep)
		c.sendx++
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
		c.qcount++
		unlock(&c.lock)
		return true
	}	

	......
}

如果当前recvq 队列里没有处于等待执行的sudog的话,则需要将数据发送到缓冲队列中(如果当前队列为缓冲chan)。

假设当前buffer大小为6(dataqsiz=6),数据个数为0(qcount=0),这里写入6个数据,如下图。

channel send

如果当前缓冲区的元素数量<队列的大小,说明缓冲区还没有满,还可以继续装载数据。

这时第一步先计算出 s.sendx 索引位置的内存地址,然后调用 typememmove() 函数将qp复制到内存地址,再将s.sendx索引值+1,同时c.qcount++

sendx = dataqsiz 的时候,说明已到了数组最后一个元素,下次存储数据的话,则需要重新从0开始了,所以需要重置为0

buf是一个由数组组成的队列,满足队列的FIFO的机制,最新存储的数据也先消费,最多可以存储 dataqsiz 个数量。超出这个数据量就需要使用第三种 阻塞发送 方式了。

sendx 始终保存的是下次存储数据的数组索引位置,每次使用完记得+1 。每次存储以前都需要判断当前buffer是否有空间可用 c.qcount < c.dataqsiz

总结

  • q.sendx 最大值为 c.dataqsiz -1,即数组的最大索引值。
  • q.count 是当前chan 存储的元素个数,有可能 > c.dataqsiz

阻塞发送

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	......

	
	// 阻塞发送
	// Block on the channel. Some receiver will complete our operation for us.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.waiting = mysg
	gp.param = nil
	c.sendq.enqueue(mysg)
	// Signal to anyone trying to shrink our stack that we're about
	// to park on a channel. The window between when this G's status
	// changes and when we set gp.activeStackChans is not safe for
	// stack shrinking.
	atomic.Store8(&gp.parkingOnChan, 1)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
	// Ensure the value being sent is kept alive until the
	// receiver copies it out. The sudog has a pointer to the
	// stack object, but sudogs aren't considered as roots of the
	// stack tracer.
	KeepAlive(ep)

	// someone woke us up.
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	if gp.param == nil {
		if c.closed == 0 {
			throw("chansend: spurious wakeup")
		}
		panic(plainError("send on closed channel"))
	}
	gp.param = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	mysg.c = nil
	releaseSudog(mysg)
	return true	

	......
}

如果当buff也写满的话,再send数据的话,则需要进行阻塞发送了。

channel send

假如我们有一个缓冲chan,但缓冲大小已经使用完,再次发送数据的话,则需要进入sendq队列了(将sudog绑定到一个goroutine,并放在sendq,等待读取)

对于阻塞的情况,理解起来有些吃力,因为涉及到GMP的关系和调度。

  1. 调用 getg() 函数获取当前运行的goroutine
  2. 调用 acquireSudog() 函数获取一个sudog,并进行数据绑定
  3. 将mysg 添加到发送队列sendq,并设置为gp.waiting
  4. 更改goroutine状态
  5. 设置goroutine为等待唤醒状态,调用 atomic.Store8(&gp.parkingOnChan, 1)函数?
  6. 通过keepAlive()函数可以保证发送的值一直有效,直到被接收者取走
  7. 进行清理工作
  8. 释放 sudog 结构体

总结

读取数据

对于channel的读取方式:

v <- ch
v, ok <- ch

其中 v<-ch 对应的是runtime.chanrecv1(), v, ok <-ch 对应的是runtime.chanrecv2()。但这两个函数最终调用的还是同一个函数,即 chanrecv()

我们先看一下官方文档对这个函数的说明

// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).
// A non-nil ep must point to the heap or the caller's stack.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {}
  • chanrecv 用来从chan 中接收数据,并将接收的数据写入到ep
  • 如果ep为 nil 的话,则接收的数据将被忽略
  • 如果非阻塞的且没有可接收的数据将返回 (false ,false)
  • 如果chan已关闭,零值 *ep 和返回值将是true, false,否则使用一个元素代替*ep并返回 (true, true)
  • 一个非nil的 ep, 必须指向heap或者调用stack
// src/runtime/chan.go

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	...
	
	// 如果c为nil,表示非法操作,则直接gopark(),表示出让当前GMP中的P的使用权,允许其它G使用
	if c == nil {
		if !block {
			return
		}
		gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}
	...
	
	// 如果chan已关闭且元素个数为0
	if c.closed != 0 && c.qcount == 0 {
		if raceenabled {
			raceacquire(c.raceaddr())
		}
		unlock(&c.lock)
		if ep != nil {
			// 设置内存内容为类型 c.elemtype 的零值
			typedmemclr(c.elemtype, ep)
		}
		return true, false
	}

}

如果当前读取的 chan 为nil的话,则会产生死锁,最终提示

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan receive (nil chan)]:

同时出让自己占用的P,允许其它goroutine抢占使用。

如果读取的chan已关闭,则读取出来的值为零值(函数说明第四条)。

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	...

	// Fast path: check for failed non-blocking operation without acquiring the lock.
	// 在没有获取锁的情况下,检查非阻塞操作失败
	if !block && empty(c) {
		// After observing that the channel is not ready for receiving, we observe whether the
		// channel is closed.
		//
		// Reordering of these checks could lead to incorrect behavior when racing with a close.
		// For example, if the channel was open and not empty, was closed, and then drained,
		// reordered reads could incorrectly indicate "open and empty". To prevent reordering,
		// we use atomic loads for both checks, and rely on emptying and closing to happen in
		// separate critical sections under the same lock.  This assumption fails when closing
		// an unbuffered channel with a blocked send, but that is an error condition anyway.

		// 如果当前chan未关闭
		if atomic.Load(&c.closed) == 0 {
			// Because a channel cannot be reopened, the later observation of the channel
			// being not closed implies that it was also not closed at the moment of the
			// first observation. We behave as if we observed the channel at that moment
			// and report that the receive cannot proceed.
			return
		}
		// The channel is irreversibly closed. Re-check whether the channel has any pending data
		// to receive, which could have arrived between the empty and closed checks above.
		// Sequential consistency is also required here, when racing with such a send.
		if empty(c) {
			// The channel is irreversibly closed and empty.
			if raceenabled {
				raceacquire(c.raceaddr())
			}
			if ep != nil {
				typedmemclr(c.elemtype, ep)
			}
			return true, false
		}
	}

	...

}

这段代码主要是对重排读的情况,进行了双重检测,暂是未明白code中考虑的情况,改天再消化消化。

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	...

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	// 加锁,下面才是真正要读取的逻辑
	lock(&c.lock)

	if c.closed != 0 && c.qcount == 0 {
		if raceenabled {
			raceacquire(c.raceaddr())
		}
		unlock(&c.lock)
		if ep != nil {
			typedmemclr(c.elemtype, ep)
		}
		return true, false
	}

	...
}

读取之前先加锁。

对chan的读取与发送一样,同样有三种方式,为直接读取、缓冲区读取和阻塞读取。

直接读取

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	...

	// 直接读取 
	// 从c.sendq队列中取sudog, 将数据复制到sg
	if sg := c.sendq.dequeue(); sg != nil {
		// Found a waiting sender. If buffer is size 0, receive value
		// directly from sender. Otherwise, receive from head of queue
		// and add sender's value to the tail of the queue (both map to
		// the same buffer slot because the queue is full).
		recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true, true
	}
}

获取一个待发送者,如果buffer大小为0,则直接从发送者接收数据。否则从队列头部接收,并将发送者发送的数据放在队列尾部。

chan recv

从c.sendq队列里读取一个 *sudog,通过调用 recv() 函数,将数据从发送者复制到ep中,并返回true,true,表示读取成功。真正读取函数为 recvDirect()

缓冲区读取

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	...

	// 如果c.qcount>0,说明缓冲区有元素可直接读取
	if c.qcount > 0 {
		// Receive directly from queue
		// 直接从队列中读取
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			raceacquire(qp)
			racerelease(qp)
		}
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		typedmemclr(c.elemtype, qp)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.qcount--
		unlock(&c.lock)
		return true, true
	}
}

如果c.qcount > 0,则说明缓冲区里有内容可以读取。则

直接获取 c.recvx 数组索引位置的内存地址,则

  1. r.recvx 索引地址的值读取出来复制给 ep,
  2. 然后更新接收数组索引c.recvx++, 如果>数组索引最大索引值 ,重置为0
  3. 减少元素个数
  4. 释放锁 c.qcount–
  5. 最后unlock返回。
chan recv

阻塞读取

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	......

	// c.sendq没有sender,buffer里也是空的,直接阻塞读取
	// no sender available: block on this channel.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	gp.waiting = mysg
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.param = nil
	c.recvq.enqueue(mysg)
	// Signal to anyone trying to shrink our stack that we're about
	// to park on a channel. The window between when this G's status
	// changes and when we set gp.activeStackChans is not safe for
	// stack shrinking.
	atomic.Store8(&gp.parkingOnChan, 1)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

	// someone woke us up
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	closed := gp.param == nil
	gp.param = nil
	mysg.c = nil
	releaseSudog(mysg)
	return true, !closed
}
  1. 通过getg()获取一个goroutine
  2. 获取一个sudog结构体
  3. 绑定两者关系
  4. 加入 c.recvq 队列
  5. 设置goroutine为等待唤醒状态
  6. 清理状态
chan recv

关闭chan

关闭chan语句

close(ch)

对于已关闭的chan,是不允许再次关闭的,否则会产生panic。对应的函数为 runtime.closechan()

// src/runtime/chan.go

func closechan(c *hchan) {
	// 如果chan未初始化,触发panic
	if c == nil {
		panic(plainError("close of nil channel"))
	}

	lock(&c.lock)
	// 关闭已关闭的chan,触发panicc
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("close of closed channel"))
	}

	......

}

对于一个未初始化的chan,或者已关闭的chan,如果再次关闭则会触发panic。

func closechan(c *hchan) {
	......
	// 设置chan关闭状态
	c.closed = 1

	// 声明一个结构体链表gList,主要用来调度使用
	var glist gList

	// release all readers
	// 释放所有readers
	for {
		sg := c.recvq.dequeue()
		if sg == nil {
			break
		}

		// 设置元素为nil
		if sg.elem != nil {
			typedmemclr(c.elemtype, sg.elem)
			sg.elem = nil
		}
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = nil
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}

	// release all writers (they will panic)
	// 释放所有writers,会引起panic,见下面说明
	for {
		sg := c.sendq.dequeue()
		if sg == nil {
			break
		}

		// 设置元素为nil
		sg.elem = nil
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = nil
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}

	// 释放锁
	unlock(&c.lock)

	// Ready all Gs now that we've dropped the channel lock.
	// 调度所有g
	for !glist.empty() {
		gp := glist.pop()
		gp.schedlink = 0
		// 唤醒goroutine
		goready(gp, 3)
	}
}
  1. 声明一个gList 链表结构体
  2. 将接收队列 c.recvq 中的所有元素添加到gList 中,并将原来的值设置为
  3. 将发送队列 c.sendq 中的所有元素添加到 gList 中,并将原来的值设置为
  4. 将所有的阻塞goroutine通过函数goready() 进行调度

文章里提到在对c.sendq 处理的时候可能会触发panic。这是因为关闭chan后,执行了 goready() 对原来sendq里的sudogs 进行了进行了重新调度,这时候发现chan已经关闭了,所以会panic。那么又是如何调度的呢?

package main

import (
	"fmt"
	"time"
)

var ch chan int

func f() {

}

func main() {
	ch := make(chan int, 10)
	// buffer大小为10,这里发送11个,使最后一个进入到c.sendq里面
	for i := 0; i < 11; i++ { // i < 10 则正常
		go func(v int) {
			ch <- v
		}(i)

	}
	time.Sleep(time.Second)
	fmt.Println("发送完毕")

	// 关闭chan,将对sendq里的g进行唤醒,唤醒后发现chan关闭状态,直接panic
	close(ch)

	for v := range ch {
		fmt.Println(v)
	}
	time.Sleep(time.Second)
}

有一条广泛流传的关闭 channel 的原则:

don’t close a channel from the receiver side and don’t close a channel if the channel has multiple concurrent senders.

不要从一个 receiver 侧关闭 channel,也不要在有多个 sender 时,关闭 channel。对于只有一个sender的话,直接在sender端关闭就可以。但对于多个sender的话,则需要通过一个信号量进行关闭,参考这里

总结

close 操作会触发goroutine的调度行为。

总结

  1. 对chan 发送或接收数据的时候要保证已初始化状态
  2. 对于已关闭的chan再次关闭会触发panic
  3. 对于发送和读取数据都有三种处理情况,分别是直接读写,缓存区读写和阻塞读写
  4. 发送和接收数据的本质上是对值的复制操作。All transfer of value on the go channels happens with the copy of value.
  5. close(ch)会触发goroutine 的调度行为
  6. 内部使用 sudogs对goroutine进行了一次封装。
  7. 如果buffer中的元素无法保证消费完的话,则会产生内存泄漏的危险,这时gc是无法对这些元素时间清理的,过多的 chan就会占用大量的资源
  8. 对于chan的分配的内存是在哪里,heap还是stack?

参考

Golang 的map实现原理

go Version 1.15.6

map作为一种常见的 key-value 数据结构,不同语言的实现原理基本差不多。首先在系统里分配一段连接的内存地址作为数组,然后通过对map键进行hash算法(最终将键转换成了一个整型数字)定位到不同的桶bucket(数组的索引位置),然后将值存储到对应的bucket里

map hash算法

理想的情况下是一个bucket存储一个值,即数组的形式,时间复杂度为O(1)。

如果存在键值碰撞的话,可以通过 链表法 或者 开放寻址法 来解决。

链表法

d2b5ca33bd970f64a6301fa75ae2eb22-1

开放寻址法

对于开放寻址法有多种算法,常见的有线性探测法,线性补偿探测法,随机探测法等。这里不再介绍,有兴趣的话可以查相关资料。

map基本数据结构

hmap结构体

map的核心数据结构是 /runtime/map.go

// A header for a Go map.
type hmap struct {
	// Note: the format of the hmap is also encoded in cmd/compile/internal/gc/reflect.go.
	// Make sure this stays in sync with the compiler's definition.
	count     int // # live cells == size of map.  Must be first (used by len() builtin)
	flags     uint8
	B         uint8  // log_2 of # of buckets (can hold up to loadFactor * 2^B items)
	noverflow uint16 // approximate number of overflow buckets; see incrnoverflow for details
	hash0     uint32 // hash seed

	buckets    unsafe.Pointer // array of 2^B Buckets. may be nil if count==0.
	oldbuckets unsafe.Pointer // previous bucket array of half the size, non-nil only when growing
	nevacuate  uintptr        // progress counter for evacuation (buckets less than this have been evacuated)

	extra *mapextra // optional fields
}

在源码中表示map 的结构体是hmap,即hash map的缩写。

count 指的是当前map的大小,即当前存放的元素个数,必须放在第一个位置,因为通过len()函数时,会通过unsafe.Poiner从这里读取此值并返回
flags 可以理解为一个标记位,几种值的定义(这里),在多个地方需要使用此值,如mapaccess1()
B 是一个对数,表示当前map持有的 buckets 数量(2^B)。但需要考虑一个重要因素装载因子,所以真正可以使用的桶只有loadFactor * 2^B 个,如果超出此值将触发扩容,默认装载因子是6.5
noverflow 溢出buckets数量
hash0 hash种子,在操作键值的时候,需要引入此值加入随机性
buckets 底层数组指针,指向当前map的底层数组指针
oldbuckets 同是底层数组指针,一般在进行map迁移的时候,用来指向原来旧数组。只有迁移过程中此字段才有意义,此字段数组大小只有buckets 的一半
nevacuate 进度计算器,表示扩容进度,小于此地址的 buckets 迁移完成
extra 可选字段,溢出桶专用,只要有桶装满就会使用

mapextra结构体

// mapextra holds fields that are not present on all maps.
type mapextra struct {
	// If both key and elem do not contain pointers and are inline, then we mark bucket
	// type as containing no pointers. This avoids scanning such maps.
	// However, bmap.overflow is a pointer. In order to keep overflow buckets
	// alive, we store pointers to all overflow buckets in hmap.extra.overflow and hmap.extra.oldoverflow.
	// overflow and oldoverflow are only used if key and elem do not contain pointers.
	// overflow contains overflow buckets for hmap.buckets.
	// oldoverflow contains overflow buckets for hmap.oldbuckets.
	// The indirection allows to store a pointer to the slice in hiter.
	overflow    *[]*bmap
	oldoverflow *[]*bmap

	// nextOverflow holds a pointer to a free overflow bucket.
	nextOverflow *bmap
}

当 map 的 key 和 value 都不是指针且内联时,将会把 bmap 标记为不含指针,这样可以避免 gc 时扫描整个 hmap。但是,我们看到 bmap 有一个 overflow 的字段,是指针类型的,破坏了 bmap 不含指针的设想,这时会把 overflow 移动到 extra 字段来。

overflowoldoverflow 只有当键和值不包含指针才使用,其中overflow存储hmap.buckets的溢出桶,而oldoverflow存储hmap.oldbuckets的溢出桶。

间接允许将指向切片的指针存储在hiter中

nextOverlflow 指向下一个空的溢出桶的指针

bmap结构体

// A bucket for a Go map.
type bmap struct {
	// tophash generally contains the top byte of the hash value
	// for each key in this bucket. If tophash[0] < minTopHash,
	// tophash[0] is a bucket evacuation state instead.
	tophash [bucketCnt]uint8
	// Followed by bucketCnt keys and then bucketCnt elems.
	// NOTE: packing all the keys together and then all the elems together makes the
	// code a bit more complicated than alternating key/elem/key/elem/... but it allows
	// us to eliminate padding which would be needed for, e.g., map[int64]int8.
	// Followed by an overflow pointer.
}

bmap即bucket map的缩写。

这里只有一个tophash字段,而实际上在使用中值的类型是不固定的,甚至可以是一个自定义结构体的指针类型。这个结构体看起来可能有点让人费解,其实编译器在编译期间会动态创建一个新的同名数据结构,如下

type bmap struct {
    topbits  [8]uint8
    keys     [8]keytype
    values   [8]valuetype
    pad      uintptr
    overflow uintptr
}

每个桶里有0到7共八个存储位置,也就是说每个桶最多可以存储8个元素,其中将所有的键放在一起,按先后顺序存放在一个keys字段中,再将所有的值按同样的顺序放在一起存放在values字段中(注意:没有将每个键值单独存储在一个字段中,主要是为了解决内存对齐带来的浪费问题。)

例如 map[int64]int8 ,如果按key/value/key/value/… 这种方式存储的话,则在每一个key/value之后需要padding7个字节才行。而如果按上面key/key/key/… 和 value/value/value/…这种方式的话,只需要在最后一个值的后面进行一次padding就可以了,大大节省了内存开支。如果不太懂这一块的话,可以了解一下golang的内存对齐。

如果当前bucket已满的话,则会创建新的bucket,并使用 overflow 字段进行bucket之间的链接,实现单向链表功能。

d4a95ed79625399a0ed58638e776f923-1

map结构体关系图

00b06e623a81cd5ed7fb89935b78d44a-1
来源网络

key的定位算法

在map中要存储一个键值对,必须先找到所在的位置,一般采用hash算法即取模的结果来实现,这里我们主要介绍一下map中是如何实现的。

先介绍一下map装载相关的内容。上面我们介绍 bmap 结构体的时候,有一个 B 字段,这个字段决定了map对应的底层数组的大小,它的大小决定了可以容纳的bucket的个数。如果B=5的话,则可以bucket的数组元素值个数为 2^5=32个,但golang中需要考虑到装载分子6.5这个因素,所以真正装载的元素并没有这么多。每当一个map存储的元素个数占比达到65%的时候,就会触发map的扩容操作。

如果想知道一个key要放在哪个bucket个, 需要先计算出一个hash值,然后再除以 32取余数即可。golang 也是如此实现的,只不过是为了性能考虑,使用了位操作而已。如5 *2 换作用位操作的话,就是将5左移1位即可。每左移1位一次就是*2,左移两位就是*4,同理右移则是相除。

如 hash(key) 计算出的结果为

10010111 | 000011110110110010001111001010100010010110010101010 │ 00110

根据上面说的h.B = 5,则取后面的500110,值为6,则需要将key:value存放在第6号的bucket中。

找到了要存储的桶位置,还需要找到放在桶的什么位置,每个位置我们可以称之为slot或者Cell

// src/runtime/map.go

const (
	// Maximum number of key/elem pairs a bucket can hold.
	bucketCntBits = 3
	bucketCnt     = 1 << bucketCntBits
        ......
}

这里定义了一个bucket最多存放元素个数的相关常量,在go里一个bucket最多可以存放 2^3=8个 元素,再多的话就需要使用到溢出桶overflow。

golang map

slot位置计算公式为取hash的高8位,计算函数为tophash()。这里高8位是 10010111,十进制的值为151, 于是在第6号bucket的中遍历每个slot,直到找到bmap.tophash值为151slot,当前slot在bucket中的索引位置就是要找的键和值的索引位置。然后根据当前位置索引值分别从bmap.keysbmap.values中计算出对应的值。

如果当前bucket中没有找到,就去当前bucket 中的溢出桶overflow中查找,如果overflow为nil,则直接返回空值即可。

总结

要想实现元素的存储定位需要三个步骤:

  1. 根据h.B 来的值,来取出hash结果的最后 b.B 位数字(低位),定位到bucket
  2. 再根据hash结果的高8位,实现在bucket定位到指定的位置
  3. 最后根据位置分别从 bmap.key和bmap.values中读取存储的值内容

map创建

package main

func main() {
	m := make(map[int]string)
	m[0] = "a"
	_ = m[0]
	_, _ = m[0]
	delete(m, 0)
}

我们先看一下与map想着的runtime有哪些

$ go tool compile -S main.go | grep runtime 
        0x0080 00128 (main.go:4)        CALL    runtime.fastrand(SB)
        0x00aa 00170 (main.go:5)        CALL    runtime.mapassign_fast64(SB)
        0x00bc 00188 (main.go:5)        CMPL    runtime.writeBarrier(SB), $0
        0x00f1 00241 (main.go:6)        CALL    runtime.mapaccess1_fast64(SB)
        0x0114 00276 (main.go:7)        CALL    runtime.mapaccess2_fast64(SB)
        0x0137 00311 (main.go:8)        CALL    runtime.mapdelete_fast64(SB)
        0x0153 00339 (main.go:5)        CALL    runtime.gcWriteBarrier(SB)
        0x0160 00352 (main.go:3)        CALL    runtime.morestack_noctxt(SB)
        ......

可以看到每个对map操作基本都有相应的同名调用函数。不过对map的初始化操作函数不是fastrand()函数,而是runtime.makemap()函数(还有一个runtime.makemap_small函数)。fastrand()函数只是用来产生随机hash种子的。

注意:我们这里没有指定创建map的长度,runtime会根据是否指定这个字段做一些情况考虑。

// src/runtime/map.go

// makemap implements Go map creation for make(map[k]v, hint).
// If the compiler has determined that the map or the first bucket
// can be created on the stack, h and/or bucket may be non-nil.
// If h != nil, the map can be created directly in h.
// If h.buckets != nil, bucket pointed to can be used as the first bucket.
func makemap(t *maptype, hint int, h *hmap) *hmap {
	mem, overflow := math.MulUintptr(uintptr(hint), t.bucket.size)
	if overflow || mem > maxAlloc {
		hint = 0
	}

	// initialize Hmap
	if h == nil {
		h = new(hmap)
	}
	h.hash0 = fastrand()

	// Find the size parameter B which will hold the requested # of elements.
	// For hint < 0 overLoadFactor returns false since hint < bucketCnt.
	B := uint8(0)
	for overLoadFactor(hint, B) {
		B++
	}
	h.B = B

	// allocate initial hash table
	// if B == 0, the buckets field is allocated lazily later (in mapassign)
	// If hint is large zeroing this memory could take a while.
	if h.B != 0 {
		var nextOverflow *bmap
		h.buckets, nextOverflow = makeBucketArray(t, h.B, nil)
		if nextOverflow != nil {
			h.extra = new(mapextra)
			h.extra.nextOverflow = nextOverflow
		}
	}

	return h
}

步骤:

  1. 计算指定的大小所需要的内容是否超出出系统允许的最大分配大小
  2. 初始化hmap,并指定随机种子
  3. 通过overLoadFactor(hint, B)函数找到一个能装下指定map大小个元素个数的最小B,从小到大开始循环B。刚开始map越小越好
  4. 开始对hash table进行初始化。如果B==0则buckets 进行延时初始化操作(赋值的时候才进行初始化),如果B值特别大,则初始化需要一段时间,主要通过makeBucketArray()函数实现

注意 makemap()函数返回的是一个指针,而makeslice()返回的是一个新的结构体,在参数传递的时候,是值复制,两者有差异,有些是引用的是同一个数组,有些不是。

// src/runtime/map.go

// makeBucketArray initializes a backing array for map buckets.
// 1<<b is the minimum number of buckets to allocate.
// dirtyalloc should either be nil or a bucket array previously
// allocated by makeBucketArray with the same t and b parameters.
// If dirtyalloc is nil a new backing array will be alloced and
// otherwise dirtyalloc will be cleared and reused as backing array.
func makeBucketArray(t *maptype, b uint8, dirtyalloc unsafe.Pointer) (buckets unsafe.Pointer, nextOverflow *bmap) {
	base := bucketShift(b)
	nbuckets := base
	// For small b, overflow buckets are unlikely.
	// Avoid the overhead of the calculation.
	if b >= 4 {
		// Add on the estimated number of overflow buckets
		// required to insert the median number of elements
		// used with this value of b.
		nbuckets += bucketShift(b - 4)
		sz := t.bucket.size * nbuckets
		up := roundupsize(sz)
		if up != sz {
			nbuckets = up / t.bucket.size
		}
	}

	if dirtyalloc == nil {
		buckets = newarray(t.bucket, int(nbuckets))
	} else {
		// dirtyalloc was previously generated by
		// the above newarray(t.bucket, int(nbuckets))
		// but may not be empty.
		buckets = dirtyalloc
		size := t.bucket.size * nbuckets
		if t.bucket.ptrdata != 0 {
			memclrHasPointers(buckets, size)
		} else {
			memclrNoHeapPointers(buckets, size)
		}
	}

	if base != nbuckets {
		// We preallocated some overflow buckets.
		// To keep the overhead of tracking these overflow buckets to a minimum,
		// we use the convention that if a preallocated overflow bucket's overflow
		// pointer is nil, then there are more available by bumping the pointer.
		// We need a safe non-nil pointer for the last overflow bucket; just use buckets.
		nextOverflow = (*bmap)(add(buckets, base*uintptr(t.bucketsize)))
		last := (*bmap)(add(buckets, (nbuckets-1)*uintptr(t.bucketsize)))
		last.setoverflow(t, (*bmap)(buckets))
	}
	return buckets, nextOverflow
}

初始化bucket

  1. 对于b<4的话(桶的数量< 2^4),基本不需要溢出桶,这样就可以避免开销。对于b>4(桶的数量> 2^4)的话,则需要创建2^(b-4)个溢出桶。
  2. 数组分配通过函数 newarray()实现,第一个参数是元素类型,每二个参数是数组长度,返回的是数组内存首地址

map读取

对于map中的赋值与修改原理是基本一样的,先找到位置,不管原来位置有没有数据直接存储新数据就可以了。

对于map的赋值操作,是由函数 runtime.mapaccess1()runtime.mapaccess2() 来实现的。两者的唯一区别就是返回值不一样,runtime.mapaccess1() 返回的是一个值,runtime.mapaccess2() 返回的是两个值,第二个值决定了key是否在map中存在。这点可以通过上面我们的 compile 结果中看出他们的区别。这里我们只介绍runtime.mapaccess1()

// src/runtime/map.go

// mapaccess1 returns a pointer to h[key].  Never returns nil, instead
// it will return a reference to the zero object for the elem type if
// the key is not in the map.
// NOTE: The returned pointer may keep the whole map live, so don't
// hold onto it for very long.
func mapaccess1(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
	if raceenabled && h != nil {
		callerpc := getcallerpc()
		pc := funcPC(mapaccess1)
		racereadpc(unsafe.Pointer(h), callerpc, pc)
		raceReadObjectPC(t.key, key, callerpc, pc)
	}
	if msanenabled && h != nil {
		msanread(key, t.key.size)
	}
	if h == nil || h.count == 0 {
		if t.hashMightPanic() {
			t.hasher(key, 0) // see issue 23734
		}
		return unsafe.Pointer(&zeroVal[0])
	}
	if h.flags&hashWriting != 0 {
		throw("concurrent map read and map write")
	}
	hash := t.hasher(key, uintptr(h.hash0))
	m := bucketMask(h.B)
	b := (*bmap)(add(h.buckets, (hash&m)*uintptr(t.bucketsize)))
	if c := h.oldbuckets; c != nil {
		if !h.sameSizeGrow() {
			// There used to be half as many buckets; mask down one more power of two.
			m >>= 1
		}
		oldb := (*bmap)(add(c, (hash&m)*uintptr(t.bucketsize)))
		if !evacuated(oldb) {
			b = oldb
		}
	}
	top := tophash(hash)
bucketloop:
	for ; b != nil; b = b.overflow(t) {
		for i := uintptr(0); i < bucketCnt; i++ {
			if b.tophash[i] != top {
				if b.tophash[i] == emptyRest {
					break bucketloop
				}
				continue
			}
			k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
			if t.indirectkey() {
				k = *((*unsafe.Pointer)(k))
			}
			if t.key.equal(key, k) {
				e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
				if t.indirectelem() {
					e = *((*unsafe.Pointer)(e))
				}
				return e
			}
		}
	}
	return unsafe.Pointer(&zeroVal[0])
}

runtime.mapaccess1()函数返回一个指定h[key]的指针,如果key不存在,则返回当前map值类型的zero值,注意永远也不会返回一个nil值。

返回的指针在map中是永远有效的,尽量读取完尽快使用并释放,不要长期持久。

步骤

  1. 判断h是否为nil或者h.count值是否为0,如果h为nil则表示未初始化,则可能panic,如果h.count=0,则表示map为空,则直接返回一个zero值。
  2. 考虑是否处于并发读写状态,否则产生panic
  3. 根据键key计算hash值
  4. 计算低B位的掩码bucketMask(h.B),比如 B=5,那 m 就是31,低五位二进制是全1
  5. 计算当前bucket的地址
  6. 根据h.oldbuckets是否为nil来判断是否处于扩容中,如果不是nil则表示当前map正处于扩容状态中。将m减少一半,重新计算当前key在 oldbuckets中的位置,如果oldbuckets没有全部迁移到新bucket的话(bmap.tophash的意义所在),则在oldbuckets中查找。
  7. 计算tophash,即高八位
  8. 真正开始查找key,外层for是循环bucket及溢出桶overflow,内层for是循环桶内的8个slot
    • 如果 b.tophash[i] != top 则表示当前bucket的第i个位置的tophash 与当前key的tophash不同。这时分两种情况:
      一种是当前的标识是 emptyRest 表示剩下的所有slot全是空,则直接结束当前bucket查找,继续下一下bucket重复此步骤(b = b.overflow(t));
      另一种则是继续查找下一个slot,直到8个slot全部查找完毕,然后再查找下一个bucket
    • 如果b.tophash[i] == top,则根据键bucket的首地址+类型计算出字节长度*slot索引值 计算得出key的位置
    • 判断同时bucket中的key与请求的key是否相等,如果相等的话,按取键的方法取出值,并返回,否则继续
  9. 如果在最后仍未找到key,则直接返回zero 值

key的定位公式

k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))

value 的定位公式

e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))

其中 dataOffset 表示key相对于bmap的偏移量,结构体如下

// src/runtime/map.go

// data offset should be the size of the bmap struct, but needs to be
// aligned correctly. For amd64p32 this means 64-bit alignment
// even though pointers are 32 bit.
dataOffset = unsafe.Offsetof(struct {
	b bmap
	v int64
}{}.v)

所以键的地址=当前bucket的起始位置(unsafe.Pointer(b)) + key的偏移量(dataOffset)+当前slot索引值(i) * 每个键的大小(uintptr(t.keysize))

而对于值来说,由于bmap.values在b.map.keys后面,所以要先将8个键的地址全部计算上才行,同样值类型也有自己的大小 t.elemsize

对于bucket的几种状态码如下

// src/runtime/map.go

// Possible tophash values. We reserve a few possibilities for special marks.
// Each bucket (including its overflow buckets, if any) will have either all or none of its
// entries in the evacuated* states (except during the evacuate() method, which only happens
// during map writes and thus no one else can observe the map during that time).
emptyRest      = 0 // this cell is empty, and there are no more non-empty cells at higher indexes or overflows.
emptyOne       = 1 // this cell is empty
evacuatedX     = 2 // key/elem is valid.  Entry has been evacuated to first half of larger table.
evacuatedY     = 3 // same as above, but evacuated to second half of larger table.
evacuatedEmpty = 4 // cell is empty, bucket is evacuated.
minTopHash     = 5 // minimum tophash for a normal filled cell.

emptyRest 表示当前cell为空,并且它后面的所有cell也为空,包括溢出桶overflow。
emptyOne 表示当前cell为空。
evacuatedX 扩容相关,当前bucket的slot值迁移到了新的X部分,由于每次扩容时,都是两倍扩容,所以原来bucket中的8个slot值,有的被迁移到了新bucket的X部分,有的是Y部分(原来的1个桶变成2个桶,分别称作X和Y)。
evacuatedY 扩容相关,第二部分迁移完毕。
evacuatedEmpty 当前cell为空,且迁移完成。

minTopHash tophash最小值,如果在调用 tophash(hash)时,计算出的值小于此值,则会加上此值(代码

map赋值

与赋值相关的函数是mapassign,根据key的不同底层调用相同类型的函数 ,常见的有

key 类型插入
uint32mapassign_fast32(t *maptype, h *hmap, key uint32) unsafe.Pointer
uint64mapassign_fast64(t *maptype, h *hmap, key uint64) unsafe.Pointer
stringmapassign_faststr(t *maptype, h *hmap, ky string) unsafe.Pointer

我们这里只主要介绍常用的mapassign函数。

函数原型

// src/runtime/map.go

// Like mapaccess, but allocates a slot for the key if it is not present in the map.
func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {}

步骤

  1. 检查是否处于并发模式,否则产生panic
  2. 进行 hash 值计算(同上)
  3. 检查bucket是否进行过初始化,否则调用 newobject(t.buckt) 函数进行初始化,这点我们上面介绍过
  4. 检查map 是否处于扩容状态(h.growing()),如果是则调用函数growWork(t,h,bucket)进行扩容工作
  5. 计算当前bucket的地址
  6. 计算tophash,开始在bucket内通过 for 遍历出相应的位置,这点同上面的方法一样,双层for查找bucket,如果找到了再去overflow查找。找到则进行typedmemmove(t.key, k, key)更新并直接结束整个逻辑
  7. 如果没有map里没有找到key,则判断如果map元素个数+1的话会不会触发扩容
    扩容所必须的两个条件:
    1. 达到了最大装载因子
    2. 溢出桶是否过多
    只要这两个其中的一个条件满足,则先进行扩容然后再重头开始。
  8. 到目前为止才算正式开始追加新的kv。要写入前必须要知道要写入的位置,有两个重要的变量,一个是inserti,另一个是insertk。如果此时inserti 为 nil的话,说明在上面第6步骤的时候遍历了所有的bucket和overflow也没有找到要插入的位置(bucket或overflow满了),则创建新的overflow,然后在新的overflow里计算出key和vlaue要存放的内存位置(代码
  9. 根据key和value的地址,写入新的值,并将当前map的元素个数加1。(需要特别注意的在这里并没有写入value,因为函数参数并没有传入值,所以返回的是一个存放v的内存地址,有了这个地址赋值就解决了)

总结:

触发map扩容的两个条件,一个是装载因子,另一个是overflow过多,对于overflow多少算多呢见下方说明。

map删除

与删除有关的函数为 mapdelete,原型

// src/runtime/map.go

func mapdelete(t *maptype, h *hmap, key unsafe.Pointer) {}

根据 key 类型的不同,删除操作会被优化成更具体的函数:

key 类型删除
uint32mapdelete_fast32(t *maptype, h *hmap, key uint32)
uint64mapdelete_fast64(t *maptype, h *hmap, key uint64)
stringmapdelete_faststr(t *maptype, h *hmap, ky string)
golang mapdelete

步骤

  1. 判断map是否为nil或者根本就没有任何元素
  2. 检查是否并发模式
  3. 计算hash,双层for循环遍历查找key,找到位置后进行以下工作
    将key位置写入nil或置零值
    将value位置写入nil或置零值
    将tophash位置置为 emptyOne 状态
    判断当前key是不是bucket中的最后一个元素,如果是最后一个元素,且当前bucket还有overflow,而overflow的首个tophash[0] != emptyRest 说明已经没有元素可查找了,于是直接结束。否则判断当前key的下一个 tophash是不是emptyRest,如果不是的话,直接结束逻辑。后面利用for循环重置tophash 的状态为最合适的状态,以方便后期循环使用

map扩容

随着map元素的添加,有可能出现bucket的个数不够,导致overflow桶越来越多,最后变成了一个单向链表了,查找效率越来越差,最差的情况下可能会变成O(n)。这与我们期待的O(1)相反,所以golang为了解决这个问题,就需要对map中的元素进行扩容重新整理,以达到最优的效果。那么什么时候才需要扩容呢?

// src/runtime/map.go

// Did not find mapping for key. Allocate new cell & add entry.

// If we hit the max load factor or we have too many overflow buckets,
// and we're not already in the middle of growing, start growing.
if !h.growing() && (overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) {
	hashGrow(t, h)
	goto again // Growing the table invalidates everything, so try again
}

这里有两个条件,只要满足其中一项就会进行扩容

  1. 当装载因子越过6.5的时候,这个是内部固定的一个值(查看说明)(代码)
  2. 当overflow 过多的时候
    a. 当 B 小于 15,也就是 bucket 总数 2^B 小于 2^15 时,如果 overflow 的 bucket 数量超过 2^B;
    b. 当 B >= 15,也就是 bucket 总数 2^B 大于等于 2^15,如果 overflow 的 bucket 数量超过 2^15。
    主要分界点就是15(代码

    针对第一个条件,就是说元素太多,桶装不下了,立即进行扩容,添加一倍的桶,保证元素个数最多只能达到桶总容量的65%;
    针对第二种情况一般是先是大量的添加元素,后来又删除了,再添加导致出现了太多的空的overflow,这样查找key的话,就需要遍历太多的桶了,效率大大下降,需要重新对元素进行位置调整,类似于windows系统中的磁盘碎片整理一样,将元素向前面的bucket中迁移即可,当然这种情况在扩容时底层有些特殊,需要注意一下。
// overLoadFactor reports whether count items placed in 1<<B buckets is over loadFactor.
// 装载因子>6.5
func overLoadFactor(count int, B uint8) bool {
	return count > bucketCnt && uintptr(count) > loadFactorNum*(bucketShift(B)/loadFactorDen)
}

// tooManyOverflowBuckets reports whether noverflow buckets is too many for a map with 1<<B buckets.
// Note that most of these overflow buckets must be in sparse use;
// if use was dense, then we'd have already triggered regular map growth.
// 溢出桶过多
func tooManyOverflowBuckets(noverflow uint16, B uint8) bool {
	// If the threshold is too low, we do extraneous work.
	// If the threshold is too high, maps that grow and shrink can hold on to lots of unused memory.
	// "too many" means (approximately) as many overflow buckets as regular buckets.
	// See incrnoverflow for more details.
	if B > 15 {
		B = 15
	}
	// The compiler doesn't see here that B < 16; mask B to generate shorter shift code.
	return noverflow >= uint16(1)<<(B&15)
}

虽然这里调用了 hashGrow() 函数,但它并不有真正的进行扩容,我们先看下它的源码

// src/runtime/map.go

func hashGrow(t *maptype, h *hmap) {
	// If we've hit the load factor, get bigger.
	// Otherwise, there are too many overflow buckets,
	// so keep the same number of buckets and "grow" laterally.
	// 申请2倍空间
	bigger := uint8(1)

	// 判断装载因子>6.5这个条件,否则就是由于overflow过多,此时申请的空间和原来的一样
	if !overLoadFactor(h.count+1, h.B) {
		bigger = 0
		h.flags |= sameSizeGrow
	}

	// 调整hmap中的桶的指针指向,将原来桶变为旧桶,新桶为新申请的数组
	oldbuckets := h.buckets
	newbuckets, nextOverflow := makeBucketArray(t, h.B+bigger, nil)

	// 重点了解下 &^ 位运算符,其作用是按位置0
	flags := h.flags &^ (iterator | oldIterator)
	if h.flags&iterator != 0 {
		flags |= oldIterator
	}
	// commit the grow (atomic wrt gc)
	// 提交grow操作
	h.B += bigger
	h.flags = flags
	h.oldbuckets = oldbuckets
	h.buckets = newbuckets
	h.nevacuate = 0 // 迁移进度
	h.noverflow = 0 // 溢出桶数量为0

	// 将扩展举出桶指令同样和新旧桶一样进行指针指向变更
	if h.extra != nil && h.extra.overflow != nil {
		// Promote current overflow buckets to the old generation.
		if h.extra.oldoverflow != nil {
			throw("oldoverflow is not nil")
		}
		h.extra.oldoverflow = h.extra.overflow
		h.extra.overflow = nil
	}
	if nextOverflow != nil {
		if h.extra == nil {
			h.extra = new(mapextra)
		}
		h.extra.nextOverflow = nextOverflow
	}

	// the actual copying of the hash table data is done incrementally
	// by growWork() and evacuate().
}

其中 hashGrow() 只是为扩容做的一些准备工作,而真正进行扩容的是 growWork()evacuate()函数。

函数 growWork() 只有在赋值删除的过程中会出现,所以也只有这两个动作才会触发扩容动作。

我们再看看growWrok()源码

// src/runtime/map.go

func growWork(t *maptype, h *hmap, bucket uintptr) {
	// make sure we evacuate the oldbucket corresponding
	// to the bucket we're about to use
	// 确认迁移的老的bucket是我们正在使用的bucket
	evacuate(t, h, bucket&h.oldbucketmask())

	// evacuate one more oldbucket to make progress on growing
	// 如果还是在迁移中,注再迁移一个bucket
	if h.growing() {
		evacuate(t, h, h.nevacuate)
	}
}

从代码里可以看到,先是迁移我们正在使用的bucket,如果还有 bucket 没有迁移完的话,就再迁移一个bucket,也就是说一次最多可迁移两个bucket。

这里 h.oldbucketmask()是计算oldbucket的位置的,实现原理我们上面介绍过的,计算hash值的最低B位来决定用哪个bucket。

// src/runtime/map.go

// oldbucketmask provides a mask that can be applied to calculate n % noldbuckets().
func (h *hmap) oldbucketmask() uintptr {
	return h.noldbuckets() - 1
}

下面我们重点关注 evacuate()函数,最复杂的工作就是由它来完成的。

// src/runtime/map.go

func evacuate(t *maptype, h *hmap, oldbucket uintptr) {
	// 计算 oldbucket 的地址
	b := (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.bucketsize)))
	newbit := h.noldbuckets()

	// 如果未迁移
	if !evacuated(b) {
		// TODO: reuse overflow buckets instead of using new ones, if there
		// is no iterator using the old buckets.  (If !oldIterator.)

		// xy contains the x and y (low and high) evacuation destinations.
		// 由于是两倍大小扩容,这里x和y分别代表新的两个bucket
		// evacDst 是迁移目的结构体,主要有四项数据,分别是要接收新数据的桶、值所在桶内的索引位置、map键和map值的指针地址
		var xy [2]evacDst

		// 获取目标 x bucket 的地址
		x := &xy[0]
		// 设置当前要迁移到的新桶相关信息
		x.b = (*bmap)(add(h.buckets, oldbucket*uintptr(t.bucketsize)))
		x.k = add(unsafe.Pointer(x.b), dataOffset)
		x.e = add(x.k, bucketCnt*uintptr(t.keysize))

		// 如果不是等量扩容的话
		if !h.sameSizeGrow() {
			// Only calculate y pointers if we're growing bigger.
			// Otherwise GC can see bad pointers.
			// 获取目标 y bucket 的地址,和上面x一样
			y := &xy[1]
			y.b = (*bmap)(add(h.buckets, (oldbucket+newbit)*uintptr(t.bucketsize)))
			y.k = add(unsafe.Pointer(y.b), dataOffset)
			y.e = add(y.k, bucketCnt*uintptr(t.keysize))
		}

		// 使用双层for循环进行数据迁移
		for ; b != nil; b = b.overflow(t) {
			k := add(unsafe.Pointer(b), dataOffset)
			e := add(k, bucketCnt*uintptr(t.keysize))
			for i := 0; i < bucketCnt; i, k, e = i+1, add(k, uintptr(t.keysize)), add(e, uintptr(t.elemsize)) {
				top := b.tophash[i]
				// 如果当前tophash索引位置的元素标记为empty,即为emptyRest或emptyOne,说明根本就不有内容要迁移,可以直接标记为迁移完成状态 evacuatedEmpty
				if isEmpty(top) {
					// 设置旧桶位置为迁移完成标记 evacuatedEmpty
					b.tophash[i] = evacuatedEmpty
					continue
				}
				// 位置标记非法,好像永远也不会出现这个情况
				if top < minTopHash {
					throw("bad map state")
				}
				k2 := k
				if t.indirectkey() {
					k2 = *((*unsafe.Pointer)(k2))
				}
				var useY uint8
				if !h.sameSizeGrow() {
					// Compute hash to make our evacuation decision (whether we need
					// to send this key/elem to bucket x or bucket y).
					hash := t.hasher(k2, uintptr(h.hash0))

					// iterator!=0 说明有协程正在遍历当前map
					// !t.key.equal(k2,k2) 表示同一个key计算出为的hash值不一样
					if h.flags&iterator != 0 && !t.reflexivekey() && !t.key.equal(k2, k2) {
						// If key != key (NaNs), then the hash could be (and probably
						// will be) entirely different from the old hash. Moreover,
						// it isn't reproducible. Reproducibility is required in the
						// presence of iterators, as our evacuation decision must
						// match whatever decision the iterator made.
						// Fortunately, we have the freedom to send these keys either
						// way. Also, tophash is meaningless for these kinds of keys.
						// We let the low bit of tophash drive the evacuation decision.
						// We recompute a new random tophash for the next level so
						// these keys will get evenly distributed across all buckets
						// after multiple grows.
						useY = top & 1
						top = tophash(hash)
					} else {
						if hash&newbit != 0 {
							useY = 1
						}
					}
				}

				if evacuatedX+1 != evacuatedY || evacuatedX^1 != evacuatedY {
					throw("bad evacuatedN")
				}

				b.tophash[i] = evacuatedX + useY // evacuatedX + 1 == evacuatedY
				dst := &xy[useY]                 // evacuation destination
				// 如果目标桶内元素索引值=8,即超出桶内最大元素个数8,则新建溢出桶,并将桶内cell索引值置为0
				if dst.i == bucketCnt {
					dst.b = h.newoverflow(t, dst.b)
					dst.i = 0
					dst.k = add(unsafe.Pointer(dst.b), dataOffset)
					dst.e = add(dst.k, bucketCnt*uintptr(t.keysize))
				}
				// 设置tophash值
				dst.b.tophash[dst.i&(bucketCnt-1)] = top // mask dst.i as an optimization, to avoid a bounds check

				// 进行数据迁移操作
				if t.indirectkey() {
					*(*unsafe.Pointer)(dst.k) = k2 // copy pointer
				} else {
					typedmemmove(t.key, dst.k, k) // copy elem
				}
				if t.indirectelem() {
					*(*unsafe.Pointer)(dst.e) = *(*unsafe.Pointer)(e)
				} else {
					typedmemmove(t.elem, dst.e, e)
				}

				// 索引位置下移一位,即下次存储数据的cell位置
				dst.i++
				// These updates might push these pointers past the end of the
				// key or elem arrays.  That's ok, as we have the overflow pointer
				// at the end of the bucket to protect against pointing past the
				// end of the bucket.
				dst.k = add(dst.k, uintptr(t.keysize))
				dst.e = add(dst.e, uintptr(t.elemsize))
			}
		}
		// Unlink the overflow buckets & clear key/elem to help GC.
		// 如果oldbucket没有协程在使用的话,则帮助gc把bucket清除掉
		if h.flags&oldIterator == 0 && t.bucket.ptrdata != 0 {
			b := add(h.oldbuckets, oldbucket*uintptr(t.bucketsize))
			// Preserve b.tophash because the evacuation
			// state is maintained there.
			// 清除bucket 的 key,value 部分,保留 tophash 部分,指示搬迁状态
			ptr := add(b, dataOffset)
			n := uintptr(t.bucketsize) - dataOffset
			memclrHasPointers(ptr, n)
		}
	}

	// 如果当前迁移的bucket 就是当前的 oldbucket
	if oldbucket == h.nevacuate {
		advanceEvacuationMark(h, t, newbit)
	}
}

函数advanceEvacuationMark()源码

// src/runtime/map.go

func advanceEvacuationMark(h *hmap, t *maptype, newbit uintptr) {
	// 更新当前map的迁移进度
	h.nevacuate++
	// Experiments suggest that 1024 is overkill by at least an order of magnitude.
	// Put it in there as a safeguard anyway, to ensure O(1) behavior.
	// 一次标记1024个
	stop := h.nevacuate + 1024

	// 如果标记个数超出了实际oldbucket的人数,则使用oldbucket的数量值
	if stop > newbit {
		stop = newbit
	}
	// 循环标记bucket
	for h.nevacuate != stop && bucketEvacuated(t, h, h.nevacuate) {
		h.nevacuate++
	}
	// 当前迁移桶等于oldbucket的总数量,即迁移完毕
	if h.nevacuate == newbit { // newbit == # of oldbuckets
		// Growing is all done. Free old main bucket array.
		// 重置oldbuckets为nil,即表示oldbuckets 迁移完毕
		h.oldbuckets = nil
		
		// Can discard old overflow buckets as well.
		// If they are still referenced by an iterator,
		// then the iterator holds a pointers to the slice.
		// 如果h.extra != nil 说明保存的kv中没有指令,使用了slice存储,所以这里也需要同时清除oldoverflow
		if h.extra != nil {
			h.extra.oldoverflow = nil
		}
		h.flags &^= sameSizeGrow
	}
}

对于map 的扩容,主要分两种情况,一个是等量扩容,另一种是非等量扩容,什么意思呢?

所谓等量扩容是指原来是一个bucket,虽然扩容的时候创建了两个新的bucket,但使用的时候只使用了其中一个bucket,另一个没有使用,所以这种情况下的扩容是直接将原来oldbucket中的元素直接迁移到新 bucket中的x桶就可以了。

而非等量扩容就是将原来一个bucket 中的内容迁移到新的两个bucket中。

我们知道每个bucket 最多只有8个元素,迁移需要一个一个的迁移,每迁移完一个元素,都需要将当前cell作个标记,表示位置的元素已经迁移完成,对于等量扩容和非等量扩容两者是有些细微的差别的。我们先看一下等量迁移。

对于等量扩容来说,由于bucket是1对1的迁移,况且都迁移到了x桶,所以oldbucket中每个cell在迁移后被标记为evacuatedX,表示当前oldbucket中的元素全部被迁移到了x桶了。

csdn

经过等量扩容后,原来的overflow消失了,所有元素都放在了同一个bucket,节省资源的同时查询效率也提高了。是不是有点像widows 的磁盘碎片整理。

等量扩容后

而对于非等量扩容,bucket的关系就是1对2了,oldbucket 中的元素有可能经过rehash后,被迁移到了x桶,也有可能被迁移到了y 桶。

map非等量扩容

如果是被迁移到了y桶的话,则标记为evacuatedY,否则就是evacuatedX

如果oldbucket中的元素为空(emptyRestemptyOne),即未存储任何元素或者原来存储过后来删除了,则需要标记为evacuatedEmpty

经过迁移后,我们发现元素在oldbucket中的存储顺序,到新的bucket中发生了变化。如odlbucket里有8个元素(1-8),遍历打印的时候,会打印12345678。后来分成了两个bucket, 其中3/4/8 放在了x桶,1/2/5/6/7放在了y桶,那么这时遍历打印的话,则是遍历打印x桶,然后是y桶,最后打印34812567,与原来的顺序不一样了,这正是我们平时所说的对map的遍历并不保证一定的顺序的。

当然在源代码里起始位置也是随机定位的,先是随机选择一个bucket,然后再随机选择一个cell位置,开始往后遍历查找,当达到最后bucket的最后一个元素的时候再才第一个bucket查找,走到遍历到当前元素等于起始元素为止,或者官方是为了引起我们的注意故意这样设计的吧。所在就算你是硬编码写死的map不进行删除或添加,打印的也是随机的。

由于map内部是分bucket迁移的,每次最多迁移两个bucket,所以在遍历数据的时候有也些复杂,即可考虑新bucket的数据,也要考虑oldbucket中的数据。有些oldbucket中的数据被分两到两个新bucket中了,导致要考虑的情况要多一些。等有时间了再单独写写这一块的逻辑。

最后,当所有数据都迁移完成后,还需要将oldbucket清除,直接通过设置 h.oldbuckets为nil即可,记得还有一个mapextra.oldoverflow

在扩容过程中,有一段说明需要注意到

// If key != key (NaNs), then the hash could be (and probably
// will be) entirely different from the old hash. Moreover,
// it isn’t reproducible. Reproducibility is required in the
// presence of iterators, as our evacuation decision must
// match whatever decision the iterator made.
// Fortunately, we have the freedom to send these keys either
// way. Also, tophash is meaningless for these kinds of keys.
// We let the low bit of tophash drive the evacuation decision.
// We recompute a new random tophash for the next level so
// these keys will get evenly distributed across all buckets
// after multiple grows.

在进行hash计算的时候,有一种key,它每次经过hash计算的时候,得出的结果都不一样,这个key就是math.NaN(),表示Not a Number,类型是float64。一旦使用此key进行存储后,几乎无法正确读取出来值,只有在遍历的时候才会读取到此值。为了解决这个问题,官方采用了使用tophash的最低位来决定是放在x还是y,如果是0则放在x,否则放在y。

总结

1. map不是并发安全的,所以在源码里有大量的代码判断


if h.flags&hashWriting == 0 {
	throw("concurrent map writes")
}

在赋值时会设置map处于协程写的状态


// Set hashWriting after calling t.hasher, since t.hasher may panic,
// in which case we have not actually done a write.

// Set hashWriting after calling t.hasher, since t.hasher may panic,
// in which case we have not actually done a write.
h.flags ^= hashWriting

2. 遍历map是无序的,因为随着元素的添加和删除会进行扩容,元素位置会发生变化

参考

https://blog.csdn.net/u010853261/article/details/99699350
map 的底层实现原理是什么
https://studygolang.com/articles/22236
https://draveness.me/golang/docs/part2-foundation/ch03-datastructure/golang-hashmap/

Golang并发模式之扇入FAN-IN和扇出FAN-OUT


在现实世界中,经常有一些工作是属于流水线类型的,它们每一个步骤都是紧密关联的,第一步先做什么,再做做么,最后做什么。特别是制造业这个行业,基本全是流水线生产车间。在我们开发中也经常遇到这类的业务场景。

假如我们有个流水线共分三个步骤,分别是 job1、job2和job3。代码:https://play.golang.org/p/e7ZlP9ofXB3

package main

import (
	"fmt"
	"time"
)

func job1(count int) <-chan int {
	outCh := make(chan int, 2)

	go func() {
		defer close(outCh)
		for i := 0; i < count; i++ {
			time.Sleep(time.Second)
			fmt.Println("job1 finish:", 1)
			outCh <- 1
		}
	}()

	return outCh
}

func job2(inCh <-chan int) <-chan int {
	outCh := make(chan int, 2)

	go func() {
		defer close(outCh)
		for val := range inCh {
			// 耗时2秒
			time.Sleep(time.Second * 2)
			val++
			fmt.Println("job2 finish:", val)
			outCh <- val
		}
	}()

	return outCh
}

func job3(inCh <-chan int) <-chan int {
	outCh := make(chan int, 2)

	go func() {
		defer close(outCh)
		for val := range inCh {
			val++
			fmt.Println("job3 finish:", val)
			outCh <- val
		}
	}()

	return outCh
}

func main() {
	t := time.Now()

	firstResult := job1(10)
	secondResult := job2(firstResult)
	thirdResult := job3(secondResult)

	for v := range thirdResult {
		fmt.Println(v)
	}

	fmt.Println("all finish")
	fmt.Println("duration:", time.Since(t).String())
}

输出结果为

job1 finish: 1
job1 finish: 1
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job2 finish: 2
job3 finish: 3
3
job2 finish: 2
job3 finish: 3
3
job2 finish: 2
job3 finish: 3
3
all finish
duration: 21s

共计计算21秒。主要是因为job2中的耗时太久导致,现在我们的主要任务就是解决掉这个问题了。
这里只用了一个job2来处理job1的结果,如果我们能多开启几个goroutine job2并行处理会不会提升性能呢?

现在我们改进下代码,解决job2耗时的问题,需要注意一下,这里对ch的关闭也要作一下调整,由于启用了多个job2的goroutine,所以在job2内部进行关闭了。代码https://play.golang.org/p/qebQ00v973C

package main

import (
	"fmt"
	"sync"
	"time"
)

func job1(count int) <-chan int {
	outCh := make(chan int, 2)

	go func() {
		defer close(outCh)
		for i := 0; i < count; i++ {
			time.Sleep(time.Second)
			fmt.Println("job1 finish:", 1)
			outCh <- 1
		}
	}()

	return outCh
}

func job2(inCh <-chan int) <-chan int {
	outCh := make(chan int, 2)

	go func() {
		defer close(outCh)
		for val := range inCh {
			// 耗时2秒
			time.Sleep(time.Second * 2)
			val++
			fmt.Println("job2 finish:", val)
			outCh <- val
		}
	}()

	return outCh
}

func job3(inCh <-chan int) <-chan int {
	outCh := make(chan int, 2)

	go func() {
		defer close(outCh)
		for val := range inCh {
			val++
			fmt.Println("job3 finish:", val)
			outCh <- val
		}
	}()

	return outCh
}

func merge(inCh ...<-chan int) <-chan int {
	outCh := make(chan int, 2)

	var wg sync.WaitGroup
	for _, ch := range inCh {
		wg.Add(1)
		go func(wg *sync.WaitGroup, in <-chan int) {
			defer wg.Done()
			for val := range in {
				outCh <- val
			}
		}(&amp;wg, ch)
	}

	// 重要注意,wg.Wait() 一定要在goroutine里运行,否则会引起deadlock
	go func() {
		wg.Wait()
		close(outCh)
	}()

	return outCh
}

func main() {
	t := time.Now()

	firstResult := job1(10)

	// 拆分成三个job2,即3个goroutine (扇出)
	secondResult1 := job2(firstResult)
	secondResult2 := job2(firstResult)
	secondResult3 := job2(firstResult)

	// 合并结果(扇入)
	secondResult := merge(secondResult1, secondResult2, secondResult3)

	thirdResult := job3(secondResult)

	for v := range thirdResult {
		fmt.Println(v)
	}

	fmt.Println("all finish")
	fmt.Println("duration:", time.Since(t).String())
}

输出结果

job1 finish: 1
job1 finish: 1
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job2 finish: 2
job3 finish: 3
3
job2 finish: 2
job3 finish: 3
3
all finish
duration: 12s

可以看到,性能提升了90%,由原来的22s减少到12s。上面代码中为了演示效果,使用的缓冲chan很小,如果调大的话,性能更明显。

FAN-OUT模式:多个goroutine从同一个通道读取数据,直到该通道关闭。OUT是一种张开的模式(1对多),所以又被称为扇出,可以用来分发任务。

FAN-IN模式:1个goroutine从多个通道读取数据,直到这些通道关闭。IN是一种收敛的模式(多对1),所以又被称为扇入,用来收集处理的结果。

是不是很像扇子的状态, 先展开(扇出)再全并(扇入)。

总结:在类似流水线这类的逻辑中,我们可以使用FAN-IN和FAN-OUT模式来提升程序性能。

参考

重新认识Golang中的空结构体

  1. 认识空结构体
  2. 低层实现原理
  3. 空结构体之内存对齐
  4. 应用场景

在golang中,如果我们想实现一个set集合的话,一般会使用map来实现,其中将set的值作为map的键,对于map的值一般使用一个空结构体来实现,当然对map值也可以使用一个bool类型或者数字类型等,只要符合一个键值对应关系即可。但我们一般推荐使用struct{}来实现,为什么呢?

package main

import "fmt"

func main() {
	m := make(map[int]struct{})
	m[1] = struct{}{}
	m[2] = struct{}{}
	
	if _, ok := m[1]; ok {
		fmt.Println("exists")
	}
	
}

上面这段代码是一个很简单的使用map实现的set功能,这里是采用空结构体struct{}来实现。

在分析为什么使用struct{}以前,我看先认识一个struct。

认识空结构体 struct{}

我们先看一个这段代码

package main

import (
	"fmt"
	"unsafe"
)

type emptyStruct struct{}

func main() {
	a := struct{}{}
	b := struct{}{}

	c := emptyStruct{}

	fmt.Println(a)
	fmt.Printf("%pn", &amp;a)
	fmt.Printf("%pn", &amp;b)
	fmt.Printf("%pn", &amp;c)

	fmt.Println(a == b)

	fmt.Println(unsafe.Sizeof(a))
}


{} // 值
0x586a00 // a 内存地址
0x586a00 // b 内存地址, 同a一样
0x586a00 // c 内存地址,别名类型变量,同a一样
true // 丙个结构体是否相等,很显示,上面打印的是同一个内存地址
0 // 占用内存大小 

从打印结果里我们可以得出以下结论
1. 空结构体是一个无内容值的值,即空值
2. 空结构体占用0大小的内存,即不分配内存
3. 凡是空结构体他们都是一样的,即底层指向的是同一个内存地址,如何实现的呢?

对于高性能并发应用来说,内存占用大小一般都是我们关注重点对象,使用空struct{}根本不占用内存大小,相比使用其它类型的值,如bool(占用两个字节)int64(8个字节)性能要好的多,毕竟不用分配和回收内存了。

当然有人可能会说开发的应用map值不多的话,这点内存可以忽略不计。是的,确实是这样的,但这会带来一个另一个语义理解问题。如:

package main

import "fmt"

func main() {
	m := make(map[int]bool)
	m[1] = true
	m[2] = true

	if _, ok := m[1]; ok {
		fmt.Println("exists")
	}

}

我们用bool代替了空结构体,至于值是 true 还是 false 是没有任何影响的,都是bool数据类型占用的内存大小也一样。那么如果另一位开发的同事查看review源码的时候,如果这个map出现在一个大型应用的时候,会大多处出现,就很有可能带来疑惑,对于值所表达的意图就有所担心怀疑,提高了理解代码的门槛。心里如果值为true 的话,会执行一个逻辑,为false的话会执行另一个逻辑。而相比使用一个空结构体strcut{}来理解起来容易提高心智,别人一看空结构体struct{}就知道要表达的意思是不需要关心值是什么,只需要关心键值即可。

要记住一点,我们写的代码虽然是让机器运行的,但却是让人看的,能让人一眼看明白就不要看两眼,这点不正是符合了golang 这门开发语言的特性吗,只需要记住25个关键字,简单易理解,性能还高效。

底层原理

那么在底层一个空结构体又是怎么一回事呢? 为什么多个空结构体会指向同一个内存地址呢? 这和一个很重要的 zerobase 变量有关(在runtime里多次使用到了这个变量),而zerobase 变量是一个 uintptr 的全局变量,占用8个字节 (https://github.com/golang/go/blob/master/src/runtime/malloc.go#L840-L841),只要你将struct{} 赋值给一个或者多个变量,它都返回这个 zerobase 的地址,这点我们上面已经证实过这一点了。

在golang中大量的地方使用到了这个 zerobase 变量,只要分配的内存为0,就返回这个变量地址。

内存分配函数 https://github.com/golang/go/blob/master/src/runtime/malloc.go#L902-L1171

// Allocate an object of size bytes.
// Small objects are allocated from the per-P cache's free lists.
// Large objects (> 32 kB) are allocated straight from the heap.
func mallocgc(size uintptr, typ *_type, needzero bool) unsafe.Pointer {
	if gcphase == _GCmarktermination {
		throw("mallocgc called with gcphase == _GCmarktermination")
	}

	if size == 0 {
		return unsafe.Pointer(&amp;zerobase)
	}
       
        .......
}

结论:只要分配的内存大小为 0 字节,就返回 &zerobase 的指针,不仅仅是空结构体。

内存对齐

如果您对内存对齐不太了解的话,可以参考这篇Memory Layouts,注意下32位和64位系统之间的差异。有一点需要保证在 32 位系统上想要原子操作 64 位字(如 uint64)的话,需要由调用方保证其数据地址是 64 位对齐的,否则原子访问会有异常。

还拿uint64来说,大小为 8bytes,32 位系统上按 4bytes 对齐,64 位系统上按 8bytes 对齐。

在 64 位系统上,8bytes 刚好和其字长相同,所以可以一次完成原子的访问,不被其他操作影响或打断。

而 32 位系统,4byte 对齐,字长也为 4bytes,可能出现uint64的数据分布在两个数据块中,需要两次操作才能完成访问。

如果两次操作中间有可能别其他操作修改,不能保证原子性。

这样的访问方式也是不安全的。

这一点issue-6404[5]中也有提到:

This is because the int64 is not aligned following the bool. It is 32-bit aligned but not 64-bit aligned, because we’re on a 32-bit system so it’s really just two 32-bit values side by side.

For the numeric types, the following sizes are guaranteed:

type                                 size in bytes

byte, uint8, int8                     1
uint16, int16                         2
uint32, int32, float32                4
uint64, int64, float64, complex64     8
complex128                           16

空结构体作为一个占用0字节的数据类型与其它基本类型对比的话,确实有些特殊。

代码 https://play.golang.org/p/HDOXko5iLNO

package main

import (
	"fmt"
	"unsafe"
)

// 放在第1个字段,共需要24字节
type T0 struct {
	s  struct{} // 0
	f2 int32    // 4
	f3 int32    // 4
	f1 bool     // 1
	f4 int64    // 8
}

// 放在第1个字段,共需要16字节
// f2为 int16 类型
type T1 struct {
	s  struct{} // 0
	f1 bool     // 1
	f2 int16    // 2
	f3 int32    // 4
	f4 int64    // 8
}

// 中间字段,共需要16字节
type T2 struct {
	f1 bool     // 1
	s  struct{} // 0
	f2 int16    // 2
	f3 int32    // 4
	f4 int64    // 8
}

// 最后一个字段, 共需要24字节
type T3 struct {
	f1 bool     // 1
	f2 int16    // 2
	f3 int32    // 4
	f4 int64    // 8
	s  struct{} // 0
}

// 最后一个字段, 共需要24字节
type T4 struct {
	f1 bool     // 1
	f2 int16    // 2
	f4 int64    // 8
	f3 int32    // 4
	s  struct{} // 0
}

// 最后一个字段, 共需要24字节
// 这里f1数据类型由bool变成了int16
type T5 struct {
	f4 int64    // 8
	f3 int32    // 4
	f1 int16    // 2
	f2 int16    // 2
	s  struct{} // 0
}

func main() {
	bit := 32 << (^uint(0) >> 63)
	fmt.Printf("当前系统为 %d 位n", bit)

	var t0 T0
	var t1 T1
	var t2 T2
	var t3 T3
	var t4 T4
	var t5 T5

	fmt.Println("t0:", unsafe.Sizeof(t0)) // (0+4)+(4)+ (8) 共占用24字节对齐
	fmt.Println("t1:", unsafe.Sizeof(t1)) // (0+1+2+4)+ (8) 共占用8字节对齐

	fmt.Println("t2:", unsafe.Sizeof(t2)) // (1+0+2+4)+ (8) 共占用8字节对齐

	fmt.Println("t3:", unsafe.Sizeof(t3)) // (1+2+4)+(8)+(0)共占用24字节
	fmt.Println("t4:", unsafe.Sizeof(t4)) // (1+2)+(8)+(4+0)共占用24字节
	fmt.Println("t5:", unsafe.Sizeof(t5)) // (8)+(4+2+2)+ (0) 共占用24字节
}


结果

当前系统为 64 位
t0: 24
t1: 16
t2: 16
t3: 24
t4: 24
t5: 24

这里运行环境为64位系统,所以按8字节对齐。

T0T1是将struct{}放在首字段的情况;
T2是放在中间位置的情况;
T3T4T5是作为最后一个字段的情况;

当作为最后一个字段的时候,如果它前面的字段放在一起的话等于8的话,则空结构体单独占用8字节(T5),否则可以与前面的共占8字节(T4)。

总结:上面结果我们可以看出,虽然空结构体占用0字节,但在进行内存对齐的时候是需要考虑这个字段,可以理解为将其视为需占用1个字节,不然这个结构体就没有任何意义了 。

对于Golang为什么要内存对齐, 有没有必要使用,可以看看这篇文章 Golang 是否有必要内存对齐?

空结构体struct{}使用场景

一般我们用在用户不关注值内容的情况下,只是作为一个信号或一个占位符来使用。

  1. 基于map实现集合功能
    就是我们上面提到的情况
  2. 与channel组合使用,实现一个信号
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	conLimit := make(chan struct{}, 2)

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		conLimit <- struct{}{}
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			// doing...

			fmt.Println(i)
			time.Sleep(time.Second)
			<-conLimit
		}(i)
	}

	wg.Wait()
	close(conLimit)

	fmt.Println("ok")

}

基于有缓冲的channel是实现并发限速。另外一种限速用法

下面的例子来自《Go 语言高级编程》, 为里使用空结构体进行了调整

var limit = make(chan struct{}, 3)

func main() {
    // …………
    for _, w := range work {
        go func() {
            limit <- struct{}{}
            w()
            <-limit
        }()
    }
    // …………
}

那么这两种写法是否有差异呢?

这里,limit <- 1 放在 func 内部而不是外部,原因是:

如果在外层,就是控制系统 goroutine 的数量,可能会阻塞 for 循环,影响业务逻辑。

limit 其实和逻辑无关,只是性能调优,放在内层和外层的语义不太一样。

还有一点要注意的是,如果 w() 发生 panic,那“许可证”可能就还不回去了,因此需要使用 defer 来保证。

参考

https://golang.org/ref/spec#Size_and_alignment_guarantees

Memory Layouts

Dig101:Go之聊聊struct的内存对齐

Golang 是否有必要内存对齐?

认识Golang内存对齐

https://github.com/qcrao/Go-Questions/blob/master/channel/channel%20%E6%9C%89%E5%93%AA%E4%BA%9B%E5%BA%94%E7%94%A8.md

Golang中的内存重排(Memory Reordering)

什么是内存重排

内存重排指的是内存的读/写指令重排。— Xargin

为什么要内存重排

为了提升程序执行效率,减少一些IO操作,一些硬件或者编译器会对程序进行一些指令优化,优化后的结果可能会导致程序编码时的顺序与代码编译后的先后顺序不一致。

就拿做饭场景来说吧,是先蒸米还是先炒菜,这两者是没有冲突的,编译器在编译时有可能与你要求的顺序不一样。

编译器重排

如下面这段代码

X = 0
for i in range(100):
    X = 1
    print X

要实现打印100次1,很显示在for里面每次都执行X=1语句有些浪费资源,如果将初始变量值修改为1,是不是要快的多。编译器也分析到了这一点,于是在编译时对代码做了以下优化

X = 1
for i in range(100):
    print X

最终输出结果是一样的,两段代码功能也一样。

但是如果此时有另一个线程里执行了一个 X=0 的赋值语句的话(两个线程同时运行),那么输出结果就可能与我们想要的不一样了。

优化前情况:第一个线程执行到了第3次print X 后,第二个线程执行了X=0,把X 的值进行了修改,结果就有可能是1110 1111(在线程执行第5次时,重新执行了X=1,而且之后一直都是 1)。这就有问题了。

优化后情况:按上面的逻辑来执行的话,结果就是11100000…, 后面全是0,再也没有机会将X贬值为1了

由此我们可以得出一个结果,在多线程下,是无法保证编译前后的代码功能是“待价”的。

所以要开发时,这一点我们一定要小心。搞不好在单线程运行没有问题的程序在多线程下会出现各种各样的问题。

每当我们提到内存重排的时候,其实真实在讨论的主题是 内存屏障Memory barrier。指令重排无法逾越内存屏障。

CPU 重排

参考:https://www.w3xue.com/exp/article/20196/40758.html

CPU 架构

Figure 1. CPU Architecture
Figure 2. Store Buffer
Figure 3. MESI Protocol

MESI 可参考 https://mp.weixin.qq.com/s/vnm9yztpfYA4w-IM6XqyIA

参考

从 Memory Reordering 说起
CPU指令重排/内存乱序
谈谈Go语言中的内存重排

相关话题

锁、内存屏障与缓存一致性 https://gocode.cc/project/9/article/128

memory barrier

[译] 什么是缓存 false sharing 以及如何解决(Golang 示例) https://juejin.cn/post/6844903866270482445

CPU缓存体系对Go程序的影响 https://mp.weixin.qq.com/s/vnm9yztpfYA4w-IM6XqyIA

Golang中的并发原语 Singleflight

在Golang中有一个并发原语是Singleflight,好像知道的开发者并不多。其中著名的 https://github.com/golang/groupcache 就用到了这个并发原语。

Golang版本

go1.15.5

相关知识点

map、Mutex、channel、

使用场景

一般用在对指定资源频繁操作的情况下,如高并发下的“缓存击穿”问题。

缓存击穿:一个存在的key,在缓存过期的瞬间,同时有大量的请求过来,造成所有请求都去DB读取数据,这些请求都会击穿缓存到DB,造成瞬时DB请求量大、压力瞬间骤增,导致数据库负载过高,影响整个系统正常运行。(缓存击穿不同于 缓存雪崩 和 缓存穿透)

缓存击穿

怎么理解这个原语呢,简单的讲就是将对同一个资源的多个请求合并为一个请求。

举例说明,假如当有10万个请求来获取同一个key的值的时候,正常情况下会执行10万次get操作。而使用singleflight并发语后,只需要首次的地个请求执行一次get操作就可以了,其它请求再过来时,只需要只需要等待即可。待执行结果返回后,再把结果分别返回给等待中的请求,每个请求再返回给客户端,由此看看,在一定的高并发场景下可以大大减少系统的负载,节省大量的资源。

注意这个与 sync.Once是不一样的,sync.Once 是全局只能有一个,但本并发原语则是根据key来划分的,并且可以根据需求来决定什么情况下共用一个。

实现原理

主要使用 Mutext 和 Map 来实现,以 key 为键,值为 *call。每个*call中存储有一个请求 chans 字段,用来存储所有请求此key的客户端,等有返回结果的时候,再从chans字段中读取出来,分别写入即可。

源文件为 /src/internal/singleflight/singleflight.go

Singleflight 数据结构如下

  1. Do()
    这个方法是一个执行函数并返回执行结果
    参数
    key 要请求的key,多个请求可能请求的是同一个key,同时也只有一个函数在执行,
    fn key对应的执行函数,此函数有三个返回值v, err, shared。其中shared表示当前返回结果值是只有一个请求还是由于多个请求发生返回的
  2. DoChan()
    类型与Do()方法,但返回的是个 ch 类型,等函数 fn 执行后,可以通过读取返回的ch来获取函数结果
  3. Forget()
    在官方库internal/singleflight/singleflight.go中这个名字是ForgetUnshared, 告诉 Group 忘记请求的这个key,下次再请求时,直接当作新的key来处理就可以了。其实就是将这个key从map中删除,后续再有这个key的操作的话,即视为新一轮的处理逻辑。

其中Do() 和 DoChan() 的功能一样,只是获取数据的方式有所区别,开发者可以根据自己的情况来选择使用哪一个。另外还包含一个由Do()方法调用的私有方法 doCall(),直接执行key处理方法的函数

func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {...}

实现数据结构

实现内部主要的数据结构一共三个,分别是 call、Group 和 Result。

// 定义 map[key] 的值 call 类型
// 在call 内部存储对应key相关的所有请求客户端信息
type call struct {
	wg sync.WaitGroup

	// These fields are written once before the WaitGroup is done
	// and are only read after the WaitGroup is done.
        // 请求返回结果,会在sync.WaitGroup为Done的时候执行
	val interface{}
	err error

	// These fields are read and written with the singleflight
	// mutex held before the WaitGroup is done, and are read but
	// not written after the WaitGroup is done.
        // 可以理解为请求key的个数,每增加一个请求,则值加1 
	dups  int

        // 存储所有key对应的 Result{}, 一个请求对应一个 Result
	chans []chan<- Result
}

// Group represents a class of work and forms a namespace in
// which units of work can be executed with duplicate suppression.
// SingleFlight 的主结构体
type Group struct {
	mu sync.Mutex       // protects m
	m  map[string]*call // lazily initialized
}

// Result holds the results of Do, so they can be passed
// on a channel.
// 定义请求结果数据结构
type Result struct {
	Val    interface{}
	Err    error
	Shared bool
}

call 数据结构是用来记录key相关数据,则请求当前key的个数和请求结果
Group 是并发原语 SingleFlight 的主数据结构, m 字段用来存储key与请求的关系,而mu则是Mutex锁
Result 是请求结果的数据结构,其中Val和Err 是实现请求后返回的结果,而 Shared 字段则是用来表示当前结果是否进行了合并处理,如果当前key只有一个请求在使用的话,则返回false, 否则返回 true

用法

package main

import (
	"fmt"
	"golang.org/x/sync/singleflight"
	"log"
	"sync"
	"time"
)

func getDataFromDB(key string) (string, error) {
	defer func() {
		log.Println("db query end")
	}()

	log.Println("db query begin")
	time.Sleep(2 * time.Second)
	result := key + "abcxyz"

	return result, nil
}

func main() {
	var singleRequest singleflight.Group

	getData := func(requestID int, key string) (string, error) {
		log.Printf("request %v start request ...", requestID)

		// 合并请求
		value, _, _ := singleRequest.Do(key, func() (ret interface{}, err error) {
			log.Printf("request %v is begin...", requestID)
			ret, err = getDataFromDB(key)
			log.Printf("request %v end!", requestID)
			return
		})

		return value.(string), nil
	}

	var wg sync.WaitGroup
	key := "orderID"
	for i := 1; i < 10; i++ {
		wg.Add(1)
		go func(wg *sync.WaitGroup, requestID int) {
			defer wg.Done()
			value, _ := getData(requestID, key)
			log.Printf("request %v get value: %v", requestID, value)
		}(&amp;wg, i)
	}
	wg.Wait()

	fmt.Println("main end")
}

输出结果

2020/11/20 12:11:44 request 6 start request …
2020/11/20 12:11:44 request 6 is begin…
2020/11/20 12:11:44 request 2 start request …
2020/11/20 12:11:44 request 9 start request …
2020/11/20 12:11:44 request 7 start request …
2020/11/20 12:11:44 request 3 start request …
2020/11/20 12:11:44 request 4 start request …
2020/11/20 12:11:44 request 1 start request …
2020/11/20 12:11:44 request 5 start request …
2020/11/20 12:11:44 request 8 start request …
2020/11/20 12:11:44 db query begin
2020/11/20 12:11:46 db query end
2020/11/20 12:11:46 request 6 end!
2020/11/20 12:11:46 request 6 get value: orderIDabcxyz
2020/11/20 12:11:46 request 8 get value: orderIDabcxyz
2020/11/20 12:11:46 request 2 get value: orderIDabcxyz
2020/11/20 12:11:46 request 9 get value: orderIDabcxyz
2020/11/20 12:11:46 request 7 get value: orderIDabcxyz
2020/11/20 12:11:46 request 3 get value: orderIDabcxyz
2020/11/20 12:11:46 request 4 get value: orderIDabcxyz
2020/11/20 12:11:46 request 1 get value: orderIDabcxyz
2020/11/20 12:11:46 request 5 get value: orderIDabcxyz
main end

从上面的输出内容可以看到,我们同时发起了10个gorourtine来查询同一个订单信息,真正在DB 层查询操作只有一次,大大减少了DB的压力。

总结

使用SingleFlight时,在高并发下且对少量key频繁读取的话,可以大大减少服务器负载。在一定场景下特别的有用,如 CDN。

Protobuf协议实现原理

protobuf是Google开源的一款支持跨平台、语言中立的结构化数据描述和高性能序列化协议,此协议完全基于二进制,所以性能要远远高于JSON/XML。由于出色的传输性能所以常见于微服务之间的通讯,其中最为著名的是Google开源的 gRPC 框架。

那么protobuf是如何实现高性能的,又是如何实现数据的编码和解码的呢?

protobuf协议原理

基于128bits的数据存储方式(Base 128 Varints)

Varint 是一种紧凑的表示数字的方法。它用一个或多个字节来表示一个数字,值越小的数字使用越少的字节数。这能减少用来表示数字的字节数。


比如对于 int32 类型的数字,一般需要 4 个 byte 来表示。但是采用 Varint,对于很小的 int32 类型的数字,则可以用 1 个 byte 来表示。当然凡事都有好的也有不好的一面,采用 Varint 表示法,大的数字则需要 5 个 byte 来表示。从统计的角度来说,一般不会所有的消息中的数字都是大数,因此大多数情况下,采用 Varint 后,可以用更少的字节数来表示数字信息。

Varint 中的每个 byte 的最高位 bit 有特殊的含义,如果该位为 1,表示后续的 byte 也是该数字的一部分,如果该位为 0,则结束。其他的 7 个 bit 都用来表示数字。因此小于 128 的数字都可以用一个 byte 表示。大于 128 的数字,比如 300,会用两个字节来表示:1010 1100 0000 0010。

另外如果从数据大小角度来看,这种表示方式比实现的数据多了一个bit, 所以其实际传输大小就多14%(1/7 = 0.142857143)。

数字1表示方式:0000 0001

对于小的数据比较好理解,正常情况下1的二进制是 0000 0001,使用128bits表示的话,首位结束标识位也是0,所以两者结果是一样的 0000 0001。

数字 300 表示方式:1010 1100 0000 0010

300

这个有点不太好理解了,这是因为原本用一个字节(8bit)就可以表示,但由于使用128bits表示方法,需要对每个字节的最高位添加一个结束标识位来表示,所以一个字节已经不够用了,需要占用两个字节来表示,其中两个字节最高位都是结束标识位。

如果正向推算的话,我们知道数字300的二进制值 1 0010 1100,用两个字节表示完整值则为
0000 0001 0010 1100 # 二进制
_000 0010 _010 1100 # 二进制每个字节的最高位向左移动一个位置,放入结束标识位
0000 0010 1010 1100 # 转换为128bits方式,1:结束,0:未结束
1010 1100 0000 0010 # 转换为小端字节序, 低字节在前,高字节在后

注意这里是先添加结束标识符,然后再转为小端字节序。

协议数据结构

消息经过序列化后会成为一个二进制数据流,该流中的数据为一系列的 Key-Value 对。如下图所示:

图 7. Message Buffer
图 7. Message Buffer

采用这种 Key-Pair 结构无需使用分隔符来分割不同的 Field。对于可选的 Field,如果消息中不存在该 field,那么在最终的 Message Buffer 中就没有该 field,这些特性都有助于节约消息本身的大小。

Key 用来标识具体的 field,在解包的时候,客户端创建一个结构对象,Protocol Buffer 从数据流中读取并反序列化数据,并根据 Key 就可以知道相应的 Value 应该对应于结构体中的哪一个 field。

而Key也是由以下两部分组成

Key 的定义如下:

1(field_number << 3) | wire_type
Key的定义

可以看到 Key 由两部分组成。第一部分是 field_number。第二部分为 wire_type。表示 Value 的传输类型。

一个字节的低3位表示数据类型,其它位则表示字段序号。

Wire Type 可能的类型如下表所示:

表 1. Wire Type

Type
MeaningUsed For
0Varintint32, int64, uint32, uint64, sint32, sint64, bool, enum
164-bitfixed64, sfixed64, double
2Length-delimistring, bytes, embedded messages, packed repeated fields
3Start groupGroups (deprecated)
4End groupGroups (deprecated)
532-bitfixed32, sfixed32, float
Wire Type 数据类型
message Test1 {
    required int32 a = 1;
}

在我们的例子当中,field id 所采用的数据类型为 int32,因此对应的 wire type 为 0。细心的读者或许会看到在 Type 0 所能表示的数据类型中有 int32 和 sint32 这两个非常类似的数据类型。Google Protocol Buffer 区别它们的主要意图也是为了减少 encoding 后的字节数。

每个数据头同样采用128bits方式,一般1个字节就足够了,

本例中字段a 的序号是1

如上创建了 Test1 的结构并且把 a 的值设为 2,序列化后的二进制数据为
0000 1000 0000 0010

Key 部分是 0000 1000
value 部分是 0000 0010, 其中字节最高位是结束标识位,即10进制的2,我们在转换的时候统一将符号位转为0即可。

协议规定数据头的低3位表示wire_type, 其它字段表示字段序号field_number,因此
0000 1000
_000 1000 # 去掉结束标识符位
_000 1000 # 000 表示数据类型, 这里是Varint
_000 1000 # 0001 这四位表示字段序号

参考

https://www.ibm.com/developerworks/cn/linux/l-cn-gpb/

Golang开发中中使用GitHub私有仓库

私有仓库地址为

github.com/cfanbo/websocket

一、设置私有环境变量 GOPRIVATE

$ go env -w GOPRIVATE=github.com/cfanbo/websocket

对于为什么需要设置 GOPRIMARY 变量,可以参考这里

对于GOPRIVATE值级别分为仓库级别和账号级别。

如果只有一个仓库,直接设置为仓库地址即可。如果有多个私有仓库的话,使用”,”分开,都在这个账号下,也可以将值设置为账号级别,这样账号下的所有私有仓库都可以正常访问。如 http://github.com/cfanbo

如果不想每次都重新设置,我们也可以利用通配符,例如:

$ go env -w GOPRIVATE="*.example.com"

这样子设置的话,所有模块路径为 example.com 的子域名(例如:git.example.com)都将不经过 Go module proxy 和 Go checksum database,需要注意的是不包括 example.com 本身。

国内用户访问仓库建议设置 GORPOXY为 https://proxy.golang.org,direct

二、设置凭证

使用私有仓库一定要绕不开权限设置这一步。访问仓库来常见的有两种方式,分别为SSH和 Https 。对于私有仓库来说,ssh可以设置rsa私钥来访问,https这种则可以使用用户名和密码,一般通过命令行访问的时候,会自动提示用户输入这些信息。

对于权限控制这一块可参考官方文档 。其实在官方文档里还提供了第三种访问仓库的方式,那就是 Personal access token,简称 PAT, 这种 Token 是专门为api调用提供的,常见于自动化工作流中,如 CICD场景。

这里我们就利用PAT 来实现

  1. 在Github.com 网站生成 Personal access tokens,新手可参考官方教程文档
  2. 本地配置token凭证
$ git config --global url."https://${username}:${access_token}@github.com".insteadOf / "https://github.com"

如果你在使用Github Actions部署时,遇到无法读取版本号问题,需要改写成 git config –global url.”https://${username}:${access_token}@github.com”.insteadOf “https://github.com

命令验证

go get github.com/cfanbo/websocket

到这里基本配置基本完成了。

其它场景

如果要用在docker环境中的话,也要记得设置上面的几个环境变量值。

以下为一个docker示例

# Start from the latest golang base image
FROM golang:alpine

RUN GOCACHE=OFF

# 设置环境变量
RUN go env -w GOPRIVATE=github.com/ereshzealous

# Set the Current Working Directory inside the container
WORKDIR /app

# Copy everything from the current directory to the Working Directory inside the container
COPY . .

RUN apk add git

# 设置访问仓库凭证
RUN git config --global url."https://user-name:<access-token>@github.com".insteadOf "https://github.com"

# Build the Go app
RUN go build -o main .

# Expose port 8080 to the outside world
EXPOSE 8080

#ENTRYPOINT ["/app"]

# Command to run the executable
CMD ["./main"]

MySQL DBA利器innodb_ruby

innodb_ruby简介

innodb_ruby是一款用ruby写的用来分析 innodb 物理文件的专业DBA工具,可以通过这款工具来窥探innodb内部的一些结构。
注意不要在生产环境中使用此工具,以避对线上服务造成影响。官方网址 https://rubygems.org/gems/innodb_ruby

注意如果(Linux)平台安装中遇到错误一般情况是由于缺少依赖库造成的,可以先安装 sudo apt-get install libxslt1-dev libxml2-dev 相关库。

命令语法

在执行以下命令时,建议切换到MySQL 的 datadir 目录里。

sxf@ubuntu:~$ innodb_space --help

Usage: innodb_space <options> <mode>
innodb_space <选项> <模式>
命令主要分 options 和 mode 两大部分。

Invocation examples:

  innodb_space -s ibdata1 [-T tname [-I iname]] [options] <mode>
    Use ibdata1 as the system tablespace and load the tname table (and the
    iname index for modes that require it) from data located in the system
    tablespace data dictionary. This will automatically generate a record
    describer for any indexes.

    参数:
    -s 参数指的是系统表空间文件 ibdata1, 这个一般在datadir目录里可以找到。
    -T 数据表名称,一般为数据库其中一个表的物理文件路径
    -I 表示索引的名称, 如果是主键的话,直接填写 -I PRIMARY 即可,此时可省略此参数

    如 innodb_space -s ibdata1 -T lab/tb space-indexes,则表示查看lab数据库的tb表的索引统计信息


  innodb_space -f tname.ibd [-r ./desc.rb -d DescClass] [options] <mode>
    Use the tname.ibd table (and the DescClass describer where required).

The following options are supported:

  --help, -?
    Print this usage text.

  --trace, -t
    Enable tracing of all data read. Specify twice to enable even more
    tracing (including reads during opening of the tablespace) which can
    be quite noisy.

  --system-space-file, -s <arg>
    Load the system tablespace file or files <arg>: Either a single file e.g.
    "ibdata1", a comma-delimited list of files e.g. "ibdata1,ibdata1", or a
    directory name. If a directory name is provided, it will be scanned for all
    files named "ibdata?" which will then be sorted alphabetically and used to
    load the system tablespace.

  --table-name, -T <name>
    Use the table name <name>.
    表名

  --index-name, -I <name>
    Use the index name <name>.
    索引名

  --space-file, -f <file>
    Load the tablespace file <file>.

  --page, -p <page>
    Operate on the page <page>.
    页数

  --level, -l <level>
    Operate on the level <level>.
    索引树层级数,一般不会超过3

  --list, -L <list>
    Operate on the list <list>.

  --fseg-id, -F <fseg_id>
      Operate on the file segment (fseg) <fseg_id>.

  --require, -r <file>
    Use Ruby's "require" to load the file <file>. This is useful for loading
    classes with record describers.

  --describer, -d <describer>
    Use the named record describer to parse records in index pages.

The following modes are supported:
模式项列表

  系统表空间
  system-spaces
    Print a summary of all spaces in the system.
    

  数据字典表(information_schema中数据库SYS_TABLES表内容,下同)
  data-dictionary-tables
    Print all records in the SYS_TABLES data dictionary table.

  data-dictionary-columns
    Print all records in the SYS_COLUMNS data dictionary table.

  data-dictionary-indexes
    Print all records in the SYS_INDEXES data dictionary table.

  data-dictionary-fields
    Print all records in the SYS_FIELDS data dictionary table.

  
  汇总表空间中的所有页信息,需要使用 --page/-p 参数指定页数
  space-summary
    Summarize all pages within a tablespace. A starting page number can be
    provided with the --page/-p argument.

  汇总表空间中的所有索引页信息,对于分析每个页记录填充率情况的时候很有用,同样需要使用--page/-p指定页数
  space-index-pages-summary
    Summarize all "INDEX" pages within a tablespace. This is useful to analyze
    page fill rates and record counts per page. In addition to "INDEX" pages,
    "ALLOCATED" pages are also printed and assumed to be completely empty.
    A starting page number can be provided with the --page/-p argument.

  与space-index-pages-summary差不多,但只显示一些摘要信息,需要配合参数一块使用
  space-index-fseg-pages-summary
    The same as space-index-pages-summary but only iterate one fseg, provided
    with the --fseg-id/-F argument.

  space-index-pages-free-plot
    Use Ruby's gnuplot module to produce a scatterplot of page free space for
    all "INDEX" and "ALLOCATED" pages in a tablespace. More aesthetically
    pleasing plots can be produced with space-index-pages-summary output,
    but this is a quick and easy way to produce a passable plot. A starting
    page number can be provided with the --page/-p argument.

  遍历空间中的所有页面,统计每个类型的页共占用了多少页
  space-page-type-regions
    Summarize all contiguous regions of the same page type. This is useful to
    provide an overall view of the space and allocations within it. A starting
    page number can be provided with the --page/-p argument.

  按类型汇总所有页面信息
  space-page-type-summary
    Summarize all pages by type. A starting page number can be provided with
    the --page/-p argument.

  表空间中所有索引统计信息(系统空间或每个文件表空间)
  space-indexes
    Summarize all indexes (actually each segment of the indexes) to show
    the number of pages used and allocated, and the segment fill factor.

  space-lists
    Print a summary of all lists in a space.

  space-list-iterate
    Iterate through the contents of a space list.

  space-extents
    Iterate through all extents, printing the extent descriptor bitmap.

  space-extents-illustrate
    Iterate through all extents, illustrating the extent usage using ANSI
    color and Unicode box drawing characters to show page usage throughout
    the space.

  space-extents-illustrate-svg
    Iterate through all extents, illustrating the extent usage in SVG format
    printed to stdout to show page usage throughout the space.

  space-lsn-age-illustrate
    Iterate through all pages, producing a heat map colored by the page LSN
    using ANSI color and Unicode box drawing characters, allowing the user to
    get an overview of page modification recency.

  space-lsn-age-illustrate-svg
    Iterate through all pages, producing a heat map colored by the page LSN
    producing SVG format output, allowing the user to get an overview of page
    modification recency.

  space-inodes-fseg-id
    Iterate through all inodes, printing only the FSEG ID.

  space-inodes-summary
    Iterate through all inodes, printing a short summary of each FSEG.

  space-inodes-detail
    Iterate through all inodes, printing a detailed report of each FSEG.

  通过递归整个B+树(通过递归扫描所有页面,而不仅仅是按列表的叶子页面)来执行索引扫描(执行完整索引扫描)
  index-recurse
    Recurse an index, starting at the root (which must be provided in the first
    --page/-p argument), printing the node pages, node pointers (links), leaf
    pages. A record describer must be provided with the --describer/-d argument
    to recurse indexes (in order to parse node pages).

  将索引作为索引递归进行递归处理,但在索引页中打印每条记录的偏移量
  index-record-offsets
    Recurse an index as index-recurse does, but print the offsets of each
    record within the page.

  index-digraph
    Recurse an index as index-recurse does, but print a dot-compatible digraph
    instead of a human-readable summary.

  打印指定 level 级别的所有page信息
  index-level-summary
    Print a summary of all pages at a given level (provided with the --level/-l
    argument) in an index.

  index-fseg-internal-lists
  index-fseg-leaf-lists
    Print a summary of all lists in an index file segment. Index root page must
    be provided with --page/-p.

  index-fseg-internal-list-iterate
  index-fseg-leaf-list-iterate
    Iterate the file segment list (whose name is provided in the first --list/-L
    argument) for internal or leaf pages for a given index (whose root page
    is provided in the first --page/-p argument). The lists used for each
    index are "full", "not_full", and "free".

  index-fseg-internal-frag-pages
  index-fseg-leaf-frag-pages
    Print a summary of all fragment pages in an index file segment. Index root
    page must be provided with --page/-p.

  page-dump
    Dump the contents of a page, using the Ruby pp ("pretty-print") module.

  page-account
    Account for a page's usage in FSEGs.

  page-validate
    Validate the contents of a page.

  页目录字典记录
  page-directory-summary
    Summarize the record contents of the page directory in a page. If a record
    describer is available, the key of each record will be printed.

  对一个页的所有记录进行汇总
  page-records
    Summarize all records within a page.

  详细说明一个页面的内容,并且根据类型进行着色显示
  page-illustrate
    Produce an illustration of the contents of a page.

  record-dump
    Dump a detailed description of a record and the data it contains. A record
    offset must be provided with -R/--record.

  record-history
    Summarize the history (undo logs) for a record. A record offset must be
    provided with -R/--record.

  undo-history-summary
    Summarize all records in the history list (undo logs).

  undo-record-dump
    Dump a detailed description of an undo record and the data it contains.
    A record offset must be provided with -R/--record.

参数详解

测试数据库 lab ,表名 tb ,表结构如下,

CREATE TABLE `tb` (
  `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
  `num` int(11) NOT NULL,
  `age` tinyint(1) unsigned DEFAULT '13',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=40001 DEFAULT CHARSET=latin1;

这里先添加了3万多的测试数据。

系统表空间 system-spaces

root@ubuntu:/var/lib/mysql# innodb_space -s ibdata1 -T lab/tb system-spaces
name                            pages       indexes
(system)                        768         7
lab/tb                          448         1
mysql/engine_cost               6           1
mysql/gtid_executed             6           1
mysql/help_category             7           2
mysql/help_keyword              15          2
mysql/help_relation             9           1
mysql/help_topic                576         2
mysql/innodb_index_stats        6           1
mysql/innodb_table_stats        6           1
mysql/plugin                    6           1
mysql/server_cost               6           1
mysql/servers                   6           1
mysql/slave_master_info         6           1
mysql/slave_relay_log_info      6           1
mysql/slave_worker_info         6           1
mysql/time_zone                 6           1
mysql/time_zone_leap_second     6           1
mysql/time_zone_name            6           1
mysql/time_zone_transition      6           1
mysql/time_zone_transition_type 6           1
sys/sys_config                  6           1

innodb_space列出所有物理对象的数量。这些文件一般在相应数据库中可以找到扩展名为.ibd 的文件,如 sys库的sys_config.ibd文件

索引结构、数据分配情 space-indexes

root@ubuntu:/var/lib/mysql# innodb_space -s ibdata1 -T lab/tb space-indexes
id          name                            root        fseg        fseg_id     used        allocated   fill_factor
43          PRIMARY                         3           internal    1           1           1           100.00%
43          PRIMARY                         3           leaf        2           75          96          78.12%

列说明:
name:索引的名称,PRIMARY代表的就是聚集索引,因为InnoDB表是聚集所以组织表,行记录就是聚集索引;idx_c就是辅助索引的名称。
root:索引中根节点的page号;可以看出聚集索引的根节点是第3个page(为什么是从第三个page开始,看下文space-page-type-regions),辅助索引的根节点是第4个page。
fseg:page的说明,internal表示非叶子节点或属于根节点,leaf表示叶子节点(也就是数据页)。
used:索引使用了多少个page,可以看出聚集索引的根节点点使用了1个page,叶子节点使用了3个page;辅助索引idx_c的叶子节点使用了1个page。
allocated:索引分配了多少个page,可以看出聚集索引的根节点分配了1个page,叶子节点分配了3个page;辅助索引idx_c的叶子节点分配了1个page
fill_factor:索引的填充度,所有的填充度都是100%。

遍历空间中的所有页面,统计每个类型的页共占用了多少 space-page-type-regions

start       end         count       type
0           0           1           FSP_HDR
1           1           1           IBUF_BITMAP
2           2           1           INODE
3           5           3           INDEX
6           6           1           FREE (INDEX)
7           36          30          INDEX
37          63          27          FREE (ALLOCATED)
64          106         43          INDEX
107         127         21          FREE (ALLOCATED)

列说明:
start:从第几个page开始。
end:从第几个page结束。
count:占用了多少个page。
type:page的类型。

从上面的结果可以看出:“FSP_HDR”、“IBUF_BITMAP”、“INODE”是分别占用了0,1,2号的page,从3号page开始才是存放数据和索引的页(Index)
接下来,根据得到的聚集索引和辅助索引的根节点来获取索引上的其他page的信息。

索引级数统计信息 index-level-summary

root@ubuntu:/var/lib/mysql# innodb_space -s ibdata1 -T lab/tb -I PRIMARY -l 0 index-level-summary
page    index   level   data    free    records min_key
4       43      0       14952   1036    534     id=1
5       43      0       14952   1036    534     id=535
7       43      0       14952   1036    534     id=1069
8       43      0       14952   1036    534     id=1603
9       43      0       14952   1036    534     id=2137
10      43      0       14952   1036    534     id=2671
11      43      0       14952   1036    534     id=3205
12      43      0       14952   1036    534     id=3739
13      43      0       14952   1036    534     id=4273
14      43      0       14952   1036    534     id=4807
15      43      0       14952   1036    534     id=5341
16      43      0       14952   1036    534     id=5875
17      43      0       14952   1036    534     id=6409
18      43      0       14952   1036    534     id=6943
19      43      0       14952   1036    534     id=7477
20      43      0       14952   1036    534     id=8011
21      43      0       14952   1036    534     id=8545
22      43      0       14952   1036    534     id=9079
23      43      0       14952   1036    534     id=9613
24      43      0       14952   1036    534     id=10147
25      43      0       14952   1036    534     id=10681
26      43      0       14952   1036    534     id=11215
27      43      0       14952   1036    534     id=11749
28      43      0       14952   1036    534     id=12283
29      43      0       14952   1036    534     id=12817
30      43      0       14952   1036    534     id=13351
31      43      0       14952   1036    534     id=13885
32      43      0       14952   1036    534     id=14419
33      43      0       14952   1036    534     id=14953
34      43      0       14952   1036    534     id=15487
35      43      0       14952   1036    534     id=16021
36      43      0       14952   1036    534     id=16555
64      43      0       14952   1036    534     id=17089
65      43      0       14952   1036    534     id=17623
66      43      0       14952   1036    534     id=18157
67      43      0       14952   1036    534     id=18691
68      43      0       14952   1036    534     id=19225
69      43      0       14952   1036    534     id=19759
70      43      0       14952   1036    534     id=20293
71      43      0       14952   1036    534     id=20827
72      43      0       14952   1036    534     id=21361
73      43      0       14952   1036    534     id=21895
74      43      0       14952   1036    534     id=22429
75      43      0       14952   1036    534     id=22963
76      43      0       14952   1036    534     id=23497
77      43      0       14952   1036    534     id=24031
78      43      0       14952   1036    534     id=24565
79      43      0       14952   1036    534     id=25099
80      43      0       14952   1036    534     id=25633
81      43      0       14952   1036    534     id=26167
82      43      0       14952   1036    534     id=26701
83      43      0       14952   1036    534     id=27235
84      43      0       14952   1036    534     id=27769
85      43      0       14952   1036    534     id=28303
86      43      0       14952   1036    534     id=28837
87      43      0       14952   1036    534     id=29371
88      43      0       14952   1036    534     id=29905
89      43      0       14952   1036    534     id=30439
90      43      0       14952   1036    534     id=30973
91      43      0       14952   1036    534     id=31507
92      43      0       14952   1036    534     id=32041
93      43      0       14952   1036    534     id=32575
94      43      0       14952   1036    534     id=33109
95      43      0       14952   1036    534     id=33643
96      43      0       14952   1036    534     id=34177
97      43      0       14952   1036    534     id=34711
98      43      0       14952   1036    534     id=35245
99      43      0       14952   1036    534     id=35779
100     43      0       14952   1036    534     id=36313
101     43      0       14952   1036    534     id=36847
102     43      0       14952   1036    534     id=37381
103     43      0       14952   1036    534     id=37915
104     43      0       14952   1036    534     id=38449
105     43      0       14952   1036    534     id=38983
106     43      0       13552   2460    484     id=39517
root@ubuntu:/var/lib/mysql# innodb_space -s ibdata1 -T lab/tb -I PRIMARY -l 1 index-level-summary
page    index   level   data    free    records min_key
3       43      1       1050    15168   75      id=1
root@ubuntu:/var/lib/mysql# innodb_space -s ibdata1 -T lab/tb -I PRIMARY -l 2 index-level-summary
page    index   level   data    free    records min_key

这里我们分别查看了0、1和2级别的信息,但2级别是没有任何信息输出的,所以这里的索引树高度是2。

列说明:

page 页数,可以看到并不一定是连续的
index 待确认
level 级数
data 数据大小
free 空闲大小
records 记录个数
min_key 最小记录id,每个page都会有一个最小记录id,二分法查找记录时使用.

查看汇总页记录 page-records

root@ubuntu:/var/lib/mysql# innodb_space -s ibdata1 -T lab/tb -p 3 page-records
Record 126: (id=1) → #4
Record 140: (id=535) → #5
Record 154: (id=1069) → #7
Record 168: (id=1603) → #8
Record 182: (id=2137) → #9
Record 196: (id=2671) → #10
Record 210: (id=3205) → #11
Record 224: (id=3739) → #12
Record 238: (id=4273) → #13
Record 252: (id=4807) → #14
Record 266: (id=5341) → #15
Record 280: (id=5875) → #16
Record 294: (id=6409) → #17
Record 308: (id=6943) → #18
Record 322: (id=7477) → #19
Record 336: (id=8011) → #20
Record 350: (id=8545) → #21
Record 364: (id=9079) → #22
Record 378: (id=9613) → #23
Record 392: (id=10147) → #24
Record 406: (id=10681) → #25
Record 420: (id=11215) → #26
Record 434: (id=11749) → #27
Record 448: (id=12283) → #28
Record 462: (id=12817) → #29
Record 476: (id=13351) → #30
Record 490: (id=13885) → #31
Record 504: (id=14419) → #32
Record 518: (id=14953) → #33
Record 532: (id=15487) → #34
Record 546: (id=16021) → #35
Record 560: (id=16555) → #36
Record 574: (id=17089) → #64
Record 588: (id=17623) → #65
Record 602: (id=18157) → #66
Record 616: (id=18691) → #67
Record 630: (id=19225) → #68
Record 644: (id=19759) → #69
Record 658: (id=20293) → #70
Record 672: (id=20827) → #71
Record 686: (id=21361) → #72
Record 700: (id=21895) → #73
Record 714: (id=22429) → #74
Record 728: (id=22963) → #75
Record 742: (id=23497) → #76
Record 756: (id=24031) → #77
Record 770: (id=24565) → #78
Record 784: (id=25099) → #79
Record 798: (id=25633) → #80
Record 812: (id=26167) → #81
Record 826: (id=26701) → #82
Record 840: (id=27235) → #83
Record 854: (id=27769) → #84
Record 868: (id=28303) → #85
Record 882: (id=28837) → #86
Record 896: (id=29371) → #87
Record 910: (id=29905) → #88
Record 924: (id=30439) → #89
Record 938: (id=30973) → #90
Record 952: (id=31507) → #91
Record 966: (id=32041) → #92
Record 980: (id=32575) → #93
Record 994: (id=33109) → #94
Record 1008: (id=33643) → #95
Record 1022: (id=34177) → #96
Record 1036: (id=34711) → #97
Record 1050: (id=35245) → #98
Record 1064: (id=35779) → #99
Record 1078: (id=36313) → #100
Record 1092: (id=36847) → #101
Record 1106: (id=37381) → #102
Record 1120: (id=37915) → #103
Record 1134: (id=38449) → #104
Record 1148: (id=38983) → #105
Record 1162: (id=39517) → #106

每一行代表一个page记录,id=1表示这个表中的记录最小主键id=1, #4则表示在页号是4。

上面我们使用 index-level-summary 查看的level 1级别的索引page 3中共有75条记录,最小id为1,这里通过 page-records确认了这一点。

这里查看的是聚集索引(主键索引),如果是普通索引的话,会看到打印内容有一些不一样,类似于 RECORD: (age=21) → (id=100) 这种的,即指向了主键值。

现在我们在看一下page 4中的内容

root@ubuntu:/var/lib/mysql# innodb_space -s ibdata1 -T lab/tb -p 4 page-records | head
Record 126: (id=1) → (num=1, age=13)
Record 154: (id=2) → (num=2, age=13)
Record 182: (id=3) → (num=3, age=13)
Record 210: (id=4) → (num=4, age=13)
Record 238: (id=5) → (num=5, age=13)

我们发现输出的内容与page 3 的有些不一样,这里输出的是完整的详情记录,但page 3是一个一条记录与页的对应关系,我们一般称其为页目录。

推荐阅读

http://vlambda.com/wz_xeipHG6Q3r.html
https://www.cnblogs.com/cnzeno/p/6322842.html
https://blog.csdn.net/weixin_34368949/article/details/91381989
https://www.jianshu.com/p/c51873ea129a

利用jenkins+github实现应用的自动部署及回滚

对于jenkins的介绍这里不再详细写了,此教程只是为了让大家对部署和回滚原理有所了解。

一、创建项目

点击左侧的“New Item”,输入项目名称,如 rollback-demo。

选中 ” 丢弃旧的构建(Discard old builds)”项,在“策略(Strategy” 选择”Log Rotation“, 并输入保留的最大构建个数。

二、常规配置

设置参数,点击”Add Parameter“,依次选择 “Choice Parameter” 和 “String Parameter“这两,填写如下

这里的Name 项为参数名称,用户在操作的时候,会在deploy 和 rollback 两个值中选择一项。

三、源码管理

我们这里选择Git.并填写github.com上的项目地址,记得设置认证 Credentials。构建分支直接使用默认的 */master 即可以了。查看代码浏览器选择 githubweb,并填写项目的github地址。

四、构建触发事件

选择 “GitHub hook trigger for GITScm polling”,表示使用github webhook来触发构建操作,要实现引功能,需要在项目地址github.com里的“setting”里添加一个webhook的url地址,一般地址为http://jenkins.com/github-webhook/

同时为了防止网络通讯不稳定的情况,同时选择 “Poll SCM”, 在调度Schedule 杠中填写 H/5 * * * *,表示5分钟自动从github上拉取数据一次,如果有变化就进行构建。

五、构建环境

我们演示为了简单,使用了php项目,这里不进行任何操作。如果java、NodeJS或者Golang的话,可能需要进行一些操作, 有时为了方便会把这些操作放在下一步shell脚本里进行。

六、构建配置

1.添加构建步骤,点击Add build step,选择“Execute shell”,填写内容如下

#!/bin/bash
case $deploy_env in
deploy)
	echo "deploy $deploy_env"
    ;;
rollback)
	echo "rollback $deploy_env version=$version"
    cp -R ${JENKINS_HOME}/jobs/rollback-demo/builds/${version}/archive/*.* ./
    pwd &amp;&amp; ls
    ;;
    *)
    exit
    ;;
esac    

可以看到当rollback的时候,是从原来构建归档路径里把文件复制出来。

这里正常情况下应该有一些单元测试之类的脚本,这里省略不写了。

2.添加构建后的操作。点击“Add post-build action” -> “Archive the artifacts” 配置用于归档的文件为“**/*“,表示所有文件。这点十分重要,只有每次构建完归档了才有东西回滚,另外时间长了,归档的内容越来越多,所以上面设置了最大归档个数。
3.再次添加”Send build artifacts over SSH”,配置内容如下

注意 shell脚本里的路径比 Remote directory 的路径里多一个/data 目录,这是由于在配置 ssh server 的时候指定了一个根目录为 /data.

如果在 Add post-build action 中找不到send build artifacts over ssh ,则说明需要安装一下插件,左侧点击“Manage Jenkins”-> “Manage Plugis”, 搜索“Publish Over SSH”安装即可。

到这里为了基本配置完成了。

测试配置

这里我们首次手动构建一次,点击项目页面左侧菜单的“build with Parameter”,显示如下

在正常deploy的时候,version字段时忽略掉即可。如果要加滚的话,则需要选择”rollback”,同时填写 version字段号,这个字段号为页面左下角build history的编号,就是以#开始的那些数字