企业微信留痕文件下载
文件下载,官方给的文档是只能单线下载,实际代码运行时,遇到 大型文件,官方接口无法满足消息即时性要求,分析结构后,实现多线程下载
给出关键语句,供参考,实际就是计算文件大小,直接计算出每段文件所在字节区间,但文件大小有时不可靠,也需要处理,不合规文件
// MultipleDownloadMediaData 多线程文件下载
func (s *WeworkSyncMediaService) MultipleDownloadMediaData(corpId, msgId string, waitDown MsgFileStruct) (err error) {
md5Sum := waitDown.Md5Sum
sdkFileId := waitDown.SdkFileId
size := int(waitDown.Size)
suffix := waitDown.Suffix
indexBuffer := ""
// 测试发现,如果要多线程下载需要获取第一段切片文件后的nextBuffer,和倒数第二个切片的nextBuffer
// 也就是说每一个切片文件和最后一个切片文件无法进行多线程下载,第一个切片文件和最后一个切片必需要用微信给的值进行下载
var lastBufferChan = make(chan string, 1)
// 计算企业微信文件切片逻辑,生成nextBuffer
nextBuffer := 524288
//fileSliceNum := (size / nextBuffer) + 1
fileSliceNum := int(math.Ceil(float64(size) / float64(nextBuffer)))
calcFileSliceNum := fileSliceNum
var bufferSlice []string
for {
tmp := nextBuffer + 524287
if tmp >= size {
break
}
bufferSlice = append(bufferSlice, "Range:bytes="+cast.ToString(nextBuffer)+"-"+cast.ToString(tmp))
nextBuffer = tmp + 1
}
helper.Prs(md5Sum, "#buffers:", bufferSlice)
//helper.Reverse(&bufferSlice)
var eg1 errgroup.Group
var eg errgroup.Group
// 真实文件下载数
actualFileSliceNum := 0
// 文件提前下载完毕时,真实下载数,此值不为0,要覆盖actualFileSliceNum值
var finishEarlyFileSliceNum int
// 第一次下载,文件是否正常可以下载标记
alsPanicFileFlag := false
// 重试次数
boundaryTryTimes := 0
// 第一个文件后缀和最后一个文件的后缀
boundaryVal := 1
boundaryGetMediaData:
mediaData, err1 := MediaClient[corpId].GetMediaData(indexBuffer, sdkFileId, "", "", 3)
if err1 != nil {
boundaryTryTimes++
if boundaryTryTimes < 5 {
goto boundaryGetMediaData
}
err = err1
alsPanicFileFlag = s.GetMediaDataErrLevel(err1, msgId)
}
actualFileSliceNum++
sys.Log().Info("download index: ", md5Sum, "#", boundaryVal, "#", indexBuffer)
if alsPanicFileFlag {
return nil
}
if err != nil {
return
}
// 项目路径,因为是本地DOCKER,所以这样获取,上线后改为从配置文件中读取
filePathBase, _ := os.Getwd()
// 下载保存的文件路径
relativeDir := "/uploads/" + ContainerName + "/"
baseDir := filePathBase + relativeDir
// 文件写入路径,文件命名为文件md5值+分片数,例: 3fb6b846102670d93ea6af538567eb89_1
filePath := baseDir + md5Sum + "_" + cast.ToString(boundaryVal)
// 文件写入磁盘
eg1.Go(func() error {
fp := filePath
slice := boundaryVal
md := mediaData.Data
if err := CommonPool.Process(TunnyBasePool{
Ttype: TunnyWriteFile,
Args: map[string]interface{}{
"filePath": fp,
"slice": slice,
"data": md,
},
}); err != nil {
sys.Log().Error("分片写入文件失败:", err, md5Sum)
return errors.New("分片文件写入失败:" + cast.ToString(err))
}
return nil
})
if err1 := eg1.Wait(); err1 != nil {
err = err1
return
}
if boundaryVal > 1 && mediaData.IsFinish == false {
// 设置提醒
sys.Log().Info("多分片下载,分片计算少于微信所给分片,请关注此文件是否正常:", msgId, "#", md5Sum)
}
// 文件大于512KB
if mediaData.IsFinish == false {
// 除去第一个和最后一个文件切片数后 > 2,其它可以进行多线程下载
// boundaryVal 加此判断是因为,如果我计算的分片数实际比微信给的少了,那我会缺失文件分片下载,会造成分片会重新下载而且死循环
if len(bufferSlice) > 0 && boundaryVal == 1 {
for dIdx, dBuffer := range bufferSlice {
// 分片文件后缀没有0,1已经被下载,从2开始
tIdx := dIdx + 2
indexBuffer := dBuffer
eg.Go(func() error {
sys.Log().Info("download index: ", md5Sum, "#", tIdx, "#", indexBuffer)
// 下载文件
tryTimes := 0
getMediaData:
// 最大下载文件数量控制
GetMediaDataMaxThread <- 1
mediaDataGr, err1 := MediaClient[corpId].GetMediaData(indexBuffer, sdkFileId, "", "", 3)
<-GetMediaDataMaxThread
if err1 != nil {
tryTimes++
if tryTimes < 5 {
goto getMediaData
}
return err1
}
// 写文件到本地
filePath := baseDir + md5Sum + "_" + cast.ToString(tIdx)
writeTime := 0
writeFile:
err1 = s.writeFile(filePath, tIdx, mediaDataGr.Data)
if err1 != nil {
writeTime++
if writeTime < 5 {
goto writeFile
}
return err1
}
// 提前结束??, 最大的分片后缀就是此次的tIdx
if mediaDataGr.IsFinish {
sys.Log().Info("多分片 ,文件提前下载完成:", tIdx, "#", md5Sum)
finishEarlyFileSliceNum = tIdx
lastBufferChan <- "finishEarly"
} else if (tIdx - 1) == len(bufferSlice) {
//文件正常,没有提前下载完毕
lastBufferChan <- mediaDataGr.OutIndexBuf
}
actualFileSliceNum++
return nil
})
}
} else {
lastBufferChan <- mediaData.OutIndexBuf
}
// 等待最后一个文件切片buffer
sys.Log().Info("Multiple File, waiting last file buffer:", md5Sum)
indexBuffer = <-lastBufferChan
sys.Log().Info("Multiple File, waiting last file buffer end:", indexBuffer)
// 提前下载结束的buffer,不用下载最后的文件
if indexBuffer != "finishEarly" {
// 下载最后一个分片文件
boundaryTryTimes = 0
// 如果文件size很少,我计算出一片,实际上文件要大于2片,此值不对,需要处理
if calcFileSliceNum == 1 {
boundaryVal = fileSliceNum + 1
} else {
boundaryVal = fileSliceNum
}
// 如果我计算的分片数实际比微信给的少了,那我会缺失文件分片下载,少的文件继续进行下载,直到IsFinish为true
fileSliceNum++
goto boundaryGetMediaData
}
}
sys.Log().Info("Multiple File,Waiting to all files are done:", md5Sum)
// 多线程分片文件下载出错
if err := eg.Wait(); err != nil {
return err
}
fmt.Println("#########downading end, start to merge##########")
// 至此,文件分片全下载完毕,进行合并文件
// 读取文件片段 进行合并, 协程池
// 添加actureFileSliceNum字段,计算可能不准,2个分片文件时,微信可能提前就给完
// 添加finishEarlyFileSliceNum,>2个分片时,下载可能会提前完成
if finishEarlyFileSliceNum > 0 {
actualFileSliceNum = finishEarlyFileSliceNum
}
for k := 1; k <= actualFileSliceNum; k++ {
kt := k
eg.Go(func() error {
if errMerge := CommonPool.Process(TunnyBasePool{
Ttype: TunnyMergeFile,
Args: map[string]interface{}{
"k": kt,
"baseDir": baseDir,
"md5SumSec": md5Sum,
"fileSuffixTmp": suffix,
},
}); errMerge != nil {
sys.Log().Error("file merge goroutine err:", errMerge)
return errors.New("file merge goroutine err:" + cast.ToString(errMerge))
}
return nil
})
}
// 多线程分片文件合并
if err := eg.Wait(); err != nil {
return err
}
sys.Log().Info("file merged")
return nil
}