TCP异步框架

Golang 编程风格

  • Go语言面向对象编程的风格是多用组合,少用继承,以匿名嵌入的方式实现继承。

掌握Go语言,要把握一个中心,两个基本点

channelgo-routine

不要通过共享内存来通信,要通过通信来共享内存

channel

1. 服务启动开始

1.1 启动心跳定时器循环

func (s *Server) timeOutLoop() {    defer s.wg.Done()    for {        select {        case <-s.ctx.Done():            return

        case timeout := <-s.timing.TimeOutChannel():
            netID := timeout.Ctx.Value(netIDCtx).(int64)            if v, ok := s.conns.Load(netID); ok {
                sc := v.(*ServerConn)
                sc.timerCh <- timeout
            } else {
                holmes.Warnf("invalid client %d", netID)
            }
        }
    }
}
timeOutLoopClinetgoroutineselecttimeOutChanchannelcontextnetIDCtxServerConnchannelsendChhandlerChtimerCh
ServerConntimeCh

1.2 服务启动限制处理

  • 如果服务器在接受客户端连接请求的时候发生了临时错误,那么服务器将等待最多1秒的时间再重新尝试接受请求。

  • 如果现有的连接数超过了MaxConnections(默认1000),就拒绝并关闭连接,否则启动一个新的连接开始工作。

2. 网络连接处理模块

func (sc *ServerConn) Start() {
    holmes.Infof("conn start, <%v -> %v>\n", sc.rawConn.LocalAddr(), sc.rawConn.RemoteAddr())
    onConnect := sc.belong.opts.onConnect    if onConnect != nil {
        onConnect(sc)
    }

    loopers := []func(WriteCloser, *sync.WaitGroup){readLoop, writeLoop, handleLoop}    for _, l := range loopers {
        looper := l
        sc.wg.Add(1)        go looper(sc, sc.wg)
    }
}
Reactorepoll
readLoop()writeLoop()handleLoop()

这三个协程在连接创建并启动时就会各自独立运行。

2.1 ReadLoop 实现细节

    for {        select {        case <-cDone: // connection closed
            holmes.Debugln("receiving cancel signal from conn")            return
        case <-sDone: // server closed
            holmes.Debugln("receiving cancel signal from server")            return
        default:
            msg, err = codec.Decode(rawConn)            if err != nil {
                holmes.Errorf("error decoding message %v\n", err)                if _, ok := err.(ErrUndefined); ok {                    // update heart beats
                    setHeartBeatFunc(time.Now().UnixNano())                    continue
                }                return
            }
            setHeartBeatFunc(time.Now().UnixNano())
            handler := GetHandlerFunc(msg.MessageNumber())            if handler == nil {                if onMessage != nil {
                    holmes.Infof("message %d call onMessage()\n", msg.MessageNumber())
                    onMessage(msg, c.(WriteCloser))
                } else {
                    holmes.Warnf("no handler or onMessage() found for message %d\n", msg.MessageNumber())
                }                continue
            }
            handlerCh <- MessageHandler{msg, handler}
        }
    }
codecrawConncodeckeymessage.goMessageHandlerhandlerChHandleLoop

2.2 HandleLoop 实现细节

for {    select {    case <-cDone: // connectin closed
        holmes.Debugln("receiving cancel signal from conn")        return
    case <-sDone: // server closed
        holmes.Debugln("receiving cancel signal from server")        return
    case msgHandler := <-handlerCh:
        msg, handler := msgHandler.message, msgHandler.handler        if handler != nil {            if askForWorker {
                err = WorkerPoolInstance().Put(netID, func() {
                    handler(NewContextWithNetID(NewContextWithMessage(ctx, msg), netID), c)
                })                if err != nil {
                    holmes.Errorln(err)
                }
                addTotalHandle()
            } else {
                handler(NewContextWithNetID(NewContextWithMessage(ctx, msg), netID), c)
            }
        }    case timeout := <-timerCh:        if timeout != nil {
            timeoutNetID := NetIDFromContext(timeout.Ctx)            if timeoutNetID != netID {
                holmes.Errorf("timeout net %d, conn net %d, mismatched!\n", timeoutNetID, netID)
            }            if askForWorker {
                err = WorkerPoolInstance().Put(netID, func() {
                    timeout.Callback(time.Now(), c.(WriteCloser))
                })                if err != nil {
                    holmes.Errorln(err)
                }
            } else {
                timeout.Callback(time.Now(), c.(WriteCloser))
            }
        }
    }
}
HandleLoophandlerChtimerChchannelchannel
handlerChtimerCh

2.2 WriteLoop 实现细节

for {    select {    case <-cDone: // connection closed
        holmes.Debugln("receiving cancel signal from conn")        return
    case <-sDone: // server closed
        holmes.Debugln("receiving cancel signal from server")        return
    case pkt = <-sendCh:        if pkt != nil {            if _, err = rawConn.Write(pkt); err != nil {
                holmes.Errorf("error writing data %v\n", err)                return
            }
        }
    }
}
func ProcessMessage(ctx context.Context, conn tao.WriteCloser) {
    msg := tao.MessageFromContext(ctx).(Message)
    holmes.Infof("receving message %s\n", msg.Content)
    conn.Write(msg)
}
WriteLoopsendChsendChmessageProcessMessageWritesendCh

3. 总结

TaoReadLoopHandleLoopWriteLoopLoopchannelgoroutinegoroutine
messageDeserializeMessageProcessMessagecontextContext

4. 感谢

leesper