转载:http://se77en.cc/2014/04/08/share-by-communicating-the-concurrency-slogan-in-golang/


概述

我一直在找一种好的方法来解释 go 语言的并发模型:

不要通过共享内存来通信,相反,应该通过通信来共享内存

但是没有发现一个好的解释来满足我下面的需求:

  • 通过一个例子来说明最初的问题
  • 提供一个共享内存的解决方案
  • 提供一个通过通信的解决方案

这篇文章我就从这三个方面来做出解释。

读过这篇文章后你应该会了解通过通信来共享内存的模型,以及它和通过共享内存来通信的区别,你还将看到如何分别通过这两种模型来解决访问和修改共享资源的问题。

前提

设想一下我们要访问一个银行账号:

     
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
     
type Account interface {
Withdraw( uint)
Deposit( uint)
Balance() int
}
type Bank struct {
account Account
}
func NewBank(account Account) *Bank {
return &Bank{account: account}
}
func (bank *Bank) Withdraw(amount uint, actor_name string) {
fmt.Println( "[-]", amount, actor_name)
bank.account.Withdraw(amount)
}
func (bank *Bank) Deposit(amount uint, actor_name string) {
fmt.Println( "[+]", amount, actor_name)
bank.account.Deposit(amount)
}
func (bank *Bank) Balance() int {
return bank.account.Balance()
}
Account
     
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
     
type SimpleAccount struct{
balance int
}
func NewSimpleAccount(balance int) *SimpleAccount {
return &SimpleAccount{balance: balance}
}
func (acc *SimpleAccount) Deposit(amount uint) {
acc.setBalance(acc.balance + int(amount))
}
func (acc *SimpleAccount) Withdraw(amount uint) {
if acc.balance >= int(amount) {
acc.setBalance(acc.balance - int(amount))
} else {
panic( "杰克穷死")
}
}
func (acc *SimpleAccount) Balance() int {
return acc.balance
}
func (acc *SimpleAccount) setBalance(balance int) {
acc.add_some_latency() //增加一个延时函数,方便演示
acc.balance = balance
}
func (acc *SimpleAccount) add_some_latency() {
<-time.After(time.Duration(rand.Intn (100)) * time.Millisecond)
}
balancesetBalance

把上面所有部分弄好以后我们就可以像下面这样使用它啦:

     
1
2
3
4
5
6
7
8
9
10
11
     
func main() {
balance := 80
b := NewBank(bank. NewSimpleAccount(balance))
fmt. Println( "初始化余额", b. Balance())
b. Withdraw( 30, "马伊琍")
fmt. Println( "-----------------")
fmt. Println( "剩余余额", b. Balance())
}

运行上面的代码会输出:

     
1
2
3
4
     
初始化余额 80
[-] 30 马伊琍
- ----------------
剩余余额 50

没错!

不错在现实生活中,一个银行账号可以有很多个附属卡,不同的附属卡都可以对同一个账号进行存取钱,所以我们来修改一下代码:

     
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
     
func main() {
balance := 80
b := NewBank(bank. NewSimpleAccount(balance))
fmt. Println( "初始化余额", b. Balance())
done := make(chan bool)
go func() { b. Withdraw( 30, "马伊琍"); done <- true }()
go func() { b. Withdraw( 10, "姚笛"); done <- true }()
//等待 goroutine 执行完成
<-done
<-done
fmt. Println( "-----------------")
fmt. Println( "剩余余额", b. Balance())
}

这儿两个附属卡并发的从账号里取钱,来看看输出结果:

     
1
2
3
4
5
     
初始化余额 80
[-] 30 马伊琍
[-] 10 姚笛
- ----------------
剩余余额 70

这下把文章高兴坏了:)

结果当然是错误的,剩余余额应该是40而不是70,那么让我们看看到底哪儿出问题了。

问题

当并发访问共享资源时,无效状态有很大可能会发生。

在我们的例子中,当两个附属卡同一时刻从同一个账号取钱后,我们最后得到银行账号(即共享资源)错误的剩余余额(即无效状态)。

我们来看一下执行时候的情况:

     
1
2
3
4
5
6
7
8
9
10
11
     
处理情况
--------------
_马伊琍_|_姚笛_
1. 获取余额 80 | 80
2. 取钱 - 30 | - 10
3. 当前剩余 50 | 70
... | ...
4. 设置余额 50 ? 70 //该先设置哪个好呢?
5. 后设置的生效了
--------------
6. 剩余余额 70
...add_some_latency

解决办法

我们通过两种方法来解决这个问题:

  • 共享内存的解决方案
  • 通过通信的解决方案
SimpleAccount

共享内存的解决方案

又叫 “通过共享内存来通信”。

这种方案暗示了使用锁机制来预防同时访问和修改共享资源。锁告诉其它处理程序这个资源已经被一个处理程序占用了,因此别的处理程序需要排队直到当前处理程序处理完毕。

LockingAccount
     
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
     
type LockingAccount struct {
lock sync.Mutex
account *SimpleAccount
}
//封装一下 SimpleAccount
func NewLockingAccount(balance int) *LockingAccount {
return &LockingAccount{account: NewSimpleAccount(balance)}
}
func (acc *LockingAccount) Deposit(amount uint) {
acc. lock. Lock()
defer acc. lock. Unlock()
acc.account.Deposit(amount)
}
func (acc *LockingAccount) Withdraw(amount uint) {
acc. lock. Lock()
defer acc. lock. Unlock()
acc.account.Withdraw(amount)
}
func (acc *LockingAccount) Balance() int {
acc. lock. Lock()
defer acc. lock. Unlock()
return acc.account.Balance()
}
lock sync.Locklock.Lock()lock.Unlock()

