sync.Map
sync.Map是什么?
sync.map
sync.map
sync.map
怎么解决map并发?
实现方式 | 原理 | 适用场景 |
---|---|---|
map+Mutex | 通过Mutex互斥锁来实现多个goroutine对map的串行化访问 | 读写都需要通过Mutex加锁和释放锁,适用于读写比接近的场景 |
map+RWMutex | 通过RWMutex来实现对map的读写进行读写锁分离加锁,从而实现读的并发性能提高 | 同Mutex相比适用于读多写少的场景 |
sync.Map | 底层通分离读写map和原子指令来实现读的近似无锁,并通过延迟更新的方式来保证读的无锁化 | 读多修改少,元素增加删除频率不高的情况,在大多数情况下替代上述两种实现 |
sync.Map的简单使用
package main
import (
"fmt"
"sync"
)
func main() {
var m sync.Map
// 写入数据
m.Store("key1", "value1")
m.Store("key2", "value2")
// 读取数据
value, found := m.Load("key1")
if found {
fmt.Println("Key1:", value)
} else {
fmt.Println("Key1 not found")
}
// 删除数据
m.Delete("key2")
// 遍历数据
m.Range(func(key, value interface{}) bool {
fmt.Printf("Key: %v, Value: %v\n", key, value)
return true // 返回true以继续遍历,返回false停止遍历
})
}
源码解析
从架构图我们可以看出,sync.Map是读写分离的
type Map struct {
// 互斥锁
mu Mutex
// 存储read数据的readOnly指针类型,因为只能读所有可以并发,但是更新的时候要加锁
read atomic.Pointer[readOnly]
// 全量数据 dirty map
dirty map[any]*entry
//确认dirty同步到read时机用的,记录未命中read,击中dirty的次数
misses int
}
// readOnly is an immutable struct stored atomically in the Map.read field.
// readOnly 是一个 不可变的结构体,原子化存储在 sync.Map的read 字段中。
type readOnly struct {
m map[any]*entry // 其中map[key],key为泛型,任意类型。
amended bool // true if the dirty map contains some key not in m.
// amended (简单说,就是记录一种状态的,标记dirty和read有无差异)
// 1 :有差异,返回true。
// 2 :没有差异。返回false
}
type entry struct {
p atomic.Pointer[any] // p 泛型指针
}
这里的sync.Map里面存储的entry,有三种值的情况。
- nil 空值
- expunged 标记空泛型new(any),代表被删除的泛型值。
- any 任意类型的非nil有效值。
p == nil
p == expunged
其他情况,p 指向一个正常的值,
dirty同步到read的时候,他们两个map,都指向同一个&entry,这样在后面我们就可以通过cas修改值,修改后,无论从read还是dirty读取,都会是新的值
atomic.Pointer常用的方法
// 原子化加载 指针的值
func (x *Pointer[T]) Load() *T { return (*T)(LoadPointer(&x.v)) }
// 原子化存储,指针的值
func (x *Pointer[T]) Store(val *T) { StorePointer(&x.v, unsafe.Pointer(val)) }
// 原子化,交换指针的值
func (x *Pointer[T]) Swap(new *T) (old *T) { return (*T)(SwapPointer(&x.v, unsafe.Pointer(new))) }
// cas
func (x *Pointer[T]) CompareAndSwap(old, new *T) (swapped bool) {
return CompareAndSwapPointer(&x.v, unsafe.Pointer(old), unsafe.Pointer(new))
}
missLocked
missLocked
func (m *Map) missLocked() {
m.misses++
if m.misses < len(m.dirty) {
return
}
// dirty map 晋升
m.read.Store(readOnly{m: m.dirty})
m.dirty = nil
m.misses = 0
}
直接将 misses 的值加 1,表示一次未命中,如果 misses 值小于 m.dirty 的长度,就直接返回。否则,将 m.dirty 晋升为 read,并清空 dirty,(!!!注意这里是清空而不是同步)清空 misses 计数值。这样,之前一段时间新加入的 key 都会进入到 read 中,从而能够提升 read 的命中率
Load
// Load returns the value stored in the map for a key, or nil if no
// Load方法 返回 map中key 存储的值,或者返回nil,如果现在没有值。
// value is present.
// The ok result indicates whether value was found in the map.
// ok返回值 表示 这个值是否存在map中。
func (m *Map) Load(key any) (value any, ok bool) {
// 不加锁直接读
read := m.loadReadOnly()
e, ok := read.m[key]
// 当read中没有,但dirty map中有值的时候,从dirty读取一次。
if !ok && read.amended {
// 从dirty map 中读值,需要加锁。
// 同样双检查
m.mu.Lock()
// Avoid reporting a spurious miss if m.dirty got promoted while we were
// blocked on m.mu. (If further loads of the same key will not miss, it's
// not worth copying the dirty map for this key.)
read = m.loadReadOnly()
e, ok = read.m[key]
if !ok && read.amended {
e, ok = m.dirty[key]
// Regardless of whether the entry was present, record a miss: this key
// 不管这条entry记录,是否存在,记录一次 未击中。m.misses 加一次
// will take the slow path until the dirty map is promoted to the read
// 这个key,会一直 走到这一个缓慢的逻辑中,直到dirty map升格到read map 中。
// map.
m.missLocked()
}
m.mu.Unlock()
}
// 当 dirty 和 read map中,都为false,没有这个key时候,直接返回。
if !ok {
return nil, false
}
// 当ok,有这个key的时候,返回read map中的值
return e.load()
}
func (m *Map) missLocked() {
m.misses++
if m.misses < len(m.dirty) {
return
}
m.read.Store(&readOnly{m: m.dirty})
m.dirty = nil
m.misses = 0
}
总结:
- 首先尝试不加锁直接读read,如果有直接返回
- read没有的话,加锁进行双检查再次读read,如果还是没读出来并且amended为false(read和dirty没差异),说明read和dirty都没有这个key,return nil
- 如果read没读出来了并且amended为true,那么开始读dirty,并且调用missLocked()函数进行misses++(如果misses>=len(dirty))会把dirty晋升为read,dirty清空
Delete
// Delete deletes the value for a key.
// Delete 删除 指定key的值
func (m *Map) Delete(key any) {
m.LoadAndDelete(key)
}
// LoadAndDelete deletes the value for a key, returning the previous value if any.
// LoadAndDelete 删除指定的key,返回先前的值。
// The loaded result reports whether the key was present.
// loaded 返回值,key是否存在。
func (m *Map) LoadAndDelete(key any) (value any, loaded bool) {
// 首先试一下能不能不加锁删除,直接走情况2
read := m.loadReadOnly()
e, ok := read.m[key]
// read中没有,且dirty map中的key后来新增过key
// 情况1. 只dirty存在,那么删除delete
if !ok && read.amended {
// 加锁双检查
m.mu.Lock()
read = m.loadReadOnly()
e, ok = read.m[key]
if !ok && read.amended {
e, ok = m.dirty[key]
// 删除dirty中的key
delete(m.dirty, key)
// Regardless of whether the entry was present, record a miss: this key
// will take the slow path until the dirty map is promoted to the read
// map.
// miss增加1,直到 dirty map 下一次同步数据 到 read map
m.missLocked()
}
m.mu.Unlock()
}
// 情况2. read 和 dirty map 存在的时候,把entry记录的指针 置为nil
if ok {
return e.delete()
}
// 情况3. 都没有,直接返回
return nil, false
}
总结:
- 首先试试不加锁直接读,能读出来说明在read和dirty里面都有,那么直接cas为nil
- 如果不在read里面,加锁再次进行双检查(这是为了防止加锁的时候read被dirty同步所以得再次检查一遍,后面双检查也是一样的道理),仍然不在read,即只在dirty,然后直接delete(m.dirty, key),然后调用missLocked()函数进行misses++(如果misses>=len(dirty))会把dirty晋升为read,dirty清空
- 如果在read和dirty ,直接调cas把read和dirty的entry为nil
- 两个都不在说明key根本不存在,直接return
Store
// Store sets the value for a key.
// Store 存储指定key的值
func (m *Map) Store(key, value any) {
_, _ = m.Swap(key, value)
}
// Swap swaps the value for a key and returns the previous value if any.
// The loaded result reports whether the key was present.
func (m *Map) Swap(key, value any) (previous any, loaded bool) {
// 首先尝试不加锁直接读
read := m.loadReadOnly()
// 1、key在 read map中, 更换key对应的值指针地址。
if e, ok := read.m[key]; ok {
if v, ok := e.trySwap(&value); ok {
// dirty指定的 key 对应的值被标记为删除。
if v == nil {
return nil, false
}
return *v, true
}
}
// 走到这里的情况为,1 不在read中 2在read中,不过被删除了。
m.mu.Lock()
read = m.loadReadOnly()
if e, ok := read.m[key]; ok {
// 1、如果key在 read map 存在
// read map 中 这个key 对应的值为 nil
// 并且被删除了
if e.unexpungeLocked() {
// The entry was previously expunged, which implies that there is a
// 该 entry实体记录 先前已被删除,
// non-nil dirty map and this entry is not in it.
// 意味着 dirty map 不为nil,且 这个 entry 不在里面。
// 写入dirty
m.dirty[key] = e
}
// 只要在read中,cas
if v := e.swapLocked(&value); v != nil {
loaded = true
previous = *v
}
} else if e, ok := m.dirty[key]; ok {
// 2、如果key只在 dirty map 存在
if v := e.swapLocked(&value); v != nil {
loaded = true
previous = *v
}
} else {
// 3、key不在read和dirty map中 ,一个新key
// key没有更新的时候,
if !read.amended {
// 如果 dirty中没有新key
// We're adding the first new key to the dirty map.
// 第一次 增加新key 到 dirty map
// Make sure it is allocated and mark the read-only map as incomplete.
// 重要! dirtyLocked 这个方法会初始化 dirty map
// 这里是为了防止在加入之前,因为dirty晋升read后把dirty清空和amended设置为false,所以需要把read复制到dirty中
m.dirtyLocked()
// 复制了之后本来是没有差异的,但是下面必添加元素,所以肯定有差异,把 amended 改为 true
m.read.Store(&readOnly{m: read.m, amended: true})
}
// 写入到 dirty map 中
m.dirty[key] = newEntry(value)
}
m.mu.Unlock()
return previous, loaded
}
func (m *Map) dirtyLocked() {
if m.dirty != nil {
return
}
read := m.loadReadOnly()
m.dirty = make(map[any]*entry, len(read.m))
for k, e := range read.m {
if !e.tryExpungeLocked() {
m.dirty[k] = e
}
}
}
总结
- 首先尝试不加锁直接读read,能读到直接cas改值,但是前提需要判断一下v==nil是否成立,成立的话说明dirty这个Key已经被删除了,所以要返回nil,不成立说明读取成功
- 加锁双检查再次读read,没有读到,再读dirty,还是没有读到,说明这是一个新Key,如果这个时候dirty和read一样,就会调用m.dirtyLocked()函数把read复制到dirty中,再补amended=true。并且最后都会添加到dirty
- 加锁双检查再次读read,没有读到,再读dirty成功,说明key只在 dirty 存在,直接cas改dirty
- 加锁双检查直接读到read,看看dirty里面这个k有没有被删,删了就补一下,最后都会用cas改
Range
func (m *Map) Range(f func(key, value any) bool) {
// We need to be able to iterate over all of the keys that were already
// present at the start of the call to Range.
// If read.amended is false, then read.m satisfies that property without
// requiring us to hold m.mu for a long time.
read := m.loadReadOnly()
if read.amended {
// m.dirty contains keys not in read.m. Fortunately, Range is already O(N)
// (assuming the caller does not break out early), so a call to Range
// amortizes an entire copy of the map: we can promote the dirty copy
// immediately!
m.mu.Lock()
read = m.loadReadOnly()
if read.amended {
read = readOnly{m: m.dirty}
m.read.Store(&read)
m.dirty = nil
m.misses = 0
}
m.mu.Unlock()
}
for k, e := range read.m {
v, ok := e.load()
if !ok {
continue
}
if !f(k, v) {
break
}
}
}
总结
- 判断read和dirty是否一样,一样的话直接遍历读read
- 不一样,先把dirty晋升成read,再遍历read
举例分析
amended 字段来标记 dirty 和 read 有无差异。我们可以总结一下几种情况。
状况 | amended | 说明 |
---|---|---|
新建sync.Map{} | false | |
第1次Store | true | 第一次Store后,dirty有1个值,read为0 |
第1次Load读 | false | 第一次读,m.misses为1,此时len(dirty)为1,len(dirty)>=misses时候,会进行第一次晋升,dirty为nil,此时amended改成false |
第2次Store | true | 第2次Store后,dirty有2个值,read为1,此时amended改成true,标记他们有差异 |
第n次 | ----- | ----- |
第一次Store
很明显会走:
else {
if !read.amended {
// We're adding the first new key to the dirty map.
// Make sure it is allocated and mark the read-only map as incomplete.
m.dirtyLocked()
m.read.Store(&readOnly{m: read.m, amended: true})
}
m.dirty[key] = newEntry(value)
}
func (m *Map) dirtyLocked() {
if m.dirty != nil {
return
}
read := m.loadReadOnly()
m.dirty = make(map[any]*entry, len(read.m))
for k, e := range read.m {
if !e.tryExpungeLocked() {
m.dirty[k] = e
}
}
}
这个时候dirty有一个值,read没有,amended为true
第一次Load
由于read没有,所以读dirty,并且调用m.missLocked(),m.misses为1,此时len(dirty)为1,len(dirty)>=misses时候,会进行第一次晋升,dirty晋升为read,dirty被清空为nil, amended为false
if !ok && read.amended {
m.mu.Lock()
// Avoid reporting a spurious miss if m.dirty got promoted while we were
// blocked on m.mu. (If further loads of the same key will not miss, it's
// not worth copying the dirty map for this key.)
read = m.loadReadOnly()
e, ok = read.m[key]
if !ok && read.amended {
e, ok = m.dirty[key]
// Regardless of whether the entry was present, record a miss: this key
// will take the slow path until the dirty map is promoted to the read
// map.
m.missLocked()
}
m.mu.Unlock()
}
func (m *Map) missLocked() {
m.misses++
if m.misses < len(m.dirty) {
return
}
m.read.Store(&readOnly{m: m.dirty})
m.dirty = nil
m.misses = 0
}
第二次Store
else {
if !read.amended {
// We're adding the first new key to the dirty map.
// Make sure it is allocated and mark the read-only map as incomplete.
m.dirtyLocked()
m.read.Store(&readOnly{m: read.m, amended: true})
}
m.dirty[key] = newEntry(value)
}
func (m *Map) dirtyLocked() {
if m.dirty != nil {
return
}
read := m.loadReadOnly()
m.dirty = make(map[any]*entry, len(read.m))
for k, e := range read.m {
if !e.tryExpungeLocked() {
m.dirty[k] = e
}
}
}
read.amended 为false,进第一个if,调用m.dirtyLocked()将read拷贝到dirty,然后把amended标记为true,因为后面dirty添加一个后为2个,但是read还是1个
参考
https://segmentfault.com/a/1190000043399164