golang对数据库的请求,抽象出来一套通用的连接池,用go的机制来说,golang只需要提供一个驱动(driver)的interface,底层不同数据库协议,由用户根据自己的数据库实现对应的驱动即可。

本文从源码实现的角度,探索这里的细节以及需要避免的坑,基于1.14代码分析,部分bug在1.15中有修复或优化,这里也会提及。

golang版本:1.14

目录结构说明

└── sql

├── convert.go # 结果行的读取与转换

├── convert_test.go

├── ctxutil.go # 绑定上下文的一些通用方法

├── doc.txt

├── driver # driver 定义来实现数据库驱动所需要的接口

│ ├── driver.go

│ ├── types.go # 数据类型别名和转换

│ └── types_test.go

├── example_cli_test.go

├── example_service_test.go

├── example_test.go

├── fakedb_test.go

├── sql.go # 通用的接口和类型,包括事物,连接等

└── sql_test.go

主要数据结构

1. sql.DB

type DB struct {

// Atomic access only. At top of struct to prevent mis-alignment

// on 32-bit platforms. Of type time.Duration.

waitDuration int64 // 等待新的连接所需要的总时间

connector driver.Connector // 数据库驱动自己实现

// numClosed is an atomic counter which represents a total number of

// closed connections. Stmt.openStmt checks it before cleaning closed

// connections in Stmt.css.

numClosed uint64 // 关闭的连接数

mu sync.Mutex // protects following fields

freeConn []*driverConn

connRequests map[uint64]chan connRequest

nextRequest uint64 // Next key to use in connRequests.

numOpen int // number of opened and pending open connections

// Used to signal the need for new connections

// a goroutine running connectionOpener() reads on this chan and

// maybeOpenNewConnections sends on the chan (one send per needed connection)

// It is closed during db.Close(). The close tells the connectionOpener

// goroutine to exit.

openerCh chan struct{} // 用于通知需要创建新的连接

// resetterCh chan *driverConn // 已废弃

closed bool

dep map[finalCloser]depSet // map[一级对象]map[二级对象]bool,一个外部以来,用于自动关闭

lastPut map[*driverConn]string // stacktrace of last conn's put; debug only

maxIdle int // zero means defaultMaxIdleConns(2); negative means 0

maxOpen int // <= 0 means unlimited

maxLifetime time.Duration // maximum amount of time a connection may be reused

cleanerCh chan struct{} // 用于通知清理过期的连接,maxlife时间改变或者连接被关闭时会通过该channel通知

waitCount int64 // Total number of connections waited for. // 这些状态数据,可以通过db.Stat() 获取

maxIdleClosed int64 // Total number of connections closed due to idle.

maxLifetimeClosed int64 // Total number of connections closed due to max free limit.

stop func() // stop cancels the connection opener and the session resetter.

}

sql.DB不是一个连接,它是数据库的抽象接口,也是整个连接池的句柄,对多个goroutine是并发安全的。它可以根据driver打开关闭数据库连接,管理连接池。这对不同的数据库来说都是一样的。

2. sql.driverConn

// driverConn wraps a driver.Conn with a mutex, to

// be held during all calls into the Conn. (including any calls onto

// interfaces returned via that Conn, such as calls on Tx, Stmt,

// Result, Rows)

type driverConn struct {

db *DB

createdAt time.Time

sync.Mutex // guards following

ci driver.Conn // 由不同的驱动自己实现,对应一条具体的数据库连接

needReset bool // The connection session should be reset before use if true.

closed bool // 当前连接的状态,是否已经关闭

finalClosed bool // ci.Close has been called

openStmt map[*driverStmt]bool

// guarded by db.mu

inUse bool

onPut []func() // code (with db.mu held) run when conn is next returned // 归还连接的时候调用

dbmuClosed bool // same as closed, but guarded by db.mu, for removeClosedStmtLocked

}

对单个连接的封装,包含了实际的数据库连接以及相关的状态信息等

3. driver.Conn

// Conn is a connection to a database. It is not used concurrently

// by multiple goroutines.

//

// Conn is assumed to be stateful.

type Conn interface {

// Prepare returns a prepared statement, bound to this connection.

Prepare(query string) (Stmt, error)

// Close invalidates and potentially stops any current

// prepared statements and transactions, marking this

// connection as no longer in use.

//

// Because the sql package maintains a free pool of

// connections and only calls Close when there's a surplus of

// idle connections, it shouldn't be necessary for drivers to

// do their own connection caching.

Close() error

// Begin starts and returns a new transaction.

//

// Deprecated: Drivers should implement ConnBeginTx instead (or additionally).

Begin() (Tx, error)

}

