概述

Golang 提供了database/sql包用于对SQL数据库的访问, 作为操作数据库的入口对象sql.DB, 主要为我们提供了两个重要的功能:
sql.DB 通过数据库驱动为我们提供管理底层数据库连接的打开和关闭操作.
sql.DB 为我们管理数据库连接池

需要注意的是,sql.DB表示操作数据库的抽象访问接口,而非一个数据库连接对象;它可以根据driver打开关闭数据库连接,管理连接池。正在使用的连接被标记为繁忙,用完后回到连接池等待下次使用。所以,如果你
没有把连接释放回连接池,会导致过多连接使系统资源耗尽。

通常来说, 不应该直接使用驱动所提供的方法, 而是应该使用 sql.DB, 因此在导入 mysql 驱动时, 这里使用了匿名导入的方式( 在包路径前添加 _ ), 当导入了一个数据库驱动后, 此驱动会自行初始化并注册自己到
Golang的database/sql的驱动map中, 因此我们就可以通过 database/sql 包提供的方法访问数据库了。

注册驱动

import _ "github.com/go-sql-driver/mysql"

driver.go 文件

找到mysql包的init方法

func init() {
	sql.Register("mysql", &MySQLDriver{})
}

sql 为 database/sql 包,说明mysql包自身就依赖 database/sql 包。

sql采用map的方式存储驱动。

func Register(name string, driver driver.Driver) {
	driversMu.Lock()
	defer driversMu.Unlock()
	if driver == nil {
		panic("sql: Register driver is nil")
	}
	if _, dup := drivers[name]; dup {
		panic("sql: Register called twice for driver " + name)
	}
	drivers[name] = driver
}

type Driver interface {
	Open(name string) (Conn, error)
}

可见驱动需要实现 Open 方法。

type MySQLDriver struct{}

func (d MySQLDriver) Open(dsn string) (driver.Conn, error) {
	cfg, err := ParseDSN(dsn)
	if err != nil {
		return nil, err
	}
	c := &connector{
		cfg: cfg,
	}
	return c.Connect(context.Background())
}

连接数据库

db, err := sql.Open("mysql", cfg)

sql.open 方法:

// Open opens a database specified by its database driver name and a
// driver-specific data source name, usually consisting of at least a
// database name and connection information.
//
// Most users will open a database via a driver-specific connection
// helper function that returns a *DB. No database drivers are included
// in the Go standard library. See https://golang.org/s/sqldrivers for
// a list of third-party drivers.
//
// Open may just validate its arguments without creating a connection
// to the database. To verify that the data source name is valid, call
// Ping.
//
// The returned DB is safe for concurrent use by multiple goroutines
// and maintains its own pool of idle connections. Thus, the Open
// function should be called just once. It is rarely necessary to
// close a DB.
func Open(driverName, dataSourceName string) (*DB, error) {
	driversMu.RLock()
	driveri, ok := drivers[driverName]
	driversMu.RUnlock()
	if !ok {
		return nil, fmt.Errorf("sql: unknown driver %q (forgotten import?)", driverName)
	}

	if driverCtx, ok := driveri.(driver.DriverContext); ok {
		connector, err := driverCtx.OpenConnector(dataSourceName)
		if err != nil {
			return nil, err
		}
		return OpenDB(connector), nil
	}

	return OpenDB(dsnConnector{dsn: dataSourceName, driver: driveri}), nil
}

得到的 *DB 是协程安全的,要确保 Open 方法只被调用一次,避免每个请求过来都去 Open ,在使用过程中更加没必要close a DB对象。

func OpenDB(c driver.Connector) *DB {
	ctx, cancel := context.WithCancel(context.Background())
	db := &DB{
		connector:    c,
		openerCh:     make(chan struct{}, connectionRequestQueueSize),
		lastPut:      make(map[*driverConn]string),
		connRequests: make(map[uint64]chan connRequest),
		stop:         cancel,
	}

	go db.connectionOpener(ctx)

	return db
}

会启动一个goroutine来异步的创建连接,在后面会提到。

// Runs in a separate goroutine, opens new connections when requested.
func (db *DB) connectionOpener(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case <-db.openerCh:
			db.openNewConnection(ctx)
		}
	}
}

当收到 创建连接 的消息时执行 db.openNewConnection 去调用驱动的 Connector 对象的 Connect 方法:

