kiss-rpc简介:
特性:模拟堆栈式调用方式,支持多值返回,调用简单安全, 服务器采用多线程异步模式,挖掘服务器性能。客户端支持多线程同步和异步模式,超时机制,linux下支持 epoll网络模型,类比grpc,thrift,dubbo快几倍甚至 几十倍。

环境: linux, unix, windows, macOS
传输协议:capnproto
开发语言:dlang
编译器: dmd
github:https://github.com/huntlabs/kiss-rpc

开发者笔记:开发笔记

kiss rpc 同步和异步测试:

环境:ubuntu 16.04 lts(64位)

硬件:xeon cpu e3-1230@3.3GHz x 8

内存:8G

网络:localhost(本地环回)

1.多线程异步非阻塞测试

单链接20万次rpc调用耗时为4秒,每秒的qps数量为5万左右:

1000个并发1000次调用,100万次rpc请求,总共耗时28秒。每秒qps约为3.5万次左右:

2.

多线程同步阻塞测试:

单链接100

万次rpc

调用耗时为53
秒,每秒qps

数量为1.8

万次左右:

1000

个连接1000

调用,总计100

万次rpc

调用,耗时46

秒,每秒qps

为2.1

万次:


海量互联网业务系统只能依赖分布式架构来解决,而分布式开发的基石则是RPC;本文主要针对两个开源的RPC框架(gRPC、 Apache Thrift),以及配合GoLang、C++两个开发语言进行性能对比分析。

测试场景client, server都是单进程,长连接,在单次连接内发起1w(5w)次rpc调用,计算耗时;
client, server都是单进程,短连接,共发起1w(5w)次连接,每次连接单次RPC调用,计算耗时;
并发4个client进程,每个进程长连接10w rpc,服务端单进程多线程(协程),计算耗时;

由于不同语言,耗时统计存在偏差,比如boost.timer在程序里计算出来的耗时明显偏小,所以统一使用linux命令time来计算耗时;

测试数据和分析
一、 单进程下,长短连接,两个RPC框架和两大语言对比

小结:

整体上看,长连接性能优于短连接,性能差距在两倍以上;
对比Go语言的两个RPC框架,Thrift性能明显优于gRPC,性能差距也在两倍以上;
对比Thrift框架下的的两种语言,长连接下Go 与C++的RPC性能基本在同一个量级,在短连接下,Go性能大概是C++的二倍;
对比Thrift&C++下的TSimpleServer与TNonblockingServer,在单进程客户端长连接的场景下,TNonblockingServer因为存在线程管理开销,性能较TSimpleServer差一些;但在短连接时,主要开销在连接建立上,线程池管理开销可忽略;
两套RPC框架,以及两大语言运行都非常稳定,5w次请求耗时约是1w次的5倍;

二、 多进程(线程,协程)下,两大RPC框架和两大语言对比

编写rpc接口时候,客户端的rpc文件和服务端rpc文件必须保持一致,务必保持对应的文件目录结构,文件名,类名,函数名,否则出现调用出错。

服务端接口:
1.网络事件触发模块,:interface server_socket_event_interface //服务端网络事件触发接口

{

void listen_failed(const string str); //监听失败

void inconming(rpc_socket_base_interface socket); //连接进入

void disconnectd(rpc_socket_base_interface socket); //连接断开

void write_failed(rpc_socket_base_interface socket); //写入失败

void read_failed(rpc_socket_base_interface socket); //读取失败

}

2.socket接口调用:

        interface rpc_socket_base_interface //socket操作

{

bool doWrite(byte[] data); //写入数据

int getFd(); //获取fd

string getIp(); //获取ip

string getPort(); //获取端口

void disconnect(); //断开连接

}

3.绑定rpc:
     rpc_server_impl!(hello) rp_impl; //绑定rpc类接口

rp_impl = new rpc_server_impl!(hello)(rp_server); //绑定对应的服务端口

rp_impl.bind_request_callback("say", &this.say); //绑定对应的rpc函数

void say(rpc_request req)

req.pop(r_s, r_num, r_i, r_d); //取出对应的调用参数,必须和调用端的参数一致

//writefln("hello.say:%s, %s, %s, num:%s,", r_s, r_i, r_d, r_num);

resp.push(r_s ~ ":server response"~ to!string(r_i), r_num, r_i+1, r_d+0.2); //压入参数返回给调用端

