断点续传
断点续传是一种人为切分传输文件,采用多线程进行部分上传/下载操作的文件传输技术。当网络出现状况,文件传输中断的时候,用户可以从已经上传/下载的部分继续进行传输而不必从头开始,减少资源和时间消耗。
通俗的讲,以下载为例:当我进行下载的时候,我并不会一次性的去请求所有的资源,而是用多个线程,每个线程去请求一部分资源。当所有文件下载完成后,将它们拼接成一个完整的文件并对它做完整性校验,如果通过的话,那我们这个资源便成功下载了的。如果要打个比方的话,漂亮国最近完成了总统大选,A洲有共计500,000张选票需要统计,我们并不会让计票员A去一个人清点完所有的选票,而是派出一个大的计票团队前往每一个选举站点,清点完每一个站点的选票,然后统计往上汇总。就算计票员A记到一半跑路了,我们也不必让计票员B重新开始所有的清点,只需要让B去A负责的站点重新清点就可以了。这样一来,时间和金钱就都被节省下来了。断点续传就是这样的道理。
断点续传原理解析
Accept-RangesRangeContent-Range
Accept-Ranges
RFC 7233Accept-Ranges
Accept-Ranges: bytes
Accept-Ranges: none
nonebytes
Range
RFC 7233206 Partial Content416 Range Not Satisfiable200
Range: <unit>=<range-start>-
Range: <unit>=<range-start>-<range-end>
Range: <unit>=<range-start>-<range-end>, <range-start>-<range-end>
Range: <unit>=<range-start>-<range-end>, <range-start>-<range-end>, <range-start>-<range-end>
unitrange-startrange-end
Content-Range
RFC 7233
Content-Range: <unit> <range-start>-<range-end>/<size>
Content-Range: <unit> <range-start>-<range-end>/*
Content-Range: <unit> */<size>
unitrange-startrange-endsize
Go 实现
类型定义
首先我们需要一个文件下载的struct,这个struct会包含文件大小,文件路径, 文件名称, 下载线程数,下载链接和文件切片类型的数组,文件切片包含其切片序号,起讫位置,下载内容,和完成标识。
//filePart 文件切片
type filePart struct {
Index int //文件切片的序号
From int64 //开始位置
To int64 //结束位置
Data []byte //下载内容
Done bool //标注切片是否下载完成
}
//FileDownloader 文件下载器
type FileDownloader struct {
fileSize int64//文件大小
url string//链接地址
outputFileName string//文件名称
totalPart int //下载线程
outputDir string//文件路径
doneFilePart []filePart//切片
}
流程方法
创建下载器
文件下载器创建函数,参数为链接地址,文件名称,文件路径和下载线程数,返回一个文件下载器,默认文件名称为其在服务器中的名称,文件路径为当前工作路径。
//NewFileDownloader .
func NewFileDownloader(url, outputFileName, outputDir string, totalPart int) *FileDownloader {
if outputDir == "" {
wd, err := os.Getwd() //获取当前工作目录
if err != nil {
log.Println(err)
}
outputDir = wd
}
return &FileDownloader{
fileSize: 0,
url: url,
outputFileName: outputFileName,
outputDir: outputDir,
totalPart: totalPart,
doneFilePart: make([]filePart, totalPart),
}
}
获取链接Head响应
通过Head请求,获取相应链接是否支持范围请求,其文件资源大小和文件名称和MD5校验码
// getNewRequest 创建一个request
func (d FileDownloader) getNewRequest(method string) (*http.Request, error) {
r, err := http.NewRequest(
method,
d.url,
nil,
)
if err != nil {
return nil, err
}
return r, nil
}
//获取传输文件名称
func parseFileInfoFrom(resp *http.Response) string {
contentDisposition := resp.Header.Get("Content-Disposition")
if contentDisposition != "" {
_, params, err := mime.ParseMediaType(contentDisposition)
if err != nil {
panic(err)
}
return params["filename"]
}
filename := filepath.Base(resp.Request.URL.Path)
return filename
}
//head 获取要下载的文件的基本信息(header) 使用HTTP Method Head
func (d *FileDownloader) head() (int64, error, string) {
//TODO head request
r, err := d.getNewRequest("HEAD")
r = setNewHeader(r)
if err != nil {
return 0, err, ""
}
resp, err := http.DefaultClient.Do(r)
if err != nil {
return 0, err, ""
}
if resp.StatusCode > 299 {
return 0, errors.New(fmt.Sprintf("Can't process, response is %v", resp.StatusCode)), ""
}
if resp.Header.Get("Accept-Ranges") != "bytes" {
return 0, errors.New("服务器不支持文件断点续传"), ""
}
etag := resp.Header.Get("Etag")
d.outputFileName = parseFileInfoFrom(resp)
length, err := strconv.Atoi(resp.Header.Get("Content-Length"))
return int64(length), err, etag
}
开始下载任务
.tmp
func exists(path string) bool {
_, err := os.Stat(path) //os.Stat获取文件信息
if err != nil {
if os.IsExist(err) {
return true
}
return false
}
return true
}
//Run 开始下载任务
func (d *FileDownloader) Run() error {
fileTotalSize, err, etag := d.head()
if err != nil {
return err
}
d.fileSize = fileTotalSize
jobs := make([]filePart, d.totalPart)
eachSize := fileTotalSize / int64(d.totalPart)
path := filepath.Join(d.outputDir, d.outputFileName+".tmp")
tmpFile := new(os.File)
fByte := make([]byte, d.fileSize)
if exists(path) {
tmpFile, err = os.OpenFile(path, os.O_RDWR, 0)
if err != nil {
return err
}
fByte, err = ioutil.ReadAll(tmpFile)
} else {
tmpFile, err = os.Create(path)
}
if err != nil {
return err
}
defer tmpFile.Close()
for i := range jobs {
jobs[i].Index = i
if i == 0 {
jobs[i].From = 0
} else {
jobs[i].From = jobs[i-1].To + 1
}
if i < d.totalPart-1 {
jobs[i].To = jobs[i].From + eachSize
} else {
//the last filePart
jobs[i].To = fileTotalSize - 1
}
}
for i, j := range jobs {
tmpJob := j
emptyTmp := make([]byte, tmpJob.To-j.From)
if bytes.Compare(emptyTmp, fByte[tmpJob.From:j.To]) != 0 {
tmpJob.Data = fByte[j.From : j.To+1]
tmpJob.Done = true
d.doneFilePart[tmpJob.Index] = tmpJob
} else {
tmpJob.Done = false
}
jobs[i] = tmpJob
}
var wg sync.WaitGroup
for _, j := range jobs {
if !j.Done {
wg.Add(1)
go func(job filePart) {
defer wg.Done()
err := d.downloadPart(job, tmpFile)
if err != nil {
log.Println("下载文件失败:", err, job)
}
}(j)
}
}
wg.Wait()
return d.checkIntegrity(etag, tmpFile)
}
切片下载资源
传入对象为文件切片和临时文件指针,若下载完成,则将完成部分内容通过WriteAt写入临时文件相应的位置,并将该文件切片标注为Done。
//下载切片
func (d FileDownloader) downloadPart(c filePart, f *os.File) error {
r, err := d.getNewRequest("GET")
r = setNewHeader(r)
if err != nil {
return err
}
log.Printf("开始[%d]下载from:%d to:%d\n", c.Index, c.From, c.To)
r.Header.Set("Range", fmt.Sprintf("bytes=%v-%v", c.From, c.To))
resp, err := http.DefaultClient.Do(r)
if err != nil {
return err
}
if resp.StatusCode > 299 {
return errors.New(fmt.Sprintf("服务器错误状态码: %v", resp.StatusCode))
}
defer resp.Body.Close()
bs, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
if len(bs) != int(c.To-c.From+1) {
}
c.Data = bs
c.Done = true
d.doneFilePart[c.Index] = c
_, err = f.WriteAt(bs, int64(c.From))
if err != nil {
c.Done = true
}
log.Printf("结束[%d]下载", c.Index)
return err
}
下载文件完整性校验
.tmp
func (d FileDownloader) checkIntegrity(etag string, t *os.File) error {
log.Println("开始合并文件")
defer t.Close()
hash := md5.New()
totalSize := 0
for _, s := range d.doneFilePart {
hash.Write(s.Data)
totalSize += len(s.Data)
}
if int64(totalSize) != d.fileSize {
return errors.New("文件不完整")
}
if hex.EncodeToString(hash.Sum(nil)) != etag[1:len(etag)-1] {
return errors.New("文件损坏")
} else {
log.Println("文件md5校验成功")
}
os.Rename(filepath.Join(d.outputDir, d.outputFileName+".tmp"), filepath.Join(d.outputDir, d.outputFileName))
return nil
}
开始
因涉及到资源的重定向,所以特意加入了重定向后的信息获取
func getRedirectInfo(u, rawCookies, userAgent string) (*http.Response, error) {
log.Println("获取重定向信息")
var a *url.URL
a, _ = url.Parse(u)
header := http.Header{}
//header.Add("Cookie", rawCookies)
header.Add("User-Agent", userAgent)
request := http.Request{
Header: header,
Method: "GET",
URL: a,
}
client := &http.Client{
CheckRedirect: func(req *http.Request, via []*http.Request) error {
return http.ErrUseLastResponse
},
}
response, err := client.Do(&request)
if err != nil {
return nil, err
}
return response, nil
}
func setNewHeader(r *http.Request) *http.Request {
r.Header.Add("User-Agent", userAgent)
r.Header.Add("Upgrade-Insecure-Requests", "1")
return r
}
func DownloadFile(URL string) {
a, _ := getRedirectInfo(URL, "", userAgent)
location := a.Header.Get("Location")
startTime := time.Now()
downloader := NewFileDownloader(location, "", "", 10)
if err := downloader.Run(); err != nil {
log.Fatal(err)
}
fmt.Printf("\n 文件下载完成耗时: %f second\n", time.Now().Sub(startTime).Seconds())
}
不足之处
此项目完成了对断点传输和多线程下载的支持,但若切片下载未完成,则会直接丢失整个切片的进度而并非记录上该切片的已下载的信息。