记录一下
package redis
import (
"bytes"
"encoding/json"
"errors"
)
type Job struct {
JobId string `json:"job_id"`
Uid uint64 `json:"uid"`
Action string `json:"action"` //
Input string `json:"input"` //json
CreateTime int64 `json:"create_time"` //时间戳-毫秒
Retry int `json:"retry"` //重试次数
ErrCode int `json:"err_code"` //错误code
Message string `json:"message"` //错误信息
}
//入
func PushQueue(aliasName string, dbNode int, jobKey string, jobs []*Job) error {
if len(jobs) == 0 {
return errors.New("no jobs")
}
if jobKey == "" {
return errors.New("no job key")
}
pool, err := GetConnPool(aliasName, dbNode, true)
if err != nil {
return err
}
conn := pool.Get()
defer conn.Close()
for _, v := range jobs {
b, err := json.Marshal(v)
if err != nil {
return err
}
//入队列
_, err = conn.Do("rpush", jobKey, b)
if err != nil {
return err
}
}
return nil
}
//取
func PopQueue(aliasName string, dbNode int, jobKey string) (*Job, error) {
pool, err := GetConnPool(aliasName, dbNode, true)
if err != nil {
return nil, err
}
conn := pool.Get()
defer conn.Close()
reply, err := conn.Do("LPOP", jobKey)
if err != nil {
return nil, err
}
if reply != nil {
var j *Job
decoder := json.NewDecoder(bytes.NewReader(reply.([]byte)))
if err := decoder.Decode(&j); err != nil {
return nil, err
}
return j, nil
}
return nil, nil
}