ci, err := db.connector.Connect(ctx) 

再来看看mysql的connector

func (d MySQLDriver) OpenConnector(dsn string) (driver.Connector, error) {
	cfg, err := ParseDSN(dsn)
	if err != nil {
		return nil, err
	}
	return &connector{
		cfg: cfg,
	}, nil
}

driver.Connector 需要实现两个方法:

type Connector interface {
	Connect(context.Context) (Conn, error)
	Driver() Driver
}

Connect方法才是真正发起连接的操作。

由 db.openerCh 通道表明,sql.Open并不会立即检验参数和创建连接,而是等待通知过来才去创建。如果想立即验证连接,需要用Ping()方法。

db.Ping()

Ping完之后自动调用 dc.releaseConn 释放了连接。

sql.DB的设计就是用来作为长连接使用的。不要频繁Open, Close。比较好的做法是,为每个不同的datastore建一个DB对象,保持这些对象Open。

数据查询操作

数据库查询的一般步骤如下:

1、调用 db.Query 执行 SQL 语句, 此方法会返回一个 *Rows。
2、通过 *Rows.Next() 迭代查询数据。
3、通过 *Rows.Scan() 读取每一行的值,并填充到变量。
4、调用 *Rows.Close() 关闭资源,并释放连接。

先来看看 db.Query() 方法,最终跟踪到 quertDC() 方法,这个方法中只有出错了才会去调用 releaseConn,否则返回*Rows,并且将 releaseConn 注入到 *Rows 中;我们再来看看 *Rows.Close() 方法,它最终会去调用 releaseConn。

所以,需要手动调用 rows.Close() 方法,或者 defer rows.Close() 来将连接放回连接池。

当然,如果 *Rows 是局部变量的话,离开作用域后会被自动回收,自动调用 close() 方法来释放,但是为了安全起见,使用 defer rows.Close()来处理。

示例:

func qu() {
	rows, err := Db.Query("select * from user limit 1")
	if err != nil {
		fmt.Println(err)
		return
	}

	ds := Db.Stats()
	fmt.Println(ds.InUse) // 1
	fmt.Println(ds.OpenConnections) // 1

	rows.Close()

	ds2 := Db.Stats()
	fmt.Println(ds2.InUse) // 0
	fmt.Println(ds2.OpenConnections) // 1
}

但是值得注意的是,rows 一旦被关闭后,将无法使用 rwos.Next() 迭代数据集。

那么能不能先关闭连接我再来慢慢遍历呢,使连接得到充分利用??貌似没有提供这个功能。

关于 rows:

type Rows struct {
	dc          *driverConn // owned; must call releaseConn when closed to release
	releaseConn func(error)
	rowsi       driver.Rows
	cancel      func()      // called when Rows is closed, may be nil.
	closeStmt   *driverStmt // if non-nil, statement to Close on close

	closemu sync.RWMutex
	closed  bool
	lasterr error // non-nil only if closed is true

	lastcols []driver.Value
}

都是不可导出的。

遍历数据集

func qu() {
	rows, err := Db.Query("select id from photo_user_data where id = 8 limit 1")
	if err != nil {
		fmt.Println(err)
		return
	}

	defer rows.Close()

	for rows.Next() {
		var id int
		err := rows.Scan(&id)
		if err != nil{
			fmt.Println(err)
			break
		}
		fmt.Println(id)
	}
}

rows.Next() 返回 bool,表示是否有数据可以迭代;同时将一条数据放到 rows.lastcols ,它是一个切片,里面元素的顺序就是你查询的 column 顺序,然后 scan 方法遍历 rows.lastcols 来填充变量。

如果一条记录都没有显然就不会进入循环体。

for i, sv := range rs.lastcols {
	err := convertAssignRows(dest[i], sv, rs)
	if err != nil {
		return fmt.Errorf(`sql: Scan error on column index %d, name %q: %v`, i, rs.rowsi.Columns()[i], err)
	}
}

通过位置而不是名称来对应,所以 Scan() 中参数的顺序自己要控制好。

当遍历完成之后会自动调用 rows.CLose() 释放连接,因此 rows 只能被遍历一次。

如何在不遍历的情况下知道是否查询到结果???

查询单条

使用 db.QueryRow() 得到 *Row

