Go语言编程笔记9:使用共享变量实现并发

image-20211108153040805

数据竞态

在多线程编程中,遇到的最大麻烦就是当多个线程对同一个数据进行操作时,因为代码交错执行引发的一些问题:

package main

import (
	"fmt"
	"sync"
)

type bank struct {
	amount int
}

func (b *bank) SaveMoney(amount int) {
	b.amount += amount
}

func (b *bank) GetAmount() int {
	return b.amount
}

func main() {
	for i := 0; i < 10; i++ {
		bankTest()
	}
	// bank amount:  300
	// bank amount:  300
	// bank amount:  300
	// bank amount:  100
	// bank amount:  300
	// bank amount:  300
	// bank amount:  300
	// bank amount:  300
	// bank amount:  300
	// bank amount:  300
}

func bankTest() {
	var b bank
	var gwg sync.WaitGroup
	gwg.Add(1)
	go func() {
		defer gwg.Done()
		b.SaveMoney(100)
		fmt.Println("bank amount: ", b.GetAmount())
	}()
	gwg.Add(1)
	go func() {
		defer gwg.Done()
		b.SaveMoney(200)
	}()
	gwg.Wait()
}
bankbankTest

实际运行多次我们发现结果并不一致,这是因为两个goroutine是同时运行的,并非是顺序执行,所以是可能A线程存完100块后,B线程存了200,然后A线程再查看余额,此时就是300块。除了这种情况以外,也有可能是A全部执行完后,B再存钱,此时输出结果就是100块。当然也可能是B存完钱后A再执行,此时结果也是300块。

但可能的情况并不仅仅是这三种,因为计算机实际执行程序时是以底层的汇编指令为最小执行单元来执行的,并非是高级语言的单行代码,这在多线程编程中尤其致命。

b.amount += amount
c = b.amount + amount
b.amount = c
++b.amountb.amount

《Go程序设计语言》对此的一句评论相当有趣——“不要相信你在多线程编程时的直觉,因为那往往是错的”。

在学习Python时我也看到过类似的话,关于多线程编程最好的告诫就是——“不要多线程编程”。当然这并不是说不要用编写并发程序,而是说不要写传统的多线程编程,因为那样你会遇到很多麻烦,且很难排查和解决。事实上很多编程语言在语言层面尝试解决该问题,比如Python的全局线程锁,这可以看作是试图将多线程这头老虎关在笼子里的做法,在此基础上使用并发或异步都可以很好地解决并发问题。当然这也并非没有代价,但综合来看是相当值得的。与Python相比,Go语言的goroutine更像是传统的多线程编程,不过采用了其它方式来避免传统多线程编程的问题,在后面我们会详细说明。

net.Connect

解决数据竞态

针对数据竟态,有以下几种方式可以解决:

不写入数据

之所以会出现数据竟态,是因为多个线程并发访问共享数据时,有至少一个尝试写入,如果所有的并发线程都只尝试读取,而不写入,自然也就不存在数据竟态。

在编程中,我们经常会遇到一种“延迟初始化”的问题:

package main

type student struct {
	name string
	age  int
}

type students struct {
	stds map[string]*student
}

func (s *students) getStudent(name string) *student {
	student, ok := s.stds[name]
	if !ok {
		student := initStudent(name)
		s.stds[name] = student
	}
	return student
}

func initStudent(name string) *student {
	return &student{}
}

func main() {
	var ss students
	go func() {
		ss.getStudent("std1")
	}()
	go func() {
		ss.getStudent("std2")
	}()
}
studentsstudentsmapkeymap

这种情况下最简单的一个解决方式是:在进行并发访问前先生成所有数据,再并发读取:

func main() {
	var ss students
	ss.getStudent("std1")
	ss.getStudent("std2")
	go func() {
		ss.getStudent("std1")
	}()
	go func() {
		ss.getStudent("std2")
	}()
}
getStudentmap

通过通道共享数据

使用通道来传递共享数据是一个更常见的解决方式,Go语言有一句真理——“不要通过共享内存来通信,而应该通过通信来共享内存”。也就是说在多个线程共享访问同一个变量时,我们应当通过通道来访问这个变量的副本,而不是直接访问这个变量。

