在分析完minio请求的路由后golang 源码分析:minio(part I)路由,我们看下一个文件是如何落盘的,不考虑gateway情况,我们从serverMain开始:

cmd/server-main.go

func serverMain(ctx *cli.Context) {
      handler, err := configureServerHandler(globalEndpoints)
        registerAPIRouter(router)
      newObject, err := newObjectLayer(GlobalContext, globalEndpoints)
      initServer(GlobalContext, newObject)

在注册完路由后,会创建一个object,然后initServer,在initServer里将object赋值给自己的属性:

func initServer(ctx context.Context, newObject ObjectLayer) error 
      setObjectLayer(newObject)

在创建object的时候,有两个分支,我们重点看只有一个endpoint的情况:

cmd/erasure-server-pool.go

func newErasureServerPools(ctx context.Context, endpointServerPools EndpointServerPools) (ObjectLayer, error) 
      commonParityDrives = ecDrivesNoConfig(ep.DrivesPerSet)
      err = storageclass.ValidateParity(commonParityDrives, ep.DrivesPerSet)
      z.serverPools[i], err = newErasureSets(ctx, ep.Endpoints, storageDisks[i], formats[i], commonParityDrives, i)

cmd/server-main.go

func newObjectLayer(ctx context.Context, endpointServerPools EndpointServerPools) (newObject ObjectLayer, err error)
      return NewFSObjectLayer(endpointServerPools[0].Endpoints[0].Path)
      return newErasureServerPools(ctx, endpointServerPools)

object 存储在全局变量globalObjectAPI里,对应的get和set方法定义在cmd/api-router.go,由于是全局变量,所以需要读锁

func setObjectLayer(o ObjectLayer) {
      globalObjectAPI = o
func newObjectLayerFn() ObjectLayer {
  globalObjLayerMutex.RLock()
  defer globalObjLayerMutex.RUnlock()
  return globalObjectAPI
}

在registerAPIRouter方法内部会初始化api对象,它的一个接口用newObjectLayerFn 来赋值的,这样就实现了object对象和router的关联,相关的操作最终都是调用的object对象的方法来完成的

func registerAPIRouter(router *mux.Router) 
        api := objectAPIHandlers{
    ObjectAPI: newObjectLayerFn,
    CacheAPI:  newCachedObjectLayerFn,
  }

下面以三个路由为例看下文件的操作相关流程

// GetObject - note gzip compression is *not* added due to Range requests.
    router.Methods(http.MethodGet).Path("/{object:.+}").HandlerFunc(
      collectAPIStats("getobject", maxClients(httpTraceHdrs(api.GetObjectHandler))))
router.Methods(http.MethodPut).Path("/{object:.+}").HandlerFunc(
      collectAPIStats("putobjectpart", maxClients(gz(httpTraceHdrs(api.PutObjectPartHandler))))).Queries("partNumber", "{partNumber:[0-9]+}", "uploadId", "{uploadId:.*}")
router.Methods(http.MethodPut).Path("/{object:.+}").HandlerFunc(
      collectAPIStats("putobject", maxClients(gz(httpTraceHdrs(api.PutObjectHandler)))))

globalObjectAPI 定义在cmd/object-api-common.go

var globalObjectAPI ObjectLayer
    bucketMetaPrefix = "buckets"

和router对应的handler定义在:cmd/object-handlers.go

func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Request) 
      objectAPI := api.ObjectAPI()
      api.getObjectHandler(ctx, objectAPI, bucket, object, w, r)

先获取handler getObjectInfo 然后调用handler

func (api objectAPIHandlers) getObjectHandler(ctx context.Context, objectAPI ObjectLayer, bucket, object string, w http.ResponseWriter, r *http.Request)
      getObjectInfo := objectAPI.GetObjectInfo
      _, err = getObjectInfo(ctx, bucket, object, opts)
      getObjectNInfo := objectAPI.GetObjectNInfo
      gr, err := getObjectNInfo(ctx, bucket, object, rs, r.Header, readLock, opts)
    
    sendEvent(eventArgs{
    EventName:    event.ObjectAccessedGet,
    BucketName:   bucket,
    Object:       objInfo,
    ReqParams:    extractReqParams(r),
    RespElements: extractRespElements(w),
    UserAgent:    r.UserAgent(),
    Host:         handlers.GetSourceIP(r),
  })

部分上传也一样的

func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http.Request)
      objectAPI := api.ObjectAPI()
      mi, err := objectAPI.GetMultipartInfo(ctx, bucket, object, uploadID, opts)
      putObjectPart := objectAPI.PutObjectPart
      partInfo, err := putObjectPart(ctx, bucket, object, uploadID, partID, pReader, opts)

