之前一直在用qiniu的存储服务,生成图片的缩略图,模糊图,视频的webp,现在需要把存储移到s3上,那么这些图片,视频处理就要自己动手写了,本文梳理一下大致的思路。

分析需求

先看一下qiniu的接口是如何处理图片的,例如先截取视频第一秒的图片,再把图片缩略,最后存储到一个新的key,命令可以这么写 vframe/jpg/offset/1|imageMogr2/thumbnail/400x|saveas/xxx, 可以看到三个操作之间用 | 符号分割,类似unix 的 pipe 操作。

上面的操作算作一个cmd, 一次API请求可以同时处理多个cmd,cmd之间用分号分割, 处理完毕后,在回调中把处理结果返回,例如


{

    "id": "xxxxx", 

    "pipeline": "xxx", 

    "code": 0, 

    "desc": "The fop was completed successfully", 

    "reqid": "xTsAAFnxUbR5J10U", 

    "inputBucket": "xxx", 

    "inputKey": "xxxxx", 

    "items": [

        {

            "cmd": "vframe/jpg/offset/1|imageMogr2/thumbnail/400x|saveas/ZmFtZS1wcml2YXRlOm1vbWVudC9jb3Zlci9zbmFwL3ZpZGVvL2M5YzdjZjQ5LTU3NGQtNGZjMS1iZDFkLTRkYjZkMzlkZWY1Ni8wLzA=", 

            "code": 0, 

            "desc": "The fop was completed successfully", 

            "hash": "FhdN6V8EI4vW4XJGALSfxutvMEIv", 

            "key": "xx", 

            "returnOld": 0

        }, 

        {

            "cmd": "vframe/jpg/offset/1|imageMogr2/thumbnail/400x|imageMogr2/blur/45x8|saveas/ZmFtZS1wcml2YXRlOm1vbWVudC9jb3Zlci9zbmFwL3ZpZGVvL2M5YzdjZjQ5LTU3NGQtNGZjMS1iZDFkLTRkYjZkMzlkZWY1Ni8wLzBfYmx1cg==", 

            "code": 0, 

            "desc": "The fop was completed successfully", 

            "hash": "FgNiRzrCsa7TZx1xVSb_4d5TiaK3", 

            "key": "xxx", 

            "returnOld": 0

        }

    ]

}

分解需求

这个程序大致需要这么几个部分:

一个http接口,接受任务,接受后把任务扔到队列,返回一个job ID。 worker异步处理任务,worker的个数 和 每个worker 并行的处理的个数 能够配置,worker有重试机制。
从 job payload 中解析出需要做的任务,解析出每个cmd, 最好能并行执行每一个 cmd, 记录每一个cmd的结果

每个cmd中有多个 operation, 并且用 pipe 连接,前一个operaion的输出是后一个operation的输入

可以把 1 和 2,3 分开来看,1 比较独立,之前写过一个worker的模型,参考的是这篇文章 Handling 1 Million Requests per Minute with Go,比较详细,是用 go channel 作为queue的,我加了一个 beanstalk 作为 queue的 providor。还有一点改进是,文章中只提供了worker数量的设置,我再加了一个参数,设定每个worker可以并行执行的协程数。所以下面主要讲讲3, 2的解决办法

Pipe

可以参考这个库 pipe, 用法如下:


p := pipe.Line(

    pipe.ReadFile("test.png"),

    resize(300, 300),

    blur(0.5),

)

output, err := pipe.CombinedOutput(p) if err != nil {     fmt.Printf("%v\n", err) }

buf := bytes.NewBuffer(output) img, _ := imaging.Decode(buf)

imaging.Save(img, "test_a.png")