无阻塞获取值
select-default 模式,节选自 fasthttp v1.19 的 client.go#L1955-L1963。
// waiting reports whether w is still waiting for an answer (connection or
// error). It never blocks: it makes a single non-blocking receive attempt on
// w.ready and returns false only if that receive can proceed.
func (w *wantConn) waiting() bool {
	stillWaiting := true
	select {
	case <-w.ready:
		// The ready channel yielded, so an answer has been delivered.
		stillWaiting = false
	default:
		// Nothing to receive right now; keep the initial "waiting" verdict.
	}
	return stillWaiting
}
超时控制
select-timer 模式,例如等待 TCP 节点发送连接包,超时后则关闭连接。
// waitForConnectPkt blocks until either a value can be received from
// n.connected or the connect timeout (5 seconds) elapses. On timeout it logs
// the event and closes n.conn; on success it only logs.
//
// An explicit timer is used instead of time.After so that it can be stopped
// as soon as the function returns, rather than staying pending until the full
// timeout has elapsed.
func (n *node) waitForConnectPkt() {
	const connectTimeout = 5 * time.Second

	timer := time.NewTimer(connectTimeout)
	defer timer.Stop()

	select {
	case <-n.connected:
		// Connect packet received.
		log.Println("接收到连接包")
	case <-timer.C:
		// Timed out waiting for the connect packet: drop the connection.
		log.Println("接收连接包超时")
		n.conn.Close()
	}
}
类事件驱动循环
for-select 模式,例如监控 TCP 节点的心跳是否正常。
// heartbeatDetect loops until no heartbeat has been received from
// n.heartbeat for a full timeout window (3 seconds). Every received heartbeat
// restarts the window; when the window expires the method closes n.conn and
// returns.
//
// A single timer is reused across iterations instead of calling time.After in
// the loop, which would allocate a fresh timer for every heartbeat received.
func (n *node) heartbeatDetect() {
	const heartbeatTimeout = 3 * time.Second

	timer := time.NewTimer(heartbeatTimeout)
	defer timer.Stop()

	for {
		select {
		case <-n.heartbeat:
			// Heartbeat received: restart the timeout window. If the timer
			// already fired, discard the stale tick (non-blocking, so this is
			// safe whether or not a value is actually buffered) before
			// resetting, so the next iteration cannot see an old expiry.
			if !timer.Stop() {
				select {
				case <-timer.C:
				default:
				}
			}
			timer.Reset(heartbeatTimeout)
		case <-timer.C:
			// Heartbeat timed out: close the connection and stop monitoring.
			n.conn.Close()
			return
		}
	}
}
带优先级的任务队列
// worker runs the event loop for the worker with the given index, consuming
// from tc.nodeUpdateChannels[worker] and tc.podUpdateChannels[worker] until a
// receive on stopCh succeeds. done is invoked (via defer) when the loop exits.
//
// Node updates take priority over Pod updates: NodeUpdates that interest
// NoExecuteTaintManager should be handled as soon as possible, so that neither
// users nor the system have to wait for the PodUpdate queue to drain before
// Pods start being evicted from tainted Nodes. To get that ordering, every Pod
// update that is picked up is held back until no Node update is immediately
// available.
func (tc *NoExecuteTaintManager) worker(worker int, done func(), stopCh <-chan struct{}) {
	defer done()

	// tryNodeUpdate handles at most one pending Node update without blocking
	// and reports whether it found one.
	tryNodeUpdate := func() bool {
		select {
		case item := <-tc.nodeUpdateChannels[worker]:
			tc.handleNodeUpdate(item)
			tc.nodeUpdateQueue.Done(item)
			return true
		default:
			return false
		}
	}

	for {
		select {
		case <-stopCh:
			return
		case item := <-tc.nodeUpdateChannels[worker]:
			tc.handleNodeUpdate(item)
			tc.nodeUpdateQueue.Done(item)
		case item := <-tc.podUpdateChannels[worker]:
			// A Pod update was picked up: empty the Node channel first.
			for tryNodeUpdate() {
				// Keep going until no Node update is immediately available.
			}
			// Only now process the Pod update that triggered the drain.
			tc.handlePodUpdate(item)
			tc.podUpdateQueue.Done(item)
		}
	}
}