func (db *DB) QueryRow(query string, args ...interface{}) *Row

调用 *Row 的 Scan() 方法填充变量,如果没有查询到结果则返回 ErrNoRows 的 error。

func (r *Row) Scan(dest ...interface{}) error
func in() {
	var id int
	err := Db.QueryRow("select id from photo_user_data where id = 7 limit 1").Scan(&id)
	if err != nil {
		if err == sql.ErrNoRows {
			fmt.Println("err no rows")
		} else {
			fmt.Println(err)
		}
	}
	fmt.Print(id)
}

QueryRow 其实是对 Query 的进一步封装。
Scan() 内部调用了 r.rows.Close() 来释放连接和资源。

增,删,改操作

增,删,改 都使用 db.Exec() 方法。

对于 db.Exec() 方法则不用担心,因为它主动做了释放连接的工作。

defer func() {
	release(err)
}()
func (db *DB) Exec(query string, args ...interface{}) (Result, error)

其中 Result 接口包含两个方法

type Result interface {
	LastInsertId() (int64, error)
	RowsAffected() (int64, error)
}

对于mysql驱动而言,driverResult为:

// github.com\go-sql-driver\mysql\result.go

type mysqlResult struct {
	affectedRows int64
	insertId     int64
}

func (res *mysqlResult) LastInsertId() (int64, error) {
	return res.insertId, nil
}

func (res *mysqlResult) RowsAffected() (int64, error) {
	return res.affectedRows, nil
}

当执行出错了,err 会有值。否则,通过 LastInsertId() 和 RowsAffected() 判断执行结果。

func ex() {
	Result, err := Db.Exec("update photo_user_data set nickName=? where id = ?", "haha", 7)
	if err != nil {
		fmt.Println(err)
		return
	}

	fmt.Println("update success")
	fmt.Println(Result.RowsAffected())
}

连接池的设计

获取连接的方法

func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error)

db.numOpen:打开着的连接的个数,打开 numOpen++;关闭 numOpen–;要求 numOpen <= maxOpen。
空闲连接池 db.freeConn[] 切片,存储了 inUse=false 的连接,当然这里面包括了超过了 db.maxLifetime 的连接,因此过期的连接不会自动清掉,被从 db.freeConn[] 中获取之后做判断,过期则直接Close。所以很多这样的代码:

var res Result
var err error
for i := 0; i < maxBadConnRetries; i++ {
	res, err = db.exec(ctx, query, args, cachedOrNewConn)
	if err != driver.ErrBadConn {
		break
	}
}
if err == driver.ErrBadConn {
	return db.exec(ctx, query, args, alwaysNewConn)
}
return res, err

先尝试 maxBadConnRetries 次从 db.freeConn[] 中获取连接,不行就直接创建连接,此时也要判断如果超过了 maxOpen 的话也不能直接创建连接,而是将请求写入到 db.connRequests 的 map 中,然后在释放连接的地方来优先分配给它们,然后多余的连接才放入 freeConn[] 中去。

db.connRequests 类型为 map[uint64]chan connRequest

在 db.conn 方法中

if db.maxOpen > 0 && db.numOpen >= db.maxOpen {
	req := make(chan connRequest, 1)
	reqKey := db.nextRequestKeyLocked()
	db.connRequests[reqKey] = req
	// 然后进入等待状态,以及超时
	select {
	case <-ctx.Done():
		......
	case ret, ok := <-req:
		......
	}
}

在 db.putConnDBLocked 方法中

if c := len(db.connRequests); c > 0 {
	var req chan connRequest
	var reqKey uint64

	// 随机取出一个
	for reqKey, req = range db.connRequests {
		break
	}
	delete(db.connRequests, reqKey) // Remove from pending requests.
	if err == nil {
		dc.inUse = true
	}
	req <- connRequest{
		conn: dc,
		err:  err,
	}
	return true
}
func (db *DB) putConnDBLocked(dc *driverConn, err error) bool

事务处理

开启事务

func (db *DB) Begin() (*Tx, error)
func (db *DB) BeginTx(ctx context.Context, opts *TxOptions) (*Tx, error)

*Tx 下面的方法:

func (tx *Tx) Commit() error
func (tx *Tx) Rollback() error

