无阻塞获取值

select-default模式,节选自fasthttp1.19/client.go#L1955-L1963。

// waiting reports whether w is still waiting for an answer (connection or
// error). It probes w.ready with a non-blocking receive: if the receive can
// proceed, an answer is already available and w is no longer waiting.
func (w *wantConn) waiting() bool {
	stillWaiting := true
	select {
	case <-w.ready:
		stillWaiting = false
	default:
		// Nothing to receive yet; keep the initial answer.
	}
	return stillWaiting
}

超时控制

select-timer 模式,例如等待 TCP 节点发送连接包,超时后则关闭连接。

// waitForConnectPkt blocks until a value can be received from n.connected or
// five seconds elapse, whichever happens first. On timeout it logs the event
// and closes n.conn.
func (n *node) waitForConnectPkt() {
	// Use an explicit timer instead of time.After so it can be stopped as soon
	// as the connect packet arrives; on Go versions before 1.23 a time.After
	// timer is not reclaimed until it fires.
	timeout := time.NewTimer(5 * time.Second)
	defer timeout.Stop()

	select {
	case <-n.connected:
		log.Println("接收到连接包")
	case <-timeout.C:
		log.Println("接收连接包超时")
		n.conn.Close()
	}
}

类事件驱动循环

for-select 模式,例如监控 TCP 节点心跳是否正常。

// heartbeatDetect loops receiving from n.heartbeat. If no heartbeat arrives
// within three seconds of the previous one (or of the call, when none has
// arrived yet), it closes n.conn and returns.
func (n *node) heartbeatDetect() {
	const heartbeatTimeout = 3 * time.Second

	// One timer is reused for the whole loop. Calling time.After on every
	// iteration would allocate a fresh timer per heartbeat, and on Go versions
	// before 1.23 each of those lingers until it fires.
	timer := time.NewTimer(heartbeatTimeout)
	defer timer.Stop()

	for {
		select {
		case <-n.heartbeat:
			// Heartbeat received: restart the timeout window. If the timer
			// fired concurrently, discard the stale tick first so the next
			// iteration does not observe a timeout that already expired.
			if !timer.Stop() {
				select {
				case <-timer.C:
				default:
				}
			}
			timer.Reset(heartbeatTimeout)
		case <-timer.C:
			// Heartbeat timed out: close the connection and stop monitoring.
			n.conn.Close()
			return
		}
	}
}

带优先级的任务队列

// worker processes updates from the node and pod channels at index worker
// until stopCh becomes readable. done is invoked when the function returns.
//
// Node updates take priority over Pod updates: NodeUpdates that interest
// NoExecuteTaintManager should be handled as soon as possible, so that nobody
// has to wait for the PodUpdate queue to drain before Pods start being
// evicted from tainted Nodes.
func (tc *NoExecuteTaintManager) worker(worker int, done func(), stopCh <-chan struct{}) {
	defer done()

	for {
		select {
		case <-stopCh:
			return

		case item := <-tc.nodeUpdateChannels[worker]:
			tc.handleNodeUpdate(item)
			tc.nodeUpdateQueue.Done(item)

		case pending := <-tc.podUpdateChannels[worker]:
			// A Pod update was received, but every Node update that is
			// already queued must be handled before it.
			for nodesDrained := false; !nodesDrained; {
				select {
				case item := <-tc.nodeUpdateChannels[worker]:
					tc.handleNodeUpdate(item)
					tc.nodeUpdateQueue.Done(item)
				default:
					// No Node update is immediately available.
					nodesDrained = true
				}
			}
			// The Node channel is empty now; handle the held Pod update.
			tc.handlePodUpdate(pending)
			tc.podUpdateQueue.Done(pending)
		}
	}
}