golang的channel实现原理

channel是golang中特有的一种数据结构,通常与goroutine一起使用,下面我们就介绍一下这种数据结构。

channel数据结构

channel最重要的一个结构体就是hchan,我们创建一个channel的时候,实际上是创建了一个下面结构体的实例。

hchan结构体

// src/runtime/chan.go

type hchan struct {
	qcount   uint           // total data in the queue
	dataqsiz uint           // size of the circular queue
	buf      unsafe.Pointer // points to an array of dataqsiz elements
	elemsize uint16
	closed   uint32
	elemtype *_type // element type
	sendx    uint   // send index
	recvx    uint   // receive index
	recvq    waitq  // list of recv waiters
	sendq    waitq  // list of send waiters

	// lock protects all fields in hchan, as well as several
	// fields in sudogs blocked on this channel.
	//
	// Do not change another G's status while holding this lock
	// (in particular, do not ready a G), as this can deadlock
	// with stack shrinking.
	lock mutex
}

字段说明

  • qcount 当前channel中的元素数量
  • dataqsiz 环形队列的大小
  • buf 指向dataqsize的数组指针,只有缓冲chan有效
  • closed 当前channel关闭状态
  • elemsize 存储元素的大小
  • elemtype 存储元素的数据类型
  • sendx 发送操作处理到的索引位置,最大值为数组buf的最大下标值
  • recvx 接收操作处理到的索引位置,最大值为数组buf的最大下标值
  • recvq 接收队列,双向链表,阻塞元素
  • sendq 发送列队,双向链表,阻塞元素
  • lock 锁,,用来保护sudog里的所的字段
hchan struct

其中elemsizeelemtype 表示存储数据的大小和类型;sendxrecvx是指向底层数据的索引位置,表示当前处理的进度位置;recvqsendq 是一个由双向链表实现的队列,它存储的内容是由于队列dataqsize过小,而阻塞的数据。

每次进行发送数据和读取数据时都需要加锁。

waitq结构体

// src/runtime/chan.go

type waitq struct {
	first *sudog
	last  *sudog
}

sudog结构体

// src/runtime/runtime2.go

// sudog represents a g in a wait list, such as for sending/receiving
// on a channel.
//
// sudog is necessary because the g ↔ synchronization object relation
// is many-to-many. A g can be on many wait lists, so there may be
// many sudogs for one g; and many gs may be waiting on the same
// synchronization object, so there may be many sudogs for one object.
//
// sudogs are allocated from a special pool. Use acquireSudog and
// releaseSudog to allocate and free them.
type sudog struct {
	// The following fields are protected by the hchan.lock of the
	// channel this sudog is blocking on. shrinkstack depends on
	// this for sudogs involved in channel ops.

	g *g

	next *sudog
	prev *sudog
	elem unsafe.Pointer // data element (may point to stack)

	// The following fields are never accessed concurrently.
	// For channels, waitlink is only accessed by g.
	// For semaphores, all fields (including the ones above)
	// are only accessed when holding a semaRoot lock.

	acquiretime int64
	releasetime int64
	ticket      uint32

	// isSelect indicates g is participating in a select, so
	// g.selectDone must be CAS'd to win the wake-up race.
	isSelect bool

	parent   *sudog // semaRoot binary tree
	waitlink *sudog // g.waiting list or semaRoot
	waittail *sudog // semaRoot
	c        *hchan // channel
}

这里 sudog 实际上是对 goroutine 的一个封装,在等待列表中如sendqrecvq,一个sudog 就是一个goroutine。

sudogs 是通过一个特殊的池来分配的,通过acquireSudog() releaseSudog()进行获取和释放。

sudog里的字段是由 hchan.lock 锁来进行保护。

channel 整体结构图

hchan 结构图(来源:互联网技术窝)

创建

// 无缓冲通道
ch1 := make(chan int)
// 有缓冲通道
ch2 := make(chan int, 10)

通过编译可以发现channel的创建是由makechan()函数来完成的。源码

// src/runtime/chan.go

func makechan(t *chantype, size int) *hchan {
	elem := t.elem

	// compiler checks this but be safe.
	if elem.size >= 1<<16 {
		throw("makechan: invalid channel element type")
	}
	if hchanSize%maxAlign != 0 || elem.align > maxAlign {
		throw("makechan: bad alignment")
	}

	mem, overflow := math.MulUintptr(elem.size, uintptr(size))
	if overflow || mem > maxAlloc-hchanSize || size < 0 {
		panic(plainError("makechan: size out of range"))
	}

	// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
	// buf points into the same allocation, elemtype is persistent.
	// SudoG's are referenced from their owning thread so they can't be collected.
	// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
	var c *hchan
	switch {
	case mem == 0:
		// Queue or element size is zero.
		c = (*hchan)(mallocgc(hchanSize, nil, true))
		// Race detector uses this location for synchronization.
		c.buf = c.raceaddr()
	case elem.ptrdata == 0:
		// Elements do not contain pointers.
		// Allocate hchan and buf in one call.
		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
		c.buf = add(unsafe.Pointer(c), hchanSize)
	default:
		// Elements contain pointers.
		c = new(hchan)
		c.buf = mallocgc(mem, elem, true)
	}

	c.elemsize = uint16(elem.size)
	c.elemtype = elem
	c.dataqsiz = uint(size)
	lockInit(&c.lock, lockRankHchan)

	if debugChan {
		print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "n")
	}
	return c
}

函数返回的是一个指针类型,因此我们可以在函数中通过参数直接传递,不需要再转为指针使传递。

步骤

  1. 数据合法性检查,包括发送数据的类型和大小
  2. 根据不同场景分配内存,主要针对buf字段
    a. 内存大小为0,注意这时c.buf 的值为c.raceaddr()
    b. 元素不包含指针,一次性分配一段内存地址
    c. 元素包含指针,分配内存
  3. 初始化其它字段

第一个参数 *chantype 结构定义

// src/runtime/type.go

type chantype struct {
	typ  _type
	elem *_type
	dir  uintptr
}

实际上创建一个channel, 只是对一个hchan结构体进行了一些初始化操作,并返回其指针。因此我们在函数传递时,不需要传递指针,直接使用就可以了,因为它本身就是一个指针的类型。

注意:对于chan内存是在heap上分配的。

发送数据

对于channel的写操作是由chansend() 函数来实现的。

对于分送数据chan有三种情况,分别是直接发送,缓存区发送阻塞发送,其中阻塞发送涉及到GMP 的调度,理解起来有些吃力。

在发送数据前需要进行加锁操作,发送完再解锁,保证原子性操作。

直接发送

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	......

	// 直接发送
	// 如果接收队列中有接收者,则直接将数据发给接收者,重点在send()函数,并在函数里进行解锁
	if sg := c.recvq.dequeue(); sg != nil {
		// Found a waiting receiver. We pass the value we want to send
		// directly to the receiver, bypassing the channel buffer (if any).
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}

	......
}

如果接收队列中有接收者,则优化从接收者从队列中取出来sg(sg := c.recvq.dequeue()),然后再通过调用 send() 函数将数据发送给接收者即可。

channel send

在send()函数里,会执行一个回调函数主要用来进行解锁c.lock。真正的发送操作是函数 sendDirect(),通过memmove(dst, src, t.size) 将数据复制过去。

缓冲区发送

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	......

	// 缓冲区发送
	// 接收者队列中没有接收者goroutine
	// 当前channel中的元素<队列的大小,有缓冲buffer未满的情况
	// 将数据存放在sendx在buf数组中的索引位置,然后再将sendx索引+1
	// 由于是一个循环数组,所以如果达到了dataqsize,则从0开始,同时个数+1
	if c.qcount < c.dataqsiz {
		// Space is available in the channel buffer. Enqueue the element to send.
		qp := chanbuf(c, c.sendx)
		if raceenabled {
			raceacquire(qp)
			racerelease(qp)
		}
		typedmemmove(c.elemtype, qp, ep)
		c.sendx++
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
		c.qcount++
		unlock(&c.lock)
		return true
	}	

	......
}

