作为公司平台团队的一员,我接触了很多文件处理的场景,比如管理一个通用文件上传中心服务,处理邮件附件,处理和导出大文件。在过去,这项工作要容易得多,因为我们可以完全支配整个服务器。我们可以写入一个文件让它持久化在服务器磁盘上,尽管这个作业所需的资源是非常多的。而现在,你的代码库是在更小的处理单元上发布的,比如 pods 。它的资源是虚拟分配的,并且在许多情况下是有限的,所以你需要知道如何有效地使用它们。实现优雅的处理和解决 OOM 退出问题也许对于那些已经熟悉自由地使用内存的人来说是一个大麻烦。

ReaderWriter

Multipart 文件转发

1ab859ac661e4ddc11b03dff885b68bd.png


reader
r := strings.NewReader("Go is a general-purpose language designed with systems 
programming in mind.")
b, err := ioutil.ReadAll(r)
if err != nil {
   log.Fatal(err)
}
// Playing with your loaded bytes
fmt.Printf("%s", b)
Reader
io.Copy
r := strings.NewReader("some io.Reader stream to be read\n")
if _, err := io.Copy(os.Stdout, r); err != nil {
  log.Fatal(err)
}
// The data have been copied from Reader r to Stdout
Copy

从 src 复制副本到 dst,直到在 src 上到达 EOF 或发生错误。它返回复制的字节数和复制时遇到的第一个错误(如果有的话)。— Go 官方文档

writerreaderCopyWriterReaderwriter
buf := new(bytes.Buffer)
writer := multipart.NewWriter(buf)
defer writer.Close()
part, err := writer.CreateFormFile("file", "textFile.txt")
if err != nil {
   return err
}
file, err := os.Open(name)
if err != nil {
   return err
}
defer file.Close()
if _, err = io.Copy(part, file); err != nil {
   return err
}
http.Post(url, writer.FormDataContentType(), buf)
io.Pipe
r, w := io.Pipe()
go func() {
   fmt.Fprint(w, "some io.Reader stream to be read\n")
   w.Close()
}()
if _, err := io.Copy(os.Stdout, r); err != nil {
   log.Fatal(err)
}
Pipewriterreaderwriterwriter
PipePipeCopy
r, w := io.Pipe()
m := multipart.NewWriter(w)
go func() {
   defer w.Close()
   defer m.Close()
   part, err := m.CreateFormFile("file", "textFile.txt")
   if err != nil {
      return
   }
   file, err := os.Open(name)
   if err != nil {
      return
   }
   defer file.Close()
   if _, err = io.Copy(part, file); err != nil {
      return
   }
}()
http.Post(url, m.FormDataContentType(), r)
os.Open()multipart reader

预取和补偿文件流

b1a55bc2fb4d52b456a68402f46abe12.png

reader
io.TeeReaderwriterTeeReaderreader
var r io.Reader = strings.NewReader("some io.Reader stream to be read\n")
var buf = bytes.NewBufferString("")
r = io.TeeReader(r, buf)
// Everything read from r will be copied to buf.
_, _ = io.ReadAtLeast(r, mimeType, 512)
// Continue to copy the stream to write it to buf, to use buf in the following operation
io.Copy(io.Discard, r)
PipeTeeReaderPipe
reader
package services

import (
   "fmt"
   "io"
)

type prefetchReader struct {
   reader   io.Reader
   prefetch []byte
   size     int
}

func newPrefetchReader(reader io.Reader, prefetch []byte) *prefetchReader {
   return &prefetchReader{
      reader:   reader,
      prefetch: prefetch,
   }
}

func (r *prefetchReader) Read(p []byte) (n int, err error) {
   if len(p) == 0 {
      return 0, fmt.Errorf("empty buffer")
   }
   defer func() {
      r.size += n
   }()
   if len(r.prefetch) > 0 {
      if len(p) >= len(r.prefetch) {
         copy(p, r.prefetch)
         n := len(r.prefetch)
         r.prefetch = nil
         return n, nil
      } else {
         copy(p, r.prefetch[:len(p)])
         r.prefetch = r.prefetch[len(p):]
         return len(p), nil
      }
   }
   return r.reader.Read(p)
}
readerreader

结论

bf16514e95127ccc68a4e0658dda8314.png

readerswriters

原文信息

7c31bb50b5f408e829af33c05274ef72.png

本文永久链接:https://github.com/gocn/translator/blob/master/2022/w04_Large_stream_processing_in_Golang_with_minimal_memory_usage.md

译者:haoheipi

校对:watermelo

想要了解关于 Go 的更多资讯,还可以通过扫描的方式,进群一起探讨哦~

09811c29c01bff45a60681742a1f69c8.png