attachments-2021-09-L0y6c211613c06bdcfc37.jpg

Pitaya是一款由国外游戏公司topfreegames使用golang进行编写,易于使用,快速且轻量级的开源分布式游戏服务器框架
Pitaya使用etcd作为默认的服务发现组件,提供使用nats和grpc进行远程调用(server to server)的可选配置,并提供在docker中运行以上组件(etcd、nats)的docker-compose配置

抽象分析

  • PlayerConn
    PlayerConn是一个封装的连接对象,继承net.Conn,并提供一个获取下一个数据包的方法

    type PlayerConn interface {
        GetNextMessage() (b []byte, err error)
        net.Conn
    }
  • Acceptor
    Acceptor代表一个服务端端口进程,接收客户端连接,并用一个内部Chan来维护这些连接对象

    type Acceptor interface {
        ListenAndServe()
        Stop()
        GetAddr() string
        GetConnChan() chan PlayerConn
    }
  • Acceptorwrapper
    Acceptorwrapper义如其名就是Acceptor的包装器,因为Acceptor的通过Chan来保存连接
    所以wrapper可以通过遍历这个Chan来实时包装这些连接

    type Wrapper interface {
        Wrap(acceptor.Acceptor) acceptor.Acceptor
    }
  • Agent
    Agent是一个服务端的应用层连接对象,包含了:
    Session信息
    服务器预发送消息队列
    拆解包对象
    最后心跳时间
    停止发送心跳的chan
    关闭发送数据的chan
    全局的关闭信号
    连接对象
    Agent当前状态
    … ..

    type (
        // Agent corresponds to a user and is used for storing raw Conn information
        Agent struct {
            Session            *session.Session  // session
            appDieChan         chan bool         // app die channel
            chDie              chan struct{}     // wait for close
            chSend             chan pendingWrite // push message queue
            chStopHeartbeat    chan struct{}     // stop heartbeats
            chStopWrite        chan struct{}     // stop writing messages
            closeMutex         sync.Mutex
            conn               net.Conn            // low-level conn fd
            decoder            codec.PacketDecoder // binary decoder
            encoder            codec.PacketEncoder // binary encoder
            heartbeatTimeout   time.Duration
            lastAt             int64 // last heartbeat unix time stamp
            messageEncoder     message.Encoder
            ... ...
            state              int32                // current agent state
        }

        pendingWrite struct {
            ctx  context.Context
            data []byte
            err  error
        }
    )
  • Component
    Component代表业务组件,提供若干个接口
    通过Component生成处理请求的Service

    type Component interface {
        Init()
        AfterInit()
        BeforeShutdown()
        Shutdown()
    }
  • Handler、Remote、Service
    Handler和Remote分别代表本地逻辑执行器和远程逻辑执行器
    Service是一组服务对象,包含若干Handler和Remote
    这里有个温柔的细节——Receiver    reflect.Value
    pitaya的设计者为了降低引用,采取在逻辑执行器中保留方法的Receiver以达到在Handler和Remote对象中,只需要保存类型的Method,而无需保存带对象引用的Value.Method

    type (
        //Handler represents a message.Message's handler's meta information.
        Handler struct {
            Receiver    reflect.Value  // receiver of method
            Method      reflect.Method // method stub
            Type        reflect.Type   // low-level type of method
            IsRawArg    bool           // whether the data need to serialize
            MessageType message.Type   // handler allowed message type (either request or notify)
        }

        //Remote represents remote's meta information.
        Remote struct {
            Receiver reflect.Value  // receiver of method
            Method   reflect.Method // method stub
            HasArgs  bool           // if remote has no args we won't try to serialize received data into arguments
            Type     reflect.Type   // low-level type of method
        }

        // Service implements a specific service, some of it's methods will be
        // called when the correspond events is occurred.
        Service struct {
            Name     string              // name of service
            Type     reflect.Type        // type of the receiver
            Receiver reflect.Value       // receiver of methods for the service
            Handlers map[string]*Handler // registered methods
            Remotes  map[string]*Remote  // registered remote methods
            Options  options             // options
        }
    )
  • Modules
    Modules模块和Component结构一致,唯一的区别在于使用上
    Modules主要是面向系统的一些全局存活的对象
    方便在统一的时机,集中进行启动和关闭

    type Base struct{}

    func (c *Base) Init() error {
        return nil
    }

    func (c *Base) AfterInit() {}

    func (c *Base) BeforeShutdown() {}

    func (c *Base) Shutdown() error {
        return nil
    }