如果当前recvq 队列里没有处于等待执行的sudog的话,则需要将数据发送到缓冲队列中(如果当前队列为缓冲chan)。

假设当前buffer大小为6(dataqsiz=6),数据个数为0(qcount=0),这里写入6个数据,如下图。

channel send

如果当前缓冲区的元素数量<队列的大小,说明缓冲区还没有满,还可以继续装载数据。

这时第一步先计算出 s.sendx 索引位置的内存地址,然后调用 typememmove() 函数将qp复制到内存地址,再将s.sendx索引值+1,同时c.qcount++

sendx = dataqsiz 的时候,说明已到了数组最后一个元素,下次存储数据的话,则需要重新从0开始了,所以需要重置为0

buf是一个由数组组成的队列,满足队列的FIFO的机制,最新存储的数据也先消费,最多可以存储 dataqsiz 个数量。超出这个数据量就需要使用第三种 阻塞发送 方式了。

sendx 始终保存的是下次存储数据的数组索引位置,每次使用完记得+1 。每次存储以前都需要判断当前buffer是否有空间可用 c.qcount < c.dataqsiz

总结

  • q.sendx 最大值为 c.dataqsiz -1,即数组的最大索引值。
  • q.count 是当前chan 存储的元素个数,有可能 > c.dataqsiz

阻塞发送

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	......

	
	// 阻塞发送
	// Block on the channel. Some receiver will complete our operation for us.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.waiting = mysg
	gp.param = nil
	c.sendq.enqueue(mysg)
	// Signal to anyone trying to shrink our stack that we're about
	// to park on a channel. The window between when this G's status
	// changes and when we set gp.activeStackChans is not safe for
	// stack shrinking.
	atomic.Store8(&gp.parkingOnChan, 1)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
	// Ensure the value being sent is kept alive until the
	// receiver copies it out. The sudog has a pointer to the
	// stack object, but sudogs aren't considered as roots of the
	// stack tracer.
	KeepAlive(ep)

	// someone woke us up.
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	if gp.param == nil {
		if c.closed == 0 {
			throw("chansend: spurious wakeup")
		}
		panic(plainError("send on closed channel"))
	}
	gp.param = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	mysg.c = nil
	releaseSudog(mysg)
	return true	

	......
}

如果当buff也写满的话,再send数据的话,则需要进行阻塞发送了。

channel send

假如我们有一个缓冲chan,但缓冲大小已经使用完,再次发送数据的话,则需要进入sendq队列了(将sudog绑定到一个goroutine,并放在sendq,等待读取)

对于阻塞的情况,理解起来有些吃力,因为涉及到GMP的关系和调度。

  1. 调用 getg() 函数获取当前运行的goroutine
  2. 调用 acquireSudog() 函数获取一个sudog,并进行数据绑定
  3. 将mysg 添加到发送队列sendq,并设置为gp.waiting
  4. 更改goroutine状态
  5. 设置goroutine为等待唤醒状态,调用 atomic.Store8(&gp.parkingOnChan, 1)函数?
  6. 通过keepAlive()函数可以保证发送的值一直有效,直到被接收者取走
  7. 进行清理工作
  8. 释放 sudog 结构体

总结

读取数据

对于channel的读取方式:

v <- ch
v, ok <- ch

其中 v<-ch 对应的是runtime.chanrecv1(), v, ok <-ch 对应的是runtime.chanrecv2()。但这两个函数最终调用的还是同一个函数,即 chanrecv()

我们先看一下官方文档对这个函数的说明

// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).
// A non-nil ep must point to the heap or the caller's stack.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {}
  • chanrecv 用来从chan 中接收数据,并将接收的数据写入到ep
  • 如果ep为 nil 的话,则接收的数据将被忽略
  • 如果非阻塞的且没有可接收的数据将返回 (false ,false)
  • 如果chan已关闭,零值 *ep 和返回值将是true, false,否则使用一个元素代替*ep并返回 (true, true)
  • 一个非nil的 ep, 必须指向heap或者调用stack
// src/runtime/chan.go

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	...
	
	// 如果c为nil,表示非法操作,则直接gopark(),表示出让当前GMP中的P的使用权,允许其它G使用
	if c == nil {
		if !block {
			return
		}
		gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}
	...
	
	// 如果chan已关闭且元素个数为0
	if c.closed != 0 && c.qcount == 0 {
		if raceenabled {
			raceacquire(c.raceaddr())
		}
		unlock(&c.lock)
		if ep != nil {
			// 设置内存内容为类型 c.elemtype 的零值
			typedmemclr(c.elemtype, ep)
		}
		return true, false
	}

}

如果当前读取的 chan 为nil的话,则会产生死锁,最终提示

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan receive (nil chan)]:

同时出让自己占用的P,允许其它goroutine抢占使用。

如果读取的chan已关闭,则读取出来的值为零值(函数说明第四条)。

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	...

	// Fast path: check for failed non-blocking operation without acquiring the lock.
	// 在没有获取锁的情况下,检查非阻塞操作失败
	if !block && empty(c) {
		// After observing that the channel is not ready for receiving, we observe whether the
		// channel is closed.
		//
		// Reordering of these checks could lead to incorrect behavior when racing with a close.
		// For example, if the channel was open and not empty, was closed, and then drained,
		// reordered reads could incorrectly indicate "open and empty". To prevent reordering,
		// we use atomic loads for both checks, and rely on emptying and closing to happen in
		// separate critical sections under the same lock.  This assumption fails when closing
		// an unbuffered channel with a blocked send, but that is an error condition anyway.

		// 如果当前chan未关闭
		if atomic.Load(&c.closed) == 0 {
			// Because a channel cannot be reopened, the later observation of the channel
			// being not closed implies that it was also not closed at the moment of the
			// first observation. We behave as if we observed the channel at that moment
			// and report that the receive cannot proceed.
			return
		}
		// The channel is irreversibly closed. Re-check whether the channel has any pending data
		// to receive, which could have arrived between the empty and closed checks above.
		// Sequential consistency is also required here, when racing with such a send.
		if empty(c) {
			// The channel is irreversibly closed and empty.
			if raceenabled {
				raceacquire(c.raceaddr())
			}
			if ep != nil {
				typedmemclr(c.elemtype, ep)
			}
			return true, false
		}
	}

	...

}

这段代码主要是对重排读的情况,进行了双重检测,暂是未明白code中考虑的情况,改天再消化消化。

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	...

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	// 加锁,下面才是真正要读取的逻辑
	lock(&c.lock)

	if c.closed != 0 && c.qcount == 0 {
		if raceenabled {
			raceacquire(c.raceaddr())
		}
		unlock(&c.lock)
		if ep != nil {
			typedmemclr(c.elemtype, ep)
		}
		return true, false
	}

	...
}

读取之前先加锁。

对chan的读取与发送一样,同样有三种方式,为直接读取、缓冲区读取和阻塞读取。

直接读取

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	...

	// 直接读取 
	// 从c.sendq队列中取sudog, 将数据复制到sg
	if sg := c.sendq.dequeue(); sg != nil {
		// Found a waiting sender. If buffer is size 0, receive value
		// directly from sender. Otherwise, receive from head of queue
		// and add sender's value to the tail of the queue (both map to
		// the same buffer slot because the queue is full).
		recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true, true
	}
}

获取一个待发送者,如果buffer大小为0,则直接从发送者接收数据。否则从队列头部接收,并将发送者发送的数据放在队列尾部。

chan recv

从c.sendq队列里读取一个 *sudog,通过调用 recv() 函数,将数据从发送者复制到ep中,并返回true,true,表示读取成功。真正读取函数为 recvDirect()

缓冲区读取

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	...

	// 如果c.qcount>0,说明缓冲区有元素可直接读取
	if c.qcount > 0 {
		// Receive directly from queue
		// 直接从队列中读取
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			raceacquire(qp)
			racerelease(qp)
		}
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		typedmemclr(c.elemtype, qp)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.qcount--
		unlock(&c.lock)
		return true, true
	}
}

如果c.qcount > 0,则说明缓冲区里有内容可以读取。则

