断点续传

​ 断点续传是一种人为切分传输文件,采用多线程进行部分上传/下载操作的文件传输技术。当网络出现状况,文件传输中断的时候,用户可以从已经上传/下载的部分继续进行传输而不必从头开始,减少资源和时间消耗。

​ 通俗的讲,以下载为例:当我进行下载的时候,我并不会一次性的去请求所有的资源,而是用多个线程,每个线程去请求一部分资源。当所有文件下载完成后,将它们拼接成一个完整的文件并对它做完整性校验,如果通过的话,那我们这个资源便成功下载了的。如果要打个比方的话,漂亮国最近完成了总统大选,A洲有共计500,000张选票需要统计,我们并不会让计票员A去一个人清点完所有的选票,而是派出一个大的计票团队前往每一个选举站点,清点完每一个站点的选票,然后统计往上汇总。就算计票员A记到一半跑路了,我们也不必让计票员B重新开始所有的清点,只需要让B去A负责的站点重新清点就可以了。这样一来,时间和金钱就都被节省下来了。断点续传就是这样的道理。

断点续传原理解析

Accept-RangesRangeContent-Range

Accept-Ranges

RFC 7233Accept-Ranges
Accept-Ranges: bytes
Accept-Ranges: none
nonebytes

Range

RFC 7233206 Partial Content416 Range Not Satisfiable200
Range: <unit>=<range-start>-
Range: <unit>=<range-start>-<range-end>
Range: <unit>=<range-start>-<range-end>, <range-start>-<range-end>
Range: <unit>=<range-start>-<range-end>, <range-start>-<range-end>, <range-start>-<range-end>
unitrange-startrange-end

Content-Range

RFC 7233
Content-Range: <unit> <range-start>-<range-end>/<size>
Content-Range: <unit> <range-start>-<range-end>/*
Content-Range: <unit> */<size>
unitrange-startrange-endsize

Go 实现

类型定义

首先我们需要一个文件下载的struct,这个struct会包含文件大小,文件路径, 文件名称, 下载线程数,下载链接和文件切片类型的数组,文件切片包含其切片序号,起讫位置,下载内容,和完成标识。

//filePart 文件切片
type filePart struct {
    Index int    //文件切片的序号
    From  int64  //开始位置
    To    int64  //结束位置
    Data  []byte //下载内容
    Done  bool   //标注切片是否下载完成
}

//FileDownloader 文件下载器
type FileDownloader struct {
    fileSize       int64//文件大小
    url            string//链接地址
    outputFileName string//文件名称
    totalPart      int //下载线程
    outputDir      string//文件路径
    doneFilePart   []filePart//切片
}

流程方法

创建下载器

文件下载器创建函数,参数为链接地址,文件名称,文件路径和下载线程数,返回一个文件下载器,默认文件名称为其在服务器中的名称,文件路径为当前工作路径。

//NewFileDownloader .
func NewFileDownloader(url, outputFileName, outputDir string, totalPart int) *FileDownloader {
    if outputDir == "" {
        wd, err := os.Getwd() //获取当前工作目录
        if err != nil {
            log.Println(err)
        }
        outputDir = wd
    }
    return &FileDownloader{
        fileSize:       0,
        url:            url,
        outputFileName: outputFileName,
        outputDir:      outputDir,
        totalPart:      totalPart,
        doneFilePart:   make([]filePart, totalPart),
    }
}

获取链接Head响应

通过Head请求,获取相应链接是否支持范围请求,其文件资源大小和文件名称和MD5校验码

// getNewRequest 创建一个request
func (d FileDownloader) getNewRequest(method string) (*http.Request, error) {
    r, err := http.NewRequest(
        method,
        d.url,
        nil,
    )
    if err != nil {
        return nil, err
    }
    return r, nil
}

//获取传输文件名称
func parseFileInfoFrom(resp *http.Response) string {
    contentDisposition := resp.Header.Get("Content-Disposition")
    if contentDisposition != "" {
        _, params, err := mime.ParseMediaType(contentDisposition)

        if err != nil {
            panic(err)
        }
        return params["filename"]
    }
    filename := filepath.Base(resp.Request.URL.Path)
    return filename
}

//head 获取要下载的文件的基本信息(header) 使用HTTP Method Head
func (d *FileDownloader) head() (int64, error, string) {
    //TODO head request
    r, err := d.getNewRequest("HEAD") 
    r = setNewHeader(r)

    if err != nil {
        return 0, err, ""
    }
    resp, err := http.DefaultClient.Do(r)
    if err != nil {
        return 0, err, ""
    }
    if resp.StatusCode > 299 {
        return 0, errors.New(fmt.Sprintf("Can't process, response is %v", resp.StatusCode)), ""
    }

    if resp.Header.Get("Accept-Ranges") != "bytes" {
        return 0, errors.New("服务器不支持文件断点续传"), ""
    }

    etag := resp.Header.Get("Etag")

    d.outputFileName = parseFileInfoFrom(resp)
    length, err := strconv.Atoi(resp.Header.Get("Content-Length"))

    return int64(length), err, etag
}

开始下载任务

.tmp
func exists(path string) bool {
    _, err := os.Stat(path) //os.Stat获取文件信息
    if err != nil {
        if os.IsExist(err) {
            return true
        }
        return false
    }
    return true
}