集中管理的对象容器在外部module.go中定义

    var (
        modulesMap = make(map[string]interfaces.Module)
        modulesArr = []moduleWrapper{}
    )

    type moduleWrapper struct {
        module interfaces.Module
        name   string
    }
  • HandleService
    HandleService就是服务端的主逻辑对象,负责处理一切数据包
    chLocalProcess用于保存待处理的客户端数据包
    chRemoteProcess用于保存待处理的来自其他服务器的数据包
    services注册了处理客户端的服务
    内部聚合一个RemoteService对象,专门负责处理服务器间的数据包

    type (
        HandlerService struct {
            appDieChan         chan bool             // die channel app
            chLocalProcess     chan unhandledMessage // channel of messages that will be processed locally
            chRemoteProcess    chan unhandledMessage // channel of messages that will be processed remotely
            decoder            codec.PacketDecoder   // binary decoder
            encoder            codec.PacketEncoder   // binary encoder
            heartbeatTimeout   time.Duration
            messagesBufferSize int
            remoteService      *RemoteService
            serializer         serialize.Serializer          // message serializer
            server             *cluster.Server               // server obj
            services           map[string]*component.Service // all registered service
            messageEncoder     message.Encoder
            metricsReporters   []metrics.Reporter
        }

        unhandledMessage struct {
            ctx   context.Context
            agent *agent.Agent
            route *route.Route
            msg   *message.Message
        }
    )
  • RemoteService
    RemoteService中维护服务发现和注册提供的远程服务

    type RemoteService struct {
        rpcServer              cluster.RPCServer
        serviceDiscovery       cluster.ServiceDiscovery
        serializer             serialize.Serializer
        encoder                codec.PacketEncoder
        rpcClient              cluster.RPCClient
        services               map[string]*component.Service // all registered service
        router                 *router.Router
        messageEncoder         message.Encoder
        server                 *cluster.Server // server obj
        remoteBindingListeners []cluster.RemoteBindingListener
    }
  • Timer
    Timer模块中维护一个全局定时任务管理者,使用线程安全的map来保存定时任务,通过time.Ticker的chan信号来定期触发

    var (
        Manager = &struct {
            incrementID    int64      
            timers         sync.Map   
            ChClosingTimer chan int64 
            ChCreatedTimer chan *Timer
        }{}

        Precision = time.Second

        GlobalTicker *time.Ticker
    )
  • pipeline
    pipeline模块提供全局钩子函数的配置
    BeforeHandler 在业务逻辑之前执行
    AfterHandler 在业务逻辑之后执行

    var (
        BeforeHandler = &pipelineChannel{}
        AfterHandler = &pipelineAfterChannel{}
    )

    type (
        HandlerTempl func(ctx context.Context, in interface{}) (out interface{}, err error)AfterHandlerTempl func(ctx context.Context, out interface{}, err error) (interface{}, error)pipelineChannel struct {
            Handlers []HandlerTempl
        }

        pipelineAfterChannel struct {
            Handlers []AfterHandlerTempl
        }
    )

框架流程

app.go是系统启动的入口
创建HandlerService
并根据启动模式如果是集群模式创建RemoteService
开启服务端事件监听
开启监听服务器关闭信号的Chan

    var (
        app = &App{
            ... ..
        }

        remoteService  *service.RemoteService
        handlerService *service.HandlerService
    )
    func Start() {
        ... ..
        if app.serverMode == Cluster {
            ... ..
            app.router.SetServiceDiscovery(app.serviceDiscovery)

            remoteService = service.NewRemoteService(
                app.rpcClient,
                app.rpcServer,
                app.serviceDiscovery,
                app.router,
                ... ..
            )

            app.rpcServer.SetPitayaServer(remoteService)

            initSysRemotes()
        }

        handlerService = service.NewHandlerService(
            app.dieChan,
            app.heartbeat,
            app.server,
            remoteService,
            ... ..
        )

        ... ..

        listen()
        ... ..
        // stop server
        select {
        case             logger.Log.Warn("the app will shutdown in a few seconds")
        case s :=             logger.Log.Warn("got signal: ", s, ", shutting down...")
            close(app.dieChan)
        }
        ... ..
    }