直接获取 c.recvx 数组索引位置的内存地址,则

  1. r.recvx 索引地址的值读取出来复制给 ep,
  2. 然后更新接收数组索引c.recvx++, 如果>数组索引最大索引值 ,重置为0
  3. 减少元素个数
  4. 释放锁 c.qcount–
  5. 最后unlock返回。
chan recv

阻塞读取

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	......

	// c.sendq没有sender,buffer里也是空的,直接阻塞读取
	// no sender available: block on this channel.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	gp.waiting = mysg
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.param = nil
	c.recvq.enqueue(mysg)
	// Signal to anyone trying to shrink our stack that we're about
	// to park on a channel. The window between when this G's status
	// changes and when we set gp.activeStackChans is not safe for
	// stack shrinking.
	atomic.Store8(&gp.parkingOnChan, 1)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

	// someone woke us up
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	closed := gp.param == nil
	gp.param = nil
	mysg.c = nil
	releaseSudog(mysg)
	return true, !closed
}
  1. 通过getg()获取一个goroutine
  2. 获取一个sudog结构体
  3. 绑定两者关系
  4. 加入 c.recvq 队列
  5. 设置goroutine为等待唤醒状态
  6. 清理状态
chan recv

关闭chan

关闭chan语句

close(ch)

对于已关闭的chan,是不允许再次关闭的,否则会产生panic。对应的函数为 runtime.closechan()

// src/runtime/chan.go

func closechan(c *hchan) {
	// 如果chan未初始化,触发panic
	if c == nil {
		panic(plainError("close of nil channel"))
	}

	lock(&c.lock)
	// 关闭已关闭的chan,触发panicc
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("close of closed channel"))
	}

	......

}

对于一个未初始化的chan,或者已关闭的chan,如果再次关闭则会触发panic。

func closechan(c *hchan) {
	......
	// 设置chan关闭状态
	c.closed = 1

	// 声明一个结构体链表gList,主要用来调度使用
	var glist gList

	// release all readers
	// 释放所有readers
	for {
		sg := c.recvq.dequeue()
		if sg == nil {
			break
		}

		// 设置元素为nil
		if sg.elem != nil {
			typedmemclr(c.elemtype, sg.elem)
			sg.elem = nil
		}
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = nil
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}

	// release all writers (they will panic)
	// 释放所有writers,会引起panic,见下面说明
	for {
		sg := c.sendq.dequeue()
		if sg == nil {
			break
		}

		// 设置元素为nil
		sg.elem = nil
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = nil
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}

	// 释放锁
	unlock(&c.lock)

	// Ready all Gs now that we've dropped the channel lock.
	// 调度所有g
	for !glist.empty() {
		gp := glist.pop()
		gp.schedlink = 0
		// 唤醒goroutine
		goready(gp, 3)
	}
}
  1. 声明一个gList 链表结构体
  2. 将接收队列 c.recvq 中的所有元素添加到gList 中,并将原来的值设置为
  3. 将发送队列 c.sendq 中的所有元素添加到 gList 中,并将原来的值设置为
  4. 将所有的阻塞goroutine通过函数goready() 进行调度

文章里提到在对c.sendq 处理的时候可能会触发panic。这是因为关闭chan后,执行了 goready() 对原来sendq里的sudogs 进行了进行了重新调度,这时候发现chan已经关闭了,所以会panic。那么又是如何调度的呢?

package main

import (
	"fmt"
	"time"
)

var ch chan int

func f() {

}

func main() {
	ch := make(chan int, 10)
	// buffer大小为10,这里发送11个,使最后一个进入到c.sendq里面
	for i := 0; i < 11; i++ { // i < 10 则正常
		go func(v int) {
			ch <- v
		}(i)

	}
	time.Sleep(time.Second)
	fmt.Println("发送完毕")

	// 关闭chan,将对sendq里的g进行唤醒,唤醒后发现chan关闭状态,直接panic
	close(ch)

	for v := range ch {
		fmt.Println(v)
	}
	time.Sleep(time.Second)
}

有一条广泛流传的关闭 channel 的原则:

don’t close a channel from the receiver side and don’t close a channel if the channel has multiple concurrent senders.

不要从一个 receiver 侧关闭 channel,也不要在有多个 sender 时,关闭 channel。对于只有一个sender的话,直接在sender端关闭就可以。但对于多个sender的话,则需要通过一个信号量进行关闭,参考这里

总结

close 操作会触发goroutine的调度行为。

总结

  1. 对chan 发送或接收数据的时候要保证已初始化状态
  2. 对于已关闭的chan再次关闭会触发panic
  3. 对于发送和读取数据都有三种处理情况,分别是直接读写,缓存区读写和阻塞读写
  4. 发送和接收数据的本质上是对值的复制操作。All transfer of value on the go channels happens with the copy of value.
  5. close(ch)会触发goroutine 的调度行为
  6. 内部使用 sudogs对goroutine进行了一次封装。
  7. 如果buffer中的元素无法保证消费完的话,则会产生内存泄漏的危险,这时gc是无法对这些元素时间清理的,过多的 chan就会占用大量的资源
  8. 对于chan的分配的内存是在哪里,heap还是stack?

参考

Golang 的map实现原理

go Version 1.15.6

map作为一种常见的 key-value 数据结构,不同语言的实现原理基本差不多。首先在系统里分配一段连接的内存地址作为数组,然后通过对map键进行hash算法(最终将键转换成了一个整型数字)定位到不同的桶bucket(数组的索引位置),然后将值存储到对应的bucket里

map hash算法

理想的情况下是一个bucket存储一个值,即数组的形式,时间复杂度为O(1)。

如果存在键值碰撞的话,可以通过 链表法 或者 开放寻址法 来解决。

链表法

d2b5ca33bd970f64a6301fa75ae2eb22-1

开放寻址法

对于开放寻址法有多种算法,常见的有线性探测法,线性补偿探测法,随机探测法等。这里不再介绍,有兴趣的话可以查相关资料。

map基本数据结构

hmap结构体

map的核心数据结构是 /runtime/map.go

// A header for a Go map.
type hmap struct {
	// Note: the format of the hmap is also encoded in cmd/compile/internal/gc/reflect.go.
	// Make sure this stays in sync with the compiler's definition.
	count     int // # live cells == size of map.  Must be first (used by len() builtin)
	flags     uint8
	B         uint8  // log_2 of # of buckets (can hold up to loadFactor * 2^B items)
	noverflow uint16 // approximate number of overflow buckets; see incrnoverflow for details
	hash0     uint32 // hash seed

	buckets    unsafe.Pointer // array of 2^B Buckets. may be nil if count==0.
	oldbuckets unsafe.Pointer // previous bucket array of half the size, non-nil only when growing
	nevacuate  uintptr        // progress counter for evacuation (buckets less than this have been evacuated)

	extra *mapextra // optional fields
}

在源码中表示map 的结构体是hmap,即hash map的缩写。

count 指的是当前map的大小,即当前存放的元素个数,必须放在第一个位置,因为通过len()函数时,会通过unsafe.Poiner从这里读取此值并返回
flags 可以理解为一个标记位,几种值的定义(这里),在多个地方需要使用此值,如mapaccess1()
B 是一个对数,表示当前map持有的 buckets 数量(2^B)。但需要考虑一个重要因素装载因子,所以真正可以使用的桶只有loadFactor * 2^B 个,如果超出此值将触发扩容,默认装载因子是6.5
noverflow 溢出buckets数量
hash0 hash种子,在操作键值的时候,需要引入此值加入随机性
buckets 底层数组指针,指向当前map的底层数组指针
oldbuckets 同是底层数组指针,一般在进行map迁移的时候,用来指向原来旧数组。只有迁移过程中此字段才有意义,此字段数组大小只有buckets 的一半
nevacuate 进度计算器,表示扩容进度,小于此地址的 buckets 迁移完成
extra 可选字段,溢出桶专用,只要有桶装满就会使用

mapextra结构体