一条具体的数据库连接,需要由不同驱动自己去实现接口

4. driver.Driver

type Driver interface {

Open(name string) (Conn, error)

}

Driver 只包含一个函数,Open()用来返回一个可用连接,可能是新建立的,也可能是之前缓存的关闭的连接。

5. driver.DriverContext

type DriverContext interface {

// OpenConnector must parse the name in the same format that Driver.Open

// parses the name parameter.

OpenConnector(name string) (Connector, error)

}

DriverContext 的目的是维护drievr上下文信息,避免了每次新建连接的时候都需要解析一遍 dsn。需要有Driver对象自己去实现。

6. driver.Connector

type Connector interface {

// Connect returns a connection to the database.

// Connect may return a cached connection (one previously

// closed), but doing so is unnecessary; the sql package

// maintains a pool of idle connections for efficient re-use.

//

// The provided context.Context is for dialing purposes only

// (see net.DialContext) and should not be stored or used for

// other purposes.

//

// The returned connection is only used by one goroutine at a

// time.

Connect(context.Context) (Conn, error)

// Driver returns the underlying Driver of the Connector,

// mainly to maintain compatibility with the Driver method

// on sql.DB.

Driver() Driver

}

driver.Connector 是driver的插口,是一个接口类型的对象,由不同类型的数据库来实现。

driver.Connector 包含两个函数。

Connect 用来建立连接

Driver 用来返回一个 Driver 对象,Driver也是个接口类型对象,需要不同的数据库自己去实现。

主要操作流程

1. 注册驱动

import (

_ "github.com/go-sql-driver/mysql"

)

var (

driversMu sync.RWMutex

drivers = make(map[string]driver.Driver)

)

func Register(name string, driver driver.Driver) {

driversMu.Lock()

defer driversMu.Unlock()

if driver == nil {

panic("sql: Register driver is nil")

}

if _, dup := drivers[name]; dup {

panic("sql: Register called twice for driver " + name)

}

drivers[name] = driver

}

/database/sql 提供的是一个通用的数据库连接池,当我们连接不同的数据库时,只需要将对应的数据库驱动注册进去就可以使用。

这里的注册,实际上就是将数据库名称和对应的数据库驱动(数据库连接包装器)添加的一个map中,每个import进来的库,需要在init函数中调用注册函数来实现。

2. 创建连接池句柄 sql.Open()

func Open(driverName, dataSourceName string) (*DB, error) {

driversMu.RLock()

driveri, ok := drivers[driverName] // 1

driversMu.RUnlock()

if !ok {

return nil, fmt.Errorf("sql: unknown driver %q (forgotten import?)", driverName)

}

if driverCtx, ok := driveri.(driver.DriverContext); ok { // 2

connector, err := driverCtx.OpenConnector(dataSourceName)

if err != nil {

return nil, err

}

return OpenDB(connector), nil // 3

}

return OpenDB(dsnConnector{dsn: dataSourceName, driver: driveri}), nil // 4

}

func OpenDB(c driver.Connector) *DB {

ctx, cancel := context.WithCancel(context.Background())

db := &DB{

connector: c,

openerCh: make(chan struct{}, connectionRequestQueueSize),

lastPut: make(map[*driverConn]string),

connRequests: make(map[uint64]chan connRequest),

stop: cancel,

}

go db.connectionOpener(ctx) // 通过channel通知来创建连接

// go db.connectionResetter(ctx) // 用于重置连接,1.14废弃

return db

}

Open函数通常解释为初始化db,这里只是通过驱动名称,获取到对应的驱动,并对驱动进行一系列的初始化操作,需要注意的是,Open并不会和db建立连接,只是在操作这些数据结构,启动后台协程之类的动作。

这里的dataSourceName简称dsn,包含了连接数据库所必须的参数,用户名密码ip端口等信息,由不同的驱动自己实现解析,当然,有些驱动也支持在dsn中配置一些数据库参数,如autocommit等。由于解析字符串得到这些信息会有一定的资源消耗,因此,还提供了对解析后的结果缓存的功能,避免了每次建立新的连接都需要解析一次,要做到这一点,需要驱动实现 driver.DriverContext 接口。

这个时候你就有了这样一个结构,不过此时的连接池中并没有连接,也就是说没有真正访问db

64ec92817f1f26f956952a55a27a8b9f.png

3. 设置数据库连接参数

最大空闲连接数,空闲连接数超过该值就会被关闭,默认为defaultMaxIdleConns(2)

func (db *DB) SetMaxIdleConns(n int) {}

最大允许打开的连接数,超过该数量后,不允许建立新的连接,工作协程只能阻塞等待连接的释放

func (db *DB) SetMaxOpenConns(n int) {}

