golang.org/x/sync/singleflight
建议结合源码阅读本文
缓存击穿
在做高并发的服务时,不可避免的会遇到缓冲击穿的问题。缓冲击穿一般是说,当高并发流量缓存过期的情况下,出现大量请求从数据库读取相同数据的情况。这种情况下数据库的压力将瞬间增大。为了避免这种情况,一般有几种解决方案: 1. 缓存永不过期,缓存做主动更新。 2. 在使用缓存时,先检查缓存的过期时间,如果将要过期时,将过期时间延长到指定时间(避免其他服务也主动更新),再主动做缓存更新(更新后,设置新的超时时间)。 3. 加互斥锁,在db查询结束后,统一返回数据。(本文主要使用介绍用singleflight 来实现该方法) 这种方法的弊端是,只是单进程限制同时只能有一个请求。
如何使用 singleflight 解决缓存击穿
fn
1
2
3
4
5
6
7
8
9
10
11
12
type Result struct {
Val interface{}
Err error
Shared bool
}
// 同步返回结果
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {}
// 返回channel,异步返回结果
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {}
// 取新结果,不使用正在请求的结果
func (g *Group) Forget(key string){}
一个简单的例子
包中提供了同步访问和异步访问两种调用。我们需要用一个简单的例子来做说明:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func main() {
var count int32
g := &singleflight.Group{}
res := []<-chan singleflight.Result{}
for i := 0; i < 10; i++ {
key := "hello"
res = append(res, g.DoChan("getdata", func() (interface{}, error) {
// mock db query
iter := atomic.AddInt32(&count, 1)
time.Sleep(time.Duration(time.Microsecond))
// return val
return key + strconv.Itoa(int(iter)), nil
}))
}
for i := 0; i < 10; i++ {
dat := <-res[i]
fmt.Println(dat.Val)
}
}
hello1
singleflight 的实现
singleflightsignleflightfnfnfnkeyfnfn
错误类型的定义
panicruntime.Goexit
1
2
3
4
5
var errGoexit = errors.New("runtime.Goexit was called")
type panicError struct {
value interface{}
stack []byte
}
执行中程序的调用
对于每一次执行fn,会构造一个call 结构体,用于将结果返回给等待的协程。doCall 则为 fn 的调用执行方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
type call struct {
wg sync.WaitGroup
val interface{} // 调用的返回值
err error // 调用执行失败后的错误
forgotten bool // 标记是否下次调用时不使用正在调用的fn的结果
dups int // 标记有多少调用方在等待fn 的结果
chans []chan<- Result // 等待结果的channal
}
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
normalReturn := false
recovered := false
// 为了能捕获到 goexit, 需要使用defer 来判断 (与panic 错误区分)
// 实际上 goexit 无法捕获,只能通过标记 panic 和正常退出来排除
// 第一个defer 是对执行结果的处理
defer func() {
// 非正常退出和panic, 则为 goexit 退出
if !normalReturn && !recovered {
c.err = errGoexit
}
c.wg.Done()
g.mu.Lock()
defer g.mu.Unlock()
if !c.forgotten {
delete(g.m, key)
}
if e, ok := c.err.(*panicError); ok { // Panic 错误, 这种panic 无法捕获
if len(c.chans) > 0 {
//对于 DoChan 的调用方式
go panic(e)
select {} // 保证 `go panic(e)` 的执行,并且 panic 无法被捕获。
} else {
panic(e)
}
} else if c.err == errGoexit { // errGoexit 已经用排除法处理
} else {
// 正常返回, 分发返回结果
for _, ch := range c.chans {
ch <- Result{c.val, c.err, c.dups > 0}
}
}
}()
// 执行 fn 方法,捕获 panic
func() {
defer func() {
if !normalReturn {
// 捕获 recover 错误
if r := recover(); r != nil {
c.err = newPanicError(r)
}
}
}()
c.val, c.err = fn()
normalReturn = true
}()
if !normalReturn {
// 如果非正常返回,则是通过 recover 的方式执行的 | 因为 goexit 方式不会走到这里
recovered = true
}
}
接口的实现
同步方式的调用:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
// 执行中,则加入等待
c.dups++
g.mu.Unlock()
c.wg.Wait()
if e, ok := c.err.(*panicError); ok {
panic(e)
} else if c.err == errGoexit {
runtime.Goexit()
}
return c.val, c.err, true
}
c := new(call)
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
// 调用执行
g.doCall(c, key, fn)
return c.val, c.err, c.dups > 0
}
异步方式的调用:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
ch := make(chan Result, 1)
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
// 如果正在执行,则加入到chan中,等待
c.dups++
c.chans = append(c.chans, ch)
g.mu.Unlock()
return ch
}
// 构造call
c := &call{chans: []chan<- Result{ch}}
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
// 异步执行
go g.doCall(c, key, fn)
return ch
}
设置下次调用不适用正在执行的结果
1
2
3
4
5
6
7
8
func (g *Group) Forget(key string) {
g.mu.Lock()
if c, ok := g.m[key]; ok {
c.forgotten = true
}
delete(g.m, key)
g.mu.Unlock()
}
总结
从上述代码可以看出,做一个防止穿透的小功能,简单而不简约。需要考量的地方还是挺多(如何截获panic,如何判断goexit 等)。如下是内容总结:
goexit 用于退出某个协程,但是之前注册的defer 方法仍然将会被执行。
- 返回的error,不仅可能是正常逻辑错误,或者goexit 错误, 还有可能直接panic。
- DoChan 调用方式,如果fn出现panic,该panic将无法被捕获,程序将退出。(Do 方式可以捕获panic), 而 Do 调用则可以捕获。例子如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
go func() {
defer wg.Done()
key := "hello"
defer func() {
if r := recover(); r != nil {
fmt.Println("[[[", r, "]]]") // 此处可以捕获到panic. 由 doCall 方法中捕获后再次抛出的异常
}
}()
_, _, _ = g.Do("getdata", func() (interface{}, error) {
iter := atomic.AddInt32(&count, 1)
time.Sleep(time.Duration(time.Microsecond))
panic("panic")
return key + strconv.Itoa(int(iter)), nil
})
}()