1. 什么是线程安全?

多个线程在并行访问同一个对象时,线程安全的代码通过同步机制保证各个线程都可以正常且正确的执行,且都可以获得正确的结果,不会出现数据污染等情况,就表示这个对象是线程安全的。

2. slice与map的线程安全问题

首先明确一点,在多线程的情况下,slice和map默认都是线程不安全的

2.1 slice线程安全问题

看一下下面的这个例子

var w sync.WaitGroup
func sliceSafety() {
	var s []int
	var sum int
	fmt.Printf("----------: len(s): %d, cap(s): %d, s: %v \n", len(s), cap(s), s)
	for i := 0; i < 10; i++ {
        w.Add(1)
		go func(i int) {
            defer w.Done()
			sum++
			s = append(s, i)
			fmt.Printf("==========i: %d: len(s): %d, cap(s): %d, s: %v \n", i, len(s), cap(s), s)
		}(i)
	}
    w.Wait()
	fmt.Println(sum)
	fmt.Println(s, len(s))
}

执行结果

#第一次执行
----------: len(s): 0, cap(s): 0, s: [] 
==========i: 9: len(s): 2, cap(s): 2, s: [3 9] 
==========i: 1: len(s): 1, cap(s): 1, s: [1] 
==========i: 3: len(s): 1, cap(s): 1, s: [3] 
==========i: 2: len(s): 3, cap(s): 4, s: [3 9 2] 
==========i: 4: len(s): 4, cap(s): 4, s: [3 9 2 4] 
==========i: 7: len(s): 6, cap(s): 8, s: [3 9 2 4 5 7] 
==========i: 8: len(s): 6, cap(s): 8, s: [3 9 2 4 0 8] 
==========i: 5: len(s): 5, cap(s): 8, s: [3 9 2 4 5] 
==========i: 6: len(s): 7, cap(s): 8, s: [3 9 2 4 0 8 6] 
==========i: 0: len(s): 5, cap(s): 8, s: [3 9 2 4 0] 
10
[3 9 2 4 0 8 6] 7

#第二次执行
----------: len(s): 0, cap(s): 0, s: [] 
==========i: 0: len(s): 1, cap(s): 1, s: [0] 
==========i: 2: len(s): 3, cap(s): 4, s: [0 3 2] 
==========i: 9: len(s): 4, cap(s): 4, s: [0 3 2 9] 
==========i: 6: len(s): 5, cap(s): 8, s: [0 3 2 9 6] 
==========i: 7: len(s): 6, cap(s): 8, s: [0 3 2 9 6 7] 
==========i: 4: len(s): 7, cap(s): 8, s: [0 3 2 9 6 7 4] 
==========i: 8: len(s): 8, cap(s): 8, s: [0 3 2 9 6 7 4 8] 
==========i: 3: len(s): 2, cap(s): 2, s: [0 3] 
==========i: 5: len(s): 9, cap(s): 16, s: [0 3 2 9 6 7 4 8 5] 
==========i: 1: len(s): 9, cap(s): 16, s: [0 3 2 9 6 7 4 8 1] 
10
[0 3 2 9 6 7 4 8 5] 9

由结果可见,每一次的执行结果都不太一样,就算在同一次执行中,s切片中放的值也遭到了修改,如第一次执行结果中的:

==========i: 7: len(s): 6, cap(s): 8, s: [3 9 2 4 5 7] #第5位值为5
==========i: 8: len(s): 6, cap(s): 8, s: [3 9 2 4 0 8] #第5位值为0,说明相同索引下的共享值遭到了破坏
==========i: 5: len(s): 5, cap(s): 8, s: [3 9 2 4 5] 
==========i: 6: len(s): 7, cap(s): 8, s: [3 9 2 4 0 8 6] 
==========i: 0: len(s): 5, cap(s): 8, s: [3 9 2 4 0] 

因为是并发执行的,且没有做同步控制,那么不同线程读到相同的索引位的时候,后一个线程会把前一个线程在该索引位所放入的值覆盖掉。这里注意,slice切片是引用类型的,切片底层实际引用的是数组,所以不同线程读取的是同一个底层数组,当不同线程读到该切片底层数组的相同的索引位的时候,在并发情况下会产生竞争关系,导致共享元素的值遭到修改。slice底层实现原理可以参考go中文文档 https://www.topgoer.cn/docs/golang/chapter03-11

2.2 解决办法

那么解决方法是什么呢?就是做同步控制,让执行是串行的,具体实现是在对数据进行修改的前后加锁。