全量上传:

func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Request) 
      objectAPI := api.ObjectAPI()
      putObject = objectAPI.PutObject
      objInfo, err := putObject(ctx, bucket, object, pReader, opts)
      getObjectInfo := objectAPI.GetObjectInfo
      _, err := updateObjectMetadataWithZipInfo(ctx, objectAPI, bucket, object, opts);

单机版本的minio 实现的存储对象定义在:cmd/fs-v1.go

func NewFSObjectLayer(fsPath string) (ObjectLayer, error) 
      fsPath, err = getValidPath(fsPath);
      fsUUID := mustGetUUID()
       err = initMetaVolumeFS(fsPath, fsUUID);
      rlk, err := initFormatFS(ctx, fsPath)
   
   fs := &FSObjects{
    fsPath:       fsPath,
    metaJSONFile: fsMetaJSONFile,
    fsUUID:       fsUUID,
    rwPool: &fsIOPool{
      readersMap: make(map[string]*lock.RLockedFile),
    },
    nsMutex:       newNSLock(false),
    listPool:      NewTreeWalkPool(globalLookupTimeout),
    appendFileMap: make(map[string]*fsAppendFile),
    diskMount:     mountinfo.IsLikelyMountPoint(fsPath),
  }
      fs.fsFormatRlk = rlk

会创建一个自己的文件系统

func initMetaVolumeFS(fsPath, fsUUID string) error 
      metaBucketPath := pathJoin(fsPath, minioMetaBucket)
       err := os.MkdirAll(metaBucketPath, 0777);
      metaTmpPath := pathJoin(fsPath, minioMetaTmpBucket, fsUUID)
      err := os.MkdirAll(metaTmpPath, 0777);
      metaMultipartPath := pathJoin(fsPath, minioMetaMultipartBucket)
      os.MkdirAll(metaMultipartPath, 0777)
func (fs *FSObjects) GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (oi ObjectInfo, e error) 
        oi, err := fs.getObjectInfoWithLock(ctx, bucket, object)
          fsMetaPath := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix, bucket, object, fs.metaJSONFile)
         err = fs.createFsJSON(object, fsMetaPath)

读取的时候需要加锁

func (fs *FSObjects) getObjectInfoWithLock(ctx context.Context, bucket, object string) (oi ObjectInfo, err error)
      lk := fs.NewNSLock(bucket, object)
      _, err := fs.statBucketDir(ctx, bucket);
      return fs.getObjectInfo(ctx, bucket, object)

最终调用系统调用fsStat获取文件的信息:

func (fs *FSObjects) statBucketDir(ctx context.Context, bucket string) (os.FileInfo, error)
      bucketDir, err := fs.getBucketDir(ctx, bucket)
      st, err := fsStatVolume(ctx, bucketDir)
        fi, err := fsStat(ctx, volume)
func (fs *FSObjects) getObjectInfo(ctx context.Context, bucket, object string) (oi ObjectInfo, e error)
      fi, err := fsStatDir(ctx, pathJoin(fs.fsPath, bucket, object))
      fsMetaPath := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix, bucket, object, fs.metaJSONFile)
      rlk, err := fs.rwPool.Open(fsMetaPath)
      _, rerr := fsMeta.ReadFrom(ctx, rlk.LockedFile)
      fi, err := fsStatFile(ctx, pathJoin(fs.fsPath, bucket, object))
      return fsMeta.ToObjectInfo(bucket, object, fi), nil

创建文件的过程中,先在tmp目录下创建文件,等待文件创建完毕后,rename到目标目录,能够尽可能地减少锁冲突:

func (fs *FSObjects) PutObject(ctx context.Context, bucket string, object string, r *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) 
      return fs.putObject(ctx, bucket, object, r, opts)

创建文件的过程中,会先创建meta文件,保存文件的元数据信息:

func (fs *FSObjects) putObject(ctx context.Context, bucket string, object string, r *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, retErr error) 
       fs.statBucketDir(ctx, bucket);
       isObjectDir(object, data.Size()) 
      if fi, err = fsStatDir(ctx, pathJoin(fs.fsPath, bucket, object)); err != nil
        var wlk *lock.LockedFile
  if bucket != minioMetaBucket {
    bucketMetaDir := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix)
    fsMetaPath := pathJoin(bucketMetaDir, bucket, object, fs.metaJSONFile)
    wlk, err = fs.rwPool.Write(fsMetaPath)
      wlk, err = fs.rwPool.Create(fsMetaPath)
      fsTmpObjPath := pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID, tempObj)
      bytesWritten, err := fsCreateFile(ctx, fsTmpObjPath, data, data.Size())
      fsNSObjPath := pathJoin(fs.fsPath, bucket, object)
      if err = fsRenameFile(ctx, fsTmpObjPath, fsNSObjPath);
       _, err = fsMeta.WriteTo(wlk);
        if err = jsonSave(lk, m); 
      fi, err := fsStatFile(ctx, pathJoin(fs.fsPath, bucket, object))

