使用p2p下载的时候有用户问是否中心节点下载完成后,才能开始p2p下载,尤其是几GB 甚至几十GB的大文件。
实际中中心节点的文件是一边下载一边共享给其他节点的,从分片的角度看,下载完第一个分片,全节点共享下载过程就开始了。
本文介绍p2p中心节点作为初始节点下载文件提速的一些优化改进,同时借此分享下golang中的一些并发控制的思考

直接下载

最简单的直接下载,使用默认的http client发起请求,将数据流写入writer即可,实现相关的writer即可实现一边下载一边记录分片信息,从而提供给其他节点下载
社区Dragonfly项目的下载源文件的方法

// 直接下载文件
// writer一边拷入文件会一边计算分片,每完成一个分片便可以共享给其他节点下载
func directDownload(url string, headers map[string]string, writer io.Writer) error {
    req, err := http.NewRequest(http.MethodGet, url, nil)
    if err != nil {
        return err
    }
    for k, v := range headers {
        req.Header.Set(k, v)
    }
    resp, err := http.DefaultClient.Do(req)
    if err != nil{
        return err
    }
    defer resp.Body.Close()
    if _, err := io.Copy(writer, resp.Body); err != nil {
        return err
    }
    return nil
}

多线程下载

但是当文件很大,单链接限速时,文件下载较慢,在http下载中通常提供了Ranges:bytes=a-b来指定本次请求下载a~b字节的部分,因此可以利用该功能来做多线程下载
指定线程数,文件大小,实现多线程下载一个文件,这也是golang中基础的并发执行子任务,搭配使用sync.WaitGroup{}来检测所有的子任务是否完成,完成所有的子任务后即可退出

多线程下载在单线程速度有限时提速明显,例如单链接限速5MB/s时,可以通过多线程来加速下载,获得几倍速度,一些开源的百度云下载加速也是多线程加速。

// 文件过大时,每个协程下载的分片依旧很大,中断后重试的成本略高
func downloadWithMultiThreadsV1(url string, headers map[string]string, filePath string, threads int, size int64) error {
    pieceSize := size / int64(threads)
    wg := sync.WaitGroup{}
    for i := 0; i < threads; i++ {
        wg.Add(1)
        go func(pieceNum int64, pieceSize int64, headers map[string]string) {
            defer wg.Done()
            if err := downloadWithPiece(url, headers, filePath, pieceNum, pieceSize); err != nil {
                panic(err)
            }
        }(int64(i), pieceSize, copyHeader(headers))
    }
    wg.Wait()
    return nil
}

// 下载分片写入文件
func downloadWithPiece(url string, headers map[string]string, filePath string, pieceNum int64, pieceSize int64) error {
    startRange := pieceNum * pieceSize
    endRange := (pieceNum+1)*pieceSize - 1
    byteRanges := fmt.Sprintf("bytes=%d-%d", startRange, endRange)
    headers["Range"] = byteRanges
    fw, err := os.OpenFile(filePath, os.O_WRONLY, os.ModePerm)
    if err != nil {
        return err
    }
    // 打开文件偏移至该写入的位置
    _, err = fw.Seek(startRange, 0)
    if err != nil {
        return err
    }
    err = directDownload(url, headers, fw)
    if err != nil {
        return err
    }
    return nil
}

基于小分片多线程下载

大文件在跨region、跨国传输时,网络波动容易导致链接断开后,直接开固定线程数去均分文件下载失败后需要重试下载的分片很大,重试成本较高。
因此可以分较小的分片去多线程下载,失败后重试成本也较低,实现断点续传也简单。
一个1GB的文件以16MB分片去下载,需要60次,显然像上一个方法那样直接for循环启动去下载会导致协程过多,且源站压力也会大,因此需要控制并发。

直接使用channel传递信号来控制并发个数是比较基础的并发控制手段。
常见的并发编程写法可见《Go语言中文文档》-并发编程
《Go语言高级编程》-常见的并发模式
这样子既实现了多线程下载,在下载单个分片失败后,重试成本也较低。

