应用p2p下载的时候有用户问是否核心节点下载实现后,能力开始p2p下载,尤其是几GB 甚至几十GB的大文件。
理论中核心节点的文件是一边下载一边共享给其余节点的,从分片的角度看,下载完第一个分片,全节点共享下载过程就开始了。
本文介绍p2p核心节点作为初始节点下载文件提速的一些优化改良,同时借此分享下golang中的一些并发管制的思考
间接下载
最简略的间接下载,应用默认的http client发动申请,将数据流写入writer即可,实现相干的writer即可实现一边下载一边记录分片信息,从而提供给其余节点下载
社区Dragonfly我的项目的下载源文件的办法
// 间接下载文件
// writer一边拷入文件会一边计算分片,每实现一个分片便能够共享给其余节点下载
func directDownload(url string, headers map[string]string, writer io.Writer) error {
req, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
return err
}
for k, v := range headers {
req.Header.Set(k, v)
}
resp, err := http.DefaultClient.Do(req)
if err != nil{
return err
}
defer resp.Body.Close()
if _, err := io.Copy(writer, resp.Body); err != nil {
return err
}
return nil
}
多线程下载
然而当文件很大,单链接限速时,文件下载较慢,在http下载中通常提供了Ranges:bytes=a-b来指定本次申请下载a~b字节的局部,因而能够利用该性能来做多线程下载
指定线程数,文件大小,实现多线程下载一个文件,这也是golang中根底的并发执行子工作,搭配应用sync.WaitGroup{}来检测所有的子工作是否实现,实现所有的子工作后即可退出
多线程下载在单线程速度无限时提速显著,例如单链接限速5MB/s时,能够通过多线程来减速下载,取得几倍速度,一些开源的百度云下载减速也是多线程减速。
// 文件过大时,每个协程下载的分片仍旧很大,中断后重试的老本略高
func downloadWithMultiThreadsV1(url string, headers map[string]string, filePath string, threads int, size int64) error {
pieceSize := size / int64(threads)
wg := sync.WaitGroup{}
for i := 0; i < threads; i++ {
wg.Add(1)
go func(pieceNum int64, pieceSize int64, headers map[string]string) {
defer wg.Done()
if err := downloadWithPiece(url, headers, filePath, pieceNum, pieceSize); err != nil {
panic(err)
}
}(int64(i), pieceSize, copyHeader(headers))
}
wg.Wait()
return nil
}
// 下载分片写入文件
func downloadWithPiece(url string, headers map[string]string, filePath string, pieceNum int64, pieceSize int64) error {
startRange := pieceNum * pieceSize
endRange := (pieceNum+1)*pieceSize - 1
byteRanges := fmt.Sprintf("bytes=%d-%d", startRange, endRange)
headers["Range"] = byteRanges
fw, err := os.OpenFile(filePath, os.O_WRONLY, os.ModePerm)
if err != nil {
return err
}
// 关上文件偏移至该写入的地位
_, err = fw.Seek(startRange, 0)
if err != nil {
return err
}
err = directDownload(url, headers, fw)
if err != nil {
return err
}
return nil
}
基于小分片多线程下载
大文件在跨region、跨国传输时,网络稳定容易导致链接断开后,间接开固定线程数去均分文件下载失败后须要重试下载的分片很大,重试老本较高。
因而能够分较小的分片去多线程下载,失败后重试老本也较低,实现断点续传也简略。
一个1GB的文件以16MB分片去下载,须要60次,显然像上一个办法那样间接for循环启动去下载会导致协程过多,且源站压力也会大,因而须要管制并发。
间接应用channel传递信号来管制并发个数是比拟根底的并发管制伎俩。
常见的并发编程写法可见《Go语言中文文档》-并发编程
《Go语言高级编程》-常见的并发模式
这样子既实现了多线程下载,在下载单个分片失败后,重试老本也较低。
// 以16MB的大小来下载文件
// 同时下载3个分片
// 各个分片竞争下载速度,会呈现下载分片n的时候速度很慢,而前面始终在下载n+3、n+4等导致前局部分片迟迟未实现
func downloadWithMultiThreadsV2(url string, headers map[string]string, filePath string, threads int, size int64) error {
var pieceSize int64 = 16 * 1024 *1024
totalPieceNum := (size+1) / pieceSize
maxThreads := make(chan struct{}, threads)
var pieceNum int64 = 0
wg := sync.WaitGroup{}
for ; pieceNum <= totalPieceNum; pieceNum++{
maxThreads <- struct{}{}
wg.Add(1)
go func(pieceNum int64, pieceSize int64, headers map[string]string) {
defer func() {
<-maxThreads
wg.Done()
}()
if err := downloadWithPiece(url, headers, filePath, pieceNum, pieceSize); err != nil {
panic(err)
}
}(pieceNum, pieceSize, copyHeader(headers))
}
wg.Wait()
return nil
}
下载文件校验
下载文件经常放心文件是否为原文件,因而下载后须要校验,最罕用的为md5,然而文件较大时下载实现后,还要再从磁盘读取文件计算md5,耗时耗力,md5计算大概在mac上单核计算是750MB/s左右,加上从磁盘读取速度就更低了,最好是能一边下一遍校验。
对于单线程下载大文件很容易实现校验,将接管数据流的writer与计算md5的writer组合成一个writer,就能够边下载边校验md5了
// 计算下载的md5hash
func downloadGetHash(url string, headers map[string]string, fw io.Writer) (string, error) {
md5Hash := md5.New()
writer := io.MultiWriter(fw, md5Hash)
req, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
return "", err
}
for k, v := range headers {
req.Header.Set(k, v)
}
resp, err := http.DefaultClient.Do(req)
if err != nil{
return "", err
}
defer resp.Body.Close()
if _, err := io.Copy(writer, resp.Body); err != nil {
return "", err
}
md5Value := fmt.Sprintf("%x", md5Hash.Sum(nil))
return md5Value, nil
}
多线程下载校验md5的难点
多线程下载时,分片实现程序不统一,无奈像单线程下载那样简略一边下载一边计算,不必反复从磁盘读取。
当然能够从头开始期待下载完一个分片,而后就去校验一个分片,刚下载好去读取进去校验,能够走文件cache,不会额定占用磁盘i/o也是能够承受的。
然而在应用channel来管制并发的下载过程,可能呈现还在等第1个分片下载实现来校验,然而其余线程曾经下载1+n个分片去了,这时候等第一个分片下载好去校验md5时,接下来2~1+n个分片可能曾经不在cache里了,得走一遍磁盘读取了。
滑动窗口多线程分片下载
各个分片竞争下载速度,会呈现下载分片n的时候速度很慢,而前面始终在下载n+3、n+4等导致前局部分片迟迟未实现,导致晚期的分片无奈尽快下载实现提供给客户端下载,所以咱们心愿整体是按程序去下载分片的。
需要变成了x线程下载时,同时最多在下载x个分片,然而n+1,n+2等实现后还要等第n个分片实现能力去下载第n+x个分片,其实就是滑动窗口。
滑动窗口来管制并发能够防止后续竞争资源导致先发的工作迟迟不能实现,不会呈现靠前分片远远落后前面的分片,缩小了cache淘汰速度
tcp传输中应用了滑动窗口进步吞吐量,依据网络状况来调整窗口大小,能够参考下文
tcp滑动窗口简介
tcp滑动窗口源码简析
咱们在应用多线程下载中应用线程数的窗口大小管制思维就能够了,找了圈golang 滑动窗口,都是针对网关流量的工夫滑动窗口实现- -,只能本人实现了
滑动窗口的实质就是在进行第n+x个工作时检测第n个工作是否实现,实现了则能够持续,否则期待
思考一下能够应用map+sync.mux就能够实现一个简略并发管制的滑动窗口
package lock
import "sync"
type SlideWindowLocker struct {
threads int
window map[int]*sync.Mutex
}
func NewSlideWindow(threads int) *SlideWindowLocker {
window := make(map[int]*sync.Mutex)
for i:=0;i<threads;i++{
window[i] = &sync.Mutex{}
}
return &SlideWindowLocker{
threads: threads,
window: window,
}
}
// 能够获取第n+x个锁的前提是第n个锁已被解锁
func (sw *SlideWindowLocker) Lock(i int) *sync.Mutex {
mux := sw.window[i % sw.threads]
mux.Lock()
return mux
}
func (sw *SlideWindowLocker) GetLock(i int) *sync.Mutex {
mux := sw.window[i % sw.threads]
return mux
}
应用滑动窗口来管制并发的多线程下载代码就能够实现了
至此,咱们就实现了一个多线程下载的办法
// 滑动窗口多线程下载文件
// 线程数为j
// 下载第n+j个分片时要求第n个分片已实现,否则期待第n个分片实现
func downloadWithMultiThreadsV3(url string, headers map[string]string, filePath string, threads int, size int64) error {
var pieceSize int64 = 16 * 1024 *1024
totalPieceNum := (size+1) / pieceSize
var pieceNum int64 = 0
wg := sync.WaitGroup{}
window := NewSlideWindow(threads)
for ; pieceNum <= totalPieceNum; pieceNum++{
mux := window.Lock(int(pieceNum))
wg.Add(1)
go func(mux *sync.Mutex, pieceNum int64, pieceSize int64, headers map[string]string) {
defer mux.Unlock()
defer wg.Done()
if err := downloadWithPiece(url, headers, filePath, pieceNum, pieceSize); err != nil {
panic(err)
}
}(mux, pieceNum, pieceSize, copyHeader(headers))
}
wg.Wait()
return nil
}
多线程下载校验md5的实现
对于滑动窗口管制并发下载也稍作调整,
每次循环下载以后分片时间接开启多个协程下载之后的分片,应用mux管制并发,应用map来记录下载实现状况
以后下载实现后立刻将以后分片投入计算hash
至此实现了多线程下载状况下校验全文md5,同时又缩小磁盘i/o和计算工夫。
例如其余的sha256等校验(镜像文件应用该校验)也能够间接减少相应的writer来实现,校验文件是否统一
var finishMap = sync.Map{}
// 多线程下载并计算md5
func downloadWithMultiThreadsV4(url string, headers map[string]string, filePath string, threads int, size int64) error {
var pieceSize int64 = 16 * 1024 *1024
totalPieceNum := (size+1) / pieceSize
var pieceNum int64 = 0
md5Hash := md5.New()
window := NewSlideWindow(threads)
for ; pieceNum <= totalPieceNum; pieceNum++{
// 并发下载
for i := 1; i < threads && int(pieceNum)+i<int(totalPieceNum);i++{
go downloadWithPieceWithMux(window.GetLock(int(pieceNum)+i), url, copyHeader(headers), filePath, pieceNum, pieceSize)
}
downloadWithPieceWithMux(window.GetLock(int(pieceNum)), url, copyHeader(headers), filePath, pieceNum, pieceSize)
fo, _ := os.Open(filePath)
fo.Seek(pieceNum * pieceSize, 0)
io.CopyN(md5Hash, fo, pieceSize)
}
fmt.Sprintf("%x", md5Hash.Sum(nil))
return nil
}
本处子协程下载分片的谬误间接panic仅为了缩小篇幅,理论中调用子协程去执行工作还须要去获取执行后果,获取谬误状况并加以解决。