bank
package main

import (
	"fmt"
	"sync"
	"time"
)

type bank struct {
	amount    int
	saveChan  chan int
	getChan   chan int
	closeChan chan struct{}
}

func (b *bank) Init() {
	b.saveChan = make(chan int)
	b.getChan = make(chan int)
	b.closeChan = make(chan struct{})
}

func (b *bank) SaveMoney(amount int) {
	b.saveChan <- amount
}

func (b *bank) GetAmount() int {
	return <-b.getChan
}

func (b *bank) StartBank() {
	for {
		select {
		case amount := <-b.saveChan:
			b.amount += amount
		case b.getChan <- b.amount:
		case <-b.closeChan:
			return
		}
	}
}

func (b *bank) Close() {
	close(b.closeChan)
}

func main() {
	for i := 0; i < 10; i++ {
		bankTest()
	}
	// bank amount:  300
	// bank amount:  300
	// bank amount:  300
	// bank amount:  100
	// bank amount:  300
	// bank amount:  300
	// bank amount:  300
	// bank amount:  300
	// bank amount:  300
	// bank amount:  300
}

func bankTest() {
	var b bank
	b.Init()
	go b.StartBank()
	var gwg sync.WaitGroup
	gwg.Add(1)
	go func() {
		defer gwg.Done()
		b.SaveMoney(100)
		fmt.Println("bank amount: ", b.GetAmount())
	}()
	gwg.Add(1)
	go func() {
		defer gwg.Done()
		b.SaveMoney(200)
	}()
	gwg.Wait()
	b.Close()
	time.Sleep(time.Second)
}
go b.StartBankbamountGetAmountSaveMoney

上面例子的遗留问题可以通过互斥锁来解决。

需要说明的是,通道并非仅仅能传递共享数据的拷贝,在某些情况下也可以传递共享数据指针,比如在Go语言编程笔记7:goroutine和通道中提到的那个蛋糕师和流水线的问题,对于蛋糕实例,每一个蛋糕当前仅有一个蛋糕师可以操作,所以我们完全可以在当前蛋糕师完成制作步骤后,将蛋糕指针通过通道传递给下一个蛋糕师,然后就不再操作该蛋糕,虽然在蛋糕的生命周期内,它会存在于各个蛋糕师那里,但在某一个时刻,仅有一个蛋糕师会操作该蛋糕,所以同样不会存在数据竟态的问题,这种情况可以被称作“串行受限”。

互斥锁

互斥锁其实是一个传统的并发解决方式,熟悉传统多线程编程的老司机应该不陌生,不过可能是叫做“资源锁”或者“线程锁”,但实质都是针对共享变量的一把锁,通过在操作前加锁,操作后解锁来实现并发情况下操作同一个变量不会出现数据竟态。

我们用互斥锁来改写上面的示例:

package main

import (
	"fmt"
	"sync"
)

type bank struct {
	amountMutex sync.Mutex
	amount      int
}

func (b *bank) SaveMoney(amount int) {
	b.amountMutex.Lock()
	b.amount += amount
	b.amountMutex.Unlock()
}

func (b *bank) GetAmount() int {
	b.amountMutex.Lock()
	defer b.amountMutex.Unlock()
	return b.amount
}

func main() {
	for i := 0; i < 10; i++ {
		bankTest()
	}
	// bank amount:  300
	// bank amount:  300
	// bank amount:  300
	// bank amount:  100
	// bank amount:  300
	// bank amount:  300
	// bank amount:  300
	// bank amount:  300
	// bank amount:  300
	// bank amount:  300
}

func bankTest() {
	var b bank
	var gwg sync.WaitGroup
	gwg.Add(1)
	go func() {
		defer gwg.Done()
		b.SaveMoney(100)
		fmt.Println("bank amount: ", b.GetAmount())
	}()
	gwg.Add(1)
	go func() {
		defer gwg.Done()
		b.SaveMoney(200)
	}()
	gwg.Wait()
}

