TCP异步框架
Golang 编程风格
Go语言面向对象编程的风格是多用组合,少用继承,以匿名嵌入的方式实现继承。
掌握Go语言,要把握一个中心,两个基本点。
两个基本点是 channel 和 go-routine;一个中心是:不要通过共享内存来通信,要通过通信来共享内存,而通信的载体就是 channel。
1. 服务启动开始
1.1 启动心跳定时器循环
func (s *Server) timeOutLoop() { defer s.wg.Done() for { select { case <-s.ctx.Done(): return
case timeout := <-s.timing.TimeOutChannel():
netID := timeout.Ctx.Value(netIDCtx).(int64) if v, ok := s.conns.Load(netID); ok {
sc := v.(*ServerConn)
sc.timerCh <- timeout
} else {
holmes.Warnf("invalid client %d", netID)
}
}
}
}timeOutLoopClinetgoroutineselecttimeOutChanchannelcontextnetIDCtxServerConnchannelsendChhandlerChtimerChServerConntimeCh1.2 服务启动限制处理
如果服务器在接受客户端连接请求的时候发生了临时错误,那么服务器将等待最多1秒的时间再重新尝试接受请求。
如果现有的连接数超过了MaxConnections(默认1000),就拒绝并关闭连接,否则启动一个新的连接开始工作。
2. 网络连接处理模块
func (sc *ServerConn) Start() {
holmes.Infof("conn start, <%v -> %v>\n", sc.rawConn.LocalAddr(), sc.rawConn.RemoteAddr())
onConnect := sc.belong.opts.onConnect if onConnect != nil {
onConnect(sc)
}
loopers := []func(WriteCloser, *sync.WaitGroup){readLoop, writeLoop, handleLoop} for _, l := range loopers {
looper := l
sc.wg.Add(1) go looper(sc, sc.wg)
}
}ReactorepollreadLoop()writeLoop()handleLoop()这三个协程在连接创建并启动时就会各自独立运行。
2.1 ReadLoop 实现细节
// Read loop body: decode one message at a time from rawConn, refresh the
// heartbeat, and hand the message to either the registered handler (through
// handlerCh) or the generic onMessage callback.
for {
	select {
	case <-cDone: // connection closed
		holmes.Debugln("receiving cancel signal from conn")
		return
	case <-sDone: // server closed
		holmes.Debugln("receiving cancel signal from server")
		return
	default:
		msg, err = codec.Decode(rawConn)
		if err != nil {
			holmes.Errorf("error decoding message %v\n", err)
			if _, ok := err.(ErrUndefined); ok {
				// An ErrUndefined decode error still refreshes the heartbeat
				// and the loop keeps reading.
				setHeartBeatFunc(time.Now().UnixNano())
				continue
			}
			return
		}
		setHeartBeatFunc(time.Now().UnixNano())

		handler := GetHandlerFunc(msg.MessageNumber())
		if handler == nil {
			if onMessage != nil {
				holmes.Infof("message %d call onMessage()\n", msg.MessageNumber())
				onMessage(msg, c.(WriteCloser))
			} else {
				holmes.Warnf("no handler or onMessage() found for message %d\n", msg.MessageNumber())
			}
			continue
		}

		// The handle loop stops draining handlerCh once it sees cDone or
		// sDone, so a bare send could block this goroutine forever. Make the
		// hand-off cancellable by the same signals.
		select {
		case handlerCh <- MessageHandler{msg, handler}:
		case <-cDone: // connection closed
			holmes.Debugln("receiving cancel signal from conn")
			return
		case <-sDone: // server closed
			holmes.Debugln("receiving cancel signal from server")
			return
		}
	}
}
codecrawConncodeckeymessage.goMessageHandlerhandlerChHandleLoop
2.2 HandleLoop 实现细节
// Handle loop body: run message handlers received on handlerCh and timeout
// callbacks received on timerCh, either inline or through the worker pool
// depending on askForWorker, until the connection or the server is closed.
for {
	select {
	case <-cDone: // connection closed
		holmes.Debugln("receiving cancel signal from conn")
		return

	case <-sDone: // server closed
		holmes.Debugln("receiving cancel signal from server")
		return

	case incoming := <-handlerCh:
		if incoming.handler == nil {
			continue
		}
		payload, fn := incoming.message, incoming.handler
		// The contexts are built when the closure runs, exactly as in both
		// the inline and the pooled path.
		dispatch := func() {
			fn(NewContextWithNetID(NewContextWithMessage(ctx, payload), netID), c)
		}
		if !askForWorker {
			dispatch()
			continue
		}
		if err = WorkerPoolInstance().Put(netID, dispatch); err != nil {
			holmes.Errorln(err)
		}
		// Counted for every pooled submission, whether or not Put failed.
		addTotalHandle()

	case expired := <-timerCh:
		if expired == nil {
			continue
		}
		// A mismatch is only reported; the callback still fires.
		if owner := NetIDFromContext(expired.Ctx); owner != netID {
			holmes.Errorf("timeout net %d, conn net %d, mismatched!\n", owner, netID)
		}
		fire := func() {
			expired.Callback(time.Now(), c.(WriteCloser))
		}
		if !askForWorker {
			fire()
			continue
		}
		if err = WorkerPoolInstance().Put(netID, fire); err != nil {
			holmes.Errorln(err)
		}
	}
}
HandleLoophandlerChtimerChchannelchannelhandlerChtimerCh
2.2 WriteLoop 实现细节
// Write loop body: take packets from sendCh and write them to rawConn until
// the connection or the server is closed, or a write fails.
for {
	select {
	case <-cDone: // connection closed
		holmes.Debugln("receiving cancel signal from conn")
		return
	case <-sDone: // server closed
		holmes.Debugln("receiving cancel signal from server")
		return
	case pkt = <-sendCh:
		if pkt == nil {
			// Nothing to write for a nil packet.
			continue
		}
		if _, err = rawConn.Write(pkt); err != nil {
			holmes.Errorf("error writing data %v\n", err)
			return
		}
	}
}

// ProcessMessage is a message handler that logs the content of the message
// carried in ctx and writes the same message back through conn.
func ProcessMessage(ctx context.Context, conn tao.WriteCloser) {
	// Use the comma-ok assertion so an unexpected payload is reported
	// instead of panicking the goroutine that runs the handler.
	raw := tao.MessageFromContext(ctx)
	msg, ok := raw.(Message)
	if !ok {
		holmes.Errorf("unexpected message type %T in context\n", raw)
		return
	}
	holmes.Infof("receving message %s\n", msg.Content)
	// NOTE(review): any result of conn.Write is dropped here, as in the
	// original — confirm against tao.WriteCloser whether an error should be
	// logged.
	conn.Write(msg)
}
WriteLoopsendChsendChmessageProcessMessageWritesendCh
3. 总结
Tao 为每个连接启动 ReadLoop、HandleLoop、WriteLoop 三个 Loop,它们各自运行在独立的 goroutine 中,goroutine 之间通过 channel 传递数据;涉及的关键环节包括 message、DeserializeMessage、ProcessMessage 以及 context(Context)。
4. 感谢
leesper