var w sync.WaitGroup
var m sync.Mutex
func sliceSafety() {
	var s []int
	var sum int
	fmt.Printf("----------: len(s): %d, cap(s): %d, s: %v \n", len(s), cap(s), s)
	for i := 0; i < 10; i++ {
		w.Add(1)
		go func(i int) {
			defer w.Done()
			m.Lock()
			sum++
			s = append(s, i)
			fmt.Printf("==========i: %d: len(s): %d, cap(s): %d, s: %v \n", i, len(s), cap(s), s)
			m.Unlock()
		}(i)
	}
	w.Wait()
	fmt.Println(sum)
	fmt.Println(s, len(s))
}

执行结果

# 第一次执行
----------: len(s): 0, cap(s): 0, s: [] 
==========i: 9: len(s): 1, cap(s): 1, s: [9] 
==========i: 7: len(s): 2, cap(s): 2, s: [9 7] 
==========i: 8: len(s): 3, cap(s): 4, s: [9 7 8] 
==========i: 6: len(s): 4, cap(s): 4, s: [9 7 8 6] 
==========i: 5: len(s): 5, cap(s): 8, s: [9 7 8 6 5] 
==========i: 1: len(s): 6, cap(s): 8, s: [9 7 8 6 5 1] 
==========i: 0: len(s): 7, cap(s): 8, s: [9 7 8 6 5 1 0] 
==========i: 2: len(s): 8, cap(s): 8, s: [9 7 8 6 5 1 0 2] 
==========i: 4: len(s): 9, cap(s): 16, s: [9 7 8 6 5 1 0 2 4] 
==========i: 3: len(s): 10, cap(s): 16, s: [9 7 8 6 5 1 0 2 4 3] 
10
[9 7 8 6 5 1 0 2 4 3] 10


# 第二次执行
----------: len(s): 0, cap(s): 0, s: [] 
==========i: 2: len(s): 1, cap(s): 1, s: [2] 
==========i: 9: len(s): 2, cap(s): 2, s: [2 9] 
==========i: 3: len(s): 3, cap(s): 4, s: [2 9 3] 
==========i: 4: len(s): 4, cap(s): 4, s: [2 9 3 4] 
==========i: 5: len(s): 5, cap(s): 8, s: [2 9 3 4 5] 
==========i: 6: len(s): 6, cap(s): 8, s: [2 9 3 4 5 6] 
==========i: 7: len(s): 7, cap(s): 8, s: [2 9 3 4 5 6 7] 
==========i: 8: len(s): 8, cap(s): 8, s: [2 9 3 4 5 6 7 8] 
==========i: 1: len(s): 9, cap(s): 16, s: [2 9 3 4 5 6 7 8 1] 
==========i: 0: len(s): 10, cap(s): 16, s: [2 9 3 4 5 6 7 8 1 0] 
10
[2 9 3 4 5 6 7 8 1 0] 10
sync.Mutexsync.RWMutex

2.3 map线程安全问题

看一下下面这个例子

var wg sync.WaitGroup
func mapThread() {
	mp := make(map[int]int)
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			mp[i] = i
		}(i)
	}
	wg.Wait()
	fmt.Println(len(mp))
}

执行结果

fatal error: concurrent map writes	# map并发写入

goroutine 15 [running]:
runtime.throw({0x10a495b?, 0x0?})
	/usr/local/go/src/runtime/panic.go:992 +0x71 fp=0xc000042f48 sp=0xc000042f18 pc=0x102f411
runtime.mapassign_fast64(0x0?, 0x0?, 0x9)
	/usr/local/go/src/runtime/map_fast64.go:102 +0x2c5 fp=0xc000042f80 sp=0xc000042f48 pc=0x100f3a5
main.mapThread.func1(0x9)
	/Users/anker/kevin_go/src/go_learning/01_base/23_go_map_thread_safety/01_map.go:16 +0x6e fp=0xc000042fc8 sp=0xc000042f80 pc=0x1089eae
main.mapThread.func2()
	/Users/anker/kevin_go/src/go_learning/01_base/23_go_map_thread_safety/01_map.go:17 +0x2a fp=0xc000042fe0 sp=0xc000042fc8 pc=0x1089e0a
runtime.goexit()
	/usr/local/go/src/runtime/asm_amd64.s:1571 +0x1 fp=0xc000042fe8 sp=0xc000042fe0 pc=0x105a8a1
created by main.mapThread
	/Users/anker/kevin_go/src/go_learning/01_base/23_go_map_thread_safety/01_map.go:13 +0x3c