listen方法也就是开启服务,具体包括以下步骤:
1.注册Component
2.注册定时任务的GlobalTicker
3.开启Dispatch处理业务和定时任务(ticket)的goroutine
4.开启acceptor处理连接的goroutine
5.开启主逻辑的goroutine
6.注册Modules

    func listen() {
        startupComponents()

        timer.GlobalTicker = time.NewTicker(timer.Precision)

        logger.Log.Infof("starting server %s:%s", app.server.Type, app.server.ID)
        for i := 0; i "pitaya.concurrency.handler.dispatch"); i++ {
            go handlerService.Dispatch(i)
        }
        for _, acc := range app.acceptors {
            a := acc
            go func() {
                for conn := range a.GetConnChan() {
                    go handlerService.Handle(conn)
                }
            }()

            go func() {
                a.ListenAndServe()
            }()

            logger.Log.Infof("listening with acceptor %s on addr %s", reflect.TypeOf(a), a.GetAddr())
        }
        ... ..
        startModules()

        logger.Log.Info("all modules started!")

        app.running = true
    }

startupComponents对Component进行初始化
然后把Component注册到handlerService和remoteService上

    func startupComponents() {
        // component initialize hooks
        for _, c := range handlerComp {
            c.comp.Init()
        }

        // component after initialize hooks
        for _, c := range handlerComp {
            c.comp.AfterInit()
        }

        // register all components
        for _, c := range handlerComp {
            if err := handlerService.Register(c.comp, c.opts); err != nil {
                logger.Log.Errorf("Failed to register handler: %s", err.Error())
            }
        }

        // register all remote components
        for _, c := range remoteComp {
            if remoteService == nil {
                logger.Log.Warn("registered a remote component but remoteService is not running! skipping...")
            } else {
                if err := remoteService.Register(c.comp, c.opts); err != nil {
                    logger.Log.Errorf("Failed to register remote: %s", err.Error())
                }
            }
        }
        ... ..
    }

比如HandlerService的注册,反射得到component类型的全部方法,判断isHandlerMethod就加入services里面
并聚合Component对象的反射Value对象为全部Handler的Method Receiver,减少了对象引用

    func NewService(comp Component, opts []Option) *Service {
        s := &Service{
            Type:     reflect.TypeOf(comp),
            Receiver: reflect.ValueOf(comp),
        }
        ... ..
        return s
    }

    func (h *HandlerService) Register(comp component.Component, opts []component.Option) error {
        s := component.NewService(comp, opts)
        ... ..
        if err := s.ExtractHandler(); err != nil {
            return err
        }

        h.services[s.Name] = s
        for name, handler := range s.Handlers {
            handlers[fmt.Sprintf("%s.%s", s.Name, name)] = handler
        }
        return nil
    }
    func (s *Service) ExtractHandler() error {
        typeName := reflect.Indirect(s.Receiver).Type().Name()
        ... ..
        s.Handlers = suitableHandlerMethods(s.Type, s.Options.nameFunc)
        ... ..
        for i := range s.Handlers {
            s.Handlers[i].Receiver = s.Receiver
        }
        return nil
    }
    func suitableHandlerMethods(typ reflect.Type, nameFunc func(string) string) map[string]*Handler {
        methods := make(map[string]*Handler)
        for m := 0; m             method := typ.Method(m)
            mt := method.Type
            mn := method.Name
            if isHandlerMethod(method) {
                ... ..
                handler := &Handler{
                    Method:      method,
                    IsRawArg:    raw,
                    MessageType: msgType,
                }
                ... ..
                methods[mn] = handler
            }
        }
        return methods
    }

