需求背景
我们的服务近期刚开始上线,之前在测试环境一直都是单实例运行。其中有一项功能是搜集kubernetes pod中的日志,上线之后发现,日志搜集有重复,问题定位较容易,因为重复数量恰好是我们线上服务的副本数,想到没有做分布式锁。线上运行的每个实例都在搜集每个pod的日志,导致日志重复。
对于实现分布式锁,最常见的是通过zk、redis来实现,因为项目刚开始上线,目前暂不打算额外引入其他中间件,所以打算通过mysql来实现分布式锁。参考了以下两篇文章:
mysql实现分布式锁
简单实现依赖 MySQL 的分布式锁
通过对比 上面第一篇文章中提高的集中方式,我们采用了文章中提到的第一种方式,也就是基于表主键唯一做分布式锁。
文章提到的集中方式中,基于mysql排他锁来实现是最简单的方式,缺点是可能引起表锁。再者,我们要搜集日志的pod数量是不断变化的,所以通过select~for update这种方式前提还需要不断的更新维护数据。既然如此,不如直接insert更新数据时,利用主键唯一直接来实现分布式锁。这两点最大区别是一个利用了mysql自身的排他锁,一个则是自己像mysql写入主键相同的数据来实现。
另外,文章中针对这种方式提出的几个问题,我们也有一些常见的场景:
- mysql单点问题:我们项目直接使用了公司层面的数据库,本身是高可用的,单点问题不用我们考虑。
- 可重入锁问题:项目中对于pod日志收集的实现,是通过每个pod单独开启一个协程去处理的,并且搜集日志时,保持一个长链接,read流不断搜集日志,即在我们目前的场景中是可以忽略分布式锁不可重入的问题的。
针对这一点,扩展到golang项目中,在golang开发中使用多个协程处理公共资源一般情况下不需要考虑锁的重入问题,因为golang 协程相对java中的线程轻量,一般不像在java中那些使用线程池对线程进行复用,协程处理完任务golang进行回收,所以不存在协程再次需要获取锁的情况。对于java多线程的情况,一般考虑在数据库中增加字段owner来记录当前资源在被哪个线程占用,java中线程可以直接获取唯一标示threadname写入到owner,golang中这点也略微麻烦 - 锁释放问题,分布式锁需要设置超时时间,避免在获取锁的协程挂掉之后锁不能被释放。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 | //分布式锁实现代码 const TIMEOUT int64 = 10 * 60 // mysql table struct type distributedLock struct { Id string `gorm:"primary_key"` ExpireTime int64 } func (act *Activity) TryLock(id string) bool { // clean timeout lock act.deleleExpiredLock(id) // prepare data var now = time.Now().Unix() expireTime := now + TIMEOUT var newLock = distributedLock{ Id: id, ExpireTime: expireTime, } // insert lock if err := act.db.Table("distributed_lock").Create(&newLock).Error; err != nil { return false } return true } func (act *Activity) deleleExpiredLock(id string) { var now = time.Now().Unix() act.db.Table("distributed_lock").Where("id = ? AND expire_time < ?", id, now).Delete(distributedLock{}) } |
1 2 3 4 5 6 7 8 9 10 | go func() { //等待日志生成 time.Sleep(time.Second * 10) for { //定时获取搜集的pod,将每个pod交由一个协程处理 act.pollPods() timer := time.NewTimer(time.Second * 10) <-timer.C } }() |
1 2 3 4 5 6 7 8 9 10 11 12 | for _, podItem := range podList.Items { pod := podItem go func() { podName := pod.GetObjectMeta().GetName() if !act.TryLock(podName) { return } //搜集日志任务 act.writeActivityInfo(&pod) }() } } |