Using the redigo connection pool

Preface

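I recently ran into a concurrency problem while using redigo, which made me wonder whether redigo is safe for concurrent use. After reading the source, I found that a connection pool solves the problem, so here is a brief introduction.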

conn

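redigo provides more than one implementation of Conn. First-time users typically call redis.Dial() to obtain a connection.
What it returns is the conn type defined in conn.go.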

// conn is the low-level implementation of Conn
type conn struct {
	// Shared
	mu      sync.Mutex
	pending int
	err     error
	conn    net.Conn

	// Read
	readTimeout time.Duration
	br          *bufio.Reader

	// Write
	writeTimeout time.Duration
	bw           *bufio.Writer

	// Scratch space for formatting argument length.
	// '*' or '$', length, "\r\n"
	lenScratch [32]byte

	// Scratch space for formatting integers and floats.
	numScratch [40]byte
}

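It is a low-level implementation and must not be shared by goroutines that operate on it concurrently. The redigo documentation states that a connection supports one concurrent caller to Receive and one concurrent caller to Send and Flush; no other concurrency is supported, including concurrent calls to Do and Close.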

Connection pool

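A connection pool follows the same model as a thread pool: a number of resources are allocated in advance and kept in the pool, one is taken out when needed, and it is put back after use.
If thread pools are unfamiliar, my earlier article on a C++ thread pool implementation covers the basics.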
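To make the take-out/put-back model concrete, here is a minimal sketch in plain Go. It is not how redigo implements its pool; fixedPool, resource, and errEmpty are hypothetical names invented for this illustration, and a buffered channel simply stands in for the pool.

```go
package main

import (
	"errors"
	"fmt"
)

// resource stands in for something expensive to create, such as a connection.
type resource struct{ id int }

// fixedPool is a hypothetical minimal pool: a buffered channel holds the
// resources that are currently free.
type fixedPool struct {
	free chan *resource
}

// errEmpty is returned when every resource is checked out.
var errEmpty = errors.New("pool is empty")

// newFixedPool allocates size resources up front and parks them in the pool.
func newFixedPool(size int) *fixedPool {
	p := &fixedPool{free: make(chan *resource, size)}
	for i := 0; i < size; i++ {
		p.free <- &resource{id: i}
	}
	return p
}

// get takes one resource out of the pool, or fails if none is free.
func (p *fixedPool) get() (*resource, error) {
	select {
	case r := <-p.free:
		return r, nil
	default:
		return nil, errEmpty
	}
}

// put hands a resource back so that another caller can reuse it.
// It assumes r was obtained from this pool.
func (p *fixedPool) put(r *resource) {
	p.free <- r
}

func main() {
	p := newFixedPool(2)
	a, _ := p.get()
	b, _ := p.get()
	_, err := p.get() // both resources are checked out at this point
	fmt.Println(a.id, b.id, err)

	p.put(a)
	c, _ := p.get() // the resource that was put back is handed out again
	fmt.Println(c == a)
}
```

Because channel operations are safe for concurrent use, several goroutines can call get and put on the same fixedPool, which is the property the shared low-level connection lacks.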

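Next, look at Pool. Its Get() method returns a Conn whose concrete type is activeConn. That type differs from the conn shown above, but it also implements the Conn interface.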

// NewPool creates a new pool.
//
// Deprecated: Initialize the Pool directory as shown in the example.
func NewPool(newFn func() (Conn, error), maxIdle int) *Pool {
	return &Pool{Dial: newFn, MaxIdle: maxIdle}
}

// Get gets a connection. The application must close the returned connection.
// This method always returns a valid connection so that applications can defer
// error handling to the first use of the connection. If there is an error
// getting an underlying connection, then the connection Err, Do, Send, Flush
// and Receive methods return that error.
func (p *Pool) Get() Conn {
	pc, err := p.get(nil)
	if err != nil {
		return errorConn{err}
	}
	return &activeConn{p: p, pc: pc}
}

// Close releases the resources used by the pool.
func (p *Pool) Close() error {
	....
}

type activeConn struct {
	p     *Pool
	pc    *poolConn
	state int
}

// Conn represents a connection to a Redis server.
type Conn interface {
	// Close closes the connection.
	Close() error

	// Err returns a non-nil value when the connection is not usable.
	Err() error

	// Do sends a command to the server and returns the received reply.
	Do(commandName string, args ...interface{}) (reply interface{}, err error)

	// Send writes the command to the client's output buffer.
	Send(commandName string, args ...interface{}) error

	// Flush flushes the output buffer to the Redis server.
	Flush() error

	// Receive receives a single reply from the Redis server
	Receive() (reply interface{}, err error)
}
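The comment on Get describes two ideas that are easier to see in a reduced form: the caller receives a wrapper whose Close returns the underlying connection to the pool, and a failed dial is delivered as a connection whose methods return the error. The sketch below imitates that pattern with the standard library only. Every name in it (conn, rawConn, pool, pooledConn, failedConn, errDialFailed) is hypothetical and far simpler than the redigo types.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// conn is a cut-down, hypothetical stand-in for a connection interface.
type conn interface {
	Do(cmd string) (string, error)
	Close() error
}

// rawConn pretends to be a real network connection.
type rawConn struct{ closed bool }

func (c *rawConn) Do(cmd string) (string, error) { return "reply to " + cmd, nil }
func (c *rawConn) Close() error                  { c.closed = true; return nil }

// pool keeps idle raw connections and dials new ones on demand.
type pool struct {
	mu   sync.Mutex
	dial func() (*rawConn, error)
	idle []*rawConn
}

// pooledConn is what callers receive. Its Close puts the raw connection
// back into the pool instead of closing it.
type pooledConn struct {
	p  *pool
	rc *rawConn
}

func (c *pooledConn) Do(cmd string) (string, error) {
	if c.rc == nil {
		return "", errors.New("connection already returned to the pool")
	}
	return c.rc.Do(cmd)
}

func (c *pooledConn) Close() error {
	if c.rc != nil {
		c.p.mu.Lock()
		c.p.idle = append(c.p.idle, c.rc)
		c.p.mu.Unlock()
		c.rc = nil
	}
	return nil
}

// failedConn carries a dial error so that get never has to return nil.
type failedConn struct{ err error }

func (c failedConn) Do(string) (string, error) { return "", c.err }
func (c failedConn) Close() error              { return nil }

// errDialFailed is a sample error used by the demonstration below.
var errDialFailed = errors.New("dial failed")

// get reuses an idle connection when one exists and dials otherwise.
func (p *pool) get() conn {
	p.mu.Lock()
	if n := len(p.idle); n > 0 {
		rc := p.idle[n-1]
		p.idle = p.idle[:n-1]
		p.mu.Unlock()
		return &pooledConn{p: p, rc: rc}
	}
	p.mu.Unlock()

	rc, err := p.dial()
	if err != nil {
		return failedConn{err}
	}
	return &pooledConn{p: p, rc: rc}
}

func main() {
	good := &pool{dial: func() (*rawConn, error) { return &rawConn{}, nil }}
	c := good.get()
	fmt.Println(c.Do("PING"))
	c.Close()
	fmt.Println("idle after Close:", len(good.idle))

	bad := &pool{dial: func() (*rawConn, error) { return nil, errDialFailed }}
	_, err := bad.get().Do("PING") // the dial error surfaces on first use
	fmt.Println(err)
}
```

The practical consequence for callers is the usual pattern of calling Get, deferring Close, and checking the error returned by the first command.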

Example

connection pool exhausted

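A pooled Conn is in one of two states, Idle or Active. A connection handed out by Get() is in use; when it is closed it goes back to the pool as an idle connection, and an idle connection that stays unused for longer than IdleTimeout is closed. The pool keeps at most MaxIdle idle connections and allows at most MaxActive connections at any one time (this count includes idle connections, and 0 means no limit). When the MaxActive limit is reached and Wait is false, Get() returns a connection whose methods fail with the error "redigo: connection pool exhausted".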
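The interplay of the two limits can be traced with a small counting model. This is a sketch under simplifying assumptions, not redigo's code: limitedPool and errExhausted are hypothetical names, no real connections are opened, IdleTimeout and Wait are left out, and the type is not safe for concurrent use.

```go
package main

import (
	"errors"
	"fmt"
)

// errExhausted mimics the situation where the active limit has been reached.
var errExhausted = errors.New("connection pool exhausted")

// limitedPool is a hypothetical model that only counts connections.
type limitedPool struct {
	maxIdle   int // most connections kept open while unused
	maxActive int // most connections open at once, idle ones included (0 = no limit)
	active    int // open connections, whether in use or idle
	idle      int // open connections waiting to be reused
}

// get reuses an idle connection if there is one, opens a new one if the
// limit allows it, and fails otherwise.
func (p *limitedPool) get() error {
	if p.idle > 0 {
		p.idle--
		return nil
	}
	if p.maxActive > 0 && p.active >= p.maxActive {
		return errExhausted
	}
	p.active++
	return nil
}

// put returns a connection. It is kept as idle while there is room and
// closed otherwise.
func (p *limitedPool) put() {
	if p.idle < p.maxIdle {
		p.idle++
		return
	}
	p.active-- // the idle list is full, so this connection is closed
}

func main() {
	// Same limits as the example program: MaxIdle 1, MaxActive 2.
	p := &limitedPool{maxIdle: 1, maxActive: 2}
	fmt.Println(p.get(), p.get(), p.get()) // the third request exceeds maxActive

	p.put()
	p.put()
	fmt.Println("active:", p.active, "idle:", p.idle)
}
```

With these limits the third concurrent request fails, and after both connections are returned only one is kept open, which is the behaviour the example program is meant to expose.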

package main

import (
	"fmt"
	"time"

	// The original program imported a local copy via "./redigo/redis";
	// the upstream module path is used here so that it builds with Go modules.
	"github.com/gomodule/redigo/redis"
)

var (
	redisPool   *redis.Pool
	redisServer = "127.0.0.1:6379"
)

func newRedisPool(addr string) *redis.Pool {
	return &redis.Pool{
		MaxIdle:     1,
		MaxActive:   2,
		IdleTimeout: 6 * time.Second,
		// Dial or DialContext must be set. When both are set, DialContext takes precedence over Dial.
		Dial: func() (redis.Conn, error) { return redis.Dial("tcp", addr) },
	}
}

// printStats prints the pool counters. ActiveCount includes idle connections.
func printStats() {
	fmt.Println("-------------PoolStats------------")
	fmt.Println("Active/Idle:", redisPool.ActiveCount(), ":", redisPool.IdleCount())
}

// TestPing borrows one connection, sends PING, and returns the connection.
func TestPing() {
	conn := redisPool.Get()
	defer conn.Close()

	res, err := conn.Do("ping")
	if err != nil {
		fmt.Println("ping error:", err)
		return
	}
	fmt.Println(res.(string))
}

// pingWorker borrows a connection, waits for delay, sends PING, and then
// holds the connection for another 3 seconds before returning it.
func pingWorker(id int, delay time.Duration) {
	fmt.Printf("go---%d---\n", id)
	conn := redisPool.Get()
	fmt.Printf("go---%d---: Get Conn\n", id)
	defer conn.Close()

	time.Sleep(delay)
	res, err := conn.Do("ping")
	if err != nil {
		fmt.Printf("go---%d---error: %v\n", id, err)
		return
	}
	fmt.Printf("go---%d---: %s\n", id, res.(string))
	time.Sleep(3 * time.Second)
}

// TestPoolMaxIdle starts three goroutines against a pool with MaxActive 2,
// so one of them cannot obtain a usable connection.
func TestPoolMaxIdle() {
	go pingWorker(1, 0)
	go pingWorker(2, 0)
	go pingWorker(3, 5*time.Second)
}

func main() {
	redisPool = newRedisPool(redisServer)
	printStats()

	fmt.Println("-------------TestPing------------")
	TestPing()
	printStats()

	// Wait longer than IdleTimeout before reading the counters again.
	time.Sleep(8 * time.Second)
	printStats()

	fmt.Println("-------------TestPoolMaxIdle------------")
	TestPoolMaxIdle()
	time.Sleep(1 * time.Second)
	printStats()

	time.Sleep(20 * time.Second)
	printStats()
}