连接可以被重用的最大时间,换言之,一个连接多久后会被关闭,不过会等待当前的请求完成后才会关闭,be closed lazily,一个很鸡肋的参数

func (db *DB) SetConnMaxLifetime(d time.Duration) {

// 通过启动一个单独的协程 connectionCleaner 来实现

startCleanerLocked {

go db.connectionCleaner(db.shortestIdleTimeLocked())

}

}

1.15 之后新增参数,连接最大空闲时间,idle时间超过该值会被关闭,不过会等待当前的请求完成后才会关闭,be closed lazily

func (db *DB) SetConnMaxIdleTime(d time.Duration) {

// 1.15 实现了对空闲连接的超时回收,复用了SetConnMaxLifetime的部分逻辑,也是在connectionCleaner协程中实现的

}

SetConnMaxLifetime 和 SetConnMaxIdleTime 细节实现

1.14 实现

func (db *DB) startCleanerLocked() {

if db.maxLifetime > 0 && db.numOpen > 0 && db.cleanerCh == nil {

db.cleanerCh = make(chan struct{}, 1)

go db.connectionCleaner(db.maxLifetime)

}

}

func (db *DB) connectionCleaner(d time.Duration) {

const minInterval = time.Second

if d < minInterval {

d = minInterval

}

t := time.NewTimer(d)

for {

// 当maxlife时间到达

// 或者maxlife发生改变及db被close

select {

case

case

}

db.mu.Lock()

d = db.maxLifetime

if db.closed || db.numOpen == 0 || d <= 0 {

db.cleanerCh = nil

db.mu.Unlock()

return

}

// 循环处理free状态的连接

expiredSince := nowFunc().Add(-d)

var closing []*driverConn

for i := 0; i < len(db.freeConn); i++ {

c := db.freeConn[i]

if c.createdAt.Before(expiredSince) {

closing = append(closing, c)

last := len(db.freeConn) - 1

db.freeConn[i] = db.freeConn[last]

db.freeConn[last] = nil

db.freeConn = db.freeConn[:last]

i--

}

}

db.maxLifetimeClosed += int64(len(closing))

db.mu.Unlock()

for _, c := range closing {

c.Close()

}

// 如果maxlife被重置,需要更新定时器时间

if d < minInterval {

d = minInterval

}

t.Reset(d)

}

}

1.15 实现

func (db *DB) startCleanerLocked() {

if (db.maxLifetime > 0 || db.maxIdleTime > 0) && db.numOpen > 0 && db.cleanerCh == nil {

db.cleanerCh = make(chan struct{}, 1)

go db.connectionCleaner(db.shortestIdleTimeLocked()) // maxidle和maxlife取较小值

}

}

func (db *DB) connectionCleaner(d time.Duration) {

const minInterval = time.Second

if d < minInterval {

d = minInterval

}

t := time.NewTimer(d)

for {

select {

case

case

}

db.mu.Lock()

d = db.shortestIdleTimeLocked()

if db.closed || db.numOpen == 0 || d <= 0 {

db.cleanerCh = nil

db.mu.Unlock()

return

}

closing := db.connectionCleanerRunLocked()

db.mu.Unlock()

for _, c := range closing {

c.Close()

}

if d < minInterval {

d = minInterval

}

t.Reset(d)

}

}

// 对idle超时和life超时的连接分别收集,统一返回

func (db *DB) connectionCleanerRunLocked() (closing []*driverConn) {

if db.maxLifetime > 0 {

expiredSince := nowFunc().Add(-db.maxLifetime)

for i := 0; i < len(db.freeConn); i++ {

c := db.freeConn[i]

if c.createdAt.Before(expiredSince) {

closing = append(closing, c)

last := len(db.freeConn) - 1

db.freeConn[i] = db.freeConn[last]

db.freeConn[last] = nil

db.freeConn = db.freeConn[:last]

i--

}

}

db.maxLifetimeClosed += int64(len(closing))

}

if db.maxIdleTime > 0 {

expiredSince := nowFunc().Add(-db.maxIdleTime)

var expiredCount int64

for i := 0; i < len(db.freeConn); i++ {

c := db.freeConn[i]

if db.maxIdleTime > 0 && c.returnedAt.Before(expiredSince) {

closing = append(closing, c)

expiredCount++

last := len(db.freeConn) - 1

db.freeConn[i] = db.freeConn[last]

db.freeConn[last] = nil

db.freeConn = db.freeConn[:last]

i--

}

}

db.maxIdleTimeClosed += expiredCount

}

return

}

1.14 和 1.15的实现逻辑基本一致,只是增加了对idle超时的判断做了兼容