func (tx *Tx) Exec(query string, args ...interface{}) (Result, error)
func (tx *Tx) ExecContext(ctx context.Context, query string, args ...interface{}) (Result, error)
func (tx *Tx) Prepare(query string) (*Stmt, error)
func (tx *Tx) PrepareContext(ctx context.Context, query string) (*Stmt, error)
func (tx *Tx) Query(query string, args ...interface{}) (*Rows, error)
func (tx *Tx) QueryContext(ctx context.Context, query string, args ...interface{}) (*Rows, error)
func (tx *Tx) QueryRow(query string, args ...interface{}) *Row
func (tx *Tx) QueryRowContext(ctx context.Context, query string, args ...interface{}) *Row
func (tx *Tx) Stmt(stmt *Stmt) *Stmt
func (tx *Tx) StmtContext(ctx context.Context, stmt *Stmt) *Stmt
// The rollback will be ignored if the tx has been committed or rolled back
defer tx.Rollback() 

默认会执行rollback操作,因此可不用认为指定。

可选配置

// TxOptions holds the transaction options to be used in DB.BeginTx.
type TxOptions struct {
	// Isolation is the transaction isolation level.
	// If zero, the driver or database's default level is used.
	Isolation IsolationLevel
	ReadOnly  bool
}

isolation 隔离级别

示例:

func tx() {
	tx, err := Db.Begin()
	if err != nil {
		fmt.Println(err)
		return
	}
	defer tx.Rollback()

	Result, err := tx.Exec("update photo_user_data set nickName=? where id = ?", "aaaaaaaa", 8)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("update success")
	fmt.Println(Result.RowsAffected())

	tx.Commit()
}

事务与预编译

func txp() {
	tx, err := Db.Begin()
	if err != nil {
		fmt.Println(err)
		return
	}
	defer tx.Rollback()

	stmt, err := tx.Prepare("update photo_user_data set nickName=? where id = ?")
	if err != nil {
		fmt.Println(err)
		return
	}
	Result, err := stmt.Exec("nnnnnnnnn", 8)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(Result.RowsAffected())

	tx.Commit()
}

预编译SQL语句

SQL预编译后相当于得到一个模板,向这个模板填充参数即可被执行。
可以防止SQL注入,因为是模板填充而不是拼接SQL语句。
并且提升了SQL执行效率,因为使用编译后的语句是不用再去做SQL校验和编译工作。

所以,预编译要想达到提升执行效率的效果,前提是在执行相同的SQL语句(参数可以允许不一样)时,共享 stmt 对象,而不是每次都来 prepare。否则,预编译反而降低了性能,只是防止了SQL注入。

先使用SQL语句和占位符定义语句,在使用 Exec() 执行。
Db.Prepare(sql)编译增删改查语句得到 *stmt

func (db *DB) Prepare(query string) (*Stmt, error)

然后,查询使用 *Stmt.Query(), *Stmt.QueryRow() 方法,和上面的用法一样。
增,删,改使用 *Stmt.Exec() 方法,和上面的用法一样。

Prepare() 会自动释放连接。但是要手动释放 stmt

defer stmt.Close()

示例:

func pre() {
	stmt, err := Db.Prepare("insert into photo_category(name, activity_id) value(?, ?)")
	if err != nil {
		fmt.Println(err)
		return
	}
	defer stmt.Close()

	Result, err := stmt.Exec("haha", 1)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(Result.LastInsertId())

	// stmt 可重复使用,以提升执行效率
	Result1, err := stmt.Exec("haha1", 11)
	Result2, err := stmt.Exec("haha2", 12)
	Result3, err := stmt.Exec("haha3", 13)
}

可见,使用预编译的话,申请了两次连接。

mysql的预编译是针对连接的,也就是说,Prepare操作和Exec操作要使用同一个连接才可以,否则将没法执行,因为模板SQL和普通的SQL不一样。

关于SQL预编译不会在连接之间共享这一点也很好证明。
连接1

mysql> prepare ins from 'select * from show_product where name=?';
Query OK, 0 rows affected
Statement prepared

mysql> set @a='qqq';
Query OK, 0 rows affected

mysql> execute ins using @a;
...

连接2

mysql> set @a='bbb';
Query OK, 0 rows affected

mysql> execute ins using @a;
1243 - Unknown prepared statement handler (ins) given to EXECUTE

为什么mysql要这样设计呢,暂时不清楚。

