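Teleport is a general-purpose, efficient, and flexible socket framework.
It can be used for peer-to-peer communication, RPC, long-connection gateways, microservices, push services, game services, and similar applications.
Benchmark
Test Case
- One server process and one client process, running on the same machine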
- CPU: Intel Xeon E312xx (Sandy Bridge) 16 cores 2.53GHz
- Memory: 16G
- OS: Linux 2.6.32-696.16.1.el6.centos.plus.x86_64, CentOS 6.4
- Go: 1.9.2
- Message size: 581 bytes
- Message codec: protobuf
- Messages sent: 1000000
Test Results
- teleport
concurrent clients | mean (ms) | median (ms) | max (ms) | min (ms) | throughput (TPS) |
---|---|---|---|---|---|
100 | 1 | 0 | 16 | 0 | 75505 |
500 | 9 | 11 | 97 | 0 | 52192 |
1000 | 19 | 24 | 187 | 0 | 50040 |
2000 | 39 | 54 | 409 | 0 | 42551 |
5000 | 96 | 128 | 1148 | 0 | 46367 |
- teleport/socket
concurrent clients | mean (ms) | median (ms) | max (ms) | min (ms) | throughput (TPS) |
---|---|---|---|---|---|
100 | 0 | 0 | 14 | 0 | 225682 |
500 | 2 | 1 | 24 | 0 | 212630 |
1000 | 4 | 3 | 51 | 0 | 180733 |
2000 | 8 | 6 | 64 | 0 | 183351 |
5000 | 21 | 18 | 651 | 0 | 133886 |
- CPU time flame graph of teleport/socket
- Heap/stack flame graph of teleport/socket
Version
version | status | branch |
---|---|---|
v3 | release | |
v2 | release | |
v1 | release | |
Install
go get -u -f github.com/henrylee2cn/teleport
Features
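- Each packet consists of a Header and a Body
- The Body codec can be chosen separately, for example JSON, Protobuf, or string
- Supported network types: tcp, tcp4, tcp6, unix, unixpacket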
Example
server.go
package main

import (
	"fmt"
	"time"

	tp "github.com/henrylee2cn/teleport"
)

func main() {
	// Create a peer in the server role and serve the math controller.
	srv := tp.NewPeer(tp.PeerConfig{
		CountTime:     true,
		ListenAddress: ":9090",
	})
	srv.RoutePull(new(math))
	srv.ListenAndServe()
}

type math struct {
	tp.PullCtx
}

// Add handles the pull route /math/add and returns the sum of args.
func (m *math) Add(args *[]int) (int, *tp.Rerror) {
	// Optionally push a status message back to the caller first.
	if m.Query().Get("push_status") == "yes" {
		m.Session().Push(
			"/push/status",
			fmt.Sprintf("%d numbers are being added...", len(*args)),
		)
		time.Sleep(time.Millisecond * 10)
	}
	var r int
	for _, a := range *args {
		r += a
	}
	return r, nil
}
client.go
package main

import (
	tp "github.com/henrylee2cn/teleport"
)

func main() {
	tp.SetLoggerLevel("ERROR")
	cli := tp.NewPeer(tp.PeerConfig{})
	defer cli.Close()
	// Handle the status messages pushed by the server.
	cli.RoutePush(new(push))
	sess, err := cli.Dial(":9090")
	if err != nil {
		tp.Fatalf("%v", err)
	}

	// Call /math/add and wait for the reply.
	var reply int
	rerr := sess.Pull("/math/add?push_status=yes",
		[]int{1, 2, 3, 4, 5},
		&reply,
	).Rerror()
	if rerr != nil {
		tp.Fatalf("%v", rerr)
	}
	tp.Printf("reply: %d", reply)
}

type push struct {
	tp.PushCtx
}

// Status handles the push route /push/status.
func (p *push) Status(args *string) *tp.Rerror {
	tp.Printf("server status: %s", *args)
	return nil
}
Design
Keywords
Packet.Body
Packet Content
The content of every packet is as follows:
// in .../teleport/socket package

// Packet a socket data packet.
type Packet struct {
	// Has unexported fields.
}

func GetPacket(settings ...PacketSetting) *Packet
func NewPacket(settings ...PacketSetting) *Packet
func (p *Packet) Body() interface{}
func (p *Packet) BodyCodec() byte
func (p *Packet) Context() context.Context
func (p *Packet) MarshalBody() ([]byte, error)
func (p *Packet) Meta() *utils.Args
func (p *Packet) Ptype() byte
func (p *Packet) Reset(settings ...PacketSetting)
func (p *Packet) Seq() string
func (p *Packet) SetBody(body interface{})
func (p *Packet) SetBodyCodec(bodyCodec byte)
func (p *Packet) SetNewBody(newBodyFunc NewBodyFunc)
func (p *Packet) SetPtype(ptype byte)
func (p *Packet) SetSeq(seq string)
func (p *Packet) SetSize(size uint32) error
func (p *Packet) SetUri(uri string)
func (p *Packet) SetUriObject(uriObject *url.URL)
func (p *Packet) Size() uint32
func (p *Packet) String() string
func (p *Packet) UnmarshalBody(bodyBytes []byte) error
func (p *Packet) Uri() string
func (p *Packet) UriObject() *url.URL
func (p *Packet) XferPipe() *xfer.XferPipe

// NewBodyFunc creates a new body by header.
type NewBodyFunc func(Header) interface{}
Codec
The codec that encodes and decodes the Body of a packet.
type Codec interface {
	// Id returns codec id.
	Id() byte
	// Name returns codec name.
	Name() string
	// Marshal returns the encoding of v.
	Marshal(v interface{}) ([]byte, error)
	// Unmarshal parses the encoded data and stores the result
	// in the value pointed to by v.
	Unmarshal(data []byte, v interface{}) error
}
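To make the Codec contract concrete, here is a minimal sketch of a custom implementation. The `Codec` interface is copied locally so the example compiles on its own; `indentJSONCodec`, its id `'i'`, and its name are hypothetical choices made for illustration, and registering the codec with the framework is not shown.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Codec mirrors the interface shown above so that this sketch is self-contained.
type Codec interface {
	Id() byte
	Name() string
	Marshal(v interface{}) ([]byte, error)
	Unmarshal(data []byte, v interface{}) error
}

// indentJSONCodec is a hypothetical codec that encodes the body as indented JSON.
type indentJSONCodec struct{}

// Compile-time check that indentJSONCodec satisfies Codec.
var _ Codec = indentJSONCodec{}

// Id returns an id picked arbitrarily for this example.
func (indentJSONCodec) Id() byte { return 'i' }

// Name returns a name picked arbitrarily for this example.
func (indentJSONCodec) Name() string { return "indent_json" }

// Marshal encodes v as JSON with two-space indentation.
func (indentJSONCodec) Marshal(v interface{}) ([]byte, error) {
	return json.MarshalIndent(v, "", "  ")
}

// Unmarshal decodes JSON data into the value pointed to by v.
func (indentJSONCodec) Unmarshal(data []byte, v interface{}) error {
	return json.Unmarshal(data, v)
}

func main() {
	var c Codec = indentJSONCodec{}
	data, err := c.Marshal([]int{1, 2, 3})
	if err != nil {
		panic(err)
	}
	var out []int
	if err := c.Unmarshal(data, &out); err != nil {
		panic(err)
	}
	fmt.Println(c.Name(), out)
}
```

Because the id travels in the packet as a single byte, a real codec would presumably need an id that does not collide with the ones already in use.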
Transfer Filter Pipe
The filter pipe applied to the data being transferred.
// XferFilter handles byte stream of packet when transfer.
type XferFilter interface {
	// Id returns transfer filter id.
	Id() byte
	// Name returns transfer filter name.
	Name() string
	// OnPack performs filtering on packing.
	OnPack([]byte) ([]byte, error)
	// OnUnpack performs filtering on unpacking.
	OnUnpack([]byte) ([]byte, error)
}

// Get returns transfer filter by id.
func Get(id byte) (XferFilter, error)

// GetByName returns transfer filter by name.
func GetByName(name string) (XferFilter, error)

// XferPipe transfer filter pipe, handlers from outer-most to inner-most.
// Note: the length can not be bigger than 255!
type XferPipe struct {
	// Has unexported fields.
}

func NewXferPipe() *XferPipe
func (x *XferPipe) Append(filterId ...byte) error
func (x *XferPipe) AppendFrom(src *XferPipe)
func (x *XferPipe) Ids() []byte
func (x *XferPipe) Len() int
func (x *XferPipe) Names() []string
func (x *XferPipe) OnPack(data []byte) ([]byte, error)
func (x *XferPipe) OnUnpack(data []byte) ([]byte, error)
func (x *XferPipe) Range(callback func(idx int, filter XferFilter) bool)
func (x *XferPipe) Reset()
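As an illustration of the XferFilter contract, the following sketch implements a filter that compresses on pack and decompresses on unpack, using only the standard library. The interface is copied locally so the example runs on its own; `gzipFilter`, its id `'g'`, and its name are hypothetical, and how a filter is registered with the framework is not shown.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io/ioutil"
)

// XferFilter mirrors the interface shown above so that this sketch is self-contained.
type XferFilter interface {
	Id() byte
	Name() string
	OnPack([]byte) ([]byte, error)
	OnUnpack([]byte) ([]byte, error)
}

// gzipFilter is a hypothetical filter that gzips the byte stream.
type gzipFilter struct{}

// Compile-time check that gzipFilter satisfies XferFilter.
var _ XferFilter = gzipFilter{}

// Id returns an id picked arbitrarily for this example.
func (gzipFilter) Id() byte { return 'g' }

// Name returns a name picked arbitrarily for this example.
func (gzipFilter) Name() string { return "gzip-sketch" }

// OnPack compresses src before it is written to the connection.
func (gzipFilter) OnPack(src []byte) ([]byte, error) {
	var buf bytes.Buffer
	w := gzip.NewWriter(&buf)
	if _, err := w.Write(src); err != nil {
		return nil, err
	}
	// Close flushes the remaining compressed data and the gzip footer.
	if err := w.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// OnUnpack restores the bytes that OnPack compressed.
func (gzipFilter) OnUnpack(src []byte) ([]byte, error) {
	r, err := gzip.NewReader(bytes.NewReader(src))
	if err != nil {
		return nil, err
	}
	defer r.Close()
	return ioutil.ReadAll(r)
}

func main() {
	f := gzipFilter{}
	packed, err := f.OnPack([]byte("hello teleport"))
	if err != nil {
		panic(err)
	}
	plain, err := f.OnUnpack(packed)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s\n", plain)
}
```

OnPack and OnUnpack must be exact inverses of each other, since the receiving side runs the same pipe on unpacking.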
Plugin
Plugins are mounted at specific points and executed while the peer is running.
type (
	// Plugin plugin background
	Plugin interface {
		Name() string
	}
	// PreNewPeerPlugin is executed before creating peer.
	PreNewPeerPlugin interface {
		Plugin
		PreNewPeer(*PeerConfig, *PluginContainer) error
	}
	...
)
Protocol
You can customize your own communication protocol by implementing the interface:
type (
	// Proto pack/unpack protocol scheme of socket packet.
	Proto interface {
		// Version returns the protocol's id and name.
		Version() (byte, string)
		// Pack writes the Packet into the connection.
		// Note: Make sure to write only once or there will be package contamination!
		Pack(*Packet) error
		// Unpack reads bytes from the connection to the Packet.
		// Note: Concurrent unsafe!
		Unpack(*Packet) error
	}
	ProtoFunc func(io.ReadWriter) Proto
)
Next, you can specify your own communication protocol in any of the following ways:
func SetDefaultProtoFunc(socket.ProtoFunc)

type Peer interface {
	...
	ServeConn(conn net.Conn, protoFunc ...socket.ProtoFunc) Session
	DialContext(ctx context.Context, addr string, protoFunc ...socket.ProtoFunc) (Session, *Rerror)
	Dial(addr string, protoFunc ...socket.ProtoFunc) (Session, *Rerror)
	Listen(protoFunc ...socket.ProtoFunc) error
	...
}
FastProto
{4 bytes packet length}
{1 byte protocol version}
{1 byte transfer pipe length}
{transfer pipe IDs}
# The following is handled data by transfer pipe
{4 bytes sequence length}
{sequence}
{1 byte packet type}
{4 bytes URI length}
{URI}
{4 bytes metadata length}
{metadata(urlencoded)}
{1 byte body codec id}
{body}
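The layout above can be sketched as a small encoder. This is only an illustration of the field order, not the framework's implementation. Several details are assumptions the layout does not state: integers are written big-endian, the packet length counts the whole frame including its own 4 bytes, and the transfer pipe is empty so no filter is applied. The `frame` type and the `putBlock` and `encode` helpers are hypothetical names.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"net/url"
)

// frame holds the fields named in the layout (hypothetical type).
type frame struct {
	Version   byte
	Seq       string
	Ptype     byte
	URI       string
	Meta      map[string][]string
	BodyCodec byte
	Body      []byte // body bytes, already marshaled by the body codec
}

// putBlock writes a 4-byte length (big-endian, an assumption) followed by data.
func putBlock(buf *bytes.Buffer, data []byte) {
	var n [4]byte
	binary.BigEndian.PutUint32(n[:], uint32(len(data)))
	buf.Write(n[:])
	buf.Write(data)
}

// encode serializes f in the field order of the layout.
func encode(f frame) []byte {
	var buf bytes.Buffer
	buf.Write(make([]byte, 4)) // placeholder for the packet length
	buf.WriteByte(f.Version)
	buf.WriteByte(0) // transfer pipe length: no filters in this sketch
	// With a non-empty pipe, everything below would pass through the filters.
	putBlock(&buf, []byte(f.Seq))
	buf.WriteByte(f.Ptype)
	putBlock(&buf, []byte(f.URI))
	putBlock(&buf, []byte(url.Values(f.Meta).Encode()))
	buf.WriteByte(f.BodyCodec)
	buf.Write(f.Body)

	out := buf.Bytes()
	// Assumption: the length field counts the whole frame, itself included.
	binary.BigEndian.PutUint32(out, uint32(len(out)))
	return out
}

func main() {
	out := encode(frame{
		Version:   1,
		Seq:       "7",
		Ptype:     1,
		URI:       "/math/add",
		Meta:      map[string][]string{"push_status": {"yes"}},
		BodyCodec: 'j',
		Body:      []byte("[1,2,3]"),
	})
	fmt.Printf("%d bytes: % x\n", len(out), out)
}
```

Length-prefixing each variable-size field lets a reader consume the frame in one pass without scanning for delimiters.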
Usage
Peer (server or client) Demo
// Start a server
var peer1 = tp.NewPeer(tp.PeerConfig{
	ListenAddress: "0.0.0.0:9090", // for server role
})
peer1.Listen()

...

// Start a client
var peer2 = tp.NewPeer(tp.PeerConfig{})
var sess, err = peer2.Dial("127.0.0.1:8080")
Pull-Controller-Struct API template
type Aaa struct {
	tp.PullCtx
}

func (x *Aaa) XxZz(args *<T>) (<T>, *tp.Rerror) {
	...
	return r, nil
}
- Register it to the root router:
// register the pull route: /aaa/xx_zz
peer.RoutePull(new(Aaa))

// or register the pull route: /xx_zz
peer.RoutePullFunc((*Aaa).XxZz)
Pull-Handler-Function API template
func XxZz(ctx tp.PullCtx, args *<T>) (<T>, *tp.Rerror) {
	...
	return r, nil
}
- Register it to the root router:
// register the pull route: /xx_zz
peer.RoutePullFunc(XxZz)
Push-Controller-Struct API template
type Bbb struct {
	tp.PushCtx
}

func (b *Bbb) YyZz(args *<T>) *tp.Rerror {
	...
	return nil
}
- Register it to the root router:
// register the push route: /bbb/yy_zz
peer.RoutePush(new(Bbb))

// or register the push route: /yy_zz
peer.RoutePushFunc((*Bbb).YyZz)
Push-Handler-Function API template
// YyZz register the route: /yy_zz
func YyZz(ctx tp.PushCtx, args *<T>) *tp.Rerror {
	...
	return nil
}
- Register it to the root router:
// register the push route: /yy_zz
peer.RoutePushFunc(YyZz)
Unknown-Pull-Handler-Function API template
func XxxUnknownPull(ctx tp.UnknownPullCtx) (interface{}, *tp.Rerror) {
	...
	return r, nil
}
- Register it to the root router:
// register the unknown pull route: /*
peer.SetUnknownPull(XxxUnknownPull)
Unknown-Push-Handler-Function API template
func XxxUnknownPush(ctx tp.UnknownPushCtx) *tp.Rerror {
	...
	return nil
}
- Register it to the root router:
// register the unknown push route: /*
peer.SetUnknownPush(XxxUnknownPush)
The rules for mapping a struct (or function) name to a URI path:
- AaBb -> /aa_bb
- Aa_Bb -> /aa/bb
- aa_bb -> /aa/bb
- Aa__Bb -> /aa_bb
- aa__bb -> /aa_bb
- ABC_XYZ -> /abc/xyz
- ABcXYz -> /abc_xyz
- ABC__XYZ -> /abc_xyz
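The mapping rule above can be reproduced with a short function. This is a sketch inferred only from the listed examples, not the framework's own code, and `toURIPath` is a hypothetical name. It treats a single underscore as a path separator, a double underscore as a literal underscore, and inserts an underscore wherever an uppercase letter follows a lowercase one.

```go
package main

import (
	"fmt"
	"strings"
	"unicode"
)

// toURIPath converts a struct or function name to a URI path,
// following the examples listed above (hypothetical helper).
func toURIPath(name string) string {
	// Protect "__" first so that it is not read as two path separators.
	const placeholder = "\x00"
	s := strings.Replace(name, "__", placeholder, -1)
	s = strings.Replace(s, "_", "/", -1)
	s = strings.Replace(s, placeholder, "_", -1)

	var b strings.Builder
	b.WriteByte('/')
	var prev rune
	for _, r := range s {
		// A lowercase-to-uppercase transition starts a new snake_case word.
		if unicode.IsUpper(r) && unicode.IsLower(prev) {
			b.WriteByte('_')
		}
		b.WriteRune(unicode.ToLower(r))
		prev = r
	}
	return b.String()
}

func main() {
	names := []string{
		"AaBb", "Aa_Bb", "aa_bb", "Aa__Bb",
		"aa__bb", "ABC_XYZ", "ABcXYz", "ABC__XYZ",
	}
	for _, name := range names {
		fmt.Printf("%s -> %s\n", name, toURIPath(name))
	}
}
```

Names outside the listed examples, such as ones containing digits or three consecutive underscores, may be handled differently by the framework.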
Plugin Demo
// NewIgnoreCase Returns a ignoreCase plugin.
func NewIgnoreCase() *ignoreCase {
	return &ignoreCase{}
}

type ignoreCase struct{}

var (
	_ tp.PostReadPullHeaderPlugin = new(ignoreCase)
	_ tp.PostReadPushHeaderPlugin = new(ignoreCase)
)

func (i *ignoreCase) Name() string {
	return "ignoreCase"
}

func (i *ignoreCase) PostReadPullHeader(ctx tp.ReadCtx) *tp.Rerror {
	// Dynamic transformation path is lowercase
	ctx.UriObject().Path = strings.ToLower(ctx.UriObject().Path)
	return nil
}

func (i *ignoreCase) PostReadPushHeader(ctx tp.ReadCtx) *tp.Rerror {
	// Dynamic transformation path is lowercase
	ctx.UriObject().Path = strings.ToLower(ctx.UriObject().Path)
	return nil
}
Register the handlers and the plugin above to the router
// add router group
group := peer.SubRoute("test")
// register to test group
group.RoutePull(new(Aaa), NewIgnoreCase())
peer.RoutePullFunc(XxZz, NewIgnoreCase())
group.RoutePush(new(Bbb))
peer.RoutePushFunc(YyZz)
peer.SetUnknownPull(XxxUnknownPull)
peer.SetUnknownPush(XxxUnknownPush)
Config
type PeerConfig struct {
	Network            string        `yaml:"network"              ini:"network"              comment:"Network; tcp, tcp4, tcp6, unix or unixpacket"`
	ListenAddress      string        `yaml:"listen_address"       ini:"listen_address"       comment:"Listen address; for server role"`
	DefaultDialTimeout time.Duration `yaml:"default_dial_timeout" ini:"default_dial_timeout" comment:"Default maximum duration for dialing; for client role; ns,µs,ms,s,m,h"`
	RedialTimes        int32         `yaml:"redial_times"         ini:"redial_times"         comment:"The maximum times of attempts to redial, after the connection has been unexpectedly broken; for client role"`
	DefaultBodyCodec   string        `yaml:"default_body_codec"   ini:"default_body_codec"   comment:"Default body codec type id"`
	DefaultSessionAge  time.Duration `yaml:"default_session_age"  ini:"default_session_age"  comment:"Default session max age, if less than or equal to 0, no time limit; ns,µs,ms,s,m,h"`
	DefaultContextAge  time.Duration `yaml:"default_context_age"  ini:"default_context_age"  comment:"Default PULL or PUSH context max age, if less than or equal to 0, no time limit; ns,µs,ms,s,m,h"`
	SlowCometDuration  time.Duration `yaml:"slow_comet_duration"  ini:"slow_comet_duration"  comment:"Slow operation alarm threshold; ns,µs,ms,s ..."`
	PrintDetail        bool          `yaml:"print_detail"         ini:"print_detail"         comment:"Is print body and metadata or not"`
	CountTime          bool          `yaml:"count_time"           ini:"count_time"           comment:"Is count cost time or not"`
}
Optimize
func SetPacketSizeLimit(maxPacketSize uint32)
func SetSocketKeepAlive(keepalive bool)
func SetSocketKeepAlivePeriod(d time.Duration)
func SetSocketNoDelay(_noDelay bool)
func SetSocketReadBuffer(bytes int)
func SetSocketWriteBuffer(bytes int)
Extensions
Codec
import "github.com/henrylee2cn/teleport/codec"
Plugin
import "github.com/henrylee2cn/teleport/plugin"
import binder "github.com/henrylee2cn/tp-ext/plugin-binder"
import heartbeat "github.com/henrylee2cn/tp-ext/plugin-heartbeat"
import secure "github.com/henrylee2cn/tp-ext/plugin-secure"
Protocol
import "github.com/henrylee2cn/teleport/socket"
import jsonproto "github.com/henrylee2cn/tp-ext/proto-jsonproto"
import pbproto "github.com/henrylee2cn/tp-ext/proto-pbproto"
Transfer Filter
import "github.com/henrylee2cn/teleport/xfer"
import md5Hash "github.com/henrylee2cn/tp-ext/xfer-md5Hash"
Other Modules
import cliSession "github.com/henrylee2cn/tp-ext/mod-cliSession"
import websocket "github.com/henrylee2cn/tp-ext/mod-websocket"
import html "github.com/xiaoenai/ants/helper/mod-html"
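Projects based on Teleport
project | description |
---|---|
TP-Micro | A simple yet powerful microservice framework customized on top of Teleport |
Ants | A highly available microservice platform solution based on TP-Micro and Teleport |
Pholcus | A distributed, high-concurrency, heavyweight crawler written in pure Go and aimed at Internet data collection; it gives people with basic Go or JS programming skills a powerful crawler tool in which they only need to focus on customizing rules |
Business Users
License
Teleport is released under the Apache 2.0 license, which is friendly to commercial use.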