概述

因为gRPC 的 异步调用 代码写的比较绕,所以这篇文章主要用来记录一下 gRPC 的异步调用。

需要注意的是,gRPC 为了实现异步调用,使用的是 CompletionQueue 绑定进行 RPC 调用,实际写代码的时候会感觉到比较奇怪。相应的因为是异步的,所以会调用 CompletionQueue::Next 来等待回包操作。这里先留个印象,下面讲流程的时候会比较清晰。

编译安装

为了完成我们的demo,没有安装gRPC的同学可以直接使用下面的命令进行安装:

 $  export  MY_INSTALL_DIR=$HOME/.local
$ mkdir -p $MY_INSTALL_DIR
$ export PATH="$MY_INSTALL_DIR/bin:$PATH"
# 工具安装
$ sudo apt install -y cmake #安装好cmke的同学省略
$ sudo apt install -y build-essential autoconf libtool pkg-config
# 下载项目
$ git clone --recurse-submodules -b v1.45.0 --depth 1 --shallow-submodules 
# 编译安装
$ cd g rpc 
$ mkdir -p cmake/build
$ pushd cmake/build
$ cmake -DgRPC_INSTALL=ON -DgRPC_BUILD_TESTS=OFF -DCMAKE_INSTALL_PREFIX=$INSTALL_DIR ../..
$ make -j
$ make install
$ popd
  

下面介绍的例子都使用官方的:。

异步 Client

对于同步的 client 来说,由于调用远程方法时会阻塞当前 线程 ,但是异步允许同时发送多个请求,并且不会阻塞。

那么我们在使用 gRPC的异步API的时候 需要做到以下几点才行:

  1. 由于要进行异步非阻塞的请求,那么在发送请求的时候肯定不能等待回包;
  2. 所有的回包都通过另一个线程异步处理,避免主流程阻塞;
  3. 回包的数据通过某种介质进行传递,gRPC 使用的是CompletionQueue 来进行传递。

整体流程从上面图可以看得出分为以下几个:

  1. 启动client,启动一个旁路线程循环获取 CompletionQueue 数据;
  2. 发送异步调用给 server;
  3. 如果 server 回包了,会将数据放入到 CompletionQueue 里面;
  4. 异步线程获取 CompletionQueue 数据并返回;

我们下面来看一下例子。

创建

首先是创建客户端,然后异步线程处理 server 的回包,因为处理回包的时候会阻塞。

   // 创建客户端
  GreeterClient greeter(grpc::CreateChannel(
      "localhost:50051", grpc::InsecureChannelCredentials()));
 
  // 起新的线程,从队列中取出结果并处理
  std::thread thread_ = std::thread(&GreeterClient::AsyncCompleteRpc, &greeter);

  for (int i = 0; i < 100; i++) {
    std::string user("world " + std::to_string(i));
    // 发送rpc请求
    greeter.SayHello(user);  // The actual RPC call!
  }

  std:: cout  << "Press control-c to quit" << std::endl << std::endl;
  // 阻塞
  thread_.join();  // blocks forever
  
异步请求

这里会通过调用 SayHello 来发送请求:

 class GreeterClient {
 public:
  explicit GreeterClient(std::shared_ptr<Channel> channel)
      : stub_(Greeter::NewStub(channel)) {}
 ...
  void  SayHello(const std::string& user) {
    Hello Request  request;
    request.set_name(user);

    // 用来存储 rpc 数据
    AsyncClientCall* call = new AsyncClientCall;
 
    // 这里是调用 Async 方法创建 RPC 对象,但是不会理吗开始进行 RPC请求
    call->response_reader =
        stub_->PrepareAsyncSayHello(&call->context, request, &cq_);
 
    // 初始化RPC调用
    call->response_reader->StartCall();
 
    // 进行RPC请求,然后 call 对象作为一个 tag 放进去,会包数据会放到 reply中
    // 这里不会阻塞等待请求
    call->response_reader->Finish(&call->reply, &call->status, (void*) call);
  }

  private :
  ...
  CompletionQueue cq_;
};
  

因为回包逻辑不在这里,所以在调用完 Finish 之后不需要等待可以直接返回。在调用完这个方法之后就可以等待 server 回包了,server 会将回包数据塞入到 cq_ 里面。

异步处理回包