//Run 开始下载任务
func (d *FileDownloader) Run() error {
    fileTotalSize, err, etag := d.head()
    if err != nil {
        return err
    }
    d.fileSize = fileTotalSize

    jobs := make([]filePart, d.totalPart)
    eachSize := fileTotalSize / int64(d.totalPart)

    path := filepath.Join(d.outputDir, d.outputFileName+".tmp")

    tmpFile := new(os.File)

    fByte := make([]byte, d.fileSize)

    if exists(path) {
        tmpFile, err = os.OpenFile(path, os.O_RDWR, 0)
        if err != nil {
            return err
        }
        fByte, err = ioutil.ReadAll(tmpFile)
    } else {
        tmpFile, err = os.Create(path)
    }
    if err != nil {
        return err
    }
    defer tmpFile.Close()

    for i := range jobs {
        jobs[i].Index = i
        if i == 0 {
            jobs[i].From = 0
        } else {
            jobs[i].From = jobs[i-1].To + 1
        }
        if i < d.totalPart-1 {
            jobs[i].To = jobs[i].From + eachSize
        } else {
            //the last filePart
            jobs[i].To = fileTotalSize - 1
        }
    }

    for i, j := range jobs {
        tmpJob := j
        emptyTmp := make([]byte, tmpJob.To-j.From)
        if bytes.Compare(emptyTmp, fByte[tmpJob.From:j.To]) != 0 {
            tmpJob.Data = fByte[j.From : j.To+1]
            tmpJob.Done = true
            d.doneFilePart[tmpJob.Index] = tmpJob
        } else {
            tmpJob.Done = false
        }
        jobs[i] = tmpJob
    }

    var wg sync.WaitGroup
    for _, j := range jobs {
        if !j.Done {
            wg.Add(1)
            go func(job filePart) {
                defer wg.Done()
                err := d.downloadPart(job, tmpFile)
                if err != nil {
                    log.Println("下载文件失败:", err, job)
                }
            }(j)
        }
    }
    wg.Wait()
    return d.checkIntegrity(etag, tmpFile)
}

切片下载资源

传入对象为文件切片和临时文件指针,若下载完成,则将完成部分内容通过WriteAt写入临时文件相应的位置,并将该文件切片标注为Done。

//下载切片
func (d FileDownloader) downloadPart(c filePart, f *os.File) error {
    r, err := d.getNewRequest("GET")
    r = setNewHeader(r)
    if err != nil {
        return err
    }
    log.Printf("开始[%d]下载from:%d to:%d\n", c.Index, c.From, c.To)
    r.Header.Set("Range", fmt.Sprintf("bytes=%v-%v", c.From, c.To))
    resp, err := http.DefaultClient.Do(r)
    if err != nil {
        return err
    }
    if resp.StatusCode > 299 {
        return errors.New(fmt.Sprintf("服务器错误状态码: %v", resp.StatusCode))
    }
    defer resp.Body.Close()

    bs, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        return err
    }
    if len(bs) != int(c.To-c.From+1) {
    }
    c.Data = bs
    c.Done = true

    d.doneFilePart[c.Index] = c

    _, err = f.WriteAt(bs, int64(c.From))

    if err != nil {
        c.Done = true
    }

    log.Printf("结束[%d]下载", c.Index)
    return err

}

下载文件完整性校验

.tmp
func (d FileDownloader) checkIntegrity(etag string, t *os.File) error {
    log.Println("开始合并文件")

    defer t.Close()
    hash := md5.New()
    totalSize := 0

    for _, s := range d.doneFilePart {
        hash.Write(s.Data)
        totalSize += len(s.Data)
    }

    if int64(totalSize) != d.fileSize {
        return errors.New("文件不完整")
    }

    if hex.EncodeToString(hash.Sum(nil)) != etag[1:len(etag)-1] {
        return errors.New("文件损坏")
    } else {
        log.Println("文件md5校验成功")
    }

    os.Rename(filepath.Join(d.outputDir, d.outputFileName+".tmp"), filepath.Join(d.outputDir, d.outputFileName))

    return nil
}

开始

因涉及到资源的重定向,所以特意加入了重定向后的信息获取

func getRedirectInfo(u, rawCookies, userAgent string) (*http.Response, error) {
    log.Println("获取重定向信息")
    var a *url.URL
    a, _ = url.Parse(u)
    header := http.Header{}

    //header.Add("Cookie", rawCookies)
    header.Add("User-Agent", userAgent)
    request := http.Request{
        Header: header,
        Method: "GET",
        URL:    a,
    }

    client := &http.Client{
        CheckRedirect: func(req *http.Request, via []*http.Request) error {
            return http.ErrUseLastResponse
        },
    }
    response, err := client.Do(&request)
    if err != nil {
        return nil, err
    }

    return response, nil
}

func setNewHeader(r *http.Request) *http.Request {
    r.Header.Add("User-Agent", userAgent)
    r.Header.Add("Upgrade-Insecure-Requests", "1")
    return r
}
func DownloadFile(URL string) {
    a, _ := getRedirectInfo(URL, "", userAgent)
    location := a.Header.Get("Location")

    startTime := time.Now()

    downloader := NewFileDownloader(location, "", "", 10)
    if err := downloader.Run(); err != nil {
        log.Fatal(err)
    }
    fmt.Printf("\n 文件下载完成耗时: %f second\n", time.Now().Sub(startTime).Seconds())
}

不足之处

此项目完成了对断点传输和多线程下载的支持,但若切片下载未完成,则会直接丢失整个切片的进度而并非记录上该切片的已下载的信息。