redigo对于连接池支持稍弱
连接池
应用程序调用Get方法从池中获取连接,并使用连接的Close方法将连接的资源返回到池。
提供方法:
func NewPool
func (*Pool) ActiveCount
func (*Pool) Close
func (*Pool) Get
func (*Pool) GetContext
func (*Pool) IdleCount
func (*Pool) Stats
例如:
func newPool(addr string) *redis.Pool {
return &redis.Pool{
MaxIdle: 3,
IdleTimeout: 240 * time.Second,
// Dial or DialContext must be set. When both are set, DialContext takes precedence over Dial.
Dial: func () (redis.Conn, error) { return redis.Dial("tcp", addr) },
}
}
var (
pool *redis.Pool
redisServer = flag.String("127.0.0.1", ":6379", "")
)
func main() {
flag.Parse()
pool = newPool(*redisServer)
...
}
应用程序使用如下:
func serveHome(w http.ResponseWriter, r *http.Request) {
conn := pool.Get()
defer conn.Close()
...
}
使用Dial功能验证与AUTH命令的连接,或使用SELECT命令选择数据库
pool := &redis.Pool{
// Other pool configuration not shown in this example.
Dial: func () (redis.Conn, error) {
c, err := redis.Dial("tcp", server)
if err != nil {
return nil, err
}
if _, err := c.Do("AUTH", password); err != nil {
c.Close()
return nil, err
}
if _, err := c.Do("SELECT", db); err != nil {
c.Close()
return nil, err
}
return c, nil
},
}
使用TestOnBorrow函数检查空闲连接的运行状况
pool := &redis.Pool{
// Other pool configuration not shown in this example.
TestOnBorrow: func(c redis.Conn, t time.Time) error {
if time.Since(t) < time.Minute {
return nil
}
_, err := c.Do("PING")
return err
},
}
示例:
package main
import (
"fmt"
"github.com/gomodule/redigo/redis"
"time"
"flag"
)
var (
pool *redis.Pool
//redisServer = flag.String("127.0.0.1", ":6379", "")
)
func newPool(addr string) *redis.Pool {
return &redis.Pool{
MaxIdle: 3,
IdleTimeout: 240 * time.Second,
// Dial or DialContext must be set. When both are set, DialContext takes precedence over Dial.
Dial: func () (redis.Conn, error) { return redis.Dial("tcp", addr) },
}
}
func Get() redis.Conn {
return pool.Get()
}
func main() {
flag.Parse()
pool = newPool("127.0.0.1:6379")
connections := pool.Get()
defer connections.Close()
set_res, err := connections.Do("SET", "new_test", "redigo")
if err != nil {
fmt.Println("err while set key :", err)
}else {
fmt.Println(set_res)
}
is_exists, err := redis.Bool(connections.Do("EXISTS", "new_test"))
if err != nil {
fmt.Println(err)
} else {
fmt.Println(is_exists)
}
get_res, err := redis.String(connections.Do("GET", "new_test"))
if err != nil {
fmt.Println("get err:", err)
} else {
fmt.Println(get_res)
}
}
输出:
OK
true
redigo
go-redis 是一个 Go 语言实现的 Redis 客户端,既然是网络服务的客户端,为了高效利用有限资源,避免重复创建和销毁网络连接,就必需对其进行管理。而资源管理又是编程领域中的一个重点难点,抱着对是否能利用 Go 语言语法简洁的特点来优雅实现连接池的好奇,笔者决定阅读并分析 go-redis 连接池部分的源码,一探究竟。以下是对源码的分析,分为接口与结构体、连接池管理、建立与关闭连接、获取与放回连接、监控统计等5大部分,源码链接。
连接结构体:
type Conn struct {
netConn net.Conn // 基于 tcp 的网络连接
rd *proto.Reader // 根据 Redis 通信协议实现的 Reader
wr *proto.Writer // 根据 Redis 通信协议实现的 Writer
Inited bool // 是否完成初始化
pooled bool // 是否放进连接池
createdAt time.Time // 创建时间
usedAt int64 // 使用时间,atomic
}
连接池接口:
type Pooler interface {
NewConn(context.Context) (*Conn, error) // 创建连接
CloseConn(*Conn) error // 关闭连接
Get(context.Context) (*Conn, error) // 获取连接
Put(*Conn) // 放回连接
Remove(*Conn, error) // 移除连接
Len() int // 连接池长度
IdleLen() int // 空闲连接数量
Stats() *Stats // 连接池统计
Close() error // 关闭连接池
}
连接池结构体:
type ConnPool struct {
opt *Options // 连接池配置
dialErrorsNum uint32 // 连接错误次数,atomic
lastDialErrorMu sync.RWMutex // 上一次连接错误锁,读写锁
lastDialError error // 上一次连接错误
queue chan struct{} // 工作连接队列
connsMu sync.Mutex // 连接队列锁
conns []*Conn // 连接队列
idleConns []*Conn // 空闲连接队列
poolSize int // 连接池大小
idleConnsLen int // 空闲连接队列长度
stats Stats // 连接池统计
_closed uint32 // 连接池关闭标志,atomic
closedCh chan struct{} // 通知连接池关闭通道
}
初始化
var _ Pooler = (*ConnPool)(nil)
func NewConnPool(opt *Options) *ConnPool {
p := &ConnPool{
opt: opt,
queue: make(chan struct{}, opt.PoolSize),
conns: make([]*Conn, 0, opt.PoolSize),
idleConns: make([]*Conn, 0, opt.PoolSize),
closedCh: make(chan struct{}),
}
p.checkMinIdleConns()
if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 {
go p.reaper(opt.IdleCheckFrequency)
}
return p
}
- 创建连接池,传入连接池配置选项参数 opt,工厂函数根据 opt 创建连接池实例。连接池主要依靠以下四个数据结构实现管理和通信:
- queue:存储工作连接的缓冲通道
- conns:存储所有连接的切片
- idleConns:存储空闲连接的切片
- closed:用于通知所有协程连接池已经关闭的通道
- 检查连接池的空闲连接数量是否满足最小空闲连接数量要求,若不满足,则创建足够的空闲连接。
- 若连接池配置选项规定了空闲连接超时和检查空闲连接频率,则开启一个清理空闲连接的协程。
关闭
func (p *ConnPool) Close() error {
if !atomic.CompareAndSwapUint32(&p._closed, 0, 1) {
return ErrClosed
}
close(p.closedCh)
var firstErr error
p.connsMu.Lock()
for _, cn := range p.conns {
if err := p.closeConn(cn); err != nil && firstErr == nil {
firstErr = err
}
}
p.conns = nil
p.poolSize = 0
p.idleConns = nil
p.idleConnsLen = 0
p.connsMu.Unlock()
return firstErr
}
- 原子性检查连接池是否已经关闭,若没关闭,则将关闭标志置为1
- 关闭 closedCh 通道,连接池中的所有协程都可以通过判断该通道是否关闭来确定连接池是否已经关闭。
- 连接队列锁上锁,关闭队列中的所有连接,并置空所有维护连接池状态的数据结构,解锁。
过滤
func (p *ConnPool) Filter(fn func(*Conn) bool) error {
var firstErr error
p.connsMu.Lock()
for _, cn := range p.conns {
if fn(cn) {
if err := p.closeConn(cn); err != nil && firstErr == nil {
firstErr = err
}
}
}
p.connsMu.Unlock()
return firstErr
}
实质上是遍历连接池中的所有连接,并调用传入的 fn 过滤函数作用在每个连接上,过滤出符合业务要求的连接。
清理
func (p *ConnPool) reaper(frequency time.Duration) {
ticker := time.NewTicker(frequency)
defer ticker.Stop()
for {
select {
case <-ticker.C:
// It is possible that ticker and closedCh arrive together,
// and select pseudo-randomly pick ticker case, we double
// check here to prevent being executed after closed.
if p.closed() {
return
}
_, err := p.ReapStaleConns()
if err != nil {
internal.Logger.Printf("ReapStaleConns failed: %s", err)
continue
}
case <-p.closedCh:
return
}
}
}
func (p *ConnPool) ReapStaleConns() (int, error) {
var n int
for {
p.getTurn()
p.connsMu.Lock()
cn := p.reapStaleConn()
p.connsMu.Unlock()
p.freeTurn()
if cn != nil {
_ = p.closeConn(cn)
n++
} else {
break
}
}
atomic.AddUint32(&p.stats.StaleConns, uint32(n))
return n, nil
}
func (p *ConnPool) reapStaleConn() *Conn {
if len(p.idleConns) == 0 {
return nil
}
cn := p.idleConns[0]
if !p.isStaleConn(cn) {
return nil
}
p.idleConns = append(p.idleConns[:0], p.idleConns[1:]...)
p.idleConnsLen--
p.removeConn(cn)
return cn
}
- 开启一个用于检查并清理过期连接的 goroutine 每隔 frequency 时间遍历检查连接池中是否存在过期连接,并清理。
- 创建一个时间间隔为 frequency 的计时器,在连接池关闭时关闭该计时器
- 循环判断计时器是否到时和连接池是否关闭
- 移除空闲连接队列中的过期连接
建立连接
func (p *ConnPool) newConn(ctx context.Context, pooled bool) (*Conn, error) {
cn, err := p.dialConn(ctx, pooled)
if err != nil {
return nil, err
}
p.connsMu.Lock()
p.conns = append(p.conns, cn)
if pooled {
// If pool is full remove the cn on next Put.
if p.poolSize >= p.opt.PoolSize {
cn.pooled = false
} else {
p.poolSize++
}
}
p.connsMu.Unlock()
return cn, nil
}
func (p *ConnPool) dialConn(ctx context.Context, pooled bool) (*Conn, error) {
if p.closed() {
return nil, ErrClosed
}
if atomic.LoadUint32(&p.dialErrorsNum) >= uint32(p.opt.PoolSize) {
return nil, p.getLastDialError()
}
netConn, err := p.opt.Dialer(ctx)
if err != nil {
p.setLastDialError(err)
if atomic.AddUint32(&p.dialErrorsNum, 1) == uint32(p.opt.PoolSize) {
go p.tryDial()
}
return nil, err
}
cn := NewConn(netConn)
cn.pooled = pooled
return cn, nil
}
func (p *ConnPool) tryDial() {
for {
if p.closed() {
return
}
conn, err := p.opt.Dialer(context.Background())
if err != nil {
p.setLastDialError(err)
time.Sleep(time.Second)
continue
}
atomic.StoreUint32(&p.dialErrorsNum, 0)
_ = conn.Close()
return
}
}
创建连接流程图:
newConn流程图.png
DialConn流程图.png
移除与关闭连接
func (p *ConnPool) Remove(cn *Conn, reason error) {
p.removeConnWithLock(cn)
p.freeTurn()
_ = p.closeConn(cn)
}
func (p *ConnPool) CloseConn(cn *Conn) error {
p.removeConnWithLock(cn)
return p.closeConn(cn)
}
func (p *ConnPool) removeConnWithLock(cn *Conn) {
p.connsMu.Lock()
p.removeConn(cn)
p.connsMu.Unlock()
}
func (p *ConnPool) removeConn(cn *Conn) {
for i, c := range p.conns {
if c == cn {
p.conns = append(p.conns[:i], p.conns[i+1:]...)
if cn.pooled {
p.poolSize--
p.checkMinIdleConns()
}
return
}
}
}
func (p *ConnPool) closeConn(cn *Conn) error {
if p.opt.OnClose != nil {
_ = p.opt.OnClose(cn)
}
return cn.Close()
}
连接池无论移除还是关闭连接,底层调用的都是 removeConnWithLock 函数。removeConnWithLock 函数的工作流程如下:
- 连接队列上锁
- 遍历连接队列找到要关闭的连接,并将其移除出连接队列
- 更新连接池统计数据
- 检查连接池最小空闲连接数量
- 连接队列解锁
- 关闭连接,先执行关闭连接时的回调函数(创建连接池时的配置选项传入),再关闭连接
获取
// Get returns existed connection from the pool or creates a new one.
func (p *ConnPool) Get(ctx context.Context) (*Conn, error) {
if p.closed() {
return nil, ErrClosed
}
err := p.waitTurn(ctx)
if err != nil {
return nil, err
}
for {
p.connsMu.Lock()
cn := p.popIdle()
p.connsMu.Unlock()
if cn == nil {
break
}
if p.isStaleConn(cn) {
_ = p.CloseConn(cn)
continue
}
atomic.AddUint32(&p.stats.Hits, 1)
return cn, nil
}
atomic.AddUint32(&p.stats.Misses, 1)
newcn, err := p.newConn(ctx, true)
if err != nil {
p.freeTurn()
return nil, err
}
return newcn, nil
}
func (p *ConnPool) getTurn() {
p.queue <- struct{}{}
}
func (p *ConnPool) waitTurn(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
select {
case p.queue <- struct{}{}:
return nil
default:
}
timer := timers.Get().(*time.Timer)
timer.Reset(p.opt.PoolTimeout)
select {
case <-ctx.Done():
if !timer.Stop() {
<-timer.C
}
timers.Put(timer)
return ctx.Err()
case p.queue <- struct{}{}:
if !timer.Stop() {
<-timer.C
}
timers.Put(timer)
return nil
case <-timer.C:
timers.Put(timer)
atomic.AddUint32(&p.stats.Timeouts, 1)
return ErrPoolTimeout
}
}
func (p *ConnPool) freeTurn() {
<-p.queue
}
func (p *ConnPool) popIdle() *Conn {
if len(p.idleConns) == 0 {
return nil
}
idx := len(p.idleConns) - 1
cn := p.idleConns[idx]
p.idleConns = p.idleConns[:idx]
p.idleConnsLen--
p.checkMinIdleConns()
return cn
}
获取连接流程图:
Get流程图.png
放回
func (p *ConnPool) Put(cn *Conn) {
if cn.rd.Buffered() > 0 {
internal.Logger.Printf("Conn has unread data")
p.Remove(cn, BadConnError{})
return
}
if !cn.pooled {
p.Remove(cn, nil)
return
}
p.connsMu.Lock()
p.idleConns = append(p.idleConns, cn)
p.idleConnsLen++
p.connsMu.Unlock()
p.freeTurn()
}
- 检查连接中是否还有数据没被读取,若有,移除连接并返回 BadConnError
- 判断连接是否已经放入连接池中,若无,直接移除连接
- 连接队列上锁,将该连接加入空闲连接队列中,连接队列解锁,工作连接通道移除一个元素
监控统计对调整连接池配置选项,优化连接池性能有很大的帮助。
Dial 错误统计
func (p *ConnPool) setLastDialError(err error) {
p.lastDialErrorMu.Lock()
p.lastDialError = err
p.lastDialErrorMu.Unlock()
}
func (p *ConnPool) getLastDialError() error {
p.lastDialErrorMu.RLock()
err := p.lastDialError
p.lastDialErrorMu.RUnlock()
return err
}
由于一般情况下,连接错误记录是读多写少的,所以采用读写锁来保证该记录的并发安全(读写锁在该场景下性能更佳)。
状态统计
// Len returns total number of connections.
func (p *ConnPool) Len() int {
p.connsMu.Lock()
n := len(p.conns)
p.connsMu.Unlock()
return n
}
// IdleLen returns number of idle connections.
func (p *ConnPool) IdleLen() int {
p.connsMu.Lock()
n := p.idleConnsLen
p.connsMu.Unlock()
return n
}
func (p *ConnPool) Stats() *Stats {
idleLen := p.IdleLen()
return &Stats{
Hits: atomic.LoadUint32(&p.stats.Hits),
Misses: atomic.LoadUint32(&p.stats.Misses),
Timeouts: atomic.LoadUint32(&p.stats.Timeouts),
TotalConns: uint32(p.Len()),
IdleConns: uint32(idleLen),
StaleConns: atomic.LoadUint32(&p.stats.StaleConns),
}
}
- func (p *ConnPool) Len() int {...}:返回连接池连接数量总数
- func (p *ConnPool) IdleLen() int {...}:返回连接池空闲连接数量
- Stats: Hits 连接池命中空闲连接次数 Misses 连接池没有空闲连接可用次数 Timeouts 请求连接等待超时次数 TotalConns 连接池总连接数量 IdleConns 连接池空闲连接数量 StaleConns 移除过期连接数量