而维持一个连接池,最基本的要求就是要做到:thread safe(线程安全),尤其是在Golang这种特性是goroutine的语言中。
实现简单的连接池
type Pool struct {
m sync.Mutex //保证多个goroutine访问时候,closed的线程安全
res chan io.Closer //连接存储的chan
factory func() (io.Closer,error) //新建连接的工厂方法
closed bool //连接池关闭标志
}
这个简单的连接池,我们利用chan来存储池里的连接。而新建结构体的方法也比较简单:
func New(fn func() (io.Closer, error), size uint) (*Pool, error) {
if size <= 0 {
return nil, errors.New("size的值太小了。")
}
return &Pool{
factory: fn,
res: make(chan io.Closer, size),
}, nil
}
只需要提供对应的工厂函数和连接池的大小就可以了。
获取连接
那么我们要怎么从中获取资源呢?因为我们内部存储连接的结构是chan,所以只需要简单的select就可以保证线程安全:
//从资源池里获取一个资源
func (p *Pool) Acquire() (io.Closer,error) {
select {
case r,ok := <-p.res:
log.Println("Acquire:共享资源")
if !ok {
return nil,ErrPoolClosed
}
return r,nil
default:
log.Println("Acquire:新生成资源")
return p.factory()
}
}
我们先从连接池的res这个chan里面获取,如果没有的话我们就利用我们早已经准备好的工厂函数进行构造连接。同时我们在从res获取连接的时候利用ok先确定了这个连接池是否已经关闭。如果已经关闭的话我们就返回早已经准备好的连接已关闭错误。
关闭连接池
那么既然提到关闭连接池,我们是怎么样关闭连接池的呢?
//关闭资源池,释放资源
func (p *Pool) Close() {
p.m.Lock()
defer p.m.Unlock()
if p.closed {
return
}
p.closed = true
//关闭通道,不让写入了
close(p.res)
//关闭通道里的资源
for r:=range p.res {
r.Close()
}
}
这边我们需要先进行p.m.Lock()*上锁操作,这么做是因为我们需要对结构体里面的*closed进行读写。需要先把这个标志位设定后,关闭res这个chan,使得Acquire方法无法再获取新的连接。我们再对res这个chan里面的连接进行Close操作。
释放连接
释放连接首先得有个前提,就是连接池还没有关闭。如果连接池已经关闭再往res里面送连接的话就好触发panic。
func (p *Pool) Release(r io.Closer){
//保证该操作和Close方法的操作是安全的
p.m.Lock()
defer p.m.Unlock()
//资源池都关闭了,就省这一个没有释放的资源了,释放即可
if p.closed {
r.Close()
return
}
select {
case p.res <- r:
log.Println("资源释放到池子里了")
default:
log.Println("资源池满了,释放这个资源吧")
r.Close()
}
}
以上就是一个简单且线程安全的连接池实现方式了。我们可以看到的是,现在连接池虽然已经实现了,但是还有几个小缺点:
too many connections
那么我们可以从已经成熟使用的MySQL连接池库和Redis连接池库中看看,它们是怎么解决这些问题的。
Golang标准库的Sql连接池
database/sql/sql.go
db, err := sql.Open("mysql", "xxxx")
的时候,就会打开一个连接池。我们可以看看返回的db的结构体:
type DB struct {
waitDuration int64 // Total time waited for new connections.
mu sync.Mutex // protects following fields
freeConn []*driverConn
connRequests map[uint64]chan connRequest
nextRequest uint64 // Next key to use in connRequests.
numOpen int // number of opened and pending open connections
// Used to signal the need for new connections
// a goroutine running connectionOpener() reads on this chan and
// maybeOpenNewConnections sends on the chan (one send per needed connection)
// It is closed during db.Close(). The close tells the connectionOpener
// goroutine to exit.
openerCh chan struct{}
closed bool
maxIdle int // zero means defaultMaxIdleConns; negative means 0
maxOpen int // <= 0 means unlimited
maxLifetime time.Duration // maximum amount of time a connection may be reused
cleanerCh chan struct{}
waitCount int64 // Total number of connections waited for.
maxIdleClosed int64 // Total number of connections closed due to idle.
maxLifetimeClosed int64 // Total number of connections closed due to max free limit.
}
func(db*DB)conn(ctx context.Context,strategy connReuseStrategy)(*driverConn,error)
获取连接
// conn returns a newly-opened or cached *driverConn.
func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) {
// 先判断db是否已经关闭。
db.mu.Lock()
if db.closed {
db.mu.Unlock()
return nil, errDBClosed
}
// 注意检测context是否已经被超时等原因被取消。
select {
default:
case <-ctx.Done():
db.mu.Unlock()
return nil, ctx.Err()
}
lifetime := db.maxLifetime
// 这边如果在freeConn这个切片有空闲连接的话,就left pop一个出列。注意的是,这边因为是切片操作,所以需要前面需要加锁且获取后进行解锁操作。同时判断返回的连接是否已经过期。
numFree := len(db.freeConn)
if strategy == cachedOrNewConn && numFree > 0 {
conn := db.freeConn[0]
copy(db.freeConn, db.freeConn[1:])
db.freeConn = db.freeConn[:numFree-1]
conn.inUse = true
db.mu.Unlock()
if conn.expired(lifetime) {
conn.Close()
return nil, driver.ErrBadConn
}
// Lock around reading lastErr to ensure the session resetter finished.
conn.Lock()
err := conn.lastErr
conn.Unlock()
if err == driver.ErrBadConn {
conn.Close()
return nil, driver.ErrBadConn
}
return conn, nil
}
// 这边就是等候获取连接的重点了。当空闲的连接为空的时候,这边将会新建一个request(的等待连接 的请求)并且开始等待
if db.maxOpen > 0 && db.numOpen >= db.maxOpen {
// 下面的动作相当于往connRequests这个map插入自己的号码牌。
// 插入号码牌之后这边就不需要阻塞等待继续往下走逻辑。
req := make(chan connRequest, 1)
reqKey := db.nextRequestKeyLocked()
db.connRequests[reqKey] = req
db.waitCount++
db.mu.Unlock()
waitStart := time.Now()
// Timeout the connection request with the context.
select {
case <-ctx.Done():
// context取消操作的时候,记得从connRequests这个map取走自己的号码牌。
db.mu.Lock()
delete(db.connRequests, reqKey)
db.mu.Unlock()
atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))
select {
default:
case ret, ok := <-req:
// 这边值得注意了,因为现在已经被context取消了。但是刚刚放了自己的号码牌进去排队里面。意思是说不定已经发了连接了,所以得注意归还!
if ok && ret.conn != nil {
db.putConn(ret.conn, ret.err, false)
}
}
return nil, ctx.Err()
case ret, ok := <-req:
// 下面是已经获得连接后的操作了。检测一下获得连接的状况。因为有可能已经过期了等等。
atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))
if !ok {
return nil, errDBClosed
}
if ret.err == nil && ret.conn.expired(lifetime) {
ret.conn.Close()
return nil, driver.ErrBadConn
}
if ret.conn == nil {
return nil, ret.err
}
ret.conn.Lock()
err := ret.conn.lastErr
ret.conn.Unlock()
if err == driver.ErrBadConn {
ret.conn.Close()
return nil, driver.ErrBadConn
}
return ret.conn, ret.err
}
}
// 下面就是如果上面说的限制情况不存在,可以创建先连接时候,要做的创建连接操作了。
db.numOpen++ // optimistically
db.mu.Unlock()
ci, err := db.connector.Connect(ctx)
if err != nil {
db.mu.Lock()
db.numOpen-- // correct for earlier optimism
db.maybeOpenNewConnections()
db.mu.Unlock()
return nil, err
}
db.mu.Lock()
dc := &driverConn{
db: db,
createdAt: nowFunc(),
ci: ci,
inUse: true,
}
db.addDepLocked(dc, dc)
db.mu.Unlock()
return dc, nil
}
复制代码
简单来说,DB结构体除了用的是slice来存储连接,还加了一个类似排队机制的connRequests来解决获取等待连接的过程。同时在判断连接健康性都有很好的兼顾。那么既然有了排队机制,归还连接的时候是怎么做的呢?
释放连接
func(db*DB)putConnDBLocked(dc*driverConn,err error)bool
Satisfy a connRequest or put the driverConn in the idle pool and return true or return false.
我们主要来看看里面重点那几行:
...
// 如果已经超过最大打开数量了,就不需要在回归pool了
if db.maxOpen > 0 && db.numOpen > db.maxOpen {
return false
}
// 这边是重点了,基本来说就是从connRequest这个map里面随机抽一个在排队等着的请求。取出来后发给他。就不用归还池子了。
if c := len(db.connRequests); c > 0 {
var req chan connRequest
var reqKey uint64
for reqKey, req = range db.connRequests {
break
}
delete(db.connRequests, reqKey) // 删除这个在排队的请求。
if err == nil {
dc.inUse = true
}
// 把连接给这个正在排队的连接。
req <- connRequest{
conn: dc,
err: err,
}
return true
} else if err == nil && !db.closed {
// 既然没人排队,就看看到了最大连接数目没有。没到就归还给freeConn。
if db.maxIdleConnsLocked() > len(db.freeConn) {
db.freeConn = append(db.freeConn, dc)
db.startCleanerLocked()
return true
}
db.maxIdleClosed++
}
...
我们可以看到,当归还连接时候,如果有在排队轮候的请求就不归还给池子直接发给在轮候的人了。
现在基本就解决前面说的小问题了。不会出现连接太多导致无法控制too many connections的情况。也很好了维持了连接池的最小数量。同时也做了相关对于连接健康性的检查操作。
值得注意的是,作为标准库的代码,相关注释和代码都非常完美,真的可以看的神清气爽。
redis Golang实现的Redis客户端
这个Golang实现的Redis客户端,是怎么实现连接池的。这边的思路非常奇妙,还是能学习到不少好思路。当然了,由于代码注释比较少,啃起来第一下还是有点迷糊的。相关代码地址在https://github.com/go-redis/redis/blob/master/internal/pool/pool.go 可以看到。
而它的连接池结构如下
type ConnPool struct {
...
queue chan struct{}
connsMu sync.Mutex
conns []*Conn
idleConns []*Conn
poolSize int
idleConnsLen int
stats Stats
_closed uint32 // atomic
closedCh chan struct{}
}
queueconnsidleConnsconnsidleConns
到底连接存在哪里?
新建连接池连接
我们先从新建连接池连接开始看:
func NewConnPool(opt *Options) *ConnPool {
....
p.checkMinIdleConns()
if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 {
go p.reaper(opt.IdleCheckFrequency)
}
....
}
初始化连接池的函数有个和前面两个不同的地方。
checkMinIdleConnsgo p.reaper(opt.IdleCheckFrequency)
获取连接
func (p *ConnPool) Get(ctx context.Context) (*Conn, error) {
if p.closed() {
return nil, ErrClosed
}
//这边和前面sql获取连接函数的流程不同。sql是先看看连接池有没有空闲连接,有的话先获取不到再排队。这边是直接先排队获取令牌,排队函数后面会分析。
err := p.waitTurn(ctx)
if err != nil {
return nil, err
}
//前面没出error的话,就已经排队轮候到了。接下来就是获取的流程。
for {
p.connsMu.Lock()
//从空闲连接里面先获取一个空闲连接。
cn := p.popIdle()
p.connsMu.Unlock()
if cn == nil {
// 没有空闲连接时候直接跳出循环。
break
}
// 判断是否已经过时,是的话close掉了然后继续取出。
if p.isStaleConn(cn) {
_ = p.CloseConn(cn)
continue
}
atomic.AddUint32(&p.stats.Hits, 1)
return cn, nil
}
atomic.AddUint32(&p.stats.Misses, 1)
// 如果没有空闲连接的话,这边就直接新建连接了。
newcn, err := p.newConn(ctx, true)
if err != nil {
// 归还令牌。
p.freeTurn()
return nil, err
}
return newcn, nil
}
cn:=p.popIdle()idleConns
sql的排队意味着我对连接池申请连接后,把自己的编号告诉连接池。连接那边一看到有空闲了,就叫我的号。我答应了一声,然后连接池就直接给个连接给我。我如果不归还,连接池就一直不叫下一个号。
redis这边的意思是,我去和连接池申请的不是连接而是令牌。我就一直排队等着,连接池给我令牌了,我才去仓库里面找空闲连接或者自己新建一个连接。用完了连接除了归还连接外,还得归还令牌。当然了,如果我自己新建连接出错了,我哪怕拿不到连接回家,我也得把令牌给回连接池,不然连接池的令牌数少了,最大连接数也会变小。
而:
func (p *ConnPool) freeTurn() {
<-p.queue
}
func (p *ConnPool) waitTurn(ctx context.Context) error {
...
case p.queue <- struct{}{}:
return nil
...
}
就是在靠queue这个chan来维持令牌数量。
conns
新建连接
func (p *ConnPool) newConn(ctx context.Context, pooled bool) (*Conn, error) {
cn, err := p.dialConn(ctx, pooled)
if err != nil {
return nil, err
}
p.connsMu.Lock()
p.conns = append(p.conns, cn)
if pooled {
// 如果连接池满了,会在后面移除。
if p.poolSize >= p.opt.PoolSize {
cn.pooled = false
} else {
p.poolSize++
}
}
p.connsMu.Unlock()
return cn, nil
}
idleConnsconns
归还连接
func (p *ConnPool) Put(cn *Conn) {
if cn.rd.Buffered() > 0 {
internal.Logger.Printf("Conn has unread data")
p.Remove(cn, BadConnError{})
return
}
//这就是我们刚刚说的后面了,前面标记过不要入池的,这边就删除了。当然了,里面也会进行freeTurn操作。
if !cn.pooled {
p.Remove(cn, nil)
return
}
p.connsMu.Lock()
p.idleConns = append(p.idleConns, cn)
p.idleConnsLen++
p.connsMu.Unlock()
//我们可以看到很明显的这个归还号码牌的动作。
p.freeTurn()
}
connsidleConns超卖
等等,上面的逻辑似乎有点不对?我们来理一下获取连接流程:
waitTurnqueueidleConnsnewConnconnsputconnsidleConnsnewConnidleConns
我当时疑惑了好久,既然始终都需要获得令牌才能得到连接,令牌数量是定的。为什么还会超卖呢?翻了一下源码,我的答案是:
GetnewConnpooledbool
queue
总结
上面可以看到,连接池的最基本的保证,就是获取连接时候的线程安全。但是在实现诸多额外特性时候却又从不同角度来实现。还是非常有意思的。但是不管存储结构是用chan还是还是slice,都可以很好的实现这一点。如果像sql或者redis那样用slice来存储连接,就得维护一个结构来表示排队等候的效果。
资料下载
点击下方卡片关注公众号,发送特定关键字获取对应精品资料!
回复「电子书」,获取入门、进阶 Go 语言必看书籍。
回复「视频」,获取价值 5000 大洋的视频资料,内含实战项目(不外传)!
回复「路线」,获取最新版 Go 知识图谱及学习、成长路线图。
回复「面试题」,获取四哥精编的 Go 语言面试题,含解析。
回复「后台」,获取后台开发必看 10 本书籍。
对了,看完文章,记得点击下方的卡片。关注我哦~ 👇👇👇
如果您的朋友也在学习 Go 语言,相信这篇文章对 TA 有帮助,欢迎转发分享给 TA,非常感谢!