MinIO的命令行启动只有2个命令,一个是server、一个是gateway,分别用于启动服务和网关,而整个MinIO的启动是从main.go文件开始的

main.go中以空白导入(_)的方式引入了两个包,目的是执行它们各自的init函数:

_ "github.com/minio/minio/internal/init"
_ "github.com/minio/minio/cmd/gateway"

看下对应的init函数:

internal/init/init.go

internal/init/init_darwin_amd64.go

 cpuid.CPU.Disable(cpuid.AVX512F

而在gateway包里面,同样以空白导入的方式引入了几个常用的网关实现:

cmd/gateway/gateway.go

      _ "github.com/minio/minio/cmd/gateway/nas"
_ "github.com/minio/minio/cmd/gateway/azure"
_ "github.com/minio/minio/cmd/gateway/s3"
_ "github.com/minio/minio/cmd/gateway/hdfs"
_ "github.com/minio/minio/cmd/gateway/gcs"

以s3为例

cmd/gateway/s3/gateway-s3.go

func init() {
const s3GatewayTemplate = `NAME:
{{.HelpName}} - {{.Usage}}
   
   
   minio.RegisterGatewayCommand(cli.Command{
Name: minio.S3BackendGateway,
Usage: "Amazon Simple Storage Service (S3)",
Action: s3GatewayMain,
CustomHelpTemplate: s3GatewayTemplate,
HideHelpCommand: true,
})

调用了RegisterGatewayCommand将对应的命令注册为gatewayCmd的子命令。而对应的处理函数为:

func s3GatewayMain(ctx *cli.Context)
   minio.StartGateway(ctx, &S3{
host: args.First(),
debug: env.Get("_MINIO_SERVER_DEBUG", config.EnableOff) == config.EnableOn,
  })

调用了StartGateway方法,它的实现在cmd/gateway-main.go

func StartGateway(ctx *cli.Context, gw Gateway) 
router := mux.NewRouter().SkipClean(true).UseEncodedPath()
registerSTSRouter(router)
registerAdminRouter(router, false)
registerAPIRouter(router)
      httpServer := xhttp.NewServer(addrs, setCriticalErrorHandler(corsHandler(router)), getCert)
newAllSubsystems()
buckets, err := newObject.ListBuckets(GlobalContext)
globalConsoleSrv, err = initConsoleServer()
(globalConsoleSrv.Serve()

注册了STS,Admin和API等router,然后起了一个httpserver,最后调用Serve方法开启端口监听。

STS路由注册函数的实现在cmd/sts-handlers.go

func registerSTSRouter(router *mux.Router)
    stsRouter.Methods(http.MethodPost).MatcherFunc(func(r *http.Request, rm *mux.RouteMatch) bool {
ctypeOk := wildcard.MatchSimple("application/x-www-form-urlencoded*", r.Header.Get(xhttp.ContentType))
noQueries := len(r.URL.RawQuery) == 0
return ctypeOk && noQueries
}).HandlerFunc(httpTraceAll(sts.AssumeRoleWithSSO))
    
    stsRouter.Methods(http.MethodPost).HandlerFunc(httpTraceAll(sts.AssumeRoleWithClientGrants)).
Queries(stsAction, clientGrants).
Queries(stsVersion, stsAPIVersion).
Queries(stsToken, "{Token:.*}")

这里注册了STS常见的一些接口,路由匹配时用到的查询参数名和API版本号定义为如下常量:

    const (
// STS API version.
stsAPIVersion = "2011-06-15"
stsVersion = "Version"
stsAction = "Action"
stsPolicy = "Policy"

admin路由定义了一系列后台管理操作的接口,实现在cmd/admin-router.go

func registerAdminRouter(router *mux.Router, enableConfigOps bool)
adminRouter.Methods(http.MethodPost).Path(adminVersion+"/service").HandlerFunc(gz(httpTraceAll(adminAPI.ServiceHandler))).Queries("action", "{action:.*}")

API路由定义了我们真正操作对象存储的接口,实现在cmd/api-router.go

func registerAPIRouter(router *mux.Router) 
gz, err := gzhttp.NewWrapper(gzhttp.MinSize(1000), gzhttp.CompressionLevel(gzip.BestSpeed))
routers = append(routers, apiRouter.PathPrefix("/{bucket}").Subrouter())
      
      router.Methods(http.MethodHead).Path("/{object:.+}").HandlerFunc(
collectAPIStats("headobject", maxClients(gz(httpTraceAll(api.HeadObjectHandler)))))

以"/{object:.+}" 为例子,对应的handlerFunc其实被middleware包裹了很多层:cmd/handler-api.go

    func maxClients(f http.HandlerFunc) http.HandlerFunc 
f.ServeHTTP(w, r)

github.com/klauspost/compress@v1.13.6/gzhttp/gzip.go

func NewWrapper(opts ...option) (func(http.Handler) http.HandlerFunc, error) 
h.ServeHTTP(gwcn, r)

cmd/handler-utils.go

func httpTraceAll(f http.HandlerFunc) http.HandlerFunc 
f.ServeHTTP(w, r)

最后调用了HeadObjectHandler这个处理函数,实现在cmd/object-handlers.go

func (api objectAPIHandlers) HeadObjectHandler(w http.ResponseWriter, r *http.Request) 
objectAPI := api.ObjectAPI()
api.headObjectInArchiveFileHandler(ctx, objectAPI, bucket, object, w, r)
api.headObjectHandler(ctx, objectAPI, bucket, object, w, r)

对应于s3,它的实现在:cmd/s3-zip-handlers.go

func (api objectAPIHandlers) headObjectInArchiveFileHandler(ctx context.Context, objectAPI ObjectLayer, bucket, object string, w http.ResponseWriter, r *http.Request) 
_, err = getObjectInfo(ctx, bucket, zipPath, opts)
file, err := zipindex.FindSerialized(zipInfo, object)

cmd/object-api-interface.go

type ObjectLayer interface {
// Locking operations on object.
NewNSLock(bucket string, objects ...string) RWLocker
GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error)

可以看到,对于s3网关,GetObjectInfo最终调用的是minio的Go SDK(minio-go),去请求远程的s3服务,实现在cmd/gateway/s3/gateway-s3.go

func (l *s3Objects) GetObjectInfo(ctx context.Context, bucket string, object string, opts minio.ObjectOptions) (objInfo minio.ObjectInfo, err error) 
oi, err := l.Client.StatObject(ctx, bucket, object, miniogo.StatObjectOptions{
ServerSideEncryption: opts.ServerSideEncryption,
})

github.com/minio/minio-go/v7@v7.0.15/core.go

func (c Core) StatObject(ctx context.Context, bucketName, objectName string, opts StatObjectOptions) (ObjectInfo, error) {
return c.statObject(ctx, bucketName, objectName, opts)
}

接着,我们回到main.go,main函数的实现很简单:

func main() {
minio.Main(os.Args)
}

cmd/main.go

func Main(args []string)
if err := newApp(appName).Run(args); err != nil {

创建了一个app对象

func newApp(name string) *cli.App 
commandsTree := trie.NewTrie()
    registerCommand := func(command cli.Command)
commands = append(commands, command)
    commandsTree.Insert(command.Name)
    
    findClosestCommands := func(command string) []string {
    for _, value := range commandsTree.Walk(commandsTree.Root()) {
    registerCommand(serverCmd)
    registerCommand(gatewayCmd)
    app := cli.NewApp()

其中已注册的命令名以前缀树(trie)的形式存储,供findClosestCommands查找相近的命令,实现在trie/trie.go

func NewTrie() *Trie {
return &Trie{
root: newNode(),
size: 0,
}
}

app对象定义在cli包里,代码位置:cli/app.go

type App struct {
// The name of the program. Defaults to path.Base(os.Args[0])
Name string
func NewApp() *App {

可以看到,在newApp方法里注册了serverCmd和gatewayCmd。首先看下serverCmd,代码位置:cmd/server-main.go

var serverCmd = cli.Command{
Name: "server",
Usage: "start object storage server",
Flags: append(ServerFlags, GlobalFlags...),
Action: serverMain,
CustomHelpTemplate:

对应的处理方法是:

func serverMain(ctx *cli.Context)
      bitrotSelfTest()
      erasureSelfTest()
      compressSelfTest()
globalConsoleSys.SetNodeName(globalLocalNodeName)
newAllSubsystems()
globalNotificationSys = NewNotificationSys(globalEndpoints)
globalBucketMetadataSys = NewBucketMetadataSys()
globalBucketMetadataSys.Reset()
globalBucketMonitor = bandwidth.NewMonitor(GlobalContext, totalNodeCount())
globalConfigSys = NewConfigSys()
globalIAMSys = NewIAMSys()
globalPolicySys = NewPolicySys()
globalLifecycleSys = NewLifecycleSys()
globalBucketSSEConfigSys = NewBucketSSEConfigSys()
globalBucketObjectLockSys = NewBucketObjectLockSys()
globalBucketQuotaSys = NewBucketQuotaSys()
globalBucketVersioningSys = NewBucketVersioningSys()
globalBucketVersioningSys.Reset()
globalBucketTargetSys = NewBucketTargetSys()
globalTierConfigMgr = NewTierConfigMgr()
checkUpdate(getMinioMode())
setMaxResources()
sys.GetMaxThreads()
sys.GetMaxOpenFileLimit()
sys.GetMaxMemoryLimit()
handler, err := configureServerHandler(globalEndpoints)
httpServer := xhttp.NewServer(addrs, setCriticalErrorHandler(corsHandler(handler)), getCert)
setHTTPServer(httpServer)
newObject, err := newObjectLayer(GlobalContext, globalEndpoints)
initBackgroundExpiry(GlobalContext, newObject)
err = initServer(GlobalContext, newObject); err != nil
go globalIAMSys.Init(GlobalContext, newObject, globalEtcdClient, globalRefreshIAMInterval)
initDataScanner(GlobalContext, newObject)
initBackgroundReplication(GlobalContext, newObject)
initBackgroundTransition(GlobalContext, newObject)
globalTierJournal, err = initTierDeletionJournal(GlobalContext)
cacheAPI, err = newServerCacheObjects(GlobalContext, globalCacheConfig)
setCacheObjectLayer(cacheAPI)
globalConsoleSrv, err = initConsoleServer()
globalConsoleSrv.Serve()

核心思想是注册路由,最后启动http server,而对应的gatewayCmd的实现在

cmd/gateway-main.go

gatewayCmd = cli.Command{
Name: "gateway",
Usage: "start object storage gateway",
Flags: append(ServerFlags, GlobalFlags...),
HideHelpCommand: true,
}

可以看到,它并没有对应的处理方法(Action),很奇怪对吧?那是因为不同的对象存储服务是以子命令的形式注册进来的,注册的位置在各自的init函数中,也就是前面介绍的注册逻辑。注册函数如下:

func RegisterGatewayCommand(cmd cli.Command) error {
cmd.Flags = append(append(cmd.Flags, ServerFlags...), GlobalFlags...)
gatewayCmd.Subcommands = append(gatewayCmd.Subcommands, cmd)
return nil
}

注册完路由会启动一个httpserver,minio对httpserver 进行了简单的包装,代码位置是 internal/http/server.go

func NewServer(addrs []string, handler http.Handler, getCert certs.GetCertificateFunc) *Server

路由用的是现成的路由包:github.com/gorilla/mux@v1.8.0/mux.go

func NewRouter() *Router {
return &Router{namedRoutes: make(map[string]*Route)}
}

serverCmd的路由在cmd/routers.go,同样注册了STS,admin和api路由:

func configureServerHandler(endpointServerPools EndpointServerPools) (http.Handler, error) 
registerAdminRouter(router, true)
registerSTSRouter(router)
registerAPIRouter(router)
router.Use(globalHandlers...)

以api router为例看下具体实现:

func registerAPIRouter(router *mux.Router)
routers = append(routers, apiRouter.MatcherFunc(func(r *http.Request, match *mux.RouteMatch) bool
routers = append(routers, apiRouter.Host("{bucket:.+}."+domainName).Subrouter())
routers = append(routers, apiRouter.PathPrefix("/{bucket}").Subrouter())
    
    router.Methods(http.MethodHead).Path("/{object:.+}").HandlerFunc(
collectAPIStats("headobject", maxClients(gz(httpTraceAll(api.HeadObjectHandler)))))

cmd/object-handlers.go

func (api objectAPIHandlers) HeadObjectHandler(w http.ResponseWriter, r *http.Request)
api.headObjectInArchiveFileHandler(ctx, objectAPI, bucket, object, w, r)
api.headObjectHandler(ctx, objectAPI, bucket, object, w, r)

cmd/s3-zip-handlers.go

func (api objectAPIHandlers) headObjectInArchiveFileHandler(ctx context.Context, objectAPI ObjectLayer, bucket, object string, w http.ResponseWriter, r *http.Request) 
getObjectInfo := objectAPI.GetObjectInfo
_, err = getObjectInfo(ctx, bucket, zipPath, opts)

cmd/object-api-interface.go

type ObjectLayer interface {
// Locking operations on object.
NewNSLock(bucket string, objects ...string) RWLocker

整体来看,serverCmd和gatewayCmd在路由的实现上是非常相似的。