handlerService.Dispatch方法负责各种业务的处理,包括:
1.处理chLocalProcess中的本地Message
2.使用remoteService处理chRemoteProcess中的远程Message
3.在定时ticket到达时调用timer.Cron执行定时任务
4.管理定时任务的创建
5.管理定时任务的删除

    func (h *HandlerService) Dispatch(thread int) {
        defer timer.GlobalTicker.Stop()

        for {
            select {
            case lm :=                 metrics.ReportMessageProcessDelayFromCtx(lm.ctx, h.metricsReporters, "local")
                h.localProcess(lm.ctx, lm.agent, lm.route, lm.msg)

            case rm :=                 metrics.ReportMessageProcessDelayFromCtx(rm.ctx, h.metricsReporters, "remote")
                h.remoteService.remoteProcess(rm.ctx, nil, rm.agent, rm.route, rm.msg)

            case // execute cron task
                timer.Cron()

            case t := // new Timers
                timer.AddTimer(t)

            case id := // closing Timers
                timer.RemoveTimer(id)
            }
        }
    }

接下来看看Acceptor的工作,以下为Tcp实现,就是负责接收连接,流入acceptor的Chan

    func (a *TCPAcceptor) ListenAndServe() {
        if a.hasTLSCertificates() {
            a.ListenAndServeTLS(a.certFile, a.keyFile)
            return
        }

        listener, err := net.Listen("tcp", a.addr)
        if err != nil {
            logger.Log.Fatalf("Failed to listen: %s", err.Error())
        }
        a.listener = listener
        a.running = true
        a.serve()
    }
    func (a *TCPAcceptor) serve() {
        defer a.Stop()
        for a.running {
            conn, err := a.listener.Accept()
            if err != nil {
                logger.Log.Errorf("Failed to accept TCP connection: %s", err.Error())
                continue
            }

            a.connChan                 Conn: conn,
            }
        }
    }

前面讲过对于每个Acceptor开启了一个goroutine去处理连接,也就是下面代码

    for conn := range a.GetConnChan() {
        go handlerService.Handle(conn)
    }

所以流入Chan的连接就会被实时的开启一个goroutine去处理,处理过程就是先创建一个Agent对象
并开启一个goroutine给Agent负责维护连接的心跳
然后开启死循环,读取连接的数据processPacket

    func (h *HandlerService) Handle(conn acceptor.PlayerConn) {
        // create a client agent and startup write goroutine
        a := agent.NewAgent(conn, h.decoder, h.encoder, h.serializer, h.heartbeatTimeout, h.messagesBufferSize, h.appDieChan, h.messageEncoder, h.metricsReporters)

        // startup agent goroutine
        go a.Handle()
        ... ..
        for {
            msg, err := conn.GetNextMessage()

            if err != nil {
                logger.Log.Errorf("Error reading next available message: %s", err.Error())
                return
            }

            packets, err := h.decoder.Decode(msg)
            if err != nil {
                logger.Log.Errorf("Failed to decode message: %s", err.Error())
                return
            }

            if len(packets) 1 {
                logger.Log.Warnf("Read no packets, data: %v", msg)
                continue
            }

            // process all packet
            for i := range packets {
                if err := h.processPacket(a, packets[i]); err != nil {
                    logger.Log.Errorf("Failed to process packet: %s", err.Error())
                    return
                }
            }
        }
    }

这时如果使用了pitaya提供的漏桶算法实现的限流wrap来包装acceptor,则会对客户端发送的消息进行限流限速
这里也是灵活利用for循环遍历chan的特性,所以也是实时地对连接进行包装

    func (b *BaseWrapper) ListenAndServe() {
        go b.pipe()
        b.Acceptor.ListenAndServe()
    }

    // GetConnChan returns the wrapper conn chan
    func (b *BaseWrapper) GetConnChan() chan acceptor.PlayerConn {
        return b.connChan
    }

    func (b *BaseWrapper) pipe() {
        for conn := range b.Acceptor.GetConnChan() {
            b.connChan         }
    }
    type RateLimitingWrapper struct {
        BaseWrapper
    }

    func NewRateLimitingWrapper(c *config.Config) *RateLimitingWrapper {
        r := &RateLimitingWrapper{}
        r.BaseWrapper = NewBaseWrapper(func(conn acceptor.PlayerConn) acceptor.PlayerConn {
            ... ..
            return NewRateLimiter(conn, limit, interval, forceDisable)
        })
        return r
    }

    func (r *RateLimitingWrapper) Wrap(a acceptor.Acceptor) acceptor.Acceptor {
        r.Acceptor = a
        return r
    }

    func (r *RateLimiter) GetNextMessage() (msg []byte, err error) {
        if r.forceDisable {
            return r.PlayerConn.GetNextMessage()
        }

        for {
            msg, err := r.PlayerConn.GetNextMessage()
            if err != nil {
                return nil, err
            }

            now := time.Now()
            if r.shouldRateLimit(now) {
                logger.Log.Errorf("Data=%s, Error=%s", msg, constants.ErrRateLimitExceeded)
                metrics.ReportExceededRateLimiting(pitaya.GetMetricsReporters())
                continue
            }

            return msg, err
        }
    }

