这是一款go实现的tcp框架,希望可以给大家一个良好的体验

支持以下通信协议

  • UDP
  • TCP
  • KCP(tcpx@v3.0.0 --)

Declaration

  • 由于精力问题,功能的拓展和增强,只会对tcp通信协议来做。UDP只会保留基本的框架预设。
  • 由于第三方KCP包版本问题,为了保持对go1.9及以下的可用性,决定在tcpx@v3.0.0之后,放弃对kcp的支持。

Table of Contents generated with DocToc

设计这个框架的缘由?

Golang对tcp的支持十分友好,不过在包的拆组上,官方没有提供明确的方式。所以在对流协议的处理上,需要用特定的包协议来进行完整地拆包和组包。其次,同样一个服务,不同开发人,项目,协议类型的发起服务的写法,很难做到统一,有点群魔乱舞的味道。最后,传统的tcp处理,很容易写成switch方式来分发协议给不同处理函数,这样的处理方式很容易造成多人开发冲突,不美观而且会将项目变得很重。

所以, tcpx提供安全完整的包协议,提供仿http-gin的写法,保持统一,并且,强制使用人按照类似http样式的路由来分发。基于这样的方式来开发,可以让项目变得和http一样简单。

1. 开始

go get github.com/fwhezfwhez/tcpx

必要依赖

部分本仓库的代码,在运行时,需要安装protoc,protogen-gen,你可以通过下面的链接找到对应的下载方式。

当你下载并安装成功后,确保以下命令可以得到正确输出:

protoc --version

压测

cases exec times cost time per loop cost mem per loop cost alloc mem times per loop url
OnMessage 2000000 643 ns/op 1368 B/op 5 allocs/op
Mux without middleware 2000000 761 ns/op 1368 B/op 5 allocs/op
Mux with middleware 2000000 768 ns/op 1368 B/op 5 allocs/op
  • cost time per loop: 每次执行的耗时,越小越好
  • cost mem per loop: 每次执行的内存占用,越小越好
  • cost alloc mem times per loop: 每次执行时申请内存的次数,越小越好

2. 示例

helloworld

server:

package main

import (
	"fmt"

	"github.com/fwhezfwhez/tcpx"
)

func main() {
	srv := tcpx.NewTcpX(nil)
	srv.OnMessage = func(c *tcpx.Context) {
		var message []byte
		c.Bind(&message)
		fmt.Println(string(message))
	}
	srv.ListenAndServe("tcp", "localhost:8080")
}

client:

package main

import (
	"fmt"
	"net"

	"github.com/fwhezfwhez/tcpx"
	//"tcpx"
)

func main() {
	conn, e := net.Dial("tcp", "localhost:8080")

	if e != nil {
		panic(e)
	}
	var message = []byte("hello world")
	buf, e := tcpx.PackWithMarshaller(tcpx.Message{
		MessageID: 1,
		Header:    nil,
		Body:      message,
	}, nil)
	if e != nil {
		fmt.Println(e.Error())
		return
	}
	_, e = conn.Write(buf)
	if e != nil {
		fmt.Println(e.Error())
		return
	}
}

2.1 心跳

  • tcpx自带心跳机制,可以通过下面的实例代码开启。在心跳开启时,客户端必须在指定间隔时间内,不断地发送心跳,确保不被服务当作僵尸杀死。
  • 心跳机制依赖的是tcpx的mux路由机制。而路由机制是不允许使用Onmessage的,所以使用自带的心跳机制时,需要注意。

srv side

    srv := tcpx.NewTcpX(nil)
    srv.HeartBeatModeDetail(true, 10 * time.Second, false, tcpx.DEFAULT_HEARTBEAT_MESSAGEID)
    // srv.OnMessage =nil       Onmessage必须为nil,否则会使得心跳失效

client side

        var heartBeat []byte
        heartBeat, e = tcpx.PackWithMarshaller(tcpx.Message{
            MessageID: tcpx.DEFAULT_HEARTBEAT_MESSAGEID,
            Header:    nil,
            Body:      nil,
        }, nil)
        for {
            conn.Write(heartBeat)
            time.Sleep(10 * time.Second)
        }

重写心跳逻辑,只要确保在自定义的函数里,执行c.RecvHeartBeat(),过程可以由开发者自我订制

    srv.RewriteHeartBeatHandler(1300, func(c *tcpx.Context) {
        fmt.Println("rewrite heartbeat handler")
        c.RecvHeartBeat()
    })

2.2 在线、离线

  • 在线离线需要使用tcpx自带的用户池。

