go-redis 的代码实现是一个明显的命令模式,命令模式的优点有将调用操作的对象与知道如何实现该操作的对象解耦,增加新的命令不需要修改现有的类。go-redis 支撑单节点,哨兵模式和集群模式,每一种模式对应的命令接口其实几乎一样的,go-redis就是通过命令模式将命令和每一种client的实现解耦的。

redisdb := redis.NewClient(&redis.Options{
    Addr:     "localhost:6379", // use default Addr
    Password: "",               // no password set
    DB:       0,                // use default DB
})
redisdb := redis.NewFailoverClient(&redis.FailoverOptions{
    MasterName:    "master",
    SentinelAddrs: []string{":26379"},
})
client := redis.NewClusterClient(&redis.ClusterOptions{
        Addrs:    []string{"redis-cluster.test.com:port"}, //set redis cluster url
        Password: "mypassword",                    //set password
    })

我们以普通client为例进行源码分析,它代码位于:

github.com/go-redis/redis/v8@v8.11.5/redis.go

type Client struct {
  *baseClient
  cmdable
  hooks
  ctx context.Context
}

它通过匿名属性的方式继承了 baseClient,里面包含了链接池的实现。cmdable属性是命令模式的精髓所在。可以看到它其实是一个函数

type cmdable func(ctx context.Context, cmd Cmder) error

但是它比较诡异,它同时也是一个类,因为在go里面函数也是一种基本类型。源码位于github.com/go-redis/redis/v8@v8.11.5/commands.go,以ZRevRange为例,可以看到,这个函数类,在它的属性里面调用了自己。

func (c cmdable) ZRevRange(ctx context.Context, key string, start, stop int64) *StringSliceCmd 
        cmd := NewStringSliceCmd(ctx, "zrevrange", key, start, stop)
        _ = c(ctx, cmd)

那么这个函数是如何初始化的呢?答案是Process方法

func NewClient(opt *Options) *Client {
    c.cmdable = c.Process

它调用了hooks的process方法

func (c *Client) Process(ctx context.Context, cmd Cmder) error {
  return c.hooks.process(ctx, cmd, c.baseClient.process)
}

它的参数是基类的process方法

type baseClient struct {
  opt      *Options
  connPool pool.Pooler
  onClose func() error // hook called when client is closed
}

在基类里面实现请求的收发:

func (c *baseClient) process(ctx context.Context, cmd Cmder) error 
        retry, err := c._process(ctx, cmd, attempt)
        err := c.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
              err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error {
      return writeCmd(wr, cmd)
             return wr.WriteArgs(cmd.Args())
             
       err = cn.WithReader(ctx, c.cmdTimeout(cmd), cmd.readReply)
 func (cmd *baseCmd) Args() []interface{} {
  return cmd.args
}

withConn从连接池里取出连接,然后发送请求:

func (c *baseClient) withConn(
  ctx context.Context, fn func(context.Context, *pool.Conn) error,
) error {
            cn, err := c.getConn(ctx)
            err = fn(ctx, cn)
 }
func (c *baseClient) getConn(ctx context.Context) (*pool.Conn, error)
              cn, err := c._getConn(ctx)
func (c *baseClient) _getConn(ctx context.Context) (*pool.Conn, error)
                cn, err := c.connPool.Get(ctx)
                if err := c.initConn(ctx, cn); err != nil {

连接池的代码位于:

github.com/go-redis/redis/v8@v8.11.5/internal/pool/pool.go

type Pooler interface {
  NewConn(context.Context) (*Conn, error)
  CloseConn(*Conn) error	

我们回来看下hooks ,它是一个数组

type hooks struct {
  hooks []Hook
}

它在调用fn之前,调用了所有的前置钩子,调用完成后调用了后置钩子,我们可以在钩子中,对调用redis前后进行一些切面处理。

func (hs hooks) process()
          ctx, retErr = hs.hooks[hookIndex].BeforeProcess(ctx, cmd)
          retErr = fn(ctx, cmd)
          if err := hs.hooks[hookIndex].AfterProcess(ctx, cmd); err != nil {

hook的process方法传入以Cmder,它就是最终要执行的命令

func (hs hooks) process(
  ctx context.Context, cmd Cmder, fn func(context.Context, Cmder) error,
) error
type Hook interface {
  BeforeProcess(ctx context.Context, cmd Cmder) (context.Context, error)
  AfterProcess(ctx context.Context, cmd Cmder) error


  BeforeProcessPipeline(ctx context.Context, cmds []Cmder) (context.Context, error)
  AfterProcessPipeline(ctx context.Context, cmds []Cmder) error
}

Cmder是一个接口

type Cmder interface {
  Name() string
  FullName() string
  Args() []interface{}
  String() string
  stringArg(int) string
  firstKeyPos() int8
  SetFirstKeyPos(int8)
  readTimeout() *time.Duration
  readReply(rd *proto.Reader) error
  SetErr(error)
  Err() error
}

每一个具体命令都实现了上述接口,继承了baseCmd

type IntCmd struct {
  baseCmd
  val int64
}
type baseCmd struct {
  ctx    context.Context
  args   []interface{}
  err    error
  keyPos int8

  _readTimeout *time.Duration
}

以Result()接口为例

func (cmd *StringSliceCmd) Result() ([]string, error) 
      return cmd.Val(), cmd.Err()

StringSliceCmd类似:

type StringSliceCmd struct {
  baseCmd
  val []string
}