// 以16MB的大小来下载文件
// 同时下载3个分片
// 各个分片竞争下载速度,会出现下载分片n的时候速度很慢,而后面一直在下载n+3、n+4等导致前部分分片迟迟未完成
func downloadWithMultiThreadsV2(url string, headers map[string]string, filePath string, threads int, size int64) error {
    var pieceSize int64 = 16 * 1024 *1024
    totalPieceNum := (size+1) / pieceSize
    maxThreads := make(chan struct{}, threads)
    var pieceNum  int64 = 0
    wg := sync.WaitGroup{}
    for ; pieceNum <= totalPieceNum; pieceNum++{
        maxThreads <- struct{}{}
        wg.Add(1)
        go func(pieceNum int64, pieceSize int64, headers map[string]string) {
            defer func() {
                <-maxThreads
                wg.Done()
            }()
            if err := downloadWithPiece(url, headers, filePath, pieceNum, pieceSize); err != nil {
                panic(err)
            }
        }(pieceNum, pieceSize, copyHeader(headers))
    }
    wg.Wait()
    return nil
}

下载文件校验

下载文件常常担心文件是否为原文件,因此下载后需要校验,最常用的为md5,但是文件较大时下载完成后,还要再从磁盘读取文件计算md5,耗时耗力,md5计算大约在mac上单核计算是750MB/s左右,加上从磁盘读取速度就更低了,最好是能一边下一遍校验。
对于单线程下载大文件很容易实现校验,将接收数据流的writer与计算md5的writer组合成一个writer,就可以边下载边校验md5了

// 计算下载的md5hash
func downloadGetHash(url string, headers map[string]string, fw io.Writer) (string, error) {
    md5Hash := md5.New()
    writer := io.MultiWriter(fw, md5Hash)
    req, err := http.NewRequest(http.MethodGet, url, nil)
    if err != nil {
        return "", err
    }
    for k, v := range headers {
        req.Header.Set(k, v)
    }
    resp, err := http.DefaultClient.Do(req)
    if err != nil{
        return "", err
    }
    defer resp.Body.Close()

    if _, err := io.Copy(writer, resp.Body); err != nil {
        return "", err
    }
    md5Value := fmt.Sprintf("%x", md5Hash.Sum(nil))
    return md5Value, nil
}

多线程下载校验md5的难点

多线程下载时,分片完成顺序不一致,无法像单线程下载那样简单一边下载一边计算,不用重复从磁盘读取。
当然可以从头开始等待下载完一个分片,然后就去校验一个分片,刚下载好去读取出来校验,可以走文件cache,不会额外占用磁盘i/o也是可以接受的。
然而在使用channel来控制并发的下载过程,可能出现还在等第1个分片下载完成来校验,但是其他线程已经下载1+n个分片去了,这时候等第一个分片下载好去校验md5时,接下来2~1+n个分片可能已经不在cache里了,得走一遍磁盘读取了。

滑动窗口多线程分片下载

各个分片竞争下载速度,会出现下载分片n的时候速度很慢,而后面一直在下载n+3、n+4等导致前部分分片迟迟未完成,导致早期的分片无法尽快下载完成提供给客户端下载,所以我们希望整体是按顺序去下载分片的。
需求变成了x线程下载时,同时最多在下载x个分片,然而n+1,n+2等完成后还要等第n个分片完成才能去下载第n+x个分片,其实就是滑动窗口。
滑动窗口来控制并发可以避免后续竞争资源导致先发的任务迟迟不能完成,不会出现靠前分片远远落后后面的分片,减少了cache淘汰速度

tcp传输中使用了滑动窗口提高吞吐量,根据网络情况来调整窗口大小,可以参考下文