2.3 优雅退出,重启

  • 优雅退出

在服务被中断时,你可以进行一些收尾工作

  • 优雅停止tcp服务(仅仅是tcp的端口暂停请求进入,服务进程不会因此被终结)

  • 优雅停止有两种执行策略:

closeAllConnection = falsecloseAllConnection = true
  • 优雅重启:
graceful stopgraceful start

2.4 中间件

这里的例子可以告诉你,如何使用tcpx的中间件

2.5 包协议详情

tcpx自带包协议,这里的例子将会描述拆包装包的详情

2.6 聊天

这里是使用tcpx,实现了一个简单的聊天

2.7 无包协议通讯

如果你不喜欢tcpx自带的包协议,可以利用Raw方式来处理请求。以这种方式接入请求,只有全局中间件(r.UseGlobal)和锚中间件(r.Use)会生效。你会发现,使用srv.OnMessage其实等价于Raw+tcp包协议。

使用该通讯方式时,需要自主读流,拆包解析。

2.8 用户池

例子和2.2共享,使用offline和online时,需要使用tcpx自带的(很基本功能)的用户连接池。

srv.WithBuiltInPool(true)ctx.Offline()ctx.Online(username string)

官方的池不打算做很深的拓展,所以如果池的要求很复杂,还是建议使用者自己实现一个。比如它目前无法做到:

  • 一个用户两个渠道同时上线。当然,如果你使用ctx.Online(username:channel),也可以做到多渠道同时上线

2.9 鉴权

鉴权帮助服务端主动隔绝非法连接建立。类似redis, 当连接建立后,必须在指定时间内发送鉴权消息,否则将无法使用服务的任何路由,并且将被杀死。

它和心跳有点不同,因为他一旦收到了正确的鉴权信息,那么它会将该连接视为可信任的,并会结束待鉴权的协程(相比之下,心跳协程会一直持续)。

它和中间件拦截也不同,因为中间件拦截,要求消息体本身,做到像https一样,每一则消息都要带上校验信息才能通过。鉴权只要发出一次正确的请求,则他的生命周期内,都将进行无校验通信(当然,你可以在你的中间件里进行校验通信)。

3. 使用方法

用法总结

使用OnMessage

  • OnMessage 一旦使用,则默认消息内容有开发者自己管理,它将使tcpx的mux路由失效,包括心跳
func main(){
    srv := tcpx.NewTcpX(tcpx.JsonMarshaller{})
    srv.OnClose = OnClose
    srv.OnConnect = OnConnect
    srv.OnMessage = OnMessage

    go func(){
        fmt.Println("tcp srv listen on 7171")
        if e := srv.ListenAndServe("tcp", ":7171"); e != nil {
            panic(e)
        }
    }()

    // udp
    go func(){
        fmt.Println("udp srv listen on 7172")
        if e := srv.ListenAndServe("udp", ":7172"); e != nil {
            panic(e)
        }
    }()
    // kcp
    go func(){
        fmt.Println("kcp srv listen on 7173")
        if e := srv.ListenAndServe("kcp", ":7173"); e != nil {
            panic(e)
        }
    }()
    select {}
}

