这是个搁置了有段时间了的个人兴趣项目,还有不少完善工作需要做,这里记录下基本思路和实现,欢迎拍砖。

一、OPQ是什么

  • An Open sourced Persistent message Queue
  • 一款开源的持久化消息队列
  • 基于go 1.4.2实现
  • 功能
  1. 消息持久化
  2. 采用推送模式
  3. 易用,无需集成客户端,调用服务API即可
  4. 消息重放
  5. 高性能(目标)
  6. 运维友好——平滑重启/高可用(todo)/可视化控制台(todo)等
  • 性能(机器 - 单台mac pro)
  1. 当消息体大小为2K Bytes时,>20,000Message/Second
  2. 当消息体大小为1K Bytes时,>30,000Message/Second
  3. 当消息体大小为128Bytes时,>60,000Message/Second

二、如何使用

下载源码

go get -u github.com/LevinLin/OPQ

编译安装

cd /path/to/OPQ
go build

运行服务

cd /path/to/OPQ
nohup ./OPQ &>/dev/null &

参数说明:

-debug

当debug=yes时,服务运行在debug模式,主要用于log/output

-port

监听端口,默认8999

-syslog

系统日志,默认为system.log

平滑重启
kill -1 %{PID}

提交消息

url

http://<HOST>[:<PORT>]/opq/push

post fields

url: 目标url topic: topic名称,每条消息必须指定一个topic message: 消息具体内容

headers

头部,如果需要

请求示例(PHP)
<?php
  $url = "http://localhost:8999/opq/push";
  
  $ch = curl_init();
  curl_setopt($ch, CURLOPT_URL, $url);
  curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);
  curl_setopt($ch, CURLOPT_POST, 1);

  $data = array(
      'url' => 'http://127.0.0.1/Comment/addComment?comment=nny&user=q18',
      'topic'=> 'comment',
      'message'=> 'this is message body',
  );
  curl_setopt($ch, CURLOPT_POSTFIELDS, $data);

  $response = curl_exec($ch);
  var_dump($response);
  curl_close($ch);

回放消息

url

http://<HOST>[:<PORT>]/opq/replay

post fields

topic: 指定需要回放消息所属topic cmd: 消息序号

headers

头部,如果需要

请求示例(PHP)
<?php
  $url = "http://localhost:8999/opq/replay";
  $ch = curl_init();
  curl_setopt($ch, CURLOPT_URL, $url);
  curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1); 
  curl_setopt($ch, CURLOPT_POST, 1); 
  
  $data = array(
      'topic'=> 'comment',
      'cmd'=> '30',
  );  
  curl_setopt($ch, CURLOPT_POSTFIELDS, $data);

  $response = curl_exec($ch);
  var_dump($response);
  curl_close($ch);

三、设计思路

总体上借鉴了kafka的设计(topic/消息定位等),但是没有照搬具体实现,同时舍掉了客户端代码的需求。

总体架构图:

listener

或者称为dispatcher,负责监听对消息队列的请求,将请求处理任务加到任务队列(task queue)里

recorder(s)

多个recoder,并发获取任务队列里的任务进行处理(主要是数据序列化),然后通知record service进行持久化操作

record service

record service负责数据在持久化过程中的串行写入,根据消息所属topic,分别更新对应路径下的文件:索引(<N>.idx,文件按固定数目进行切分,N为切分区间最小的消息序号),消息(<N>.msg),总数(cmd)

deliverer(s)

从dlv文件获取需要发送的消息起始序号M,根据M从索引文件查找比该消息更早的最近一条消息的索引信息S,根据S从消息文件查找到序号M的消息内容,依次顺序发送后面的消息到对应目标地址,同时更新已发送序号到dlv文件

代码结构:

四、性能方面的一些考虑

  • 带缓冲的写入,即累计一定数目的消息或达到一定数据量才写入文件,减少磁盘IO
  • 采用稀疏索引,保证索引文件足够小
  • 采用内存映射,将用到的索引文件放入内存,减少磁盘IO
  • 采用二分查找+顺序查找的方式定位索引
  • 采用FlatBuffers进行数据的序列化/反序列化
  • 基于Goroutine的dispatcher/workers模型,worker数目可调

五、待完善的事项

  • 解决单点问题,实现HA
  • 友好的管理平台Admin Portal
  • 性能优化
  • 全面的测试,目前未经过生产环境验证,请慎用( ̄▽ ̄)"