// mapextra holds fields that are not present on all maps.
type mapextra struct {
	// If both key and elem do not contain pointers and are inline, then we mark bucket
	// type as containing no pointers. This avoids scanning such maps.
	// However, bmap.overflow is a pointer. In order to keep overflow buckets
	// alive, we store pointers to all overflow buckets in hmap.extra.overflow and hmap.extra.oldoverflow.
	// overflow and oldoverflow are only used if key and elem do not contain pointers.
	// overflow contains overflow buckets for hmap.buckets.
	// oldoverflow contains overflow buckets for hmap.oldbuckets.
	// The indirection allows to store a pointer to the slice in hiter.
	overflow    *[]*bmap
	oldoverflow *[]*bmap

	// nextOverflow holds a pointer to a free overflow bucket.
	nextOverflow *bmap
}

当 map 的 key 和 value 都不是指针且内联时,将会把 bmap 标记为不含指针,这样可以避免 gc 时扫描整个 hmap。但是,我们看到 bmap 有一个 overflow 的字段,是指针类型的,破坏了 bmap 不含指针的设想,这时会把 overflow 移动到 extra 字段来。

overflowoldoverflow 只有当键和值不包含指针才使用,其中overflow存储hmap.buckets的溢出桶,而oldoverflow存储hmap.oldbuckets的溢出桶。

间接允许将指向切片的指针存储在hiter中

nextOverlflow 指向下一个空的溢出桶的指针

bmap结构体

// A bucket for a Go map.
type bmap struct {
	// tophash generally contains the top byte of the hash value
	// for each key in this bucket. If tophash[0] < minTopHash,
	// tophash[0] is a bucket evacuation state instead.
	tophash [bucketCnt]uint8
	// Followed by bucketCnt keys and then bucketCnt elems.
	// NOTE: packing all the keys together and then all the elems together makes the
	// code a bit more complicated than alternating key/elem/key/elem/... but it allows
	// us to eliminate padding which would be needed for, e.g., map[int64]int8.
	// Followed by an overflow pointer.
}

bmap即bucket map的缩写。

这里只有一个tophash字段,而实际上在使用中值的类型是不固定的,甚至可以是一个自定义结构体的指针类型。这个结构体看起来可能有点让人费解,其实编译器在编译期间会动态创建一个新的同名数据结构,如下

type bmap struct {
    topbits  [8]uint8
    keys     [8]keytype
    values   [8]valuetype
    pad      uintptr
    overflow uintptr
}

每个桶里有0到7共八个存储位置,也就是说每个桶最多可以存储8个元素,其中将所有的键放在一起,按先后顺序存放在一个keys字段中,再将所有的值按同样的顺序放在一起存放在values字段中(注意:没有将每个键值单独存储在一个字段中,主要是为了解决内存对齐带来的浪费问题。)

例如 map[int64]int8 ,如果按key/value/key/value/… 这种方式存储的话,则在每一个key/value之后需要padding7个字节才行。而如果按上面key/key/key/… 和 value/value/value/…这种方式的话,只需要在最后一个值的后面进行一次padding就可以了,大大节省了内存开支。如果不太懂这一块的话,可以了解一下golang的内存对齐。

如果当前bucket已满的话,则会创建新的bucket,并使用 overflow 字段进行bucket之间的链接,实现单向链表功能。

d4a95ed79625399a0ed58638e776f923-1

map结构体关系图

00b06e623a81cd5ed7fb89935b78d44a-1
来源网络

key的定位算法

在map中要存储一个键值对,必须先找到所在的位置,一般采用hash算法即取模的结果来实现,这里我们主要介绍一下map中是如何实现的。

先介绍一下map装载相关的内容。上面我们介绍 bmap 结构体的时候,有一个 B 字段,这个字段决定了map对应的底层数组的大小,它的大小决定了可以容纳的bucket的个数。如果B=5的话,则可以bucket的数组元素值个数为 2^5=32个,但golang中需要考虑到装载分子6.5这个因素,所以真正装载的元素并没有这么多。每当一个map存储的元素个数占比达到65%的时候,就会触发map的扩容操作。

如果想知道一个key要放在哪个bucket个, 需要先计算出一个hash值,然后再除以 32取余数即可。golang 也是如此实现的,只不过是为了性能考虑,使用了位操作而已。如5 *2 换作用位操作的话,就是将5左移1位即可。每左移1位一次就是*2,左移两位就是*4,同理右移则是相除。

如 hash(key) 计算出的结果为

10010111 | 000011110110110010001111001010100010010110010101010 │ 00110

根据上面说的h.B = 5,则取后面的500110,值为6,则需要将key:value存放在第6号的bucket中。

找到了要存储的桶位置,还需要找到放在桶的什么位置,每个位置我们可以称之为slot或者Cell

// src/runtime/map.go

const (
	// Maximum number of key/elem pairs a bucket can hold.
	bucketCntBits = 3
	bucketCnt     = 1 << bucketCntBits
        ......
}

这里定义了一个bucket最多存放元素个数的相关常量,在go里一个bucket最多可以存放 2^3=8个 元素,再多的话就需要使用到溢出桶overflow。

golang map

slot位置计算公式为取hash的高8位,计算函数为tophash()。这里高8位是 10010111,十进制的值为151, 于是在第6号bucket的中遍历每个slot,直到找到bmap.tophash值为151slot,当前slot在bucket中的索引位置就是要找的键和值的索引位置。然后根据当前位置索引值分别从bmap.keysbmap.values中计算出对应的值。

如果当前bucket中没有找到,就去当前bucket 中的溢出桶overflow中查找,如果overflow为nil,则直接返回空值即可。

总结

要想实现元素的存储定位需要三个步骤:

  1. 根据h.B 来的值,来取出hash结果的最后 b.B 位数字(低位),定位到bucket
  2. 再根据hash结果的高8位,实现在bucket定位到指定的位置
  3. 最后根据位置分别从 bmap.key和bmap.values中读取存储的值内容

map创建

package main

func main() {
	m := make(map[int]string)
	m[0] = "a"
	_ = m[0]
	_, _ = m[0]
	delete(m, 0)
}

我们先看一下与map想着的runtime有哪些

$ go tool compile -S main.go | grep runtime 
        0x0080 00128 (main.go:4)        CALL    runtime.fastrand(SB)
        0x00aa 00170 (main.go:5)        CALL    runtime.mapassign_fast64(SB)
        0x00bc 00188 (main.go:5)        CMPL    runtime.writeBarrier(SB), $0
        0x00f1 00241 (main.go:6)        CALL    runtime.mapaccess1_fast64(SB)
        0x0114 00276 (main.go:7)        CALL    runtime.mapaccess2_fast64(SB)
        0x0137 00311 (main.go:8)        CALL    runtime.mapdelete_fast64(SB)
        0x0153 00339 (main.go:5)        CALL    runtime.gcWriteBarrier(SB)
        0x0160 00352 (main.go:3)        CALL    runtime.morestack_noctxt(SB)
        ......

可以看到每个对map操作基本都有相应的同名调用函数。不过对map的初始化操作函数不是fastrand()函数,而是runtime.makemap()函数(还有一个runtime.makemap_small函数)。fastrand()函数只是用来产生随机hash种子的。

注意:我们这里没有指定创建map的长度,runtime会根据是否指定这个字段做一些情况考虑。

// src/runtime/map.go

// makemap implements Go map creation for make(map[k]v, hint).
// If the compiler has determined that the map or the first bucket
// can be created on the stack, h and/or bucket may be non-nil.
// If h != nil, the map can be created directly in h.
// If h.buckets != nil, bucket pointed to can be used as the first bucket.
func makemap(t *maptype, hint int, h *hmap) *hmap {
	mem, overflow := math.MulUintptr(uintptr(hint), t.bucket.size)
	if overflow || mem > maxAlloc {
		hint = 0
	}

	// initialize Hmap
	if h == nil {
		h = new(hmap)
	}
	h.hash0 = fastrand()

	// Find the size parameter B which will hold the requested # of elements.
	// For hint < 0 overLoadFactor returns false since hint < bucketCnt.
	B := uint8(0)
	for overLoadFactor(hint, B) {
		B++
	}
	h.B = B

	// allocate initial hash table
	// if B == 0, the buckets field is allocated lazily later (in mapassign)
	// If hint is large zeroing this memory could take a while.
	if h.B != 0 {
		var nextOverflow *bmap
		h.buckets, nextOverflow = makeBucketArray(t, h.B, nil)
		if nextOverflow != nil {
			h.extra = new(mapextra)
			h.extra.nextOverflow = nextOverflow
		}
	}

	return h
}