4. 访问数据库

当我们做完上面这些初始化动作后,按照我们的习惯,通常会尝试性连接下db,用来判断连接参数是否正常,如用户名密码是否正确,但并不是发送用户请求,一般的做法是调用 db.Ping(),

func (db *DB) Ping() error {

return db.PingContext(context.Background())

}

func (db *DB) PingContext(ctx context.Context) error {

var dc *driverConn

var err error

// 获取一个可用连接,后面会看到一样的逻辑,这里先跳过细节

for i := 0; i < maxBadConnRetries; i++ {

dc, err = db.conn(ctx, cachedOrNewConn)

if err != driver.ErrBadConn {

break

}

}

if err == driver.ErrBadConn {

dc, err = db.conn(ctx, alwaysNewConn) // db.conn 是来获取可用连接的,是数据库连接池较为核心的一部分

}

if err != nil {

return err

}

// 发送ping命令

return db.pingDC(ctx, dc, dc.releaseConn)

}

func (db *DB) pingDC(ctx context.Context, dc *driverConn, release func(error)) error {

var err error

if pinger, ok := dc.ci.(driver.Pinger); ok {

withLock(dc, func() {

err = pinger.Ping(ctx) // 这里需要驱动自己去实现,对应mysql来说,发送的是sql_type=14(COM_PING)的请求包

})

}

release(err) // 将该连接放回到free池

return err

}

5. 发送sql请求

这里看几个最简单的发送sql的方法

// 没有结果集,值返回ok/error包

func (db *DB) Exec(query string, args ...interface{}) (Result, error) {}

func (db *DB) ExecContext(ctx context.Context, query string, args ...interface{}) (Result, error) {}

// 返回大于0条结果集

func (db *DB) Query(query string, args ...interface{}) (*Rows, error) {}

func (db *DB) QueryContext(ctx context.Context, query string, args ...interface{}) (*Rows, error) {}

// 预期结果集只有一行,没有结果集Scan时报ErrNoRows,Scan结果如果有多行,只取第一行,多余的数据行丢弃

func (db *DB) QueryRow(query string, args ...interface{}) *Row {}

func (db *DB) QueryRowContext(ctx context.Context, query string, args ...interface{}) *Row {}

这里有几个注意事项:

我们可以发现,每一个方法都会同时有另外一个带 Context 后缀的方法,查看调用关系的话,会发现,不带Context的函数(Exec/Query/QueryRow)其实里面就是调用的带Context的函数(ExecContext/QueryContext/QueryRowContext),这里的Context和大多数库函数一样,用来进行信号的同步,例如超时限制等,一般不需要单独设置

我们可以发现,每个函数参数都是支持可变参数列表,用法和prepare用法一样,用 ? 做占位符,那我们直接拼好sql和使用占位符哪种更优呢?