异步处理回包的逻辑就在我们一开始创建的 thread 里面会循环调用。

   void AsyncCompleteRpc() {
    void* got_tag;
     bool  ok = false;
 
    // 从队列里面取出回调信息,没有如果还没回包的话会阻塞
    while (cq_.Next(&got_tag, &ok)) {
      
      Async Client Call* call = static_cast<AsyncClientCall*>(got_tag);
 
      GPR_ASSERT(ok);

      if (call->status.ok())
        // 获取到回包的数据
        std::cout << "Greeter received: " << call->reply.message() << std::endl;
      else
        std::cout << "RPC failed" << std::endl;
 
      // 销毁new的对象
      delete call;
    }
  }
  

这里会一直循环调用 Next 方法处理 server 的 response,如果没有回包会一直阻塞,所以这里需要新起一个线程处理,避免阻塞主流程。

相关视频推荐

需要C/C++ Linux服务器架构师学习资料加qun 812855908 获取(资料包括 C/C++,Linux,golang技术, Nginx ,ZeroMQ, MySQL , Redis ,fastdfs, MongoDB , ZK ,流媒体, CDN ,P2P,K8S, Docker ,TCP/IP,协程,DPDK, ffmpeg 等),免费分享

异步 Server

一般我们在写 Server 端的时候如果是同步操作的话,在收到请求之后立即处理,然后给 client 回包,这个回包的过程中需要等待回包完毕整个 RPC 请求才算结束,这里就存在阻塞等待的过程,而使用异步请求就是用来避免阻塞,可以在单个线程里面处理更多的信息量。

但是在理解 gRPC 异步 API 的时候还是会感觉非常的别扭,导致一开始看不太明白实例代码的含义。

首先,gRPC让你像流水线操作一样,要先准备一个 CallData 对象作为一个容器,然后 gRPC 会通过 ServerCompletionQueue 将各种事件发送到 CallData 对象中,让这个对象根据自身的状态进行处理。

然后处理完毕当前的事件之后还需要手动再创建一个 CallData 对象,这个对象是为下个 Client 请求准备的,整个过程就像流水线一样。

上面这个异步的过程还有一个小状态机在里面,全部都由 CallData 对象进行扭转:

  • CallData 对象刚创建的时候会从 CREATE 状态扭转为 PROCESS 状态,表示等待接收请求;
  • 请求过来之后,首先会创建一个 CallData 对象,然后处理完后扭转为 FINISH 状态,等待给 Client 回包结束;
  • 回包结束之后将 CallData 对象自身删除。

清楚了这个CallData 对象是用来做什么之后,下面我们来看一下整个 Server 的流程应该如下:

  1. Server 启动,注册,创建一个 CallData 对象,这个对象用来给下个 Client 请求准备的;
  2. 创建好的 CallData 对象会交给 gRPC 托管,有事件过来的时候会将事件放入到 CallData 对象对,然后以 ServerCompletionQueue 对象来进行通知;
  3. 等待 Client 请求过来…
  4. 有事件过来之后会从 ServerCompletionQueue 对象中解包出来数据,转为 CallData 对象调用 Proceed 方法,然后进行业务逻辑处理,并重新创建 CallData 对象,用来给下个 Client 请求准备的;
  5. 等待给 Client 的回包结束。。。
  6. 继续处理 ServerCompletionQueue 回过来的 event 事件,清理自身 CallData 对象。

由于这个图我也不知道怎么画了,算了摆烂,不画了,自己脑补ba~

下面看看代码。

启动注册
   void Run() {
    // 启动注册监听
    std::string server_address("0.0.0.0:50051"); 
    ServerBuilder builder; 
    builder.AddListeningPort(server_address, grpc::InsecureServerCredentials()); 
    builder.RegisterService(&service_); 
    cq_ = builder.AddCompletionQueue(); 
    server_ = builder.BuildAndStart();
    std::cout << "Server listening on " << server_address << std::endl;
    // 处理RPC
    HandleRpcs();
  }
  