goroutine 1 [semacquire]:
sync.runtime_Semacquire(0xc00000c108?)
	/usr/local/go/src/runtime/sema.go:56 +0x25
sync.(*WaitGroup).Wait(0x60?)
	/usr/local/go/src/sync/waitgroup.go:136 +0x52
main.mapThread(0xa)
	/Users/anker/kevin_go/src/go_learning/01_base/23_go_map_thread_safety/01_map.go:19 +0xf6
main.main()
	/Users/anker/kevin_go/src/go_learning/01_base/23_go_map_thread_safety/01_map.go:24 +0x1e

goroutine 14 [runnable]:
main.mapThread.func1(0x8)
	/Users/anker/kevin_go/src/go_learning/01_base/23_go_map_thread_safety/01_map.go:16 +0x6e
created by main.mapThread
	/Users/anker/kevin_go/src/go_learning/01_base/23_go_map_thread_safety/01_map.go:13 +0x3c

进程 已完成,退出代码为 2
fatal error: concurrent map writes

2.4 解决办法

解决方法依然是加锁

var wg sync.WaitGroup
var m sync.Mutex
func mapThread() {
	mp := make(map[int]int)
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			m.Lock()
			mp[i] = i
			m.Unlock()
		}(i)
	}
	wg.Wait()
	fmt.Println(len(mp))
}

执行结果

10

进程 已完成,退出代码为 0

由结果可知,此时不再发生线程安全问题

sync.Map

2.5 sync.Map详解

sync.Map的使用和普通map的使用还是有挺大差别的,包括键和值的存取方式等,如下是sync.Map的一些操作

func main() {
	var ma sync.Map
	//存储
	ma.Store("key1", "你好")
	if a, ok := ma.Load("key1"); ok {
		fmt.Println("====1:", a)
	}
	//删除
	ma.Delete("key1")
	if a, ok := ma.Load("key1"); ok {
		fmt.Println("====2:", a)
	}
	// LoadOrStore 获取值,如果没有则存储
	if b, ok := ma.LoadOrStore("key2", "我曾经没有"); ok {
		fmt.Println("----1:", b)
	}
	if b, ok := ma.Load("key2"); ok {
		fmt.Println("----2:", b)
	}
	//删除,有bool判断是否存在
	if b, ok := ma.LoadAndDelete("key2"); ok {
		fmt.Println("----3:", b)
	}

	ma.Store("key1", "你好!")
	ma.Store("key2", "你好!!")
	ma.Store("key3", "你好!!!")
	//无需遍历
	ma.Range(func(key, value any) bool {
		fmt.Printf("key:%s ,value:%s \n", key, value)
		//如果返回:false,则退出循环,
		return true
	})
}

执行结果

====1: 你好
----2: 我曾经没有
----3: 我曾经没有
key:key1 ,value:你好! 
key:key2 ,value:你好!! 
key:key3 ,value:你好!!! 

进程 已完成,退出代码为 0

从sync.Map的使用方式来看,并不需要自己主动加锁,其在内部已经实现了锁的机制,用sync.Map实现刚才的并发代码会发生什么呢?如下:

var g sync.WaitGroup
func syncMapThread(l int) {
	var ma sync.Map
	for i := 0; i < l; i++ {
		g.Add(1)
		go func(i int) {
			defer g.Done()
			ma.Store(i, i)
		}(i)
	}
	g.Wait()
	ma.Range(func(key, value any) bool {
		fmt.Println(key, value)
		return true
	})
}

执行结果

7 7
8 8
1 1
4 4
0 0
5 5
6 6
9 9
3 3
2 2

由结果可知,可以正常执行且得到正确结果,而且不需要自己额外的手动加锁,

关于sync.Map和原生map+互斥锁Mutex或读写锁RWMutex在使用上的性能分析,可以参考煎鱼大佬的这篇文章:Go 并发读写 sync.map 的强大之处

主要结果是:

  • map操作时,sync.Map适合读/删除多的场景,而对于写入多的场景性能则会更差,适合用原生map+读写锁/互斥锁。
  • 为什么官方不直接取消原生map改用sync.Map呢,因为大部分情况下人们并不会在多个goroutine的情况下使用map,所以没有线程安全问题,就不需要加锁,这个时候原生map速度是最快的,如果全部换成sync.Map,那么额外的锁会增加开销,降低程序性能和效率。所以sync.Map是官方提供的一个额外选择,但不是必须使用的,要根据不同场景使用

关于sync.Map源码解析和执行流程可以参考这篇文章:https://zhuanlan.zhihu.com/p/449078860

3 参考文章