1 环形缓冲区

1.1 环形缓冲区结构

       环形缓冲区通常有一个读指针和一个写指针。读指针指向环形缓冲区中可读的数据,写指针指向环形缓冲区中可写的缓冲区。通过移动读指针和写指针就可以实现缓冲区的数据读取和写入。在通常情况下,环形缓冲区的读用户仅仅会影响读指针,而写用户仅仅会影响写指针。如果仅仅有一个读用户和一个写用户,那么不需要添加互斥保护机制就可以保证数据的正确性。如果有多个读写用户访问环形缓冲区,那么必须添加互斥保护机制来确保多个用户互斥访问环形缓冲区。

1.2 环形缓冲区一种读写实现机制

一般的,圆形缓冲区需要4个指针

  • 在内存中实际开始位置;

  • 在内存中实际结束位置,也可以用缓冲区长度代替;

  • 存储在缓冲区中的有效数据的开始位置(读指针);

  • 存储在缓冲区中的有效数据的结尾位置(写指针)。

缓冲区是满、或是空,都有可能出现读指针与写指针指向同一位置:

缓冲区中总是有一个存储单元保持未使用状态。缓冲区最多存入(size-1)个数据。如果读写指针指向同一位置,则缓冲区为空。如果写指针位于读指针的相邻后一个位置,则缓冲区为满。

 

 

 

2 chan内部数据结构

2.1 chan的数据结构

chan实质是个环形缓冲区,外加一个接受者协程队列和一个发送者协程队列

bufbuf

2.2 有缓冲区和无缓冲区chan的区别

2.2.1 无缓冲chan数据同步过程和sudog结构

sudogsendqsendqmemmovesudogsudog

2.2.1 有缓冲chan

有缓冲chan实质是使用了完整的环形缓冲区,只要缓冲区有空闲,则发送者无需进入等待队列,直接将数据放入环形缓冲区中,如果缓冲区有数据,接受者无需进入等待队列,直接从环形缓冲区中获取数据。

3 关键源码分析

3.1 chan数据结构源码

type hchan struct {qcount   uint           // total data in the queuedataqsiz uint           // size of the circular queuebuf      unsafe.Pointer // points to an array of dataqsiz elementselemsize uint16closed   uint32elemtype *_type // element typesendx    uint   // send indexrecvx    uint   // receive indexrecvq    waitq  // list of recv waiterssendq    waitq  // list of send waiters// lock protects all fields in hchan, as well as several// fields in sudogs blocked on this channel.//// Do not change another G's status while holding this lock// (in particular, do not ready a G), as this can deadlock// with stack shrinking.lock mutex
}

3.2 sudog数据结构源码

// sudog represents a g in a wait list, such as for sending/receiving
// on a channel.
//
// sudog is necessary because the g ↔ synchronization object relation
// is many-to-many. A g can be on many wait lists, so there may be
// many sudogs for one g; and many gs may be waiting on the same
// synchronization object, so there may be many sudogs for one object.
//
// sudogs are allocated from a special pool. Use acquireSudog and
// releaseSudog to allocate and free them.
type sudog struct {// The following fields are protected by the hchan.lock of the// channel this sudog is blocking on. shrinkstack depends on// this for sudogs involved in channel ops.g *g// isSelect indicates g is participating in a select, so// g.selectDone must be CAS'd to win the wake-up race.isSelect boolnext     *sudogprev     *sudogelem     unsafe.Pointer // data element (may point to stack)// The following fields are never accessed concurrently.// For channels, waitlink is only accessed by g.// For semaphores, all fields (including the ones above)// are only accessed when holding a semaRoot lock.acquiretime int64releasetime int64ticket      uint32parent      *sudog // semaRoot binary treewaitlink    *sudog // g.waiting list or semaRootwaittail    *sudog // semaRootc           *hchan // channel
}

3.3 chan的构造过程

func makechan(t *chantype, size int) *hchan {elem := t.elem// compiler checks this but be safe.if elem.size >= 1<<16 {throw("makechan: invalid channel element type")}if hchanSize%maxAlign != 0 || elem.align > maxAlign {throw("makechan: bad alignment")}mem, overflow := math.MulUintptr(elem.size, uintptr(size))if overflow || mem > maxAlloc-hchanSize || size < 0 {panic(plainError("makechan: size out of range"))}// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.// buf points into the same allocation, elemtype is persistent.// SudoG's are referenced from their owning thread so they can't be collected.// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.var c *hchanswitch {case mem == 0:// Queue or element size is zero.c = (*hchan)(mallocgc(hchanSize, nil, true))// Race detector uses this location for synchronization.c.buf = c.raceaddr()case elem.ptrdata == 0:// Elements do not contain pointers.// Allocate hchan and buf in one call.c = (*hchan)(mallocgc(hchanSize+mem, nil, true))c.buf = add(unsafe.Pointer(c), hchanSize)default:// Elements contain pointers.c = new(hchan)c.buf = mallocgc(mem, elem, true)}c.elemsize = uint16(elem.size)c.elemtype = elemc.dataqsiz = uint(size)if debugChan {print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")}return c
}

可以看到,如果不传入size或size=0,则没有为环形缓冲区分配内存,职位chan结构分配内存

3.4 无缓冲收发

// Sends and receives on unbuffered or empty-buffered channels are the
// only operations where one running goroutine writes to the stack of
// another running goroutine. The GC assumes that stack writes only
// happen when the goroutine is running and are only done by that
// goroutine. Using a write barrier is sufficient to make up for
// violating that assumption, but the write barrier has to work.
// typedmemmove will call bulkBarrierPreWrite, but the target bytes
// are not in the heap, so that will not help. We arrange to call
// memmove and typeBitsBulkBarrier instead.func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {// src is on our stack, dst is a slot on another stack.// Once we read sg.elem out of sg, it will no longer// be updated if the destination's stack gets copied (shrunk).// So make sure that no preemption points can happen between read & use.dst := sg.elemtypeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)// No need for cgo write barrier checks because dst is always// Go memory.memmove(dst, src, t.size)
}func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {// dst is on our stack or the heap, src is on another stack.// The channel is locked, so src will not move during this// operation.src := sg.elemtypeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)memmove(dst, src, t.size)
}

 

参考:

[译] Go语言的有缓冲channel和无缓冲channel

图解Go的channel底层原理

Go channel 实现原理分析