步骤:

  1. 计算指定的大小所需要的内容是否超出出系统允许的最大分配大小
  2. 初始化hmap,并指定随机种子
  3. 通过overLoadFactor(hint, B)函数找到一个能装下指定map大小个元素个数的最小B,从小到大开始循环B。刚开始map越小越好
  4. 开始对hash table进行初始化。如果B==0则buckets 进行延时初始化操作(赋值的时候才进行初始化),如果B值特别大,则初始化需要一段时间,主要通过makeBucketArray()函数实现

注意 makemap()函数返回的是一个指针,而makeslice()返回的是一个新的结构体,在参数传递的时候,是值复制,两者有差异,有些是引用的是同一个数组,有些不是。

// src/runtime/map.go

// makeBucketArray initializes a backing array for map buckets.
// 1<<b is the minimum number of buckets to allocate.
// dirtyalloc should either be nil or a bucket array previously
// allocated by makeBucketArray with the same t and b parameters.
// If dirtyalloc is nil a new backing array will be alloced and
// otherwise dirtyalloc will be cleared and reused as backing array.
func makeBucketArray(t *maptype, b uint8, dirtyalloc unsafe.Pointer) (buckets unsafe.Pointer, nextOverflow *bmap) {
	base := bucketShift(b)
	nbuckets := base
	// For small b, overflow buckets are unlikely.
	// Avoid the overhead of the calculation.
	if b >= 4 {
		// Add on the estimated number of overflow buckets
		// required to insert the median number of elements
		// used with this value of b.
		nbuckets += bucketShift(b - 4)
		sz := t.bucket.size * nbuckets
		up := roundupsize(sz)
		if up != sz {
			nbuckets = up / t.bucket.size
		}
	}

	if dirtyalloc == nil {
		buckets = newarray(t.bucket, int(nbuckets))
	} else {
		// dirtyalloc was previously generated by
		// the above newarray(t.bucket, int(nbuckets))
		// but may not be empty.
		buckets = dirtyalloc
		size := t.bucket.size * nbuckets
		if t.bucket.ptrdata != 0 {
			memclrHasPointers(buckets, size)
		} else {
			memclrNoHeapPointers(buckets, size)
		}
	}

	if base != nbuckets {
		// We preallocated some overflow buckets.
		// To keep the overhead of tracking these overflow buckets to a minimum,
		// we use the convention that if a preallocated overflow bucket's overflow
		// pointer is nil, then there are more available by bumping the pointer.
		// We need a safe non-nil pointer for the last overflow bucket; just use buckets.
		nextOverflow = (*bmap)(add(buckets, base*uintptr(t.bucketsize)))
		last := (*bmap)(add(buckets, (nbuckets-1)*uintptr(t.bucketsize)))
		last.setoverflow(t, (*bmap)(buckets))
	}
	return buckets, nextOverflow
}

初始化bucket

  1. 对于b<4的话(桶的数量< 2^4),基本不需要溢出桶,这样就可以避免开销。对于b>4(桶的数量> 2^4)的话,则需要创建2^(b-4)个溢出桶。
  2. 数组分配通过函数 newarray()实现,第一个参数是元素类型,每二个参数是数组长度,返回的是数组内存首地址

map读取

对于map中的赋值与修改原理是基本一样的,先找到位置,不管原来位置有没有数据直接存储新数据就可以了。

对于map的赋值操作,是由函数 runtime.mapaccess1()runtime.mapaccess2() 来实现的。两者的唯一区别就是返回值不一样,runtime.mapaccess1() 返回的是一个值,runtime.mapaccess2() 返回的是两个值,第二个值决定了key是否在map中存在。这点可以通过上面我们的 compile 结果中看出他们的区别。这里我们只介绍runtime.mapaccess1()

// src/runtime/map.go

// mapaccess1 returns a pointer to h[key].  Never returns nil, instead
// it will return a reference to the zero object for the elem type if
// the key is not in the map.
// NOTE: The returned pointer may keep the whole map live, so don't
// hold onto it for very long.
func mapaccess1(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
	if raceenabled && h != nil {
		callerpc := getcallerpc()
		pc := funcPC(mapaccess1)
		racereadpc(unsafe.Pointer(h), callerpc, pc)
		raceReadObjectPC(t.key, key, callerpc, pc)
	}
	if msanenabled && h != nil {
		msanread(key, t.key.size)
	}
	if h == nil || h.count == 0 {
		if t.hashMightPanic() {
			t.hasher(key, 0) // see issue 23734
		}
		return unsafe.Pointer(&zeroVal[0])
	}
	if h.flags&hashWriting != 0 {
		throw("concurrent map read and map write")
	}
	hash := t.hasher(key, uintptr(h.hash0))
	m := bucketMask(h.B)
	b := (*bmap)(add(h.buckets, (hash&m)*uintptr(t.bucketsize)))
	if c := h.oldbuckets; c != nil {
		if !h.sameSizeGrow() {
			// There used to be half as many buckets; mask down one more power of two.
			m >>= 1
		}
		oldb := (*bmap)(add(c, (hash&m)*uintptr(t.bucketsize)))
		if !evacuated(oldb) {
			b = oldb
		}
	}
	top := tophash(hash)
bucketloop:
	for ; b != nil; b = b.overflow(t) {
		for i := uintptr(0); i < bucketCnt; i++ {
			if b.tophash[i] != top {
				if b.tophash[i] == emptyRest {
					break bucketloop
				}
				continue
			}
			k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
			if t.indirectkey() {
				k = *((*unsafe.Pointer)(k))
			}
			if t.key.equal(key, k) {
				e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
				if t.indirectelem() {
					e = *((*unsafe.Pointer)(e))
				}
				return e
			}
		}
	}
	return unsafe.Pointer(&zeroVal[0])
}

runtime.mapaccess1()函数返回一个指定h[key]的指针,如果key不存在,则返回当前map值类型的zero值,注意永远也不会返回一个nil值。

返回的指针在map中是永远有效的,尽量读取完尽快使用并释放,不要长期持久。

步骤

  1. 判断h是否为nil或者h.count值是否为0,如果h为nil则表示未初始化,则可能panic,如果h.count=0,则表示map为空,则直接返回一个zero值。
  2. 考虑是否处于并发读写状态,否则产生panic
  3. 根据键key计算hash值
  4. 计算低B位的掩码bucketMask(h.B),比如 B=5,那 m 就是31,低五位二进制是全1
  5. 计算当前bucket的地址
  6. 根据h.oldbuckets是否为nil来判断是否处于扩容中,如果不是nil则表示当前map正处于扩容状态中。将m减少一半,重新计算当前key在 oldbuckets中的位置,如果oldbuckets没有全部迁移到新bucket的话(bmap.tophash的意义所在),则在oldbuckets中查找。
  7. 计算tophash,即高八位
  8. 真正开始查找key,外层for是循环bucket及溢出桶overflow,内层for是循环桶内的8个slot
    • 如果 b.tophash[i] != top 则表示当前bucket的第i个位置的tophash 与当前key的tophash不同。这时分两种情况:
      一种是当前的标识是 emptyRest 表示剩下的所有slot全是空,则直接结束当前bucket查找,继续下一下bucket重复此步骤(b = b.overflow(t));
      另一种则是继续查找下一个slot,直到8个slot全部查找完毕,然后再查找下一个bucket
    • 如果b.tophash[i] == top,则根据键bucket的首地址+类型计算出字节长度*slot索引值 计算得出key的位置
    • 判断同时bucket中的key与请求的key是否相等,如果相等的话,按取键的方法取出值,并返回,否则继续
  9. 如果在最后仍未找到key,则直接返回zero 值

