本文以一个简单事例的多种解决方案作为引子,用结构体Demo来总结各种并发读写的情况

一个数据竞态的case

package main

import (
	"fmt"
	"testing"
	"time"
)

func Test(t *testing.T) {
  fmt.Print("getNum(): ")
	for i := 0; i < 10; i++ {
		fmt.Print(strconv.Itoa(getNum()) + " ")
	}
	fmt.Println()

}
func getNum() int {
	var num int
	go func() {
		num = 53
	}()
	time.Sleep(500)
	return num
}

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-yUTBgzir-1675414566701)(assets/IMG_21.png)]

numnumnumnum
  1. goRoutine先完成对num的修改,最后返回5
  2. 变量num的值从函数返回,结果为默认值0

操作完成的顺序不同,导致最后的输出结果不同,这就是将其称为数据竟态的原因。

检查数据竞态

Go有内置的数据竞争检测器,可以使用它来查看潜在的数据竞争条件。使用它就像-race在普通的Go命令行工具中添加标志一样。

  • 运行时检查: go run -race main.go
  • 构建时检查: go build -race main.go
  • 测试时检查: go test -race main.go

所有避免产生竞态背后的核心原则是防止对同一变量或内存位置同时进行读写访问

解决方案

1、WaitGroup等待

解决数据竞态的最直接方法是阻止读取访问操作直到写操作完成为止。
可以以最少的麻烦解决问题,但必须要保证Add和Done出现次数一致,否则会一致阻塞程序,无限制消耗内存,直至资源耗尽服务宕机

func getNumByWaitGroup() int {
	var num int
	var wg sync.WaitGroup
	wg.Add(1) // 表示有一个任务需要等待,等待任务数+1
	go func() {
		num = 53
		wg.Done() // 完成一个处于等待队列的任务,等待任务-1

		// Done decrements the WaitGroup counter by one.
		// func (wg *WaitGroup) Done() {
		//	wg.Add(-1)
		//}

	}()
	wg.Wait() // 阻塞等待,直到等待队列的任务数为0
	return num
}

2、Channel阻塞等待

与1相似

func getNumByChannel() int {
	var num int
	ch := make(chan struct{}) // 创建一个类型为结构体的channel,并初始化为空
	go func() {
		num = 53
		ch <- struct{}{} // 推送一个空结构体到ch
	}()
	<-ch // 使程序处于阻塞状态,直到ch获取到推送的值
	return num
}

3、Channel通道

获取结果后通过通道推送结果,与前两种方法不同,该方法不会进行任何阻塞。
相反,保留了阻塞调用代码的时机,因此它允许更高级别的功能决定自己的阻塞合并发机制,而不是将getXX功能视为同步功能

func getNumByChan() <-chan int {
	var num int
	ch := make(chan int) // 创建一个类型为int的channel
	go func() {
		num = 53
		ch <- num // 推送一个int到ch
	}()

	return ch // 返回chan
}

4、互斥锁

上述三种方法解决的是num在写操作完成后才能读取的情况
不管读写顺序如何,只要求它们不能同时发生——> 互斥锁


// 首先,创建一个结构体,其中包含我们想要返回的值以及一个互斥实例
type NumLock struct {
	val int
	m   sync.Mutex
}

func (num *NumLock) Get() int {
	// The `Lock` method of the mutex blocks if it is already locked
	// if not, then it blocks other calls until the `Unlock` method is called
	// Lock方法
	// 调用结构体对象的Lock方法将会锁定该对象中的变量;如果没有,将会阻塞其他调用,直到该互斥对象的Unlock方法被调用

	num.m.Lock()
	// 直到该方法返回,该实例对象才会被解锁
	defer num.m.Unlock()
	// 返回安全类型的实例对象中的值
	return num.val
}

func (num *NumLock) Set(val int) {
	// 类似于上面的getNum方法,锁定num对象直到写入“num.val”的值完成
	num.m.Lock()
	defer num.m.Unlock()
	num.val = val
}

func getNumByLock() int {
	// 创建一个`NumLock`的示例
	num := &NumLock{}
	// 使用“Set”和“Get”来代替常规的复制修改和读取值,这样就可以确保只有在写操作完成时我们才能进行阅读,反之亦然
	go func() {
		num.Set(53)
	}()
	time.Sleep(500)
	return num.Get()
}