rows1, err := db.Query("select * from t1 where a = 1”)

rows2, err := db.Query("select * from t1 where a = ?", 1)

这两条sql执行的结果是一样的,但是底层是不一样的,与不同驱动的具体实现略有差别。

以mysql为例,区别在于第一个Query,实际发送了一条sql(sql_type:3),第二条Query,实际发送了两条sql(sql_type:22 和 sql_tyep:23),先prepare,再execute,虽说二进制协议要快些,但是每次都会发送两条sql,第一次发送的prepare,之后只会execute一次且不会主动回收这个prepare信息。

这个接口设计之初,应该就是按照prepare+execute的思想设计的,当占位符参数个数为0时,能否优化直接发送一条sql,要看底层的驱动接口是否支持,换言之,prepare+execute

接下来,以Query为例,看下具体的实现流程

func (db *DB) Query(query string, args ...interface{}) (*Rows, error) {

return db.QueryContext(context.Background(), query, args...)

}

func (db *DB) QueryContext(ctx context.Context, query string, args ...interface{}) (*Rows, error) {

var rows *Rows

var err error

// 执行query,优先从连接池获取连接,如果获取到badconn(以及关闭的连接),重试,最多重试maxBadConnRetries(2)次

for i := 0; i < maxBadConnRetries; i++ {

rows, err = db.query(ctx, query, args, cachedOrNewConn)

if err != driver.ErrBadConn {

break

}

}

// 一定创建新的连接执行query

if err == driver.ErrBadConn {

return db.query(ctx, query, args, alwaysNewConn)

}

return rows, err

}

func (db *DB) query(ctx context.Context, query string, args []interface{}, strategy connReuseStrategy) (*Rows, error) {

// 获取连接

dc, err := db.conn(ctx, strategy)

if err != nil {

return nil, err

}

// 使用获取的连接执行查询

return db.queryDC(ctx, nil, dc, dc.releaseConn, query, args)

}

可以发现,执行一条普通sql,需要两步,第一步,获取连接(db.conn),第二步,执行查询(db.queryDC)

6. 获取连接

// 提供了两种获取连接的策略,alwaysNewConn & cachedOrNewConn,字面意思,总是新建 & 优先复用free连接

func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) {

// 全局加锁 这里有个连接池的大锁,需要注意

db.mu.Lock()

if db.closed {

db.mu.Unlock()

return nil, errDBClosed

}

// context 超时检测

select {

default:

case

db.mu.Unlock()

return nil, ctx.Err()

}

lifetime := db.maxLifetime

// 优先从free池中获取连接

numFree := len(db.freeConn)

if strategy == cachedOrNewConn && numFree > 0 {

// 取第一个free连接

conn := db.freeConn[0]

// 切片拷贝

copy(db.freeConn, db.freeConn[1:])

// 调整切片长度

db.freeConn = db.freeConn[:numFree-1]

conn.inUse = true

db.mu.Unlock()

// 检查连接是否超时,超时则返回错误

if conn.expired(lifetime) {

conn.Close()

return nil, driver.ErrBadConn

}

// 对连接状态进行重置,通常是使用过的连接需要重置,避免连接已经处于不可用状态

if err := conn.resetSession(ctx); err == driver.ErrBadConn {

conn.Close()

return nil, driver.ErrBadConn

}

return conn, nil

}

// 已经没有free连接,或者策略要求创建一个新连接

// 当前打开的连接已经达到了允许打开连接数的上限,需要阻塞等待

if db.maxOpen > 0 && db.numOpen >= db.maxOpen {

// Make the connRequest channel. It's buffered so that the

// connectionOpener doesn't block while waiting for the req to be read.

// 建立一个唯一key和请求连接connRequest channel的映射

req := make(chan connRequest, 1)

reqKey := db.nextRequestKeyLocked()

db.connRequests[reqKey] = req

db.waitCount++

db.mu.Unlock()

waitStart := time.Now()

// Timeout the connection request with the context.

select {

// 如果超时,从map中删除该key,记录统计信息,并检查连接是否已经就绪

case

// Remove the connection request and ensure no value has been sent

// on it after removing.

db.mu.Lock()

delete(db.connRequests, reqKey)

db.mu.Unlock()

atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))

// 如果已经生成了可用连接,将新连接放回到free池中

select {

default:

case ret, ok :=

if ok && ret.conn != nil {

db.putConn(ret.conn, ret.err, false)

}

}

return nil, ctx.Err()

case ret, ok :=

atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))

if !ok {

return nil, errDBClosed

}

// Only check if the connection is expired if the strategy is cachedOrNewConns.

// If we require a new connection, just re-use the connection without looking

// at the expiry time. If it is expired, it will be checked when it is placed

// back into the connection pool.

// This prioritizes giving a valid connection to a client over the exact connection

// lifetime, which could expire exactly after this point anyway.

// 对cachedOrNewConn策略的连接请求,需要判断连接是否过期

// 如果是请求新连接,则不做判断,等连接被放回free池中时再回收

if strategy == cachedOrNewConn && ret.err == nil && ret.conn.expired(lifetime) {

ret.conn.Close()

return nil, driver.ErrBadConn

}

if ret.conn == nil {

return nil, ret.err

}

// Reset the session if required.

if err := ret.conn.resetSession(ctx); err == driver.ErrBadConn {

ret.conn.Close()

return nil, driver.ErrBadConn

}

return ret.conn, ret.err

}

}

// 由于未达到连接数上限,直接创建新连接

db.numOpen++ // optimistically

db.mu.Unlock()

ci, err := db.connector.Connect(ctx)

if err != nil {

db.mu.Lock()

db.numOpen-- // correct for earlier optimism

db.maybeOpenNewConnections()

db.mu.Unlock()

return nil, err

}

db.mu.Lock()

dc := &driverConn{

db: db,

createdAt: nowFunc(),

ci: ci,

inUse: true,

}

db.addDepLocked(dc, dc)

db.mu.Unlock()

return dc, nil

}

综上,当我们向连接池申请连接时,

如果策略是 cachedOrNewConn,free连接池中有,则直接取出;

如果连接池没有空闲连接或者策略为alwaysNewConn,当前连接不超过上限,则直接创建;

否则通过channel去异步创建建立,调用点阻塞等待连接。

7. 执行查询

Query

// ctx 是调用sql设置的上下文

// txctx 是事务的上下文,如果有

// releaseConn 上层传递的函数句柄,连接使用完后,将该连接放回到连接池