cmd/fs-v1-helpers.go

func fsCreateFile(ctx context.Context, filePath string, reader io.Reader, fallocSize int64) (int64, error) 
      if err := checkPathLength(filePath);
      writer, err := lock.Open(filePath, flags, 0666)
      bytesWritten, err := xioutil.Copy(writer, reader)

cmd/object-api-utils.go问及里定义minio文件系统中目录的名字

const (
  // MinIO meta bucket.
    minioMetaBucket = ".minio.sys"
    minioMetaTmpBucket = minioMetaBucket + "/tmp"
    mpartMetaPrefix = "multipart"
    minioMetaMultipartBucket = minioMetaBucket + SlashSeparator + mpartMetaPrefix

cmd/data-usage.go

dataUsageBucket = minioMetaBucket + SlashSeparator + bucketMetaPrefix

cmd/format-fs.go

func initFormatFS(ctx context.Context, fsPath string) (rlk *lock.RLockedFile, err error) 
      fsFormatPath := pathJoin(fsPath, minioMetaBucket, formatConfigFile)
      err := formatFSFixDeploymentID(ctx, fsFormatPath); err != nil
        formatBackend, err := formatMetaGetFormatBackendFS(rlk)
        err = jsonLoad(rlk, format)
        return jsonSave(wlk, format)
      rlk, err := lock.RLockedOpenFile(fsFormatPath)
      formatBackend, err := formatMetaGetFormatBackendFS(rlk)
      wlk, err = lock.TryLockedOpenFile(fsFormatPath, os.O_RDWR, 0)

cmd/format-meta.go

formatConfigFile = "format.json"

每一个操作都会发相应的通知,通知是存在一个map里cmd/notification.go

func sendEvent(args eventArgs)
    globalNotificationSys.Send(args)
    func (sys *NotificationSys) Send(args eventArgs) 
      targetIDSet := sys.bucketRulesMap[args.BucketName].Match(args.EventName, args.Object.Name)
      sys.targetList.Send(args.ToEvent(true), targetIDSet, sys.targetResCh)

internal/event/targetlist.go

type TargetList struct {
  sync.RWMutex
  targets map[TargetID]Target
}

元文件的定义如下:cmd/fs-v1-metadata.go

type fsMetaV1 struct {
  Version string `json:"version"`
  // checksums of blocks on disk.
  Checksum FSChecksumInfoV1 `json:"checksum,omitempty"`
  // Metadata map for current object.
  Meta map[string]string `json:"meta,omitempty"`
  // parts info for current object - used in encryption.
  Parts []ObjectPartInfo `json:"parts,omitempty"`
}

cmd/xl-storage-format-v1.go

type ObjectPartInfo struct {
  ETag       string `json:"etag,omitempty"`
  Number     int    `json:"number"`
  Size       int64  `json:"size"`
  ActualSize int64  `json:"actualSize"`
}

cmd/fs-v1-helpers.go

func fsStatDir(ctx context.Context, statDir string) (os.FileInfo, error)
      fi, err := fsStat(ctx, statDir)

分片上传相对复杂,代码路径在:cmd/fs-v1-multipart.go

func (fs *FSObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, r *PutObjReader, opts ObjectOptions) (pi PartInfo, e error)
      if _, err := fs.statBucketDir(ctx, bucket); err != nil {
      uploadIDDir := fs.getUploadIDDir(bucket, object, uploadID)
      _, err := fsStatFile(ctx, pathJoin(uploadIDDir, fs.metaJSONFile))
      tmpPartPath := pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID, uploadID+"."+mustGetUUID()+"."+strconv.Itoa(partID))
      bytesWritten, err := fsCreateFile(ctx, tmpPartPath, data, data.Size())
      defer fsRemoveFile(ctx, tmpPartPath)
      partPath := pathJoin(uploadIDDir, fs.encodePartFile(partID, etag, data.ActualSize()))
      err = fsSimpleRenameFile(ctx, tmpPartPath, partPath);
      go fs.backgroundAppend(ctx, bucket, object, uploadID)
      fi, err := fsStatFile(ctx, partPath)