背景

noi项目中,涉及多个模型(数据表)的多种计数(count值),目前主要包含:

  • 习题:题解数、提交数、通过数
  • 评论:点赞数、回复数
  • 题解:浏览数、点赞数、评论数

用户在使用过程中,会频繁地操作这些计数,比如:点赞、取消点赞、评论、回复、写题解、做题等等,包括读取、写入,写入主要包含递增和递减,如果每次操作都等主业务流程结束后,同步去修改数据库,一是会影响性能,二是会增加数据库锁竞争的开销,所以,将这些琐碎、零散的操作单独提取出来,使用redis作为中间缓存,异步更新数据库是很有必要的。

注意:

  • redis使用的是集群模式
  • 数据库中,每张表有存储这些计数的字段
方案

由于对这些计数的操作都很简单,要么是读取、要么是递增或递减,不会涉及到业务的处理,所以可以很好地结合redis的incr和decr两个命令来保存这些计数,并通过定时任务,将变化的数据更新到数据库,但是存在以下问题:

  • 针对多个模型、不同种类的计数,redis中的key如何设计,才方便业务上识别
  • 一次请求,需要同时获取多个计数时,如何只请求一次redis
  • 如何判断哪些数据是脏数据,哪些数据需要修改
  • 对于缓存中的数据,怎么判断哪些是已经同步到数据库的
  • redis故障或数据丢失,如何保证每次操作数据的正确性

key设计

由于涉及到多个模型,并且每个模型含有多种计数需要缓存,只能通过key来做区分,所以可以在key中包含模型名、计数类型、主键id,比如:problems:1001:like_count,这样设计之后,就可以对应上problems表中,id为1001这条记录的like_count字段的值,当页面发起点赞或取消点赞的操作时,只需要incr或decr这个key即可。

但是存在一个问题,查询时,需要同时查询多个count值,比如同时查询一个习题的点赞数、评论数,就要从redis查询两次,key分别为:problems:1001:like_count、problems:1001:comment_count,这两个key经过计算,会被分配到不同的槽位,集群模式下如果跨槽位进行mget的话是不允许的,会报错:

MGET : CROSSSLOT Keys in request don't hash to the same slot

redis中有一个机制,Hash Tag机制:允许用key的部分字符串来计算hash,当一个key包含 {} 的时候,就不对整个key做hash,而仅对 {} 包括的字符串做hash,所以我们的key可以这么设计:{模型名:模型主键id}:计数类型,这样就可以保证同一条数据对应的不同计数都分配到同一个槽位,在查询是就可以使用mget了。

同步机制

刚开始想的同步机制是通过定时任务,分批扫描数据库,遍历扫描的数据库记录,mget这条记录对应的缓存,对比每一个count值,相同则跳过,不同则认为是脏数据。但是这种方案太重,不管缓存数据有无变化、变化多少,都需要全量扫描数据库,并且在扫描过程中,还会产生新的脏数据。

最后采取的方案是:针对每个模型,在redis中再缓存一份数据,使用hash结构,用来记录该模型哪些数据的计数发生了变更,避免一个模型的数据都存到一个hash内成为大key,对每个模型使用的hash key进行分片,比如一个模型分成16个hash key,用模型主键id取模分片数,决定分配到哪个分片

hash中存储的数据结构为:模型主键id-计数类型:count值

总体hash存储的结构举例如下:

		problems_1: {
							1001-like_count=12,
							1001-comment_count=12,
							1002-like_count=10
					}

这样一来,每次定时任务就遍历所有的hash key,然后对每个hash内的数据进行scan,解析scan的结果,修改数据库即可,每次完成同步操作之后,删除对应的key,这样就保证下次定任务再scan时,不会再重复操作。

如果redis故障或者数据丢失,由于项目内对所有计数都修改都会在查询操作之后,所以统一在查询的时候,如果在缓存中查不到缓存的count值,就将数据库中的count值通过setnx设置到缓存中。

流程图

写缓存

定时任务

读缓存

代码实现

相关约定

这里申明了所有模型、计数类型(字段名)的常量,key的生成规则等。

