前言
map 又被称之为字典或哈希表,是除数组外最常见的数据结构,用于表示键值对之间映射关系。和切片相比,map 类型的内部实现要更加复杂,Go 运行时使用一张哈希表来实现抽象的 map 类型,哈希表有着 O(1) 的平均查找效率,因此 map 数据结构在编程中使用频率极高。
上篇文章:map 基本使用 讲述了 map 数据结构相关的操作,本文将从底层角度展开,结合源码探索 map 的底层实现原理。文章有点长,做好心里准备,让我们开启 golang 学习第三课:map 底层实现源码解析。
注:本文依据 go 版本 1.20.7 源码进行讲解,源码位置:src/runtime/map.go、
代码中保留了部分官方英文注释,并对每一行代码进行了详细注释;特别的地方会重点进行强调和讲解。
1.map 底层数据结构
runtime.hmapbuckets unsafe.Pointerkey,value
我们先通过一张图,从整体上了解一下结构体之间的指向关系,然后对其进行详细的分析与讲解:
1.1 hmap 数据结构
从图中我们可以看到,hmap 结构体用于表示 map(它是 hashmap 的缩写),hmap 类型是 map 类型的头部结构(header),它存储了后续 map 类型操作所需的所有信息。下边展示了源码中 hmap 的详细结构。
// A header for a Go map.
type hmap struct {
// Note: the format of the hmap is also encoded in cmd/compile/internal/reflectdata/reflect.go.
// Make sure this stays in sync with the compiler's definition.
count int // map 的元素数量。调用 len(map) 的时候返回此值
flags uint8 // map 当前状态的标志位,目前定义了 4 中状态:iterator/oldIterator/hashWriting/sameSizeGrow
B uint8 // 当前哈希表持有的 buckets(存储桶) 的数量(2^B 是 bucket 的数量,bucket 数组的长度)
noverflow uint16 // 溢出桶的大致数量,溢出桶较少时为精确值
hash0 uint32 // 哈希种子,计算 key 的哈希的时候会传入哈希函数
buckets unsafe.Pointer // 指针指向 2^B Buckets 数组。当 count==0 时,可能为 nil
// 仅仅在扩容期间不为 nil,非等量扩容期间为 buckets 的一半大小,等量扩容与 buckets 等大
oldbuckets unsafe.Pointer // previous bucket array of half the size, non-nil only when growing
nevacuate uintptr // 迁移进度计数器,小于此下标的 buckets 都已迁移完成 // progress counter for evacuation (buckets less than this have been evacuated)
extra *mapextra // optional fields 可选字段,用于防止 k-v 不是指针情况下,溢出桶被 GC
}
const(
// flags
// map 迭代器允许并行,在迭代过程中,有可能使用 buckets 或 oldbuckets
iterator = 1 // 有迭代器可能正在使用 buckets; there may be an iterator using buckets
oldIterator = 2 // 有迭代器可能正在使用 oldbuckets; there may be an iterator using oldbuckets
hashWriting = 4 // 有协程在写 map,用于限制并发读写;a goroutine is writing to the map
sameSizeGrow = 8 // 等量扩容;the current map growth is to a new map of the same size
)
// mapextra holds fields that are not present on all maps.
type mapextra struct {
// 源码注释
// If both key and elem do not contain pointers and are inline, then we mark bucket
// type as containing no pointers. This avoids scanning such maps.
// However, bmap.overflow is a pointer. In order to keep overflow buckets
// alive, we store pointers to all overflow buckets in hmap.extra.overflow and hmap.extra.oldoverflow.
// overflow and oldoverflow are only used if key and elem do not contain pointers.
// overflow contains overflow buckets for hmap.buckets.
// oldoverflow contains overflow buckets for hmap.oldbuckets.
// The indirection allows to store a pointer to the slice in hiter.
overflow *[]*bmap // 包含 hmap.buckets 的溢出桶
oldoverflow *[]*bmap // 包含 hmap.oldbuckets 的溢出桶
// nextOverflow holds a pointer to a free overflow bucket.
// nextOverflow 持有指向空闲溢出桶的指针(map 初始化的时候,预分配出来的,用于提升 map 后续创建溢出桶的速度)。
nextOverflow *bmap
}
- flags 为 hmap 的标志位,共有四位 1111 表示了 map 的四种状态
- iterator:有迭代器可能正在使用 buckets
- oldIterator:有迭代器可能正在使用 oldbuckets;
- hashWriting:有协程在写 map,用于限制并发读写
- sameSizeGrow:map 正在进行等量扩容
- mapextra 字段
mapextra 字段设计思路很有意思,这里值得详细介绍一下,后续源码解析中也有讲解。
首先你得提前了解一下 golang 的 GC 机制,方便理解该字段的内容。垃圾回收机制会回收那些没有被引用到可回收的内存。
当 GC 扫描到 hmap 结构体的变量时,不仅需要扫描 hmap 内部的字段,还需要扫描整个 buckets 数组,当 map 存储大量 k-v 时,将会耗费大量的性能用于 GC 扫描,影响性能。这个时候设计 map 的工程师就在想了,如果 map 存储的 key、value 都不包含指针,能不能避免 GC 扫描 buckets 数组。
来看一下官方对 mapextra 字段的注释内容:
If both key and elem do not contain pointers and are inline, then we mark bucket type as containing no pointers. This avoids scanning such maps.
However, bmap.overflow is a pointer.
In order to keep overflow buckets alive, we store pointers to all overflow buckets in hmap.extra.overflow and hmap.extra.oldoverflow.
overflow and oldoverflow are only used if key and elem do not contain pointers.
overflow contains overflow buckets for hmap.buckets.
oldoverflow contains overflow buckets for hmap.oldbuckets.
The indirection allows to store a pointer to the slice in hiter.
这里先对官方注释做一个翻译:
如果 key 和 eiem 都不包含指针,会把 bucket 的存储类型标记为不含指针,这将避免扫描这样的 maps。
然而 bmap.overflow 是一个指针。
为了保持溢出桶存活,我们在 hmap.extra.overflow 和 hmap.extra.oldoverflow 变量中存储了指针指向所有的溢出桶。
overflow 和 oldoverflow 字段仅仅只用在 k-v 不包含指针的情况下。overflow 字段包含了 hmap.buckets 的溢出桶。oldoverflow 字段包含了 hmap.oldbuckets 的溢出桶。
间接允许在 hiter 中存储指向切片的指针。
如果没有读过 map 源码,一定不知道官方注释在说什么,这里对官方注释内容做一个讲解:
第一句:如果 key 和 value 都不包含指针,会把 bucket 的存储类型标记为不含指针,这将避免扫描这样的 maps。
讲解:如果 map 的 key 和 value 都不包含指针,会把 map 的存储类型 maptype 标记为不含指针(使用 maptype.bucket.ptrdata == 0 进行判断,后续会有源码解读 ptrdata 字段)。这将避免扫描整个 map(触发扫描的过程就是 GC)。
第二句:然而 bmap.overflow 是一个指针。
讲解:然而 bmap.overflow 字段是指针,这将破坏 bmap 中没有指针这一设想(虽然 k-v 不是指针,但 overflow 还是指针,无法避免 GC 扫描 bmap)。
第三句:为了保持溢出桶存活,我们在 hmap.extra.overflow 和 hmap.extra.oldoverflow 变量中存储了指针指向所有的溢出桶。
讲解:为了保持溢出桶的活跃状态(也就是不被 GC 认为可回收),我们在 hmap.extra.overflow 和 hmap.extra.oldoverflow 变量中存储了指针指向所有的溢出桶。从这句注释,我们大概知道了 extra 字段的作用是为了保活溢出桶而存在的,到这里 extra 中字段的意思解释清楚了,官方注释也戛然而止了,我相信你们和我产生了同样的疑问:你第二句没告诉我咋解决 bmap.overflow 是指针的问题啊!!!
很明显第二句和第三句注释之间是断层的,隐藏了一些东西,官方没有直接说出来,这里把缺失的部分补充回来:bmap.overflow 指针字段该如何解决?
首先得理解一下,为什么 bmap.overflow 字段要用指针类型?我们都知道,map 溢出桶是通过链表结构维护的,需要指针指向下一个溢出桶,所以用指针很正常,且指针可以用来对溢出桶保活。
**那有办法不用指针类型吗?**当然可以,golang 在编译期间会把 k-v 不是指针的 map 中的 bmap.overflow 字段优化为 unitptr 类型。uintptr 是数值类型,非指针类型,用这个存储指针是无法保护对象的(扫描的时候 uintptr 指向的对象不会被扫描,意味着会被 GC)。
这样就会带来一个新问题,在使用 map 的过程中,溢出桶链表会被 GC 回收掉,这是不能接受的,此时 hmap.extra 的作用就彰显出来了。
第四句:overflow 和 oldoverflow 字段仅仅只用在 k-v 不包含指针的情况下。overflow 字段包含了 hmap.buckets 的溢出桶。oldoverflow 字段包含了 hmap.oldbuckets 的溢出桶。
讲解:从第三句+第四句注释,我们可以知道溢出桶的保活任务交给了 hmap.extra.overflow 和 hmap.extra.oldoverflow 两个 slice 变量;overflow 包含了 hmap.buckets 的所有溢出桶,oldoverflow 包含了 hmap.oldbuckets 的所有溢出桶,这样扩容的时候,也可以进行保活。
最后一句:间接允许在 hiter 中存储指向切片的指针。
讲解:hiter 是 map 迭代器的数据结构,在遍历 map 的过程中,会使用 hiter 迭代器;此时迭代器保存了 map 中的快照数据,如果 hiter 指向了oldbuckets ,当 hmap 把 oldbuckets 迁移完毕时,会把字段 h.oldbuckets = nil,有迭代器存在时,虽然不清理 oldbuckets 的内存,但溢出桶却失去了保活的变量;而在 hiter 遍历 oldbuckets 过程中也是不允许溢出桶被 GC 回收的,所以迭代器需要对溢出桶进行保活,也就是官方的这句注释的解释了,保活的字段为 hiter.overflow 和 hiter.oldoverflow。
在 Go 中,这种避免 GC 回收的保活手段在源码里是经常被使用的,对理解源码还是十分必要的。
1.2 bmap 数据结构
buckets 数组中存储是一个指针,最终它指向的是一个结构体:bmap。
// A bucket for a Go map.
type bmap struct {
// tophash generally contains the top byte of the hash value
// for each key in this bucket. If tophash[0] < minTopHash,
// tophash[0] is a bucket evacuation state instead.
tophash [bucketCnt]uint8
// Followed by bucketCnt keys and then bucketCnt elems.
// NOTE: packing all the keys together and then all the elems together makes the
// code a bit more complicated than alternating key/elem/key/elem/... but it allows
// us to eliminate padding which would be needed for, e.g., map[int64]int8.
// Followed by an overflow pointer.
}
通过官方注释也可以知道,bmap 不止包含 tophash 这一个字段,其实在编译期间会给它加料,动态的创建一个新的结构,如下:
type bmap struct {
// tophash 通常包含此 bucket 中每个 key 的哈希值的最高的 8 位(1 个字节)
// 如果 tophash[0] < minTopHash,则 tophash[0] 为桶已迁移状态。
tophash [bucketCnt]uint8
keys [bucketCnt]keytype // 8个key
values [bucketCnt]valuetype // 8个value
padding uintptr // 填充字段,用于内存对齐,经常会用到,刚好一个字节大小
overflow uintptr // 指向溢出桶链表 overflow 类型由编译器根据情况而定
}
// tophash 的标志位
const (
emptyRest = 0 // 这个 cell 是空的,并且在更高的索引或溢出处没有更多的非空 cell。
emptyOne = 1 // 这个 cell 是空的
evacuatedX = 2 // key/elem 有效。 entry 已被迁移到较大的哈希表的前半部分(扩容了)。
evacuatedY = 3 // 同上,但迁移到大的哈希表的后半部分(扩容了)。
evacuatedEmpty = 4 // cell 是空的,bucket 已经被迁移了
minTopHash = 5 // 正常填充 cell 的最小 tophash。
)
// bucketCnt = 8,一个 bucket 可以存储 8 个 k-v
const(
// Maximum number of key/elem pairs a bucket can hold.
bucketCntBits = 3
bucketCnt = 1 << bucketCntBits
)
bmap 是存放 k-v 的地方,也被成为 bucket(桶),我们具体看下 bucket 的内部组成。
上图就是 bucket 的内存模型,从上到下分别为:tophashs 区域、keys 区域、values 区域 和 overflow,我们先简单分析一下它们存储的内容:
- tophash 区域
因为一个 bucket 只能存储 8 个元素,所以 tophashs 区域是一个长度为 8 的数组。tophash 通常包含此 bucket 中每个 key 的哈希值的最高的 8 位(刚好是 1 个字节),如果 tophash[0] < minTopHash,则 tophash[0] 含义转变为桶已迁移状态。因为 tophash 可能有不同的含义冲突,所以需要在 key 的哈希值的最高的 8 位 < minTopHash 时,为其 tophash + minTopHash 避免冲突(后续有详细代码分析)。
- key、value 区域
因为 key 和 value 总是成对出现,这里就一起分析了,通过命名想必大家也能看出来,key - value 就是 map 中存储的键值对。仔细观察图我们可以注意到 key 和 value 是各自放在一起的,并不是以 key/value/key/value/… 这样的形式存储的。源码里说明这样的好处是在某些情况下可以省略掉 padding 字段,节省内存空间。
map[int64]int8
- overflow
每个 bucket 中存储的是 Hash 值低 hmap.B 位数值相同的元素(和取余类似,这里用的是二进制相与操作),也就是说发生哈希碰撞的元素会被分配到一个 bucket 进行存储,当某个 bucket 的 8 个空槽 slot/cell 都填满了,且 map 尚未达到扩容条件的情况下,运行时会建立 overflow bucket(溢出桶),并将这个 overflow bucket 挂在上面 bucket 末尾的 overflow 指针上,这样两个 bucket 形成了一个链表结构,用拉链的方式,解决哈希碰撞的问题。
2.map 的函数调用和类型字段
2.1 map 操作的函数调用
运行时实现了 map 类型操作的所有功能,包括创建、查找、插入、删除、遍历等。在编译阶段,Go 编译器会将 Go 语法层面的 map 操作,重写成运行时对应的函数调用。大致的对应关系是这样的:
// 创建map类型变量实例
m := make(map[keyType]valType, hint) → m := runtime.makemap(maptype, hint, h)
// 获取某键的值
v := m["key"] → v := runtime.mapaccess1(maptype, m, "key")
v, ok := m["key"] → v, ok := runtime.mapaccess2(maptype, m, "key")
// 删除某键
delete(m, "key") → runtime.mapdelete(maptype, m, “key”)
// 插入新键值对或给键重新赋值,v是用于后续存储value的空间的地址
m["key"] = "value" → v := runtime.mapassign(maptype, m, "key")
// 遍历 map
for k,v := range m{}
// 初始化 map 迭代器,后续操作以迭代器 hiter 为准
mapiterinit(t *maptype, h *hmap, it *hiter)
// 每次迭代会调用 mapiternext(it *hiter) 函数,返回下一个 key 和 value
mapiternext(it *hiter)
// for range map 编译器源码注释
// The loop we generate:
// var hiter map_iteration_struct
// for mapiterinit(type, range, &hiter); hiter.key != nil; mapiternext(&hiter) {
// index_temp = *hiter.key
// value_temp = *hiter.val
// index = index_temp
// value = value_temp
// original body
// }
2.2 map 类型 maptype
通过观察我们可以发现,这些运行时函数都有一个共同的特点,那就是第一个参数都是 maptype 指针类型的参数。**当我们声明一个 map 类型变量,比如 var m map[string]int 时,Go 编译期间就会为这个变量生成对应的特定 map 类型,生成一个 runtime.maptype 实例。**maptype 实例的结构如下:
type maptype struct {
typ _type
key *_type // key 类型
elem *_type // elem(value) 类型
bucket *_type // 表示哈希桶的内部类型
// function for hashing keys (ptr to key, seed) -> hash
// 哈希函数,用于计算 key 的哈希值
hasher func(unsafe.Pointer, uintptr) uintptr
keysize uint8 // key 大小
elemsize uint8 // value(elem) 大小
bucketsize uint16 // bucket 大小
flags uint32 // map k-v 标志位,比如 key 是否存储的是指针
}
type _type struct {
size uintptr
ptrdata uintptr // 保存所有指针的内存前缀的大小 size of memory prefix holding all pointers
hash uint32
tflag tflag
align uint8
fieldAlign uint8
kind uint8
// function for comparing objects of this type
// (ptr to object A, ptr to object B) -> ==?
equal func(unsafe.Pointer, unsafe.Pointer) bool
// gcdata stores the GC type data for the garbage collector.
// If the KindGCProg bit is set in kind, gcdata is a GC program.
// Otherwise it is a ptrmask bitmap. See mbitmap.go for details.
gcdata *byte
str nameOff
ptrToThis typeOff
}
这个实例包含了我们需要的 map 类型中的所有"元信息"。Go 运行时就是利用 maptype 参数中的信息确定 key、value 的类型和大小的。maptype 的存在让 Go 中所有 map 类型都共享一套运行时 map 操作函数,而不是像 C++ 那样为每种 map 类型创建一套 map 操作函数,这样就节省了对最终二进制文件空间的占用。
这里简单解释一下 flags 的作用。由于 map 对 key、value 的数据长度有长度限制,如果 key 或 value 的数据长度大于一定数值,那么运行时不会在 bucket 中直接存储数据,而是会存储 key 或 value 数据的指针。目前 Go 运行时定义的最大 key 和 value 的长度是这样的:
const (
maxKeySize = 128
maxElemSize = 128
)
因此,需要字段标记 key,value 存储的是不是指针,就是 flags 字段,flags 字段还标记了其他值,后续代码里用到会具体进行讲解。
func (mt *maptype) indirectkey() bool { // store ptr to key instead of key itself
return mt.flags&1 != 0
}
func (mt *maptype) indirectelem() bool { // store ptr to elem instead of elem itself
return mt.flags&2 != 0
}
3.map 创建
创建 map 对应的函数调用为 makemap,下边对其源码进行分析:
// 创建map类型变量实例
m := make(map[keyType]valType, hint) → m := runtime.makemap(maptype, hint, h)
3.1 makemap 函数创建 map
// makemap implements Go map creation for make(map[k]v, hint).
// If the compiler has determined that the map or the first bucket
// can be created on the stack, h and/or bucket may be non-nil.
// If h != nil, the map can be created directly in h.
// If h.buckets != nil, bucket pointed to can be used as the first bucket.
// makemap 是 make(map[k]v, hint) 的实现,创建一个 map。
func makemap(t *maptype, hint int, h *hmap) *hmap {
// 计算 bucket 所需内存大小,是否溢出,如果溢出或超过最大内存,分配 hint = 0
mem, overflow := math.MulUintptr(uintptr(hint), t.bucket.size)
if overflow || mem > maxAlloc {
hint = 0
}
// initialize Hmap
// 分配 hamp 结构体所占内存
if h == nil {
h = new(hmap)
}
// 生成随机哈希种子
h.hash0 = fastrand()
// Find the size parameter B which will hold the requested # of elements.
// For hint < 0 overLoadFactor returns false since hint < bucketCnt.
// 根据传入的 hint 和 负载因子,计算出需要的最小桶数量。
B := uint8(0)
for overLoadFactor(hint, B) {
B++
}
h.B = B
// allocate initial hash table 分配最初的哈希表内存
// if B == 0, the buckets field is allocated lazily later (in mapassign) B == 0 时,延迟分配
// If hint is large zeroing this memory could take a while. hint 比较大时,耗时比较长
if h.B != 0 {
var nextOverflow *bmap
// 初始化 buckets(分配 buckets 所需要的内存),可能会提前分配一些空闲的溢出桶,以备后续使用,提升速度
h.buckets, nextOverflow = makeBucketArray(t, h.B, nil)
// 如果预分配了空闲的溢出桶数组,则初始化 mapextra 字段,用 h.extra.nextOverflow 指向第一个溢出桶
if nextOverflow != nil {
h.extra = new(mapextra)
h.extra.nextOverflow = nextOverflow
}
}
return h
}
// overLoadFactor 放置在 2^B 个桶中的键值对数量是否超过负载因子。
func overLoadFactor(count int, B uint8) bool {
// 第一步:判断一个桶能否装下,可以装下 B 就不用算了
// 第二步:count 是否大于 桶数量(2^B) * 负载因子(6.5),如果比这都大,说明现在的 B 负载不了,还需要 ++
// bucketShift(B) = 2^B
// loadFactorNum / loadFactorDen = 6.5
return count > bucketCnt && uintptr(count) > loadFactorNum*(bucketShift(B)/loadFactorDen)
}
为什么说 map 是引用类型,看 makemap 函数返回值就明白了,返回了 *hmap 一个指针;这一点需要和 slice 区分开,创建切片源码返回的是一个结构体,只不过是结构体里有底层数组指针,有兴趣可以看这一篇文章:切片原理分析。
3.2 makeBucketArray 初始化 map buckets 底层数组
makemap 函数中当 h.B != 0 时,makeBucketArray 函数会初始化 map buckets 的底层数组,还可能会预分配空闲溢出桶的内存。
// makeBucketArray initializes a backing array for map buckets.
// 1<<b is the minimum number of buckets to allocate.
// dirtyalloc should either be nil or a bucket array previously
// allocated by makeBucketArray with the same t and b parameters.
// If dirtyalloc is nil a new backing array will be alloced and
// otherwise dirtyalloc will be cleared and reused as backing array.
// makeBucketArray 初始化 map buckets 的底层数组
// 最小存储桶数为 1<<b,也就是 2^b
// dirtyalloc 应该是 nil 或之前由 makeBucketArray 使用相同的 t 和 b 参数分配的存储桶数组。
// 如果 dirtyalloc 为 nil,将分配一段新的内存;否则将清除 dirtyalloc 指向的内存,将其作为新分配的内存。
func makeBucketArray(t *maptype, b uint8, dirtyalloc unsafe.Pointer) (buckets unsafe.Pointer, nextOverflow *bmap) {
// nbuckets = base = 2^b
base := bucketShift(b)
nbuckets := base
// For small b, overflow buckets are unlikely.
// Avoid the overhead of the calculation.
// 对于 b 很小的情况,溢出桶的可能性不大(由于数据较少,哈希冲突少),避免计算开销(减少溢出桶的预分配)
// 当 b >= 4 认为溢出桶的使用概率变大,可以预先分配一下溢出桶的内存,提升后续使用 map 的性能
if b >= 4 {
// Add on the estimated number of overflow buckets
// required to insert the median number of elements
// used with this value of b.
// 总存储桶数量 = 普通存储桶 + 溢出桶数量 (2^(b-4))
nbuckets += bucketShift(b - 4)
sz := t.bucket.size * nbuckets // 存储桶所需内存
up := roundupsize(sz) // 内存对齐后所占内存
if up != sz {
// 根据新内存重新计算存储桶数量
nbuckets = up / t.bucket.size
}
}
if dirtyalloc == nil {
// 创建 t.bucket 类型新数组
buckets = newarray(t.bucket, int(nbuckets))
} else {
// dirtyalloc was previously generated by
// the above newarray(t.bucket, int(nbuckets))
// but may not be empty.
// 清空原有数组内存,作为该 map 的数组使用
buckets = dirtyalloc
size := t.bucket.size * nbuckets
if t.bucket.ptrdata != 0 {
// bucket 中有指针,用有指针的清理函数
memclrHasPointers(buckets, size)
} else {
// bucket 中没有指针,用没有指针的清理函数
memclrNoHeapPointers(buckets, size)
}
}
// 有预分配的溢出桶情况
if base != nbuckets {
// We preallocated some overflow buckets.
// To keep the overhead of tracking these overflow buckets to a minimum,
// we use the convention that if a preallocated overflow bucket's overflow
// pointer is nil, then there are more available by bumping the pointer.
// We need a safe non-nil pointer for the last overflow bucket; just use buckets.
// nextOverflow 指向溢出桶的开始地址(溢出桶地址在普通存储桶后边)
nextOverflow = (*bmap)(add(buckets, base*uintptr(t.bucketsize)))
// 获取最后的一个溢出桶
last := (*bmap)(add(buckets, (nbuckets-1)*uintptr(t.bucketsize)))
// 最后一个溢出桶的 overflow 指针链接到第一个普通桶
last.setoverflow(t, (*bmap)(buckets))
}
return buckets, nextOverflow
}
通过阅读 makeBucketArray 函数源码,我们可以发现预分配的溢出桶数组的尾部的溢出桶有一点点特殊,它的 overflow 字段并不是 nil(其余溢出桶都是 nil),这样一来,我们通过判断溢出桶的 overflow 是否为 nil 就可以知道是否是最后一个空闲溢出桶。如果是最后一个空闲溢出桶,那么将 map 里面的 extra.nextOverflow 字段设置为 nil,表示预分配的空闲溢出桶用完了,后面如果再需要溢出桶的时候,就只能直接 new 一个了。
4.map 读取
读取 map 一般有两种形式,分别对应两个调用函数:
// 获取某键的值
v := m["key"] → v := runtime.mapaccess1(maptype, m, "key")
v, ok := m["key"] → v, ok := runtime.mapaccess2(maptype, m, "key")
为了更好地理解源码的内容,我们先来学习一下在 map 中如何定位一个 key。
4.1 定位 key 的方式
当向 map 插入一条数据,或者是从 map 按 key 查询数据的时候,运行时都会使用哈希函数对 key 做哈希运算,并获得一个哈希值(hashcode,在 64 位机器上为 64 个 bit 位)。这个 hashcode 是定位 key 的关键,运行时会从 hashcode 中取得高 8 位和 低 B(hmap.B) 位 ,其中低位区的值用于选定 bucket,高位区的值用于在某个 bucket 中确定 key 的位置(也就是 tophash 的值)。我把这一过程整理成了下面这张示意图,你理解起来可以更直观:
每个 bucket 的 tophash 区域其实是用来快速定位 key 位置的,这样就避免了逐个 key 进行比较这种代价较大的操作。尤其是当 key 是 size 较大的字符串类型时,好处就更突出了。这是一种以空间换时间的思路,避免的是 key 本身的内容过大导致的慢,而不是把遍历 tophash 这个过程砍掉。换句话说:比较 8 个 bit 的 tophash 肯定是比比较字符串更快的。
当然,后续我们分析源码也会看到,比较 key 是否相等这一步骤是绕不过去的,因为哈希冲突的存在,低 B 位使得两个 key 落在了一个桶里,还会有更小的概率,这俩 key 哈希值的 tophash 也相等,此时只能通过 key 是否相等来进行区分(map 中 key 是唯一的);同理,找到了哈希值低 B 位和高8位相等的槽,并不能确定的说这就是我们要找的槽,还是需要比较 key 是否相等,做最终的判断。因此,tophash 只是对比较 key 的次数进行了优化,绕不过比较 key 是否相等这一层。
4.2 mapaccess1 源码解析
mapaccess1 和 mapaccess2 函数实现基本一致,只是返回值不同,这里我们以 mapaccess1 为例子进行分析,查找的关键有两点:
- key 的定位问题,上一小节已经详细讲述。
- 在扩容过程中查找,需要注意 bucket 是否已经迁移,未进行迁移的桶,需要在旧桶中查找。
// mapaccess1 返回一个指向 h[key] 的指针。 永远不会返回 nil,
// 如果键不在映射中,它将返回对 elem 类型的零对象的引用。
// NOTE: The returned pointer may keep the whole map live, so don't
// hold onto it for very long.(因为返回的是指针,会导致整个 map 不能被GC)
func mapaccess1(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
// ...
// 如果 h 什么都没有,返回零值
if h == nil || h.count == 0 {
if t.hashMightPanic() {
t.hasher(key, 0) // see issue 23734
}
return unsafe.Pointer(&zeroVal[0])
}
// 并发读和写冲突(hashWriting 写 map 时标志位为 1)
// 所以 map 并发读是可以的,并发读写会 panic
if h.flags&hashWriting != 0 {
fatal("concurrent map read and map write")
}
// 不同类型 key 使用的 hash 算法在编译期确定,t *maptype 参数确定哈希函数
// 计算哈希值,并且加入 hash0 引入随机性
hash := t.hasher(key, uintptr(h.hash0))
// 比如 B=5,那 m 就是31,二进制是全 1
// 定位 bucket num 时,将 hash 与 m 相与,
// 最终由 hash 的低 8 位决定 key 在哪个 bucket 中
// 和子网掩码作用类似
m := bucketMask(h.B)
// b 就是具体 bucket 的首地址(h.buckets 首地址 + 偏移量)
b := (*bmap)(add(h.buckets, (hash&m)*uintptr(t.bucketsize)))
// 如果 map 正经历扩容,需要重新定位,需要从旧桶中读取
if c := h.oldbuckets; c != nil {
// 不是等量扩容,需要重新计算 m
if !h.sameSizeGrow() {
// 不是等量扩容,则将 m 除以 2,因为是 2 倍扩容,
// 所以 buckets 的大小为 oldbuckets 长度的 2 倍,
// 除以 2 才是旧的桶数量
m >>= 1
}
// key 在 oldbuckets 中所在 bucket 的首地址
oldb := (*bmap)(add(c, (hash&m)*uintptr(t.bucketsize)))
// 如果该 bucket 尚未迁移到新的 buckets 中
if !evacuated(oldb) {
// b 尚未迁移到新的 buckets 中,还在 oldbuckets 中
// 则需要从旧桶中查找
b = oldb
}
}
// 计算高 8 位,用于定位 key 具体位置(下边有具体实现)
top := tophash(hash)
bucketloop:
// 遍历溢出桶所有的 bucket(这相当于是一个 bucket 链表)。
// 第一次遍历的是普通桶
for ; b != nil; b = b.overflow(t) {
// 循环遍历 bucketCnt = 8 个元素(一个 bucket 存储 8 个 key)
for i := uintptr(0); i < bucketCnt; i++ {
// tophash 不匹配不是这个槽,continue,继续遍历下一个 cell
if b.tophash[i] != top {
// emptyRest 为 tophash 标志位:
// 这个 cell 是空的,并且在更高的索引或溢出处没有更多的非空 cell(bucket 后边没 key了)。
// 如果等于这个标志位,后续就不需要再进行遍历了
if b.tophash[i] == emptyRest {
break bucketloop
}
continue
}
// tophash 找到了,但有极小可能冲突,需要继续判断 key 是否相等
// k 定位到 key 的首地址
k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
// 如果 key 是指针
if t.indirectkey() {
// 解引用,找到真正的 key 值
k = *((*unsafe.Pointer)(k))
}
// 比较 key 是否相等
if t.key.equal(key, k) {
// e:value 首地址
e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
// value 为指针
if t.indirectelem() {
// 解引用
e = *((*unsafe.Pointer)(e))
}
return e
}
// tophash 相等,但 key 不等,继续遍历下一次 cell
}
}
// 如果键不在映射中,它将返回对 elem 类型的零对象的引用。
return unsafe.Pointer(&zeroVal[0])
}
// 判断该 bucket 是否迁移完毕
func evacuated(b *bmap) bool {
h := b.tophash[0]
return h > emptyOne && h < minTopHash
}
// tophash 计算 hash 的 tophash 值。
// 这是一个字节的大小的。(hash 最高的 8 位)
func tophash(hash uintptr) uint8 {
// top 本质上就是 hash 的前面 8 个字节
// goarch.PtrSize*8 - 8,左移位数:指针的字节大小 - 8 字节
top := uint8(hash >> (goarch.PtrSize*8 - 8))
// 为了跟正常的 tophash 区分开来,如果计算出来的 tophash 小于 minTopHash,
// 会将计算出来的 tophash 加上 minTopHash:
// 这样一来,通过 tophash 这一个字节就可以记录桶里面槽的状态了,非常节省空间。
if top < minTopHash {
top += minTopHash
}
return top
}
// 计算 overflow 指向的 bmap
func (b *bmap) overflow(t *maptype) *bmap {
return *(**bmap)(add(unsafe.Pointer(b), uintptr(t.bucketsize)-goarch.PtrSize))
}
uintptr(t.bucketsize)-goarch.PtrSizeadd(unsafe.Pointer(b), uintptr(t.bucketsize)-goarch.PtrSize)
5.map 删除
map 删除对应的函数为:mapdelete。如果删除发生在扩容过程中,需要先把定位到的 bucket 迁移完毕后才进行删除,顺便还要为渐进式迁移做出额外贡献:顺序迁移一个 bucket。
func mapdelete(t *maptype, h *hmap, key unsafe.Pointer) {
if raceenabled && h != nil {
callerpc := getcallerpc()
pc := abi.FuncPCABIInternal(mapdelete)
racewritepc(unsafe.Pointer(h), callerpc, pc)
raceReadObjectPC(t.key, key, callerpc, pc)
}
if msanenabled && h != nil {
msanread(key, t.key.size)
}
if asanenabled && h != nil {
asanread(key, t.key.size)
}
// map 为 空,直接返回
if h == nil || h.count == 0 {
if t.hashMightPanic() {
t.hasher(key, 0) // see issue 23734
}
return
}
// 并发写检查
if h.flags&hashWriting != 0 {
fatal("concurrent map writes")
}
// 计算 key 的 hash 值
hash := t.hasher(key, uintptr(h.hash0))
// Set hashWriting after calling t.hasher, since t.hasher may panic,
// in which case we have not actually done a write (delete).
// 设置写 map 标志
h.flags ^= hashWriting
// 定位 key 所在 bucket
bucket := hash & bucketMask(h.B)
// 正在扩容
if h.growing() {
// 迁移该 bucket 到新地址
growWork(t, h, bucket)
}
// b 指向 key 所在 bucket
b := (*bmap)(add(h.buckets, bucket*uintptr(t.bucketsize)))
// 记录原始的 b
bOrig := b
// 计算 tophash
top := tophash(hash)
search:
// 循环整个 bucket 链表
for ; b != nil; b = b.overflow(t) {
// 循环 8 个槽
for i := uintptr(0); i < bucketCnt; i++ {
// tophash 不匹配
if b.tophash[i] != top {
// emptyRest:bucket 后边数据都为空,不用继续循环了
if b.tophash[i] == emptyRest {
break search
}
// 直到找到匹配的 tophash
continue
}
// 获取 key(处理 key 为指针的情况)
k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
k2 := k
if t.indirectkey() {
k2 = *((*unsafe.Pointer)(k2))
}
// key 不相等,继续循环 bucket
if !t.key.equal(key, k2) {
continue
}
// 找到相等的 key
// Only clear key if there are pointers in it.
// 仅当其中有指针时才清除键
if t.indirectkey() {
// 当 key 存储为指针
// 指针指向 nil,并不主动清理内存(由 GC 处理)
*(*unsafe.Pointer)(k) = nil
} else if t.key.ptrdata != 0 {
// key 本身为指针
// 清除指针内存
memclrHasPointers(k, t.key.size)
}
// 获取 elem(value);同理 key 的清空过程
e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
if t.indirectelem() {
*(*unsafe.Pointer)(e) = nil
} else if t.elem.ptrdata != 0 {
memclrHasPointers(e, t.elem.size)
} else {
// 清除不包含指针的 elem 的内存
memclrNoHeapPointers(e, t.elem.size)
}
// tophash 置为标志位 emptyOne,此槽为空
b.tophash[i] = emptyOne
// If the bucket now ends in a bunch of emptyOne states,
// change those to emptyRest states.
// 如果这个 bucket 后边以一堆 emptyOne 为结尾,则可以设置为 emptyRest
// It would be nice to make this a separate function, but
// for loops are not currently inlineable.
if i == bucketCnt-1 {
// key 为 bucket 中最后一个槽
// 溢出桶不为空 && 溢出桶的第一个标志位不为 emptyRest
if b.overflow(t) != nil && b.overflow(t).tophash[0] != emptyRest {
// 不是最后一个元素,后边还有数据,直接跳过 for 循环
goto notLast
}
} else {
// 当 key 不为 bucket 最后一个槽时,直接检查后一个槽是否为 emptyRest
// 即可判断后边是否还有数据
if b.tophash[i+1] != emptyRest {
goto notLast
}
}
// 执行到这里的时候,表明刚刚删除的 key 是 bucket 或其溢出桶中的最后一个有数据的元素,后续都没数据了
// 倒着往前数,把 emptyOne 标志位都置为 emptyRest,直到有数据或链表到头
// emptyRest 标志可以优化遍历 map 的性能
for {
// 将当前 b.tophash[i] 设置为 emptyRest,标志后边没有元素了
// 循环往前设置 emptyRest,直到槽内有数据或链表到头
b.tophash[i] = emptyRest
// 从 i 开始往前遍历,直到 i == 0
// 当遍历到 i == 0 时,分两种情况
// 1. 链表到头了
// 2. 需要找到链表的前一个,继续循环遍历,然后赋值 emptyRest
if i == 0 {
// 如果 b 是链表最原始的头(也就是普通桶)
if b == bOrig {
// 找到初始桶,就意味着链表到头了,可以结束循环了
break // beginning of initial bucket, we're done.
}
// Find previous bucket, continue at its last entry.
// 找到当前 bucket 的前一个 bmap
c := b
for b = bOrig; b.overflow(t) != c; b = b.overflow(t) {
}
// 继续遍历前一个桶的槽
i = bucketCnt - 1
} else {
i--
}
// 如果当前 i 标志位不为 emptyOne,说明槽内有数据,则终止循环
if b.tophash[i] != emptyOne {
break
}
}
notLast:
// 由于删除了元素,因此 count--
h.count--
// Reset the hash seed to make it more difficult for attackers to
// repeatedly trigger hash collisions. See issue 25237.
// 如果 map 内元素清零
if h.count == 0 {
// 重置哈希种子,使攻击者更难重复触发哈希冲突。 请参阅问题 25237。
h.hash0 = fastrand()
}
// 已经删除了对应的 value,退出循环
break search
}
}
// 写冲突判断
if h.flags&hashWriting == 0 {
fatal("concurrent map writes")
}
// 消除写标志
h.flags &^= hashWriting
}
map 删除逻辑有几个非常优秀的设计值得一提:
typedmemmove(t.key, insertk, key)
6.map 修改和写入
map 修改和写入共用一个函数:mapassign。同样,修改和写入如果发生在扩容期间,也会触发 bucket 迁移,最多两个 bucket 进行迁移,也是等 bucket 迁移完毕,对新 bucket 进行修改和写入;另外写入 key 还有可能会额外触发扩容操作,一旦发生扩容,key 需要重新进行计算和写入。
6.1 mapassign 修改和写入 map
// 功能:插入 key 或者修改 map 中的 key
// Like mapaccess, but allocates a slot for the key if it is not present in the map.
func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
// map 为 nil,触发 panic
if h == nil {
panic(plainError("assignment to entry in nil map"))
}
if raceenabled {
callerpc := getcallerpc()
pc := abi.FuncPCABIInternal(mapassign)
racewritepc(unsafe.Pointer(h), callerpc, pc)
raceReadObjectPC(t.key, key, callerpc, pc)
}
if msanenabled {
msanread(key, t.key.size)
}
if asanenabled {
asanread(key, t.key.size)
}
// 并发写,panic
if h.flags&hashWriting != 0 {
fatal("concurrent map writes")
}
// 计算 key 的哈希值
hash := t.hasher(key, uintptr(h.hash0))
// Set hashWriting after calling t.hasher, since t.hasher may panic,
// in which case we have not actually done a write.
// 标记正在写 map(map 并发读写、并发写不安全)
h.flags ^= hashWriting
// 如果 buckets 为 nil,则新建一个
if h.buckets == nil {
h.buckets = newobject(t.bucket) // newarray(t.bucket, 1)
}
again:
// 获取 bucket 下标(定位 key 在哪个 bucket)
bucket := hash & bucketMask(h.B)
// 正在扩容
if h.growing() {
// 迁移该 bucket
growWork(t, h, bucket)
}
// b 为 key 所属的 bucket
b := (*bmap)(add(h.buckets, bucket*uintptr(t.bucketsize)))
// 计算 tophash 高 8 位
top := tophash(hash)
var inserti *uint8 // 记录要插入的 tophash 地址
var insertk unsafe.Pointer // 记录要插入的 key 地址
var elem unsafe.Pointer // 记录要插入的 value(elem) 地址
bucketloop:
for {
// 循环 bucket 中的 8 个槽
for i := uintptr(0); i < bucketCnt; i++ {
if b.tophash[i] != top {
// tophash 不匹配
// 槽为空 && 还没记录要插入的位置
if isEmpty(b.tophash[i]) && inserti == nil {
// 记录要插入的位置 tophash、key、value
inserti = &b.tophash[i]
insertk = add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
elem = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
}
// 如果 tophash 为标志位 emptyRest,意味着 bucket 没有找到对应的 key,同时 bucket 中剩余的槽都是空的。
// 后边就没必要循环了
if b.tophash[i] == emptyRest {
// 终止对 bucket 的循环
break bucketloop
}
// 没找到对应的 tophash,就继续找
continue
}
// 走到这里,意味着找到对应的 tophash 了
// 获取 key
k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
// key 是否包含指针
if t.indirectkey() {
// 解引用
k = *((*unsafe.Pointer)(k))
}
// key 是否相等(寻找 key 的最终判断条件)
if !t.key.equal(key, k) {
continue
}
// already have a mapping for key. Update it.
// 已存在该 key,则更新值
// 看看 key 是否需要被覆盖
if t.needkeyupdate() {
typedmemmove(t.key, k, key)
}
// 获取 value 的地址,最终返回 elem 的地址,在函数外按地址对值进行更新,这样就不用把新值传进来了
elem = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
// 找到 key 和 value,后续就可以结束了
goto done
}
// 获取下一个溢出桶地址
ovf := b.overflow(t)
// 溢出桶到头了
if ovf == nil {
// 跳出 bucket 循环
break
}
// 去下一个溢出桶继续找 key
b = ovf
}
// Did not find mapping for key. Allocate new cell & add entry.
// 没找到 key,说明要插入新 key,需要给 key 分配新槽
// If we hit the max load factor or we have too many overflow buckets,
// and we're not already in the middle of growing, start growing.
// 非扩容过程中 && (超过负载因子 || 有太多溢出桶) -> 触发扩容
if !h.growing() && (overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) {
// 哈希表扩容
hashGrow(t, h)
// 扩容后上述所做工作全部无效,需要重新再来一遍
goto again // Growing the table invalidates everything, so try again
}
// 没找到可插入的地方
if inserti == nil {
// The current bucket and all the overflow buckets connected to it are full, allocate a new one.
// 新建一个溢出桶
newb := h.newoverflow(t, b)
// 记录要插入的位置
inserti = &newb.tophash[0]
insertk = add(unsafe.Pointer(newb), dataOffset)
elem = add(insertk, bucketCnt*uintptr(t.keysize))
}
// store new key/elem at insert position
// key 是否需要存储指针(key 太大时,需要存储指针类型)
if t.indirectkey() {
kmem := newobject(t.key) // 给新 key 分配内存(只是分配了地址,还没有写入 key)
*(*unsafe.Pointer)(insertk) = kmem // 槽中的 key 值区域保存了新 key 应在的内存指针
insertk = kmem // insertk 指向新分配的地址,不再指向槽中的 key 值区
}
// value 同理 key 操作
if t.indirectelem() {
vmem := newobject(t.elem)
*(*unsafe.Pointer)(elem) = vmem
}
// 移动 key 到 insertK 指向的地址
// 最终槽中的 key 值区域保存了新 key 应在的内存指针,该指针指向了新 key 值
// 如果不需要 indirectkey,则槽中的 key 值区域直接保存 key 值
typedmemmove(t.key, insertk, key)
*inserti = top // 保存 tophash 值
h.count++ // map 元素个数 +1
done:
// 如果写冲突,panic
if h.flags&hashWriting == 0 {
fatal("concurrent map writes")
}
// 清除写标志
h.flags &^= hashWriting
// 如果 elem 保存的是指向 elem 的指针
if t.indirectelem() {
// 获取指向 elem 实际存储地址的指针(由于返回 elem 用于修改 value,所以需要操作 value 的地址)
elem = *((*unsafe.Pointer)(elem))
}
// 返回存储值的指针
return elem
}
还记得上一小节留下了一个小小的疑问吗?当 key、elem 都不是指针时,删除 key 时只清理了 elem 的内存,却保留了 key 的内存,虽然这么写代码是合理的,但优点是啥呢?
typedmemmove(t.key, insertk, key)
func typedmemmove(typ *_type, dst, src unsafe.Pointer) {
// 当内存覆盖的源和目的一样,直接就 return 了,否则才进行内存覆盖
if dst == src {
return
}
if writeBarrier.needed && typ.ptrdata != 0 {
bulkBarrierPreWrite(uintptr(dst), uintptr(src), typ.ptrdata)
}
// There's a race here: if some other goroutine can write to
// src, it may change some pointer in src after we've
// performed the write barrier but before we perform the
// memory copy. This safe because the write performed by that
// other goroutine must also be accompanied by a write
// barrier, so at worst we've unnecessarily greyed the old
// pointer that was in src.
memmove(dst, src, typ.size)
if writeBarrier.cgo {
cgoCheckMemmove(typ, dst, src, 0, typ.size)
}
}
到这里,相信你已经明白为啥唯独不处理 key 了,原因就是被删除的 key 重新被写回 map 的概率很高,同时这个 key 重新再被插到同一个槽的概率也很高,当重新被写回来时,就不用处理内存了,这样可以极大地提升性能;而一个被删除的 key ,再次被插进来时,value 还是同一个 value 的概率可太小了(相信各位写业务代码的时候都能感受到),因此还不如直接把 value 清理了,这样代码的可读性会更好,更符合我们编码的习惯,通过阅读源码可以发现,golang 源码的工程师是把性能优化考虑到了极致,希望你阅读本篇文章可以感受到这一点。
6.2 newoverflow 新建溢出桶
// 新建一个溢出桶
func (h *hmap) newoverflow(t *maptype, b *bmap) *bmap {
var ovf *bmap
// 优先使用创建 map 时预分配的溢出桶 h.extra.nextOverflow
if h.extra != nil && h.extra.nextOverflow != nil {
// We have preallocated overflow buckets available.
// See makeBucketArray for more details.
ovf = h.extra.nextOverflow
if ovf.overflow(t) == nil {
// 预分配的溢出桶中指向下一个溢出桶的字段为 nil,意味着还没有用完预分配的溢出桶(不懂的看前面分析)
// We're not at the end of the preallocated overflow buckets. Bump the pointer.
// h.extra.nextOverflow 指向下一个可用的溢出桶
h.extra.nextOverflow = (*bmap)(add(unsafe.Pointer(ovf), uintptr(t.bucketsize)))
} else {
// This is the last preallocated overflow bucket.
// Reset the overflow pointer on this bucket,
// which was set to a non-nil sentinel value.
// 已经用到最后一个预分配的溢出桶,设置其内部指向下一个溢出桶的字段为 nil(因为要用,需要恢复初始值)
ovf.setoverflow(t, nil)
// 这里表示预分配的溢出桶用完了
h.extra.nextOverflow = nil
}
} else {
// 没有预分配的溢出桶,只能重新建一个
ovf = (*bmap)(newobject(t.bucket))
}
// h.noverflow 溢出桶数量增加,记录溢出桶数量
// 溢出桶数量小时,是精确值,否则为大概计数,源码很简单,有兴趣可以看一下
h.incrnoverflow()
// map 中 key,value 不含指针
if t.bucket.ptrdata == 0 {
// 创建 overflow 切片
h.createOverflow()
// 切片指向所有溢出桶(保活使用,防止被 GC)
*h.extra.overflow = append(*h.extra.overflow, ovf)
}
// 给当前 bucket 的 overflow 赋值指向溢出桶
b.setoverflow(t, ovf)
return ovf
}
makeBucketArrayh.extra.nextOverflow
h.extra.overflow
7.map 扩容
使用哈希表的目的就是要快速查找到目标 key,然而,随着向 map 中添加的 key 越来越多,key 发生碰撞的概率也越来越大;bucket 中的 8 个槽会被逐渐塞满,通过前面的源码分析,我们也知道,查找、插入、删除 key 的效率也会越来越低。
最理想的情况是一个 bucket 只装一个 key,这样就能达到 O(1) 的效率,但这样空间消耗太大,用空间换时间的代价太高。而 Go 对 map 的设计中,一个 bucket 能装载 8 个 key,定位到 bucket 后,还需要再定位到具体的 key,其实是一种时间换空间的操作。超过 8 个的 key 冲突,以链表的形式解决,当然这也要有度,不然一个 bucket 存储太多的 key,性能会直接退化为链表,操作效率变为 O(n)。
loadFactor := count / (2^B)2^B
7.1 扩容触发条件
扩容触发的时机:map 插入新 key 时,会检测触发条件,满足条件则会触发扩容。
在 mapassign 函数中触发条件是这样写的:
// If we hit the max load factor or we have too many overflow buckets,
// and we're not already in the middle of growing, start growing.
// 非扩容过程中 && (超过负载因子 || 有太多溢出桶) -> 触发扩容
if !h.growing() && (overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) {
// 哈希表扩容
hashGrow(t, h)
// 扩容后上述所做工作全部无效,需要重新再来一遍
goto again // Growing the table invalidates everything, so try again
}
// overLoadFactor reports whether count items placed in 1<<B buckets is over loadFactor.
func overLoadFactor(count int, B uint8) bool {
// loadFactorNum = 13;loadFactorDen = 2
// bucketShift(B) = 2^B
return count > bucketCnt && uintptr(count) > loadFactorNum*(bucketShift(B)/loadFactorDen)
}
// tooManyOverflowBuckets reports whether noverflow buckets is too many for a map with 1<<B buckets.
// Note that most of these overflow buckets must be in sparse use;
// if use was dense, then we'd have already triggered regular map growth.
// 判断是否有太多的溢出桶了
// 多的标准是:
// B <= 15 的时候,溢出桶数量大于 2^B 的时候
// B > 15 的时候,溢出桶的数量大于 2^15 的时候
func tooManyOverflowBuckets(noverflow uint16, B uint8) bool {
// If the threshold is too low, we do extraneous work.
// If the threshold is too high, maps that grow and shrink can hold on to lots of unused memory.
// "too many" means (approximately) as many overflow buckets as regular buckets.
// See incrnoverflow for more details.
if B > 15 {
B = 15
}
// The compiler doesn't see here that B < 16; mask B to generate shorter shift code.
return noverflow >= uint16(1)<<(B&15)
}
扩容触发条件总结为两点:
- 装载因子超过阈值,源码里定义的阈值是 6.5。
- 如果每个 bucket 都没有溢出,并且 8 个槽都装满的情况下,装载因子 = 8,6.5 也就意味着很多 bucket 都快装满了,查找和插入效率都会变低,这个时候扩容是很有必要的。至于装载因子为啥是 6.5,官方注释给的解释是应该实验得出的,实验数据在源码也能找到,这里就不展示了。
- 针对元素过多的这种情况,Go 给的扩容方式为 2 倍扩容,即 B + 1,bucket 由原来的 2 ^ B 变为 2 ^ B * 2 = 2^(B+1)。
- 溢出桶的数量过多。
- 当装载因子较小的时候,有时候会发现 map 的查找和插入效率也很低,原因是溢出桶太多了,但由于 map 中元素少,触发不了第一种条件,但查询效率变低;这种是由于大量的插入和删除元素导致的:比如先插入了大量的元素,创建了很多的 overflow,然后删除后,继续插入,触发不了第一种扩容机制,但 key 会因为大量 overflow 的存在变得很分散,导致查询效率变低。
- 针对这种元素不多,overflow 过多的情况,bucket 并没有装满,只是太分散了,解决办法就是开辟一个新 bucket 空间(与原空间等量为 2 ^ B),将老 bucket 中的元素移动到新 bucket,使得同一个 bucket 中的 key 排列地更紧密。这样,原来,在 overflow bucket 中的 key 可以移动到 bucket 中来。结果是节省空间,提高 bucket 利用率,map 的查找和插入效率自然就会提升。
7.2 扩容源码解析
由于 map 扩容需要将原有的 key/value 重新搬迁到新的内存地址,如果有大量的 key/value 需要搬迁,会非常影响性能。因此 Go map 的扩容采取了一种称为“渐进式”的方式,原有的 key 并不会一次性搬迁完毕,每次最多只会搬迁 2 个 bucket。
因此,map 扩容分为两个阶段:
hashGrowgrowWork
7.2.1 hashGrow 函数
func hashGrow(t *maptype, h *hmap) {
// If we've hit the load factor, get bigger.
// Otherwise, there are too many overflow buckets,
// so keep the same number of buckets and "grow" laterally.
// 扩容分为等量和2倍量
bigger := uint8(1)
if !overLoadFactor(h.count+1, h.B) {
// 未达到装载因子,等量扩容
bigger = 0
// 标志位置为等量扩容
h.flags |= sameSizeGrow
}
// h.buckets 挂载到 oldbuckets 字段
oldbuckets := h.buckets
// 分配新的 bucket 数组,和预分配新的溢出桶
newbuckets, nextOverflow := makeBucketArray(t, h.B+bigger, nil)
// flags 先把迭代器标志置为 0,用于后续记录迭代器在使用新的还是旧的 bucket
flags := h.flags &^ (iterator | oldIterator)
if h.flags&iterator != 0 {
// 当前 hmap 在迭代器中
// flags 迭代器标志位置为在使用旧 bucket,因为在扩容前发生的
flags |= oldIterator
}
// commit the grow (atomic wrt gc)
// 提交扩容操作
h.B += bigger // 等量或2倍量扩容
h.flags = flags // 迭代器使用新旧 bucket 标志位确认(迭代器初始化会把新旧标志位都置为 1)
h.oldbuckets = oldbuckets // 挂载旧 buckets
h.buckets = newbuckets // 挂载新 buckets
h.nevacuate = 0 // 初始化迁移进度
h.noverflow = 0 // 新 buckets 溢出桶数量为 0
// h.extra.overflow != nil 意味着 k-v 都不是指针,溢出桶存储在 overflow 字段保活
if h.extra != nil && h.extra.overflow != nil {
// Promote current overflow buckets to the old generation.
if h.extra.oldoverflow != nil {
throw("oldoverflow is not nil")
}
// 旧溢出桶存储到 oldoverflow
h.extra.oldoverflow = h.extra.overflow
// 新溢出桶个数为 0 ,置空即可
h.extra.overflow = nil
}
if nextOverflow != nil {
if h.extra == nil {
h.extra = new(mapextra)
}
// 指向预分配的新空白溢出桶
h.extra.nextOverflow = nextOverflow
}
// the actual copying of the hash table data is done incrementally
// by growWork() and evacuate().
// 哈希表数据的实际迁移过程是通过 growWork() 和 evacuate() 增量完成的。
}
hashGrow 函数逻辑比较简单,其中值得一提是对 h.flags 的处理,这里其实是和下一小节迭代器有关系,我这里把代码放到一起看,你会更明白:
func hashGrow(t *maptype, h *hmap) {
// flags 先把迭代器标志置为 0,用于后续记录迭代器在使用新的还是旧的 bucket
flags := h.flags &^ (iterator | oldIterator)
if h.flags&iterator != 0 {
// 当前 hmap 在迭代中
// flags 迭代器标志位置为在使用旧 bucket,因为在扩容前发生的
flags |= oldIterator
}
h.flags = flags // 迭代器使用新旧 bucket 标志位确认(迭代器初始化会把新旧标志位都置为 1)
}
// 迭代器初始化函数
func mapiterinit(t *maptype, h *hmap, it *hiter) {
// Remember we have an iterator.
// Can run concurrently with another mapiterinit().
// 记住我们有一个迭代器。
// 可以与另一个 mapiteinit() 同时运行。
// 设置 hmap 迭代器标志位为 iterator|oldIterator
// 1. 迭代前未进行扩容,迭代器初始化为 11,迭代过程中发送扩容,标志位被更改为 10
// 表示迭代器正在使用旧 hmap.oldbuckets,迁移完毕不能其清理内存,标志新 bucket 没有被迭代器使用
// 2. 迭代器初始化时正在经历扩容,标志为 11,表示 hmap.buckets 正在被迭代器使用
// 此时,迭代器也会访问 hmap.oldbuckets 数据,因此也不能进行清理
if old := h.flags; old&(iterator|oldIterator) != iterator|oldIterator {
atomic.Or8(&h.flags, iterator|oldIterator)
}
}
mapiterinithashGrow
7.2.2 growWork 函数
mapassign
func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
// 正在扩容
if h.growing() {
// 迁移该 bucket
growWork(t, h, bucket)
}
}
// growing reports whether h is growing. The growth may be to the same size or bigger.
func (h *hmap) growing() bool {
// 当 h.oldbuckets != nil 时,说明正在扩容中
return h.oldbuckets != nil
}
func growWork(t *maptype, h *hmap, bucket uintptr) {
// make sure we evacuate the oldbucket corresponding
// to the bucket we're about to use
// 把即将使用的旧存储桶迁移到新桶
evacuate(t, h, bucket&h.oldbucketmask())
// evacuate one more oldbucket to make progress on growing
// 仍然处于扩容中
if h.growing() {
// 按顺序触发迁移
evacuate(t, h, h.nevacuate)
}
}
bucket&h.oldbucketmask() 这行代码,如源码注释里说的,是为了确认搬迁的 bucket 是我们正在使用的 bucket。oldbucketmask() 函数返回扩容前的 map 的 bucketmask。bucket&h.oldbucketmask() = oldbucket 旧存储桶的下标。我们发现 growWork 函数会最多进行两次 bucket 迁移,每次迁移一个 bucket,如果定位到的 bucket 已经迁移完毕,或者刚好定位到的 bucket 与顺序迁移的 bucket 一致,则此过程只迁移一个 bucket,否则迁移两个 bucket。
- 顺序迁移,由标志位 nevacuate 决定旧 buckets 数组的下标。
- 写入导致的迁移, bucket&h.oldbucketmask() 用于计算旧 buckets 数组的下标。
接下来继续分析源码 evacuate 函数,逻辑有点复杂,这里理解的关键点在于 2 倍扩容期间的迁移操作。由于扩容是 2 倍的,也就意味着一个 bucket 会裂变为两个 bucket,那 oldbucket 里的 key 该如何分配呢?其实也很简单,buckets 数组由原来的 2^B 裂变为 2^(B+1) 个,那定位 key 时,就该计算低 B+1 位来定位新 buckets 数组的下标了。
其实就是多了一个二进制位,我们都知道一个二进制位代表了两种情况 0 或 1。假设 B = 5,原低 B 位为 00110,也就是定位到 buckets[6];如今 2 倍扩容后,该 key 可能的情况有 000110 = 6 或 100110 = 38,裂变为了两个 bucket 下标,其中一个和迁移前一致;一致的部分被称之为低位部分:x 部分;不一致的部分被称之为高位部分:y 部分;后续迁移以及迭代器部分都要使用 xy,需要这里记一下。
接下来,我们逐句分析一下源码:
// 迁移目的地
type evacDst struct {
b *bmap // 迁移目的地 bucket
i int // 槽数组的下标
k unsafe.Pointer // 当前槽 key 的地址指针
e unsafe.Pointer // 当前槽 elem 的地址指针
}
func evacuate(t *maptype, h *hmap, oldbucket uintptr) {
// 定位老的 bucket 地址
b := (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.bucketsize)))
// newbit = 2^B,该变量的意思是新 bit 位,如果是二倍扩容,旧 bucket 会发生裂变,产生新的 bit 位
newbit := h.noldbuckets()
// 如果 b 尚未进行迁移
if !evacuated(b) {
// TODO: reuse overflow buckets instead of using new ones, if there
// is no iterator using the old buckets. (If !oldIterator.)
// xy contains the x and y (low and high) evacuation destinations.
// xy 变量包含 x 和 y(低和高)迁移目的地。(2倍迁移,bucket 下标的前一位多了 0 或 1,分为 x 和 y)
var xy [2]evacDst
x := &xy[0]
// x 表示裂变的前半部分,在新桶数组的下标与旧桶数组下标一致
// 存储新桶要迁移的地址
x.b = (*bmap)(add(h.buckets, oldbucket*uintptr(t.bucketsize)))
x.k = add(unsafe.Pointer(x.b), dataOffset)
x.e = add(x.k, bucketCnt*uintptr(t.keysize))
// 非等量扩容,则为2倍扩容
if !h.sameSizeGrow() {
// Only calculate y pointers if we're growing bigger.
// Otherwise GC can see bad pointers.
// 2倍扩容存在 y 区域,也就是新 bit 位 = 1
// y 中存储新桶裂变的后半部分迁移地址
y := &xy[1]
// 2 倍裂变的 y 部分下标 = oldbucket+newbit
// 例如:oldbucket = 111 三位,newbit = 1000 四位;oldbucket+newbit = 1111 新下标
y.b = (*bmap)(add(h.buckets, (oldbucket+newbit)*uintptr(t.bucketsize)))
y.k = add(unsafe.Pointer(y.b), dataOffset)
y.e = add(y.k, bucketCnt*uintptr(t.keysize))
}
// 循环当前迁移旧 bucket 的整个链表结构
for ; b != nil; b = b.overflow(t) {
k := add(unsafe.Pointer(b), dataOffset) // 取出第一个 key
e := add(k, bucketCnt*uintptr(t.keysize)) // 取出第一个 value
// 循环 8 个槽
for i := 0; i < bucketCnt; i, k, e = i+1, add(k, uintptr(t.keysize)), add(e, uintptr(t.elemsize)) {
// 获取 tophash
top := b.tophash[i]
// 槽上数据为空
if isEmpty(top) {
// 标记已迁移,且 cell 为空
b.tophash[i] = evacuatedEmpty
// 处理下一个槽
continue
}
// top 不能为已迁移标志,否则有问题,相当于已经迁移的又迁移一遍
if top < minTopHash {
throw("bad map state")
}
// 获取 key 真正的值
k2 := k
if t.indirectkey() {
k2 = *((*unsafe.Pointer)(k2))
}
var useY uint8 // 数据迁移的裂变去向 0:表示x;1:表示 y
// 非等量扩容
if !h.sameSizeGrow() {
// Compute hash to make our evacuation decision (whether we need
// to send this key/elem to bucket x or bucket y).
// 计算 key 的哈希值,判断 key 的去向,是 x 还是 y 部分(等量扩容,直接迁移去 x 部分)
hash := t.hasher(k2, uintptr(h.hash0))
// 存在迭代器 && key != key && key !equal key
if h.flags&iterator != 0 && !t.reflexivekey() && !t.key.equal(k2, k2) {
// If key != key (NaNs), then the hash could be (and probably
// will be) entirely different from the old hash. Moreover,
// it isn't reproducible. Reproducibility is required in the
// presence of iterators, as our evacuation decision must
// match whatever decision the iterator made.
// Fortunately, we have the freedom to send these keys either
// way. Also, tophash is meaningless for these kinds of keys.
// We let the low bit of tophash drive the evacuation decision.
// We recompute a new random tophash for the next level so
// these keys will get evenly distributed across all buckets
// after multiple grows.
// 如果 key != key(不是一个数字),key 每次的哈希值都不一样,它是不可再现的。
// 在迭代器存在的情况下,可重复性是必需的,因为我们的迁移决策必须与迭代器所做的任何决策相匹配,
// 迭代器还要利用迁移后的数据进行遍历 map。
// 幸运的是,我们可以任意发送这些 key,因为这些值,靠 key 计算哈希是访问不到的,只有迭代器可以访问。
// 此外,tophash 对于这些类型的 key 没有意义。我们让 tophash 的最低位的决定如何迁移。
// 我们为下一个级别重新计算一个新的随机 tophash,这样在多次扩容后,这些 key 将均匀分布在所有桶中。
useY = top & 1
top = tophash(hash)
} else {
// key 可计算 hash,且幂等的情况下,使用 hash 的新 bit 位,决定迁移数据的去向
if hash&newbit != 0 {
// hash新 bit 位为 1,应该去往 y 部分,否则去往 x 部分
useY = 1
}
}
}
if evacuatedX+1 != evacuatedY || evacuatedX^1 != evacuatedY {
throw("bad evacuatedN")
}
// tophash 标志位设置为 evacuatedX + useY (迁移去了 x 还是 y 部分)
b.tophash[i] = evacuatedX + useY // evacuatedX + 1 == evacuatedY
// 使用新 bucket 的目的地
dst := &xy[useY] // evacuation destination
// 目标 bucket 装不下了,使用溢出桶
if dst.i == bucketCnt {
// 创建一个溢出桶,更新 dst 迁移目的地
dst.b = h.newoverflow(t, dst.b)
dst.i = 0
dst.k = add(unsafe.Pointer(dst.b), dataOffset)
dst.e = add(dst.k, bucketCnt*uintptr(t.keysize))
}
// 记录 tophash (dst.i&(bucketCnt-1) 避免边界检查)
dst.b.tophash[dst.i&(bucketCnt-1)] = top // mask dst.i as an optimization, to avoid a bounds check
// key 优化为指针存储
if t.indirectkey() {
// 修改 bucket 槽中指向 key 值的指针
*(*unsafe.Pointer)(dst.k) = k2 // copy pointer
} else {
// 修改 bucket 槽中保存 key 值的内存
typedmemmove(t.key, dst.k, k) // copy elem
}
// value 优化为指针存储
if t.indirectelem() {
// 修改 bucket 槽中指向 value 值的指针
*(*unsafe.Pointer)(dst.e) = *(*unsafe.Pointer)(e)
} else {
// 修改 bucket 槽中保存 value 值的内存
typedmemmove(t.elem, dst.e, e)
}
// 通过改变 dst 内部指针的方式,改变了 &xy[useY] 指向下一个槽,继续进行迁移
dst.i++
// These updates might push these pointers past the end of the
// key or elem arrays. That's ok, as we have the overflow pointer
// at the end of the bucket to protect against pointing past the
// end of the bucket.
// 这些更新可能会将这些指针推过 key 或 elem 数组的末尾。
// 没关系,因为我们在桶的末端有溢出指针,以防止指针超出桶的末端。
// 到达溢出桶,会重新改变 dst 内部指针的指向
dst.k = add(dst.k, uintptr(t.keysize))
dst.e = add(dst.e, uintptr(t.elemsize))
}
}
// Unlink the overflow buckets & clear key/elem to help GC.
// 该 bucket 迁移完成,看是否需要清理内存
// 取消链接溢出桶并清除 key/elem 以帮助 GC。(清除旧桶的指针内存)
// hmap 不在迭代器中(迭代器没有使用旧 buckets) && key、value 包含指针 -> 此时应该清理内存
if h.flags&oldIterator == 0 && t.bucket.ptrdata != 0 {
// 定位旧 bucket 地址
b := add(h.oldbuckets, oldbucket*uintptr(t.bucketsize))
// Preserve b.tophash because the evacuation
// state is maintained there.
// 不能清除 tophash 的内存,因为有标志位
// dataOffset 表示 tophash 的内存大小
ptr := add(b, dataOffset)
n := uintptr(t.bucketsize) - dataOffset
// memclrHasPointers 清除从 ptr 开始的 n 个字节的类型化内存。
memclrHasPointers(ptr, n)
}
}
// 迁移的桶为顺序迁移的位置,需要更新 nevacuate 字段,表示已经迁移多少桶
// 表示该下标前的桶都迁移完毕了
if oldbucket == h.nevacuate {
advanceEvacuationMark(h, t, newbit)
}
}
advanceEvacuationMark 函数用于记录顺序迁移的进度,小于 nevacuate 下标的 bucket 表示已经完成迁移。当全部迁移完成时,会把 old 相关字段置空,但旧桶内存并没有被清理,只是表示停止指向 oldbuckets 的相关内存,如果此时存在迭代器,需要迭代器自己保活 oldbuckets 相关内存,否则会被 GC 回收。
// 记录迁移进度
func advanceEvacuationMark(h *hmap, t *maptype, newbit uintptr) {
h.nevacuate++
// Experiments suggest that 1024 is overkill by at least an order of magnitude.
// Put it in there as a safeguard anyway, to ensure O(1) behavior.
// 实验表明 1024 至少大了一个数量级。
// 把它放在那里作为保障,以确保 O(1) 行为。
// 往后找一个未迁移的桶
// 桶分两种情况进行迁移
// 1. 顺序迁移,2. 当前正在插入、修改、删除的 bucket 进行迁移
// 所以 h.nevacuate + 1 可能已经被迁移,所以需要一个搜索范围,控制最坏情况,也就是 1024
// 从 h.nevacuate + 1 最多遍历 1024
stop := h.nevacuate + 1024
// 如果 stop > 桶最大数量下标 :说明桶都遍历到最后了,还没到 stop 是不合理的,因此最大值为 newbit
if stop > newbit {
stop = newbit
}
// 还未遍历到未迁移的桶
for h.nevacuate != stop && bucketEvacuated(t, h, h.nevacuate) {
// 已迁移标志 ++
h.nevacuate++
}
// 遍历到最后,也没有找到未迁移的桶,表示迁移完毕了
if h.nevacuate == newbit { // newbit == # of oldbuckets
// Growing is all done. Free old main bucket array.
// 释放 oldbuckets 字段(释放旧桶数组)
h.oldbuckets = nil
// Can discard old overflow buckets as well.
// If they are still referenced by an iterator,
// then the iterator holds a pointers to the slice.
// 也可以丢弃旧的溢出桶。
// 如果它们仍然被迭代器引用,那么迭代器将保存指向切片的指针,用于保活,防止被 GC。
//(但是 h 不再需要保存指向切片的指针)
if h.extra != nil {
h.extra.oldoverflow = nil
}
// 新 map 清除等量扩容标志
h.flags &^= sameSizeGrow
}
}
8.map 迭代器
从编译器源码 gofrontend/go/statements.cc/For_range_statement::do_lower() 中,可以看到 for…range 对 map 有以下注释:
// The loop we generate:
// var hiter map_iteration_struct
// for mapiterinit(type, range, &hiter); hiter.key != nil; mapiternext(&hiter) {
// index_temp = *hiter.key
// value_temp = *hiter.val
// index = index_temp
// value = value_temp
// original body
// }
注释源码分析:
- 初始化迭代器使用 mapiterinit(type, range, &hiter);hiter 为迭代器结构体。
- hiter.key != nil 为 for range 的终止条件,由 mapiternext 函数内部设置。
- 遍历 map 时没有指定循环次数,每一次迭代需要调用 mapiternext 函数获取新值,每调用一次 mapiternext,hiter.key 和 hiter.val 就会变化为下一个 map 元素,给外部 for range 使用。
8.1 mapiterinit 迭代器初始化
看源码之前,需要先理解一下迭代器 hiter 和 hmap 之间的关系,hiter 在初始化的时候不仅关联了 hmap,例如 hiter.h 字段,可以获取 hmap 的最新值;hiter 还记录 hmap 的快照数据(快照即为定格),迭代器是按照快照指向的 buckets 数组来进行遍历的。
// 哈希迭代结构。
type hiter struct {
key unsafe.Pointer // 必须排在第一位。 写入 nil 表示迭代结束
elem unsafe.Pointer // 必须在第二个位置(参见 cmd/compile/internal/walk/range.go)。
t *maptype // map 的类型信息,包括 key、elem 的类型等信息
h *hmap // 需要迭代的 hmap
// hiter 初始化时的 bucket 指针(迭代器初始化时,设置的 map 当前最新的 buckets = hmap.buckets)
buckets unsafe.Pointer
bptr *bmap // 当前正在遍历的 bucket
overflow *[]*bmap // 保持 hmap.buckets 的溢出桶存活
oldoverflow *[]*bmap // 保持 hmap.oldbuckets 的溢出桶存活
startBucket uintptr // bucket 迭代开始位置(随机选择的 bucket)
offset uint8 // 在迭代期间开始的桶内偏移量(应该足够大以容纳 bucketCnt-1)
// 已经从桶数组的末尾环绕到开始(比如从中间 bucket 开始遍历,
// 经过最大 bucket 时,从 0 下标 bucket 重新开始,此时该参数为 true)
wrapped bool
B uint8 // 就是当前遍历的 hmap 的那个 B
i uint8 // 当前遍历的 bucket 内 key 的索引
bucket uintptr // 当前遍历的 bucket
checkBucket uintptr // 是否需要检查 bucket 迁移裂变的位置
}
func mapiterinit(t *maptype, h *hmap, it *hiter) {
if raceenabled && h != nil {
callerpc := getcallerpc()
racereadpc(unsafe.Pointer(h), callerpc, abi.FuncPCABIInternal(mapiterinit))
}
// 记录 map 的类型信息
it.t = t
if h == nil || h.count == 0 {
return
}
if unsafe.Sizeof(hiter{})/goarch.PtrSize != 12 {
throw("hash_iter size incorrect") // see cmd/compile/internal/reflectdata/reflect.go
}
// 关联上 map
it.h = h
// grab snapshot of bucket state
// 抓取桶状态快照,记录迭代器初始化的时候桶状态 (记住 it 是 hmap 的快照)
it.B = h.B
it.buckets = h.buckets
// bucket 中存储的内容不包含指针
if t.bucket.ptrdata == 0 {
// Allocate the current slice and remember pointers to both current and old.
// This preserves all relevant overflow buckets alive even if
// the table grows and/or overflow buckets are added to the table
// while we are iterating.
// 分配当前切片并记住指向当前切片和旧切片的指针。
// 即使在迭代时表增长或溢出桶添加到表中,这也会保留所有相关的溢出桶。
// 迭代器给溢出桶保活,迭代器迭代期间,溢出桶不能被 GC
h.createOverflow()
it.overflow = h.extra.overflow
it.oldoverflow = h.extra.oldoverflow
}
// decide where to start
// 决定从哪里开始遍历(这也是 map 迭代器每次顺序都随机的原理所在)
var r uintptr
if h.B > 31-bucketCntBits {
r = uintptr(fastrand64())
} else {
r = uintptr(fastrand())
}
// 随机迭代开始的 bucket 下标
it.startBucket = r & bucketMask(h.B)
// 随机迭代开始的 cell 下标(bucket 的槽也是随机的,每个 bucket 都从这个槽开始遍历,转一圈)
it.offset = uint8(r >> h.B & (bucketCnt - 1))
// iterator state
// 当前遍历到的 bucket 下标(初始值)
it.bucket = it.startBucket
// Remember we have an iterator.
// Can run concurrently with another mapiterinit().
// 记住我们有一个迭代器。
// 可以与另一个 mapiteinit() 同时运行。
// 设置 hmap 迭代器标志位为 iterator|oldIterator
// 1. 迭代前未进行扩容,迭代器初始化为 11,迭代过程中发送扩容,标志位被更改为 10
// 表示迭代器正在使用旧 hmap.oldbuckets,迁移完毕不能其清理内存,标志新 bucket 没有被迭代器使用
// 2. 迭代器初始化时正在经历扩容,标志为 11,表示 hmap.buckets 正在被迭代器使用
// 此时,迭代器也会访问 hmap.oldbuckets 数据,因此也不能进行清理
if old := h.flags; old&(iterator|oldIterator) != iterator|oldIterator {
atomic.Or8(&h.flags, iterator|oldIterator)
}
// 开始一次遍历
mapiternext(it)
}
这里着重讲解一下遍历 map 两个随机起点的计算方式:
- bucket 数组下标的起始值 it.startBucket = r & bucketMask(h.B),利用随机数 r 的后 B 位确定起始的 bucket 下标。
- 每一个 bucket 起始槽的偏移量 it.offset = uint8(r >> h.B & (bucketCnt - 1)) ,bucketCnt - 1 = 7,二进制也就是 111,r 右移 B 位后与 7 相与,得到一个 0~7 的槽下标。
map 底层是使用哈希表实现的,插入数据位置是随机的,所以遍历过程中新插入的数据不能保证遍历到,有可能在遍历过程中插入到已遍历过的 bucket 中,则没有机会进行遍历。
8.2 mapiternext 遍历下一个元素
map 遍历的核心在于理解 2 倍扩容时,老 bucket 会分裂到 2 个新 bucket 中去。而遍历操作,可能会按照新 bucket 的序号顺序进行,碰到老 bucket 未搬迁的情况时,要在老 bucket 中找到将来要搬迁到新 bucket 来的 key。
func mapiternext(it *hiter) {
// 每一次元素遍历都需要调用一次 mapiternext 函数,
// 例如:如果 map 中有10个值,则需要调用 10 次 mapiternext
// 调用由 for ... range 完成,通过 it.key = nil,it.elem = nil 终止调用 mapiternext
// h 为正在迭代的底层 hmap 实例
h := it.h
if raceenabled {
callerpc := getcallerpc()
racereadpc(unsafe.Pointer(h), callerpc, abi.FuncPCABIInternal(mapiternext))
}
// 正在插入、修改、删除 key 但时候,不能获取下一个 key,
// 注意:这不能保证插入、修改、删除之后,进行迭代。
if h.flags&hashWriting != 0 {
fatal("concurrent map iteration and map write")
}
t := it.t // map 中存储的 key、val 类型 *maptype
bucket := it.bucket // 当前遍历到的 bucket 下标(int 类型),从随机的 startBucket 开始,每次调用 mapiternext 重新赋值
b := it.bptr // 当前正在遍历的 bucket 指针,这个值存在的时候,不用重新的再去找 bucket 了
i := it.i // 迭代器当前遍历到的位置(bucket 内 key 的位置)
checkBucket := it.checkBucket // 当前 bucket 是否需要检查迁移裂变位置 x,y
next:
// 首次遍历 b == nil,为其赋初始值 startBucket
// 遍历溢出桶 b == nil,表示该 bucket 链表遍历完毕,可以为其赋值下一个要遍历的 bucket 链表了
if b == nil {
// 遍历完成,把迭代器中 key,elem 置为 nil,退出循环
// for...range 查看到 key 是 nil 会中止迭代
if bucket == it.startBucket && it.wrapped {
// end of iteration
it.key = nil
it.elem = nil
return
}
// 首先得知道,迭代器是根据快照内容进行遍历的
// 1.如果 it 快照指向了 oldbuckets,直接遍历 oldbuckets 就行了,不用管迁不迁移
// 因为 oldbuckets 内存不会被清除,但 h.oldbuckets 字段在迁移完毕时,会等于 nil
// 2.如果 it 快照指向了新 buckets && 2倍扩容过程中,需要按2倍下标遍历,
// 才需要考虑当前 bucket 所属旧 bucket 是否被迁移,
// 如果没有被迁移,则需要去旧桶读取数据,等量扩容直接读取旧数据就可以了,2倍扩容需要按 key 裂变位置读取。
// hmap 正在扩容 && 迭代器快照 B == hmap.B
// 1. 迭代器初始化的时候,hmap 已经在扩容过程中,此时迭代器指向新 buckets
// 2. 迭代器初始化后,hmap 才发生扩容,但为等量扩容,此时迭代器指向旧 buckets
if h.growing() && it.B == h.B {
// 如果迭代器是在扩容过程中启动的,扩容尚未完成。
// 如果我们正在查看的 bucket 尚未迁移,说明数据在 oldbucket 中,
// 那么我们需要遍历旧存储桶,同时只返回将迁移到此存储桶的数据。
// 那些需要迁移到另一个下标的桶则跳过。
// 通过 &mask 的方式获取旧存储桶的下标
oldbucket := bucket & it.h.oldbucketmask()
// 获取旧存储桶中当前遍历 bucket 地址
b = (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.bucketsize)))
// 是否迁移完成
if !evacuated(b) {
// 旧 bucket 未进行迁移
// 需要检查 bucket 的裂变情况 checkBucket 为要检查的 bucket 下标
checkBucket = bucket
} else {
// bucket 已迁移(在新的也有数据,在旧的也有数据)
// 直接用迭代器指向的 buckets 遍历即可
b = (*bmap)(add(it.buckets, bucket*uintptr(t.bucketsize)))
checkBucket = noCheck
}
} else {
// 1.不在扩容中(未进行扩容 || 扩容结束),直接去迭代器指向的 it.buckets 读取数据即可
// 2.在扩容中 && it.B != h.B -> 表示 it.buckets 指向了 oldbuckets,
// 里边有数据,直接遍历就行,不用在意迁移过程
// 第2种情况下,这里如果有数据插入到新桶,会导致遍历不到,所以不建议并发执行遍历和插入操作
// 而且 key 插入具有随机性,也有可能遍历不到
b = (*bmap)(add(it.buckets, bucket*uintptr(t.bucketsize)))
checkBucket = noCheck
}
// bucket 下标加1,指向下一个 bucket
bucket++
// 判断下标是否是最后一个 bucket (bucketShift(it.B) 计算最大桶的下标)
if bucket == bucketShift(it.B) {
// 已经从第一个随机定位的 bucket 遍历到最后一个 bucket 了,下一个应该是第一个 bucket 了
// 下标从 0 开始,继续循环(达到环形数组的效果,前面的 bucket 可能还未遍历)
bucket = 0
// 标志位:已经从桶数组的末尾环绕到开始
it.wrapped = true
}
// 开始遍历新的 bucket 的时候,重置 i
i = 0
}
// 遍历当前 bucket 的 8 个 cell
for ; i < bucketCnt; i++ {
// 计算桶内 cell 下标:从随机的 it.offset 下标开始遍历
offi := (i + it.offset) & (bucketCnt - 1)
// 如果槽是空的,或者 key 已经迁移,则跳过。(利用 tophash 的标志位判断)
if isEmpty(b.tophash[offi]) || b.tophash[offi] == evacuatedEmpty {
// TODO: emptyRest is hard to use here, as we start iterating
// in the middle of a bucket. It's feasible, just tricky.
continue
}
// 定位第 i 个槽的 key
k := add(unsafe.Pointer(b), dataOffset+uintptr(offi)*uintptr(t.keysize))
if t.indirectkey() {
k = *((*unsafe.Pointer)(k))
}
// 定位第 i 个槽的 value
e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+uintptr(offi)*uintptr(t.elemsize))
// 需要检查迁移后裂变的位置 && 增量扩容 -> 需要判断迁移之后的 key 落入的是 h.buckets 的前半部分还是后半部分(x 还是 y)
// 前半部分 x 则需要在旧存储桶中获取属于该裂变部分的值
// 后半部分 y 则跳过,后边还会遍历到 y 所属的新的 bucket,那个时候再进行处理即可
if checkBucket != noCheck && !h.sameSizeGrow() {
// Special case: iterator was started during a grow to a larger size
// and the grow is not done yet. We're working on a bucket whose
// oldbucket has not been evacuated yet. Or at least, it wasn't
// evacuated when we started the bucket. So we're iterating
// through the oldbucket, skipping any keys that will go
// to the other new bucket (each oldbucket expands to two
// buckets during a grow).
// 如果 k 是可比较的
if t.reflexivekey() || t.key.equal(k, k) {
// If the item in the oldbucket is not destined for
// the current new bucket in the iteration, skip it.
hash := t.hasher(k, uintptr(h.hash0))
// k 的 hash 后最终落到地 bucket 不是 checkBucket(意味着是后半部分),则跳过
// 寻找被迁移到 checkBucket 的 key
if hash&bucketMask(it.B) != checkBucket {
continue
}
} else {
// Hash isn't repeatable if k != k (NaNs). We need a
// repeatable and randomish choice of which direction
// to send NaNs during evacuation. We'll use the low
// bit of tophash to decide which way NaNs go.
// NOTE: this case is why we need two evacuate tophash
// values, evacuatedX and evacuatedY, that differ in
// their low bit.
// 对于不可比较的 key,
// 是由 tophash 的最低位来决定迁移到前半部分还是后半部分的。
if checkBucket>>(it.B-1) != uintptr(b.tophash[offi]&1) {
continue
}
}
}
if (b.tophash[offi] != evacuatedX && b.tophash[offi] != evacuatedY) ||
!(t.reflexivekey() || t.key.equal(k, k)) {
// This is the golden data, we can return it.
// OR
// key!=key, so the entry can't be deleted or updated, so we can just return it.
// That's lucky for us because when key!=key we can't look it up successfully.
// 没有进行迁移 || key 不可比较
// key!=key,所以该条目不能被删除或更新,所以我们只能返回它。
// 这对我们来说很幸运,因为当 key!=key 时我们无法成功查找它。
it.key = k
if t.indirectelem() {
e = *((*unsafe.Pointer)(e))
}
it.elem = e
} else {
// The hash table has grown since the iterator was started.
// The golden data for this key is now somewhere else.
// Check the current hash table for the data.
// This code handles the case where the key
// has been deleted, updated, or deleted and reinserted.
// NOTE: we need to regrab the key as it has potentially been
// updated to an equal() but not identical key (e.g. +0.0 vs -0.0).
// 自迭代器启动以来,哈希表已扩容。
// key 已经被迁移到其他地方,所以需要检查当前哈希表中的数据。
// 此代码处理已删除、更新或删除并重新插入 key 的情况。
// 注意:我们需要重新标记 key,因为它可能已更新为 equal() 但不是相同的 key(例如 +0.0 vs -0.0)。
// 获取当前遍历到的 key/elem。
rk, re := mapaccessK(t, h, k)
if rk == nil {
continue // key has been deleted
}
it.key = rk
it.elem = re
}
// 每遍历一个元素,调用一次 mapiternext 函数,找到 key/elem 就返回
// 记录迭代器当前迭代内容,然后进行下一次迭代
// 记录下一个要遍历的 bucket 下标
it.bucket = bucket
// 记录当前正在遍历的 bucket
if it.bptr != b { // avoid unnecessary write barrier; see issue 14921
it.bptr = b
}
// 记录遍历到哪个槽了
it.i = i + 1
// 记录是否需要检查 key 迁移裂变位置
it.checkBucket = checkBucket
return
}
// for 循环 8 个 cell 都没有 key 需要返回,则继续遍历溢出的桶
b = b.overflow(t)
i = 0
goto next
}
通过源码分析,我们知道迭代器会给 hmap 初始化快照,当迭代器初始化后, map 发生扩容,此时迭代器快照是指向 oldbuckets 数组;如果此时插入、修改或删除值,按源码来看,会先对 bucket 进行迁移,然后插入新的 bucket,然而迭代器会根据 oldbuckets 进行遍历,则无法遍历到改变的值,因此官方是不建议遍历 map 和写操作并发执行的。
小结
通过对 map 底层源码的分析,解决了很多我对 map 特性的困惑,这里总结一下:
- map 是引用类型:因为创建 map 的函数返回了一个指针。
- map 遍历是无序的:桶起点和槽偏移点都是随机的。
- map 是非线程安全的:并发读可以,并发读写、写写会 panic,写操作发生时,会标记 hmap.flags, 如果同时有其他协程读写,会抛出错误,导致 panic。
- map 使用链表法解决哈希冲突:一个 bucket 有 8个槽 + overflow 指针指向 bmap 链表。
- map 扩容分为2倍扩容和等量扩容两种,扩容触发操作发生在插入 key 期间,触发条件为装载因子和过多的溢出桶。
- map 没有缩容机制,容量不会变小,即使删除 key,也不会减少 map 内存;由于没有缩容机制 map 内存存在一直变大的风险,所以大规模的存储尽量别用 map,要不会把你的内存干爆;后续希望官方增加缩容代码,看起来实现比较复杂。
- map 的迁移是渐进式的,迁移动作发生在写操作期间,写操作发生时,每次最多迁移两个桶,最少迁移一个桶。
- 删除不存在的 key 不会发生错误;查询不存在的 key,会返回对应类型的零值。
历经 2 周终于把 map 源码看完了,收获颇丰,这里整理出来分享给各位同学。
以上就是本文的全部内容,如果觉得还不错的话欢迎点赞,转发和关注,感谢支持。