1. 功能分析
接口中已有Get函数,用于下载HDFS文件,我们可以使用如下代码进行下载操作
fs, err := gowfs.NewFileSystem(gowfs.Configuration{Addr: "localhost:50070", User: "root"})
shell := gowfs.FsShell{FileSystem:fs}
ok, err := shell.Get("hdfs/file/path", "local/file/name")
用以上方式,下载小文件是没有问题的,但当下载大文件(比如1G以上)时,程序运行却出现了卡顿,查看接口中的源码发现:
// Retrieves a remote HDFS file and saves as the specified local file.
func (shell FsShell) Get(hdfsPath, localFile string) (bool, error) {
//创建本地目标文件
file, err := os.Create(localFile)
if err != nil {
return false, err
}
defer file.Close()
//向远程文件读取数据
reader, err := shell.FileSystem.Open(Path{Name: hdfsPath}, 0, 0, 0)
if err != nil {
return false, err
}
//将读到的数据转化为byte数组
data, err := ioutil.ReadAll(reader)
if err != nil {
return false, err
}
defer reader.Close()
//将byte数组写入本地文件
_, err = file.Write(data)
if err != nil {
return false, err
}
file.Sync()
return true, nil
}
此函数将所有数据一次性读取,并加入内存进行写文件操作,使得网络io及缓存大量占用导致程序卡死,显然不适合大文件。我们这里借用Open函数分批对HDFS远程进行按批次处理,使得网络和内存不被瞬间大量占用。
2. 编码实现
package main
import (
"fmt"
"io"
"io/ioutil"
"log"
"os"
"time"
"github.com/vladimirvivien/gowfs"
)
func main() {
//与HDFS建立连接
fs, err := gowfs.NewFileSystem(gowfs.Configuration{Addr: "localhost:50070", User: "root"})
if err != nil {
log.Fatal(err)
}
//HDFS文件路径
var path gowfs.Path
path.Name = "/upload/data/test.txt"
//本地文件路径,以只写模式打开
strDstFile := "./test.txt"
fl, err := os.OpenFile(strDstFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
fmt.Printf("open file fail: %v\n", err)
}
defer fl.Close()
//HDFS文件读取位置
var iOffSet int64 = 0
//HDFS文件读取长度
var readLen int64 = 1024 * 1024 * 16 //一次读取文件长度 16M
readBufSZ := 1024 * 4096 * 16 //文件读取缓冲区 64M
//记录请求读取次数
reqNum := 0
//记录读写消耗的时间
var readCost time.Duration
var writeCost time.Duration
for {
startTime := time.Now()
//读取hdfs文件
rc, err := fs.Open(path, iOffSet, readLen, readBufSZ)
reqNum++
if err != nil {
fmt.Printf("open file fail: %v\n", err)
} else {
fmt.Printf("open %s success!\n", path.Name)
}
//转化为byte数组
rcvdData, err := ioutil.ReadAll(rc)
if err != nil {
fmt.Printf("read file fail: %v", err)
} else {
fmt.Printf("read success! has read %d bytes\n", len(rcvdData))
}
readTime := time.Now()
readCost += readTime.Sub(startTime)
// fmt.Printf("read content: [%s]\n", string(rcvdData[:]))
//写入目标文件
n, err := fl.Write(rcvdData)
if err != nil {
fmt.Printf("write file fail: %v\n", err)
} else if n < len(rcvdData) {
err = io.ErrShortWrite
fmt.Printf("write file fail: %v\n", err)
}
writeTime := time.Now()
writeCost += writeTime.Sub(readTime)
//当读到的数据小于设置的长度时,hdfs文件读取结束
if int64(len(rcvdData)) < readLen {
break
}
//读取下一个缓存
iOffSet += int64(len(rcvdData))
}
fmt.Printf("read file end! read length:[%d] readbuf:[%d] request num: [%d]\n", readLen, readBufSZ, reqNum)
fmt.Printf("read spend time: [%f]s, write spend time [%f]s\n", readCost.Seconds(), writeCost.Seconds())
}