1.MySQL驱动注册即连接池启动
github.com/go-sql-driver/mysql/driver.go中的init方法实现mysql驱动注册
func init() {
sql.Register("mysql", &MySQLDriver{})
}
Register()
func Register(name string, driver driver.Driver) {
driversMu.Lock()
defer driversMu.Unlock()
if driver == nil {
panic("sql: Register driver is nil")
}
if _, dup := drivers[name]; dup {
panic("sql: Register called twice for driver " + name)
}
// drivers = make(map[string]driver.Driver)
// drivers是个map,注册只是将“mysql”和一个空结构体MySQLDriver的指针存入map
drivers[name] = driver
}
Open()
func Open(driverName, dataSourceName string) (*DB, error) {
driversMu.RLock()
// 获取驱动结构体指针
driveri, ok := drivers[driverName]
driversMu.RUnlock()
if !ok {
return nil, fmt.Errorf("sql: unknown driver %q (forgotten import?)", driverName)
}
if driverCtx, ok := driveri.(driver.DriverContext); ok {
// 调用驱动中的OpenConnector
// func (d MySQLDriver) OpenConnector(dsn string) (driver.Connector, error) {
// cfg, err := ParseDSN(dsn)
// if err != nil {
// return nil, err
// }
// return &connector{
// cfg: cfg,
// }, nil
// }
// 实际是返回了dsn对应Config结构体指针
connector, err := driverCtx.OpenConnector(dataSourceName)
if err != nil {
return nil, err
}
//func OpenDB(c driver.Connector) *DB {
// ctx, cancel := context.WithCancel(context.Background())
// db := &DB{
// connector: c,
// openerCh: make(chan struct{}, connectionRequestQueueSize),
// 这里默认chan的容量是50,高并发会不会产生大量的阻塞???
// resetterCh: make(chan *driverConn, 50),
// lastPut: make(map[*driverConn]string),
// connRequests: make(map[uint64]chan connRequest),
// stop: cancel,
// }
//
// 启动创建连接gotoutine
// go db.connectionOpener(ctx)
// 启动清理session的goroutine
// go db.connectionResetter(ctx)
//
// return db
//}
// 这里的OpenDB方法是初始化DB结构体
return OpenDB(connector), nil
}
// 这一句是在转换driver.DriverContext接口失败时执行
// 因为go1.10之前默认是一下方法,1.10后使用driver.DriverContext接口
// 下面是为了做兼容
return OpenDB(dsnConnector{dsn: dataSourceName, driver: driveri}), nil
}
2.sql.DB结构体
为什么要介绍这个结构体,因为连接池的主要实现就是基于sql.DB和sql.driverConn来实现的。
type DB struct {
// 是一个包含Connect(context.Context) (Conn, error)和Driver() Driver方法的接口。
// 这两个接口是需要实际的数据库驱动来实现,再使用;
connector driver.Connector
// 是自sql.DB创建后关闭过的连接数;
numClosed uint64
mu sync.Mutex
// 实际空闲连接数;
freeConn []*driverConn
// 存储pending连接的map,当numOpen大于maxOpen时,连接会被暂存到该map中;
connRequests map[uint64]chan connRequest
// 即connRequests的key,记录当前可用的最新的connRequest的key;
nextRequest uint64
// 当前活跃连接数和当前pending的连接数的总和
numOpen int
// 标记创建连接的的通知channel,独立的goroutine异步消费该channel去执行创建;
openerCh chan struct{}
// 负责重置session的channel,独立的goroutine异步消费该channel去执行重置;
resetterCh chan *driverConn
// 标记DB是否关闭;
closed bool
// 记录db与conn之间的依赖关系,维持连接池以及关闭时使用;
dep map[finalCloser]depSet
// 最新入栈的连接,debug时使用,string中存的是栈buf相关信息;
lastPut map[*driverConn]string
// 最大空闲连接数;
maxIdle int
// 最大连接数;
maxOpen int
// 控线连接的最大存活时间;
maxLifetime time.Duration
// 标记超时连接的通知channel;
cleanerCh chan struct{}
// 通过context通知关闭connection opener和session resetter;
stop func()
}
3.申请连接过程
go从连接池中获取连接时有两个策略:
// 请求新的连接
alwaysNewConn connReuseStrategy = iota
// 从连接池中获取连接
cachedOrNewConn
这里以一个普通查询过程为例,看下连接池如何工作:
// 一般查询代码
rows, err := dbt.db.Query("SELECT * FROM test")
// Query如下,包了QueryContext方法
func (db *DB) Query(query string, args ...interface{}) (*Rows, error) {
// context.Background()是为了创建一个根上下文,以便close的时候能够将资源彻底的释放
return db.QueryContext(context.Background(), query, args...)
}
//QueryContext方法
func (db *DB) QueryContext(ctx context.Context, query string, args ...interface{}) (*Rows, error) {
var rows *Rows
var err error
// maxBadConnRetries是个静态变量为2,这里最多会执行两次从连接池中获取连接,如果在两次获取
// 过程中获取到可用连接则直接返回
for i := 0; i < maxBadConnRetries; i++ {
rows, err = db.query(ctx, query, args, cachedOrNewConn)
if err != driver.ErrBadConn {
break
}
}
// 如果两次都获取不到可用连接,则以请求获取一个新连接的方式获取并返回
if err == driver.ErrBadConn {
return db.query(ctx, query, args, alwaysNewConn)
}
return rows, err
}
// query方法如下
func (db *DB) query(ctx context.Context, query string, args []interface{}, strategy connReuseStrategy) (*Rows, error) {
// 这里是重点,这是真正申请连接的过程
dc, err := db.conn(ctx, strategy)
if err != nil {
return nil, err
}
// 这里是实际的查询过程,不过多介绍
return db.queryDC(ctx, nil, dc, dc.releaseConn, query, args)
}
db.conn()
func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) {
db.mu.Lock()
if db.closed {
db.mu.Unlock()
return nil, errDBClosed
}
// 判断context是否超时,因为ctx可以设置有超时时间的也可以设置无超时时间的
select {
default:
case <-ctx.Done():
db.mu.Unlock()
return nil, ctx.Err()
}
lifetime := db.maxLifetime
// 尝试获取一个空闲连接
numFree := len(db.freeConn)
if strategy == cachedOrNewConn && numFree > 0 {
// 取出第一个连接
conn := db.freeConn[0]
// copy是在原数组上覆盖,所以需要最后尾数项
copy(db.freeConn, db.freeConn[1:])
db.freeConn = db.freeConn[:numFree-1]
// 标记conn在使用
conn.inUse = true
db.mu.Unlock()
// 如果conn达到超时时间,直接关闭连接,返回nil
if conn.expired(lifetime) {
conn.Close()
return nil, driver.ErrBadConn
}
// 加锁确保conn已经reset完毕
conn.Lock()
err := conn.lastErr
conn.Unlock()
if err == driver.ErrBadConn {
conn.Close()
return nil, driver.ErrBadConn
}
return conn, nil
}
// 如果实际连接数已经达到最大连接数,则将新到的请求阻塞,并存到db.connRequests(上面提到的)
if db.maxOpen > 0 && db.numOpen >= db.maxOpen {
// 创建一个req chan作为线连接的缓存,暂存到db.connRequests中
req := make(chan connRequest, 1)
// nextRequestKeyLocked()实际就是去取db.nextRequest作为key
reqKey := db.nextRequestKeyLocked()
db.connRequests[reqKey] = req
db.mu.Unlock()
// context超时处理
select {
case <-ctx.Done():
// 如果超时保证新增的reqKey会被删除掉
db.mu.Lock()
delete(db.connRequests, reqKey)
db.mu.Unlock()
select {
default:
case ret, ok := <-req:
if ok {
db.putConn(ret.conn, ret.err, false)
}
}
return nil, ctx.Err()
// 当req真正接到写入的时候触发下面的操作,其实和上面的逻辑类似
case ret, ok := <-req:
if !ok {
return nil, errDBClosed
}
if ret.err == nil && ret.conn.expired(lifetime) {
ret.conn.Close()
return nil, driver.ErrBadConn
}
if ret.conn == nil {
return nil, ret.err
}
// 加锁确保conn已经reset完毕
ret.conn.Lock()
err := ret.conn.lastErr
ret.conn.Unlock()
if err == driver.ErrBadConn {
ret.conn.Close()
return nil, driver.ErrBadConn
}
return ret.conn, ret.err
}
}
// 如果连接数没有大于最大连接数时,进入以下逻辑
// 先把当点连接数加1
db.numOpen++
db.mu.Unlock()
// 获取一个新连接,这里也就是alwaysNewConn策略的实现
ci, err := db.connector.Connect(ctx)
if err != nil {
db.mu.Lock()
// 错误就回滚numOpen
db.numOpen--
// 很重要!!!下面说
db.maybeOpenNewConnections()
db.mu.Unlock()
return nil, err
}
db.mu.Lock()
// 构造成driverConn返回
dc := &driverConn{
db: db,
createdAt: nowFunc(),
ci: ci,
inUse: true,
}
// debug时会用到这个方法
db.addDepLocked(dc, dc)
db.mu.Unlock()
return dc, nil
}
connRequests
func (db *DB) maybeOpenNewConnections() {
// 当和maxOpen比较还可以继续创建连接时
// 如果阻塞的连接过多,那么也只能建立db.maxOpen - db.numOpen个
numRequests := len(db.connRequests)
if db.maxOpen > 0 {
numCanOpen := db.maxOpen - db.numOpen
if numRequests > numCanOpen {
numRequests = numCanOpen
}
}
// 循环numRequests次,给openerCh添加占位量,异步goroutine接收到即可创建新的连接
for numRequests > 0 {
db.numOpen++ // optimistically
numRequests--
if db.closed {
return
}
db.openerCh <- struct{}{}
}
}
connRequests
func (db *DB) openNewConnection(ctx context.Context) {
ci, err := db.connector.Connect(ctx)
db.mu.Lock()
defer db.mu.Unlock()
if db.closed {
if err == nil {
ci.Close()
}
db.numOpen--
return
}
if err != nil {
db.numOpen--
// 关键点!!!
db.putConnDBLocked(nil, err)
// 这个就是尝试db.openerCh <- struct{}{}这个操作
db.maybeOpenNewConnections()
return
}
dc := &driverConn{
db: db,
createdAt: nowFunc(),
ci: ci,
}
// 关键点!!!创建好的dc先传入了这个方法
if db.putConnDBLocked(dc, err) {
db.addDepLocked(dc, dc)
} else {
db.numOpen--
ci.Close()
}
}
看下db.putConnDBLocked(nil, err)这个方法:
func (db *DB) putConnDBLocked(dc *driverConn, err error) bool {
if db.closed {
return false
}
if db.maxOpen > 0 && db.numOpen > db.maxOpen {
return false
}
// 这里判断了db.connRequests是否大于0,即是否有阻塞请求
if c := len(db.connRequests); c > 0 {
var req chan connRequest
var reqKey uint64
// 有阻塞请求,先delete掉一个key
for reqKey, req = range db.connRequests {
break
}
delete(db.connRequests, reqKey)
if err == nil {
dc.inUse = true
}
// 将刚申请好的dc包装成connRequest赋给req
// 这里的req就是range db.connRequests返回的req
// 这里真正的获取到了连接!!!
req <- connRequest{
conn: dc,
err: err,
}
return true
} else if err == nil && !db.closed && db.maxIdleConnsLocked() > len(db.freeConn) {
// 如果没有阻塞的请求,将新建连接存入db.freeConn空闲连接池,并执行一次清理连接
db.freeConn = append(db.freeConn, dc)
db.startCleanerLocked()
return true
}
return false
}
这就是整个连接池的运行原理。
附.go基础包的数据库连接池存在的问题
maxIdle