sync.Map

sync.Map是什么?

sync.map
sync.map
sync.map

怎么解决map并发?

实现方式原理适用场景
map+Mutex通过Mutex互斥锁来实现多个goroutine对map的串行化访问读写都需要通过Mutex加锁和释放锁,适用于读写比接近的场景
map+RWMutex通过RWMutex来实现对map的读写进行读写锁分离加锁,从而实现读的并发性能提高同Mutex相比适用于读多写少的场景
sync.Map底层通分离读写map和原子指令来实现读的近似无锁,并通过延迟更新的方式来保证读的无锁化读多修改少,元素增加删除频率不高的情况,在大多数情况下替代上述两种实现

sync.Map的简单使用

package main

import (
	"fmt"
	"sync"
)

func main() {
	var m sync.Map

	// 写入数据
	m.Store("key1", "value1")
	m.Store("key2", "value2")

	// 读取数据
	value, found := m.Load("key1")
	if found {
		fmt.Println("Key1:", value)
	} else {
		fmt.Println("Key1 not found")
	}

	// 删除数据
	m.Delete("key2")

	// 遍历数据
	m.Range(func(key, value interface{}) bool {
		fmt.Printf("Key: %v, Value: %v\n", key, value)
		return true // 返回true以继续遍历,返回false停止遍历
	})
}

源码解析

从架构图我们可以看出,sync.Map是读写分离的

type Map struct {
  // 互斥锁
	mu Mutex
 // 存储read数据的readOnly指针类型,因为只能读所有可以并发,但是更新的时候要加锁
	read atomic.Pointer[readOnly]
 // 全量数据 dirty map	
	dirty map[any]*entry
 //确认dirty同步到read时机用的,记录未命中read,击中dirty的次数	
	misses int
}
// readOnly is an immutable struct stored atomically in the Map.read field.
// readOnly 是一个 不可变的结构体,原子化存储在 sync.Map的read 字段中。
type readOnly struct {
    m       map[any]*entry // 其中map[key],key为泛型,任意类型。
    amended bool           // true if the dirty map contains some key not in m.
    // amended (简单说,就是记录一种状态的,标记dirty和read有无差异)
    // 1 :有差异,返回true。
    // 2 :没有差异。返回false
}
type entry struct {
    p atomic.Pointer[any] // p 泛型指针
}

这里的sync.Map里面存储的entry,有三种值的情况。

  1. nil 空值
  2. expunged 标记空泛型new(any),代表被删除的泛型值。
  3. any 任意类型的非nil有效值。
p == nil
p == expunged

其他情况,p 指向一个正常的值,

dirty同步到read的时候,他们两个map,都指向同一个&entry,这样在后面我们就可以通过cas修改值,修改后,无论从read还是dirty读取,都会是新的值

atomic.Pointer常用的方法

// 原子化加载 指针的值
func (x *Pointer[T]) Load() *T { return (*T)(LoadPointer(&x.v)) }

// 原子化存储,指针的值
func (x *Pointer[T]) Store(val *T) { StorePointer(&x.v, unsafe.Pointer(val)) }

// 原子化,交换指针的值
func (x *Pointer[T]) Swap(new *T) (old *T) { return (*T)(SwapPointer(&x.v, unsafe.Pointer(new))) }

// cas
func (x *Pointer[T]) CompareAndSwap(old, new *T) (swapped bool) {
    return CompareAndSwapPointer(&x.v, unsafe.Pointer(old), unsafe.Pointer(new))
}

missLocked

missLocked
func (m *Map) missLocked() {
	m.misses++
	if m.misses < len(m.dirty) {
		return
	}
	// dirty map 晋升
	m.read.Store(readOnly{m: m.dirty})
	m.dirty = nil
	m.misses = 0
}

直接将 misses 的值加 1,表示一次未命中,如果 misses 值小于 m.dirty 的长度,就直接返回。否则,将 m.dirty 晋升为 read,并清空 dirty,(!!!注意这里是清空而不是同步)清空 misses 计数值。这样,之前一段时间新加入的 key 都会进入到 read 中,从而能够提升 read 的命中率