rp_impl.response(resp); //返回参数给调用端

}

5.启动服务端口:
auto rp_server = new rpc_server(new server_socket); //服务端口绑定对应的 socket事件

auto hello_server_test = new hello(rp_server); //rpc类绑定对应的服务端口

auto poll = new GroupPoll!(); //创建线程管理组

rp_server.listen("0.0.0.0", 4444, poll); //监听端口,并绑定线程管理组

poll.start(); //启动线程管理

poll.wait(); //等待事件触发

客户端接口:
1.网络事件模块:
interface client_socket_event_interface //客户端网络事件触发接口

{

void connectd(rpc_socket_base_interface socket); //连接成功

void disconnectd(rpc_socket_base_interface socket); //断开连接

void write_failed(rpc_socket_base_interface socket); //写入失败

void read_failed(rpc_socket_base_interface socket); //读取失败

}

2.socket接口调用:
        interface rpc_socket_base_interface //socket操作

{

bool doWrite(byte[] data); //写入数据

int getFd(); //获取fd

string getIp(); //获取ip

string getPort(); //获取端口

void disconnect(); //断开连接

}

3.绑定rpc:

rpc_client_impl!(hello) rp_impl; //绑定rpc调用的类

rp_impl = new rpc_client_impl!(hello)(rp_client); //rpc绑定socket

4.同步rpc调用:

auto req = new rpc_request; //创建一个rpc请求

req.push(s, 1, i, 0.1); //压入参数

rpc_response resp = rp_impl.sync_call(req); //同步调用服务器rpc接口

if(resp.get_status == RESPONSE_STATUS.RS_OK) //判断调用是否成功

{

string r_s;

int r_i, r_num;

double r_d;

resp.pop(r_s, r_num, r_i, r_d); //取出服务器返回的参数

//writefln("server response:%s, %s, %s", r_s, r_i, r_d);

if(r_i % 100000 == 0)

{

writefln("single connect test, sync rpc request num:%s, total time:%s", r_i, Clock.currStdTime().stdTimeToUnixTime!(long)()- start_clock);

}

}

5.异步调用:
auto req = new rpc_request; //创建一个rpc请求

req.push(s, 1, i, 0.1); //压入参数

rp_impl.async_call(req, delegate(rpc_response resp){ //远程异步回调

if(resp.get_status == RESPONSE_STATUS.RS_OK) //判断调用是否成功

{

string r_s;

int r_i, r_num;

double r_d;

resp.pop(r_s, r_num, r_i, r_d);//取出服务器返回的参数

//writefln("server response:%s, %s, %s", r_s, r_i, r_d);

if(r_i%20000 == 0)

{

writefln("single connect test, rpc request num:%s, total time:%s", r_i, Clock.currStdTime().stdTimeToUnixTime!(long)()- start_clock);

}

}else

{

writeln("error", resp.get_status);

}

});

6.客户端启动:
import kiss.util.Log;

load_log_conf("default.conf"); //日志配置

auto poll = new GroupPoll!(); //创建线程管理组

auto client = new client_socket; //创建客户端socket

client.connect_to_server(poll); //连接到服务器

poll.start; //启动线程管理组

poll.wait; //等待事件触发

服务端调用代码:
1.倒入头文件:
import KissRpc.unit;

import KissRpc.rpc_server;

import KissRpc.rpc_server_impl;

import KissRpc.rpc_response;

import KissRpc.rpc_socket_base_interface;

import KissRpc.rpc_request;

import kiss.event.GroupPoll;

2.监听端口:

    auto rp_server = new rpc_server(new server_socket);

auto hello_server_test = new hello(rp_server);

auto poll = new GroupPoll!();

rp_server.listen("0.0.0.0", 4444, poll);

poll.start();

poll.wait();

3.绑定rpc事件:
    class hello{

this(rpc_server rp_server)

{

rp_impl = new rpc_server_impl!(hello)(rp_server);

rp_impl.bind_request_callback("say", &this.say);

}

shared static int call_count = 0;

void say(rpc_request req)

{

auto resp = new rpc_response(req);

string r_s;

int r_i, r_num;

double r_d;

req.pop(r_s, r_num, r_i, r_d);

//writefln("hello.say:%s, %s, %s, num:%s,", r_s, r_i, r_d, r_num);

resp.push(r_s ~ ":server response"~ to!string(r_i), r_num, r_i+1, r_d+0.2);

rp_impl.response(resp);

}

rpc_server_impl!(hello) rp_impl;

}

