1.1.项目介绍
cache2go是一款由golang实现的本地缓存库,提供并发安全的读写操作,具有过期时间控制等特性。项目地址:https://github.com/muesli/cache2go
1.2.使用方法
go get github.com/muesli/cache2go
- 核心操作API:
- Cache:创建Cache Table
- Add:添加Cache
- Value:读取Cache
- Delete:删除指定Key Cache
- Flush:清空整个Cache Table
package main
import (
"github.com/muesli/cache2go"
"log"
"time"
)
type Item struct {
Name string `json:"name"`
Prices int64 `json:"prices"`
Stocks int64 `json:"stocks"`
}
func basicOpTest() {
// 初始化itemCache本地缓存
itemCache := cache2go.Cache("itemCache")
item := &Item{
Name: "MacBookPro",
Prices: 10000,
Stocks: 1,
}
// 添加item1缓存,过期时间为5秒钟
itemCache.Add("item1", 5*time.Second, item)
// 读取item1缓存
if v, err := itemCache.Value("item1"); err != nil {
log.Printf("item1 err = %v", err)
} else {
log.Printf("读取item1缓存:%#v", v.Data())
}
// 睡眠6s后读取
time.Sleep(6 * time.Second)
if v, err := itemCache.Value("item1"); err != nil {
log.Printf("item1 err = %v", err)
} else {
log.Printf("6s后读取item1缓存:%#v", v.Data())
}
// 添加item2,不设置过期时间
itemCache.Add("item2", 0, item)
// 读取item2缓存
if v, err := itemCache.Value("item2"); err != nil {
log.Printf("item2 err = %v", err)
} else {
log.Printf("读取item2缓存:%#v", v.Data())
}
// 删除掉item2缓存
itemCache.Delete("item2")
// 再读取item2缓存
if v, err := itemCache.Value("item2"); err != nil {
log.Printf("item2 err = %v", err)
} else {
log.Printf("读取item2缓存:%#v", v.Data())
}
// 添加item3缓存,并删除所有缓存
itemCache.Add("item3", 0, item)
itemCache.Flush()
// 读取item3缓存
if v, err := itemCache.Value("item3"); err != nil {
log.Printf("item3 err = %v", err)
} else {
log.Printf("读取item3缓存:%#v", v.Data())
}
}
运行结果:
2022/10/17 20:52:00 读取item1缓存:&main.Item{Name:"MacBookPro", Prices:10000, Stocks:1}
2022/10/17 20:52:06 item1 err = Key not found in cache
2022/10/17 20:52:06 读取item2缓存:&main.Item{Name:"MacBookPro", Prices:10000, Stocks:1}
2022/10/17 20:52:06 item2 err = Key not found in cache
2022/10/17 20:52:06 item3 err = Key not found in cache
- 操作回调API:
- AddAddedItemCallback:新增Cache回调函数
- AddAboutToDeleteItemCallback:删除Cache回调函数
- AddAboutToExpireCallback:过期CacheItem回调函数
- 以上三个方法分别再对应着RemoveXXX,表示删去对应操作的全部回调函数
func callBackTest() {
// 初始化itemCache本地缓存
itemCache := cache2go.Cache("itemCache")
// 设置各操作回调函数
itemCache.AddAddedItemCallback(func(item *cache2go.CacheItem) {
log.Printf("added callback, item = %#v", item)
})
itemCache.AddAboutToDeleteItemCallback(func(item *cache2go.CacheItem) {
log.Printf("deleted callback, item = %#v", item)
})
item := itemCache.Add("expire_item", 1*time.Second, Item{
Name: "expire_item",
Prices: 1,
Stocks: 1,
})
item.AddAboutToExpireCallback(func(item interface{}) {
log.Printf("expired callback, item = %#v", item)
})
// 执行基本操作
basicOpTest()
}
输出结果
2022/10/17 21:12:09 added callback, item = &cache2go.CacheItem{RWMutex:sync.RWMutex{w:sync.Mutex{state:0, sema:0x0}, writerSem:0x0, readerSem:0x0, readerCount:0, readerWait:0}, key:"item1", data:(*main.Item)(0xc00008c040), lifeSpan:5000000000, createdOn:time.Time{wall:0xc0cb730a55e5a2f8, ext:426392, loc:(*time.Location)(0x1187880)}, accessedOn:time.Time{wall:0xc0cb730a55e5a2f8, ext:426392, loc:(*time.Location)(0x1187880)}, accessCount:0, aboutToExpire:[]func(interface {})(nil)}
2022/10/17 21:12:09 读取item1缓存:&main.Item{Name:"MacBookPro", Prices:10000, Stocks:1}
2022/10/17 21:12:10 deleted callback, item = &cache2go.CacheItem{RWMutex:sync.RWMutex{w:sync.Mutex{state:0, sema:0x0}, writerSem:0x0, readerSem:0x0, readerCount:0, readerWait:0}, key:"expire_item", data:main.Item{Name:"expire_item", Prices:1, Stocks:1}, lifeSpan:1000000000, createdOn:time.Time{wall:0xc0cb730a55e4d7d8, ext:374551, loc:(*time.Location)(0x1187880)}, accessedOn:time.Time{wall:0xc0cb730a55e4d7d8, ext:374551, loc:(*time.Location)(0x1187880)}, accessCount:0, aboutToExpire:[]func(interface {}){(func(interface {}))(0x10a0530)}}
2022/10/17 21:12:10 expired callback, item = "expire_item"
2022/10/17 21:12:14 deleted callback, item = &cache2go.CacheItem{RWMutex:sync.RWMutex{w:sync.Mutex{state:0, sema:0x0}, writerSem:0x0, readerSem:0x0, readerCount:0, readerWait:0}, key:"item1", data:(*main.Item)(0xc00008c040), lifeSpan:5000000000, createdOn:time.Time{wall:0xc0cb730a55e5a2f8, ext:426392, loc:(*time.Location)(0x1187880)}, accessedOn:time.Time{wall:0xc0cb730a55eaa820, ext:755728, loc:(*time.Location)(0x1187880)}, accessCount:1, aboutToExpire:[]func(interface {})(nil)}
// ...
- 设置自定义缓存加载器:SetDataLoader
func dataLoaderTest() {
// 初始化itemCache本地缓存
redisItemCache := cache2go.Cache("redisItemCache")
// 设置自定义的cache加载逻辑
redisItemCache.SetDataLoader(func(key interface{}, args ...interface{}) *cache2go.CacheItem {
// 如果是redis开头的key,先从redis中获取
if strings.HasPrefix(key.(string), "redis") {
return cache2go.NewCacheItem(key, 0, Item{
Name: "redis_item",
})
}
return nil
})
// 写入一条数据
redisItemCache.Add("item1", 0, Item{
Name: "item1",
})
item1, _ := redisItemCache.Value("item1")
log.Printf("item1 = %#v", item1)
redisItem, _ := redisItemCache.Value("redis_item")
log.Printf("redisItem = %#v", redisItem)
}
输出结果
2022/10/17 21:59:37 item1 = &cache2go.CacheItem{RWMutex:sync.RWMutex{w:sync.Mutex{state:0, sema:0x0}, writerSem:0x0, readerSem:0x0, readerCount:0, readerWait:0}, key:"item1", data:main.Item{Name:"item1", Prices:0, Stocks:0}, lifeSpan:0, createdOn:time.Time{wall:0xc0cb75d2601954b8, ext:492934, loc:(*time.Location)(0x11858c0)}, accessedOn:time.Time{wall:0xc0cb75d260196840, ext:497913, loc:(*time.Location)(0x11858c0)}, accessCount:1, aboutToExpire:[]func(interface {})(nil)}
2022/10/17 21:59:37 redisItem = &cache2go.CacheItem{RWMutex:sync.RWMutex{w:sync.Mutex{state:0, sema:0x0}, writerSem:0x0, readerSem:0x0, readerCount:0, readerWait:0}, key:"redis_item", data:main.Item{Name:"redis_item", Prices:0, Stocks:0}, lifeSpan:0, createdOn:time.Time{wall:0xc0cb75d2601d34e8, ext:746274, loc:(*time.Location)(0x11858c0)}, accessedOn:time.Time{wall:0xc0cb75d2601d34e8, ext:746274, loc:(*time.Location)(0x11858c0)}, accessCount:0, aboutToExpire:[]func(interface {})(nil)}
2.源码分析
2.1.项目结构
核心代码文件为:
- cachetable.go:封装了CacheTable结构体,实现Cache列表相关操作API
- cacheitem.go:封装了CacheItem结构体,实现了Cache对象相关操作API
- cache.go:提供cache全局map,存储了CacheTable结构体与table名称映射
2.2.数据结构
- CacheTable
type CacheTable struct {
sync.RWMutex
// The table's name.
name string
// All cached items.
items map[interface{}]*CacheItem
// Timer responsible for triggering cleanup.
cleanupTimer *time.Timer
// Current timer duration.
cleanupInterval time.Duration
// The logger used for this table.
logger *log.Logger
// Callback method triggered when trying to load a non-existing key.
loadData func(key interface{}, args ...interface{}) *CacheItem
// Callback method triggered when adding a new item to the cache.
addedItem []func(item *CacheItem)
// Callback method triggered before deleting an item from the cache.
aboutToDeleteItem []func(item *CacheItem)
}
- CacheItem
type CacheItem struct {
sync.RWMutex
// The item's key.
key interface{}
// The item's data.
data interface{}
// How long will the item live in the cache when not being accessed/kept alive.
lifeSpan time.Duration
// Creation timestamp.
createdOn time.Time
// Last access timestamp.
accessedOn time.Time
// How often the item was accessed.
accessCount int64
// Callback method triggered right before removing the item from the cache
aboutToExpire []func(key interface{})
}
2.3.API代码流程
1.Cache
位于cache.go文件,维护了全局CacheTable Map。
var (
// 全局cache map
cache = make(map[string]*CacheTable)
// cache map 读写锁
mutex sync.RWMutex
)
// 从cache map中获取对应的CacheTable,不存在则创建新的
func Cache(table string) *CacheTable {
// 先上读锁,获取cacheTable
mutex.RLock()
t, ok := cache[table]
mutex.RUnlock()
// 不存在,则新建
if !ok {
// 写操作需要上写锁
mutex.Lock()
t, ok = cache[table]
// 双重校验是否存在
if !ok {
// 不存在则新建cacheTable
t = &CacheTable{
name: table,
items: make(map[interface{}]*CacheItem),
}
cache[table] = t
}
mutex.Unlock()
}
return t
}
2.Add
位于cachetable.go文件,是CacheTable结构体的方法之一,实现了添加KV缓存的逻辑。
func (table *CacheTable) Add(key interface{}, lifeSpan time.Duration, data interface{}) *CacheItem {
// 封装一个item
item := NewCacheItem(key, lifeSpan, data)
// 锁表,将item添加进去
table.Lock()
table.addInternal(item)
return item
}
func (table *CacheTable) addInternal(item *CacheItem) {
// 添加kv值到map中
table.items[item.key] = item
expDur := table.cleanupInterval
addedItem := table.addedItem
// 添加完成解除写锁
table.Unlock()
// 触发Add回调函数
if addedItem != nil {
for _, callback := range addedItem {
callback(item)
}
}
// 如果一个item有设置过期时间,且比检查失效间隔小,则进行过期key清理(懒加载思想,只有存在这类Key才会启动清理,而不是定时任务)
if item.lifeSpan > 0 && (expDur == 0 || item.lifeSpan < expDur) {
table.expirationCheck()
}
}
func (table *CacheTable) expirationCheck() {
table.Lock()
if table.cleanupTimer != nil {
table.cleanupTimer.Stop()
}
if table.cleanupInterval > 0 {
table.log("Expiration check triggered after", table.cleanupInterval, "for table", table.name)
} else {
table.log("Expiration check installed for table", table.name)
}
now := time.Now()
smallestDuration := 0 * time.Second
for key, item := range table.items {
// 遍历该table下的所有items
item.RLock()
lifeSpan := item.lifeSpan
accessedOn := item.accessedOn
item.RUnlock()
if lifeSpan == 0 {
continue
}
if now.Sub(accessedOn) >= lifeSpan {
// 该item已超出存活时间,删除key
table.deleteInternal(key)
} else {
// 找到最小的需要过期的item,计算最优时间间隔
if smallestDuration == 0 || lifeSpan-now.Sub(accessedOn) < smallestDuration {
smallestDuration = lifeSpan - now.Sub(accessedOn)
}
}
}
// 在最优时间间隔后启动定时任务检查table的过期key
table.cleanupInterval = smallestDuration
if smallestDuration > 0 {
table.cleanupTimer = time.AfterFunc(smallestDuration, func() {
go table.expirationCheck()
})
}
table.Unlock()
}
3.Value
Value用于读取Key匹配的CacheItem。
func (table *CacheTable) Value(key interface{}, args ...interface{}) (*CacheItem, error) {
table.RLock()
// 先尝试从items中获取该item
r, ok := table.items[key]
loadData := table.loadData
table.RUnlock()
if ok {
// 如果存在,则更新该item的accessOn时间和accessCount计数
r.KeepAlive()
return r, nil
}
// 如果不存在,则从loadData自定义加载函数中尝试获取
if loadData != nil {
item := loadData(key, args...)
if item != nil {
// 如果自定义加载函数中存在该item,则添加到table中并返回
table.Add(key, item.lifeSpan, item.data)
return item, nil
}
return nil, ErrKeyNotFoundOrLoadable
}
return nil, ErrKeyNotFound
}
4.Delete
Delete函数用于删除指定Key的Item。
func (table *CacheTable) Delete(key interface{}) (*CacheItem, error) {
table.Lock()
defer table.Unlock()
return table.deleteInternal(key)
}
func (table *CacheTable) deleteInternal(key interface{}) (*CacheItem, error) {
// 判断key是否存在
r, ok := table.items[key]
if !ok {
return nil, ErrKeyNotFound
}
aboutToDeleteItem := table.aboutToDeleteItem
table.Unlock()
// 先触发删除回调函数
if aboutToDeleteItem != nil {
for _, callback := range aboutToDeleteItem {
callback(r)
}
}
r.RLock()
defer r.RUnlock()
// 触发item的过期回调函数
if r.aboutToExpire != nil {
for _, callback := range r.aboutToExpire {
callback(key)
}
}
table.Lock()
table.log("Deleting item with key", key, "created on", r.createdOn, "and hit", r.accessCount, "times from table", table.name)
// 将item从table中删除
delete(table.items, key)
return r, nil
}
5.Flush
清空整个table的cache。
func (table *CacheTable) Flush() {
table.Lock()
defer table.Unlock()
table.log("Flushing table", table.name)
// 直接将items重新初始化
table.items = make(map[interface{}]*CacheItem)
table.cleanupInterval = 0
if table.cleanupTimer != nil {
// 如果此时还有清理过期定时器,则终止其运行
table.cleanupTimer.Stop()
}
}
3.总结
cache2go这个项目写得很精简,本质上就是使用到了map来作为本地缓存kv存储结构,但是有一些值得学习的地方:
- 使用map时,由于其是非线程安全的,所以在并发场景下需要上锁,可以选择RWMutex读写锁来控制并发的读和串行写,避免panic
- 对于需要清理过期Key的场景,如果使用定时任务定时遍历整个集合来做清理,会耗费较多时间和资源,可以由写入时判断是否存在需要清理的Key,再启动定时任务来做清理,避免频繁遍历
- 可以利用golang函数式的特性,方便地实现各操作回调函数,比如添加、删除、失效操作回调等
- 另外个人觉得这个项目可以优化的空间:由于使用的是本地缓存,为了避免内存oom可以在创建时指定限制key的最大数量,以及在内存不足时的写入策略(如直接报错或者随机清理掉一批Key等)。