const (
	//涉及到count的模型
	Problems  = "problems"
	Solutions = "solutions"
	Comments  = "comments"

	//count相关的字段
	LikeCount       = "like_count"
	DislikeCount    = "dislike_count"
	CommentCount    = "comment_count"
	SolutionCount   = "solution_count"
	PassedCount     = "passed_count"
	SubmissionCount = "submission_count"
	ViewCount       = "view_count"
	AwesomeCount    = "awesome_count"
	ConfusedCount   = "confused_count"
	ReplyCount      = "reply_count"

	//操作类型
	Add = 1  //加
	Sub = -1 //减
)

var (
	//缓存count的key模板,eg:{problems:1001}:like_count
	keyPattern = "{%s:%d}:%s"
	//hash字典key模板,eg:problems_1
	hashKeyPattern = "%s_%d"
	//hash字典域名模板,eg:1001-like_count
	hashFieldPattern = "%d-%s"
	//redis hash key分片数(避免大key)
	KeySliceSize = 16
)

func GetKeyName(modelName string, modelId uint, fieldName string) string {
	return fmt.Sprintf(keyPattern, modelName, modelId, fieldName)
}

func GetHashKeyName(modelName string, keySlice int) string {
	return fmt.Sprintf(hashKeyPattern, modelName, keySlice)
}

func GetHashFieldName(modelId uint, fieldName string) string {
	return fmt.Sprintf(hashFieldPattern, modelId, fieldName)
}

写缓存

这里主要是根据约定将count值,set到对应的key,并记录到hash中

func updateCachedCount(ctx context.Context, modelName, fieldName string, modelId uint, operatorType int) (int64, error) {
	var key = types.GetKeyName(modelName, modelId, fieldName)
	var resultCount int64
	var err error
	//1.修改缓存中count的值
	if operatorType == types.Add {
		resultCount, err = db.Incr(ctx, key)
	} else {
		resultCount, err = db.Decr(ctx, key)
	}
	if err != nil {
		return resultCount, err
	}
	//2.count修改成功后,修改对应的hash字典,该字典就用来表示发生变化的数据
	//2.1.根据id,找到hash字典的分片key
	var hashKey = types.GetHashKeyName(modelName, int(modelId)%types.KeySliceSize)
	var hashFieldName = types.GetHashFieldName(modelId, fieldName)
	//eg: hset problems_1 1001-like_count 10
	_, err = db.HSet(ctx, hashKey, hashFieldName, resultCount)
	if err != nil {
		//手动回滚(没用redis事务:redis的事务只是将命令打包执行,并不能保证原子性)
		var e1 error
		if operatorType == types.Add {
			resultCount, e1 = db.Decr(ctx, key)
		} else {
			resultCount, e1 = db.Incr(ctx, key)
		}
		if e1 != nil {
			return resultCount, err
		}
		return resultCount, err
	}
	return resultCount, err
}

定时任务

在main函数中开启定时任务:

	// scan redis count to mysql
	go task.ExecuteCron(context.Background())

定时任务:使用github.com/robfig/cron/v3实现

func ExecuteCron(ctx context.Context) {

	c := newWithSeconds()
	//定时任务:如果没有配置,默认每小时执行一次
	spec := configs.Config.Cron.Spec
	if spec == "" {
		spec = "@every 1h"
	}
	var e error
  //习题相关
	_, e = c.AddFunc(spec, func() {
		service.ScanChangedCountAndUpdateDB(ctx, types.Problems)
	})
	if e != nil {
		klog.Error(e)
	}
  //题解相关
	_, e = c.AddFunc(spec, func() {
		service.ScanChangedCountAndUpdateDB(ctx, types.Solutions)
	})
	if e != nil {
		klog.Error(e)
	}
  //评论相关
	_, e = c.AddFunc(spec, func() {
		service.ScanChangedCountAndUpdateDB(ctx, types.Comments)
	})
	if e != nil {
		klog.Error(e)
	}
	c.Start()
}

//在linux环境下,cron表达式不支持秒级别的,需要设置下
func newWithSeconds() *cron.Cron {
	secondParser := cron.NewParser(cron.Second | cron.Minute |
		cron.Hour | cron.Dom | cron.Month | cron.DowOptional | cron.Descriptor)
	return cron.New(cron.WithParser(secondParser), cron.WithChain())
}

扫描hash key

