概述
Golang 提供了database/sql包用于对SQL数据库的访问, 作为操作数据库的入口对象sql.DB, 主要为我们提供了两个重要的功能:
sql.DB 通过数据库驱动为我们提供管理底层数据库连接的打开和关闭操作.
sql.DB 为我们管理数据库连接池
需要注意的是,sql.DB表示操作数据库的抽象访问接口,而非一个数据库连接对象;它可以根据driver打开关闭数据库连接,管理连接池。正在使用的连接被标记为繁忙,用完后回到连接池等待下次使用。所以,如果你
没有把连接释放回连接池,会导致过多连接使系统资源耗尽。
通常来说, 不应该直接使用驱动所提供的方法, 而是应该使用 sql.DB, 因此在导入 mysql 驱动时, 这里使用了匿名导入的方式( 在包路径前添加 _ ), 当导入了一个数据库驱动后, 此驱动会自行初始化并注册自己到
Golang的database/sql的驱动map中, 因此我们就可以通过 database/sql 包提供的方法访问数据库了。
注册驱动
import _ "github.com/go-sql-driver/mysql"
driver.go 文件
找到mysql包的init方法
func init() {
sql.Register("mysql", &MySQLDriver{})
}
sql 为 database/sql 包,说明mysql包自身就依赖 database/sql 包。
sql采用map的方式存储驱动。
func Register(name string, driver driver.Driver) {
driversMu.Lock()
defer driversMu.Unlock()
if driver == nil {
panic("sql: Register driver is nil")
}
if _, dup := drivers[name]; dup {
panic("sql: Register called twice for driver " + name)
}
drivers[name] = driver
}
type Driver interface {
Open(name string) (Conn, error)
}
可见驱动需要实现 Open 方法。
type MySQLDriver struct{}
func (d MySQLDriver) Open(dsn string) (driver.Conn, error) {
cfg, err := ParseDSN(dsn)
if err != nil {
return nil, err
}
c := &connector{
cfg: cfg,
}
return c.Connect(context.Background())
}
连接数据库
db, err := sql.Open("mysql", cfg)
sql.open 方法:
// Open opens a database specified by its database driver name and a
// driver-specific data source name, usually consisting of at least a
// database name and connection information.
//
// Most users will open a database via a driver-specific connection
// helper function that returns a *DB. No database drivers are included
// in the Go standard library. See https://golang.org/s/sqldrivers for
// a list of third-party drivers.
//
// Open may just validate its arguments without creating a connection
// to the database. To verify that the data source name is valid, call
// Ping.
//
// The returned DB is safe for concurrent use by multiple goroutines
// and maintains its own pool of idle connections. Thus, the Open
// function should be called just once. It is rarely necessary to
// close a DB.
func Open(driverName, dataSourceName string) (*DB, error) {
driversMu.RLock()
driveri, ok := drivers[driverName]
driversMu.RUnlock()
if !ok {
return nil, fmt.Errorf("sql: unknown driver %q (forgotten import?)", driverName)
}
if driverCtx, ok := driveri.(driver.DriverContext); ok {
connector, err := driverCtx.OpenConnector(dataSourceName)
if err != nil {
return nil, err
}
return OpenDB(connector), nil
}
return OpenDB(dsnConnector{dsn: dataSourceName, driver: driveri}), nil
}
得到的 *DB 是协程安全的,要确保 Open 方法只被调用一次,避免每个请求过来都去 Open ,在使用过程中更加没必要close a DB对象。
func OpenDB(c driver.Connector) *DB {
ctx, cancel := context.WithCancel(context.Background())
db := &DB{
connector: c,
openerCh: make(chan struct{}, connectionRequestQueueSize),
lastPut: make(map[*driverConn]string),
connRequests: make(map[uint64]chan connRequest),
stop: cancel,
}
go db.connectionOpener(ctx)
return db
}
会启动一个goroutine来异步的创建连接,在后面会提到。
// Runs in a separate goroutine, opens new connections when requested.
func (db *DB) connectionOpener(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case <-db.openerCh:
db.openNewConnection(ctx)
}
}
}
当收到 创建连接 的消息时执行 db.openNewConnection 去调用驱动的 Connector 对象的 Connect 方法:
ci, err := db.connector.Connect(ctx)
再来看看mysql的connector
func (d MySQLDriver) OpenConnector(dsn string) (driver.Connector, error) {
cfg, err := ParseDSN(dsn)
if err != nil {
return nil, err
}
return &connector{
cfg: cfg,
}, nil
}
driver.Connector 需要实现两个方法:
type Connector interface {
Connect(context.Context) (Conn, error)
Driver() Driver
}
Connect方法才是真正发起连接的操作。
由 db.openerCh 通道表明,sql.Open并不会立即检验参数和创建连接,而是等待通知过来才去创建。如果想立即验证连接,需要用Ping()方法。
db.Ping()
Ping完之后自动调用 dc.releaseConn 释放了连接。
sql.DB的设计就是用来作为长连接使用的。不要频繁Open, Close。比较好的做法是,为每个不同的datastore建一个DB对象,保持这些对象Open。
数据查询操作
数据库查询的一般步骤如下:
1、调用 db.Query 执行 SQL 语句, 此方法会返回一个 *Rows。
2、通过 *Rows.Next() 迭代查询数据。
3、通过 *Rows.Scan() 读取每一行的值,并填充到变量。
4、调用 *Rows.Close() 关闭资源,并释放连接。
先来看看 db.Query() 方法,最终跟踪到 quertDC() 方法,这个方法中只有出错了才会去调用 releaseConn,否则返回*Rows,并且将 releaseConn 注入到 *Rows 中;我们再来看看 *Rows.Close() 方法,它最终会去调用 releaseConn。
所以,需要手动调用 rows.Close() 方法,或者 defer rows.Close() 来将连接放回连接池。
当然,如果 *Rows 是局部变量的话,离开作用域后会被自动回收,自动调用 close() 方法来释放,但是为了安全起见,使用 defer rows.Close()来处理。
示例:
func qu() {
rows, err := Db.Query("select * from user limit 1")
if err != nil {
fmt.Println(err)
return
}
ds := Db.Stats()
fmt.Println(ds.InUse) // 1
fmt.Println(ds.OpenConnections) // 1
rows.Close()
ds2 := Db.Stats()
fmt.Println(ds2.InUse) // 0
fmt.Println(ds2.OpenConnections) // 1
}
但是值得注意的是,rows 一旦被关闭后,将无法使用 rwos.Next() 迭代数据集。
那么能不能先关闭连接我再来慢慢遍历呢,使连接得到充分利用??貌似没有提供这个功能。
关于 rows:
type Rows struct {
dc *driverConn // owned; must call releaseConn when closed to release
releaseConn func(error)
rowsi driver.Rows
cancel func() // called when Rows is closed, may be nil.
closeStmt *driverStmt // if non-nil, statement to Close on close
closemu sync.RWMutex
closed bool
lasterr error // non-nil only if closed is true
lastcols []driver.Value
}
都是不可导出的。
遍历数据集
func qu() {
rows, err := Db.Query("select id from photo_user_data where id = 8 limit 1")
if err != nil {
fmt.Println(err)
return
}
defer rows.Close()
for rows.Next() {
var id int
err := rows.Scan(&id)
if err != nil{
fmt.Println(err)
break
}
fmt.Println(id)
}
}
rows.Next() 返回 bool,表示是否有数据可以迭代;同时将一条数据放到 rows.lastcols ,它是一个切片,里面元素的顺序就是你查询的 column 顺序,然后 scan 方法遍历 rows.lastcols 来填充变量。
如果一条记录都没有显然就不会进入循环体。
for i, sv := range rs.lastcols {
err := convertAssignRows(dest[i], sv, rs)
if err != nil {
return fmt.Errorf(`sql: Scan error on column index %d, name %q: %v`, i, rs.rowsi.Columns()[i], err)
}
}
通过位置而不是名称来对应,所以 Scan() 中参数的顺序自己要控制好。
当遍历完成之后会自动调用 rows.CLose() 释放连接,因此 rows 只能被遍历一次。
如何在不遍历的情况下知道是否查询到结果???
查询单条
使用 db.QueryRow() 得到 *Row
func (db *DB) QueryRow(query string, args ...interface{}) *Row
调用 *Row 的 Scan() 方法填充变量,如果没有查询到结果则返回 ErrNoRows 的 error。
func (r *Row) Scan(dest ...interface{}) error
func in() {
var id int
err := Db.QueryRow("select id from photo_user_data where id = 7 limit 1").Scan(&id)
if err != nil {
if err == sql.ErrNoRows {
fmt.Println("err no rows")
} else {
fmt.Println(err)
}
}
fmt.Print(id)
}
QueryRow 其实是对 Query 的进一步封装。
Scan() 内部调用了 r.rows.Close() 来释放连接和资源。
增,删,改操作
增,删,改 都使用 db.Exec() 方法。
对于 db.Exec() 方法则不用担心,因为它主动做了释放连接的工作。
defer func() {
release(err)
}()
func (db *DB) Exec(query string, args ...interface{}) (Result, error)
其中 Result 接口包含两个方法
type Result interface {
LastInsertId() (int64, error)
RowsAffected() (int64, error)
}
对于mysql驱动而言,driverResult为:
// github.com\go-sql-driver\mysql\result.go
type mysqlResult struct {
affectedRows int64
insertId int64
}
func (res *mysqlResult) LastInsertId() (int64, error) {
return res.insertId, nil
}
func (res *mysqlResult) RowsAffected() (int64, error) {
return res.affectedRows, nil
}
当执行出错了,err 会有值。否则,通过 LastInsertId() 和 RowsAffected() 判断执行结果。
func ex() {
Result, err := Db.Exec("update photo_user_data set nickName=? where id = ?", "haha", 7)
if err != nil {
fmt.Println(err)
return
}
fmt.Println("update success")
fmt.Println(Result.RowsAffected())
}
连接池的设计
获取连接的方法
func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error)
db.numOpen:打开着的连接的个数,打开 numOpen++;关闭 numOpen–;要求 numOpen <= maxOpen。
空闲连接池 db.freeConn[] 切片,存储了 inUse=false 的连接,当然这里面包括了超过了 db.maxLifetime 的连接,因此过期的连接不会自动清掉,被从 db.freeConn[] 中获取之后做判断,过期则直接Close。所以很多这样的代码:
var res Result
var err error
for i := 0; i < maxBadConnRetries; i++ {
res, err = db.exec(ctx, query, args, cachedOrNewConn)
if err != driver.ErrBadConn {
break
}
}
if err == driver.ErrBadConn {
return db.exec(ctx, query, args, alwaysNewConn)
}
return res, err
先尝试 maxBadConnRetries 次从 db.freeConn[] 中获取连接,不行就直接创建连接,此时也要判断如果超过了 maxOpen 的话也不能直接创建连接,而是将请求写入到 db.connRequests 的 map 中,然后在释放连接的地方来优先分配给它们,然后多余的连接才放入 freeConn[] 中去。
db.connRequests 类型为 map[uint64]chan connRequest
在 db.conn 方法中
if db.maxOpen > 0 && db.numOpen >= db.maxOpen {
req := make(chan connRequest, 1)
reqKey := db.nextRequestKeyLocked()
db.connRequests[reqKey] = req
// 然后进入等待状态,以及超时
select {
case <-ctx.Done():
......
case ret, ok := <-req:
......
}
}
在 db.putConnDBLocked 方法中
if c := len(db.connRequests); c > 0 {
var req chan connRequest
var reqKey uint64
// 随机取出一个
for reqKey, req = range db.connRequests {
break
}
delete(db.connRequests, reqKey) // Remove from pending requests.
if err == nil {
dc.inUse = true
}
req <- connRequest{
conn: dc,
err: err,
}
return true
}
func (db *DB) putConnDBLocked(dc *driverConn, err error) bool
事务处理
开启事务
func (db *DB) Begin() (*Tx, error)
func (db *DB) BeginTx(ctx context.Context, opts *TxOptions) (*Tx, error)
*Tx 下面的方法:
func (tx *Tx) Commit() error
func (tx *Tx) Rollback() error
func (tx *Tx) Exec(query string, args ...interface{}) (Result, error)
func (tx *Tx) ExecContext(ctx context.Context, query string, args ...interface{}) (Result, error)
func (tx *Tx) Prepare(query string) (*Stmt, error)
func (tx *Tx) PrepareContext(ctx context.Context, query string) (*Stmt, error)
func (tx *Tx) Query(query string, args ...interface{}) (*Rows, error)
func (tx *Tx) QueryContext(ctx context.Context, query string, args ...interface{}) (*Rows, error)
func (tx *Tx) QueryRow(query string, args ...interface{}) *Row
func (tx *Tx) QueryRowContext(ctx context.Context, query string, args ...interface{}) *Row
func (tx *Tx) Stmt(stmt *Stmt) *Stmt
func (tx *Tx) StmtContext(ctx context.Context, stmt *Stmt) *Stmt
// The rollback will be ignored if the tx has been committed or rolled back
defer tx.Rollback()
默认会执行rollback操作,因此可不用认为指定。
可选配置
// TxOptions holds the transaction options to be used in DB.BeginTx.
type TxOptions struct {
// Isolation is the transaction isolation level.
// If zero, the driver or database's default level is used.
Isolation IsolationLevel
ReadOnly bool
}
isolation 隔离级别
示例:
func tx() {
tx, err := Db.Begin()
if err != nil {
fmt.Println(err)
return
}
defer tx.Rollback()
Result, err := tx.Exec("update photo_user_data set nickName=? where id = ?", "aaaaaaaa", 8)
if err != nil {
fmt.Println(err)
return
}
fmt.Println("update success")
fmt.Println(Result.RowsAffected())
tx.Commit()
}
事务与预编译
func txp() {
tx, err := Db.Begin()
if err != nil {
fmt.Println(err)
return
}
defer tx.Rollback()
stmt, err := tx.Prepare("update photo_user_data set nickName=? where id = ?")
if err != nil {
fmt.Println(err)
return
}
Result, err := stmt.Exec("nnnnnnnnn", 8)
if err != nil {
fmt.Println(err)
return
}
fmt.Println(Result.RowsAffected())
tx.Commit()
}
预编译SQL语句
SQL预编译后相当于得到一个模板,向这个模板填充参数即可被执行。
可以防止SQL注入,因为是模板填充而不是拼接SQL语句。
并且提升了SQL执行效率,因为使用编译后的语句是不用再去做SQL校验和编译工作。
所以,预编译要想达到提升执行效率的效果,前提是在执行相同的SQL语句(参数可以允许不一样)时,共享 stmt 对象,而不是每次都来 prepare。否则,预编译反而降低了性能,只是防止了SQL注入。
先使用SQL语句和占位符定义语句,在使用 Exec() 执行。
Db.Prepare(sql)编译增删改查语句得到 *stmt
func (db *DB) Prepare(query string) (*Stmt, error)
然后,查询使用 *Stmt.Query(), *Stmt.QueryRow() 方法,和上面的用法一样。
增,删,改使用 *Stmt.Exec() 方法,和上面的用法一样。
Prepare() 会自动释放连接。但是要手动释放 stmt
defer stmt.Close()
示例:
func pre() {
stmt, err := Db.Prepare("insert into photo_category(name, activity_id) value(?, ?)")
if err != nil {
fmt.Println(err)
return
}
defer stmt.Close()
Result, err := stmt.Exec("haha", 1)
if err != nil {
fmt.Println(err)
return
}
fmt.Println(Result.LastInsertId())
// stmt 可重复使用,以提升执行效率
Result1, err := stmt.Exec("haha1", 11)
Result2, err := stmt.Exec("haha2", 12)
Result3, err := stmt.Exec("haha3", 13)
}
可见,使用预编译的话,申请了两次连接。
mysql的预编译是针对连接的,也就是说,Prepare操作和Exec操作要使用同一个连接才可以,否则将没法执行,因为模板SQL和普通的SQL不一样。
关于SQL预编译不会在连接之间共享这一点也很好证明。
连接1
mysql> prepare ins from 'select * from show_product where name=?';
Query OK, 0 rows affected
Statement prepared
mysql> set @a='qqq';
Query OK, 0 rows affected
mysql> execute ins using @a;
...
连接2
mysql> set @a='bbb';
Query OK, 0 rows affected
mysql> execute ins using @a;
1243 - Unknown prepared statement handler (ins) given to EXECUTE
为什么mysql要这样设计呢,暂时不清楚。
那么该如何去管理这种连接呢?
在database/sql中使用了额外的结构 []connStmt{} 来存储当前 stmt 和其对应的连接,在 Prepare操作之后正常释放了连接,并记录到 []connStmt{},这样此连接依然可以被多处使用,
当执行 stmt.Exec / stmt.Query / stmt.QueryRow 操作时,获取连接的方式就不一样了,不是从连接池获取,而是从 []connStmt{} 去拿,此时如果 stmt 对应的连接正在别处被使用,
那么需要阻塞等待,如果此连接关闭了,则需要从连接池获取新的连接,然后需要额外再次编译并运行。
即便如此,在一个高并发的情况下,连接被占用的概率很大,那么预编译的方式来执行SQL,是否能提高性能还不一定,但是从防止SQL注入的角度来看,预编译还是必要的。
下面来看一下database/sql中预编译相关的代码:
// Prepare creates a prepared statement for later queries or executions.
// Multiple queries or executions may be run concurrently from the
// returned statement.
// The caller must call the statement's Close method
// when the statement is no longer needed.
func (db *DB) Prepare(query string) (*Stmt, error) {
return db.PrepareContext(context.Background(), query)
}
func (db *DB) prepare(ctx context.Context, query string, strategy connReuseStrategy) (*Stmt, error) {
// TODO: check if db.driver supports an optional
// driver.Preparer interface and call that instead, if so,
// otherwise we make a prepared statement that's bound
// to a connection, and to execute this prepared statement
// we either need to use this connection (if it's free), else
// get a new connection + re-prepare + execute on that one.
dc, err := db.conn(ctx, strategy)
if err != nil {
return nil, err
}
return db.prepareDC(ctx, dc, dc.releaseConn, nil, query)
}
// prepareDC prepares a query on the driverConn and calls release before
// returning. When cg == nil it implies that a connection pool is used, and
// when cg != nil only a single driver connection is used.
func (db *DB) prepareDC(ctx context.Context, dc *driverConn, release func(error), cg stmtConnGrabber, query string) (*Stmt, error) {
var ds *driverStmt
var err error
defer func() {
release(err)
}()
withLock(dc, func() {
ds, err = dc.prepareLocked(ctx, cg, query)
})
if err != nil {
return nil, err
}
stmt := &Stmt{
db: db,
query: query,
cg: cg,
cgds: ds,
}
// When cg == nil this statement will need to keep track of various
// connections they are prepared on and record the stmt dependency on
// the DB.
if cg == nil {
stmt.css = []connStmt{{dc, ds}}
stmt.lastNumClosed = atomic.LoadUint64(&db.numClosed)
db.addDep(stmt, stmt)
}
return stmt, nil
}
// connStmt returns a free driver connection on which to execute the
// statement, a function to call to release the connection, and a
// statement bound to that connection.
func (s *Stmt) connStmt(ctx context.Context, strategy connReuseStrategy) (dc *driverConn, releaseConn func(error), ds *driverStmt, err error) {
if err = s.stickyErr; err != nil {
return
}
s.mu.Lock()
if s.closed {
s.mu.Unlock()
err = errors.New("sql: statement is closed")
return
}
// In a transaction or connection, we always use the connection that the
// stmt was created on.
if s.cg != nil {
s.mu.Unlock()
dc, releaseConn, err = s.cg.grabConn(ctx) // blocks, waiting for the connection.
if err != nil {
return
}
return dc, releaseConn, s.cgds, nil
}
s.removeClosedStmtLocked()
s.mu.Unlock()
dc, err = s.db.conn(ctx, strategy)
if err != nil {
return nil, nil, nil, err
}
s.mu.Lock()
for _, v := range s.css {
if v.dc == dc {
s.mu.Unlock()
return dc, dc.releaseConn, v.ds, nil
}
}
s.mu.Unlock()
// No luck; we need to prepare the statement on this connection
withLock(dc, func() {
ds, err = s.prepareOnConnLocked(ctx, dc)
})
if err != nil {
dc.releaseConn(err)
return nil, nil, nil, err
}
return dc, dc.releaseConn, ds, nil
}