key的定位公式

k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))

value 的定位公式

e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))

其中 dataOffset 表示key相对于bmap的偏移量,结构体如下

// src/runtime/map.go

// data offset should be the size of the bmap struct, but needs to be
// aligned correctly. For amd64p32 this means 64-bit alignment
// even though pointers are 32 bit.
dataOffset = unsafe.Offsetof(struct {
	b bmap
	v int64
}{}.v)

所以键的地址=当前bucket的起始位置(unsafe.Pointer(b)) + key的偏移量(dataOffset)+当前slot索引值(i) * 每个键的大小(uintptr(t.keysize))

而对于值来说,由于bmap.values在b.map.keys后面,所以要先将8个键的地址全部计算上才行,同样值类型也有自己的大小 t.elemsize

对于bucket的几种状态码如下

// src/runtime/map.go

// Possible tophash values. We reserve a few possibilities for special marks.
// Each bucket (including its overflow buckets, if any) will have either all or none of its
// entries in the evacuated* states (except during the evacuate() method, which only happens
// during map writes and thus no one else can observe the map during that time).
emptyRest      = 0 // this cell is empty, and there are no more non-empty cells at higher indexes or overflows.
emptyOne       = 1 // this cell is empty
evacuatedX     = 2 // key/elem is valid.  Entry has been evacuated to first half of larger table.
evacuatedY     = 3 // same as above, but evacuated to second half of larger table.
evacuatedEmpty = 4 // cell is empty, bucket is evacuated.
minTopHash     = 5 // minimum tophash for a normal filled cell.

emptyRest 表示当前cell为空,并且它后面的所有cell也为空,包括溢出桶overflow。
emptyOne 表示当前cell为空。
evacuatedX 扩容相关,当前bucket的slot值迁移到了新的X部分,由于每次扩容时,都是两倍扩容,所以原来bucket中的8个slot值,有的被迁移到了新bucket的X部分,有的是Y部分(原来的1个桶变成2个桶,分别称作X和Y)。
evacuatedY 扩容相关,第二部分迁移完毕。
evacuatedEmpty 当前cell为空,且迁移完成。