那么该如何去管理这种连接呢?

在database/sql中使用了额外的结构 []connStmt{} 来存储当前 stmt 和其对应的连接,在 Prepare操作之后正常释放了连接,并记录到 []connStmt{},这样此连接依然可以被多处使用,

当执行 stmt.Exec / stmt.Query / stmt.QueryRow 操作时,获取连接的方式就不一样了,不是从连接池获取,而是从 []connStmt{} 去拿,此时如果 stmt 对应的连接正在别处被使用,

那么需要阻塞等待,如果此连接关闭了,则需要从连接池获取新的连接,然后需要额外再次编译并运行。

即便如此,在一个高并发的情况下,连接被占用的概率很大,那么预编译的方式来执行SQL,是否能提高性能还不一定,但是从防止SQL注入的角度来看,预编译还是必要的。

下面来看一下database/sql中预编译相关的代码:

// Prepare creates a prepared statement for later queries or executions.
// Multiple queries or executions may be run concurrently from the
// returned statement.
// The caller must call the statement's Close method
// when the statement is no longer needed.
func (db *DB) Prepare(query string) (*Stmt, error) {
	return db.PrepareContext(context.Background(), query)
}

func (db *DB) prepare(ctx context.Context, query string, strategy connReuseStrategy) (*Stmt, error) {
	// TODO: check if db.driver supports an optional
	// driver.Preparer interface and call that instead, if so,
	// otherwise we make a prepared statement that's bound
	// to a connection, and to execute this prepared statement
	// we either need to use this connection (if it's free), else
	// get a new connection + re-prepare + execute on that one.
	dc, err := db.conn(ctx, strategy)
	if err != nil {
		return nil, err
	}
	return db.prepareDC(ctx, dc, dc.releaseConn, nil, query)
}

// prepareDC prepares a query on the driverConn and calls release before
// returning. When cg == nil it implies that a connection pool is used, and
// when cg != nil only a single driver connection is used.
func (db *DB) prepareDC(ctx context.Context, dc *driverConn, release func(error), cg stmtConnGrabber, query string) (*Stmt, error) {
	var ds *driverStmt
	var err error
	defer func() {
		release(err)
	}()
	withLock(dc, func() {
		ds, err = dc.prepareLocked(ctx, cg, query)
	})
	if err != nil {
		return nil, err
	}
	stmt := &Stmt{
		db:    db,
		query: query,
		cg:    cg,
		cgds:  ds,
	}

	// When cg == nil this statement will need to keep track of various
	// connections they are prepared on and record the stmt dependency on
	// the DB.
	if cg == nil {
		stmt.css = []connStmt{{dc, ds}}
		stmt.lastNumClosed = atomic.LoadUint64(&db.numClosed)
		db.addDep(stmt, stmt)
	}
	return stmt, nil
}

// connStmt returns a free driver connection on which to execute the
// statement, a function to call to release the connection, and a
// statement bound to that connection.
func (s *Stmt) connStmt(ctx context.Context, strategy connReuseStrategy) (dc *driverConn, releaseConn func(error), ds *driverStmt, err error) {
	if err = s.stickyErr; err != nil {
		return
	}
	s.mu.Lock()
	if s.closed {
		s.mu.Unlock()
		err = errors.New("sql: statement is closed")
		return
	}

	// In a transaction or connection, we always use the connection that the
	// stmt was created on.
	if s.cg != nil {
		s.mu.Unlock()
		dc, releaseConn, err = s.cg.grabConn(ctx) // blocks, waiting for the connection.
		if err != nil {
			return
		}
		return dc, releaseConn, s.cgds, nil
	}

	s.removeClosedStmtLocked()
	s.mu.Unlock()

	dc, err = s.db.conn(ctx, strategy)
	if err != nil {
		return nil, nil, nil, err
	}

	s.mu.Lock()
	for _, v := range s.css {
		if v.dc == dc {
			s.mu.Unlock()
			return dc, dc.releaseConn, v.ds, nil
		}
	}
	s.mu.Unlock()

	// No luck; we need to prepare the statement on this connection
	withLock(dc, func() {
		ds, err = s.prepareOnConnLocked(ctx, dc)
	})
	if err != nil {
		dc.releaseConn(err)
		return nil, nil, nil, err
	}

	return dc, dc.releaseConn, ds, nil
}