我们知道 golang 的 map 并发会有问题,所以 go 官方在 sync 包中加入了一个 sync.map 来作为一个官方的并发安全的 map 实现。

ConcurrentHashMap

那么 go 里面是如何设计的呢?今天我们就来看看它是怎么实现的。

PS: 本文 go 源码基于版本1.16.2,我觉得当有了泛型之后这个库十有八九是要改的….

数据结构定义

type Map struct {
    mu Mutex
    read atomic.Value // readOnly
    dirty map[interface{}]*entry
    misses int
}
// readOnly is an immutable struct stored atomically in the Map.read field.
type readOnly struct {
   m       map[interface{}]*entry
   // 注意这个标志,这个翻译为修改过的,如果为 true 表示 dirty 和 read 是不一致的了
   amended bool // true if the dirty map contains some key not in m.
}
// An entry is a slot in the map corresponding to a particular key.
type entry struct {
   p unsafe.Pointer // *interface{}
}

可以看到,它的数据结构非常简单

  • mu: dirty 操作所需要使用的锁
  • read: 里面存储的数据只读
  • dirty: 最新的数据,但是需要加锁访问
  • misses: 读取 miss 计数器

刚看到这个数据结构的时候,就可以猜测一下,通过这个命名,感觉像是这样的:读写分离,读取全部从 read 中读取;修改操作记录在 dirty 中,然后当 miss 达到一个指标后进行 dirty 和 read 的交换;当然这是猜测,让我们往下看看源码实现。

方法实现

Load

func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
   // 首先从 read 中获取,因为 read 只读,所以不加锁
   read, _ := m.read.Load().(readOnly)
   e, ok := read.m[key]
   
   // 如果从 read 没有获取到,则需要验证 amended 标志是否为true
   // 如果 amended 为true,则证明 read 和 dirty 不一致,则需要从 dirty 中获取
   if !ok && read.amended {
      m.mu.Lock()
      // double check 因为当加锁之后,可能 read 的数据已经修改,所以需要再次验证
      read, _ = m.read.Load().(readOnly)
      e, ok = read.m[key]
      if !ok && read.amended {
         // 从 dirty 中获取
         e, ok = m.dirty[key]
         // 注意获取之后需要增加 misses 标志数值,证明这次 read 没读到,是从 dirty 中获取的
         m.missLocked()
      }
      m.mu.Unlock()
   }
   // 如果 dirty 中也没有那么就是没有这个键对应的值
   if !ok {
      return nil, false
   }
   // 如果一开始已经从 read 中获取到,那么直接 load 就可以了
   return e.load()
}

func (e *entry) load() (value interface{}, ok bool) {
   p := atomic.LoadPointer(&e.p)
   if p == nil || p == expunged {
      return nil, false
   }
   return *(*interface{})(p), true
}

func (m *Map) missLocked() {
    m.misses++
    if m.misses < len(m.dirty) {
        return
    }
    // 当 misses 标志数值达到了 dirty 的长度,那么就需要将 dirty 赋值给 read,让 read 存储最新的数据
    // 并且把 dirty 清空,misses 标志数值清零
    m.read.Store(readOnly{m: m.dirty})
    m.dirty = nil
    m.misses = 0
}
  1. 优先读取 read
  2. read 没有就看 dirty 和 read 是否一致
  3. 不一致就从 dirty 中读取
  4. 每次 miss 增加 miss 数值,当 miss 数值到达 dirty 长度的时候就重新将 dirty 赋值给 read

这里利用到了一个常用的并发操作,double check,因为验证和加锁并不是一个原子操作,所以需要二次确认,加锁之后是否还满足原来的加锁条件

Store

func (m *Map) Store(key, value interface{}) {
   // 先从 read 中获取,如果存在则尝试直接修改,注意这里的修改是一个原子操作,并且判断了这个 key 是否已经被删除
   read, _ := m.read.Load().(readOnly)
   if e, ok := read.m[key]; ok && e.tryStore(&value) {
      return
   }

   m.mu.Lock()
   // double check
   read, _ = m.read.Load().(readOnly)
   if e, ok := read.m[key]; ok {
      if e.unexpungeLocked() {
         // The entry was previously expunged, which implies that there is a
         // non-nil dirty map and this entry is not in it.
         // 这里比较难理解一点:
         // 如果加锁之后,read 能获取到,并且被标记为删除,则认为 dirty 肯定不为空,并且这个 key 不在 dirty 中,所以需要在 dirty 中添加这个 key
         m.dirty[key] = e
      }
      // 然后将 value 值更新到 entry 中
      e.storeLocked(&value)
   } else if e, ok := m.dirty[key]; ok {
      // 如果 read 中还是不存在,但是 dirty 中存在,则直接更新
      e.storeLocked(&value)
   } else {
      // 如果read,dirty都没有,则是一个新的 key
      if !read.amended {
         // We're adding the first new key to the dirty map.
         // Make sure it is allocated and mark the read-only map as incomplete.
         // 如果之前 read 没有被修改过,则需要初始化 dirty
         m.dirtyLocked()
         // 并且需要更行 read 为 已经被修改过
         m.read.Store(readOnly{m: read.m, amended: true})
      }
      // 此时 dirty 已经被初始化过了,直接添加新的 key 就可以了
      m.dirty[key] = newEntry(value)
   }
   m.mu.Unlock()
}

func (e *entry) tryStore(i *interface{}) bool {
   for {
      p := atomic.LoadPointer(&e.p)
      // 如果这个键已经被删除了,则直接返回
      if p == expunged {
         return false
      }
      // 如果还存在,则直接修改,利用 cas 原子操作
      if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {
         return true
      }
   }
}

