之前一直在用qiniu的存储服务,生成图片的缩略图,模糊图,视频的webp,现在需要把存储移到s3上,那么这些图片,视频处理就要自己动手写了,本文梳理一下大致的思路。
分析需求
vframe/jpg/offset/1|imageMogr2/thumbnail/400x|saveas/xxx|
cmdcmdcmd
{
"id": "xxxxx",
"pipeline": "xxx",
"code": 0,
"desc": "The fop was completed successfully",
"reqid": "xTsAAFnxUbR5J10U",
"inputBucket": "xxx",
"inputKey": "xxxxx",
"items": [
{
"cmd": "vframe/jpg/offset/1|imageMogr2/thumbnail/400x|saveas/ZmFtZS1wcml2YXRlOm1vbWVudC9jb3Zlci9zbmFwL3ZpZGVvL2M5YzdjZjQ5LTU3NGQtNGZjMS1iZDFkLTRkYjZkMzlkZWY1Ni8wLzA=",
"code": 0,
"desc": "The fop was completed successfully",
"hash": "FhdN6V8EI4vW4XJGALSfxutvMEIv",
"key": "xx",
"returnOld": 0
},
{
"cmd": "vframe/jpg/offset/1|imageMogr2/thumbnail/400x|imageMogr2/blur/45x8|saveas/ZmFtZS1wcml2YXRlOm1vbWVudC9jb3Zlci9zbmFwL3ZpZGVvL2M5YzdjZjQ5LTU3NGQtNGZjMS1iZDFkLTRkYjZkMzlkZWY1Ni8wLzBfYmx1cg==",
"code": 0,
"desc": "The fop was completed successfully",
"hash": "FgNiRzrCsa7TZx1xVSb_4d5TiaK3",
"key": "xxx",
"returnOld": 0
}
]
}
分解需求
这个程序大致需要这么几个部分:
operation
可以把 1 和 2,3 分开来看,1 比较独立,之前写过一个worker的模型,参考的是这篇文章 Handling 1 Million Requests per Minute with Go,比较详细,是用 go channel 作为queue的,我加了一个 beanstalk 作为 queue的 providor。还有一点改进是,文章中只提供了worker数量的设置,我再加了一个参数,设定每个worker可以并行执行的协程数。所以下面主要讲讲3, 2的解决办法
Pipe
可以参考这个库 pipe, 用法如下:
p := pipe.Line(
pipe.ReadFile("test.png"),
resize(300, 300),
blur(0.5),
)
output, err := pipe.CombinedOutput(p)
if err != nil {
fmt.Printf("%v\n", err)
}
buf := bytes.NewBuffer(output)
img, _ := imaging.Decode(buf)
imaging.Save(img, "test_a.png")
CmdOperation[]Op
type Cmd struct {
cmd string
saveas string
ops []Op
err error
}
type Op interface {
getPipe() pipe.Pipe
}
type ResizeOp struct {
width, height int
}
func (c ResizeOp) getPipe() pipe.Pipe {
return resize(c.width, c.height)
}
//使用方法
cmdStr := `file/test.png|thumbnail/x300|blur/20x8`
cmd := Cmd{cmdStr, "test_b.png", nil, nil}
cmd.parse()
cmd.doOps()
sync.WaitGroup
sync.WaitGroup
func main() {
cmds := []string{}
for i := 0; i < 10000; i++ {
cmds = append(cmds, fmt.Sprintf("cmd-%d", i))
}
results := handleCmds(cmds)
fmt.Println(len(results)) // 10000
}
func doCmd(cmd string) string {
return fmt.Sprintf("cmd=%s", cmd)
}
func handleCmds(cmds []string) (results []string) {
fmt.Println(len(cmds)) //10000
var count uint64
group := sync.WaitGroup{}
lock := sync.Mutex{}
for _, item := range cmds {
// 计数加一
group.Add(1)
go func(cmd string) {
result := doCmd(cmd)
atomic.AddUint64(&count, 1)
lock.Lock()
results = append(results, result)
lock.Unlock()
// 计数减一
group.Done()
}(item)
}
// 阻塞
group.Wait()
fmt.Printf("count=%d \n", count) // 10000
return
}
group.Wait()results = append(results, result)len(results)
BenchCmd
type BenchCmd struct {
cmds []Cmd
waitGroup sync.WaitGroup
errs []error
lock sync.Mutex
}
func (b *BenchCmd) doCmds() {
for _, item := range b.cmds {
b.waitGroup.Add(1)
go func(cmd Cmd) {
cmd.parse()
err := cmd.doOps()
b.lock.Lock()
b.errs = append(b.errs, err)
b.lock.Unlock()
b.waitGroup.Done()
}(item)
}
b.waitGroup.Wait()
}
最后的调用就像这样:
var cmds []Cmd
cmd_a := Cmd{`file/test.png|thumbnail/x300|blur/20x8`, "test_a.png", nil, nil}
cmd_b := Cmd{`file/test.png|thumbnail/500x1000|blur/20x108`, "test_b.png", nil, nil}
cmd_c := Cmd{`file/test.png|thumbnail/300x300`, "test_c.png", nil, nil}
cmds = append(cmds, cmd_a)
cmds = append(cmds, cmd_b)
cmds = append(cmds, cmd_c)
bench := BenchCmd{
cmds: cmds,
waitGroup: sync.WaitGroup{},
lock: sync.Mutex{},
}
bench.doCmds()
fmt.Println(bench.errs)
pipe