我们知道 golang 的 map 并发会有问题,所以 go 官方在 sync 包中加入了一个 sync.map 来作为一个官方的并发安全的 map 实现。
ConcurrentHashMap
那么 go 里面是如何设计的呢?今天我们就来看看它是怎么实现的。
PS: 本文 go 源码基于版本1.16.2,我觉得当有了泛型之后这个库十有八九是要改的….
数据结构定义
type Map struct {
mu Mutex
read atomic.Value // readOnly
dirty map[interface{}]*entry
misses int
}
// readOnly is an immutable struct stored atomically in the Map.read field.
type readOnly struct {
m map[interface{}]*entry
// 注意这个标志,这个翻译为修改过的,如果为 true 表示 dirty 和 read 是不一致的了
amended bool // true if the dirty map contains some key not in m.
}
// An entry is a slot in the map corresponding to a particular key.
type entry struct {
p unsafe.Pointer // *interface{}
}
可以看到,它的数据结构非常简单
- mu: dirty 操作所需要使用的锁
- read: 里面存储的数据只读
- dirty: 最新的数据,但是需要加锁访问
- misses: 读取 miss 计数器
刚看到这个数据结构的时候,就可以猜测一下,通过这个命名,感觉像是这样的:读写分离,读取全部从 read 中读取;修改操作记录在 dirty 中,然后当 miss 达到一个指标后进行 dirty 和 read 的交换;当然这是猜测,让我们往下看看源码实现。
方法实现
Load
func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
// 首先从 read 中获取,因为 read 只读,所以不加锁
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
// 如果从 read 没有获取到,则需要验证 amended 标志是否为true
// 如果 amended 为true,则证明 read 和 dirty 不一致,则需要从 dirty 中获取
if !ok && read.amended {
m.mu.Lock()
// double check 因为当加锁之后,可能 read 的数据已经修改,所以需要再次验证
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
// 从 dirty 中获取
e, ok = m.dirty[key]
// 注意获取之后需要增加 misses 标志数值,证明这次 read 没读到,是从 dirty 中获取的
m.missLocked()
}
m.mu.Unlock()
}
// 如果 dirty 中也没有那么就是没有这个键对应的值
if !ok {
return nil, false
}
// 如果一开始已经从 read 中获取到,那么直接 load 就可以了
return e.load()
}
func (e *entry) load() (value interface{}, ok bool) {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
return nil, false
}
return *(*interface{})(p), true
}
func (m *Map) missLocked() {
m.misses++
if m.misses < len(m.dirty) {
return
}
// 当 misses 标志数值达到了 dirty 的长度,那么就需要将 dirty 赋值给 read,让 read 存储最新的数据
// 并且把 dirty 清空,misses 标志数值清零
m.read.Store(readOnly{m: m.dirty})
m.dirty = nil
m.misses = 0
}
- 优先读取 read
- read 没有就看 dirty 和 read 是否一致
- 不一致就从 dirty 中读取
- 每次 miss 增加 miss 数值,当 miss 数值到达 dirty 长度的时候就重新将 dirty 赋值给 read
这里利用到了一个常用的并发操作,double check,因为验证和加锁并不是一个原子操作,所以需要二次确认,加锁之后是否还满足原来的加锁条件
Store
func (m *Map) Store(key, value interface{}) {
// 先从 read 中获取,如果存在则尝试直接修改,注意这里的修改是一个原子操作,并且判断了这个 key 是否已经被删除
read, _ := m.read.Load().(readOnly)
if e, ok := read.m[key]; ok && e.tryStore(&value) {
return
}
m.mu.Lock()
// double check
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
if e.unexpungeLocked() {
// The entry was previously expunged, which implies that there is a
// non-nil dirty map and this entry is not in it.
// 这里比较难理解一点:
// 如果加锁之后,read 能获取到,并且被标记为删除,则认为 dirty 肯定不为空,并且这个 key 不在 dirty 中,所以需要在 dirty 中添加这个 key
m.dirty[key] = e
}
// 然后将 value 值更新到 entry 中
e.storeLocked(&value)
} else if e, ok := m.dirty[key]; ok {
// 如果 read 中还是不存在,但是 dirty 中存在,则直接更新
e.storeLocked(&value)
} else {
// 如果read,dirty都没有,则是一个新的 key
if !read.amended {
// We're adding the first new key to the dirty map.
// Make sure it is allocated and mark the read-only map as incomplete.
// 如果之前 read 没有被修改过,则需要初始化 dirty
m.dirtyLocked()
// 并且需要更行 read 为 已经被修改过
m.read.Store(readOnly{m: read.m, amended: true})
}
// 此时 dirty 已经被初始化过了,直接添加新的 key 就可以了
m.dirty[key] = newEntry(value)
}
m.mu.Unlock()
}
func (e *entry) tryStore(i *interface{}) bool {
for {
p := atomic.LoadPointer(&e.p)
// 如果这个键已经被删除了,则直接返回
if p == expunged {
return false
}
// 如果还存在,则直接修改,利用 cas 原子操作
if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {
return true
}
}
}
func (e *entry) unexpungeLocked() (wasExpunged bool) {
return atomic.CompareAndSwapPointer(&e.p, expunged, nil)
}
func (m *Map) dirtyLocked() {
if m.dirty != nil {
return
}
read, _ := m.read.Load().(readOnly)
m.dirty = make(map[interface{}]*entry, len(read.m))
for k, e := range read.m {
// 注意这里有一个细节,需要验证是否已经被删除,如果已经被删除无需要添加到 dirty 中
// 并且内部将 nil 的 entry 设置为 expunged
if !e.tryExpungeLocked() {
m.dirty[k] = e
}
}
}
func (e *entry) tryExpungeLocked() (isExpunged bool) {
p := atomic.LoadPointer(&e.p)
for p == nil {
if atomic.CompareAndSwapPointer(&e.p, nil, expunged) {
return true
}
p = atomic.LoadPointer(&e.p)
}
return p == expunged
}
存储的时候相对复杂一些:
- 如果 read 中存在则直接尝试更新
- 如果 read 中不存在则加锁
- double check
- 如何二次确认时已经 read 中存在则需要判断是否已经被删除,如果已经删除需要在 dirty 中加回来
- 如果二次确认 read 中不存在,则查看 dirty 中是否存在, 如果存在直接更新
- 如果 read 和 dirty 中都不存在,则是一个新的 key
- 如果 dirty 还没有被初始化,则先初始化
- 最后更新 dirty 即可
需要把握的细节是:
- read 和 dirty 可能不一致,dirty 有就可以操作
- double check 之后 key 也有可能被删除
Delete
// Delete deletes the value for a key.
func (m *Map) Delete(key interface{}) {
m.LoadAndDelete(key)
}
func (m *Map) LoadAndDelete(key interface{}) (value interface{}, loaded bool) {
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
// 如果 read 中没有,就尝试去 dirty 中找
if !ok && read.amended {
m.mu.Lock()
// 这里也是 double check
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
// 反正只要 dirty 中有就直接操作就好了
e, ok = m.dirty[key]
delete(m.dirty, key)
// Regardless of whether the entry was present, record a miss: this key
// will take the slow path until the dirty map is promoted to the read
// map.
// 这里是细节需要增加 miss 的计数,因为这里本质也是一个 load 操作, LoadAndDelete 嘛
m.missLocked()
}
m.mu.Unlock()
}
// 如果 read 中有,就直接操作就好了
if ok {
return e.delete()
}
return nil, false
}
// 注意这里上面的所有删除操作都是直接 cas 交换为 nil,并不是很多人说的标记为 expunged,expunged 是在 read 重新赋值个 dirty 初始化的时候才进行的
func (e *entry) delete() (value interface{}, ok bool) {
for {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
return nil, false
}
if atomic.CompareAndSwapPointer(&e.p, p, nil) {
return *(*interface{})(p), true
}
}
}
删除比较简单
- 如果 read 中存在则直接操作
- 如果 read 中不存在则需要去 dirty 中找
- 删除的时候只需要设置为 nil 就可以了
Range
func (m *Map) Range(f func(key, value interface{}) bool) {
read, _ := m.read.Load().(readOnly)
if read.amended {
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if read.amended {
// 如果有修改过,那么 dirty 中的数据才是最新的,所以需要重新赋值给 read 然后读取
read = readOnly{m: m.dirty}
m.read.Store(read)
m.dirty = nil
m.misses = 0
}
m.mu.Unlock()
}
for k, e := range read.m {
v, ok := e.load()
if !ok {
continue
}
if !f(k, v) {
break
}
}
}
理解了前面几个方法,range 就很简单了,就只需要知道如果已经修改过的时候需要遍历的是 dirty 的,因为 dirty 才是最新的
设计原理
那么 sync.map 的设计原理是什么呢?其实非常简单:
- 读写分离,读取大多数情况下会直接读 read 不需要加锁
- 懒更新,删除标记一下不需要额外操作其他数据结构
所以整体设计真的非常简单易懂,没有复杂的数据结构,当然这样的设计其实性能上并不会满足所有情况的要求
总结
实际使用
其实我实际在很多代码中更多的时候看到的是,使用 sync.Mutex 或者 RWMutex 来实现的,如:https://blog.golang.org/maps#TOC_6. 一方面我 sync.Map 使用起来必须进行 interface 转换,代码写起来比较麻烦,还需要额外进行封装一层;还有就是当前性能还没有那么极致的追求,所以很多时候也够用。
适用场景
- 读多写少的场景
- 多个goroutine读/写/修改的key集合没有交集的场景
- 压测后 sync.Map 确实能带来性能提升的场景
其他场景其实个人也并不建议去使用
为什么 go 不采用类似 java 的实现
其实肯定有人已经尝试过了 https://github.com/orcaman/concurrent-map 但是不采纳的原因也很简单,go 官方一直秉承的就是 less is more 他们很多地方都认为简单一点是好事,一旦复杂了,就会变得原来越难以维护。其实个人认为还是等待泛型的出现,否则轮子要重新造一次,有点累。