Teleport

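Teleport is a versatile, high-performance and flexible socket framework.

It can be used for peer-to-peer communication, RPC, long-connection gateways, microservices, push services, game services and so on.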

Benchmark

Test Case

  • One server process and one client process, running on the same machine
  • CPU: Intel Xeon E312xx (Sandy Bridge) 16 cores 2.53GHz
  • Memory: 16G
  • OS: Linux 2.6.32-696.16.1.el6.centos.plus.x86_64, CentOS 6.4
  • Go: 1.9.2
  • Message size: 581 bytes
  • Message codec: protobuf
  • Messages sent: 1000000

Test Results

  • teleport
client concurrency  mean(ms)  median(ms)  max(ms)  min(ms)  throughput(TPS)
100                 1         0           16       0        75505
500                 9         11          97       0        52192
1000                19        24          187      0        50040
2000                39        54          409      0        42551
5000                96        128         1148     0        46367
  • teleport/socket
client concurrency  mean(ms)  median(ms)  max(ms)  min(ms)  throughput(TPS)
100                 0         0           14       0        225682
500                 2         1           24       0        212630
1000                4         3           51       0        180733
2000                8         6           64       0        183351
5000                21        18          651      0        133886
  • CPU time flame graph of teleport/socket

[Figure: tp_socket_profile_torch]

  • Heap flame graph of teleport/socket

[Figure: tp_socket_heap_torch]

Version

Version  Status   Branch
v3       release
v2       release
v1       release

Installation

go get -u -f github.com/henrylee2cn/teleport

Features

  • Packet parts: Header, Body
  • Codec types: JSON, Protobuf, string
  • Networks: tcp, tcp4, tcp6, unix, unixpacket

Code Example

server.go

package main

import (
    "fmt"
    "time"

    tp "github.com/henrylee2cn/teleport"
)

func main() {
    srv := tp.NewPeer(tp.PeerConfig{
        CountTime:     true,
        ListenAddress: ":9090",
    })
    srv.RoutePull(new(math))
    srv.ListenAndServe()
}

type math struct {
    tp.PullCtx
}

func (m *math) Add(args *[]int) (int, *tp.Rerror) {
    if m.Query().Get("push_status") == "yes" {
        m.Session().Push(
            "/push/status",
            fmt.Sprintf("%d numbers are being added...", len(*args)),
        )
        time.Sleep(time.Millisecond * 10)
    }
    var r int
    for _, a := range *args {
        r += a
    }
    return r, nil
}

client.go

package main

import (
    tp "github.com/henrylee2cn/teleport"
)

func main() {
    tp.SetLoggerLevel("ERROR")
    cli := tp.NewPeer(tp.PeerConfig{})
    defer cli.Close()
    cli.RoutePush(new(push))
    sess, err := cli.Dial(":9090")
    if err != nil {
        tp.Fatalf("%v", err)
    }

    var reply int
    rerr := sess.Pull("/math/add?push_status=yes",
        []int{1, 2, 3, 4, 5},
        &reply,
    ).Rerror()

    if rerr != nil {
        tp.Fatalf("%v", rerr)
    }
    tp.Printf("reply: %d", reply)
}

type push struct {
    tp.PushCtx
}

func (p *push) Status(args *string) *tp.Rerror {
    tp.Printf("server status: %s", *args)
    return nil
}

Framework Design

Glossary

Packet.Body

Packet Content

The content of each packet is as follows:

// in .../teleport/socket package

// Packet a socket data packet.
type Packet struct {
    // Has unexported fields.
}

func GetPacket(settings ...PacketSetting) *Packet
func NewPacket(settings ...PacketSetting) *Packet
func (p *Packet) Body() interface{}
func (p *Packet) BodyCodec() byte
func (p *Packet) Context() context.Context
func (p *Packet) MarshalBody() ([]byte, error)
func (p *Packet) Meta() *utils.Args
func (p *Packet) Ptype() byte
func (p *Packet) Reset(settings ...PacketSetting)
func (p *Packet) Seq() string
func (p *Packet) SetBody(body interface{})
func (p *Packet) SetBodyCodec(bodyCodec byte)
func (p *Packet) SetNewBody(newBodyFunc NewBodyFunc)
func (p *Packet) SetPtype(ptype byte)
func (p *Packet) SetSeq(seq string)
func (p *Packet) SetSize(size uint32) error
func (p *Packet) SetUri(uri string)
func (p *Packet) SetUriObject(uriObject *url.URL)
func (p *Packet) Size() uint32
func (p *Packet) String() string
func (p *Packet) UnmarshalBody(bodyBytes []byte) error
func (p *Packet) Uri() string
func (p *Packet) UriObject() *url.URL
func (p *Packet) XferPipe() *xfer.XferPipe

// NewBodyFunc creates a new body by header.
type NewBodyFunc func(Header) interface{}

Codec

The codec used to encode and decode the Body of a packet.

type Codec interface {
    // Id returns codec id.
    Id() byte
    // Name returns codec name.
    Name() string
    // Marshal returns the encoding of v.
    Marshal(v interface{}) ([]byte, error)
    // Unmarshal parses the encoded data and stores the result
    // in the value pointed to by v.
    Unmarshal(data []byte, v interface{}) error
}
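
As an illustration of the interface above, here is a minimal sketch of a JSON codec built on the standard library's encoding/json. The Codec interface is copied locally so the file compiles on its own; the type name jsonCodec and the id 'j' are assumptions made for this example, and how a codec is registered with the framework is not shown here.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Codec is a local copy of the interface quoted above, so that this
// sketch compiles without importing the framework.
type Codec interface {
	Id() byte
	Name() string
	Marshal(v interface{}) ([]byte, error)
	Unmarshal(data []byte, v interface{}) error
}

// jsonCodec is a hypothetical codec backed by encoding/json.
type jsonCodec struct{}

// Id returns an arbitrary id chosen for this example.
func (jsonCodec) Id() byte { return 'j' }

// Name returns the codec name.
func (jsonCodec) Name() string { return "json" }

// Marshal encodes v as JSON.
func (jsonCodec) Marshal(v interface{}) ([]byte, error) {
	return json.Marshal(v)
}

// Unmarshal decodes JSON data into the value pointed to by v.
func (jsonCodec) Unmarshal(data []byte, v interface{}) error {
	return json.Unmarshal(data, v)
}

// Compile-time check that jsonCodec satisfies Codec.
var _ Codec = jsonCodec{}

func main() {
	var c Codec = jsonCodec{}
	data, err := c.Marshal([]int{1, 2, 3, 4, 5})
	if err != nil {
		panic(err)
	}
	var args []int
	if err := c.Unmarshal(data, &args); err != nil {
		panic(err)
	}
	fmt.Println(c.Name(), string(data), args)
}
```

The round trip in main mirrors what happens to a Body such as the []int argument of the /math/add example: it is marshaled on the sending side and unmarshaled into a pointer on the receiving side.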

Transfer Filter Pipe

The filter pipe applied to data during transfer.

// XferFilter handles byte stream of packet when transfer.
type XferFilter interface {
    // Id returns transfer filter id.
    Id() byte
    // Name returns transfer filter name.
    Name() string
    // OnPack performs filtering on packing.
    OnPack([]byte) ([]byte, error)
    // OnUnpack performs filtering on unpacking.
    OnUnpack([]byte) ([]byte, error)
}
// Get returns transfer filter by id.
func Get(id byte) (XferFilter, error)
// GetByName returns transfer filter by name.
func GetByName(name string) (XferFilter, error)

// XferPipe transfer filter pipe, handlers from outer-most to inner-most.
// Note: the length can not be bigger than 255!
type XferPipe struct {
    // Has unexported fields.
}
func NewXferPipe() *XferPipe
func (x *XferPipe) Append(filterId ...byte) error
func (x *XferPipe) AppendFrom(src *XferPipe)
func (x *XferPipe) Ids() []byte
func (x *XferPipe) Len() int
func (x *XferPipe) Names() []string
func (x *XferPipe) OnPack(data []byte) ([]byte, error)
func (x *XferPipe) OnUnpack(data []byte) ([]byte, error)
func (x *XferPipe) Range(callback func(idx int, filter XferFilter) bool)
func (x *XferPipe) Reset()
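
To make the XferFilter contract concrete, the following sketch implements a gzip filter with the standard library. The interface is copied locally so the file is self-contained; the type name gzipFilter and the id 'g' are assumptions for illustration, and registration of the filter with the xfer package is deliberately left out because it is not described in this section.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
)

// XferFilter is a local copy of the interface quoted above.
type XferFilter interface {
	Id() byte
	Name() string
	OnPack([]byte) ([]byte, error)
	OnUnpack([]byte) ([]byte, error)
}

// gzipFilter is a hypothetical filter that compresses on packing
// and decompresses on unpacking.
type gzipFilter struct{}

func (gzipFilter) Id() byte     { return 'g' } // arbitrary id for this example
func (gzipFilter) Name() string { return "gzip" }

// OnPack compresses src with gzip.
func (gzipFilter) OnPack(src []byte) ([]byte, error) {
	var buf bytes.Buffer
	w := gzip.NewWriter(&buf)
	if _, err := w.Write(src); err != nil {
		return nil, err
	}
	// Close flushes the remaining compressed data and the gzip footer.
	if err := w.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// OnUnpack reverses OnPack.
func (gzipFilter) OnUnpack(src []byte) ([]byte, error) {
	r, err := gzip.NewReader(bytes.NewReader(src))
	if err != nil {
		return nil, err
	}
	defer r.Close()
	var out bytes.Buffer
	if _, err := out.ReadFrom(r); err != nil {
		return nil, err
	}
	return out.Bytes(), nil
}

var _ XferFilter = gzipFilter{}

func main() {
	var f XferFilter = gzipFilter{}
	packed, err := f.OnPack(bytes.Repeat([]byte("teleport "), 100))
	if err != nil {
		panic(err)
	}
	plain, err := f.OnUnpack(packed)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s: %d bytes packed, %d bytes unpacked\n", f.Name(), len(packed), len(plain))
}
```

A pipe with several filters would apply OnPack in order when sending and OnUnpack in the reverse order when receiving, which is why every filter must be an exact inverse of itself across the two calls.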

Plugin

Plugins are mounted at hook points and executed while the framework runs.

type (
    // Plugin is the base interface that every plugin implements.
    Plugin interface {
        Name() string
    }
    // PreNewPeerPlugin is executed before creating peer.
    PreNewPeerPlugin interface {
        Plugin
        PreNewPeer(*PeerConfig, *PluginContainer) error
    }
    ...
)

Protocol

You can customize your own communication protocol by implementing the following interface:

type (
    // Proto pack/unpack protocol scheme of socket packet.
    Proto interface {
        // Version returns the protocol's id and name.
        Version() (byte, string)
        // Pack writes the Packet into the connection.
        // Note: Make sure to write only once or there will be package contamination!
        Pack(*Packet) error
        // Unpack reads bytes from the connection to the Packet.
        // Note: Concurrent unsafe!
        Unpack(*Packet) error
    }
    ProtoFunc func(io.ReadWriter) Proto
)

Then you can specify your own communication protocol in any of the following ways:

func SetDefaultProtoFunc(socket.ProtoFunc)
type Peer interface {
    ...
    ServeConn(conn net.Conn, protoFunc ...socket.ProtoFunc) Session
    DialContext(ctx context.Context, addr string, protoFunc ...socket.ProtoFunc) (Session, *Rerror)
    Dial(addr string, protoFunc ...socket.ProtoFunc) (Session, *Rerror)
    Listen(protoFunc ...socket.ProtoFunc) error
    ...
}

FastProto

{4 bytes packet length}
{1 byte protocol version}
{1 byte transfer pipe length}
{transfer pipe IDs}
# The following data is processed by the transfer pipe
{4 bytes sequence length}
{sequence}
{1 byte packet type}
{4 bytes URI length}
{URI}
{4 bytes metadata length}
{metadata(urlencoded)}
{1 byte body codec id}
{body}
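
The layout above can be sketched as a small encoder. This is an illustration under stated assumptions, not the framework's implementation: it assumes that length fields are big-endian, that the 4-byte packet length counts the whole frame including itself, and that the transfer pipe is empty so the payload passes through unchanged. The frame type, the helper names and the sample values are all hypothetical.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"net/url"
)

// frame holds the fields named in the layout (hypothetical type).
type frame struct {
	Version   byte
	PipeIDs   []byte
	Seq       string
	Ptype     byte
	URI       string
	Meta      url.Values
	BodyCodec byte
	Body      []byte // body bytes already encoded by the body codec
}

// writeLenPrefixed writes a 4-byte big-endian length followed by s.
func writeLenPrefixed(buf *bytes.Buffer, s string) {
	var n [4]byte
	binary.BigEndian.PutUint32(n[:], uint32(len(s)))
	buf.Write(n[:])
	buf.WriteString(s)
}

// encode serializes f following the layout, with an identity transfer pipe.
func encode(f frame) []byte {
	// Part that the transfer pipe would process.
	var payload bytes.Buffer
	writeLenPrefixed(&payload, f.Seq)
	payload.WriteByte(f.Ptype)
	writeLenPrefixed(&payload, f.URI)
	writeLenPrefixed(&payload, f.Meta.Encode()) // urlencoded metadata
	payload.WriteByte(f.BodyCodec)
	payload.Write(f.Body)

	var out bytes.Buffer
	out.Write(make([]byte, 4)) // placeholder for the packet length
	out.WriteByte(f.Version)
	out.WriteByte(byte(len(f.PipeIDs)))
	out.Write(f.PipeIDs)
	out.Write(payload.Bytes())

	b := out.Bytes()
	// Assumption: the length covers the entire frame, itself included.
	binary.BigEndian.PutUint32(b[:4], uint32(len(b)))
	return b
}

// sample returns a frame with made-up values for demonstration.
func sample() frame {
	return frame{
		Version:   1,
		Seq:       "1",
		Ptype:     1,
		URI:       "/math/add",
		Meta:      url.Values{"k": {"v"}},
		BodyCodec: 'j',
		Body:      []byte("[1,2]"),
	}
}

func main() {
	b := encode(sample())
	fmt.Printf("%d bytes: % x\n", len(b), b)
}
```

Keeping the version and the pipe IDs outside the filtered payload is what lets a receiver know which filters to undo before it can read the sequence, URI, metadata and body.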

Usage

Peer (server or client) example

// Start a server
var peer1 = tp.NewPeer(tp.PeerConfig{
    ListenAddress: "0.0.0.0:9090", // for server role
})
peer1.Listen()

...

// Start a client
var peer2 = tp.NewPeer(tp.PeerConfig{})
var sess, err = peer2.Dial("127.0.0.1:8080")

Pull-Controller-Struct API template

type Aaa struct {
    tp.PullCtx
}
func (x *Aaa) XxZz(args *<T>) (<T>, *tp.Rerror) {
    ...
    return r, nil
}

  • Register it to the root router:

// register the pull route: /aaa/xx_zz
peer.RoutePull(new(Aaa))

// or register the pull route: /xx_zz
peer.RoutePullFunc((*Aaa).XxZz)

Pull-Handler-Function API template

func XxZz(ctx tp.PullCtx, args *<T>) (<T>, *tp.Rerror) {
    ...
    return r, nil
}

  • Register it to the root router:

// register the pull route: /xx_zz
peer.RoutePullFunc(XxZz)

Push-Controller-Struct API template

type Bbb struct {
    tp.PushCtx
}
func (b *Bbb) YyZz(args *<T>) *tp.Rerror {
    ...
    return nil
}

  • Register it to the root router:

// register the push route: /bbb/yy_zz
peer.RoutePush(new(Bbb))

// or register the push route: /yy_zz
peer.RoutePushFunc((*Bbb).YyZz)

Push-Handler-Function API template

// YyZz register the route: /yy_zz
func YyZz(ctx tp.PushCtx, args *<T>) *tp.Rerror {
    ...
    return nil
}

  • Register it to the root router:

// register the push route: /yy_zz
peer.RoutePushFunc(YyZz)

Unknown-Pull-Handler-Function API template

func XxxUnknownPull(ctx tp.UnknownPullCtx) (interface{}, *tp.Rerror) {
    ...
    return r, nil
}

  • Register it to the root router:

// register the unknown pull route: /*
peer.SetUnknownPull(XxxUnknownPull)

Unknown-Push-Handler-Function API template

func XxxUnknownPush(ctx tp.UnknownPushCtx) *tp.Rerror {
    ...
    return nil
}

  • Register it to the root router:

// register the unknown push route: /*
peer.SetUnknownPush(XxxUnknownPush)

Rules for mapping a struct (or function) name to a URI path:

AaBb      ->  /aa_bb
Aa_Bb     ->  /aa/bb
aa_bb     ->  /aa/bb
Aa__Bb    ->  /aa_bb
aa__bb    ->  /aa_bb
ABC_XYZ   ->  /abc/xyz
ABcXYz    ->  /abc_xyz
ABC__XYZ  ->  /abc_xyz
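
The mapping examples are consistent with three rules: a lowercase-to-uppercase boundary becomes "_", a single underscore becomes "/", and a double underscore becomes "_", with everything lowercased. The sketch below reproduces the examples under that reading. The function name toURIPath is hypothetical, and this is an inferred reconstruction, not the framework's own routine.

```go
package main

import (
	"fmt"
	"strings"
	"unicode"
)

// toURIPath is a hypothetical helper that converts a struct or function
// name to a URI path according to the rules inferred from the examples.
func toURIPath(name string) string {
	var b strings.Builder
	b.WriteByte('/')
	runes := []rune(name)
	for i := 0; i < len(runes); i++ {
		r := runes[i]
		switch {
		case r == '_' && i+1 < len(runes) && runes[i+1] == '_':
			// Double underscore: a literal underscore in the path.
			b.WriteByte('_')
			i++ // skip the second underscore
		case r == '_':
			// Single underscore: a path separator.
			b.WriteByte('/')
		case unicode.IsUpper(r) && i > 0 && unicode.IsLower(runes[i-1]):
			// Lowercase followed by uppercase: a word boundary.
			b.WriteByte('_')
			b.WriteRune(unicode.ToLower(r))
		default:
			b.WriteRune(unicode.ToLower(r))
		}
	}
	return b.String()
}

func main() {
	names := []string{
		"AaBb", "Aa_Bb", "aa_bb", "Aa__Bb",
		"aa__bb", "ABC_XYZ", "ABcXYz", "ABC__XYZ",
	}
	for _, name := range names {
		fmt.Printf("%-9s -> %s\n", name, toURIPath(name))
	}
}
```

For a controller struct, the same conversion would apply to the struct name and to each method name separately, which is how Aaa with method XxZz ends up at /aa/xx_zz style paths such as /aaa/xx_zz.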

Plugin Example

// NewIgnoreCase returns an ignoreCase plugin.
func NewIgnoreCase() *ignoreCase {
    return &ignoreCase{}
}

type ignoreCase struct{}

var (
    _ tp.PostReadPullHeaderPlugin = new(ignoreCase)
    _ tp.PostReadPushHeaderPlugin = new(ignoreCase)
)

func (i *ignoreCase) Name() string {
    return "ignoreCase"
}

func (i *ignoreCase) PostReadPullHeader(ctx tp.ReadCtx) *tp.Rerror {
    // Convert the request path to lowercase on the fly.
    ctx.UriObject().Path = strings.ToLower(ctx.UriObject().Path)
    return nil
}

func (i *ignoreCase) PostReadPushHeader(ctx tp.ReadCtx) *tp.Rerror {
    // Convert the request path to lowercase on the fly.
    ctx.UriObject().Path = strings.ToLower(ctx.UriObject().Path)
    return nil
}

Register the above handlers and the plugin example to the router

// add router group
group := peer.SubRoute("test")
// register to test group
group.RoutePull(new(Aaa), NewIgnoreCase())
peer.RoutePullFunc(XxZz, NewIgnoreCase())
group.RoutePush(new(Bbb))
peer.RoutePushFunc(YyZz)
peer.SetUnknownPull(XxxUnknownPull)
peer.SetUnknownPush(XxxUnknownPush)

Configuration

type PeerConfig struct {
    Network            string        `yaml:"network"              ini:"network"              comment:"Network; tcp, tcp4, tcp6, unix or unixpacket"`
    ListenAddress      string        `yaml:"listen_address"       ini:"listen_address"       comment:"Listen address; for server role"`
    DefaultDialTimeout time.Duration `yaml:"default_dial_timeout" ini:"default_dial_timeout" comment:"Default maximum duration for dialing; for client role; ns,µs,ms,s,m,h"`
    RedialTimes        int32         `yaml:"redial_times"         ini:"redial_times"         comment:"The maximum times of attempts to redial, after the connection has been unexpectedly broken; for client role"`
    DefaultBodyCodec   string        `yaml:"default_body_codec"   ini:"default_body_codec"   comment:"Default body codec type id"`
    DefaultSessionAge  time.Duration `yaml:"default_session_age"  ini:"default_session_age"  comment:"Default session max age, if less than or equal to 0, no time limit; ns,µs,ms,s,m,h"`
    DefaultContextAge  time.Duration `yaml:"default_context_age"  ini:"default_context_age"  comment:"Default PULL or PUSH context max age, if less than or equal to 0, no time limit; ns,µs,ms,s,m,h"`
    SlowCometDuration  time.Duration `yaml:"slow_comet_duration"  ini:"slow_comet_duration"  comment:"Slow operation alarm threshold; ns,µs,ms,s ..."`
    PrintDetail        bool          `yaml:"print_detail"         ini:"print_detail"         comment:"Is print body and metadata or not"`
    CountTime          bool          `yaml:"count_time"           ini:"count_time"           comment:"Is count cost time or not"`
}

Communication Optimization

func SetPacketSizeLimit(maxPacketSize uint32)
func SetSocketKeepAlive(keepalive bool)
func SetSocketKeepAlivePeriod(d time.Duration)
func SetSocketNoDelay(_noDelay bool)
func SetSocketReadBuffer(bytes int)
func SetSocketWriteBuffer(bytes int)

Extensions

Codec

import "github.com/henrylee2cn/teleport/codec"

Plugin

import "github.com/henrylee2cn/teleport/plugin"
import binder "github.com/henrylee2cn/tp-ext/plugin-binder"
import heartbeat "github.com/henrylee2cn/tp-ext/plugin-heartbeat"
import secure "github.com/henrylee2cn/tp-ext/plugin-secure"

Protocol

import "github.com/henrylee2cn/teleport/socket"
import jsonproto "github.com/henrylee2cn/tp-ext/proto-jsonproto"
import pbproto "github.com/henrylee2cn/tp-ext/proto-pbproto"

Transfer Filter

import "github.com/henrylee2cn/teleport/xfer"
import md5Hash "github.com/henrylee2cn/tp-ext/xfer-md5Hash"

Other Modules

import cliSession "github.com/henrylee2cn/tp-ext/mod-cliSession"
import websocket "github.com/henrylee2cn/tp-ext/mod-websocket"
import html "github.com/xiaoenai/ants/helper/mod-html"

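Projects Based on Teleport

Project   Description
TP-Micro  A simple yet powerful microservice framework customized from Teleport
Ants      A highly available microservice platform solution based on TP-Micro and Teleport
Pholcus   A distributed, high-concurrency, heavyweight crawler written in pure Go for Internet data collection; it gives people with basic Go or JS programming skills a powerful crawler in which they only need to write the crawling rules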

Business Users

深圳市梦之舵信息技术有限公司    北京风行在线技术有限公司    北京可即时代网络公司

License

Teleport is released under the business-friendly Apache 2.0 license.