TCP异步框架
Golang 编程风格
Go语言面向对象编程的风格是多用组合,少用继承,以匿名嵌入的方式实现继承。
掌握Go语言,要把握一个中心,两个基本点。
channel 和 go-routine
不要通过共享内存来通信,要通过通信来共享内存
channel
1. 服务启动开始
1.1 启动心跳定时器循环
func (s *Server) timeOutLoop() { defer s.wg.Done() for { select { case <-s.ctx.Done(): return case timeout := <-s.timing.TimeOutChannel(): netID := timeout.Ctx.Value(netIDCtx).(int64) if v, ok := s.conns.Load(netID); ok { sc := v.(*ServerConn) sc.timerCh <- timeout } else { holmes.Warnf("invalid client %d", netID) } } } }
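timeOutLoop 运行在单独的 goroutine 中,用 select 监听定时器的超时 channel(timeOutChan);收到超时事件后,从事件的 context 里按 netIDCtx 取出 netID,找到对应 Client 的 ServerConn,再把事件转发给该连接的 channel。每个 ServerConn 持有 sendCh、handlerCh、timerCh 三个 channel。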
ServerConntimeCh
1.2 服务启动限制处理
如果服务器在接受客户端连接请求的时候发生了临时错误,那么服务器将等待最多1秒的时间再重新尝试接受请求。
如果现有的连接数超过了MaxConnections(默认1000),就拒绝并关闭连接,否则启动一个新的连接开始工作。
2. 网络连接处理模块
func (sc *ServerConn) Start() { holmes.Infof("conn start, <%v -> %v>\n", sc.rawConn.LocalAddr(), sc.rawConn.RemoteAddr()) onConnect := sc.belong.opts.onConnect if onConnect != nil { onConnect(sc) } loopers := []func(WriteCloser, *sync.WaitGroup){readLoop, writeLoop, handleLoop} for _, l := range loopers { looper := l sc.wg.Add(1) go looper(sc, sc.wg) } }
Reactorepoll
readLoop()、writeLoop()、handleLoop()
这三个协程在连接创建并启动时就会各自独立运行。
2.1 ReadLoop 实现细节
// Read loop: decode one message per iteration from rawConn and route it.
// The loop ends when the connection or the server signals cancellation, or
// when decoding fails with anything other than ErrUndefined.
for {
	select {
	case <-cDone:
		// connection closed
		holmes.Debugln("receiving cancel signal from conn")
		return

	case <-sDone:
		// server closed
		holmes.Debugln("receiving cancel signal from server")
		return

	default:
		msg, err = codec.Decode(rawConn)
		if err != nil {
			holmes.Errorf("error decoding message %v\n", err)
			if _, ok := err.(ErrUndefined); ok {
				// An undefined message is not fatal: refresh the heartbeat
				// and keep reading.
				setHeartBeatFunc(time.Now().UnixNano())
				continue
			}
			return
		}
		setHeartBeatFunc(time.Now().UnixNano())

		handler := GetHandlerFunc(msg.MessageNumber())
		if handler == nil {
			// No registered handler: fall back to onMessage if present.
			if onMessage != nil {
				holmes.Infof("message %d call onMessage()\n", msg.MessageNumber())
				onMessage(msg, c.(WriteCloser))
			} else {
				holmes.Warnf("no handler or onMessage() found for message %d\n", msg.MessageNumber())
			}
			continue
		}

		// Hand the message over to the handle loop. The send also watches
		// both cancel signals so this goroutine cannot stay blocked forever
		// once nobody is receiving from handlerCh any more.
		select {
		case handlerCh <- MessageHandler{msg, handler}:
		case <-cDone:
			holmes.Debugln("receiving cancel signal from conn")
			return
		case <-sDone:
			holmes.Debugln("receiving cancel signal from server")
			return
		}
	}
}
codecrawConncodeckeymessage.goMessageHandlerhandlerChHandleLoop
2.2 HandleLoop 实现细节
// Handle loop: wait for either a decoded message (handlerCh) or an expired
// timer (timerCh) and run the matching callback, on the worker pool when
// askForWorker is set and inline otherwise. The loop ends when the connection
// or the server signals cancellation.
for {
	select {
	case <-cDone:
		// connection closed
		holmes.Debugln("receiving cancel signal from conn")
		return

	case <-sDone:
		// server closed
		holmes.Debugln("receiving cancel signal from server")
		return

	case item := <-handlerCh:
		message, handle := item.message, item.handler
		if handle == nil {
			continue
		}
		if !askForWorker {
			handle(NewContextWithNetID(NewContextWithMessage(ctx, message), netID), c)
			continue
		}
		err = WorkerPoolInstance().Put(netID, func() {
			handle(NewContextWithNetID(NewContextWithMessage(ctx, message), netID), c)
		})
		if err != nil {
			holmes.Errorln(err)
		}
		addTotalHandle()

	case expired := <-timerCh:
		if expired == nil {
			continue
		}
		// A mismatch is only reported; the callback still runs.
		if owner := NetIDFromContext(expired.Ctx); owner != netID {
			holmes.Errorf("timeout net %d, conn net %d, mismatched!\n", owner, netID)
		}
		if !askForWorker {
			expired.Callback(time.Now(), c.(WriteCloser))
			continue
		}
		err = WorkerPoolInstance().Put(netID, func() {
			expired.Callback(time.Now(), c.(WriteCloser))
		})
		if err != nil {
			holmes.Errorln(err)
		}
	}
}
HandleLoop 通过 select 同时监听 handlerCh 和 timerCh 两个 channel:前一个 channel 传来待处理的消息,后一个 channel 传来超时事件。
handlerChtimerCh
2.3 WriteLoop 实现细节
// Write loop: take packets from sendCh and write them to rawConn. The loop
// ends when the connection or the server signals cancellation, or on the
// first write error.
for {
	select {
	case <-cDone:
		// connection closed
		holmes.Debugln("receiving cancel signal from conn")
		return

	case <-sDone:
		// server closed
		holmes.Debugln("receiving cancel signal from server")
		return

	case pkt = <-sendCh:
		// nil packets are skipped
		if pkt == nil {
			continue
		}
		_, err = rawConn.Write(pkt)
		if err != nil {
			holmes.Errorf("error writing data %v\n", err)
			return
		}
	}
}
func ProcessMessage(ctx context.Context, conn tao.WriteCloser) { msg := tao.MessageFromContext(ctx).(Message) holmes.Infof("receving message %s\n", msg.Content) conn.Write(msg) }
WriteLoop 从 sendCh 中取出数据并写到连接上。sendCh 中的 message 来自业务处理函数:例如上面的 ProcessMessage 调用 Write 时,数据会被放入 sendCh。
3. 总结
TaoReadLoopHandleLoopWriteLoopLoopchannelgoroutinegoroutine
messageDeserializeMessageProcessMessagecontextContext
4. 感谢
leesper