processPacket对数据包解包后,执行processMessage

    func (h *HandlerService) processPacket(a *agent.Agent, p *packet.Packet) error {
        switch p.Type {
        case packet.Handshake:
            ... ..
        case packet.HandshakeAck:
            ... ..
        case packet.Data:
            if a.GetStatus()                 return fmt.Errorf("receive data on socket which is not yet ACK, session will be closed immediately, remote=%s",
                    a.RemoteAddr().String())
            }
            msg, err := message.Decode(p.Data)
            if err != nil {
                return err
            }
            h.processMessage(a, msg)
        case packet.Heartbeat:
            // expected
        }
        a.SetLastAt()
        return nil
    }

processMessage中包装数据包为unHandledMessage
根据消息类型,流入chLocalProcess 或者chRemoteProcess 也就转交给上面提到的负责Dispatch的goroutine去处理了

    func (h *HandlerService) processMessage(a *agent.Agent, msg *message.Message) {
        requestID := uuid.New()
        ctx := pcontext.AddToPropagateCtx(context.Background(), constants.StartTimeKey, time.Now().UnixNano())
        ctx = pcontext.AddToPropagateCtx(ctx, constants.RouteKey, msg.Route)
        ctx = pcontext.AddToPropagateCtx(ctx, constants.RequestIDKey, requestID.String())
        tags := opentracing.Tags{
            "local.id":   h.server.ID,
            "span.kind":  "server",
            "msg.type":   strings.ToLower(msg.Type.String()),
            "user.id":    a.Session.UID(),
            "request.id": requestID.String(),
        }
        ctx = tracing.StartSpan(ctx, msg.Route, tags)
        ctx = context.WithValue(ctx, constants.SessionCtxKey, a.Session)

        r, err := route.Decode(msg.Route)
        ... ..
        message := unhandledMessage{
            ctx:   ctx,
            agent: a,
            route: r,
            msg:   msg,
        }
        if r.SvType == h.server.Type {
            h.chLocalProcess         } else {
            if h.remoteService != nil {
                h.chRemoteProcess             } else {
                logger.Log.Warnf("request made to another server type but no remoteService running")
            }
        }
    }

服务器进程启动的最后一步是对全局模块启动

在外部的module.go文件中,提供了对module的全局注册方法、全部顺序启动方法、全部顺序关闭方法

    func RegisterModule(module interfaces.Module, name string) error {
        ... ..
    }

    func startModules() {
        for _, modWrapper := range modulesArr {
            modWrapper.module.Init()
        }
        for _, modWrapper := range modulesArr {
            modWrapper.module.AfterInit()
        }
    }

    func shutdownModules() {
        for i := len(modulesArr) - 1; i >= 0; i-- {
            modulesArr[i].module.BeforeShutdown()
        }

        for i := len(modulesArr) - 1; i >= 0; i-- {
            mod := modulesArr[i].module
            mod.Shutdown()
        }
    }