Load

// Load returns the value stored in the map for a key, or nil if no
// Load方法 返回 map中key 存储的值,或者返回nil,如果现在没有值。
// value is present.
// The ok result indicates whether value was found in the map.
// ok返回值 表示 这个值是否存在map中。
func (m *Map) Load(key any) (value any, ok bool) {
  // 不加锁直接读
    read := m.loadReadOnly()
    e, ok := read.m[key]
    // 当read中没有,但dirty map中有值的时候,从dirty读取一次。
    if !ok && read.amended {
        // 从dirty map 中读值,需要加锁。
        // 同样双检查
        m.mu.Lock()
        // Avoid reporting a spurious miss if m.dirty got promoted while we were
        // blocked on m.mu. (If further loads of the same key will not miss, it's
        // not worth copying the dirty map for this key.)
        read = m.loadReadOnly()
        e, ok = read.m[key]
        if !ok && read.amended {
            e, ok = m.dirty[key]
            // Regardless of whether the entry was present, record a miss: this key
            // 不管这条entry记录,是否存在,记录一次 未击中。m.misses 加一次
            // will take the slow path until the dirty map is promoted to the read
            // 这个key,会一直 走到这一个缓慢的逻辑中,直到dirty map升格到read map 中。
            // map.
            m.missLocked()
        }
        m.mu.Unlock()
    }
    // 当 dirty 和 read map中,都为false,没有这个key时候,直接返回。
    if !ok {
        return nil, false
    }
    // 当ok,有这个key的时候,返回read map中的值
    return e.load()
}
func (m *Map) missLocked() {
	m.misses++
	if m.misses < len(m.dirty) {
		return
	}
	m.read.Store(&readOnly{m: m.dirty})
	m.dirty = nil
	m.misses = 0
}

总结:

  • 首先尝试不加锁直接读read,如果有直接返回
  • read没有的话,加锁进行双检查再次读read,如果还是没读出来并且amended为false(read和dirty没差异),说明read和dirty都没有这个key,return nil
  • 如果read没读出来了并且amended为true,那么开始读dirty,并且调用missLocked()函数进行misses++(如果misses>=len(dirty))会把dirty晋升为read,dirty清空

Delete

// Delete deletes the value for a key.
// Delete 删除 指定key的值
func (m *Map) Delete(key any) {
    m.LoadAndDelete(key)
}

// LoadAndDelete deletes the value for a key, returning the previous value if any.
// LoadAndDelete 删除指定的key,返回先前的值。
// The loaded result reports whether the key was present.
// loaded 返回值,key是否存在。
func (m *Map) LoadAndDelete(key any) (value any, loaded bool) {
  // 首先试一下能不能不加锁删除,直接走情况2
    read := m.loadReadOnly()
    e, ok := read.m[key]
    // read中没有,且dirty map中的key后来新增过key
  // 情况1. 只dirty存在,那么删除delete
    if !ok && read.amended {
      // 加锁双检查
        m.mu.Lock()
        read = m.loadReadOnly()
        e, ok = read.m[key]
        if !ok && read.amended {
            e, ok = m.dirty[key]
            // 删除dirty中的key
            delete(m.dirty, key)
            // Regardless of whether the entry was present, record a miss: this key
            // will take the slow path until the dirty map is promoted to the read
            // map.
            // miss增加1,直到 dirty map 下一次同步数据 到 read map
            m.missLocked()
        }
        m.mu.Unlock()
    }
    // 情况2. read 和 dirty map 存在的时候,把entry记录的指针 置为nil
    if ok {
        return e.delete()
    }
  // 情况3. 都没有,直接返回
    return nil, false
}

总结:

  • 首先试试不加锁直接读,能读出来说明在read和dirty里面都有,那么直接cas为nil
  • 如果不在read里面,加锁再次进行双检查(这是为了防止加锁的时候read被dirty同步所以得再次检查一遍,后面双检查也是一样的道理),仍然不在read,即只在dirty,然后直接delete(m.dirty, key),然后调用missLocked()函数进行misses++(如果misses>=len(dirty))会把dirty晋升为read,dirty清空
  • 如果在read和dirty ,直接调cas把read和dirty的entry为nil
  • 两个都不在说明key根本不存在,直接return