func OnConnect(c *tcpx.Context) {
    fmt.Println(fmt.Sprintf("connecting from remote host %s network %s", c.ClientIP(), c.Network()))
}
func OnClose(c *tcpx.Context) {
    fmt.Println(fmt.Sprintf("connecting from remote host %s network %s has stoped", c.ClientIP(), c.Network())
}
var packx = tcpx.NewPackx(tcpx.JsonMarshaller{})
func OnMessage(c *tcpx.Context) {
    // handle c.Stream
    type ServiceA struct{
        Username string `json:"username"`
    }
    type ServiceB struct{
        ServiceName string `json:"service_name"`
    }

    messageID, e :=packx.MessageIDOf(c.Stream)
    if e!=nil {
        fmt.Println(errorx.Wrap(e).Error())
        return
    }

    switch messageID {
    case 7:
        var serviceA ServiceA
        // block, e := packx.Unpack(c.Stream, &serviceA)
        block, e :=c.Bind(&serviceA)
        fmt.Println(block, e)
        c.Reply(8, "success")
    case 9:
        var serviceB ServiceB
        //block, e := packx.Unpack(c.Stream, &serviceB)
        block, e :=c.Bind(&serviceB)
        fmt.Println(block, e)
        c.JSON(10, "success")
    }
}

使用路由和中间件

  • 下面的例子同时使用了三种中间件,他们是: 全局中间件,锚中间件,路由中间件
  • 执行顺序: 全局-->锚--->路由中间件
  • 锚中间件可以即启即关,包裹在Use和UnUse之间的路由,将享受到这些中间件的加持

这里介绍一下为什么要将Use()方式使用的中间件成为锚中间件: 我们知道船锚在停靠时落下,他的位置很随意,落下时Use(), 升起时UnUse(),它的行为规律正如同我们的中间件一样,即用即起,所以成为锚中间件。

// 全局中间件
srv.UseGlobal(m1)

// 锚中间件
srv.Use(mkey,m)
srv.UnUse(mkey)

// 路由中间件
srv.AddHandler(m1,m2,m3, handler)
func main(){
    srv := tcpx.NewTcpX(tcpx.JsonMarshaller{})
    srv.OnClose = OnClose
    srv.OnConnect = OnConnect
    // srv.OnMessage = OnMessage

    srv.UseGlobal(MiddlewareGlobal)
    srv.Use("middleware1", Middleware1, "middleware2", Middleware2)
    srv.AddHandler(1, SayHello)

    srv.UnUse("middleware2")
    srv.AddHandler(3, SayGoodBye)

    if e := srv.ListenAndServe("tcp", ":7171"); e != nil {
        panic(e)
    }
}

func OnConnect(c *tcpx.Context) {
    fmt.Println(fmt.Sprintf("connecting from remote host %s network %s", c.ClientIP(), c.Network()))
}
func OnClose(c *tcpx.Context) {
    fmt.Println(fmt.Sprintf("connecting from remote host %s network %s has stoped", c.ClientIP(), c.Network())
}
// func OnMessage(c *tcpx.Context) {
    // handle c.Stream
// }
func SayHello(c *tcpx.Context) {
    var messageFromClient string
    var messageInfo tcpx.Message
    messageInfo, e := c.Bind(&messageFromClient)
    if e != nil {
        panic(e)
    }
    fmt.Println("receive messageID:", messageInfo.MessageID)
    fmt.Println("receive header:", messageInfo.Header)
    fmt.Println("receive body:", messageInfo.Body)

    var responseMessageID int32 = 2
    e = c.Reply(responseMessageID, "hello")
    fmt.Println("reply:", "hello")
    if e != nil {
        fmt.Println(e.Error())
    }
}

func SayGoodBye(c *tcpx.Context) {
    var messageFromClient string
    var messageInfo tcpx.Message
    messageInfo, e := c.Bind(&messageFromClient)
    if e != nil {
        panic(e)
    }
    fmt.Println("receive messageID:", messageInfo.MessageID)
    fmt.Println("receive header:", messageInfo.Header)
    fmt.Println("receive body:", messageInfo.Body)

    var responseMessageID int32 = 4
    e = c.Reply(responseMessageID, "bye")
    fmt.Println("reply:", "bye")
    if e != nil {
        fmt.Println(e.Error())
    }
}
func Middleware1(c *tcpx.Context) {
    fmt.Println("I am middleware 1 exampled by 'srv.Use(\"middleware1\", Middleware1)'")
}

func Middleware2(c *tcpx.Context) {
    fmt.Println("I am middleware 2 exampled by 'srv.Use(\"middleware2\", Middleware2),srv.UnUse(\"middleware2\")'")
}

func Middleware3(c *tcpx.Context) {
    fmt.Println("I am middleware 3 exampled by 'srv.AddHandler(5, Middleware3, SayName)'")
}

func MiddlewareGlobal(c *tcpx.Context) {
    fmt.Println("I am global middleware exampled by 'srv.UseGlobal(MiddlewareGlobal)'")
}

3.1 使用中间件的详情

全局中间件
    srv := tcpx.NewTcpX(tcpx.JsonMarshaller{})
    srv.UseGlobal(MiddlewareGlobal)
路由中间件
    srv := tcpx.NewTcpX(tcpx.JsonMarshaller{})
    srv.AddHandler(5, Middleware3, SayName)
锚中间件
    srv := tcpx.NewTcpX(tcpx.JsonMarshaller{})
    srv.Use("middleware1", Middleware1, "middleware2", Middleware2)
    srv.AddHandler(5, SayName)
中间件实现的例子
  • 使用c.Next()执行下一个装载的中间件,或者路由
  • 使用c.Abort()终止链式处理,用法和gin完全一致。
func Middleware1(c *tcpx.Context) {
    fmt.Println("I am middleware 1 exampled by 'srv.Use(\"middleware1\", Middleware1)'")
    // c.Next()
    // c.Abort()
}
执行顺序全局锚路由中间件
c.Abortc.Abort()

注意: 再次重申一遍,路由中间件将在启用OnMessage时失效,而全局和锚中间件,横跨所有处理方式,所有协议(包括udp,kcp),都将生效。

3.2 省略

3.3 如何准备一段待发送(待回复)的消息?

  • tcpx自带包协议,你只需要传入(最少)messageID,消息载荷
  • 消息载荷将在序列化时,使用srv声明的序列化方法,默认是json,包括但不限于(json,protobuf, toml, yaml,xml),并且支持自定义协议
  • 自定义协议仅仅是对载荷的序列化可以自设计,不能改变协议规则中 header与 messageid的序列方式。(因为只有json才可以序列化map,而header是map)

client

func main(){
    var packx = tcpx.NewPackx(tcpx.JsonMarshaller{})
    buf1, e := packx.Pack(5, "hello,I am client xiao ming")
    buf2, e := packx.Pack(7, struct{
    Username string
    Age int
    }{"xiaoming", 5})
    ...
}

If you're not golang client, see

3.4 选择/自定义包协议的载荷序列化方式

Now, tcpx supports json,xml,protobuf,toml,yaml like:

client

var packx = tcpx.NewPackx(tcpx.JsonMarshaller{})
// var packx = tcpx.NewPackx(tcpx.XmlMarshaller{})
// var packx = tcpx.NewPackx(tcpx.ProtobufMarshaller{})
// var packx = tcpx.NewPackx(tcpx.TomlMarshaller{})
// var packx = tcpx.NewPackx(tcpx.YamlMarshaller{})

server

srv := tcpx.NewTcpX(tcpx.JsonMarshaller{})
// srv := tcpx.NewTcpX(tcpx.XmlMarshaller{})
// srv := tcpx.NewTcpX(tcpx.ProtobufMarshaller{})
// srv := tcpx.NewTcpX(tcpx.TomlMarshaller{})
// srv := tcpx.NewTcpX(tcpx.YamlMarshaller{})

自定义:

type OtherMarshaller struct{}
func (om OtherMarshaller) Marshal(v interface{}) ([]byte, error) {
    return []byte(""), nil
}
func (om OtherMarshaller) Unmarshal(data []byte, dest interface{}) error {
    return nil
}
func (om OtherMarshaller) MarshalName() string{
    return "other_marshaller"
}

client

var packx = tcpx.NewPackx(OtherMarshaller{})

server

srv := tcpx.NewTcpX(tcpx.OtherMarshaller{})

3.5 非golang如何生成包协议.

tcpx官方提供了go版本的包协议生成,并且提供了python版的参考样例。其他语言,可能需要开发者自己去适配了,当然,tcpx提供http协议网关,你可以参考#5来使用它,

以下是包协议细节

[4]byte -- length             固定4字节,大端编码,指代消息长度(length = packLength-4)
[4]byte -- messageID          固定4字节,大端编码
[4]byte -- headerLength       固定4字节,大端编码
[4]byte -- bodyLength         固定4字节,大端编码
[]byte -- header              json序列化的map结构
[]byte -- body                任意序列化的载荷

因为不是每个序列化方式,都支持序列化map结构,所以header固定用json来序列化。

3.6 不想使用messageID来做路由

messageID的路由虽然很小巧精致,但是不得不说,他在团队开发时,有一个问题,messageID不能重复,所以开发时,多人需要注意不能混用,重用。messageID的同步会增加心理负担。 通过可读化的文本来路由的方式,已经提上日程,不久后将支持同时使用messageid和文本写路由。有兴趣的读者,可以在这里前瞻:

github.com/fwhezfwhez/wsx

使用时,类似:

// 以下方式规划中
srv.Any("/user/create-user/", mux, handler)
srv.Any("/send-heartbeat/")

3.7 业务分发?

tcpx目前基于messageid进行路由分发(基于文本分发的方式规划中)

func main(){
    srv := tcpx.NewTcpX(tcpx.JsonMarshaller{})
    // request messageID 1
    // response messageID 2
    srv.AddHandler(1, SayHello)
    if e := srv.ListenAndServe("tcp", ":7171"); e != nil {
        panic(e)
    }
}
func SayHello(c *tcpx.Context) {
    var messageFromClient string
    var messageInfo tcpx.Message
    messageInfo, e := c.Bind(&messageFromClient)
    if e != nil {
        panic(e)
    }
    fmt.Println("receive messageID:", messageInfo.MessageID)
    fmt.Println("receive header:", messageInfo.Header)
    fmt.Println("receive body:", messageInfo.Body)

    var responseMessageID int32 = 2
    e = c.Reply(responseMessageID, "hello")
    fmt.Println("reply:", "hello")
    if e != nil {
        fmt.Println(e.Error())
    }
}

4. 使用频率比较高的方法

这里,将对核心api进行单独讲解。

参数字段将被省略

tcpx.TcpX
srv := tcpx.NewTcpX(tcpx.JsonMarshaller{})
methods desc
srv.GlobalUse() 使用全局中间件
srv.Use() 使用锚中间件
srv.UnUse() 锚中间件升起。组合之间的路由将执行中间件的逻辑。升起后的锚可以在后续的路由中不断使用和升起。
srv.AddHandler() 增加路由
srv.ListenAndServe() 监听启动,成功时会阻塞
tcpx.Context
var c *tcpx.Context
methods desc
c.Bind() 将数据源绑定进指定结构体
c.Reply() 回复消息
c.Next() 中间件放行
c.Abort() 中间件阻断
c.JSON() 回复json消息
c.XML() 回复xml消息
c.YAML() 回复yaml消息
c.Protobuf() 回复protobuf消息
c.TOML() 回复toml消息
tcpx.Packx
var packx *tcpx.Packx
methods desc
packx.Pack() 生成包协议
packx.UnPack() 解析包协议
packx. MessageIDOf() 从stream从获取它的messageID,要求stream已经按照协议完美切割成一块
packx.LengthOf() stream块长度-4
tcpx.Message
var message tcpx.Message
methods desc
message.Get() 从消息的header中获取一个key对应的值
message.Set() 将键值设进消息的header

5. 协议转换网关

go run main.go -port 7000

5.1 Gateway pack detail

note: Each message should call once

POST http://localhost:7000/gateway/pack/transfer/
application/json

body:

{
    "marshal_name":<marshal_name>,
    "stream": <stream>,
    "message_id": <message_id>,
    "header": <header>
}
"json","xml", "toml", "yaml", "protobuf"

returns:

{
    "message":<message>,
    "stream":<stream>
}
field type desc example nessessary
message string "success" when status 200, "success", "error message" when 400/500 "success" yes
stream []byte packed stream,when error or status not 200, no stream field no

example:

payload:

{"username": "hello, tcpx"}   ---json-->  "eyJ1c2VybmFtZSI6ImhlbGxvLCB0Y3B4In0="

request:

{
    "marshal_name": "json",
    "stream": "eyJ1c2VybmFtZSI6ImhlbGxvLCB0Y3B4In0=",
    "message_id": 1,
    "header": {
      "api": "/pack/"
    }
}

example response:

{
    "stream": "AAAANgAAAAEAAAAQAAAAGnsiYXBpIjoiL3BhY2svIn17InVzZXJuYW1lIjoiaGVsbG8sIHRjcHgifQ=="
}

5.2 Gateway unpack detail

note: able to unpack many messages once.

POST http://localhost:7000/gateway/unpack/transfer/
application/json

body:

{
    "marshal_name": <marshal_name>,
    "stream": <stream>
}
"json","xml", "toml", "yaml", "protobuf"

returns:

{
    "message": <message>,
    "blocks" <blocks>
}
field type desc example nessessary
message string "success" when status 200, "success", "error message" when 400/500 "success" yes
blocks []block unpacked blocks, when status not 200, no this field no
block obj each message block information, when status not 200,no this field ++ look below++ no

block example:

{
    "message_id": 1,
    "header": {"k1":"v1"},
    "marshal_name": "json",
    "stream": "eyJ1c2VybmFtZSI6ImhlbGxvLCB0Y3B4In0="
}

example request:

{
    "marshal_name": "json",
    "stream": "AAAANgAAAAEAAAAQAAAAGnsiYXBpIjoiL3BhY2svIn17InVzZXJuYW1lIjoiaGVsbG8sIHRjcHgifQ=="
}

example response:

{
    "message": "success",
    "blocks": [
      {
        "message_id": 1,
        "header": {
          "k1": "v1"
        },
        "marshal_name": "json",
        "stream": "eyJ1c2VybmFtZSI6ImhlbGxvLCB0Y3B4In0="
      }
    ]
}

to payload:

"eyJ1c2VybmFtZSI6ImhlbGxvLCB0Y3B4In0="   ---json-->  {"username": "hello, tcpx"}