处理细节

  • localProcess
    接下来看看localprocess对于消息的处理细节(为了直观省略部分异常处理代码)
    使用processHandlerMessagef方法对包装出来的ctx对象进行业务操作
    最终根据消息的类型 notify / Request 区分是否需要响应,执行不同处理

    func (h *HandlerService) localProcess(ctx context.Context, a *agent.Agent, route *route.Route, msg *message.Message) {
        var mid uint
        switch msg.Type {
        case message.Request:
            mid = msg.ID
        case message.Notify:
            mid = 0
        }

        ret, err := processHandlerMessage(ctx, route, h.serializer, a.Session, msg.Data, msg.Type, false)
        if msg.Type != message.Notify {
            ... ..
            err := a.Session.ResponseMID(ctx, mid, ret)
            ... ..
        } else {
            metrics.ReportTimingFromCtx(ctx, h.metricsReporters, handlerType, nil)
            tracing.FinishSpan(ctx, err)
        }
    }
  • processHandlerMessage
    这里面负进行业务逻辑
    会先调用executeBeforePipeline(ctx, arg),执行前置的钩子函数
    再通过util.Pcall(h.Method, args)反射调用handler方法
    再调用executeAfterPipeline(ctx, resp, err),执行后置的钩子函数
    最后调用serializeReturn(serializer, resp),对请求结果进行序列化

    func processHandlerMessage(
        ctx context.Context,
        rt *route.Route,
        serializer serialize.Serializer,
        session *session.Session,
        data []byte,
        msgTypeIface interface{},
        remote bool,
    ) ([]byte, error) {
        if ctx == nil {
            ctx = context.Background()
        }
        ctx = context.WithValue(ctx, constants.SessionCtxKey, session)
        ctx = util.CtxWithDefaultLogger(ctx, rt.String(), session.UID())

        h, err := getHandler(rt)
        ... ..

        msgType, err := getMsgType(msgTypeIface)
        ... ..

        logger := ctx.Value(constants.LoggerCtxKey).(logger.Logger)
        exit, err := h.ValidateMessageType(msgType)
        ... ..

        arg, err := unmarshalHandlerArg(h, serializer, data)
        ... ..

        if arg, err = executeBeforePipeline(ctx, arg); err != nil {
            return nil, err
        }
        ... ..

        args := []reflect.Value{h.Receiver, reflect.ValueOf(ctx)}
        if arg != nil {
            args = append(args, reflect.ValueOf(arg))
        }

        resp, err := util.Pcall(h.Method, args)
        if remote && msgType == message.Notify {
            resp = []byte("ack")
        }

        resp, err = executeAfterPipeline(ctx, resp, err)
        ... ..

        ret, err := serializeReturn(serializer, resp)
        ... ..

        return ret, nil
    }
  • executeBeforePipeline
    实际就是执行pipeline的BeforeHandler

    func executeBeforePipeline(ctx context.Context, data interface{}) (interface{}, error) {
        var err error
        res := data
        if len(pipeline.BeforeHandler.Handlers) > 0 {
            for _, h := range pipeline.BeforeHandler.Handlers {
                res, err = h(ctx, res)
                if err != nil {
                    logger.Log.Debugf("pitaya/handler: broken pipeline: %s", err.Error())
                    return res, err
                }
            }
        }
        return res, nil
    }
  • executeAfterPipeline
    实际就是执行pipeline的AfterHandler

    func executeAfterPipeline(ctx context.Context, res interface{}, err error) (interface{}, error) {
        ret := res
        if len(pipeline.AfterHandler.Handlers) > 0 {
            for _, h := range pipeline.AfterHandler.Handlers {
                ret, err = h(ctx, ret, err)
            }
        }
        return ret, err
    }

util.pcall里展示了golang反射的一种高级用法
method.Func.Call,第一个参数是Receiver,也就是调用对象方法的实例
这种设计对比直接保存Value对象的method,反射时直接call,拥有的额外好处就是降低了对象引用,方法不和实例绑定

    func Pcall(method reflect.Method, args []reflect.Value) (rets interface{}, err error) {
        ... ..
        r := method.Func.Call(args)
        if len(r) == 2 {
            if v := r[1].Interface(); v != nil {
                err = v.(error)
            } else if !r[0].IsNil() {
                rets = r[0].Interface()
            } else {
                err = constants.ErrReplyShouldBeNotNull
            }
        }
        return
    }

程序员编程交流QQ群:805358732

如果你想用Python开辟副业赚钱,但不熟悉爬虫与反爬虫技术,没有接单途径,也缺乏兼职经验
关注下方微信公众号:Python编程学习圈,获取价值999元全套Python入门到进阶的学习资料以及教程,还有Python技术交流群一起交流学习哦。

attachments-2022-06-bq1R7J5P62afe0ede3e79.jpeg