企业微信留痕文件下载

文件下载,官方给的文档是只能单线下载,实际代码运行时,遇到 大型文件,官方接口无法满足消息即时性要求,分析结构后,实现多线程下载
给出关键语句,供参考,实际就是计算文件大小,直接计算出每段文件所在字节区间,但文件大小有时不可靠,也需要处理,不合规文件


// MultipleDownloadMediaData 多线程文件下载
func (s *WeworkSyncMediaService) MultipleDownloadMediaData(corpId, msgId string, waitDown MsgFileStruct) (err error) {
	md5Sum := waitDown.Md5Sum
	sdkFileId := waitDown.SdkFileId
	size := int(waitDown.Size)
	suffix := waitDown.Suffix
	indexBuffer := ""
	// 测试发现,如果要多线程下载需要获取第一段切片文件后的nextBuffer,和倒数第二个切片的nextBuffer
	// 也就是说每一个切片文件和最后一个切片文件无法进行多线程下载,第一个切片文件和最后一个切片必需要用微信给的值进行下载
	var lastBufferChan = make(chan string, 1)
	// 计算企业微信文件切片逻辑,生成nextBuffer
	nextBuffer := 524288
	//fileSliceNum := (size / nextBuffer) + 1
	fileSliceNum := int(math.Ceil(float64(size) / float64(nextBuffer)))
	calcFileSliceNum := fileSliceNum
	var bufferSlice []string
	for {
		tmp := nextBuffer + 524287
		if tmp >= size {
			break
		}
		bufferSlice = append(bufferSlice, "Range:bytes="+cast.ToString(nextBuffer)+"-"+cast.ToString(tmp))
		nextBuffer = tmp + 1
	}
	helper.Prs(md5Sum, "#buffers:", bufferSlice)
	//helper.Reverse(&bufferSlice)

	var eg1 errgroup.Group
	var eg errgroup.Group
	// 真实文件下载数
	actualFileSliceNum := 0
	// 文件提前下载完毕时,真实下载数,此值不为0,要覆盖actualFileSliceNum值
	var finishEarlyFileSliceNum int
	// 第一次下载,文件是否正常可以下载标记
	alsPanicFileFlag := false
	// 重试次数
	boundaryTryTimes := 0
	// 第一个文件后缀和最后一个文件的后缀
	boundaryVal := 1
boundaryGetMediaData:
	mediaData, err1 := MediaClient[corpId].GetMediaData(indexBuffer, sdkFileId, "", "", 3)
	if err1 != nil {
		boundaryTryTimes++
		if boundaryTryTimes < 5 {
			goto boundaryGetMediaData
		}
		err = err1
		alsPanicFileFlag = s.GetMediaDataErrLevel(err1, msgId)
	}
	actualFileSliceNum++
	sys.Log().Info("download index: ", md5Sum, "#", boundaryVal, "#", indexBuffer)
	if alsPanicFileFlag {
		return nil
	}
	if err != nil {
		return
	}
	// 项目路径,因为是本地DOCKER,所以这样获取,上线后改为从配置文件中读取
	filePathBase, _ := os.Getwd()
	// 下载保存的文件路径
	relativeDir := "/uploads/" + ContainerName + "/"
	baseDir := filePathBase + relativeDir
	// 文件写入路径,文件命名为文件md5值+分片数,例: 3fb6b846102670d93ea6af538567eb89_1
	filePath := baseDir + md5Sum + "_" + cast.ToString(boundaryVal)
	// 文件写入磁盘
	eg1.Go(func() error {
		fp := filePath
		slice := boundaryVal
		md := mediaData.Data
		if err := CommonPool.Process(TunnyBasePool{
			Ttype: TunnyWriteFile,
			Args: map[string]interface{}{
				"filePath": fp,
				"slice":    slice,
				"data":     md,
			},
		}); err != nil {
			sys.Log().Error("分片写入文件失败:", err, md5Sum)
			return errors.New("分片文件写入失败:" + cast.ToString(err))
		}
		return nil
	})
	if err1 := eg1.Wait(); err1 != nil {
		err = err1
		return
	}

	if boundaryVal > 1 && mediaData.IsFinish == false {
		// 设置提醒
		sys.Log().Info("多分片下载,分片计算少于微信所给分片,请关注此文件是否正常:", msgId, "#", md5Sum)
	}

	// 文件大于512KB
	if mediaData.IsFinish == false {
		// 除去第一个和最后一个文件切片数后 > 2,其它可以进行多线程下载
		// boundaryVal 加此判断是因为,如果我计算的分片数实际比微信给的少了,那我会缺失文件分片下载,会造成分片会重新下载而且死循环
		if len(bufferSlice) > 0 && boundaryVal == 1 {
			for dIdx, dBuffer := range bufferSlice {
				// 分片文件后缀没有0,1已经被下载,从2开始
				tIdx := dIdx + 2
				indexBuffer := dBuffer
				eg.Go(func() error {
					sys.Log().Info("download index: ", md5Sum, "#", tIdx, "#", indexBuffer)
					// 下载文件
					tryTimes := 0
				getMediaData:
					// 最大下载文件数量控制
					GetMediaDataMaxThread <- 1
					mediaDataGr, err1 := MediaClient[corpId].GetMediaData(indexBuffer, sdkFileId, "", "", 3)
					<-GetMediaDataMaxThread
					if err1 != nil {
						tryTimes++
						if tryTimes < 5 {
							goto getMediaData
						}
						return err1
					}
					// 写文件到本地
					filePath := baseDir + md5Sum + "_" + cast.ToString(tIdx)
					writeTime := 0
				writeFile:
					err1 = s.writeFile(filePath, tIdx, mediaDataGr.Data)
					if err1 != nil {
						writeTime++
						if writeTime < 5 {
							goto writeFile
						}
						return err1
					}
					// 提前结束??, 最大的分片后缀就是此次的tIdx
					if mediaDataGr.IsFinish {
						sys.Log().Info("多分片 ,文件提前下载完成:", tIdx, "#", md5Sum)
						finishEarlyFileSliceNum = tIdx
						lastBufferChan <- "finishEarly"
					} else if (tIdx - 1) == len(bufferSlice) {
						//文件正常,没有提前下载完毕
						lastBufferChan <- mediaDataGr.OutIndexBuf
					}
					actualFileSliceNum++
					return nil
				})
			}
		} else {
			lastBufferChan <- mediaData.OutIndexBuf
		}
		// 等待最后一个文件切片buffer
		sys.Log().Info("Multiple File, waiting last file buffer:", md5Sum)
		indexBuffer = <-lastBufferChan
		sys.Log().Info("Multiple File, waiting last file buffer end:", indexBuffer)
		// 提前下载结束的buffer,不用下载最后的文件
		if indexBuffer != "finishEarly" {
			// 下载最后一个分片文件
			boundaryTryTimes = 0
			// 如果文件size很少,我计算出一片,实际上文件要大于2片,此值不对,需要处理
			if calcFileSliceNum == 1 {
				boundaryVal = fileSliceNum + 1
			} else {
				boundaryVal = fileSliceNum
			}
			// 如果我计算的分片数实际比微信给的少了,那我会缺失文件分片下载,少的文件继续进行下载,直到IsFinish为true
			fileSliceNum++
			goto boundaryGetMediaData
		}
	}
	sys.Log().Info("Multiple File,Waiting to all files are done:", md5Sum)
	// 多线程分片文件下载出错
	if err := eg.Wait(); err != nil {
		return err
	}
	fmt.Println("#########downading end, start to merge##########")
	// 至此,文件分片全下载完毕,进行合并文件
	// 读取文件片段 进行合并, 协程池
	// 添加actureFileSliceNum字段,计算可能不准,2个分片文件时,微信可能提前就给完
	// 添加finishEarlyFileSliceNum,>2个分片时,下载可能会提前完成
	if finishEarlyFileSliceNum > 0 {
		actualFileSliceNum = finishEarlyFileSliceNum
	}
	for k := 1; k <= actualFileSliceNum; k++ {
		kt := k
		eg.Go(func() error {
			if errMerge := CommonPool.Process(TunnyBasePool{
				Ttype: TunnyMergeFile,
				Args: map[string]interface{}{
					"k":             kt,
					"baseDir":       baseDir,
					"md5SumSec":     md5Sum,
					"fileSuffixTmp": suffix,
				},
			}); errMerge != nil {
				sys.Log().Error("file merge goroutine err:", errMerge)
				return errors.New("file merge goroutine err:" + cast.ToString(errMerge))
			}
			return nil
		})
	}
	// 多线程分片文件合并
	if err := eg.Wait(); err != nil {
		return err
	}
	sys.Log().Info("file merged")
	return nil
}