通过添加互斥锁,我们就可以让并发的多个goroutine安全地读写共享变量,而不用担心之前某些情况下钱会不翼而飞的问题。

假如我们要添加一个取钱的操作:

func (b *bank) WithDraw(amount int) bool {
	b.SaveMoney(-amount)
	if b.GetAmount() < 0 {
		b.SaveMoney(amount)
		return false
	}
	return true
}
b.amount
func (b *bank) WithDraw(amount int) bool {
	b.amountMutex.Lock()
	defer b.amountMutex.Unlock()
    b.saveMoney(-amount)
	...
}
withDrawb.saveMoney()

这种情况可以用以下的方式避免:

type bank struct {
	amountMutex sync.Mutex
	amount      int
}

func (b *bank) SaveMoney(amount int) {
	b.amountMutex.Lock()
	b.saveMoney(amount)
	b.amountMutex.Unlock()
}

func (b *bank) saveMoney(amount int) {
	b.amount += amount
}

func (b *bank) GetAmount() int {
	b.amountMutex.Lock()
	defer b.amountMutex.Unlock()
	return b.getAmount()
}

func (b *bank) getAmount() int {
	return b.amount
}

func (b *bank) WithDraw(amount int) bool {
	b.amountMutex.Lock()
	defer b.amountMutex.Unlock()
	b.saveMoney(-amount)
	if b.getAmount() < 0 {
		b.saveMoney(amount)
		return false
	}
	return true
}

这里将具体逻辑从对应方法中拆分出来,编写成了首字母小写的结构体内部无锁版本的方法,首字母大写的方法供外部调用,进行了加锁,而内部无锁版本的可以用于内部使用,这样可以确保安全地访问共享变量。

虽然这样看起来有点蠢,但挺实用。

互斥锁虽然看起来不错,但是实际上有个致命问题,通过添加互斥锁,对共享变量的一切访问都变成了互斥的、串行的行为,这当然会大大影响并发的效率,理论上,出了写入操作时应当绝对互斥,在只存在并发读取时是可以并发进行的,Go语言也为此提供了额外的一种锁:读写锁。

上面的示例改写为读写锁很容易:

type bank struct {
	amountMutex sync.RWMutex
	amount      int
}
...
func (b *bank) GetAmount() int {
	b.amountMutex.RLock()
	defer b.amountMutex.RUnlock()
	return b.getAmount()
}
LockUnLockGetAmount

需要注意的是,这样并不是没有代价的,读写锁比普通的互斥锁有更复杂的薄帐操作,所以只有在大量并发读取共享变量的情况下比普通的互斥锁有优势。

延迟初始化

之前我们提到的那个延迟初始化的例子,也可以用互斥锁来改写:

type students struct {
	stdsMutex sync.Mutex
	stds      map[string]*student
}

func (s *students) getStudent(name string) *student {
	s.stdsMutex.Lock()
	defer s.stdsMutex.Unlock()
	student, ok := s.stds[name]
	if !ok {
		student := initStudent(name)
		s.stds[name] = student
	}
	return student
}

也可以用读写锁来改善性能:

type students struct {
	stdsMutex sync.RWMutex
	stds      map[string]*student
}

func (s *students) getStudent(name string) *student {
	s.stdsMutex.RLock()
	student, ok := s.stds[name]
	s.stdsMutex.RUnlock()
	if !ok {
		s.stdsMutex.Lock()
		student, ok = s.stds[name]
		if !ok {
			student := initStudent(name)
			s.stds[name] = student
		}
		s.stdsMutex.Unlock()
	}
	return student
}
key

如果我们可以一次性完成初始化,可以用另一种方式来编写类似的代码:

type students struct {
	stdsMutex sync.Once
	stds      map[string]*student
}

func (s *students) GetStudent(name string) *student {
	s.stdsMutex.Do(s.initAll)
	return s.stds[name]
}

func (ss *students) initAll() {
	ss.stds["std1"] = initStudent("std1")
	ss.stds["std2"] = initStudent("std2")
}
sync.OnceGetStudents.initAllGetStudent

往期内容