本文内容纲要:
前面介绍了采用go语法的并行操作以及channel。既然是并行操作,那么就涉及到数据原子性以及同步的问题。所以在Go里面也需要采用同步的机制。
互斥锁:
由标准库代码包sync中的Mutex结构体类型表示。Sync.Mutex类型只有两个公开的指针方法-Lock和Unlock。声明方法如下:
var mutex sync.Mutex
既然有加锁,那也有解锁。但是如果在代码中忘了解锁就会导致比如流程异常,线程执行停滞,甚至程序死锁等问题。在Go中,这个错误可以通过defer语句来降低发生的概率。比如下面的方法, defer语句保证了在该函数执行结束之前互斥锁mutex一定会被解锁。
var mutex sync.Mutex
func write(){
mutex.Lock()
defer mutex.Unlock()
}
来看下具体的例子:
func main(){
var mutex sync.Mutex
fmt.Println("Lock the lock.(main)")
mutex.Lock()
fmt.Println("The lock is locked.(main)")
for i:=1;i<=3;i++{
go func(i int){
fmt.Println("Lock the lock.g",i,"\n")
mutex.Lock()
fmt.Println("The lock is locked.g",i,"\n")
}(i)
}
time.Sleep(time.Second)
fmt.Println("unlock the lock.(main)")
mutex.Unlock()
fmt.Println("the lock is unlocked.(main)")
time.Sleep(time.Second)
}
在函数中启用了3个goroutine,并分别命名为g1,g2,g3.在启用这3个goroutine之前就已经对互斥锁mutex进行了锁定,并且在那3个go函数的开始处加入了对mutex的锁定操作。当for语句执行完毕后,先让main睡眠1秒钟,以便运行时系统有充足的时间开始运行g1,g2,g3。在这之后,我们解锁mutex。
运行结果:从结果中可以看到, main函数进行锁定后。g1,g2,g3都无法进行锁定操作且被阻塞,直到main函数解锁之后,这个时候被阻塞的g1,g2和g3都有机会重新锁定该互斥锁。但只有一个goroutine会锁定成功。而其他的goroutine将继续阻塞,直到有新的机会到来。
Lock the lock.(main)
The lock is locked.(main)
Lock the lock.g 1
Lock the lock.g 2
Lock the lock.g 3
unlock the lock.(main)
the lock is unlocked.(main)
The lock is locked.g 1
如果我们在go函数里面结束的时候都加上mutex.Unlock()语句。那么所有的三个goroutine都会进行锁定
Lock the lock.(main)
The lock is locked.(main)
Lock the lock.g 3
Lock the lock.g 1
Lock the lock.g 2
unlock the lock.(main)
the lock is unlocked.(main)
The lock is locked.g 3
The lock is locked.g 1
The lock is locked.g 2
读写锁:
读写锁即针对读写操作的互斥锁。与普通的互斥锁最大的不同就是可以分别针对读操作和写操作进行锁定和解锁操作。读写锁遵循的访问控制规则和互斥锁有所不同。读写锁控制下的多个写操作之间都是互斥的,并且写操作和读操作之间也都是互斥的。但是多个读操作之间却不存在互斥关系。在这样的互斥策略下,读写锁可以在大大降低因使用锁而造成的性能损耗的情况下,完成对共享资源的访问控制。
Go中的读写锁由结构体类型sync.RWMutex表示,与互斥锁一样,sync.RWMutex类型的零值就已经是可用的读写锁实例了。
func (*RWMutex) Lock()
func (*RWMutex) Unlock()
以及
func (*RWMutex) RLock()
func (*RWMutex) RUnLock()
前一对方法代表了对写操作的锁定和解锁。后一对方法代表了对读操作的锁定和解锁。
写解锁会试图唤醒所有因欲进行读锁定的而被阻塞的goroutine。而读解锁只会在已无任何读锁定的情况下,试图唤醒一个因欲进行写锁定而不阻塞的goroutine。如果对一个未被写锁定的读写锁进行写解锁,那么就会造成一个不可恢复的恐慌。对一个未被读锁定的读写锁进行读解锁也是一样的情况。
func main(){
var rwm sync.RWMutex
for i:=0;i<3;i++{
go func(i int){
fmt.Println("try to lock for reading....",i,"\n")
rwm.RLock()
fmt.Println("locked for reading,",i,"\n")
time.Sleep(time.Second*2)
fmt.Println("try to unlock for reading....",i,"\n")
rwm.RUnlock()
fmt.Println("unlocked for reading....",i,"\n")
}(i)
}
time.Sleep(time.Millisecond*100)
fmt.Println("try to locked for writting")
rwm.Lock()
fmt.Println("Locked for writting")
}
在这个例子中,启用了3个goroutine用于读写锁rwm的读锁定和读解锁操作。其中读解锁操作会有延迟2s进行模拟真实的情况。先让住goroutine睡眠100ms。以使那3个go函数有足够的时间执行。之后对rwm的写锁定操作必定会让住goroutine阻塞。因为此时go函数中的读锁定已经进行且还未进行读解锁操作。经过2秒之后,当go函数中的读解锁操作都已完成时,main函数中的写锁定操作才会成功完成。运行结果如下:
try to lock for reading.... 0
locked for reading, 0
try to lock for reading.... 1
locked for reading, 1
try to lock for reading.... 2
locked for reading, 2
try to locked for writting
try to unlock for reading.... 0
unlocked for reading.... 0
try to unlock for reading.... 2
try to unlock for reading.... 1
unlocked for reading.... 1
Locked for writing
来看一个具体操作文件的例子:
type DataFile interface{
Read()(rsn int64,d Data,err error)
Write(d Data)(wsn int64,err error)
RSN() int64
WSN() int64
DataLen() uint32
Close() error
}
首先创建一个数据文件的接口类型。里面包含了读,写操作,写入数据块的序列号和最后读取块的序列号,数据长度以及关闭。
然后来编写DataFile接口的实现类型。myDataFile共有7个字段。fmutex控制对文件的读写锁操作,woffset和roffset分别对应读,写偏置。wmutex和rmutex控制对woffset和roffset的锁操作
type myDataFile struct{
f *os.File
fmutex sync.RWMutex
woffset int64
roffset int64
wmutex sync.Mutex
rmutex sync.Mutex
dataLen uint32
}
新建一个文件的实例,把变量df作为返回值的前提是要myDataFile实现DataFile接口的所有实现方法。因此还需要为*myDataFile编写DataFile的所有方法。
func NewDataFile(path string,dataLen uint32) (DataFile,error){
f,err:=os.Create(path)
if err != nil{
return nil,err
}
if dataLen == 0{
return nil,errors.New("invalid data length")
}
df:=&myDataFile{f:f,dataLen:dataLen}
return df,nil
}
在对offset进行操作前,需要用互斥锁锁住避免多个进程进行操作。在读取文件的数据块的时候进行读锁定。但是这个代码有个问题就是当多个goroutine并行执行的时候,读偏置roffset有可能会大于写偏置woffset,这会导致没有数据可读。ReadAt返回的第二个结果就是io.EOF. 导致调用发读取错误的数据
func (df *myDataFile) Read() (rsn int64,d Data,err error){
var offset int64
df.rmutex.Lock()
offset=df.roffset
df.roffset+=int64(df.dataLen)
df.rmutex.Unlock()
rsn=offset/int64(df.dataLen)
df.fmutex.Lock()
defer df.fmutex.Unlock()
bytes:=make([]bytes,df.dataLen)
_,err=df.f.ReadAt(bytes,offset)
if err !=nil {
return
}
d=bytes
return
}
因此对读操作的边界情况加入保护。代码如下。当遇到io.EOF的时候,会尝试读取同样的数据块。在for循环开始的时候也会一直保持读锁定。针对这个文件的写操作goroutine都会被阻塞。所以在continue以及return之前都必须加上df.fmutex.RUnlock()进行读解锁。
func (df *myDataFile) Read() (rsn int64,d Data,err error){
var offset int64
df.rmutex.Lock()
offset=df.roffset
df.roffset+=int64(df.dataLen)
df.rmutex.Unlock()
rsn=offset/int64(df.dataLen)
bytes:=make([]byte,df.dataLen)
for{
df.fmutex.RLock()
_,err=df.f.ReadAt(bytes,offset)
if err !=nil{
if err == io.EOF{
df.fmutex.Unlock()
continue
}
df.fmutex.RUnlock()
return
}
d=bytes
df.fmutex.RUnlock()
return
}
}
func (df *myDataFile) Write(d Data) (wsn int64,err error){
var offset int64
var bytes []byte
df.wmutex.Lock()
offset=df.woffset
df.woffset+=int64(df.dataLen)
df.wmutex.Unlock()
wsn=offset/int64(df.dataLen)
if len(d) > int(df.dataLen) {
bytes = d[0:df.dataLen]
}else{
bytes=d
}
df.fmutex.Lock()
defer df.fmutex.Unlock()
return
}
func(df *myDataFile) RSN() int64{
df.rmutex.Lock()
defer df.rmutex.Unlock()
return df.roffset/int64(df.dataLen)
}
func(df *myDataFile) WSN() int64{
df.wmutex.Lock()
defer df.wmutex.Unlock()
return df.woffset/int64(df.dataLen)
}
func(df *myDataFile) Close() error{
error:=df.f.Close()
return error
}
func(df *myDataFile) DataLen() uint32{
return df.dataLen
}
本文内容总结: