1、map的基本结构本文涉及到的源码版本为Go SDK 1.16.1
map是Golang中的一种常用数据结构,其本质上是一种哈希表,类似于 java 的 HashMap以及Python的字典(dict),是一种存储键值对(Key-Value)的数据结构。一般的Map会包含两个主要结构:
- 数组:数组里的值指向一个链表
- 链表:目的解决hash冲突的问题,并存放键值
而在Golang中,解决hash冲突的不是链表,而是数组(既内存中的连续空间),而且使用了两个数组分别存放键和值,如下图所示。其中提到了map中的两个核心的结构体:hmap和bmap,每个map的底层数据结构是hmap,而每个hamp是有若干个bmap组成的bucker数组。hmap的结构体定义如下(src/runtime/map.go):
type hmap struct {
count int //元素个数
flags uint8
B uint8 // 扩容相关字段, B是buckets数组的长度的对数2^B
noverflow uint16 // 溢出的bucket个数,一个bmap中最多存8组键值对,存满了就往指向的这个bmap里存
hash0 uint32 // hash因子
buckets unsafe.Pointer // buckets 数组指针
oldbuckets unsafe.Pointer // 扩容时,存放之前的buckets(Map扩容相关字段)
nevacuate uintptr // 分流次数,成倍扩容分流操作计数的字段(Map扩容相关字段)
extra *mapextra // 用于扩容的指针
}
type mapextra struct {
overflow *[]*bmap
oldoverflow *[]*bmap
nextOverflow *bmap
}
而bmap
// A bucket for a Go map.
type bmap struct {
tophash [bucketCnt]uint8 //长度为8的数组
}
//编译时创建的新bamp结构
type bmap struct {
topbits [8]uint8 //长度为8的数组,[]uint8,存储的是key获取的hash的高8位,遍历时对比使用,提高性能。
keys [8]keytype //长度为8的数组,存储具体的key值。
values [8]valuetype //长度为8的数组,存储具体的value值
pad uintptr //指向的hmap.extra.overflow溢出桶里的bmap,上面的字段topbits、keys、elems长度为8,最多存8组键值对,存满了就往指向的这个bmap里存
overflow uintptr //对齐内存使用的,不是每个bmap都有会这个字段,需要满足一定条件
}
hmapbmap
2、负载因子
负载因子是衡量Hash表中空间利用率的指标,负载因子= hash表中已存储的键值对的总数量/hash桶的个数(即hmap结构中buckets数组的个数)。我们知道负载因子决定了Hash表的存储效率:负载因子过小会导致空间利用率较低;而负载因子较大则会导致Hash表的插入和查找频繁发生碰撞,导致Hash表退化成链表,触发扩容。在Golang中负载因子被设定为6.5,为什么设置为6.5而不是其他的数值呢,Golang官方给出了一组测试数据:
从表中我们能够看到当负载因子设置为6.5时,溢出率(%overflow),每个键值对占用字节比(bytes/entry)和查找次数取得一定平衡 。当在Golang中调用make(map[k]v,hint)初始化map时,会根据初始化元素个数以及负载因子来决定创建的buckets个数,核心代码如下所示:
func makemap(t *maptype, hint int, h *hmap) *hmap {
mem, overflow := math.MulUintptr(uintptr(hint), t.bucket.size)
if overflow || mem > maxAlloc {
hint = 0
}
// initialize Hmap
if h == nil {
h = new(hmap)
}
h.hash0 = fastrand() //初始化hash因子
B := uint8(0)//找到合适的B值
for overLoadFactor(hint, B) {
B++
}
h.B = B
if h.B != 0 {
var nextOverflow *bmap
h.buckets, nextOverflow = makeBucketArray(t, h.B, nil)
if nextOverflow != nil {
h.extra = new(mapextra)
h.extra.nextOverflow = nextOverflow
}
}
return h
}
//根据负载因子和初始化map数量定义B,buckets数量=2^B
func overLoadFactor(count int, B uint8) bool {
//loadFactorNum=13, loadfactorDen=2
return count > bucketCnt && uintptr(count) > loadFactorNum*(bucketShift(B)/loadFactorDen)
}
func bucketShift(b uint8) uintptr {
return uintptr(1) << (b & (sys.PtrSize*8 - 1))
}
fastrandmakeBucketArrayB2^(B-4)
3、扩容
随着Hash表中插入的数值越来越多,发生碰撞的概率也逐渐增大。如果当前装载因子超过6.5的阈值时就会触发扩容操作,那么是否仅在这种场景下才触发扩容呢?其实并不是,当在向 map 插入新 key 的时候,会进行条件检测,符合下面这 2 个条件,会触发扩容:
1、装载因子超过阈值,源码里定义的阈值是 6.5
2、overflow 的 bucket 数量过多
第一种场景很好理解,在上一节中我们已经知道了装载因子的重要性。而第二种场景为什么会触发扩容呢?这是因为不断地插入删除操作导致buckets数量过多,而导致存储元素的密度很低,同样会导致Hash表的插入和查找效率降低,此时也需要进行扩容操作,将数据从老的bucket移至新的bucket,提高数据存储密度,提高查询效率。
与扩容相关的函数包括hashGrow()和growWork(),hashGrow()中会开辟新的buckets空间,并将当前buckets挂载到老的buckets上。在growWrok()中会进行实际的bucket搬迁,而map在插入或修改、删除 key 的时候,都会尝试进行搬迁 buckets 的工作。从代码中我们能看到buckets的搬迁并非一次性完成的,而是选择了一次最多“搬迁”两个的渐进方式。
func hashGrow(t *maptype, h *hmap) {
bigger := uint8(1) //增量扩容
if !overLoadFactor(h.count+1, h.B) { //等量扩容
bigger = 0
h.flags |= sameSizeGrow
}
oldbuckets := h.buckets // 将buckets挂载到老的buckets上
newbuckets, nextOverflow := makeBucketArray(t, h.B+bigger, nil)//开辟新的buckets空间
flags := h.flags &^ (iterator | oldIterator)
if h.flags&iterator != 0 {
flags |= oldIterator
}
h.B += bigger
h.flags = flags
h.oldbuckets = oldbuckets
h.buckets = newbuckets
h.nevacuate = 0
h.noverflow = 0
//......
}
func growWork(t *maptype, h *hmap, bucket uintptr) {
// 搬迁正在使用的旧 bucket
evacuate(t, h, bucket&h.oldbucketmask())//evacuate函数太长了,不在此展开
// 再搬迁一个 bucket,以加快搬迁进程
if h.growing() {
evacuate(t, h, h.nevacuate)
}
}
func (h *hmap) growing() bool {
return h.oldbuckets != nil
}
4、遍历
当我们利用range去遍历map时会发现,每次遍历得到的顺序都不相同,如利用下面的代码遍历map三次,得到的结果均不相同。
var mapTest = map[string]int{
"a": 1, "b": 2, "c": 3, "d": 4, "e": 5, "f": 6,
}
for i := 0; i < 3; i++ {
for key := range mapTest {
fmt.Print(key + " ")
}
fmt.Println()
}
/* 遍历结果
b c d e f a
a b c d e f
c d e f a b
*/
这是因为当我们在遍历 map 时,并不是固定地从 0 号 bucket 开始遍历,每次都是从一个随机值序号的 bucket 开始遍历,并且是从这个 bucket 的一个随机序号的 cell 开始遍历。 mapiterinit是遍历时选取初始桶的函数,源码如下。
func mapiterinit(t *maptype, h *hmap, it *hiter) {
//......
it.t = t
it.h = h
it.B = h.B
it.buckets = h.buckets
if t.bucket.ptrdata == 0 {
h.createOverflow()
it.overflow = h.extra.overflow
it.oldoverflow = h.extra.oldoverflow
}
r := uintptr(fastrand())
if h.B > 31-bucketCntBits {
r += uintptr(fastrand()) << 31
}
it.startBucket = r & bucketMask(h.B)
it.offset = uint8(r >> h.B & (bucketCnt - 1))
it.bucket = it.startBucket
if old := h.flags; old&(iterator|oldIterator) != iterator|oldIterator {
atomic.Or8(&h.flags, iterator|oldIterator)
}
mapiternext(it)
}
代码里通过调用fastrand()生成随机数,决定遍历的起始位置,导致每次遍历得到的结果都不相同。如果想要每次遍历都得到相同的结果,可以先将map中的key拷贝到数组中,再遍历数组。
5、并发与sync.mapmap并非是线程安全的,如果在map中发生了并发的读写情况就会导致panic,这一点我们从map的初始化函数mapassign中能够得知。为了解决map的并发使用问题,Golang在不同的版本中给出了不同的解决方案。
func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
//判断 hmap 是否已经初始化(是否为 nil)
if h == nil {
panic(plainError("assignment to entry in nil map"))
}
//...
//判断是否并发读写 map,若是则抛出异常
if h.flags&hashWriting != 0 {
throw("concurrent map writes")
}
//根据 key 的不同类型调用不同的 hash 方法计算得出 hash 值
alg := t.key.alg
hash := alg.hash(key, uintptr(h.hash0))
//设置 flags 标志位,表示有一个 goroutine 正在写入数据。因为 alg.hash 有可能出现 panic 导致异常
h.flags |= hashWriting
//判断 buckets 是否为 nil,若是则调用 newobject 根据当前 bucket 大小进行分配
//初始化时没有初始 buckets,那么它在第一次赋值时就会对 buckets 分配
if h.buckets == nil {
h.buckets = newobject(t.bucket) // newarray(t.bucket, 1)
}
}
在Golang1.9之前,Go官方在那个时候给出了一个简单的解决方案,就是加锁。设置一个struct,嵌入一个读写锁和一个map,在读写场景下通过加锁保证数据安全性。
var counter = struct{
sync.RWMutex
m map[string]int
}{m: make(map[string]int)}
Go1.9之后加入了支持并发安全的Map sync.Map, sync.Map 通过一份只使用原子操作的数据和一份冗余了只读数据的加锁数据实现一定程度上的读写分离,使得大多数读操作和更新操作是原子操作,写入新数据才加锁的方式来提升性能。先看一下底层的数据结构:
// 封装的线程安全的map
type Map struct {
// lock
mu Mutex
// 实际是readOnly这个结构
// 一个只读的数据结构,因为只读,所以不会有读写冲突。
// readOnly包含了map的一部分数据,用于并发安全的访问。(冗余,内存换性能)
// 访问这一部分不需要锁。
read atomic.Value // readOnly
// dirty数据包含当前的map包含的entries,它包含最新的entries(包括read中未删除的数据,虽有冗余,但是提升dirty字段为read的时候非常快,不用一个一个的复制,而是直接将这个数据结构作为read字段的一部分),有些数据还可能没有移动到read字段中。
// 对于dirty的操作需要加锁,因为对它的操作可能会有读写竞争。
// 当dirty为空的时候, 比如初始化或者刚提升完,下一次的写操作会复制read字段中未删除的数据到这个数据中。
dirty map[interface{}]*entry
// 当从Map中读取entry的时候,如果read中不包含这个entry,会尝试从dirty中读取,这个时候会将misses加一,
// 当misses累积到 dirty的长度的时候, 就会将dirty提升为read,避免从dirty中miss太多次。因为操作dirty需要加锁。
misses int
}
// readOnly is an immutable struct stored atomically in the Map.read field.
type readOnly struct {
m map[interface{}]*entry
// 如果Map.dirty有些数据不在m中,这个值为true
amended bool
}
// An entry is a slot in the map corresponding to a particular key.
type entry struct {
// *interface{}
p unsafe.Pointer
}
(1)读写分离
并发访问map读的主要问题其实是在扩容的时候,可能会导致元素被hash到其他的地址,那如果我的读的map不会进行扩容操作,就可以进行并发安全的访问了,而sync.map里面正是采用了这种方式,对增加元素通过dirty来进行保存。通过read只读和dirty写map将操作分离,其实就只需要通过原子指令对read map来进行读操作而不需要加锁了,从而提高读的性能。
func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
// 只读数据中没有,并且dirty有比read多的数据,加锁在dirty中找
if !ok && read.amended {
m.mu.Lock()
// 双检查, 因为上锁之前的语句是非原子性的
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
// 只读中没有读取到的次数+1
e, ok = m.dirty[key]
// 检查是否达到触发dirty升级read的条件
m.missLocked()
}
m.mu.Unlock()
}
if !ok {
return nil, false
}
// atomic.Load 但被标记为删除的会返回nil
return e.load()
}
func (m *Map) missLocked() {
m.misses++
if m.misses < len(m.dirty) {
return
}
m.read.Store(readOnly{m: m.dirty})
m.dirty = nil
m.misses = 0
}
(2)写加锁与延迟提升
上面提到增加元素操作可能会先增加到dirty写map中,那针对多个goroutine同时写,其实就需要进行Mutex加锁了。read只读map和dirty写map, 那就会有个问题,默认增加元素都放在dirty中,那后续访问新的元素如果都通过 mutex加锁,那read只读map就失去意义,sync.Map中采用一直延迟提升的策略,进行批量将当前map中的所有元素都提升到read只读map中从而为后续的读访问提供无锁支持。
func (m *Map) Store(key, value interface{}) {
// 先不上锁,而是从只读数据中按key读取, 如果已存在以compareAndSwap操作进行覆盖(update)
read, _ := m.read.Load().(readOnly)
if e, ok := read.m[key]; ok && e.tryStore(&value) {
return
}
m.mu.Lock()
// 双检查获取read
read, _ = m.read.Load().(readOnly)
// 如果data在read中,更新entry
if e, ok := read.m[key]; ok {
// 如果原子操作读到的数据是被标记删除的, 则视为新数据写入dirty
if e.unexpungeLocked() {
m.dirty[key] = e
}
// 原子操作写新数据
e.storeLocked(&value)
} else if e, ok := m.dirty[key]; ok {
// 原子操作写新数据
e.storeLocked(&value)
} else {
// 新数据
// 当dirty中没有新数据时,将read中数据冗余到dirty
if !read.amended {
m.dirtyLocked()
m.read.Store(readOnly{m: read.m, amended: true})
}
m.dirty[key] = newEntry(value)
}
m.mu.Unlock()
}
func (e *entry) tryStore(i *interface{}) bool {
p := atomic.LoadPointer(&e.p)
if p == expunged {
return false
}
for {
if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {
return true
}
p = atomic.LoadPointer(&e.p)
if p == expunged {
return false
}
}
}
// 在dirty中没有比read多出的新数据时触发冗余
func (m *Map) dirtyLocked() {
if m.dirty != nil {
return
}
read, _ := m.read.Load().(readOnly)
m.dirty = make(map[interface{}]*entry, len(read.m))
for k, e := range read.m {
// 检查entry是否被删除, 被删除的数据不冗余
if !e.tryExpungeLocked() {
m.dirty[k] = e
}
}
}
func (e *entry) tryExpungeLocked() (isExpunged bool) {
p := atomic.LoadPointer(&e.p)
for p == nil {
// 将被删除(置nil)的数据以cas原子操作标记为expunged(防止因并发情况下其他操作导致冗余进dirty的数据不正确)
if atomic.CompareAndSwapPointer(&e.p, nil, expunged) {
return true
}
p = atomic.LoadPointer(&e.p)
}
return p == expunged
}
(3)惰性删除
惰性删除是并发设计中一中常见的设计,比如删除某个个链表元素,如果要删除则需要修改前后元素的指针,而采用惰性删除,则通常只需要给某个标志位设定为删除,然后在后续修改中再进行操作,sync.Map中也采用这种方式,通过给指针指向某个标识删除的指针,从而实现惰性删除。
func (m *Map) Delete(key interface{}) {
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
// 只读中不存在需要到dirty中去删除
if !ok && read.amended {
m.mu.Lock()
// 双检查, 因为上锁之前的语句是非原子性的
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
delete(m.dirty, key)
}
m.mu.Unlock()
}
if ok {
e.delete()
}
}
func (e *entry) delete() (hadValue bool) {
for {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
return false
}
if atomic.CompareAndSwapPointer(&e.p, p, nil) {
return true
}
}
}