Store

// Store sets the value for a key.
// Store 存储指定key的值
func (m *Map) Store(key, value any) {
    _, _ = m.Swap(key, value)
}

// Swap swaps the value for a key and returns the previous value if any.
// The loaded result reports whether the key was present.
func (m *Map) Swap(key, value any) (previous any, loaded bool) {
  // 首先尝试不加锁直接读
    read := m.loadReadOnly()
    // 1、key在 read map中, 更换key对应的值指针地址。
    if e, ok := read.m[key]; ok {
        if v, ok := e.trySwap(&value); ok {
          // dirty指定的 key 对应的值被标记为删除。
            if v == nil {
                return nil, false
            }
            return *v, true
        }
    }

    // 走到这里的情况为,1 不在read中  2在read中,不过被删除了。
    m.mu.Lock()
    read = m.loadReadOnly()
    if e, ok := read.m[key]; ok {
        // 1、如果key在 read map 存在
        // read map 中 这个key 对应的值为 nil
      // 并且被删除了
        if e.unexpungeLocked() {
            // The entry was previously expunged, which implies that there is a
            // 该 entry实体记录 先前已被删除,
            // non-nil dirty map and this entry is not in it.
            // 意味着  dirty map 不为nil,且 这个 entry 不在里面。
            // 写入dirty
            m.dirty[key] = e
        }
      // 只要在read中,cas
        if v := e.swapLocked(&value); v != nil {
            loaded = true
            previous = *v
        }
    } else if e, ok := m.dirty[key]; ok {
        // 2、如果key只在 dirty map 存在
        if v := e.swapLocked(&value); v != nil {
            loaded = true
            previous = *v
        }
    } else {
        // 3、key不在read和dirty map中 ,一个新key
      // key没有更新的时候,
        if !read.amended {
            // 如果 dirty中没有新key
            // We're adding the first new key to the dirty map.
            // 第一次 增加新key 到 dirty map
            // Make sure it is allocated and mark the read-only map as incomplete.
            // 重要! dirtyLocked 这个方法会初始化 dirty map
          // 这里是为了防止在加入之前,因为dirty晋升read后把dirty清空和amended设置为false,所以需要把read复制到dirty中
            m.dirtyLocked()
            // 复制了之后本来是没有差异的,但是下面必添加元素,所以肯定有差异,把 amended 改为 true
            m.read.Store(&readOnly{m: read.m, amended: true})
        }
        // 写入到 dirty map 中
        m.dirty[key] = newEntry(value)
    }
    m.mu.Unlock()
    return previous, loaded
}

func (m *Map) dirtyLocked() {
	if m.dirty != nil {
		return
	}

	read := m.loadReadOnly()
	m.dirty = make(map[any]*entry, len(read.m))
	for k, e := range read.m {
		if !e.tryExpungeLocked() {
			m.dirty[k] = e
		}
	}
}

总结

  • 首先尝试不加锁直接读read,能读到直接cas改值,但是前提需要判断一下v==nil是否成立,成立的话说明dirty这个Key已经被删除了,所以要返回nil,不成立说明读取成功
  • 加锁双检查再次读read,没有读到,再读dirty,还是没有读到,说明这是一个新Key,如果这个时候dirty和read一样,就会调用m.dirtyLocked()函数把read复制到dirty中,再补amended=true。并且最后都会添加到dirty
  • 加锁双检查再次读read,没有读到,再读dirty成功,说明key只在 dirty 存在,直接cas改dirty
  • 加锁双检查直接读到read,看看dirty里面这个k有没有被删,删了就补一下,最后都会用cas改

Range