启动主流程
   std::unique_ptr<ServerCompletionQueue> cq_;
 void HandleRpcs() { 
    // 这里其实是异步逻辑的处理地方
    new CallData(&service_, cq_.get());
    void* tag;  // uniquely identifies a request.
    bool ok;
    while (true) { 
      // Next方法会阻塞,直到有下个请求过来,才会继续往下走
      GPR_ASSERT(cq_->Next(&tag, &ok));
      // 必须检查 Next 的返回值,这个返回值告诉我们是有事件到来
      // 还是 cq_ 正在关闭。
      GPR_ASSERT(ok);
      // 转成 CallData 调用 Proceed
      static_cast<CallData*>(tag)->Proceed();
    }
  }
  

主流程这里会创建 CallData 对象,然后不断循环从 cq_ 对象里面获取事件,这个 cq_ 就是一个等待队列,没有事件过来的时候会一直阻塞。有事件过来之后会从 cq_ 里面取出 tag 转换成 CallData对象调用 Proceed 方法。

创建 CallData & 逻辑处理 & 完成
 class ServerImpl final {
 ...
 private: 
  class CallData {
   public: 
    CallData(Greeter::AsyncService* service, ServerCompletionQueue* cq)
        : service_(service), cq_(cq),  responder _(&ctx_), status_(CREATE) { 
      Proceed();
    }

    void Proceed() {
      if (status_ == CREATE) { 
        // 首次进入,改变状态到 PROCESS
        status_ = PROCESS; 
        // 需要注意的是,这里将 this 作为 tag 塞入到请求中作为唯一识别请求
        service_->RequestSayHello(&ctx_, &request_, &responder_, cq_, cq_,
                                  this);
      } else if (status_ == PROCESS) { 
        // 在我们处理这个请求之前,创建一个新的 CallData 实例用于处理未来
        // 的新请求。实例会在它的 FINISH 状态流程中释放自己占用的内存
        new CallData(service_, cq_); 
        // 实际的业务逻辑了
        std:: string  prefix("Hello ");
        // 从request_中获取client请求数据,并设置回包数据
        reply_.set_message(prefix + request_.name()); 
        // 业务处理完毕,让 gRPC 运行时知道我们已经完成了,使用这个实例的内存
        // 地址作为事件内唯一识别请求的 tag
        status_ = FINISH;
        responder_.Finish(reply_, Status::OK, this);
      } else {
        GPR_ASSERT(status_ == FINISH); 
        // 已经到达 FINISH 状态,释放自身占用内存(CallData)
        delete this;
      }
    }

   private: 
    Greeter::AsyncService* service_; 
    // 生产-消费队列,用来异步服务消息通知
    ServerCompletionQueue* cq_; 
    // RPC 的上下文信息,用于例如压缩、 鉴权 以及发送元数据给客户等用途。
    ServerContext ctx_; 
    // 从客户端接受到了什么
    HelloRequest request_; 
    // 从客户端返回什么
    HelloReply reply_; 
    // 用于回复客户端的方法
    ServerAsyncResponseWriter<HelloReply> responder_; 
    // 状态机
     enum  CallStatus { CREATE, PROCESS, FINISH };
    CallStatus status_;  // The current serving state.
  };
  ...
};
  

从这里我们就可以和上面的 HandleRpcs 方法联系起来。

  1. 首先是 new CallData 会直接调用 Proceed 方法,这个时候会进入到 if 的第一个分支,然后将自身 this 写入到 cq_ 中,并将状态扭转为 PROCESS;
  2. 这个时候会继续会到 HandleRpcs 方法的 while 循环中等待事件;
  3. 当有 Client 发送请求过来时会进入到 Proceed 的 if 第二个分支进行业务逻辑的处理;
    1. 这里首先会 new CallData 给下一个请求使用;
    2. 然后从 request_ 获取到 Client 请求参数并进行处理;
    3. 回包数据写入到 reply_ 中,最后调用 Finish 结束;
  4. 这个时候会继续会到 HandleRpcs 方法的 while 循环中等待给 Client 的 response 回包结束;
  5. 收到回包结束之后会继续调用到 Proceed 方法的 if 第三个分支,删除当前对象。
总结

其实对比 go 的 grpc 的异步 API 来看,不得不说 cpp 的 API 设计很有问题,咋一看之下,根本不知道 new CallData 有什么用,为啥 new 了之后不做任何事情,也没有 delete 操作,不会内存溢出吗?然后进入到 构造方法 中才发现逻辑都在构造函数中,这样的写代码的方式我只在这里看到过。