前言

asongsync.CondjavaGoCond
sync.Cond
GoCondCondGoroutineCondgoroutinegoroutinegoroutinegoroutine

先看这样一个例子:

var (
    done = false
    topic = "Golang梦工厂"
)

func main() {
    cond := sync.NewCond(&sync.Mutex{})
    go Consumer(topic,cond)
    go Consumer(topic,cond)
    go Consumer(topic,cond)
    Push(topic,cond)
    time.Sleep(5 * time.Second)

}

func Consumer(topic string,cond *sync.Cond)  {
    cond.L.Lock()
    for !done{
        cond.Wait()
    }
    fmt.Println("topic is ",topic," starts Consumer")
    cond.L.Unlock()
}

func Push(topic string,cond *sync.Cond)  {
    fmt.Println(topic,"starts Push")
    cond.L.Lock()
    done = true
    cond.L.Unlock()
    fmt.Println("topic is ",topic," wakes all")
    cond.Broadcast()
}
// 运行结果
Golang梦工厂 starts Push
topic is  Golang梦工厂  wakes all
topic is  Golang梦工厂  starts Consumer
topic is  Golang梦工厂  starts Consumer
topic is  Golang梦工厂  starts Consumer
4GoroutineGoroutinecond.Wait()Goroutinecond.BroadcastGoroutine
CondCondGoCondCond
Cond
CondLockerruntime
type Cond struct {
    noCopy noCopy

    // L is held while observing or changing the condition
    L Locker

    notify  notifyList
    checker copyChecker
}
4
nocopywaitGroupcheckerDouble checkLwaitnotifywait()GoroutinenotifyList
type notifyList struct {
    wait   uint32
    notify uint32
    lock   uintptr // key field of the mutex
    head   unsafe.Pointer
    tail   unsafe.Pointer
}
notifyList
waitGoroutinenotifyGoroutineheadtail
Cond
wait
wait
func (c *Cond) Wait() {
    c.checker.check()
    t := runtime_notifyListAdd(&c.notify)
    c.L.Unlock()
    runtime_notifyListWait(&c.notify, t)
    c.L.Lock()
}

代码量不多,执行步骤如下:

panicruntime_notifyListAddruntime_notifyListWaitGoroutine
runtime_notifyListAdd
// See runtime/sema.go for documentation.
func notifyListAdd(l *notifyList) uint32 {
    // This may be called concurrently, for example, when called from
    // sync.Cond.Wait while holding a RWMutex in read mode.
    return atomic.Xadd(&l.wait, 1) - 1
}
waitGoroutine
runtime_notifyListWait
// See runtime/sema.go for documentation.
func notifyListWait(l *notifyList, t uint32) {
    lockWithRank(&l.lock, lockRankNotifyList)

    // Return right away if this ticket has already been notified.
    if less(t, l.notify) {
        unlock(&l.lock)
        return
    }

    // Enqueue itself.
    s := acquireSudog()
    s.g = getg()
    s.ticket = t
    s.releasetime = 0
    t0 := int64(0)
    if blockprofilerate > 0 {
        t0 = cputicks()
        s.releasetime = -1
    }
    if l.tail == nil {
        l.head = s
    } else {
        l.tail.next = s
    }
    l.tail = s
    goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
    if t0 != 0 {
        blockevent(s.releasetime-t0, 2)
    }
    releaseSudog(s)
}

这里主要执行步骤如下:

waitnotifyGoroutineGoroutinegoparkunlockGoroutinereleaseSudogGoroutine

看完源码我们来总结一下注意事项:

waitCondwaitc.L
signalBroadcast
signalBroadcastsignalGoroutineBoradcastGoroutinesignalbroadcast
signal
func (c *Cond) Signal() {
    c.checker.check()
    runtime_notifyListNotifyOne(&c.notify)
}
func notifyListNotifyOne(l *notifyList) {
    if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
        return
    }
    lockWithRank(&l.lock, lockRankNotifyList)
    t := l.notify
    if t == atomic.Load(&l.wait) {
        unlock(&l.lock)
        return
    }

    atomic.Store(&l.notify, t+1)

    for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
        if s.ticket == t {
            n := s.next
            if p != nil {
                p.next = n
            } else {
                l.head = n
            }
            if n == nil {
                l.tail = p
            }
            unlock(&l.lock)
            s.next = nil
            readyWithTime(s, 4)
            return
        }
    }
    unlock(&l.lock)
}
waitwaitwaitwaitnotifynotifyListticketnotify
notifyListwaitruntime_notifyListAddruntime_notifyListWaitgoroutinenotifyList Waitgoroutine
broadcast
func (c *Cond) Broadcast() {
    c.checker.check()
    runtime_notifyListNotifyAll(&c.notify)
}
func notifyListNotifyAll(l *notifyList) {
    if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
        return
    }

    lockWithRank(&l.lock, lockRankNotifyList)
    s := l.head
    l.head = nil
    l.tail = nil

    atomic.Store(&l.notify, atomic.Load(&l.wait))
    unlock(&l.lock)
    for s != nil {
        next := s.next
        s.next = nil
        readyWithTime(s, 4)
        s = next
    }
}
readyWithTimegoroutineGoroutine

最后我们总结一下使用这两个方法要注意的问题:

SignalCondGoroutinegoroutine waiter CondgoroutinegoroutineSignal c.LbroadcastCond goroutine goroutineCondgoroutinegoroutinec.L

注意事项

waitpanicwaitgoroutine
//    c.L.Lock()
//    for !condition() {
//        c.Wait()
//    }
//    ... make use of condition ...
//    c.L.Unlock()
SignalBoardcast

总结

CondGochannelKubernetesBroadcastCondchannel

好啦,这篇文章就到这里啦,素质三连(分享、点赞、在看)都是笔者持续创作更多优质内容的动力!

创建了一个Golang学习交流群,欢迎各位大佬们踊跃入群,我们一起学习交流。入群方式:加我vx拉你入群,或者公众号获取入群二维码

结尾给大家发一个小福利吧,最近我在看[微服务架构设计模式]这一本书,讲的很好,自己也收集了一本PDF,有需要的小伙可以到自行下载。获取方式:关注公众号:[Golang梦工厂],后台回复:[微服务],即可获取。

我翻译了一份GIN中文文档,会定期进行维护,有需要的小伙伴后台回复[gin]即可下载。

翻译了一份Machinery中文文档,会定期进行维护,有需要的小伙伴们后台回复[machinery]即可获取。

我是asong,一名普普通通的程序猿,让我们一起慢慢变强吧。欢迎各位的关注,我们下期见~~~

推荐往期文章: