业务需求:循环遍历FTP文件夹获取文件后,并发下载FTP服务器的文件到本地,删除远程文件,然后继续下一轮扫描,客户端有读写权限。
代码如下:
package FTP
import (
"errors"
"fmt"
"github.com/jlaffaye/ftp"
"runtime"
"sync"
"time"
)
const (
Data_Size = 1024
)
// FTP 文件对象
type FileSource struct {
entry *ftp.Entry // ftp库的entry对象
path string // 文件的全路径+文件名
}
// EntryHandler 遍历ftp目录时的文件handler
type EntryHandler func(e *ftp.Entry, currentPath string) error
// FTP文件信息
type FtpFile struct {
FileName string //FTP文件名
Path string //FTP文件的全路径+文件名
Type int //FTP文件类型,文件:0, 文件夹:1
Size int //FTP文件大小
}
type Deal struct {
ftp *ftp.ServerConn
fileChan chan interface{}
wg sync.WaitGroup
}
func NewDeal() *Deal {
return &Deal{}
}
func (this *Deal) Init(addr, user, passwd string) error {
var err error
this.ftp, err = ftp.Connect(addr)
if err != nil {
return err
}
err = this.ftp.Login(user, passwd)
if err != nil {
return err
}
this.fileChan = make(chan interface{}, Data_Size)
this.wg = sync.WaitGroup{}
fmt.Println("ftp连接成功")
return nil
}
func (this *Deal) Fini() error {
if this.ftp == nil {
return errors.New("FTP客户端指针为空,注销失败")
}
err := this.ftp.Logout()
if err != nil {
fmt.Println("FTP注销失败,error info:", err)
return err
}
return nil
}
func (this *Deal) Process(addr, user, passwd, rootDir string) error {
err := this.Init(addr, user, passwd)
if err != nil {
fmt.Println("初始化失败")
return err
}
for {
//第一种 在递归目录中获取文件列表,耗时最小
this.walk(rootDir)
//第二种 在递归目录中回调函数中获取文件列表,多级目录耗时大
//this.listfiles(rootDir)
//第三种 在递归目录中回调函数中获取文件列表,多级目录耗时大
//this.walkCall(rootDir, this.Handler)
go func() {
defer this.wg.Done()
for {
select {
case data := <-this.fileChan:
stru := data.(FtpFile)
fmt.Println("文件名:", stru.FileName)
break
case <-time.After(time.Millisecond * 100):
runtime.Gosched() //切换任务
break
}
}
}()
this.wg.Wait()
time.Sleep(time.Second)
}
return nil
}
// 函调函数
func (this *Deal) Handler(e *ftp.Entry, currentPath string) error {
stru := FtpFile{}
stru.FileName = e.Name
stru.Path = currentPath + "//" + e.Name //CKK/20191102/10/17/20191113170257659_1d1d19f4-dd2f-4662-af8c-30658bd1e90.zlib
stru.Type = int(e.Type)
stru.Size = int(e.Size)
select {
case this.fileChan <- stru:
//global.Log.Debug("fileChandata: %v", stru)
default:
fmt.Println("fileChan data chan is full")
time.Sleep(time.Second)
break
}
return nil
}
// 遍历ftp目录,获取文件
func (this *Deal) walk(rootDir string) error {
entries, err := this.ftp.List(rootDir)
if err != nil {
return err
}
for _, entry := range entries {
switch entry.Type {
case ftp.EntryTypeFile:
//正在上传的文件,先不进行下载
if entry.Size != 0 {
stru := FtpFile{}
stru.FileName = entry.Name
stru.Path = rootDir + "//" + entry.Name //CKK/20191102/10/17/20191113170257659_1d1d19f4-dd2f-4662-af8c-30658bd1e90.zlib
stru.Type = int(entry.Type)
stru.Size = int(entry.Size)
if len(this.fileChan) > (Data_Size - 1) {
fmt.Println("管道大小超限,完成本地扫描:", len(this.fileChan))
return nil
}
select {
case this.fileChan <- stru:
//global.Log.Debug("fileChan: %v", stru.FileName)
default:
fmt.Println("fileChan data chan is full")
time.Sleep(time.Second)
return nil
}
}
case ftp.EntryTypeFolder:
this.walk(fmt.Sprintf("%s/%s", rootDir, entry.Name))
default:
}
}
return nil
}
// 遍历ftp目录,回调获取文件
func (this *Deal) walkCall(rootDir string, handler EntryHandler) error {
entries, err := this.ftp.List(rootDir)
if err != nil {
return err
}
for _, entry := range entries {
switch entry.Type {
case ftp.EntryTypeFile:
//正在上传的文件,先不进行下载
if entry.Size != 0 {
handler(entry, rootDir)
}
case ftp.EntryTypeFolder:
this.walkCall(fmt.Sprintf("%s/%s", rootDir, entry.Name), handler)
default:
}
}
return nil
}
// 遍历ftp目录,获取文件
func (this *Deal) listfiles(rootDir string) error {
err := this.walkCall(rootDir, func(entry *ftp.Entry, currentPath string) error {
stru := FtpFile{}
stru.FileName = entry.Name
stru.Path = currentPath + "//" + entry.Name //CKK/20191102/10/17/20191113170257659_1d1d19f4-dd2f-4662-af8c-30658bd1e90.zlib
stru.Type = int(entry.Type)
stru.Size = int(entry.Size)
if len(this.fileChan) > Data_Size {
return nil
}
select {
case this.fileChan <- stru:
fmt.Println("fileChan:", stru.FileName)
default:
fmt.Println("fileChan data chan is full")
time.Sleep(time.Second)
break
}
return nil
})
if err != nil {
return err
}
return nil
}
测试用例:
package FTP
import "testing"
func TestDeal_Process(t *testing.T) {
ftp := NewDeal()
ftp.Process("172.20.32.211:21", "ftptest", "sailing@123", "Face")
err := ftp.Fini()
if err != nil {
return
}
}
运行结果: