MinIO的命令行启动只有2个命令,一个是server、一个是gateway,分别用于启动服务和网关,而整个MinIO的启动是从main.go文件开始的
引入了两个包
_ "github.com/minio/minio/internal/init"
_ "github.com/minio/minio/cmd/gateway"
看下对应的init函数:
internal/init/init.go
internal/init/init_darwin_amd64.go
cpuid.CPU.Disable(cpuid.AVX512F)
而在gateway包里面,则引入了几个常用的网关实现
cmd/gateway/gateway.go
_ "github.com/minio/minio/cmd/gateway/nas"
_ "github.com/minio/minio/cmd/gateway/azure"
_ "github.com/minio/minio/cmd/gateway/s3"
_ "github.com/minio/minio/cmd/gateway/hdfs"
_ "github.com/minio/minio/cmd/gateway/gcs"
以s3为例
cmd/gateway/s3/gateway-s3.go
func init() {
const s3GatewayTemplate = `NAME:
{{.HelpName}} - {{.Usage}}
minio.RegisterGatewayCommand(cli.Command{
Name: minio.S3BackendGateway,
Usage: "Amazon Simple Storage Service (S3)",
Action: s3GatewayMain,
CustomHelpTemplate: s3GatewayTemplate,
HideHelpCommand: true,
})
调用了RegisterGatewayCommand将对应的命令注册为gatewayCmd的子命令。而对应的处理函数为:
func s3GatewayMain(ctx *cli.Context)
minio.StartGateway(ctx, &S3{
host: args.First(),
debug: env.Get("_MINIO_SERVER_DEBUG", config.EnableOff) == config.EnableOn,
})
调用了StartGateway方法,它的实现在cmd/gateway-main.go
func StartGateway(ctx *cli.Context, gw Gateway)
router := mux.NewRouter().SkipClean(true).UseEncodedPath()
registerSTSRouter(router)
registerAdminRouter(router, false)
registerAPIRouter(router)
httpServer := xhttp.NewServer(addrs, setCriticalErrorHandler(corsHandler(router)), getCert)
newAllSubsystems()
buckets, err := newObject.ListBuckets(GlobalContext)
globalConsoleSrv, err = initConsoleServer()
globalConsoleSrv.Serve()
注册了STS,Admin和API等router,然后起了一个httpserver,最后调用Serve方法开启端口监听。
sts路由注册函数实现在cmd/sts-handlers.go
func registerSTSRouter(router *mux.Router)
stsRouter.Methods(http.MethodPost).MatcherFunc(func(r *http.Request, rm *mux.RouteMatch) bool {
ctypeOk := wildcard.MatchSimple("application/x-www-form-urlencoded*", r.Header.Get(xhttp.ContentType))
noQueries := len(r.URL.RawQuery) == 0
return ctypeOk && noQueries
}).HandlerFunc(httpTraceAll(sts.AssumeRoleWithSSO))
stsRouter.Methods(http.MethodPost).HandlerFunc(httpTraceAll(sts.AssumeRoleWithClientGrants)).
Queries(stsAction, clientGrants).
Queries(stsVersion, stsAPIVersion).
Queries(stsToken, "{Token:.*}")
这里定义了sts常见的一些接口
const (
// STS API version.
stsAPIVersion = "2011-06-15"
stsVersion = "Version"
stsAction = "Action"
stsPolicy = "Policy"
admin路由定义了一系列后台操作的接口 cmd/admin-router.go
func registerAdminRouter(router *mux.Router, enableConfigOps bool)
adminRouter.Methods(http.MethodPost).Path(adminVersion+"/service").HandlerFunc(gz(httpTraceAll(adminAPI.ServiceHandler))).Queries("action", "{action:.*}")
apirouter 定义了我们真正操作对象存储的接口cmd/api-router.go
func registerAPIRouter(router *mux.Router)
gz, err := gzhttp.NewWrapper(gzhttp.MinSize(1000), gzhttp.CompressionLevel(gzip.BestSpeed))
routers = append(routers, apiRouter.PathPrefix("/{bucket}").Subrouter())
router.Methods(http.MethodHead).Path("/{object:.+}").HandlerFunc(
collectAPIStats("headobject", maxClients(gz(httpTraceAll(api.HeadObjectHandler)))))
以"/{object:.+}" 为例子,对应的handlerFunc其实被middleware包裹了很多层:cmd/handler-api.go
func maxClients(f http.HandlerFunc) http.HandlerFunc
f.ServeHTTP(w, r)
github.com/klauspost/compress@v1.13.6/gzhttp/gzip.go
func NewWrapper(opts ...option) (func(http.Handler) http.HandlerFunc, error)
h.ServeHTTP(gwcn, r)
cmd/handler-utils.go
func httpTraceAll(f http.HandlerFunc) http.HandlerFunc
f.ServeHTTP(w, r)
最后调用了HeadObjectHandler这个处理函数,代码位置:cmd/object-handlers.go
func (api objectAPIHandlers) HeadObjectHandler(w http.ResponseWriter, r *http.Request)
objectAPI := api.ObjectAPI()
api.headObjectInArchiveFileHandler(ctx, objectAPI, bucket, object, w, r)
api.headObjectHandler(ctx, objectAPI, bucket, object, w, r)
对应于s3,它的实现在:cmd/s3-zip-handlers.go
func (api objectAPIHandlers) headObjectInArchiveFileHandler(ctx context.Context, objectAPI ObjectLayer, bucket, object string, w http.ResponseWriter, r *http.Request)
_, err = getObjectInfo(ctx, bucket, zipPath, opts)
file, err := zipindex.FindSerialized(zipInfo, object)
cmd/object-api-interface.go
type ObjectLayer interface {
// Locking operations on object.
NewNSLock(bucket string, objects ...string) RWLocker
GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error)
可以看到最终调用的是minio的SDK,去请求远程的s3服务,代码位置:cmd/gateway/s3/gateway-s3.go
func (l *s3Objects) GetObjectInfo(ctx context.Context, bucket string, object string, opts minio.ObjectOptions) (objInfo minio.ObjectInfo, err error)
oi, err := l.Client.StatObject(ctx, bucket, object, miniogo.StatObjectOptions{
ServerSideEncryption: opts.ServerSideEncryption,
})
github.com/minio/minio-go/v7@v7.0.15/core.go
func (c Core) StatObject(ctx context.Context, bucketName, objectName string, opts StatObjectOptions) (ObjectInfo, error) {
return c.statObject(ctx, bucketName, objectName, opts)
}
接着,我们回到main.go,main函数实现很简单:
func main() {
minio.Main(os.Args)
}
cmd/main.go
func Main(args []string)
if err := newApp(appName).Run(args); err != nil {
创建了一个app对象
func newApp(name string) *cli.App
commandsTree := trie.NewTrie()
registerCommand := func(command cli.Command)
commands = append(commands, command)
commandsTree.Insert(command.Name)
findClosestCommands := func(command string) []string {
for _, value := range commandsTree.Walk(commandsTree.Root()) {
registerCommand(serverCmd)
registerCommand(gatewayCmd)
app := cli.NewApp()
其中父子命令以前缀树的形式存储:trie/trie.go
func NewTrie() *Trie {
return &Trie{
root: newNode(),
size: 0,
}
}
app对象定义在cli包里cli/app.go
type App struct {
// The name of the program. Defaults to path.Base(os.Args[0])
Name string
func NewApp() *App {
可以看到在newApp方法里注册了serverCmd和gatewayCmd,首先看下serverCmd,代码位置:cmd/server-main.go
var serverCmd = cli.Command{
Name: "server",
Usage: "start object storage server",
Flags: append(ServerFlags, GlobalFlags...),
Action: serverMain,
CustomHelpTemplate:
对应的处理方法是:
func serverMain(ctx *cli.Context)
bitrotSelfTest()
erasureSelfTest()
compressSelfTest()
globalConsoleSys.SetNodeName(globalLocalNodeName)
newAllSubsystems()
globalNotificationSys = NewNotificationSys(globalEndpoints)
globalBucketMetadataSys = NewBucketMetadataSys()
globalBucketMetadataSys.Reset()
globalBucketMonitor = bandwidth.NewMonitor(GlobalContext, totalNodeCount())
globalConfigSys = NewConfigSys()
globalIAMSys = NewIAMSys()
globalPolicySys = NewPolicySys()
globalLifecycleSys = NewLifecycleSys()
globalBucketSSEConfigSys = NewBucketSSEConfigSys()
globalBucketObjectLockSys = NewBucketObjectLockSys()
globalBucketQuotaSys = NewBucketQuotaSys()
globalBucketVersioningSys = NewBucketVersioningSys()
globalBucketVersioningSys.Reset()
globalBucketTargetSys = NewBucketTargetSys()
globalTierConfigMgr = NewTierConfigMgr()
checkUpdate(getMinioMode())
setMaxResources()
sys.GetMaxThreads()
sys.GetMaxOpenFileLimit()
sys.GetMaxMemoryLimit()
handler, err := configureServerHandler(globalEndpoints)
httpServer := xhttp.NewServer(addrs, setCriticalErrorHandler(corsHandler(handler)), getCert)
setHTTPServer(httpServer)
newObject, err := newObjectLayer(GlobalContext, globalEndpoints)
initBackgroundExpiry(GlobalContext, newObject)
err = initServer(GlobalContext, newObject); err != nil
go globalIAMSys.Init(GlobalContext, newObject, globalEtcdClient, globalRefreshIAMInterval)
initDataScanner(GlobalContext, newObject)
initBackgroundReplication(GlobalContext, newObject)
initBackgroundTransition(GlobalContext, newObject)
globalTierJournal, err = initTierDeletionJournal(GlobalContext)
cacheAPI, err = newServerCacheObjects(GlobalContext, globalCacheConfig)
setCacheObjectLayer(cacheAPI)
globalConsoleSrv, err = initConsoleServer()
globalConsoleSrv.Serve()
核心思想是注册路由,最后启动http server,而对应的gatewayCmd的实现在
cmd/gateway-main.go
gatewayCmd = cli.Command{
Name: "gateway",
Usage: "start object storage gateway",
Flags: append(ServerFlags, GlobalFlags...),
HideHelpCommand: true,
}
可以看到,它并没有对应的处理方法,很奇怪对吧?那是因为对于不同的对象存储服务,它们是以子命令的形式注册进来的,注册的位置在init函数中,也就是前面介绍的注册逻辑。注册函数如下:
func RegisterGatewayCommand(cmd cli.Command) error {
cmd.Flags = append(append(cmd.Flags, ServerFlags...), GlobalFlags...)
gatewayCmd.Subcommands = append(gatewayCmd.Subcommands, cmd)
return nil
}
注册完路由会启动一个httpserver,minio对httpserver 进行了简单的包装,代码位置是 internal/http/server.go
func NewServer(addrs []string, handler http.Handler, getCert certs.GetCertificateFunc) *Server
路由用的是现成的路由包:github.com/gorilla/mux@v1.8.0/mux.go
func NewRouter() *Router {
return &Router{namedRoutes: make(map[string]*Route)}
}
serverCmd的路由在cmd/routers.go,同样注册了STS,admin和api路由:
func configureServerHandler(endpointServerPools EndpointServerPools) (http.Handler, error)
registerAdminRouter(router, true)
registerSTSRouter(router)
registerAPIRouter(router)
router.Use(globalHandlers...)
以api router为例看下具体实现:
func registerAPIRouter(router *mux.Router)
routers = append(routers, apiRouter.MatcherFunc(func(r *http.Request, match *mux.RouteMatch) bool
routers = append(routers, apiRouter.Host("{bucket:.+}."+domainName).Subrouter())
routers = append(routers, apiRouter.PathPrefix("/{bucket}").Subrouter())
router.Methods(http.MethodHead).Path("/{object:.+}").HandlerFunc(
collectAPIStats("headobject", maxClients(gz(httpTraceAll(api.HeadObjectHandler)))))
cmd/object-handlers.go
func (api objectAPIHandlers) HeadObjectHandler(w http.ResponseWriter, r *http.Request)
api.headObjectInArchiveFileHandler(ctx, objectAPI, bucket, object, w, r)
api.headObjectHandler(ctx, objectAPI, bucket, object, w, r)
cmd/s3-zip-handlers.go
func (api objectAPIHandlers) headObjectInArchiveFileHandler(ctx context.Context, objectAPI ObjectLayer, bucket, object string, w http.ResponseWriter, r *http.Request)
getObjectInfo := objectAPI.GetObjectInfo
_, err = getObjectInfo(ctx, bucket, zipPath, opts)
cmd/object-api-interface.go
type ObjectLayer interface {
// Locking operations on object.
NewNSLock(bucket string, objects ...string) RWLocker
整体来看,serverCmd和gatewayCmd的路由实现是非常相似的。