软件简介

1. 概述

gmqredisgogmqgmq
dispatcherbuckettimerbuckettimertimertimerbucketjobjobjobtimerbucketjob

2. 应用场景

(TTR)

3. 安装

3.1 源码运行

gmq/conf.ini
cd $GOPATH/src # 进入gopath/src目录
git clone https://github.com/wuzhc/gmq.git
cd gmq
go get -u -v github.com/kardianos/govendor # 如果有就不需要安装了
govendor sync -v # 如果很慢,可能需要翻墙
go run main.go start

3.2 执行文件运行

cd $GOPATH/src/gmq
# 编译成可执行文件
go build 
# 启动
./gmq start
# 停止
./gmq stop

# 守护进程模式启动,不输出日志到console
nohup ./gmq start >/dev/null 2>&1  &
# 守护进程模式下查看日志输出(配置文件conf.ini需要设置target_type=file,filename=gmq.log)
tail -f gmq.log

4. 客户端

目前只实现python,go,php语言的客户端的demo,参考:https://github.com/wuzhc/demo/tree/master/mq

运行

# php
# 生产者
php producer.php
# 消费者
php consumer.php

# python
# 生产者
python producer.py
# 消费者
python consumer.py

一条消息结构

{
    "id": "xxxx",   # 任务id,这个必须是一个唯一值,将作为redis的缓存键
    "topic": "xxx",  # topic是一组job的分类名,消费者将订阅topic来消费该分类下的job
    "body": "xxx",   # 消息内容
    "delay": "111",  # 延迟时间,单位秒
    "TTR": "11111",  # 执行超时时间,单位秒
    "status": 1,     # job执行状态,该字段由gmq生成
    "consumeNum":1,  # 被消费的次数,主要记录TTR>0时,被重复消费的次数,该字段由gmq生成
}

延迟任务

 $data = [
        'id'    => 'xxxx_id' . microtime(true) . rand(1,999999999),
        'topic' => ["topic_xxx"],
        'body'  => 'this is a rpc test',
        'delay' => '1800', // 单位秒,半个小时后执行
        'TTR'   => '0'
    ];

超时任务

 $data = [
        'id'    => 'xxxx_id' . microtime(true) . rand(1,999999999),
        'topic' => ["topic_xxx"],
        'body'  => 'this is a rpc test',
        'delay' => '0', 
        'TTR'   => '100' // 100秒后还未得到消费者ack确认,则再次添加到队列,将再次被被消费
    ];

异步任务

$data = [
        'id'    => 'xxxx_id' . microtime(true) . rand(1,999999999),
        'topic' => ["topic_xxx"],
        'body'  => 'this is a rpc test',
        'delay' => '0', 
        'TTR'   => '0' 
    ];

优先级任务

$data = [
        'id'    => 'xxxx_id' . microtime(true) . rand(1,999999999),
        'topic' => ["topic_A","topic_B","topic_C"], //优先消费topic_A,消费完后消费topic_B,最后再消费topic_C
        'body'  => 'this is a rpc test',
        'delay' => '0', 
        'TTR'   => '0' 
    ];

5. gmq 流程图如下:

5.1 延迟时间 delay

ready queueready queue

5.2 执行超时时间 TTR

job.TTR>0,job.delay = job.TTRready queue

5.3 确认机制

主要和TTR的设置有关系,确认机制可以分为两种:

popjob poolpopackACK,ready queueACK,job pool

6. web 监控

gmq8000,

7. 遇到问题

以下是开发遇到的问题,以及一些粗糙的解决方案

7.1 安全退出

gmqbucketready queue,job pooljob poolgoroutine
gmq
gmqdispatcherdispatcherdispatcher.closedbucketclosetimerbucketdispatcher
dispatchertimerbucketdispatcher

7.1.2 注意

kill -9 pidkill
-15 pidkill -1 pidkill -2 pid
  • 9 对应 SIGKILL
  • 15 对应 SIGTERM
  • 1 对应 SIGHUP
  • 2 对应 SIGINT

7.2 智能定时器

buckettimertimerbuckettimerbuckettimertimer

7.3 原子性问题

multi/execlua脚本,``gmqlua脚本,lua
脚本multi/execPipepiningredis
servergmq
-- 获取到期的50个job
local jobIds = redis.call('zrangebyscore',KEYS[1], 0, ARGV[4], 'withscores', 'limit', 0, 50)
local res = {}
for k,jobId in ipairs(jobIds) do 
    if k%2~=0 then
        local jobKey = string.format('%s:%s', ARGV[3], jobId)
        local status = redis.call('hget', jobKey, 'status')
        -- 检验job状态
        if tonumber(status) == tonumber(ARGV[1]) or tonumber(status) == tonumber(ARGV[2]) then
            -- 先移除集合中到期的job,然后到期的job返回给timer
            local isDel = redis.call('zrem', KEYS[1], jobId)
            if isDel == 1 then
                table.insert(res, jobId)
            end
        end
    end
end

local nextTime
-- 计算下一个job执行时间,用于设置timer下一个时钟周期
local nextJob = redis.call('zrange', KEYS[1], 0, 0, 'withscores')
if next(nextJob) == nil then
    nextTime = -1
else
    nextTime = tonumber(nextJob[2]) - tonumber(ARGV[4])
    if nextTime < 0 then
        nextTime = 1
    end
end

table.insert(res,1,nextTime)
return res

7.4 redis 连接池

swoolegmqgomodule/redigo/redis
// gmq/mq/redis.go
Redis = &RedisDB{
    Pool: &redis.Pool{
        MaxIdle:     30,    // 最大空闲链接
        MaxActive:   10000, // 最大链接
        IdleTimeout: 240 * time.Second, // 空闲链接超时
        Wait:        true, // 当连接池耗尽时,是否阻塞等待
        Dial: func() (redis.Conn, error) {
            c, err := redis.Dial("tcp", "127.0.0.1:6379", redis.DialPassword(""))
            if err != nil {
                return nil, err
            }
            return c, nil
        },
        TestOnBorrow: func(c redis.Conn, t time.Time) error {
            if time.Since(t) < time.Minute {
                return nil
            }
            _, err := c.Do("PING")
            return err
        },
    },
}

8. 注意问题

job poolgmqready queue

9. 使用中可能出现的问题

9.1 客户端出现大量的 TIME_WAIT 状态,并且新的连接被拒绝

netstat -anp | grep 9503 | wc -l
tcp        0      0 10.8.8.188:41482        10.8.8.185:9503         TIME_WAIT   -
TIME_WAITTIME_WAIT
TIME_WAIT

10. 相关链接