minTopHash tophash最小值,如果在调用 tophash(hash)时,计算出的值小于此值,则会加上此值(代码

map赋值

与赋值相关的函数是mapassign,根据key的不同底层调用相同类型的函数 ,常见的有

key 类型插入
uint32mapassign_fast32(t *maptype, h *hmap, key uint32) unsafe.Pointer
uint64mapassign_fast64(t *maptype, h *hmap, key uint64) unsafe.Pointer
stringmapassign_faststr(t *maptype, h *hmap, ky string) unsafe.Pointer

我们这里只主要介绍常用的mapassign函数。

函数原型

// src/runtime/map.go

// Like mapaccess, but allocates a slot for the key if it is not present in the map.
func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {}

步骤

  1. 检查是否处于并发模式,否则产生panic
  2. 进行 hash 值计算(同上)
  3. 检查bucket是否进行过初始化,否则调用 newobject(t.buckt) 函数进行初始化,这点我们上面介绍过
  4. 检查map 是否处于扩容状态(h.growing()),如果是则调用函数growWork(t,h,bucket)进行扩容工作
  5. 计算当前bucket的地址
  6. 计算tophash,开始在bucket内通过 for 遍历出相应的位置,这点同上面的方法一样,双层for查找bucket,如果找到了再去overflow查找。找到则进行typedmemmove(t.key, k, key)更新并直接结束整个逻辑
  7. 如果没有map里没有找到key,则判断如果map元素个数+1的话会不会触发扩容
    扩容所必须的两个条件:
    1. 达到了最大装载因子
    2. 溢出桶是否过多
    只要这两个其中的一个条件满足,则先进行扩容然后再重头开始。
  8. 到目前为止才算正式开始追加新的kv。要写入前必须要知道要写入的位置,有两个重要的变量,一个是inserti,另一个是insertk。如果此时inserti 为 nil的话,说明在上面第6步骤的时候遍历了所有的bucket和overflow也没有找到要插入的位置(bucket或overflow满了),则创建新的overflow,然后在新的overflow里计算出key和vlaue要存放的内存位置(代码
  9. 根据key和value的地址,写入新的值,并将当前map的元素个数加1。(需要特别注意的在这里并没有写入value,因为函数参数并没有传入值,所以返回的是一个存放v的内存地址,有了这个地址赋值就解决了)

总结:

触发map扩容的两个条件,一个是装载因子,另一个是overflow过多,对于overflow多少算多呢见下方说明。

map删除

与删除有关的函数为 mapdelete,原型

// src/runtime/map.go

func mapdelete(t *maptype, h *hmap, key unsafe.Pointer) {}

根据 key 类型的不同,删除操作会被优化成更具体的函数:

key 类型删除
uint32mapdelete_fast32(t *maptype, h *hmap, key uint32)
uint64mapdelete_fast64(t *maptype, h *hmap, key uint64)
stringmapdelete_faststr(t *maptype, h *hmap, ky string)
golang mapdelete

步骤

  1. 判断map是否为nil或者根本就没有任何元素
  2. 检查是否并发模式
  3. 计算hash,双层for循环遍历查找key,找到位置后进行以下工作
    将key位置写入nil或置零值
    将value位置写入nil或置零值
    将tophash位置置为 emptyOne 状态
    判断当前key是不是bucket中的最后一个元素,如果是最后一个元素,且当前bucket还有overflow,而overflow的首个tophash[0] != emptyRest 说明已经没有元素可查找了,于是直接结束。否则判断当前key的下一个 tophash是不是emptyRest,如果不是的话,直接结束逻辑。后面利用for循环重置tophash 的状态为最合适的状态,以方便后期循环使用

map扩容

随着map元素的添加,有可能出现bucket的个数不够,导致overflow桶越来越多,最后变成了一个单向链表了,查找效率越来越差,最差的情况下可能会变成O(n)。这与我们期待的O(1)相反,所以golang为了解决这个问题,就需要对map中的元素进行扩容重新整理,以达到最优的效果。那么什么时候才需要扩容呢?

// src/runtime/map.go

// Did not find mapping for key. Allocate new cell & add entry.

// If we hit the max load factor or we have too many overflow buckets,
// and we're not already in the middle of growing, start growing.
if !h.growing() && (overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) {
	hashGrow(t, h)
	goto again // Growing the table invalidates everything, so try again
}

这里有两个条件,只要满足其中一项就会进行扩容

  1. 当装载因子越过6.5的时候,这个是内部固定的一个值(查看说明)(代码)
  2. 当overflow 过多的时候
    a. 当 B 小于 15,也就是 bucket 总数 2^B 小于 2^15 时,如果 overflow 的 bucket 数量超过 2^B;
    b. 当 B >= 15,也就是 bucket 总数 2^B 大于等于 2^15,如果 overflow 的 bucket 数量超过 2^15。
    主要分界点就是15(代码

    针对第一个条件,就是说元素太多,桶装不下了,立即进行扩容,添加一倍的桶,保证元素个数最多只能达到桶总容量的65%;
    针对第二种情况一般是先是大量的添加元素,后来又删除了,再添加导致出现了太多的空的overflow,这样查找key的话,就需要遍历太多的桶了,效率大大下降,需要重新对元素进行位置调整,类似于windows系统中的磁盘碎片整理一样,将元素向前面的bucket中迁移即可,当然这种情况在扩容时底层有些特殊,需要注意一下。
// overLoadFactor reports whether count items placed in 1<<B buckets is over loadFactor.
// 装载因子>6.5
func overLoadFactor(count int, B uint8) bool {
	return count > bucketCnt && uintptr(count) > loadFactorNum*(bucketShift(B)/loadFactorDen)
}

// tooManyOverflowBuckets reports whether noverflow buckets is too many for a map with 1<<B buckets.
// Note that most of these overflow buckets must be in sparse use;
// if use was dense, then we'd have already triggered regular map growth.
// 溢出桶过多
func tooManyOverflowBuckets(noverflow uint16, B uint8) bool {
	// If the threshold is too low, we do extraneous work.
	// If the threshold is too high, maps that grow and shrink can hold on to lots of unused memory.
	// "too many" means (approximately) as many overflow buckets as regular buckets.
	// See incrnoverflow for more details.
	if B > 15 {
		B = 15
	}
	// The compiler doesn't see here that B < 16; mask B to generate shorter shift code.
	return noverflow >= uint16(1)<<(B&15)
}

虽然这里调用了 hashGrow() 函数,但它并不有真正的进行扩容,我们先看下它的源码

// src/runtime/map.go

func hashGrow(t *maptype, h *hmap) {
	// If we've hit the load factor, get bigger.
	// Otherwise, there are too many overflow buckets,
	// so keep the same number of buckets and "grow" laterally.
	// 申请2倍空间
	bigger := uint8(1)

	// 判断装载因子>6.5这个条件,否则就是由于overflow过多,此时申请的空间和原来的一样
	if !overLoadFactor(h.count+1, h.B) {
		bigger = 0
		h.flags |= sameSizeGrow
	}

	// 调整hmap中的桶的指针指向,将原来桶变为旧桶,新桶为新申请的数组
	oldbuckets := h.buckets
	newbuckets, nextOverflow := makeBucketArray(t, h.B+bigger, nil)

	// 重点了解下 &^ 位运算符,其作用是按位置0
	flags := h.flags &^ (iterator | oldIterator)
	if h.flags&iterator != 0 {
		flags |= oldIterator
	}
	// commit the grow (atomic wrt gc)
	// 提交grow操作
	h.B += bigger
	h.flags = flags
	h.oldbuckets = oldbuckets
	h.buckets = newbuckets
	h.nevacuate = 0 // 迁移进度
	h.noverflow = 0 // 溢出桶数量为0

	// 将扩展举出桶指令同样和新旧桶一样进行指针指向变更
	if h.extra != nil && h.extra.overflow != nil {
		// Promote current overflow buckets to the old generation.
		if h.extra.oldoverflow != nil {
			throw("oldoverflow is not nil")
		}
		h.extra.oldoverflow = h.extra.overflow
		h.extra.overflow = nil
	}
	if nextOverflow != nil {
		if h.extra == nil {
			h.extra = new(mapextra)
		}
		h.extra.nextOverflow = nextOverflow
	}

	// the actual copying of the hash table data is done incrementally
	// by growWork() and evacuate().
}

其中 hashGrow() 只是为扩容做的一些准备工作,而真正进行扩容的是 growWork()evacuate()函数。

函数 growWork() 只有在赋值删除的过程中会出现,所以也只有这两个动作才会触发扩容动作。

我们再看看growWrok()源码

// src/runtime/map.go

func growWork(t *maptype, h *hmap, bucket uintptr) {
	// make sure we evacuate the oldbucket corresponding
	// to the bucket we're about to use
	// 确认迁移的老的bucket是我们正在使用的bucket
	evacuate(t, h, bucket&h.oldbucketmask())

	// evacuate one more oldbucket to make progress on growing
	// 如果还是在迁移中,注再迁移一个bucket
	if h.growing() {
		evacuate(t, h, h.nevacuate)
	}
}

从代码里可以看到,先是迁移我们正在使用的bucket,如果还有 bucket 没有迁移完的话,就再迁移一个bucket,也就是说一次最多可迁移两个bucket。

这里 h.oldbucketmask()是计算oldbucket的位置的,实现原理我们上面介绍过的,计算hash值的最低B位来决定用哪个bucket。

// src/runtime/map.go

// oldbucketmask provides a mask that can be applied to calculate n % noldbuckets().
func (h *hmap) oldbucketmask() uintptr {
	return h.noldbuckets() - 1
}

下面我们重点关注 evacuate()函数,最复杂的工作就是由它来完成的。

// src/runtime/map.go

func evacuate(t *maptype, h *hmap, oldbucket uintptr) {
	// 计算 oldbucket 的地址
	b := (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.bucketsize)))
	newbit := h.noldbuckets()

	// 如果未迁移
	if !evacuated(b) {
		// TODO: reuse overflow buckets instead of using new ones, if there
		// is no iterator using the old buckets.  (If !oldIterator.)

		// xy contains the x and y (low and high) evacuation destinations.
		// 由于是两倍大小扩容,这里x和y分别代表新的两个bucket
		// evacDst 是迁移目的结构体,主要有四项数据,分别是要接收新数据的桶、值所在桶内的索引位置、map键和map值的指针地址
		var xy [2]evacDst

		// 获取目标 x bucket 的地址
		x := &xy[0]
		// 设置当前要迁移到的新桶相关信息
		x.b = (*bmap)(add(h.buckets, oldbucket*uintptr(t.bucketsize)))
		x.k = add(unsafe.Pointer(x.b), dataOffset)
		x.e = add(x.k, bucketCnt*uintptr(t.keysize))

		// 如果不是等量扩容的话
		if !h.sameSizeGrow() {
			// Only calculate y pointers if we're growing bigger.
			// Otherwise GC can see bad pointers.
			// 获取目标 y bucket 的地址,和上面x一样
			y := &xy[1]
			y.b = (*bmap)(add(h.buckets, (oldbucket+newbit)*uintptr(t.bucketsize)))
			y.k = add(unsafe.Pointer(y.b), dataOffset)
			y.e = add(y.k, bucketCnt*uintptr(t.keysize))
		}

		// 使用双层for循环进行数据迁移
		for ; b != nil; b = b.overflow(t) {
			k := add(unsafe.Pointer(b), dataOffset)
			e := add(k, bucketCnt*uintptr(t.keysize))
			for i := 0; i < bucketCnt; i, k, e = i+1, add(k, uintptr(t.keysize)), add(e, uintptr(t.elemsize)) {
				top := b.tophash[i]
				// 如果当前tophash索引位置的元素标记为empty,即为emptyRest或emptyOne,说明根本就不有内容要迁移,可以直接标记为迁移完成状态 evacuatedEmpty
				if isEmpty(top) {
					// 设置旧桶位置为迁移完成标记 evacuatedEmpty
					b.tophash[i] = evacuatedEmpty
					continue
				}
				// 位置标记非法,好像永远也不会出现这个情况
				if top < minTopHash {
					throw("bad map state")
				}
				k2 := k
				if t.indirectkey() {
					k2 = *((*unsafe.Pointer)(k2))
				}
				var useY uint8
				if !h.sameSizeGrow() {
					// Compute hash to make our evacuation decision (whether we need
					// to send this key/elem to bucket x or bucket y).
					hash := t.hasher(k2, uintptr(h.hash0))

					// iterator!=0 说明有协程正在遍历当前map
					// !t.key.equal(k2,k2) 表示同一个key计算出为的hash值不一样
					if h.flags&iterator != 0 && !t.reflexivekey() && !t.key.equal(k2, k2) {
						// If key != key (NaNs), then the hash could be (and probably
						// will be) entirely different from the old hash. Moreover,
						// it isn't reproducible. Reproducibility is required in the
						// presence of iterators, as our evacuation decision must
						// match whatever decision the iterator made.
						// Fortunately, we have the freedom to send these keys either
						// way. Also, tophash is meaningless for these kinds of keys.
						// We let the low bit of tophash drive the evacuation decision.
						// We recompute a new random tophash for the next level so
						// these keys will get evenly distributed across all buckets
						// after multiple grows.
						useY = top & 1
						top = tophash(hash)
					} else {
						if hash&newbit != 0 {
							useY = 1
						}
					}
				}

				if evacuatedX+1 != evacuatedY || evacuatedX^1 != evacuatedY {
					throw("bad evacuatedN")
				}

				b.tophash[i] = evacuatedX + useY // evacuatedX + 1 == evacuatedY
				dst := &xy[useY]                 // evacuation destination
				// 如果目标桶内元素索引值=8,即超出桶内最大元素个数8,则新建溢出桶,并将桶内cell索引值置为0
				if dst.i == bucketCnt {
					dst.b = h.newoverflow(t, dst.b)
					dst.i = 0
					dst.k = add(unsafe.Pointer(dst.b), dataOffset)
					dst.e = add(dst.k, bucketCnt*uintptr(t.keysize))
				}
				// 设置tophash值
				dst.b.tophash[dst.i&(bucketCnt-1)] = top // mask dst.i as an optimization, to avoid a bounds check

				// 进行数据迁移操作
				if t.indirectkey() {
					*(*unsafe.Pointer)(dst.k) = k2 // copy pointer
				} else {
					typedmemmove(t.key, dst.k, k) // copy elem
				}
				if t.indirectelem() {
					*(*unsafe.Pointer)(dst.e) = *(*unsafe.Pointer)(e)
				} else {
					typedmemmove(t.elem, dst.e, e)
				}

				// 索引位置下移一位,即下次存储数据的cell位置
				dst.i++
				// These updates might push these pointers past the end of the
				// key or elem arrays.  That's ok, as we have the overflow pointer
				// at the end of the bucket to protect against pointing past the
				// end of the bucket.
				dst.k = add(dst.k, uintptr(t.keysize))
				dst.e = add(dst.e, uintptr(t.elemsize))
			}
		}
		// Unlink the overflow buckets & clear key/elem to help GC.
		// 如果oldbucket没有协程在使用的话,则帮助gc把bucket清除掉
		if h.flags&oldIterator == 0 && t.bucket.ptrdata != 0 {
			b := add(h.oldbuckets, oldbucket*uintptr(t.bucketsize))
			// Preserve b.tophash because the evacuation
			// state is maintained there.
			// 清除bucket 的 key,value 部分,保留 tophash 部分,指示搬迁状态
			ptr := add(b, dataOffset)
			n := uintptr(t.bucketsize) - dataOffset
			memclrHasPointers(ptr, n)
		}
	}

	// 如果当前迁移的bucket 就是当前的 oldbucket
	if oldbucket == h.nevacuate {
		advanceEvacuationMark(h, t, newbit)
	}
}

函数advanceEvacuationMark()源码

// src/runtime/map.go

func advanceEvacuationMark(h *hmap, t *maptype, newbit uintptr) {
	// 更新当前map的迁移进度
	h.nevacuate++
	// Experiments suggest that 1024 is overkill by at least an order of magnitude.
	// Put it in there as a safeguard anyway, to ensure O(1) behavior.
	// 一次标记1024个
	stop := h.nevacuate + 1024

	// 如果标记个数超出了实际oldbucket的人数,则使用oldbucket的数量值
	if stop > newbit {
		stop = newbit
	}
	// 循环标记bucket
	for h.nevacuate != stop && bucketEvacuated(t, h, h.nevacuate) {
		h.nevacuate++
	}
	// 当前迁移桶等于oldbucket的总数量,即迁移完毕
	if h.nevacuate == newbit { // newbit == # of oldbuckets
		// Growing is all done. Free old main bucket array.
		// 重置oldbuckets为nil,即表示oldbuckets 迁移完毕
		h.oldbuckets = nil
		
		// Can discard old overflow buckets as well.
		// If they are still referenced by an iterator,
		// then the iterator holds a pointers to the slice.
		// 如果h.extra != nil 说明保存的kv中没有指令,使用了slice存储,所以这里也需要同时清除oldoverflow
		if h.extra != nil {
			h.extra.oldoverflow = nil
		}
		h.flags &^= sameSizeGrow
	}
}

对于map 的扩容,主要分两种情况,一个是等量扩容,另一种是非等量扩容,什么意思呢?

所谓等量扩容是指原来是一个bucket,虽然扩容的时候创建了两个新的bucket,但使用的时候只使用了其中一个bucket,另一个没有使用,所以这种情况下的扩容是直接将原来oldbucket中的元素直接迁移到新 bucket中的x桶就可以了。

而非等量扩容就是将原来一个bucket 中的内容迁移到新的两个bucket中。

我们知道每个bucket 最多只有8个元素,迁移需要一个一个的迁移,每迁移完一个元素,都需要将当前cell作个标记,表示位置的元素已经迁移完成,对于等量扩容和非等量扩容两者是有些细微的差别的。我们先看一下等量迁移。

对于等量扩容来说,由于bucket是1对1的迁移,况且都迁移到了x桶,所以oldbucket中每个cell在迁移后被标记为evacuatedX,表示当前oldbucket中的元素全部被迁移到了x桶了。

csdn

经过等量扩容后,原来的overflow消失了,所有元素都放在了同一个bucket,节省资源的同时查询效率也提高了。是不是有点像widows 的磁盘碎片整理。

等量扩容后

而对于非等量扩容,bucket的关系就是1对2了,oldbucket 中的元素有可能经过rehash后,被迁移到了x桶,也有可能被迁移到了y 桶。

map非等量扩容

如果是被迁移到了y桶的话,则标记为evacuatedY,否则就是evacuatedX

如果oldbucket中的元素为空(emptyRestemptyOne),即未存储任何元素或者原来存储过后来删除了,则需要标记为evacuatedEmpty

经过迁移后,我们发现元素在oldbucket中的存储顺序,到新的bucket中发生了变化。如odlbucket里有8个元素(1-8),遍历打印的时候,会打印12345678。后来分成了两个bucket, 其中3/4/8 放在了x桶,1/2/5/6/7放在了y桶,那么这时遍历打印的话,则是遍历打印x桶,然后是y桶,最后打印34812567,与原来的顺序不一样了,这正是我们平时所说的对map的遍历并不保证一定的顺序的。

当然在源代码里起始位置也是随机定位的,先是随机选择一个bucket,然后再随机选择一个cell位置,开始往后遍历查找,当达到最后bucket的最后一个元素的时候再才第一个bucket查找,走到遍历到当前元素等于起始元素为止,或者官方是为了引起我们的注意故意这样设计的吧。所在就算你是硬编码写死的map不进行删除或添加,打印的也是随机的。

由于map内部是分bucket迁移的,每次最多迁移两个bucket,所以在遍历数据的时候有也些复杂,即可考虑新bucket的数据,也要考虑oldbucket中的数据。有些oldbucket中的数据被分两到两个新bucket中了,导致要考虑的情况要多一些。等有时间了再单独写写这一块的逻辑。

最后,当所有数据都迁移完成后,还需要将oldbucket清除,直接通过设置 h.oldbuckets为nil即可,记得还有一个mapextra.oldoverflow

在扩容过程中,有一段说明需要注意到

// If key != key (NaNs), then the hash could be (and probably
// will be) entirely different from the old hash. Moreover,
// it isn’t reproducible. Reproducibility is required in the
// presence of iterators, as our evacuation decision must
// match whatever decision the iterator made.
// Fortunately, we have the freedom to send these keys either
// way. Also, tophash is meaningless for these kinds of keys.
// We let the low bit of tophash drive the evacuation decision.
// We recompute a new random tophash for the next level so
// these keys will get evenly distributed across all buckets
// after multiple grows.

在进行hash计算的时候,有一种key,它每次经过hash计算的时候,得出的结果都不一样,这个key就是math.NaN(),表示Not a Number,类型是float64。一旦使用此key进行存储后,几乎无法正确读取出来值,只有在遍历的时候才会读取到此值。为了解决这个问题,官方采用了使用tophash的最低位来决定是放在x还是y,如果是0则放在x,否则放在y。

总结

1. map不是并发安全的,所以在源码里有大量的代码判断


if h.flags&hashWriting == 0 {
	throw("concurrent map writes")
}

在赋值时会设置map处于协程写的状态


// Set hashWriting after calling t.hasher, since t.hasher may panic,
// in which case we have not actually done a write.

// Set hashWriting after calling t.hasher, since t.hasher may panic,
// in which case we have not actually done a write.
h.flags ^= hashWriting

2. 遍历map是无序的,因为随着元素的添加和删除会进行扩容,元素位置会发生变化

参考

https://blog.csdn.net/u010853261/article/details/99699350
map 的底层实现原理是什么
https://studygolang.com/articles/22236
https://draveness.me/golang/docs/part2-foundation/ch03-datastructure/golang-hashmap/