这样每次一个附属卡访问银行账号(即共享资源),这个附属卡会自动获得锁直到最后操作完毕。

LockingAccount
     
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
     
func main() {
balance := 80
b := NewBank(bank. NewLockingAccount(balance))
fmt. Println( "初始化余额", b. Balance())
done := make(chan bool)
go func() { b. Withdraw( 30, "马伊琍"); done <- true }()
go func() { b. Withdraw( 10, "姚笛"); done <- true }()
//等待 goroutine 执行完成
<-done
<-done
fmt. Println( "-----------------")
fmt. Println( "剩余余额", b. Balance())
}

输出的结果是:

     
1
2
3
4
5
     
初始化余额 80
[-] 30 马伊琍
[-] 10 姚笛
- ----------------
剩余余额 40

现在结果正确了!

在这个例子中第一个处理程序加锁后独享共享资源,其它处理程序只能等待它执行完成。

我们接着看一下执行时的情况,假设马伊琍先拿到了锁:

     
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
     
处理过程
________________
_马伊琍_|__姚笛__
加锁 ><
得到余额 80 |
取钱 - 30 |
当前余额 50 |
... |
设置余额 50 |
解除锁 <>
|
当前余额 50
|
加锁 ><
得到余额 | 50
取钱 | - 10
当前余额 | 40
| ...
设置余额 | 40
解除锁 <>
________________
剩余余额 40

现在我们的处理程序在访问共享资源时相继的产生了正确的结果。

通过通信的解决方案

又叫 “通过通信来共享内存”。

ConcurrentAccount
     
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
     
type ConcurrentAccount struct {
account *SimpleAccount
deposits chan uint
withdrawals chan uint
balances chan chan int
}
func NewConcurrentAccount(amount int) *ConcurrentAccount{
acc := &ConcurrentAccount{
account : &SimpleAccount{balance: amount},
deposits: make( chan uint),
withdrawals: make( chan uint),
balances: make( chan chan int),
}
acc.listen()
return acc
}
func (acc *ConcurrentAccount) Balance() int {
ch := make( chan int)
acc.balances <- ch
return <-ch
}
func (acc *ConcurrentAccount) Deposit(amount uint) {
acc.deposits <- amount
}
func (acc *ConcurrentAccount) Withdraw(amount uint) {
acc.withdrawals <- amount
}
func (acc *ConcurrentAccount) listen() {
go func() {
for {
select {
case amnt := <-acc.deposits:
acc.account.Deposit(amnt)
case amnt := <-acc.withdrawals:
acc.account.Withdraw(amnt)
case ch := <-acc.balances:
ch <- acc.account.Balance()
}
}
}()
}
ConcurrentAccountSimpleAccount

调用代码和加锁版本的一样,这里就不写了,唯一不一样的就是初始化银行账号的时候:

     
1
     
b := NewBank(bank.NewConcurrentAccount(balance))

运行产生的结果和加锁版本一样:

     
1
2
3
4
5
     
初始化余额 80
[-] 30 马伊琍
[-] 10 姚笛
- ----------------
剩余余额 40

让我们来深入了解一下细节。

通过通信来共享内存是如何工作的

一些基本注意点:

  • 共享资源被封装在一个控制流程中。
    结果就是资源成为了非共享状态。没有处理程序能够直接访问或者修改资源。你可以看到访问和修改资源的方法实际上并没有执行任何改变。
     
1
2
3
4
5
6
7
8
9
10
11
12
13
     
func (acc *ConcurrentAccount) Balance() int {
ch := make( chan int)
acc.balances <- ch
balance := <-ch
return balance
}
func (acc *ConcurrentAccount) Deposit(amount uint) {
acc.deposits <- amount
}
func (acc *ConcurrentAccount) Withdraw(amount uint) {
acc.withdrawals <- amount
}
  • 访问和修改是通过消息和控制流程通信。
  • 在控制流程中任何访问和修改的动作都是相继发生的。
    当控制流程接收到访问或者修改的请求后会立即执行相关动作。让我们仔细看看这个流程:
     
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
     
func (acc * ConcurrentAccount) listen() {
// 执行控制流程
go func() {
for {
select {
case amnt := <-acc.deposits:
acc.account. Deposit(amnt)
case amnt := <-acc.withdrawals:
acc.account. Withdraw(amnt)
case ch := <-acc.balances:
ch <- acc.account. Balance()
}
}
}()
}

select 不断地从各个通道中取出消息,每个通道都跟它们所要执行的操作相一致。

select

领会这个有一点绕。

Balance()
     
1
2
3
4
5
6
7
8
     
一张附属卡的流程 | 控制流程
----------------------------------------------
1. b. Balance() |
2. ch -> [acc.balances]-> ch
3. <-ch | balance = acc.account. Balance()
4. return balance <-[ch]<- balance
5 |

这两个流程都干了点什么呢?

附属卡的流程

b.Balance()chchacc.balancesch<-ch

控制流程

acc.balanceschch
事件

总结

这篇博客描述了问题以及问题的解决办法,但那时没有深入去探究不同解决办法的优缺点。

其实这篇文章的例子更适合用 mutex,因为这样代码更加清晰。

最后,请毫无顾忌的指出我的错误!