func (db *DB) queryDC(ctx, txctx context.Context, dc *driverConn, releaseConn func(error), query string, args []interface{}) (*Rows, error) {

queryerCtx, ok := dc.ci.(driver.QueryerContext)

var queryer driver.Queryer

if !ok {

queryer, ok = dc.ci.(driver.Queryer)

}

if ok {

var nvdargs []driver.NamedValue

var rowsi driver.Rows

var err error

withLock(dc, func() {

nvdargs, err = driverArgsConnLocked(dc.ci, nil, args)

if err != nil {

return

}

rowsi, err = ctxDriverQuery(ctx, queryerCtx, queryer, query, nvdargs)

})

// err要么为nil,要么为ErrSkip以外的其他错误

// ErrSkip 通常为某些可选接口不存在,可以尝试其他接口

if err != driver.ErrSkip {

if err != nil {

releaseConn(err)

return nil, err

}

// err != nil

// 数据库连接的所有权转交给了rows,rows需要主动Close,以将该连接放回到free连接池中

rows := &Rows{

dc: dc,

releaseConn: releaseConn,

rowsi: rowsi,

}

// 通过context,当收到上层事件或者事务关闭的消息,rows能够自动调用Close释放连接

rows.initContextClose(ctx, txctx)

return rows, nil

}

}

// prepare

var si driver.Stmt

var err error

withLock(dc, func() {

si, err = ctxDriverPrepare(ctx, dc.ci, query)

})

if err != nil {

releaseConn(err)

return nil, err

}

// execute

ds := &driverStmt{Locker: dc, si: si}

rowsi, err := rowsiFromStatement(ctx, dc.ci, ds, args...)

if err != nil {

ds.Close()

releaseConn(err)

return nil, err

}

// Note: ownership of ci passes to the *Rows, to be freed

// with releaseConn.

rows := &Rows{

dc: dc,

releaseConn: releaseConn,

rowsi: rowsi,

closeStmt: ds,

}

// 同上

rows.initContextClose(ctx, txctx)

return rows, nil

}

可以发现,在sql包这一层,已经做好了所有的连接管理的动作,具体的收发包/包协议逻辑给了不同的驱动自己实现,当执行完查询后,连接的所有权转交给了rows对象,意味着需要rows主动调用 Close() 函数,才会将当前使用的连接放回连接池中去。

QueryRow

同样的,QueryRow() 和 Query() 其实底层是用的一套方法,返回值也仅仅是多包了一层

func (db *DB) QueryRow(query string, args ...interface{}) *Row {

return db.QueryRowContext(context.Background(), query, args...)

}

func (db *DB) QueryRowContext(ctx context.Context, query string, args ...interface{}) *Row {

rows, err := db.QueryContext(ctx, query, args...)

return &Row{rows: rows, err: err}

}

// Row 和 Rows 的关系

type Row struct {

// One of these two will be non-nil:

err error // deferred error for easy chaining

rows *Rows

}

细心的话,能够发现 Row 仅仅提供了 Scan 一个方法,甚至 Close() 都没有,相比 Rows,看着又些单薄,那如何释放连接呢?

在 Row 的 Scan() 方法里,会从rows读取第一条数据,在最后,调用了rows的Close() 方法

func (r *Row) Scan(dest ...interface{}) error {

if r.err != nil {

return r.err

}

defer r.rows.Close()

for _, dp := range dest {

if _, ok := dp.(*RawBytes); ok {

return errors.New("sql: RawBytes isn't allowed on Row.Scan")

}

}

if !r.rows.Next() {

if err := r.rows.Err(); err != nil {

return err

}

return ErrNoRows

}

err := r.rows.Scan(dest...)

if err != nil {

return err

}

// Make sure the query can be processed to completion with no errors.

return r.rows.Close()

}

意味着,当我们使用 QueryRow() 时,必须使用row.Scan( ) 来获取结果,否则该连接就不会放回连接池中去。

Exec

Exec 由于不需要结果集,因此,对连接的release就不像前两个那么麻烦,除此之外的处理流程基本一样。

