软件简介
1. 概述
gmqredisgogmqgmq
dispatcherbuckettimerbuckettimertimertimerbucketjobjobjobtimerbucketjob
2. 应用场景
(TTR)
3. 安装
3.1 源码运行
gmq/conf.ini
cd $GOPATH/src # 进入gopath/src目录
git clone https://github.com/wuzhc/gmq.git
cd gmq
go get -u -v github.com/kardianos/govendor # 如果有就不需要安装了
govendor sync -v # 如果很慢,可能需要翻墙
go run main.go start
3.2 执行文件运行
cd $GOPATH/src/gmq
# 编译成可执行文件
go build
# 启动
./gmq start
# 停止
./gmq stop
# 守护进程模式启动,不输出日志到console
nohup ./gmq start >/dev/null 2>&1 &
# 守护进程模式下查看日志输出(配置文件conf.ini需要设置target_type=file,filename=gmq.log)
tail -f gmq.log
4. 客户端
目前只实现python,go,php语言的客户端的demo,参考:https://github.com/wuzhc/demo/tree/master/mq
运行
# php
# 生产者
php producer.php
# 消费者
php consumer.php
# python
# 生产者
python producer.py
# 消费者
python consumer.py
一条消息结构
{
"id": "xxxx", # 任务id,这个必须是一个唯一值,将作为redis的缓存键
"topic": "xxx", # topic是一组job的分类名,消费者将订阅topic来消费该分类下的job
"body": "xxx", # 消息内容
"delay": "111", # 延迟时间,单位秒
"TTR": "11111", # 执行超时时间,单位秒
"status": 1, # job执行状态,该字段由gmq生成
"consumeNum":1, # 被消费的次数,主要记录TTR>0时,被重复消费的次数,该字段由gmq生成
}
延迟任务
$data = [
'id' => 'xxxx_id' . microtime(true) . rand(1,999999999),
'topic' => ["topic_xxx"],
'body' => 'this is a rpc test',
'delay' => '1800', // 单位秒,半个小时后执行
'TTR' => '0'
];
超时任务
$data = [
'id' => 'xxxx_id' . microtime(true) . rand(1,999999999),
'topic' => ["topic_xxx"],
'body' => 'this is a rpc test',
'delay' => '0',
'TTR' => '100' // 100秒后还未得到消费者ack确认,则再次添加到队列,将再次被被消费
];
异步任务
$data = [
'id' => 'xxxx_id' . microtime(true) . rand(1,999999999),
'topic' => ["topic_xxx"],
'body' => 'this is a rpc test',
'delay' => '0',
'TTR' => '0'
];
优先级任务
$data = [
'id' => 'xxxx_id' . microtime(true) . rand(1,999999999),
'topic' => ["topic_A","topic_B","topic_C"], //优先消费topic_A,消费完后消费topic_B,最后再消费topic_C
'body' => 'this is a rpc test',
'delay' => '0',
'TTR' => '0'
];
5. gmq 流程图如下:
5.1 延迟时间 delay
ready queueready queue
5.2 执行超时时间 TTR
job.TTR>0,job.delay = job.TTRready queue
5.3 确认机制
主要和TTR的设置有关系,确认机制可以分为两种:
popjob poolpopackACK,ready queueACK,job pool
6. web 监控
gmq8000,
7. 遇到问题
以下是开发遇到的问题,以及一些粗糙的解决方案
7.1 安全退出
gmqbucketready queue,job pooljob poolgoroutine
gmq
gmqdispatcherdispatcherdispatcher.closedbucketclosetimerbucketdispatcher
dispatchertimerbucketdispatcher
7.1.2 注意
kill -9 pidkill
-15 pidkill -1 pidkill -2 pid
- 9 对应 SIGKILL
- 15 对应 SIGTERM
- 1 对应 SIGHUP
- 2 对应 SIGINT
7.2 智能定时器
buckettimertimerbuckettimerbuckettimertimer
7.3 原子性问题
multi/execlua脚本,``gmqlua脚本,lua
脚本multi/execPipepiningredis
servergmq
-- 获取到期的50个job
local jobIds = redis.call('zrangebyscore',KEYS[1], 0, ARGV[4], 'withscores', 'limit', 0, 50)
local res = {}
for k,jobId in ipairs(jobIds) do
if k%2~=0 then
local jobKey = string.format('%s:%s', ARGV[3], jobId)
local status = redis.call('hget', jobKey, 'status')
-- 检验job状态
if tonumber(status) == tonumber(ARGV[1]) or tonumber(status) == tonumber(ARGV[2]) then
-- 先移除集合中到期的job,然后到期的job返回给timer
local isDel = redis.call('zrem', KEYS[1], jobId)
if isDel == 1 then
table.insert(res, jobId)
end
end
end
end
local nextTime
-- 计算下一个job执行时间,用于设置timer下一个时钟周期
local nextJob = redis.call('zrange', KEYS[1], 0, 0, 'withscores')
if next(nextJob) == nil then
nextTime = -1
else
nextTime = tonumber(nextJob[2]) - tonumber(ARGV[4])
if nextTime < 0 then
nextTime = 1
end
end
table.insert(res,1,nextTime)
return res
7.4 redis 连接池
swoolegmqgomodule/redigo/redis
// gmq/mq/redis.go
Redis = &RedisDB{
Pool: &redis.Pool{
MaxIdle: 30, // 最大空闲链接
MaxActive: 10000, // 最大链接
IdleTimeout: 240 * time.Second, // 空闲链接超时
Wait: true, // 当连接池耗尽时,是否阻塞等待
Dial: func() (redis.Conn, error) {
c, err := redis.Dial("tcp", "127.0.0.1:6379", redis.DialPassword(""))
if err != nil {
return nil, err
}
return c, nil
},
TestOnBorrow: func(c redis.Conn, t time.Time) error {
if time.Since(t) < time.Minute {
return nil
}
_, err := c.Do("PING")
return err
},
},
}
8. 注意问题
job poolgmqready queue
9. 使用中可能出现的问题
9.1 客户端出现大量的 TIME_WAIT 状态,并且新的连接被拒绝
netstat -anp | grep 9503 | wc -l
tcp 0 0 10.8.8.188:41482 10.8.8.185:9503 TIME_WAIT -
TIME_WAITTIME_WAIT
TIME_WAIT