一、并发问题

redigoReceive()
1
Connections support one concurrent caller to the Receive method and one concurrent caller to the Send and Flush methods. No other concurrency is supported including concurrent calls to the Do method.
Do()Receive()Do()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func incr(i int) {
    fmt.Println("Start thread", i)
    err := gCoon.Send("incrby", "nKey", fmt.Sprintf("%d", i))
    checkErr(err)
 
    err = gCoon.Flush()
    checkErr(err)
 
    time.Sleep(time.Second * time.Duration(i))
 
    rs, err := redis.Int(gCoon.Receive())
    checkErr(err)
 
    fmt.Printf("Thread %d Incr success, result is %d
", i, rs)
}
INCRBYnKey + iSend()Flush()Receive()

主函数中开启两个gorutine运行这段代码:

1
2
go incr(5)
go incr(1)

运行结果:

1
2
3
4
Start thread 5
Start thread 1
Thread 1 Incr success, result is 5
Thread 5 Incr success, result is 6
线程5线程1线程1线程11incr nKey 1。线程5
线程安全的连接池
1
For full concurrent access to Redis, use the thread-safe Pool to get, use and release a connection from within a goroutine. Connections returned from a Pool have the concurrency restrictions described in the previous paragraph.

二、连接池

redigo
1
2
3
4
5
6
7
8
ype Pool struct {
    Dial func() (Conn, error) // 连接redis的函数
    TestOnBorrow func(c Conn, t time.Time) error  // 测试空闲线程是否正常运行的函数
    MaxIdle int  //最大的空闲连接数,可以看作是最大连接数
    MaxActive int  //最大的活跃连接数,默认为0不限制
    IdleTimeout time.Duration  //连接超时时间,默认为0表示不做超时限制
    Wait bool  //当连接超出数量先之后,是否等待到空闲连接释放
}

一个连接池创建的示例为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func newPool(addr string) *redis.Pool {
  return &redis.Pool{
    MaxIdle: 3,
    IdleTimeout: 240 * time.Second,
    Dial: func () (redis.Conn, error) { return redis.Dial("tcp", addr) },
  }
}
 
var (
  pool *redis.Pool
  redisServer = flag.String("redisServer", ":6379", "")
)
 
func main() {
  flag.Parse()
  pool = newPool(*redisServer)
  ...
}
pool.Get()Do()conn.Close()
NewPool()
1
2
3
4
5
6
// NewPool creates a new pool.
//
// Deprecated: Initialize the Pool directory as shown in the example.
func NewPool(newFn func() (Conn, error), maxIdle int) *Pool {
    return &Pool{Dial: newFn, MaxIdle: maxIdle}
}

三、示例代码

一个完整的示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package main
 
import (
    "github.com/garyburd/redigo/redis"
    "fmt"
)
 
//连接池的连接到服务的函数
func newPoolFunc()(redis.Conn, error){
    return redis.Dial("tcp", ":6379")
}
 
//生成一个连接池对象
func newPool()(* redis.Pool){
    return &redis.Pool{
        MaxIdle: 10,
        Dial: newPoolFunc,
        Wait: true,
    }
}
 
//错误检测
func checkErr(err error){
    if err != nil{
        panic(err)
    }
}
 
func main(){
    pool := newPool()
    conn := pool.Get()
    defer conn.Close()
 
    rs, err := redis.Strings(conn.Do("KEYS", "*"))
    checkErr(err)
 
    fmt.Println(rs)  // [nKey]
    fmt.Println(pool.ActiveCount())  // 1
 
    conn.Close()
}