func (e *entry) unexpungeLocked() (wasExpunged bool) {
   return atomic.CompareAndSwapPointer(&e.p, expunged, nil)
}

func (m *Map) dirtyLocked() {
    if m.dirty != nil {
        return
    }

    read, _ := m.read.Load().(readOnly)
    m.dirty = make(map[interface{}]*entry, len(read.m))
    for k, e := range read.m {
        // 注意这里有一个细节,需要验证是否已经被删除,如果已经被删除无需要添加到 dirty 中
        // 并且内部将 nil 的 entry 设置为 expunged
        if !e.tryExpungeLocked() {
            m.dirty[k] = e
        }
    }
}

func (e *entry) tryExpungeLocked() (isExpunged bool) {
    p := atomic.LoadPointer(&e.p)
    for p == nil {
        if atomic.CompareAndSwapPointer(&e.p, nil, expunged) {
            return true
        }
        p = atomic.LoadPointer(&e.p)
    }
    return p == expunged
}

存储的时候相对复杂一些:

  1. 如果 read 中存在则直接尝试更新
  2. 如果 read 中不存在则加锁
  3. double check
  4. 如何二次确认时已经 read 中存在则需要判断是否已经被删除,如果已经删除需要在 dirty 中加回来
  5. 如果二次确认 read 中不存在,则查看 dirty 中是否存在, 如果存在直接更新
  6. 如果 read 和 dirty 中都不存在,则是一个新的 key
  7. 如果 dirty 还没有被初始化,则先初始化
  8. 最后更新 dirty 即可

需要把握的细节是:

  • read 和 dirty 可能不一致,dirty 有就可以操作
  • double check 之后 key 也有可能被删除

Delete

// Delete deletes the value for a key.
func (m *Map) Delete(key interface{}) {
    m.LoadAndDelete(key)
}
func (m *Map) LoadAndDelete(key interface{}) (value interface{}, loaded bool) {
    read, _ := m.read.Load().(readOnly)
    e, ok := read.m[key]
    // 如果 read 中没有,就尝试去 dirty 中找
    if !ok && read.amended {
        m.mu.Lock()
        // 这里也是 double check
        read, _ = m.read.Load().(readOnly)
        e, ok = read.m[key]
        if !ok && read.amended {
            // 反正只要 dirty 中有就直接操作就好了
            e, ok = m.dirty[key]
            delete(m.dirty, key)
            // Regardless of whether the entry was present, record a miss: this key
            // will take the slow path until the dirty map is promoted to the read
            // map.
            // 这里是细节需要增加 miss 的计数,因为这里本质也是一个 load 操作, LoadAndDelete 嘛
            m.missLocked()
        }
        m.mu.Unlock()
    }
    // 如果 read 中有,就直接操作就好了
    if ok {
        return e.delete()
    }
    return nil, false
}

// 注意这里上面的所有删除操作都是直接 cas 交换为 nil,并不是很多人说的标记为 expunged,expunged 是在 read 重新赋值个 dirty 初始化的时候才进行的
func (e *entry) delete() (value interface{}, ok bool) {
    for {
        p := atomic.LoadPointer(&e.p)
        if p == nil || p == expunged {
            return nil, false
        }
        if atomic.CompareAndSwapPointer(&e.p, p, nil) {
            return *(*interface{})(p), true
        }
    }
}

删除比较简单

  1. 如果 read 中存在则直接操作
  2. 如果 read 中不存在则需要去 dirty 中找
  3. 删除的时候只需要设置为 nil 就可以了

Range

func (m *Map) Range(f func(key, value interface{}) bool) {
   read, _ := m.read.Load().(readOnly)
   if read.amended {
      m.mu.Lock()
      read, _ = m.read.Load().(readOnly)
      if read.amended {
         // 如果有修改过,那么 dirty 中的数据才是最新的,所以需要重新赋值给 read 然后读取
         read = readOnly{m: m.dirty}
         m.read.Store(read)
         m.dirty = nil
         m.misses = 0
      }
      m.mu.Unlock()
   }

   for k, e := range read.m {
      v, ok := e.load()
      if !ok {
         continue
      }
      if !f(k, v) {
         break
      }
   }
}

理解了前面几个方法,range 就很简单了,就只需要知道如果已经修改过的时候需要遍历的是 dirty 的,因为 dirty 才是最新的

设计原理

那么 sync.map 的设计原理是什么呢?其实非常简单:

  • 读写分离,读取大多数情况下会直接读 read 不需要加锁
  • 懒更新,删除标记一下不需要额外操作其他数据结构

所以整体设计真的非常简单易懂,没有复杂的数据结构,当然这样的设计其实性能上并不会满足所有情况的要求

总结

实际使用

其实我实际在很多代码中更多的时候看到的是,使用 sync.Mutex 或者 RWMutex 来实现的,如:https://blog.golang.org/maps#TOC_6. 一方面我 sync.Map 使用起来必须进行 interface 转换,代码写起来比较麻烦,还需要额外进行封装一层;还有就是当前性能还没有那么极致的追求,所以很多时候也够用。

适用场景

  1. 读多写少的场景
  2. 多个goroutine读/写/修改的key集合没有交集的场景
  3. 压测后 sync.Map 确实能带来性能提升的场景

其他场景其实个人也并不建议去使用

为什么 go 不采用类似 java 的实现

其实肯定有人已经尝试过了 https://github.com/orcaman/concurrent-map 但是不采纳的原因也很简单,go 官方一直秉承的就是 less is more 他们很多地方都认为简单一点是好事,一旦复杂了,就会变得原来越难以维护。其实个人认为还是等待泛型的出现,否则轮子要重新造一次,有点累。