func (db *DB) execDC(ctx context.Context, dc *driverConn, release func(error), query string, args []interface{}) (res Result, err error) {

// 调用 Exec 函数就不需要额外关心连接的release,在函数结束之前就放回free池中

defer func() {

release(err)

}()

execerCtx, ok := dc.ci.(driver.ExecerContext)

var execer driver.Execer

if !ok {

execer, ok = dc.ci.(driver.Execer)

}

// 和Query一样,如果驱动有实现这两个接口,就直接调用,否则由sql包主动触发调用prepare+execute

if ok {

var nvdargs []driver.NamedValue

var resi driver.Result

withLock(dc, func() {

nvdargs, err = driverArgsConnLocked(dc.ci, nil, args)

if err != nil {

return

}

resi, err = ctxDriverExec(ctx, execerCtx, execer, query, nvdargs)

})

if err != driver.ErrSkip {

if err != nil {

return nil, err

}

return driverResult{dc, resi}, nil

}

}

var si driver.Stmt

withLock(dc, func() {

si, err = ctxDriverPrepare(ctx, dc.ci, query)

})

if err != nil {

return nil, err

}

ds := &driverStmt{Locker: dc, si: si}

defer ds.Close()

// 从 statement 中保存结果

return resultFromStatement(ctx, dc.ci, ds, args...)

}

8. 优雅地使用stmt

上面提到,直接使用占位符的方式来执行二进制sql,实际每次会发送两条sql,并不能提高执行效率,那statement的正确执行方式是什么呢?

