条件变量的作用并不是保证在同一时刻仅有一个线程访问某一个共享数据,而是在对应的共享数据的状态发生变化时,通知其他因此而被阻塞的线程。
- 条件变量与互斥量组合使用
- 互斥量为共享数据的访问提供互斥支持
- 条件变量就状态的变化向相关线程发出通知
- 等待通知: wait
- 阻塞当前线程,直到收到该条件变量发来的通知
- 单发通知: signal
- 让该条件变量向至少一个正在等待它的通知的线程发送通知,表示共享数据的状态已经改变。
- 广播通知: broadcast
- 让条件变量给正在等待它的通知的所有线程都发送通知。
示例func NewCond(l Locker) *Cond
改造上一节的锁使用代码
Read()方法改造如下:
func (df *myDataFile) Read() (rsn int64, d Data, err error){
// 读取并更新读偏移量
var offset int64
// 读互斥锁定
df.rmutex.Lock()
offset = df.roffset
// 更改偏移量, 当前偏移量+数据块长度
df.roffset += int64(df.dataLen)
// 读互斥解锁
df.rmutex.Unlock()
//读取一个数据块,最后读取的数据块序列号
rsn = offset / int64(df.dataLen)
bytes := make([]byte, df.dataLen)
//读写锁:读锁定
df.fmutex.RLock()
defer df.fmutex.RUnlock()
for {
_, err = df.f.ReadAt(bytes, offset)
if err != nil {
if err == io.EOF {
//暂时放弃fmutex的 读锁,并等待通知的到来
df.rcond.Wait()
continue
}
}
return
}
d = bytes
return
}
Write()方法改造如下:
func (df *myDataFile) Write(d Data) (wsn int64, err error){
//读取并更新写的偏移量
var offset int64
df.wmutex.Lock()
offset = df.woffset
df.woffset += int64(df.dataLen)
df.wmutex.Unlock()
//写入一个数据块,最后写入数据块的序号
wsn = offset / int64(df.dataLen)
var bytes []byte
if len(d) > int(df.dataLen){
bytes = d[0:df.dataLen]
}else{
bytes = d
}
df.fmutex.Lock()
defer df.fmutex.Unlock()
_, err = df.f.Write(bytes)
//发送通知
df.rcond.Signal()
return
}
因为一个数据块只能有一个读操作读取,因此我们使用条件变量Signal方法通知某一个为此等待的Wait方法,唤醒一个相关的Goroutine。
还有一件事不能忘记,初始化rcond字段
func NewDataFile(path string, dataLen uint32) (DataFile, error){
//f, err := os.OpenFile(path, os.O_APPEND|os.O_RDWR|os.O_CREATE, 0666)
f,err := os.Create(path)
if err != nil {
fmt.Println("Fail to find", f, "cServer start Failed")
return nil, err
}
if dataLen == 0 {
return nil, errors.New("Invalid data length!")
}
df := &myDataFile{
f : f,
dataLen:dataLen,
}
//创建一个可用的条件变量(初始化),返回一个*sync.Cond类型的结果值,我们就可以调用该值拥有的三个方法Wait,Signal,Broadcast
df.rcond = sync.NewCond(df.fmutex.RLocker())
return df, nil
}