Go语言的哲学思想之一为:Do not communicate by sharing memory; instead, share memory by communicating. 翻译成中文即是:不要通过共享内存进行通信;取而代之,应该通过通信来共享内存。
为了支持这种哲学,Go语言提供了channel(通道)。
想当初,我刚看到这种哲学的时候觉得特别好,但是在具体实现代码的时候,对其理解却不是很深刻。在项目中,我总是把channel当作生产者-消费者模型中的消息队列来进行使用。这样对于在不同的goroutine中传输数据是很方便的。
但是对于共享内存数据,我一直使用的是传统的锁方式,而在Go语言中,常用的就是Mutex和RWMutex。对于从C/C++/Java/C#转型到Go语言的程序员而言,使用Mutex/RWMutex是非常直观的。我也尝试过使用channel来共享数据,但是觉得代码非常地繁琐,于是就放弃了。
但是随着在越来越多的项目中使用Go,以及越来越多的同事使用Go;在其它语言中使用锁的常见问题也在Go里面暴露出来了。于是,我转而继续思想使用channel来实现内存数据的共享。
最近在使用Erlang,我从中得到不少的灵感。因为Erlang是一门只提供通过发送消息来共享内存的语言,而没有提供传统的锁方式。在Erlang中,OTP框架中提供了一个叫做gen_server的behaviour,而这可以大大地简化逻辑的实现。
于是,我使用Go语言的channel实现了一个类似的版本。
代码如下:
baseModel.go
/*
This package is designed to simplify the use of csp model.
In csp model, we use channel to communicate among goroutines.
But it's not easy and error prone to implement the logic every time.
So, this package is designed as a template to use.
*/
package cspBaseModel
import (
"context"
"fmt"
)
// Request type is defined as the message exchanged between higher layer and the base model.
type Request struct {
Name string // The identity of a request. Which is used to find the callback function.
Parameter interface{} // The data sent to the callback function.
ReturnCh interface{} // A channel used to pass message between the request method and the callback function.
}
func NewRequest(name string, parameter, returnCh interface{}) *Request {
return &Request{
Name: name,
Parameter: parameter,
ReturnCh: returnCh,
}
}
// BaseModel type is the core type
type BaseModel struct {
callbackMap map[string]func(*Request) // This callbackMap stores all the name and callback function pairs.
RequestChannel chan *Request // This channel is used to receive all the request from higher layer functions.
}
// Higher layer modules can register name and callback function pairs to base model.
func (this *BaseModel) Register(name string, callback func(*Request)) {
if _, exists := this.callbackMap[name]; exists {
panic(fmt.Sprintf("Item with name:%s has already existed.", name))
}
this.callbackMap[name] = callback
}
// Higher layer modules start base model to handle all the requests.
// ctx is used to stop this goroutine, to avoid goroutine leak.
func (this *BaseModel) Start(ctx context.Context) {
go func() {
for {
select {
case requestObj := <-this.RequestChannel:
if callback, exists := this.callbackMap[requestObj.Name]; !exists {
panic(fmt.Sprintf("There is no callback related to %s", requestObj.Name))
} else {
callback(requestObj)
}
case <-ctx.Done():
return
}
}
}()
}
func NewBaseModel() *BaseModel {
return &BaseModel{
callbackMap: make(map[string]func(*Request), 16),
RequestChannel: make(chan *Request, 64),
}
}
baseModel_test.go
package cspBaseModel
import (
"context"
"fmt"
"sync"
"testing"
)
type Player struct {
Id int
Name string
Level int
}
func (this *Player) Equal(other *Player) bool {
return this.Id == other.Id && this.Name == other.Name && this.Level == other.Level
}
func (this *Player) String() string {
return fmt.Sprintf("{Id:%d, Name:%s, Level:%d}", this.Id, this.Name, this.Level)
}
func NewPlayer(id int, name string, level int) *Player {
return &Player{
Id: id,
Name: name,
Level: level,
}
}
// Use mutex as the way to prevent concurrency
type MutexPlayer struct {
mutex sync.RWMutex
playerMap map[int]*Player
}
func (this *MutexPlayer) GetPlayerById(id int) (*Player, bool) {
this.mutex.RLock()
defer this.mutex.RUnlock()
playerObj, exists := this.playerMap[id]
return playerObj, exists
}
func (this *MutexPlayer) GetPlayerList() []*Player {
this.mutex.RLock()
defer this.mutex.RUnlock()
playerList := make([]*Player, 0, len(this.playerMap))
for _, value := range this.playerMap {
playerList = append(playerList, value)
}
return playerList
}
func (this *MutexPlayer) AddPlayer(playerObj *Player) {
this.mutex.Lock()
defer this.mutex.Unlock()
this.playerMap[playerObj.Id] = playerObj
}
func NewMutexPlayer() *MutexPlayer {
return &MutexPlayer{
playerMap: make(map[int]*Player, 1024),
}
}
// Use csp as the way to prevent concurrency
type CspPlayer struct {
playerMap map[int]*Player
baseModelObj *BaseModel
cancel context.CancelFunc
}
func NewCspPlayer() *CspPlayer {
ctx, cancel := context.WithCancel(context.Background())
cspPlayerObj := &CspPlayer{
playerMap: make(map[int]*Player, 1024),
baseModelObj: NewBaseModel(),
cancel: cancel,
}
cspPlayerObj.baseModelObj.Start(ctx)
// Register callback
cspPlayerObj.baseModelObj.Register("GetPlayerById", cspPlayerObj.getPlayerByIdCallback)
cspPlayerObj.baseModelObj.Register("GetPlayerList", cspPlayerObj.getPlayerListCallback)
cspPlayerObj.baseModelObj.Register("AddPlayer", cspPlayerObj.addPlayerCallback)
return cspPlayerObj
}
func (this *CspPlayer) Stop() {
this.cancel()
}
type GetPlayerByIdResponse struct {
Value *Player
Exists bool
}
func (this *CspPlayer) GetPlayerById(id int) (*Player, bool) {
retCh := make(chan *GetPlayerByIdResponse)
this.baseModelObj.RequestChannel <- NewRequest("GetPlayerById", id, retCh)
responseObj := <-retCh
return responseObj.Value, responseObj.Exists
}
func (this *CspPlayer) getPlayerByIdCallback(requestObj *Request) {
id, _ := requestObj.Parameter.(int)
playerObj, exists := this.playerMap[id]
getPlayerByIdResponseObj := &GetPlayerByIdResponse{
Value: playerObj,
Exists: exists,
}
retCh, _ := requestObj.ReturnCh.(chan *GetPlayerByIdResponse)
retCh <- getPlayerByIdResponseObj
}
type GetPlayerListResponse struct {
Value []*Player
}
func (this *CspPlayer) GetPlayerList() []*Player {
retCh := make(chan *GetPlayerListResponse)
this.baseModelObj.RequestChannel <- NewRequest("GetPlayerList", nil, retCh)
responseObj := <-retCh
return responseObj.Value
}
func (this *CspPlayer) getPlayerListCallback(requestObj *Request) {
playerList := make([]*Player, 0, len(this.playerMap))
for _, value := range this.playerMap {
playerList = append(playerList, value)
}
getPlayerListResponseObj := &GetPlayerListResponse{
Value: playerList,
}
retCh, _ := requestObj.ReturnCh.(chan *GetPlayerListResponse)
retCh <- getPlayerListResponseObj
}
func (this *CspPlayer) AddPlayer(playerObj *Player) {
this.baseModelObj.RequestChannel <- NewRequest("AddPlayer", playerObj, nil)
}
func (this *CspPlayer) addPlayerCallback(requestObj *Request) {
playerObj, _ := requestObj.Parameter.(*Player)
this.playerMap[playerObj.Id] = playerObj
}
func TestMutexPlayer(t *testing.T) {
mutexPlayerObj := NewMutexPlayer()
playerObj := NewPlayer(1, "Jordan", 100)
mutexPlayerObj.AddPlayer(playerObj)
playerList := mutexPlayerObj.GetPlayerList()
if len(playerList) != 1 {
t.Fatalf("Expected %d items in the list, now got:%d", 1, len(playerList))
}
gotPlayerObj, exists := mutexPlayerObj.GetPlayerById(1)
if !exists {
t.Fatalf("Expected:%v, Got:%v", true, exists)
}
if gotPlayerObj.Equal(playerObj) == false {
t.Fatalf("Expected:%s, Got:%s", playerObj, gotPlayerObj)
}
}
func TestCspPlayer(t *testing.T) {
cspPlayerObj := NewCspPlayer()
playerObj := NewPlayer(1, "Jordan", 100)
cspPlayerObj.AddPlayer(playerObj)
playerList := cspPlayerObj.GetPlayerList()
if len(playerList) != 1 {
t.Fatalf("Expected %d items in the list, now got:%d", 1, len(playerList))
}
gotPlayerObj, exists := cspPlayerObj.GetPlayerById(1)
if !exists {
t.Fatalf("Expected:%v, Got:%v", true, exists)
}
if gotPlayerObj.Equal(playerObj) == false {
t.Fatalf("Expected:%s, Got:%s", playerObj, gotPlayerObj)
}
}
func BenchmarkMutexPlayer(b *testing.B) {
mutexPlayerObj := NewMutexPlayer()
for i := 0; i < b.N; i++ {
mutexPlayerObj.AddPlayer(NewPlayer(i, fmt.Sprintf("Player%d", i), i))
mutexPlayerObj.GetPlayerById(i)
mutexPlayerObj.GetPlayerList()
}
}
func BenchmarkCspPlayer(b *testing.B) {
cspPlayerObj := NewCspPlayer()
for i := 0; i < b.N; i++ {
cspPlayerObj.AddPlayer(NewPlayer(i, fmt.Sprintf("Player%d", i), i))
cspPlayerObj.GetPlayerById(i)
cspPlayerObj.GetPlayerList()
}
cspPlayerObj.Stop()
}
对于外部的使用者,使用Mutex/RWMutex和使用channel,两者没有任何区别。所以可以很方便地进行实现,以及内部的改造。