这里要注意,我们无法保证最后取得的num值
当有多个写入和读取操作混合在一起时,使用Mutex互斥可以保证读写的值与预期结果一致

附上结果:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-OTuDXKB7-1675414566702)(assets/IMG_22.png)]

完整代码:

package main

import (
	"fmt"
	"strconv"
	"sync"
	"testing"
	"time"
)

func Test(t *testing.T) {
	fmt.Print("getNum(): ")
	for i := 0; i < 10; i++ {
		fmt.Print(strconv.Itoa(getNum()) + " ")
	}
	fmt.Println()
	fmt.Print("getNumByWaitGroup(): ")
	for i := 0; i < 10; i++ {
		fmt.Print(strconv.Itoa(getNumByWaitGroup()) + " ")
	}
	fmt.Println()
	fmt.Print("getNumByChannel(): ")
	for i := 0; i < 10; i++ {
		fmt.Print(strconv.Itoa(getNumByChannel()) + " ")
	}
	fmt.Println()
	fmt.Print("getNumByChan(): ")
	for i := 0; i < 10; i++ {
		fmt.Print(strconv.Itoa(<-getNumByChan()) + " ")
	}
	fmt.Println()
	fmt.Print("getNumByLock(): ")
	for i := 0; i < 10; i++ {
		fmt.Print(strconv.Itoa(getNumByLock()) + " ")
	}
	fmt.Println()
	fmt.Print("getFact(): ")
	fmt.Println(getFact())
	fmt.Println()
}
func getNum() int {
	var num int
	go func() {
		num = 53
	}()
	time.Sleep(500)
	return num
}

func getNumByWaitGroup() int {
	var num int
	var wg sync.WaitGroup
	wg.Add(1) // 表示有一个任务需要等待,等待任务数+1
	go func() {
		num = 53
		wg.Done() // 完成一个处于等待队列的任务,等待任务-1

		// Done decrements the WaitGroup counter by one.
		// func (wg *WaitGroup) Done() {
		//	wg.Add(-1)
		//}

	}()
	wg.Wait() // 阻塞等待,直到等待队列的任务数为0
	return num
}

func getNumByChannel() int {
	var num int
	ch := make(chan struct{}) // 创建一个类型为结构体的channel,并初始化为空
	go func() {
		num = 53
		ch <- struct{}{} // 推送一个空结构体到ch
	}()
	<-ch // 使程序处于阻塞状态,直到ch获取到推送的值
	return num
}

func getNumByChan() <-chan int {
	var num int
	ch := make(chan int) // 创建一个类型为int的channel
	go func() {
		num = 53
		ch <- num // 推送一个int到ch
	}()

	return ch // 返回chan
}

// 首先,创建一个结构体,其中包含我们想要返回的值以及一个互斥实例
type NumLock struct {
	val int
	m   sync.Mutex
}

func (num *NumLock) Get() int {
	// The `Lock` method of the mutex blocks if it is already locked
	// if not, then it blocks other calls until the `Unlock` method is called
	// Lock方法
	// 调用结构体对象的Lock方法将会锁定该对象中的变量;如果没有,将会阻塞其他调用,直到该互斥对象的Unlock方法被调用

	num.m.Lock()
	// 直到该方法返回,该实例对象才会被解锁
	defer num.m.Unlock()
	// 返回安全类型的实例对象中的值
	return num.val
}

func (num *NumLock) Set(val int) {
	// 类似于上面的getNum方法,锁定num对象直到写入“num.val”的值完成
	num.m.Lock()
	defer num.m.Unlock()
	num.val = val
}

func getNumByLock() int {
	// 创建一个`NumLock`的示例
	num := &NumLock{}
	// 使用“Set”和“Get”来代替常规的复制修改和读取值,这样就可以确保只有在写操作完成时我们才能进行阅读,反之亦然
	go func() {
		num.Set(53)
	}()
	time.Sleep(500)
	return num.Get()
}

func getFact() []string {
	ch := make(chan string)
	//defer close(ch)
	res := make([]string, 0)
	num := &NumLock{}
	go func() {
		for i := 10; i > 0; i-- {
			num.Set(i)
			ch <- strconv.Itoa(num.Get())
		}
		close(ch)
	}()
	for i := range ch {
		res = append(res, i)
	}
	return res
}