func ScanChangedCountAndUpdateDB(ctx context.Context, modelName string) {
	var hashKey string
	//遍历每个分片
	klog.Infof("Scan redis count cache of [%s] starting...", modelName)
	start := time.Now()
	for i := 0; i < types.KeySliceSize; i++ {
		hashKey = types.GetHashKeyName(modelName, i)
		var cursor uint64
		for {
			fields, cur, err := db.HScan(ctx, hashKey, cursor, "*-*_count", 10)
			cursor = cur
			if err != nil {
				klog.Errorf("HScan Error:%q", err)
				break
			}
			handleScannedData(fields, modelName, ctx, hashKey)
			if cursor == 0 {
				break
			}
		}
		//key对应的缓存都scan完毕后,删除key
		size, _ := db.HLen(ctx, hashKey)
		if size != 0 {
			_, e := db.Del(ctx, hashKey)
			if e != nil {
				klog.Errorf("删除key失败:%s , [error]:%q", hashKey, e)
			}
		}
	}
	cost := time.Now().Sub(start).Seconds()
	klog.Infof("Scan redis count cache of [%s] finished,cost: [%.2f] second", modelName, cost)
}

解析扫描的数据

func handleScannedData(fields []string, modelName string, ctx context.Context, hashKey string) {
	var id int
	var fieldName string
	var fieldValue int
	var param = make(map[string]interface{})
	for i, key := range fields {
		//所有的k-v会封装为[]string返回:当前index为key index+1就为value
		if strings.Contains(key, "count") {
			split := strings.Split(key, "-")
			id, _ = strconv.Atoi(split[0])
			fieldName = split[1]
			//val, _ := db.HGet(ctx, hashKey, fieldName)
			fieldValue, _ = strconv.Atoi(fields[i+1])
			//封装为Map,更新数据库
			param[fieldName] = fieldValue
			err := updateDB(modelName, uint(id), param)
			if err != nil {
				klog.Errorf("更新数据库Count失败,表名:%s, ID:%d, 参数:%s , [error]:%q", modelName, id, param, err)
				return
			}
			//如果没有发生异常,删除hash的field
			_, _ = db.HDel(ctx, hashKey, fieldName)
		}
	}
}

修改数据库(这一步违反了开闭原则,可优化)

func updateDB(modelName string, id uint, param map[string]interface{}) error {
	var err error
	switch modelName {
	case types.Problems:
		var problem = &model.Problem{ID: id}
		err = problem.UpdateByMap(param)
	case types.Solutions:
		var solution = &model.Solution{ID: id}
		err = solution.UpdateByMap(param)
	case types.Comments:
		var comment = &model.Comment{ID: id}
		err = comment.UpdateByMap(param)
	}
	return err
}

gorm部分举例:

func (cmt *Comment) UpdateByMap(params map[string]interface{}) error {
	conn := db.GetDBConnection()
	return conn.Model(cmt).Updates(params).Error
}

读缓存

该方案不足的点,每次读取时,去查询缓存时,需要交互多次

	var data []interface{}
	var likeCountKey string
	var replyCountKey string
	//list为从数据库查询的列表
	for i, value := range list {
		likeCountKey = types.GetKeyName(types.Comments, value.ID, types.LikeCount)
		replyCountKey = types.GetKeyName(types.Comments, value.ID, types.ReplyCount)
		//点赞数、回复数
		ret, err := db.MGet(ctx, likeCountKey, replyCountKey)
		if err != nil {
			klog.Errorf("QueryCommentByConditionPage.MGet Error:%q", err)
		} else {
			list[i].LikeCount = uint(db.CheckRedisExist(ctx, likeCountKey, ret[0], int(value.LikeCount)))
			list[i].ReplyCount = db.CheckRedisExist(ctx, replyCountKey, ret[1], value.ReplyCount)
		}
		data = append(data, list[i])
	}

判断缓存中是否有值,没有则以数据库中的为准

func CheckRedisExist(ctx context.Context, key string, redisValue interface{}, dbValue int) int {
	var count int
	//如果缓存中没有,将数据库中的值set到缓存
	if redisValue == nil {
		_, _ = SetStrNxCacheTtl(ctx, key, dbValue, 0)
		count = dbValue
	} else {
		c, _ := strconv.Atoi(redisValue.(string))
		count = c
	}
	return count
}