这是个搁置了有段时间了的个人兴趣项目,还有不少完善工作需要做,这里记录下基本思路和实现,欢迎拍砖。
一、OPQ是什么
- An Open sourced Persistent message Queue
- 一款开源的持久化消息队列
- 基于go 1.4.2实现
- 功能
- 消息持久化
- 采用推送模式
- 易用,无需集成客户端,调用服务API即可
- 消息重放
- 高性能(目标)
- 运维友好——平滑重启/高可用(todo)/可视化控制台(todo)等
- 性能(机器 - 单台mac pro)
- 当消息体大小为2K Bytes时,>20,000Message/Second
- 当消息体大小为1K Bytes时,>30,000Message/Second
- 当消息体大小为128Bytes时,>60,000Message/Second
二、如何使用
下载源码
go get -u github.com/LevinLin/OPQ
编译安装
cd /path/to/OPQ
go build
运行服务
cd /path/to/OPQ
nohup ./OPQ &>/dev/null &
参数说明:
-debug
当debug=yes时,服务运行在debug模式,主要用于log/output
-port
监听端口,默认8999
-syslog
系统日志,默认为system.log
平滑重启
kill -1 %{PID}
提交消息
url
http://<HOST>[:<PORT>]/opq/push
post fields
url: 目标url topic: topic名称,每条消息必须指定一个topic message: 消息具体内容
headers
头部,如果需要
请求示例(PHP)
<?php
$url = "http://localhost:8999/opq/push";
$ch = curl_init();
curl_setopt($ch, CURLOPT_URL, $url);
curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);
curl_setopt($ch, CURLOPT_POST, 1);
$data = array(
'url' => 'http://127.0.0.1/Comment/addComment?comment=nny&user=q18',
'topic'=> 'comment',
'message'=> 'this is message body',
);
curl_setopt($ch, CURLOPT_POSTFIELDS, $data);
$response = curl_exec($ch);
var_dump($response);
curl_close($ch);
回放消息
url
http://<HOST>[:<PORT>]/opq/replay
post fields
topic: 指定需要回放消息所属topic cmd: 消息序号
headers
头部,如果需要
请求示例(PHP)
<?php
$url = "http://localhost:8999/opq/replay";
$ch = curl_init();
curl_setopt($ch, CURLOPT_URL, $url);
curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);
curl_setopt($ch, CURLOPT_POST, 1);
$data = array(
'topic'=> 'comment',
'cmd'=> '30',
);
curl_setopt($ch, CURLOPT_POSTFIELDS, $data);
$response = curl_exec($ch);
var_dump($response);
curl_close($ch);
三、设计思路
总体上借鉴了kafka的设计(topic/消息定位等),但是没有照搬具体实现,同时舍掉了客户端代码的需求。
总体架构图:
listener
或者称为dispatcher,负责监听对消息队列的请求,将请求处理任务加到任务队列(task queue)里
recorder(s)
多个recoder,并发获取任务队列里的任务进行处理(主要是数据序列化),然后通知record service进行持久化操作
record service
record service负责数据在持久化过程中的串行写入,根据消息所属topic,分别更新对应路径下的文件:索引(<N>.idx,文件按固定数目进行切分,N为切分区间最小的消息序号),消息(<N>.msg),总数(cmd)
deliverer(s)
从dlv文件获取需要发送的消息起始序号M,根据M从索引文件查找比该消息更早的最近一条消息的索引信息S,根据S从消息文件查找到序号M的消息内容,依次顺序发送后面的消息到对应目标地址,同时更新已发送序号到dlv文件
代码结构:
四、性能方面的一些考虑
- 带缓冲的写入,即累计一定数目的消息或达到一定数据量才写入文件,减少磁盘IO
- 采用稀疏索引,保证索引文件足够小
- 采用内存映射,将用到的索引文件放入内存,减少磁盘IO
- 采用二分查找+顺序查找的方式定位索引
- 采用FlatBuffers进行数据的序列化/反序列化
- 基于Goroutine的dispatcher/workers模型,worker数目可调
五、待完善的事项
- 解决单点问题,实现HA
- 友好的管理平台Admin Portal
- 性能优化
- 全面的测试,目前未经过生产环境验证,请慎用( ̄▽ ̄)"