典型数据竞态

1、循环计数上的竞态

func main() {
	var wg sync.WaitGroup
	wg.Add(5)
	for i := 0; i < 5; i++ {
		go func() {
			fmt.Println(i) // Not the 'i' you are looking for.
			wg.Done()
		}()
	}
	wg.Wait()
}

函数文字中的变量i与循环使用的变量相同,因此goroutine中的读取与循环增量竞争。
(此程序通常打印55555,而不是01234)
该程序可以通过复制变量来修复:

func main() {
	var wg sync.WaitGroup
	wg.Add(5)
	for i := 0; i < 5; i++ {
		go func(j int) {
			fmt.Println(j) // Good. Read local copy of the loop counter.
			wg.Done()
		}(i)
	}
	wg.Wait()
}

2、意外共享变量

func ParallelWrite(data []byte) chan error {
	res := make(chan error, 2)
	f1, err := os.Create("file1")
	if err != nil {
		res <- err
	} else {
		go func() {
			// This err is shared with the main goroutine,
			// so the write races with the write below.
			_, err = f1.Write(data)
			res <- err
			f1.Close()
		}()
	}
	f2, err := os.Create("file2") // The second conflicting write to err.
	if err != nil {
		res <- err
	} else {
		go func() {
			_, err = f2.Write(data)
			res <- err
			f2.Close()
		}()
	}
	return res
}

修复方法是在goroutines中引入新变量(注意使用:=):

	...
	_, err := f1.Write(data)
	...
	_, err := f2.Write(data)
	...

3、无保护的全局变量

如果从几个goroutine调用以下代码,则会导致service的map产生竞态。同一map的并发读写不安全:

var service map[string]net.Addr

func RegisterService(name string, addr net.Addr) {
	service[name] = addr
}

func LookupService(name string) net.Addr {
	return service[name]
}
mutex
var (
	service   map[string]net.Addr
	serviceMu sync.Mutex
)

func RegisterService(name string, addr net.Addr) {
	serviceMu.Lock()
	defer serviceMu.Unlock()
	service[name] = addr
}

func LookupService(name string) net.Addr {
	serviceMu.Lock()
	defer serviceMu.Unlock()
	return service[name]
}

4、原始无保护变量

数据竞态也可以发生在原始类型的变量上(bool、int、int64等)

type Watchdog struct{ last int64 }

func (w *Watchdog) KeepAlive() {
	w.last = time.Now().UnixNano() // First conflicting access.
}

func (w *Watchdog) Start() {
	go func() {
		for {
			time.Sleep(time.Second)
			// Second conflicting access.
			if w.last < time.Now().Add(-10*time.Second).UnixNano() {
				fmt.Println("No keepalives for 10 seconds. Dying.")
				os.Exit(1)
			}
		}
	}()
}

即使这种“无辜”的数据竞争也可能导致因内存访问的非原子性、干扰编译器优化或访问处理器内存的重新排序问题而导致难以调试的问题。

这场比赛的一个典型修复方法是使用通道或互斥体。为了保持无锁行为,也可以使用sync/atomic包

type Watchdog struct{ last int64 }

func (w *Watchdog) KeepAlive() {
	atomic.StoreInt64(&w.last, time.Now().UnixNano())
}

func (w *Watchdog) Start() {
	go func() {
		for {
			time.Sleep(time.Second)
			if atomic.LoadInt64(&w.last) < time.Now().Add(-10*time.Second).UnixNano() {
				fmt.Println("No keepalives for 10 seconds. Dying.")
				os.Exit(1)
			}
		}
	}()
}

5、未同步的发送和关闭操作

同一通道上的非同步发送和关闭操作也可能是一个竞态条件

c := make(chan struct{}) // or buffered channel

// The race detector cannot derive the happens before relation
// for the following send and close operations. These two operations
// are unsynchronized and happen concurrently.
go func() { c <- struct{}{} }()
close(c)

根据Go内存模型,通道上的发送发生在该通道的相应接收完成之前。要同步发送和关闭操作,请使用接收操作来保证发送在关闭前完成:

c := make(chan struct{}) // or buffered channel

go func() { c <- struct{}{} }()
<-c
close(c)