我们在使用多线程下载中使用线程数的窗口大小控制思想就可以了,找了圈golang 滑动窗口,都是针对网关流量的时间滑动窗口实现- -,只能自己实现了
滑动窗口的本质就是在进行第n+x个任务时检测第n个任务是否完成,完成了则可以继续,否则等待
思考一下可以使用map+sync.mux就可以实现一个简单并发控制的滑动窗口

package lock

import "sync"

type SlideWindowLocker struct {
    threads int
    window map[int]*sync.Mutex
}

func NewSlideWindow(threads int) *SlideWindowLocker  {
    window := make(map[int]*sync.Mutex)
    for i:=0;i<threads;i++{
        window[i] = &sync.Mutex{}
    }
    return &SlideWindowLocker{
        threads: threads,
        window:  window,
    }
}

// 可以获取第n+x个锁的前提是第n个锁已被解锁
func (sw *SlideWindowLocker) Lock(i int) *sync.Mutex {
    mux := sw.window[i % sw.threads]
    mux.Lock()
    return mux
}

func (sw *SlideWindowLocker) GetLock(i int) *sync.Mutex {
    mux := sw.window[i % sw.threads]
    return mux
}

使用滑动窗口来控制并发的多线程下载代码就可以实现了
至此,我们就实现了一个多线程下载的方法

// 滑动窗口多线程下载文件
// 线程数为j
// 下载第n+j个分片时要求第n个分片已完成,否则等待第n个分片完成
func downloadWithMultiThreadsV3(url string, headers map[string]string, filePath string, threads int, size int64) error {
    var pieceSize int64 = 16 * 1024 *1024
    totalPieceNum := (size+1) / pieceSize
    var pieceNum  int64 = 0
    wg := sync.WaitGroup{}

    window := NewSlideWindow(threads)
    for ; pieceNum <= totalPieceNum; pieceNum++{
        mux := window.Lock(int(pieceNum))
        wg.Add(1)
        go func(mux *sync.Mutex, pieceNum int64, pieceSize int64, headers map[string]string) {
            defer mux.Unlock()
            defer wg.Done()
            if err := downloadWithPiece(url, headers, filePath, pieceNum, pieceSize); err != nil {
                panic(err)
            }
        }(mux, pieceNum, pieceSize, copyHeader(headers))
    }
    wg.Wait()
    return nil
}

多线程下载校验md5的实现

对于滑动窗口控制并发下载也稍作调整,
每次循环下载当前分片时直接开启多个协程下载之后的分片,使用mux控制并发,使用map来记录下载完成情况
当前下载完成后立即将当前分片投入计算hash

至此完成了多线程下载情况下校验全文md5,同时又减少磁盘i/o和计算时间。

例如其他的sha256等校验(镜像文件使用该校验)也可以直接增加相应的writer来实现,校验文件是否一致

var finishMap = sync.Map{}
// 多线程下载并计算md5
func downloadWithMultiThreadsV4(url string, headers map[string]string, filePath string, threads int, size int64) error {
    var pieceSize int64 = 16 * 1024 *1024
    totalPieceNum := (size+1) / pieceSize
    var pieceNum  int64 = 0

    md5Hash := md5.New()
    window := NewSlideWindow(threads)
    for ; pieceNum <= totalPieceNum; pieceNum++{
        // 并发下载
        for i := 1; i < threads && int(pieceNum)+i<int(totalPieceNum);i++{
            go downloadWithPieceWithMux(window.GetLock(int(pieceNum)+i), url, copyHeader(headers), filePath, pieceNum, pieceSize)
        }
        downloadWithPieceWithMux(window.GetLock(int(pieceNum)), url, copyHeader(headers), filePath, pieceNum, pieceSize)
        fo, _ := os.Open(filePath)
        fo.Seek(pieceNum * pieceSize, 0)
        io.CopyN(md5Hash, fo, pieceSize)
    }
    fmt.Sprintf("%x", md5Hash.Sum(nil))
    return nil
}

本处子协程下载分片的错误直接panic仅为了减少篇幅,实际中调用子协程去执行任务还需要去获取执行结果,获取错误情况并加以处理。