主要利用redis的brpop阻塞读和Golang的goroutine并发控制以及os/exec执行程序,实现队列有数据就立即执行对应程序并把结果set任务key。
运行参数设置brpop的超时(-t)和同步调度时返回的结果ttl(-e)
./dispatchdeploy
Usage: -h 192.168.6.151 -p 6388 -t 300 -a /path/testfile.pl -e 1800
-a string
start appname (default "/path/testfile.pl")
-e int
redis expire time sec (default 1800)
-h string
redis ip
-p int
redis port (default 6379)
-t int
redis brpop timeout (default 300)
静态数据
const (
maxthread = 2 //最大并发协程数
queueName = "qn_kt" //阻塞读队列
result_queueName = "rt_kt" //同步返回结果的key前缀
token = "##" //执行调度参数的指定分隔符
sync_flag = "1"
)
关键代码
//阻塞读,当有数据分割参数,使用channel控制并发协程数,在execCmd的cmd.wait正常后释放channel
for {
content, _ := redisdb.brpop(queueName, *timeout)
if content != nil {
args := strings.Split(string(content[1]), token)
if len(args) != 4 {
log.Printf("%v lack of para length %s\n", args, len(args))
} else {
//控制并发数
sync_num <- 1
go execCmd(*appname, args, redisdb)
log.Printf("%s %v Go\n", *appname, args)
}
} else {
log.Printf("timeout %d get nil contenet , just go on", *timeout)
}
}
测试
lpush三个调度到队列
127.0.0.1:6888> lpush qn_kt "6234##ZYYC0001##20170620140000##0" "5234##ZYYC0001##20170620140000##1" "7234##ZYYC0001##20170620140000##1"
(integer) 3
//控制并发数为2,立即调度执行了两个perl程序,等到返回结果执行第三个
2017/06/20 16:45:21 Start listen qn_kt
2017/06/20 16:45:25 testfile.pl [6234 ZYYC0001 20170620140000 0] Go
2017/06/20 16:45:25 testfile.pl [5234 ZYYC0001 20170620140000 1] Go
2017/06/20 16:45:30 testfile.pl [6234 ZYYC0001 20170620140000 0] finish
2017/06/20 16:45:30 testfile.pl [7234 ZYYC0001 20170620140000 1] Go
2017/06/20 16:45:30 testfile.pl [5234 ZYYC0001 20170620140000 S] finish
2017/06/20 16:45:35 testfile.pl [7234 ZYYC0001 20170620140000 S] finish
2017/06/20 16:45:51 timeout 20 get nil contenet , just go on
2017/06/20 16:46:12 timeout 20 get nil contenet , just go on
//同步调度任务执行完成后set对应任务号,由接口程序读取,1800秒后redis回收
127.0.0.1:6888> get rt_kt_7234_ZYYC0001
"7234##ZYYC0001##20170620140000##1"
127.0.0.1:6888> ttl rt_kt_7234_ZYYC0001
(integer) 1791
有疑问加站长微信联系(非本文作者)