A channel is a data structure specific to Go and is usually used together with goroutines. This article looks at how this data structure works.
Channel data structure
The most important structure behind a channel is hchan. Creating a channel actually creates an instance of the struct below.
The hchan struct
```go
// src/runtime/chan.go
type hchan struct {
	qcount   uint           // total data in the queue
	dataqsiz uint           // size of the circular queue
	buf      unsafe.Pointer // points to an array of dataqsiz elements
	elemsize uint16
	closed   uint32
	elemtype *_type // element type
	sendx    uint   // send index
	recvx    uint   // receive index
	recvq    waitq  // list of recv waiters
	sendq    waitq  // list of send waiters

	// lock protects all fields in hchan, as well as several
	// fields in sudogs blocked on this channel.
	//
	// Do not change another G's status while holding this lock
	// (in particular, do not ready a G), as this can deadlock
	// with stack shrinking.
	lock mutex
}
```
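Field descriptions
- qcount: number of elements currently in the channel
- dataqsiz: size of the circular queue
- buf: pointer to an array of dataqsiz elements; only meaningful for buffered channels
- closed: whether the channel is closed
- elemsize: size of each stored element
- elemtype: type of the stored elements
- sendx: index reached by send operations; at most the last index of buf
- recvx: index reached by receive operations; at most the last index of buf
- recvq: receive queue, a doubly linked list of blocked receivers
- sendq: send queue, a doubly linked list of blocked senders
- lock: protects all fields in hchan, plus several fields in the sudogs blocked on this channel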

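elemsize and elemtype describe the size and type of the stored data. sendx and recvx are indexes into the underlying buffer and record how far sending and receiving have progressed. recvq and sendq are queues implemented as doubly linked lists of sudogs. They hold goroutines that are blocked because no data can be transferred yet. Senders wait there when the buffer of dataqsiz elements is full, or when the channel is unbuffered and no receiver is waiting. Receivers wait there when the buffer is empty. Each sudog also points to the value being sent or to the location that will receive it.
Every send and receive that actually transfers data does so while holding lock.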
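The interplay of buf, dataqsiz, qcount, sendx and recvx can be modelled in ordinary Go. The sketch below is illustrative only, and it simplifies the real runtime in several ways:
- ringChan and its methods are hypothetical names.
- A sync.Mutex stands in for the runtime's lock.
- Elements are fixed to int.
- There are no wait queues. A send into a full buffer, or a receive from an empty one, reports false instead of parking the goroutine.

```go
package main

import (
	"fmt"
	"sync"
)

// ringChan is a hypothetical, simplified model of hchan's buffer fields.
// It never blocks: trySend/tryRecv report false instead of parking.
type ringChan struct {
	mu       sync.Mutex // stands in for hchan.lock
	buf      []int      // backing array of dataqsiz elements
	dataqsiz int        // capacity of the circular queue
	qcount   int        // number of elements currently stored
	sendx    int        // next slot to write
	recvx    int        // next slot to read
}

func newRingChan(size int) *ringChan {
	return &ringChan{buf: make([]int, size), dataqsiz: size}
}

// trySend mirrors the "buffer has space" branch of chansend.
func (c *ringChan) trySend(v int) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.qcount >= c.dataqsiz {
		return false // buffer full: the real runtime would block here
	}
	c.buf[c.sendx] = v
	c.sendx++
	if c.sendx == c.dataqsiz {
		c.sendx = 0 // wrap around to the start of the array
	}
	c.qcount++
	return true
}

// tryRecv mirrors the "buffer has data" branch of chanrecv.
func (c *ringChan) tryRecv() (int, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.qcount == 0 {
		return 0, false // buffer empty: the real runtime would block here
	}
	v := c.buf[c.recvx]
	c.buf[c.recvx] = 0 // clear the slot, like typedmemclr
	c.recvx++
	if c.recvx == c.dataqsiz {
		c.recvx = 0
	}
	c.qcount--
	return v, true
}

func main() {
	c := newRingChan(3)
	for i := 1; i <= 4; i++ {
		fmt.Println("send", i, c.trySend(i)) // the 4th send fails: buffer full
	}
	v, _ := c.tryRecv()
	fmt.Println("recv", v, "sendx", c.sendx, "recvx", c.recvx)
}
```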
The waitq struct
```go
// src/runtime/chan.go
type waitq struct {
	first *sudog
	last  *sudog
}
```
The sudog struct
```go
// src/runtime/runtime2.go

// sudog represents a g in a wait list, such as for sending/receiving
// on a channel.
//
// sudog is necessary because the g ↔ synchronization object relation
// is many-to-many. A g can be on many wait lists, so there may be
// many sudogs for one g; and many gs may be waiting on the same
// synchronization object, so there may be many sudogs for one object.
//
// sudogs are allocated from a special pool. Use acquireSudog and
// releaseSudog to allocate and free them.
type sudog struct {
	// The following fields are protected by the hchan.lock of the
	// channel this sudog is blocking on. shrinkstack depends on
	// this for sudogs involved in channel ops.

	g *g

	next *sudog
	prev *sudog
	elem unsafe.Pointer // data element (may point to stack)

	// The following fields are never accessed concurrently.
	// For channels, waitlink is only accessed by g.
	// For semaphores, all fields (including the ones above)
	// are only accessed when holding a semaRoot lock.

	acquiretime int64
	releasetime int64
	ticket      uint32

	// isSelect indicates g is participating in a select, so
	// g.selectDone must be CAS'd to win the wake-up race.
	isSelect bool

	parent   *sudog // semaRoot binary tree
	waitlink *sudog // g.waiting list or semaRoot
	waittail *sudog // semaRoot
	c        *hchan // channel
}
```
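A sudog is essentially a wrapper around a goroutine. In a wait list such as sendq or recvq, each sudog stands for one waiting goroutine. As the comment above explains, though, the relation is many-to-many. For example, a goroutine blocked in a select on several channels has one sudog in each of those channels' wait lists.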
sudogs are allocated from a special pool. They are obtained with acquireSudog() and returned with releaseSudog().
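The first group of sudog fields (g, next, prev and elem) is protected by hchan.lock, the lock of the channel the sudog is blocked on.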
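Here is a rough sketch of the waitq shape: a queue with first and last pointers into a doubly linked list. enqueue appends at the tail and dequeue removes from the head.

waiter and waitQueue are hypothetical stand-ins for sudog and waitq. The sketch leaves out one detail of the real dequeue: it also skips sudogs whose goroutine is in a select that another case has already won (the isSelect/selectDone check). No locking is done here because, in the runtime, these lists are only touched while holding hchan.lock.

```go
package main

import "fmt"

// waiter is a hypothetical stand-in for sudog: it records which
// goroutine (here just an id) is waiting and its element pointer.
type waiter struct {
	id   int
	elem *int
	next *waiter
	prev *waiter
}

// waitQueue mirrors the shape of runtime.waitq: first/last pointers
// into a doubly linked list. It does no locking of its own.
type waitQueue struct {
	first *waiter
	last  *waiter
}

// enqueue appends w at the tail.
func (q *waitQueue) enqueue(w *waiter) {
	w.next = nil
	w.prev = q.last
	if q.last == nil {
		q.first = w
	} else {
		q.last.next = w
	}
	q.last = w
}

// dequeue removes and returns the head, or nil if the queue is empty.
func (q *waitQueue) dequeue() *waiter {
	w := q.first
	if w == nil {
		return nil
	}
	q.first = w.next
	if q.first == nil {
		q.last = nil
	} else {
		q.first.prev = nil
	}
	w.next, w.prev = nil, nil
	return w
}

func main() {
	var q waitQueue
	for i := 1; i <= 3; i++ {
		q.enqueue(&waiter{id: i})
	}
	for w := q.dequeue(); w != nil; w = q.dequeue() {
		fmt.Println("woke waiter", w.id) // FIFO: 1, 2, 3
	}
}
```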
Overall channel structure diagram

Creating a channel
```go
// unbuffered channel
ch1 := make(chan int)
// buffered channel
ch2 := make(chan int, 10)
```
Inspecting the compiled code shows that channels are created by the makechan() function. Its source:
```go
// src/runtime/chan.go
func makechan(t *chantype, size int) *hchan {
	elem := t.elem

	// compiler checks this but be safe.
	if elem.size >= 1<<16 {
		throw("makechan: invalid channel element type")
	}
	if hchanSize%maxAlign != 0 || elem.align > maxAlign {
		throw("makechan: bad alignment")
	}

	mem, overflow := math.MulUintptr(elem.size, uintptr(size))
	if overflow || mem > maxAlloc-hchanSize || size < 0 {
		panic(plainError("makechan: size out of range"))
	}

	// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
	// buf points into the same allocation, elemtype is persistent.
	// SudoG's are referenced from their owning thread so they can't be collected.
	// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
	var c *hchan
	switch {
	case mem == 0:
		// Queue or element size is zero.
		c = (*hchan)(mallocgc(hchanSize, nil, true))
		// Race detector uses this location for synchronization.
		c.buf = c.raceaddr()
	case elem.ptrdata == 0:
		// Elements do not contain pointers.
		// Allocate hchan and buf in one call.
		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
		c.buf = add(unsafe.Pointer(c), hchanSize)
	default:
		// Elements contain pointers.
		c = new(hchan)
		c.buf = mallocgc(mem, elem, true)
	}

	c.elemsize = uint16(elem.size)
	c.elemtype = elem
	c.dataqsiz = uint(size)
	lockInit(&c.lock, lockRankHchan)

	if debugChan {
		print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
	}
	return c
}
```
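The function returns a pointer (*hchan). A channel value can therefore be passed directly as a function argument; there is no need to take its address.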
Steps
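- Validate the input: the element type's size (must be below 64 KB) and alignment, and the requested buffer size (non-negative, and the total must fit in memory)
- Allocate memory according to the case, mainly for the buf field:
  a. The buffer takes no memory (unbuffered channel or zero-size elements): only hchan is allocated. Note that c.buf is then set to c.raceaddr()
  b. The elements contain no pointers: hchan and buf are allocated together as one contiguous block
  c. The elements contain pointers: hchan and buf are allocated separately, and buf carries the element type so the GC can scan it
- Initialize the remaining fields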
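The size check at the top of makechan can be observed from user code. The sketch below makes two assumptions:
- The negative size is supplied at run time. A negative constant is already rejected by the compiler.
- makeChecked is a hypothetical helper that turns the resulting panic into an error.

```go
package main

import "fmt"

// makeChecked is a hypothetical helper: it calls make with a runtime
// size and converts a panic raised by the runtime into an error value.
func makeChecked(size int) (ch chan int, err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("%v", r) // ch keeps its zero value (nil)
		}
	}()
	return make(chan int, size), nil
}

func main() {
	if _, err := makeChecked(-1); err != nil {
		fmt.Println("panic recovered:", err)
	}
	ch, _ := makeChecked(4)
	fmt.Println(len(ch), cap(ch))
}
```

The recovered message should come from the size < 0 branch shown in makechan above.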
Definition of the type of the first parameter, *chantype:
```go
// src/runtime/type.go
type chantype struct {
	typ  _type
	elem *_type
	dir  uintptr
}
```
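In short, creating a channel just allocates and initializes an hchan struct and returns a pointer to it. That is why a channel can be passed to functions as is, without taking its address: the channel value itself is already a pointer.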
Note: a channel's memory is allocated on the heap (via mallocgc).
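Because make returns a pointer to the hchan, copying a channel value copies only that pointer. The short sketch below uses a hypothetical helper, fill, to demonstrate two things:
- A channel passed by value still lets the caller see the sends.
- A copied channel value compares equal to the original.

```go
package main

import "fmt"

// fill receives the channel by value. Because a chan value is a
// pointer to the runtime's hchan, the caller observes these sends.
func fill(ch chan int, n int) {
	for i := 0; i < n; i++ {
		ch <- i
	}
}

func main() {
	ch := make(chan int, 3)
	fill(ch, 3) // no &ch needed
	fmt.Println(len(ch), cap(ch))

	alias := ch              // copies the pointer, not the buffer
	fmt.Println(<-alias)     // reads what fill sent through ch
	fmt.Println(ch == alias) // both values refer to the same channel
}
```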
Sending data
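Writes to a channel are implemented by chansend(). A statement such as ch <- v compiles to a call to runtime.chansend1(), which calls chansend() with block set to true.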
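A send is handled in one of three ways: direct send, buffered send and blocking send. Blocking send involves GMP scheduling, which makes it the hardest of the three to follow.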
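The lock is acquired before the data is sent and released afterwards. This makes each send atomic with respect to other operations on the channel.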
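Once the lock is held, chansend tries the three cases in a fixed order. The function below is a hypothetical and heavily simplified model of that order. It takes the number of waiting receivers, qcount and dataqsiz as plain integers. It ignores the nil-channel and closed-channel checks, which the real function performs first.

```go
package main

import "fmt"

// sendPath is a hypothetical model of the order in which chansend
// checks its cases once the lock is held (nil/closed handling omitted).
func sendPath(waitingReceivers, qcount, dataqsiz int) string {
	switch {
	case waitingReceivers > 0:
		return "direct" // hand the value to the first receiver in recvq
	case qcount < dataqsiz:
		return "buffer" // copy into buf[sendx]
	default:
		return "block" // enqueue a sudog on sendq and park
	}
}

func main() {
	fmt.Println(sendPath(1, 0, 0)) // unbuffered, a receiver is waiting
	fmt.Println(sendPath(0, 2, 6)) // buffered, room left
	fmt.Println(sendPath(0, 0, 0)) // unbuffered, nobody waiting
}
```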
Direct send
```go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	......
	// Direct send:
	// if a receiver is waiting in the receive queue, hand the data straight to it.
	// The key function is send(), which also releases the lock.
	if sg := c.recvq.dequeue(); sg != nil {
		// Found a waiting receiver. We pass the value we want to send
		// directly to the receiver, bypassing the channel buffer (if any).
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}
	......
}
```
If a receiver is waiting in the receive queue, the first one is taken off the queue (sg := c.recvq.dequeue()). The data is then handed to it by calling send().

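Inside send(), the callback passed in is invoked to unlock c.lock. The actual transfer is done by sendDirect(), which copies the value into the receiver's memory with memmove(dst, src, t.size). send() then wakes the receiver with goready().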
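Because the data is copied with memmove, the receiver gets its own copy of the value. The sketch below uses an array (a value type) and a hypothetical helper, handoff. After an unbuffered send completes, mutating the sender's array does not affect what the receiver got.

Whether the value travels through the direct-send path, or the receiver copies it out of a parked sender, depends on timing. In both cases it is a copy.

```go
package main

import "fmt"

// handoff sends arr over an unbuffered channel, then mutates the sender's
// copy and returns both what the receiver saw and the mutated array.
func handoff(arr [3]int) (received, after [3]int) {
	ch := make(chan [3]int)
	done := make(chan [3]int, 1)
	go func() {
		done <- <-ch // the receiver gets its own copy of the array
	}()
	ch <- arr    // returns only after the value has reached the receiver
	arr[0] = 100 // mutate the sender's copy after the send completed
	return <-done, arr
}

func main() {
	received, after := handoff([3]int{1, 2, 3})
	fmt.Println("receiver saw:", received, "sender now has:", after)
}
```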
Buffered send
```go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	......
	// Buffered send:
	// no receiver goroutine is waiting in the receive queue, and the number of
	// elements in the channel is below the queue size (the buffer is not full).
	// Store the data at index sendx of buf, then advance sendx.
	// buf is a circular array, so sendx wraps to 0 when it reaches dataqsiz;
	// the element count is incremented.
	if c.qcount < c.dataqsiz {
		// Space is available in the channel buffer. Enqueue the element to send.
		qp := chanbuf(c, c.sendx)
		if raceenabled {
			raceacquire(qp)
			racerelease(qp)
		}
		typedmemmove(c.elemtype, qp, ep)
		c.sendx++
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
		c.qcount++
		unlock(&c.lock)
		return true
	}
	......
}
```
If no sudog is waiting in recvq, the data goes into the buffer, provided the channel is buffered and the buffer is not full.
Suppose the buffer size is 6 (dataqsiz=6) and it currently holds no elements (qcount=0). Six values are then written, as shown in the figure below.

If the number of elements in the buffer is less than the queue size, the buffer is not yet full and can accept more data.
First, chanbuf() computes qp, the memory address of slot c.sendx. Next, typedmemmove() copies the value pointed to by ep into qp. Finally, c.sendx is incremented and so is c.qcount.
When sendx reaches dataqsiz, the last slot of the array has just been used. The next value must therefore go back to slot 0, so sendx is reset to 0.
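buf is a queue backed by an array and follows FIFO order: the element stored first is consumed first. It holds at most dataqsiz elements. Once it is full, further sends fall through to the third case, blocking send.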
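sendx always holds the array index where the next value will be stored, and it is incremented after each use. Before every store, the code checks that the buffer has free space (c.qcount < c.dataqsiz).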
Summary
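c.sendx is at most c.dataqsiz - 1, the largest index of the array. c.qcount is the number of elements currently stored in the channel, and it never exceeds c.dataqsiz. A send writes into the buffer only when c.qcount < c.dataqsiz; senders that find the buffer full wait in sendq instead.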
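These rules can be observed with a real channel: len(ch) corresponds to qcount and cap(ch) to dataqsiz. The sketch uses a select with a default case as a non-blocking send; trySend is a hypothetical helper. A send that would otherwise block reports false, len never exceeds cap, and values come out oldest first.

```go
package main

import "fmt"

// trySend performs a non-blocking send. With no receivers around it
// succeeds only while len(ch) < cap(ch), i.e. qcount < dataqsiz.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	ch := make(chan int, 3)
	for i := 1; i <= 4; i++ {
		fmt.Println("send", i, trySend(ch, i), "len", len(ch))
	}
	for len(ch) > 0 {
		fmt.Println("recv", <-ch) // oldest value first
	}
}
```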
Blocking send
```go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	......
	// Blocking send
	// Block on the channel. Some receiver will complete our operation for us.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.waiting = mysg
	gp.param = nil
	c.sendq.enqueue(mysg)
	// Signal to anyone trying to shrink our stack that we're about
	// to park on a channel. The window between when this G's status
	// changes and when we set gp.activeStackChans is not safe for
	// stack shrinking.
	atomic.Store8(&gp.parkingOnChan, 1)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
	// Ensure the value being sent is kept alive until the
	// receiver copies it out. The sudog has a pointer to the
	// stack object, but sudogs aren't considered as roots of the
	// stack tracer.
	KeepAlive(ep)

	// someone woke us up.
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	if gp.param == nil {
		if c.closed == 0 {
			throw("chansend: spurious wakeup")
		}
		panic(plainError("send on closed channel"))
	}
	gp.param = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	mysg.c = nil
	releaseSudog(mysg)
	return true
}
```
If the buffer is also full, any further send has to block.

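Suppose we have a buffered channel whose buffer is used up. Sending another value puts the sender into the sendq queue: a sudog is bound to the current goroutine and placed on sendq, where it waits for a receiver.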
The blocking case is harder to follow because it involves the GMP model and scheduling.
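- Call getg() to get the currently running goroutine
- Call acquireSudog() to get a sudog and bind it to the goroutine, the channel c and the data pointer ep
- Add mysg to the send queue sendq and record it in gp.waiting
- Call atomic.Store8(&gp.parkingOnChan, 1) to signal anyone trying to shrink this goroutine's stack that it is about to park on a channel
- Call gopark() to put the goroutine into the waiting state; chanparkcommit releases c.lock once it is parked, and the goroutine sleeps until a receiver or close wakes it
- Call KeepAlive(ep) so that the value being sent stays alive until the receiver has copied it out
- After wake-up, clean up; if gp.param is nil, the channel was closed and the send panics
- Release the sudog with releaseSudog()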
Summary
- A blocking send does not change c.qcount
- acquireSudog() and releaseSudog(mysg) are always used as a pair
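The sketch below illustrates the first point with a real channel. blockedSend is a hypothetical name, and the function relies on time.Sleep to give the extra sender a chance to park, which is a timing assumption rather than a guarantee.

While that sender is blocked, len(ch) stays at the capacity, because a parked sender is not counted in qcount. After one receive, the parked value moves into the buffer and FIFO order is preserved. The checked results hold even if the sender had not parked yet.

```go
package main

import (
	"fmt"
	"time"
)

// blockedSend fills a channel of capacity 2, starts a sender that has to
// block, and reports len(ch) while it waits plus the order values arrive.
func blockedSend() (lenWhileBlocked int, order []int) {
	ch := make(chan int, 2)
	ch <- 1
	ch <- 2 // the buffer is now full (qcount == dataqsiz)

	sent := make(chan struct{})
	go func() {
		ch <- 3 // blocks: this goroutine parks on ch's sendq
		close(sent)
	}()

	time.Sleep(50 * time.Millisecond) // timing assumption: let the sender park
	lenWhileBlocked = len(ch)         // still 2: a parked sender is not in qcount

	order = append(order, <-ch) // frees a slot; the sender's value goes to the tail
	<-sent                      // the sender has been woken and has finished
	order = append(order, <-ch, <-ch)
	return lenWhileBlocked, order
}

func main() {
	n, order := blockedSend()
	fmt.Println("len while sender blocked:", n)
	fmt.Println("receive order:", order)
}
```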
Receiving data
A channel can be received from in two forms:
```go
v := <-ch
v, ok := <-ch
```
v := <-ch compiles to runtime.chanrecv1(), and v, ok := <-ch compiles to runtime.chanrecv2(). Both end up calling the same function, chanrecv().
First, here is the function's doc comment from the source:
```go
// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).
// A non-nil ep must point to the heap or the caller's stack.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { ... }
```
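- chanrecv receives from channel c and writes the received data to ep
- If ep is nil, the received data is discarded
- If the receive is non-blocking (block == false) and no element is available, it returns (false, false)
- Otherwise, if c is closed, it zeroes *ep and returns (true, false); otherwise it fills *ep with an element and returns (true, true)
- A non-nil ep must point to the heap or to the caller's stack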
```go
// src/runtime/chan.go
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	...
	// c is nil: a blocking receive can never succeed, so gopark() parks the
	// goroutine for good, giving up its P so that other Gs can run.
	if c == nil {
		if !block {
			return
		}
		gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}
	...
	// The channel is closed and holds no elements
	if c.closed != 0 && c.qcount == 0 {
		if raceenabled {
			raceacquire(c.raceaddr())
		}
		unlock(&c.lock)
		if ep != nil {
			// Set *ep to the zero value of type c.elemtype
			typedmemclr(c.elemtype, ep)
		}
		return true, false
	}
}
```
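Receiving from a nil channel blocks the goroutine forever. If no other goroutine can make progress, the runtime detects the deadlock and aborts with:
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan receive (nil chan)]:
Parking the goroutine also gives up its P, so other goroutines can still be scheduled on it.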
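If the channel is closed and its buffer is empty, the receive returns the zero value (the fourth point of the doc comment). Values still in the buffer are received normally first.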
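A small sketch of this behaviour; readAfterClose is a hypothetical helper. Values already buffered are still delivered after close. Only once the buffer is drained do receives return the zero value with ok == false.

```go
package main

import "fmt"

// readAfterClose sends two values into a buffered channel, closes it,
// and then performs three receives, recording each (value, ok) pair.
func readAfterClose() ([]int, []bool) {
	ch := make(chan int, 2)
	ch <- 1
	ch <- 2
	close(ch)

	var vals []int
	var oks []bool
	for i := 0; i < 3; i++ {
		v, ok := <-ch // buffered values come out first, even after close
		vals = append(vals, v)
		oks = append(oks, ok)
	}
	return vals, oks
}

func main() {
	vals, oks := readAfterClose()
	fmt.Println(vals, oks)
}
```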
```go
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	...
	// Fast path: check for failed non-blocking operation without acquiring the lock.
	if !block && empty(c) {
		// After observing that the channel is not ready for receiving, we observe whether the
		// channel is closed.
		//
		// Reordering of these checks could lead to incorrect behavior when racing with a close.
		// For example, if the channel was open and not empty, was closed, and then drained,
		// reordered reads could incorrectly indicate "open and empty". To prevent reordering,
		// we use atomic loads for both checks, and rely on emptying and closing to happen in
		// separate critical sections under the same lock. This assumption fails when closing
		// an unbuffered channel with a blocked send, but that is an error condition anyway.
		// If the channel is not closed:
		if atomic.Load(&c.closed) == 0 {
			// Because a channel cannot be reopened, the later observation of the channel
			// being not closed implies that it was also not closed at the moment of the
			// first observation. We behave as if we observed the channel at that moment
			// and report that the receive cannot proceed.
			return
		}
		// The channel is irreversibly closed. Re-check whether the channel has any pending data
		// to receive, which could have arrived between the empty and closed checks above.
		// Sequential consistency is also required here, when racing with such a send.
		if empty(c) {
			// The channel is irreversibly closed and empty.
			if raceenabled {
				raceacquire(c.raceaddr())
			}
			if ep != nil {
				typedmemclr(c.elemtype, ep)
			}
			return true, false
		}
	}
	...
}
```
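This fast path lets a non-blocking receive fail without taking the lock. It reads two things: whether the channel is empty and whether it is closed. Both reads are atomic, so they cannot be reordered into a misleading "open and empty" observation.
- If the channel is still open, the receive simply reports that it cannot proceed.
- If the channel is closed, emptiness is checked again, because a value may have arrived between the first empty check and the closed check. Only a channel that is both closed and empty returns the zero value with (true, false).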
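From user code, a select with a default case performs a non-blocking receive, which ends up in chanrecv with block == false. The hypothetical helper tryRecv below returns results shaped like chanrecv's (selected, received) pair. It covers four cases:
- a nil channel
- an open, empty channel
- a channel holding a value
- a closed, empty channel

```go
package main

import "fmt"

// tryRecv performs a non-blocking receive. Its last two results loosely
// mirror the (selected, received) pair documented for chanrecv.
func tryRecv(ch chan int) (v int, selected, received bool) {
	select {
	case x, ok := <-ch:
		return x, true, ok
	default:
		return 0, false, false
	}
}

func main() {
	var nilCh chan int
	fmt.Println(tryRecv(nilCh)) // nil channel: never ready

	ch := make(chan int, 1)
	fmt.Println(tryRecv(ch)) // open and empty: not selected

	ch <- 7
	fmt.Println(tryRecv(ch)) // a value is available

	close(ch)
	fmt.Println(tryRecv(ch)) // closed and empty: selected, zero value, not received
}
```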
```go
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	...
	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	// Take the lock; the actual receive logic starts here
	lock(&c.lock)

	if c.closed != 0 && c.qcount == 0 {
		if raceenabled {
			raceacquire(c.raceaddr())
		}
		unlock(&c.lock)
		if ep != nil {
			typedmemclr(c.elemtype, ep)
		}
		return true, false
	}
	...
}
```
The lock is acquired before reading.
Like sending, receiving is handled in three ways: direct receive, buffered receive and blocking receive.
Direct receive
```go
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	...
	// Direct receive:
	// take a sudog from c.sendq and copy its data into ep
	if sg := c.sendq.dequeue(); sg != nil {
		// Found a waiting sender. If buffer is size 0, receive value
		// directly from sender. Otherwise, receive from head of queue
		// and add sender's value to the tail of the queue (both map to
		// the same buffer slot because the queue is full).
		recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true, true
	}
	...
}
```
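A waiting sender has been found. If the buffer size is 0, the value is received directly from the sender. Otherwise the buffer must be full. The receiver then takes the element at the head of the queue, and the sender's value is placed at the tail; because the queue is full, both refer to the same slot.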

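A *sudog is taken from c.sendq and recv() is called to copy the data into ep. chanrecv then returns (true, true) to signal a successful receive. How recv() copies the value depends on the channel:
- Unbuffered channel: the value is copied straight from the sender by recvDirect().
- Full buffered channel: recv() copies the element at the head of the buffer into ep and moves the sender's value into the freed slot.

In both cases the sender is then woken with goready().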
Buffered receive
```go
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	...
	// c.qcount > 0: the buffer holds elements that can be read directly
	if c.qcount > 0 {
		// Receive directly from queue
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			raceacquire(qp)
			racerelease(qp)
		}
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		typedmemclr(c.elemtype, qp)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.qcount--
		unlock(&c.lock)
		return true, true
	}
	...
}
```
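If c.qcount > 0, the buffer holds data that can be read. In that case:
- compute qp, the memory address of slot c.recvx (chanbuf(c, c.recvx))
- if ep is not nil, copy the value at qp into ep (typedmemmove), then clear the slot (typedmemclr)
- advance c.recvx, resetting it to 0 when it reaches c.dataqsiz
- decrement the element count (c.qcount--)
- release the lock (unlock) and return (true, true)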

Blocking receive
```go
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	......
	// No sender is waiting in c.sendq and the buffer is empty: block.
	// no sender available: block on this channel.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	gp.waiting = mysg
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.param = nil
	c.recvq.enqueue(mysg)
	// Signal to anyone trying to shrink our stack that we're about
	// to park on a channel. The window between when this G's status
	// changes and when we set gp.activeStackChans is not safe for
	// stack shrinking.
	atomic.Store8(&gp.parkingOnChan, 1)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

	// someone woke us up
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	closed := gp.param == nil
	gp.param = nil
	mysg.c = nil
	releaseSudog(mysg)
	return true, !closed
}
```
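- Call getg() to get the current goroutine
- Get a sudog with acquireSudog()
- Bind the two together, recording ep as the place the value should be written to
- Add the sudog to c.recvq
- Park the goroutine with gopark() until a sender or close wakes it
- Clean up; if gp.param is nil, the wake-up came from close, so the receive reports received == false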

Closing a channel
The statement that closes a channel:
close(ch)
A channel that is already closed must not be closed again; doing so panics. The corresponding runtime function is runtime.closechan().
```go
// src/runtime/chan.go
func closechan(c *hchan) {
	// Closing a nil (uninitialized) channel panics
	if c == nil {
		panic(plainError("close of nil channel"))
	}

	lock(&c.lock)
	// Closing an already closed channel panics
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("close of closed channel"))
	}
	......
}
```
Closing a nil (uninitialized) channel, or closing a channel a second time, panics.
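Both panics can be observed safely by recovering them. closeSafely is a hypothetical helper; the messages it reports are the plainError strings from closechan shown above.

```go
package main

import "fmt"

// closeSafely is a hypothetical helper that converts the panic raised by
// close into an error so both failure cases can be inspected.
func closeSafely(ch chan int) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("%v", r)
		}
	}()
	close(ch)
	return nil
}

func main() {
	var nilCh chan int
	fmt.Println(closeSafely(nilCh)) // close of nil channel

	ch := make(chan int)
	fmt.Println(closeSafely(ch)) // <nil>: the first close succeeds
	fmt.Println(closeSafely(ch)) // close of closed channel
}
```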
```go
func closechan(c *hchan) {
	......
	// Mark the channel as closed
	c.closed = 1

	// glist collects the goroutines that will be rescheduled
	var glist gList

	// release all readers
	for {
		sg := c.recvq.dequeue()
		if sg == nil {
			break
		}
		// Zero the value the receiver is waiting for and drop the pointer
		if sg.elem != nil {
			typedmemclr(c.elemtype, sg.elem)
			sg.elem = nil
		}
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = nil
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}

	// release all writers (they will panic; see the explanation below)
	for {
		sg := c.sendq.dequeue()
		if sg == nil {
			break
		}
		// Drop the element pointer
		sg.elem = nil
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = nil
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}
	// Release the lock
	unlock(&c.lock)

	// Ready all Gs now that we've dropped the channel lock.
	for !glist.empty() {
		gp := glist.pop()
		gp.schedlink = 0
		// Wake the goroutine
		goready(gp, 3)
	}
}
```
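- Declare a gList list
- Move every sudog in the receive queue c.recvq into gList, setting the value each receiver is waiting for to the zero value
- Move every sudog in the send queue c.sendq into gList, clearing its element pointer
- After releasing the lock, schedule all the blocked goroutines with goready()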
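The effect on receivers can be checked with a short sketch; closeWakesReceivers is a hypothetical name. Every goroutine waiting to receive on an unbuffered channel is released by close and gets the zero value with ok == false. A receiver that has not parked yet when close runs sees the same result, because it takes the closed && qcount == 0 branch instead.

```go
package main

import (
	"fmt"
	"sync"
)

// closeWakesReceivers starts n receivers on an unbuffered channel, closes
// it, and reports whether each receiver got (zero value, false).
func closeWakesReceivers(n int) []bool {
	ch := make(chan int)
	results := make(chan bool, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			v, ok := <-ch // parks on recvq if it gets there before close
			results <- (v == 0 && !ok)
		}()
	}
	close(ch) // releases every parked receiver
	wg.Wait()
	close(results)

	var got []bool
	for r := range results {
		got = append(got, r)
	}
	return got
}

func main() {
	fmt.Println(closeWakesReceivers(3))
}
```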
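The comment in closechan notes that the writers released from c.sendq will panic. After the channel is closed, goready() reschedules the goroutines that were parked on sendq. Each one resumes in chansend right after gopark() and finds gp.param == nil while c.closed != 0. It then panics with "send on closed channel". The following program reproduces this: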
```go
package main

import (
	"fmt"
	"time"
)

func main() {
	ch := make(chan int, 10)
	// The buffer holds 10 values; starting 11 senders leaves the last one
	// parked on c.sendq (with i < 10 everything works normally).
	for i := 0; i < 11; i++ {
		go func(v int) {
			ch <- v
		}(i)
	}

	time.Sleep(time.Second)
	fmt.Println("done sending")

	// Closing the channel wakes the goroutine parked on sendq; it finds the
	// channel closed and panics with "send on closed channel".
	close(ch)
	for v := range ch {
		fmt.Println(v)
	}
	time.Sleep(time.Second)
}
```
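The panic does not depend on the sender having been parked. A send attempted after close hits the closed check that chansend performs right after taking the lock, and it panics with the same message.

The deterministic sketch below recovers that panic so it can be inspected; sendSafely is a hypothetical helper. In real code, the fix is to make sure only the owner closes the channel, not to recover.

```go
package main

import "fmt"

// sendSafely is a hypothetical helper that reports the panic raised when
// sending on a closed channel instead of letting it crash the program.
func sendSafely(ch chan int, v int) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("%v", r)
		}
	}()
	ch <- v
	return nil
}

func main() {
	ch := make(chan int, 1)
	close(ch)
	fmt.Println(sendSafely(ch, 1)) // send on closed channel
}
```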
A widely cited rule for closing channels:
don’t close a channel from the receiver side and don’t close a channel if the channel has multiple concurrent senders.
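That is: do not close a channel from the receiver side, and do not close a channel that has multiple senders. With a single sender, the sender can simply close the channel when it is done. With multiple senders, the close has to be coordinated through a separate signal; see here.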
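One common way to apply this with several senders is to let no sender close the channel at all. This is not necessarily the approach in the linked reference. A sync.WaitGroup counts the senders, and one extra goroutine closes the channel after all of them have returned.

produce is a hypothetical name, and the pattern assumes each sender finishes on its own. If senders must be stopped early, a separate done channel is typically used to signal them instead.

```go
package main

import (
	"fmt"
	"sync"
)

// produce starts several senders and closes out only after all of them
// have returned, so no sender can ever hit a closed channel.
func produce(senders, perSender int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	for s := 0; s < senders; s++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for i := 0; i < perSender; i++ {
				out <- id*perSender + i
			}
		}(s)
	}
	// A single extra goroutine owns the close.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	sum := 0
	for v := range produce(3, 4) {
		sum += v
	}
	fmt.Println("sum:", sum)
}
```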
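Summary
close wakes every goroutine blocked on the channel and hands it back to the scheduler through goready(); in other words, close triggers goroutine scheduling.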
Summary
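- Make sure a channel is initialized before sending to or receiving from it; operations on a nil channel block forever
- Closing a channel that is already closed panics
- Sends and receives are each handled in three ways: direct, buffered and blocking
- Sending and receiving data are, at bottom, copy operations on the value. All transfer of value on the go channels happens with the copy of value.
- close(ch) triggers goroutine scheduling
- Internally, goroutines are wrapped in sudogs
- If goroutines stay blocked on a channel forever (for example, senders whose values are never consumed), neither those goroutines nor the channel and elements they keep reachable can be reclaimed by the GC, so memory leaks; many such channels can tie up a lot of resources
- Where is a channel's memory allocated, on the heap or on the stack? As makechan shows, hchan and buf are allocated with mallocgc (or new), that is, on the heap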