func (m *Map) Range(f func(key, value any) bool) {
    // We need to be able to iterate over all of the keys that were already
    // present at the start of the call to Range.
    // If read.amended is false, then read.m satisfies that property without
    // requiring us to hold m.mu for a long time.
    read := m.loadReadOnly()
    if read.amended {
        // m.dirty contains keys not in read.m. Fortunately, Range is already O(N)
        // (assuming the caller does not break out early), so a call to Range
        // amortizes an entire copy of the map: we can promote the dirty copy
        // immediately!
        m.mu.Lock()
        read = m.loadReadOnly()
        if read.amended {
            read = readOnly{m: m.dirty}
            m.read.Store(&read)
            m.dirty = nil
            m.misses = 0
        }
        m.mu.Unlock()
    }

    for k, e := range read.m {
        v, ok := e.load()
        if !ok {
            continue
        }
        if !f(k, v) {
            break
        }
    }
}

总结

  • 判断read和dirty是否一样,一样的话直接遍历读read
  • 不一样,先把dirty晋升成read,再遍历read

举例分析

amended 字段来标记 dirty 和 read 有无差异。我们可以总结一下几种情况。

状况amended说明
新建sync.Map{}false
第1次Storetrue第一次Store后,dirty有1个值,read为0
第1次Load读false第一次读,m.misses为1,此时len(dirty)为1,len(dirty)>=misses时候,会进行第一次晋升,dirty为nil,此时amended改成false
第2次Storetrue第2次Store后,dirty有2个值,read为1,此时amended改成true,标记他们有差异
第n次----------
第一次Store

很明显会走:

else {
		if !read.amended {
			// We're adding the first new key to the dirty map.
			// Make sure it is allocated and mark the read-only map as incomplete.
			m.dirtyLocked()
			m.read.Store(&readOnly{m: read.m, amended: true})
		}
		m.dirty[key] = newEntry(value)
	}
func (m *Map) dirtyLocked() {
	if m.dirty != nil {
		return
	}

	read := m.loadReadOnly()
	m.dirty = make(map[any]*entry, len(read.m))
	for k, e := range read.m {
		if !e.tryExpungeLocked() {
			m.dirty[k] = e
		}
	}
}

这个时候dirty有一个值,read没有,amended为true

第一次Load

由于read没有,所以读dirty,并且调用m.missLocked(),m.misses为1,此时len(dirty)为1,len(dirty)>=misses时候,会进行第一次晋升,dirty晋升为read,dirty被清空为nil, amended为false

if !ok && read.amended {
		m.mu.Lock()
		// Avoid reporting a spurious miss if m.dirty got promoted while we were
		// blocked on m.mu. (If further loads of the same key will not miss, it's
		// not worth copying the dirty map for this key.)
		read = m.loadReadOnly()
		e, ok = read.m[key]
		if !ok && read.amended {
			e, ok = m.dirty[key]
			// Regardless of whether the entry was present, record a miss: this key
			// will take the slow path until the dirty map is promoted to the read
			// map.
			m.missLocked()
		}
		m.mu.Unlock()
	}
func (m *Map) missLocked() {
	m.misses++
	if m.misses < len(m.dirty) {
		return
	}
	m.read.Store(&readOnly{m: m.dirty})
	m.dirty = nil
	m.misses = 0
}
第二次Store
else {
		if !read.amended {
			// We're adding the first new key to the dirty map.
			// Make sure it is allocated and mark the read-only map as incomplete.
			m.dirtyLocked()
			m.read.Store(&readOnly{m: read.m, amended: true})
		}
		m.dirty[key] = newEntry(value)
	}
func (m *Map) dirtyLocked() {
	if m.dirty != nil {
		return
	}

	read := m.loadReadOnly()
	m.dirty = make(map[any]*entry, len(read.m))
	for k, e := range read.m {
		if !e.tryExpungeLocked() {
			m.dirty[k] = e
		}
	}
}

read.amended 为false,进第一个if,调用m.dirtyLocked()将read拷贝到dirty,然后把amended标记为true,因为后面dirty添加一个后为2个,但是read还是1个

参考

https://segmentfault.com/a/1190000043399164