4.socket事件:
class server_socket : server_socket_event_interface

{

void listen_failed(const string str)

{

de_writeln("server listen failed", str);

}

void disconnectd(rpc_socket_base_interface socket)

{

de_writeln("client is disconnect");

}

shared static int connect_num;

void inconming(rpc_socket_base_interface socket)

{

writefln("client inconming:%s:%s, connect num:%s", socket.getIp, socket.getPort, connect_num++);

}

void write_failed(rpc_socket_base_interface socket)

{

de_writefln("write buffer to client is failed, %s:%s", socket.getIp, socket.getPort);

}

void read_failed(rpc_socket_base_interface socket)

{

de_writefln("read buffer from client is failed, %s:%s", socket.getIp, socket.getPort);

}

}

客户端调用代码:
1.倒入头文件:
import KissRpc.rpc_request;

import KissRpc.rpc_client_impl;

import KissRpc.rpc_client;

import KissRpc.unit;

import KissRpc.rpc_response;

import KissRpc.rpc_socket_base_interface;

import kiss.event.GroupPoll;

2.连接服务器:
     import kiss.util.Log;

load_log_conf("default.conf");

auto poll = new GroupPoll!();

for(int i= 0; i< test_client; i++)

{

auto client = new client_socket(i);

client.connect_to_server(poll);

}

poll.start;

poll.wait;

3.异步调用:
 auto req = new rpc_request;

req.push(s, 1, i, 0.1);

rp_impl.async_call(req, delegate(rpc_response resp){ //异步调用接口

if(resp.get_status == RESPONSE_STATUS.RS_OK)

{

string r_s;

int r_i, r_num;

double r_d;

resp.pop(r_s, r_num, r_i, r_d);

//writefln("server response:%s, %s, %s", r_s, r_i, r_d);

if(r_i%20000 == 0)

{

writefln("single connect test, rpc request num:%s, total time:%s", r_i, Clock.currStdTime().stdTimeToUnixTime!(long)()- start_clock);

}

}else

{

writeln("error", resp.get_status);

}

});

4.同步调用: auto req = new rpc_request;

req.push(s, num, i, 0.1);

rpc_response resp = rp_impl.sync_call(req); //同步调用接口

if(resp.get_status == RESPONSE_STATUS.RS_OK)

{

string r_s;

int r_num;

int r_i;

double r_d;

resp.pop(r_s, r_num, r_i, r_d);

//writefln("hello.say:%s, %s, %s, num:%s", r_s, r_i, r_d, r_num);

finish_num++;

if(r_i == test_num)

{

writefln("%s connect test, client num:%s, rpc request num:%s, total time:%s",r_num, r_num, r_i, Clock.currStdTime().stdTimeToUnixTime!(long)()- start_clock);

if(finish_num == test_num* test_client)

{

writefln("$$$$$$$$$$$ %s connect test, client num:%s, rpc request num:%s, total time:%s",test_client, test_client, finish_num, Clock.currStdTime().stdTimeToUnixTime!(long)()- start_clock);

}

}

}else

{

writeln("error, ", resp.get_status);

}

5.socket事件:

class client_socket : client_socket_event_interface

{

this()

{

rp_client = new rpc_client(this);

}

void connect_to_server(GroupPoll!() poll)

{

rp_client.connect("0.0.0.0", 4444, poll);

}

void connectd(rpc_socket_base_interface socket)

{

de_writefln("connect to server, %s:%s", socket.getIp, socket.getPort);

auto hello_client = new hello(rp_client);

start_clock = Clock.currStdTime().stdTimeToUnixTime!(long)();

for(int i= 0; i < test_num; ++i)

{

hello_client.say("test hello client", i);

}

}

void disconnectd(rpc_socket_base_interface socket)

{

de_writefln("client disconnect ....");

}

void write_failed(rpc_socket_base_interface socket)

{

de_writefln("client write failed , %s:%s", socket.getIp, socket.getPort);

}

void read_failed(rpc_socket_base_interface socket)

{

de_writefln("client read failed , %s:%s", socket.getIp, socket.getPort);

}

private:

rpc_client rp_client;

}