stmt, err := db.Prepare("select * from t1 where a = ?”) // prepare,sql_type=22

if err != nil {

return

}

_, err = stmt.Exec(1) // 第一次执行,sql_type=23

if err != nil {

return

}

rows, err := stmt.Query(1) // 第二次执行,连接所有权转交给rows,sql_type=23

if err != nil {

return

}

_ = rows.Close() // 归还连接的所有权

_ = stmt.Close() // sql_type=25

我们知道,db是一个连接池对象,这里prepare只需要显示调用一次,之后stmt在执行时,如果获取到了新的连接或者没有执行过prepare的连接,那么它会首先调用prepare,之后再去执行execute,因此,我们无需担心是否会在一个没有prepare过的连接上execute。

同样,stmt在调用Close()时,会对所有连接上都执行close,关闭掉这个stmt,因此,关闭之前,要保证这个stmt不会再被执行。

9. 释放连接

前面提到,我们连接执行完一次普通查询,就需要及时放回到freeConn连接池中,中间连接的拥有权虽然会转移,但最终都需要被回收,其实,开启事务的请求也类似,会在事务提交或回滚后释放连接。连接释放的方法从上层不断向下传递,所有可能拥有连接所有权的对象,都可能接受到该释放连接到方法。

// 用来将使用完的连接放回到free连接池中

func (dc *driverConn) releaseConn(err error) {

dc.db.putConn(dc, err, true)

}

func (db *DB) putConn(dc *driverConn, err error, resetSession bool) {

// 检查连接是否还能复用

if err != driver.ErrBadConn {

if !dc.validateConnection(resetSession) {

err = driver.ErrBadConn

}

}

// debugGetPut 是测试信息

db.mu.Lock()

if !dc.inUse {

db.mu.Unlock()

if debugGetPut {

fmt.Printf("putConn(%v) DUPLICATE was: %s\n\nPREVIOUS was: %s", dc, stack(), db.lastPut[dc])

}

panic("sql: connection returned that was never out")

}

if err != driver.ErrBadConn && dc.expired(db.maxLifetime) {

err = driver.ErrBadConn

}

if debugGetPut {

db.lastPut[dc] = stack()

}

dc.inUse = false

// 在这个连接上注册的一些statement的关闭函数

for _, fn := range dc.onPut {

fn()

}

dc.onPut = nil

// 如果当前连接已经不可用,意味着可能会有新的连接请求,调用maybeOpenNewConnections进行检测

if err == driver.ErrBadConn {

// Don't reuse bad connections.

// Since the conn is considered bad and is being discarded, treat it

// as closed. Don't decrement the open count here, finalClose will

// take care of that.

db.maybeOpenNewConnections()

db.mu.Unlock()

dc.Close()

return

}

// hook 的一个函数,用于测试,默认为nil

if putConnHook != nil {

putConnHook(db, dc)

}

added := db.putConnDBLocked(dc, nil)

db.mu.Unlock()

if !added {

dc.Close()

return

}

}

10. 连接管理

对连接的管理,主要包括连接的申请,连接的回收及复用,异步释放超时的连接。

连接管理的整个流程如下

076df16c6634a27954957a37fc85a495.png

11. 不开启事务,如何固定占用一条连接

通过前面这些内容,能够发现,在不开启事务的情况下,连接完成一笔请求,回被放回到free池里去,所以哪怕连续执行两条select,也有可能用的不是同一个实际的数据库连接,某些特殊场景,比如我们执行完存储过程,想要select输出型结果时,这里就不满足要求。

简化下需求,其实是我们想要长时间占用一个连接,开启事务是一种解决方案,不过额外引入事务,可能会造成锁的延迟释放(以mysql两阶段锁为例), 这里可以用Context方法来实现,用法举例

{

var a int

ctx := context.Background()

cn, err := db.Conn(ctx) // 绑定一个连接

if err != nil {

return

}

// 执行第一次查询,将连接所有权转交给rows1

rows1, err := cn.QueryContext(ctx, "select * from t1")

if err != nil {

return

}

_ = rows1.Scan(&a)

_ = rows1.Close() // rows1 close,将连接所有权交给cn

// 执行第二次查询,将连接所有权转交给rows2

rows2, err = cn.QueryContext(ctx, "select * from t1")

if err != nil {

return

}

_ = rows2.Scan(&a)

_ = rows2.Close() // rows1 close,将连接所有权交给cn

// cn close,连接回收,放回free队列

_ = cn.Close()

}

关于db.Conn( ) 返回的sql.Conn对象,需要和driver.Conn 做区分,sql.Conn 是对driverConn的再一次封装,是为里提供连续的单个数据库连接,driver.Conn 是不同驱动要实现的接口

// Conn represents a single database connection rather than a pool of database

// connections. Prefer running queries from DB unless there is a specific

// need for a continuous single database connection.

//

// A Conn must call Close to return the connection to the database pool

// and may do so concurrently with a running query.

//

// After a call to Close, all operations on the

// connection fail with ErrConnDone.

type Conn struct {

db *DB

// closemu prevents the connection from closing while there

// is an active query. It is held for read during queries

// and exclusively during close.

closemu sync.RWMutex

// dc is owned until close, at which point

// it's returned to the connection pool.

dc *driverConn

// done transitions from 0 to 1 exactly once, on close.

// Once done, all operations fail with ErrConnDone.

// Use atomic operations on value when checking value.

done int32

}

12. 监控连接池状态

由于mysql协议是同步的,因此,当客户端游大量的并发请求,但是连接数要小于并发数的情况下,是会有一部分请求被阻塞,等待其它请求释放连接,在某些场景或使用不当的情况下,这里也可能会成为瓶颈。不过库中并没有详细记录每一笔请求的连接等待时间,只提供了累计的等待时间之和,以及其它的监控指标,在定位问题时可以用做参考。

库提供了 db.Stats( ) 方法,会从db对象中获取所有的监控指标,并生成对象 DBStats 对象

func (db *DB) Stats() DBStats {

wait := atomic.LoadInt64(&db.waitDuration)

db.mu.Lock()

defer db.mu.Unlock()

stats := DBStats{

MaxOpenConnections: db.maxOpen,

Idle: len(db.freeConn),

OpenConnections: db.numOpen,

InUse: db.numOpen - len(db.freeConn),

WaitCount: db.waitCount,

WaitDuration: time.Duration(wait),

MaxIdleClosed: db.maxIdleClosed,

MaxLifetimeClosed: db.maxLifetimeClosed,

}

return stats

}

一个简单的使用例子

func monitorConn(db *sql.DB) {

go func(db *sql.DB) {

mt := time.NewTicker(monitorDbInterval * time.Second)

for {

select {

case

stat := db.Stats()

logutil.Errorf("monitor db conn(%p): maxopen(%d), open(%d), use(%d), idle(%d), "+

"wait(%d), idleClose(%d), lifeClose(%d), totalWait(%v)",

db,

stat.MaxOpenConnections, stat.OpenConnections,

stat.InUse, stat.Idle,

stat.WaitCount, stat.MaxIdleClosed,

stat.MaxLifetimeClosed, stat.WaitDuration)

}

}

}(db)

}

需要注意的是,1.15 之前,对 stat.MaxLifetimeClosed 对象统计会有异常,1.15 之后做了修复。

Attention

注意连接所有者的传递关系,使用完成后要及时回收,如rows.Close(),row.Scan()等,不回收会造成连接泄漏,新的请求会被一直阻塞

尽量避免使用占位符的方式执行sql,推荐自己完成sql的拼接或正常使用stmt

1.15 后支持了对单个连接空闲时间的限制

db.Conn( ) 能够持续占用一条连接,但是在该连接中,就没有办法调用之前prepare生成的stmt,但是在事务中可以,tx.Stmt( )可以生成特定于该事务的stmt

go提供了数据库连接池回收策略,是针对freeConn的,换句话说,连接如果被一直占用,哪怕已经超过了生存时间,也不会被回收

我们注意到,每次对连接池操作时,都要先加一把全局大锁,因此,当连接数较多(>1000),且请求量较大时,会存在较为严重的锁竞争,这一点通过top(sys)指标,以及pprof也能发现,因为,一个简单的方式,是将一个大的连接池拆分为多个小的连接池,一般情况下,通过简单的轮询将请求打散在多个连接池上,能有效降低锁的粒度

【完】