Preface
Prerequisites:
- Python basics
- peewee
- Multithreading
Section 0: How the oversell problem arises
Overselling is a concurrency problem in stock deduction. Suppose thread t1 reads the current stock and sees that it is sufficient, but before t1 writes the deducted value back, thread t2 arrives and reads the same, not-yet-deducted stock. Both threads pass the check and both deduct, so more items are sold than actually exist in stock.
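A minimal, self-contained sketch of this race (a hypothetical in-memory stock variable, no database involved; the short sleep just makes the bad interleaving easy to reproduce):

import threading
import time

stock = 1  # only one item left

def buy(name):
    global stock
    # read-check-write with no lock at all: both threads can pass the check
    current = stock
    if current >= 1:
        time.sleep(0.1)          # simulate work between the read and the write
        stock = current - 1      # write back a stale value
        print(f"{name} bought one item, stock is now {stock}")
    else:
        print(f"{name}: out of stock")

t1 = threading.Thread(target=buy, args=("t1",))
t2 = threading.Thread(target=buy, args=("t2",))
t1.start(); t2.start()
t1.join(); t2.join()
# Both threads usually report a successful purchase even though only one item existed.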
Section 1: What is a distributed lock, and what is it for
Traditional monolithic applications use in-process locks (synchronized, ReentrantLock). With the rapid growth of distributed systems, a local lock can no longer solve concurrency problems, so we need a lock mechanism that works across microservices / across machines -> a distributed lock.
Purposes:
- Correctness under concurrency (exclusive access to a resource)
- Efficiency: avoid processing the same work twice
Desired properties:
- Mutual exclusion: the basic capability; while one holder owns the lock, nobody else can acquire it
- Reentrancy: the thread that holds the lock can acquire it again (multiple times)
- Lock timeout: if the holder crashes, the lock is released automatically after a period of time
- Efficiency: acquiring and releasing the lock is fast
- High availability: clustering and disaster tolerance
- Support for both blocking and non-blocking acquisition
- Support for both fair and unfair locking
Commonly used middleware for distributed locks:
- mysql
- redis
- zookeeper
- etcd
- chubby
This article uses mysql and redis; the principles behind the others are largely the same.
Section 2: Optimistic and pessimistic locking based on MySQL
The most suitable solution is the best solution.
Advantages:
- Simple
- No extra component to operate; MySQL is already there and relatively easy to maintain
Disadvantages:
- Performance
Every additional component is one more thing to maintain, and every extra component increases the overall risk of the system, so introducing too many components is generally discouraged.
What we pursue is not a variety of features but the availability of the system!
0. settings.py under the python directory - MySQL initialization
from playhouse.pool import PooledMySQLDatabase
from playhouse.shortcuts import ReconnectMixin
from loguru import logger
# Use peewee's connection pool, with ReconnectMixin so that a dropped connection does not make queries fail
class ReconnectMysqlDatabase(ReconnectMixin, PooledMySQLDatabase):
    # Because of Python's MRO, super() inside ReconnectMixin resolves to PooledMySQLDatabase, not to object
    pass

DB = ReconnectMysqlDatabase(database="*", host="*", port="*", user="*", password="*")  # replace each "*" with your own connection details (port should be an int, e.g. 3306)
1. Pessimistic locking
A pessimistic lock always assumes the worst case: every time data is read, it assumes someone else may modify it, so it locks the data on every read. Anyone else who wants the same data then blocks until they can obtain the lock.
(The shared resource is handed to only one thread at a time; other threads block, and the resource is passed on once the holder is done.)
Under high concurrency, avoid bothering the database whenever you can; the same goes for the cache. MySQL itself does not have very high throughput and can fall over under heavy concurrent load.
SELECT ... FOR UPDATE only works with the InnoDB storage engine, and it only takes effect inside a transaction block (BEGIN/COMMIT).
Demonstration with Python's peewee framework:
Create the database yourself and generate the table structure; the code below defines the models.
import time
import threading
from random import randint
from datetime import datetime
from peewee import *
from playhouse.shortcuts import ReconnectMixin
from playhouse.pool import PooledMySQLDatabase
from inventory_srv.settings import settings
class ReconnectMySQLDatabase(ReconnectMixin, PooledMySQLDatabase):
pass
db = ReconnectMySQLDatabase("mxshop_inventory_srv", host="localhost", port=3306, user="root", password="123456")
# This BaseModel can be skimmed; it only overrides a few of peewee's CRUD operations, just inherit from it
class BaseModel(Model):
    add_time = DateTimeField(default=datetime.now, verbose_name="created at")
    is_deleted = BooleanField(default=False, verbose_name="soft-deleted")
    update_time = DateTimeField(verbose_name="updated at", default=datetime.now)
def save(self, *args, **kwargs):
        # Decide whether this is a newly added record or an update to an existing one
        if self._pk is not None:
            # An existing record is being updated, so refresh update_time
            self.update_time = datetime.now()
return super().save(*args, **kwargs)
@classmethod
    def delete(cls, permanently=False):  # permanently controls whether the row is removed for good
if permanently:
return super().delete()
else:
return super().update(is_deleted=True)
def delete_instance(self, permanently=False, recursive=False, delete_nullable=False):
if permanently:
return self.delete(permanently).where(self._pk_expr()).execute()
else:
self.is_deleted = True
self.save()
@classmethod
def select(cls, *fields):
return super().select(*fields).where(cls.is_deleted == False)
class Meta:
        database = db  # bind the models to the local db so db.atomic() and the model queries share one connection
class Inventory(BaseModel):
    # Inventory table for goods
    # stock = PrimaryKeyField(Stock)
    goods = IntegerField(verbose_name="goods id", unique=True)
    stocks = IntegerField(verbose_name="stock quantity", default=0)
    version = IntegerField(verbose_name="version number", default=0)  # optimistic lock for the distributed-lock demo
# threading is already imported at the top of the file; R is a process-local lock shared by the demo threads
R = threading.Lock()
def sell():
    # Data inconsistency caused by concurrency across multiple threads
    goods_list = [(1, 99), (2, 20), (3, 30)]
    with db.atomic() as txn:
        # Oversell scenario
        for goods_id, num in goods_list:
            R.acquire()  # acquire the process-local lock so only one thread touches the stock at a time
            # Query the stock
            goods_inv = Inventory.get(Inventory.goods == goods_id)
            print(f"Goods {goods_id}: selling {num} items")
            time.sleep(randint(1, 3))  # simulate business logic between the read and the write
            if goods_inv.stocks < num:
                print(f"Goods {goods_id}: insufficient stock")
                txn.rollback()
                R.release()  # release the lock before bailing out, otherwise the other thread blocks forever
                break
            else:
                # Let the database update based on its own current value; can this statement alone handle concurrency?
                query = Inventory.update(stocks=Inventory.stocks - num).where(Inventory.goods == goods_id)
                ok = query.execute()
                if ok:
                    print("Update succeeded")
                else:
                    print("Update failed")
                R.release()  # release the lock
if __name__ == "__main__":
    t1 = threading.Thread(target=sell)
    t2 = threading.Thread(target=sell)
t1.start()
t2.start()
t1.join()
t2.join()
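Note that the peewee demo above serializes the threads with an in-process threading.Lock rather than a database-level row lock, so on its own it would not protect two separate service instances. A rough sketch of what the truly pessimistic variant could look like using peewee's select().for_update() (which emits SELECT ... FOR UPDATE and therefore must run inside a transaction on InnoDB); this is an assumption about how it might be wired up, not the author's original code:

def sell_for_update():
    goods_list = [(1, 10), (2, 20), (3, 30)]
    with db.atomic() as txn:
        for goods_id, num in goods_list:
            # SELECT ... FOR UPDATE: the row stays locked until the surrounding transaction commits or rolls back
            goods_inv = (Inventory
                         .select()
                         .where(Inventory.goods == goods_id)
                         .for_update()
                         .get())
            if goods_inv.stocks < num:
                print(f"Goods {goods_id}: insufficient stock")
                txn.rollback()
                break
            Inventory.update(stocks=Inventory.stocks - num).where(Inventory.goods == goods_id).execute()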
Demonstration with Go's gorm framework:
package main
import (
"fmt"
"gorm.io/driver/mysql"
"gorm.io/gorm"
"gorm.io/gorm/clause"
"gorm.io/gorm/logger"
"gorm.io/gorm/schema"
"log"
"os"
"sync"
"time"
)
type BaseModel struct {
	ID        int32          `gorm:"primary_key;comment:ID" json:"id"`
	CreatedAt time.Time      `gorm:"column:add_time;comment:creation time" json:"-"`
	UpdatedAt time.Time      `gorm:"column:update_time;comment:update time" json:"-"`
	DeletedAt gorm.DeletedAt `gorm:"comment:deletion time" json:"-"`
	IsDeleted bool           `gorm:"comment:soft-delete flag" json:"-"`
}
type Inventory struct {
BaseModel
	Goods   int32 `gorm:"type:int;index;comment:goods id"`
	Stocks  int32 `gorm:"type:int;comment:stock quantity"`
	Version int32 `gorm:"type:int;comment:optimistic lock version for the distributed lock"`
}
var DB *gorm.DB
func InitDB() {
dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4&parseTime=True&loc=Local",
"root", "123456", "localhost", 3306, "mxshop_inventory_srv2")
newLogger := logger.New(
		log.New(os.Stdout, "\r\n", log.LstdFlags), // io writer: log destination, prefix and flags
		logger.Config{
			SlowThreshold: time.Second, // slow SQL threshold
			//LogLevel:     logger.Info, // log level
			LogLevel: logger.Silent, // log level
			//IgnoreRecordNotFoundError: true, // ignore ErrRecordNotFound errors
			Colorful: true, // colorful printing
},
)
	// See https://github.com/go-sql-driver/mysql#dsn-data-source-name for details
var err error
DB, err = gorm.Open(mysql.Open(dsn), &gorm.Config{
NamingStrategy: schema.NamingStrategy{
SingularTable: true,
},
Logger: newLogger,
})
if err != nil {
panic(err)
}
}
func main() {
InitDB()
gNum := 20
var Num int32 = 1
var wg sync.WaitGroup
wg.Add(gNum)
for i := 0; i < gNum; i++ {
go func() {
defer wg.Done()
tx := DB.Begin()
var inv Inventory
			fmt.Println("acquiring lock")
			// SELECT ... FOR UPDATE inside the transaction: the row lock is held until tx.Commit()
			if result := tx.Clauses(clause.Locking{Strength: "UPDATE"}).Where(&Inventory{Goods: 421}).First(&inv); result.RowsAffected == 0 {
				panic("inventory record not found")
			}
			if inv.Stocks < Num {
				panic("insufficient stock")
			}
			fmt.Println("lock acquired")
			//time.Sleep(time.Second * 5)
			inv.Stocks -= Num
			tx.Save(&inv)
			fmt.Println("releasing lock")
			tx.Commit()
			fmt.Println("lock released")
}()
}
wg.Wait()
}
This is only a multi-threaded demonstration; in a real distributed system there would be far more issues to deal with.
2. Optimistic locking
The idea is a version number: read the row together with its current version, then issue an UPDATE whose WHERE clause also checks that the version is still the one you read (and bumps it). If zero rows are affected, another writer got there first, so re-read and retry.
Demonstration with Python's peewee framework:
def sell2():
    # Demonstrates the database-backed optimistic locking mechanism
    goods_list = [(1, 10), (2, 20), (3, 30)]
    with db.atomic() as txn:
        # Oversell scenario
        for goods_id, num in goods_list:
            # Query the stock and retry until the compare-and-swap update succeeds
            while True:
                goods_inv = Inventory.get(Inventory.goods == goods_id)
                print(f"Current version: {goods_inv.version}")
                print(f"Goods {goods_id}: selling {num} items")
                time.sleep(randint(1, 3))  # simulate business logic between the read and the write
                if goods_inv.stocks < num:
                    print(f"Goods {goods_id}: insufficient stock")
                    txn.rollback()
                    break
                else:
                    # Let the database update based on its own current value,
                    # but only if the version is still the one we read (goods_inv.version)
                    query = Inventory.update(stocks=Inventory.stocks - num, version=Inventory.version + 1).where(
                        Inventory.goods == goods_id, Inventory.version == goods_inv.version)
                    ok = query.execute()
                    if ok:
                        print("Update succeeded")
                        break
                    else:
                        print("Update failed - retrying")
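The retry loop above spins forever if the row keeps changing. A hedged sketch of the same compare-and-swap pattern factored into a helper with a bounded number of retries (the helper name and retry policy are my own; it reuses the Inventory model and the time import from the demo above):

def deduct_with_retry(goods_id, num, max_retries=5):
    """Try to deduct `num` from stock using the version column; give up after max_retries."""
    for attempt in range(max_retries):
        goods_inv = Inventory.get(Inventory.goods == goods_id)
        if goods_inv.stocks < num:
            return False  # not enough stock, no point retrying
        rows = (Inventory
                .update(stocks=Inventory.stocks - num, version=Inventory.version + 1)
                .where(Inventory.goods == goods_id,
                       Inventory.version == goods_inv.version)
                .execute())
        if rows:
            return True   # our version still matched, the deduction went through
        time.sleep(0.05 * (attempt + 1))  # small backoff before re-reading
    return False          # contention too high, the caller decides what to do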
Demonstration with Go's gorm framework:
package main
import (
	"fmt"
	"gorm.io/driver/mysql"
	"gorm.io/gorm"
	"gorm.io/gorm/logger"
	"gorm.io/gorm/schema"
	"log"
	"os"
	"sync"
	"time"
)
type BaseModel struct {
ID int32 `gorm:"primary_key;comment:ID" json:"id"`
CreatedAt time.Time `gorm:"column:add_time;comment:创建时间" json:"-"`
UpdatedAt time.Time `gorm:"column:update_time;comment:更新时间" json:"-"`
DeletedAt gorm.DeletedAt `gorm:"comment:删除时间" json:"-"`
IsDeleted bool `gorm:"comment:是否删除" json:"-"`
}
type Inventory struct {
BaseModel
	Goods   int32 `gorm:"type:int;index;comment:goods id"`
	Stocks  int32 `gorm:"type:int;comment:stock quantity"`
	Version int32 `gorm:"type:int;comment:optimistic lock version for the distributed lock"`
}
var DB *gorm.DB
func InitDB() {
dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4&parseTime=True&loc=Local",
"root", "123456", "localhost", 3306, "mxshop_inventory_srv2")
newLogger := logger.New(
		log.New(os.Stdout, "\r\n", log.LstdFlags), // io writer: log destination, prefix and flags
		logger.Config{
			SlowThreshold: time.Second, // slow SQL threshold
			//LogLevel:     logger.Info, // log level
			LogLevel: logger.Silent, // log level
			//IgnoreRecordNotFoundError: true, // ignore ErrRecordNotFound errors
			Colorful: true, // colorful printing
},
)
	// See https://github.com/go-sql-driver/mysql#dsn-data-source-name for details
var err error
DB, err = gorm.Open(mysql.Open(dsn), &gorm.Config{
NamingStrategy: schema.NamingStrategy{
SingularTable: true,
},
Logger: newLogger,
})
if err != nil {
panic(err)
}
}
func main() {
InitDB()
gNum := 20
var Num int32 = 1
var wg sync.WaitGroup
wg.Add(gNum)
for i := 0; i < gNum; i++ {
go func() {
defer wg.Done()
//tx := DB.Begin()
for {
var inv Inventory
				//fmt.Println("acquiring lock")
				if result := DB.Where(&Inventory{Goods: 421}).First(&inv); result.RowsAffected == 0 {
					panic("inventory record not found")
				}
				if inv.Stocks < Num {
					panic("insufficient stock")
				}
				//update inventory set stocks = stocks-1, version = version+1 where goods = ? and version = ?
				inv.Stocks -= Num
				if result := DB.Model(&Inventory{}).Select("Stocks", "Version").Where("goods = ? and version = ?", int32(421), inv.Version).Updates(Inventory{Stocks: inv.Stocks, Version: inv.Version + 1}); result.RowsAffected == 0 {
					fmt.Println("deduction failed - retrying")
				} else {
					fmt.Println("deduction succeeded")
					break
}
}
}()
}
wg.Wait()
}
Section 3: Distributed locks based on Redis
Pros and cons of a Redis-based distributed lock
Advantages:
- High performance
- Simple
- Redis is usually already part of the stack, so there is nothing extra to maintain
Disadvantages:
- It depends on a third-party component
- A single Redis node has a relatively high chance of going down - hence Redis Cluster / Redis Sentinel
- Introducing Redis Cluster causes problems for the simple lock described here - hence RedLock (which is fairly controversial)
Other distributed locks:
ZooKeeper-based distributed locks (no need to dig into them for now; worth a look if you work in Java)
You do not need to know the internals of every distributed lock.
For the Redis-based lock, try to implement it yourself and make sure you understand the source code; it is a very common interview topic with lots of follow-up questions.
The following Python implementation uses a Redis distributed lock to solve the high-concurrency oversell problem.
import redis
import time
import threading
from random import randint
from datetime import datetime
from peewee import *
from inventory_srv.settings import settings
from playhouse.shortcuts import ReconnectMixin
from playhouse.pool import PooledMySQLDatabase
class ReconnectMySQLDatabase(ReconnectMixin, PooledMySQLDatabase):
pass
db = ReconnectMySQLDatabase("mxshop_inventory_srv", host="localhost", port=3306, user="root", password="123456")
# This BaseModel can be skimmed; it only overrides a few of peewee's CRUD operations, just inherit from it
class BaseModel(Model):
    add_time = DateTimeField(default=datetime.now, verbose_name="created at")
    is_deleted = BooleanField(default=False, verbose_name="soft-deleted")
    update_time = DateTimeField(verbose_name="updated at", default=datetime.now)
def save(self, *args, **kwargs):
        # Decide whether this is a newly added record or an update to an existing one
        if self._pk is not None:
            # An existing record is being updated, so refresh update_time
            self.update_time = datetime.now()
return super().save(*args, **kwargs)
@classmethod
    def delete(cls, permanently=False):  # permanently controls whether the row is removed for good
if permanently:
return super().delete()
else:
return super().update(is_deleted=True)
def delete_instance(self, permanently=False, recursive=False, delete_nullable=False):
if permanently:
return self.delete(permanently).where(self._pk_expr()).execute()
else:
self.is_deleted = True
self.save()
@classmethod
def select(cls, *fields):
return super().select(*fields).where(cls.is_deleted == False)
class Meta:
        database = db  # bind the models to the local db so db.atomic() and the model queries share one connection
class Inventory(BaseModel):
    # Inventory table for goods
    # stock = PrimaryKeyField(Stock)
    goods = IntegerField(verbose_name="goods id", unique=True)
    stocks = IntegerField(verbose_name="stock quantity", default=0)
    version = IntegerField(verbose_name="version number", default=0)  # optimistic-lock version; not used in this demo
# A hand-written Redis distributed lock
class Lock:
    # Initialization
    def __init__(self, name):
        self.redis_client = redis.Redis(host="1.1.1.1", port=6301)
        self.name = name

    # Acquire the lock
    def acquire(self):
        if self.redis_client.setnx(self.name, 1):  # set only if the key does not exist; returns 1 on success, 0 otherwise - an atomic operation
            return True
        else:
            while True:
                time.sleep(1)  # poll until the current holder releases the key
                if self.redis_client.setnx(self.name, 1):
                    return True

    # Release the lock
    def release(self):
        self.redis_client.delete(self.name)
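One optional refinement (my own addition, not in the original article): giving the class __enter__/__exit__ methods so it can be used with a with statement, which guarantees release() runs even if the business code raises:

class ContextLock(Lock):
    """Same lock, usable as `with ContextLock("lock:goods_1"): ...`."""
    def __enter__(self):
        self.acquire()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.release()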
def sell2():
    # Data inconsistency caused by concurrency across multiple threads
    # Each (goods_id, quantity) pair: the customer buys 10 of goods 1, and so on
    goods_list = [(1, 10), (2, 20), (3, 30)]
    # Transaction
    with db.atomic() as txn:
        # Oversell scenario
        for goods_id, num in goods_list:
            # Create the lock
            lock = Lock(f"lock:goods_{goods_id}")
            # Acquire it
            lock.acquire()
            # Query the stock
            goods_inv = Inventory.get(Inventory.goods == goods_id)
            print(f"Goods {goods_id}: selling {num} items")
            time.sleep(randint(1, 3))
            if goods_inv.stocks < num:
                print(f"Goods {goods_id}: insufficient stock")
                txn.rollback()
                lock.release()  # release the lock
                break
            else:
                # Let the database update based on its own current value; can this statement alone handle concurrency?
                # While one UPDATE is modifying the row, another UPDATE on the same row has to wait; that is MySQL's row-level atomicity
                query = Inventory.update(stocks=Inventory.stocks - num).where(Inventory.goods == goods_id)
                ok = query.execute()
                if ok:
                    print("Update succeeded")
                else:
                    print("Update failed")
                lock.release()  # release the lock
if __name__ == '__main__':
    # Start two threads
t1 = threading.Thread(target=sell2)
t2 = threading.Thread(target=sell2)
t1.start()
t2.start()
t1.join()
t2.join()
This distributed lock solves the high-concurrency oversell problem while keeping the transaction intact, but other problems remain, such as safety, deadlock and fault tolerance (mutual exclusion is already covered).
How to tackle the hard parts of a distributed lock
Problems a distributed lock still has to solve:
- Deadlock: if the holder crashes before calling release, the key stays in Redis forever and nobody can ever acquire the lock again. The usual fix is to set an expiry time on the key.
- Safety: a client must only be able to release its own lock, never a lock now held by someone else, and the get-compare-delete on release has to be atomic; that is what Redis Lua scripts are for.
- Expiry creates a new problem: if the key expires while the business logic is still running, another client acquires the lock and two holders run at once. The usual answer is renewal (a watchdog thread that keeps resetting the expiry while the work is in progress).
- Renewal itself has a downside: if the process is not dead but merely stuck, it keeps renewing forever and the lock never frees itself. Whether to renew or not depends on the situation.
Setting an expiry time plus a UUID value solves the deadlock and safety problems:
class Lock:
    # Initialization
    def __init__(self, name):
        self.redis_client = redis.Redis(host="1.1.1.1", port=6301)
        self.name = name
        from uuid import uuid4
        self.id = str(uuid4())  # a value unique to this lock instance

    # Acquire the lock
    def acquire(self):
        # if self.redis_client.setnx(self.name, 1):  # old version: no owner check, no expiry
        # Store the uuid for safety (only I may delete the lock I created) and set an expiry to avoid deadlock
        if self.redis_client.set(self.name, self.id, nx=True, ex=10):  # nx + ex in one atomic command
            return True
        else:
            while True:
                time.sleep(1)
                if self.redis_client.set(self.name, self.id, nx=True, ex=10):
                    return True

    # Release the lock
    def release(self):
        # First read the value and compare it with this lock's own id; delete only if they match, otherwise complain
        # This block is still not safe: the get and the delete should be one atomic step - Redis offers a scripting language, Lua, for exactly that
        current_id = self.redis_client.get(self.name)
        if current_id is not None and current_id.decode() == self.id:
            self.redis_client.delete(self.name)
        else:
            print("Refusing to delete a lock that does not belong to me")
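To close the race between the GET and the DEL above, the check-and-delete can be pushed into Redis as a Lua script, which Redis executes atomically. A hedged sketch using redis-py's register_script (this is my own minimal version; the third-party library below does the same thing much more thoroughly):

RELEASE_SCRIPT = """
if redis.call("get", KEYS[1]) == ARGV[1] then
    return redis.call("del", KEYS[1])
else
    return 0
end
"""

class AtomicLock(Lock):
    def __init__(self, name):
        super().__init__(name)
        # register_script returns a callable; Redis runs the whole script atomically
        self._release_script = self.redis_client.register_script(RELEASE_SCRIPT)

    def release(self):
        if self._release_script(keys=[self.name], args=[self.id]) == 0:
            print("Refusing to delete a lock that does not belong to me")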
Python third-party library: python-redis-lock
GitHub: https://github.com/ionelmc/python-redis-lock
The underlying idea is much the same as above, with two additions: Lua scripts to make the operations atomic, and renewal of the expiry time.
Source of the third-party library:
import threading
import weakref
from base64 import b64encode
from logging import getLogger
from os import urandom
from typing import Union
from redis import StrictRedis
__version__ = '4.0.0'
logger_for_acquire = getLogger(f"{__name__}.acquire")
logger_for_refresh_thread = getLogger(f"{__name__}.refresh.thread")
logger_for_refresh_start = getLogger(f"{__name__}.refresh.start")
logger_for_refresh_shutdown = getLogger(f"{__name__}.refresh.shutdown")
logger_for_refresh_exit = getLogger(f"{__name__}.refresh.exit")
logger_for_release = getLogger(f"{__name__}.release")
# Check if the id match. If not, return an error code.
# Release the lock
UNLOCK_SCRIPT = b"""
if redis.call("get", KEYS[1]) ~= ARGV[1] then
return 1
else
redis.call("del", KEYS[2])
redis.call("lpush", KEYS[2], 1)
redis.call("pexpire", KEYS[2], ARGV[2])
redis.call("del", KEYS[1])
return 0
end
"""
# Covers both cases when key doesn't exist and doesn't equal to lock's id
# Renew (extend) the lock
EXTEND_SCRIPT = b"""
if redis.call("get", KEYS[1]) ~= ARGV[1] then
return 1
elseif redis.call("ttl", KEYS[1]) < 0 then
return 2
else
redis.call("expire", KEYS[1], ARGV[2])
return 0
end
"""
RESET_SCRIPT = b"""
redis.call('del', KEYS[2])
redis.call('lpush', KEYS[2], 1)
redis.call('pexpire', KEYS[2], ARGV[2])
return redis.call('del', KEYS[1])
"""
RESET_ALL_SCRIPT = b"""
local locks = redis.call('keys', 'lock:*')
local signal
for _, lock in pairs(locks) do
signal = 'lock-signal:' .. string.sub(lock, 6)
redis.call('del', signal)
redis.call('lpush', signal, 1)
redis.call('expire', signal, 1)
redis.call('del', lock)
end
return #locks
"""
class AlreadyAcquired(RuntimeError):
pass
class NotAcquired(RuntimeError):
pass
class AlreadyStarted(RuntimeError):
pass
class TimeoutNotUsable(RuntimeError):
pass
class InvalidTimeout(RuntimeError):
pass
class TimeoutTooLarge(RuntimeError):
pass
class NotExpirable(RuntimeError):
pass
class Lock(object):
"""
A Lock context manager implemented via redis SETNX/BLPOP.
"""
unlock_script = None
extend_script = None
reset_script = None
reset_all_script = None
_lock_renewal_interval: float
_lock_renewal_thread: Union[threading.Thread, None]
def __init__(self, redis_client, name, expire=None, id=None, auto_renewal=False, strict=True, signal_expire=1000):
"""
:param redis_client:
An instance of :class:`~StrictRedis`.
:param name:
The name (redis key) the lock should have.
:param expire:
The lock expiry time in seconds. If left at the default (None)
the lock will not expire.
:param id:
The ID (redis value) the lock should have. A random value is
generated when left at the default.
Note that if you specify this then the lock is marked as "held". Acquires
won't be possible.
        :param auto_renewal: whether to renew the lock automatically
If set to ``True``, Lock will automatically renew the lock so that it
doesn't expire for as long as the lock is held (acquire() called
or running in a context manager).
Implementation note: Renewal will happen using a daemon thread with
an interval of ``expire*2/3``. If wishing to use a different renewal
time, subclass Lock, call ``super().__init__()`` then set
``self._lock_renewal_interval`` to your desired interval.
:param strict:
If set ``True`` then the ``redis_client`` needs to be an instance of ``redis.StrictRedis``.
:param signal_expire:
Advanced option to override signal list expiration in milliseconds. Increase it for very slow clients. Default: ``1000``.
"""
if strict and not isinstance(redis_client, StrictRedis):
raise ValueError("redis_client must be instance of StrictRedis. Use strict=False if you know what you're doing.")
if auto_renewal and expire is None:
raise ValueError("Expire may not be None when auto_renewal is set")
self._client = redis_client
if expire:
expire = int(expire)
if expire < 0:
raise ValueError("A negative expire is not acceptable.")
else:
expire = None
self._expire = expire
self._signal_expire = signal_expire
if id is None:
self._id = b64encode(urandom(18)).decode('ascii')
elif isinstance(id, bytes):
try:
self._id = id.decode('ascii')
except UnicodeDecodeError:
self._id = b64encode(id).decode('ascii')
elif isinstance(id, str):
self._id = id
else:
raise TypeError(f"Incorrect type for `id`. Must be bytes/str not {type(id)}.")
self._name = 'lock:' + name
self._signal = 'lock-signal:' + name
self._lock_renewal_interval = float(expire) * 2 / 3 if auto_renewal else None
self._lock_renewal_thread = None
self.register_scripts(redis_client)
@classmethod
def register_scripts(cls, redis_client):
global reset_all_script
if reset_all_script is None:
            # Register the Lua scripts with Redis
cls.unlock_script = redis_client.register_script(UNLOCK_SCRIPT)
cls.extend_script = redis_client.register_script(EXTEND_SCRIPT)
cls.reset_script = redis_client.register_script(RESET_SCRIPT)
cls.reset_all_script = redis_client.register_script(RESET_ALL_SCRIPT)
reset_all_script = redis_client.register_script(RESET_ALL_SCRIPT)
@property
def _held(self):
return self.id == self.get_owner_id()
def reset(self):
"""
Forcibly deletes the lock. Use this with care.
"""
self.reset_script(client=self._client, keys=(self._name, self._signal), args=(self.id, self._signal_expire))
@property
def id(self):
return self._id
def get_owner_id(self):
owner_id = self._client.get(self._name)
if isinstance(owner_id, bytes):
owner_id = owner_id.decode('ascii', 'replace')
return owner_id
def acquire(self, blocking=True, timeout=None):
"""
:param blocking:
Boolean value specifying whether lock should be blocking or not.
:param timeout:
An integer value specifying the maximum number of seconds to block.
"""
logger_for_acquire.debug("Acquiring Lock(%r) ...", self._name)
if self._held:
raise AlreadyAcquired("Already acquired from this Lock instance.")
if not blocking and timeout is not None:
raise TimeoutNotUsable("Timeout cannot be used if blocking=False")
if timeout:
timeout = int(timeout)
if timeout < 0:
raise InvalidTimeout(f"Timeout ({timeout}) cannot be less than or equal to 0")
if self._expire and not self._lock_renewal_interval and timeout > self._expire:
raise TimeoutTooLarge(f"Timeout ({timeout}) cannot be greater than expire ({self._expire})")
busy = True
blpop_timeout = timeout or self._expire or 0
timed_out = False
while busy:
busy = not self._client.set(self._name, self._id, nx=True, ex=self._expire)
if busy:
if timed_out:
return False
elif blocking:
                    # The library's signal list: BLPOP blocks here until release pushes a signal or the timeout fires
timed_out = not self._client.blpop(self._signal, blpop_timeout) and timeout
else:
logger_for_acquire.warning("Failed to acquire Lock(%r).", self._name)
return False
        # Should we keep refreshing the expiry time? It is not mandatory, and it carries a risk: if the current process is not dead but is stuck and never returns, it will refresh the expiry forever and effectively create a deadlock!
logger_for_acquire.info("Acquired Lock(%r).", self._name)
if self._lock_renewal_interval is not None:
self._start_lock_renewer()
return True
    # Renewal (extend the expiry)
def extend(self, expire=None):
"""
Extends expiration time of the lock.
:param expire:
New expiration time. If ``None`` - `expire` provided during
lock initialization will be taken.
"""
if expire:
expire = int(expire)
if expire < 0:
raise ValueError("A negative expire is not acceptable.")
elif self._expire is not None:
expire = self._expire
else:
raise TypeError("To extend a lock 'expire' must be provided as an argument to extend() method or at initialization time.")
error = self.extend_script(client=self._client, keys=(self._name, self._signal), args=(self._id, expire))
if error == 1:
raise NotAcquired(f"Lock {self._name} is not acquired or it already expired.")
elif error == 2:
raise NotExpirable(f"Lock {self._name} has no assigned expiration time")
elif error:
raise RuntimeError(f"Unsupported error code {error} from EXTEND script")
@staticmethod
def _lock_renewer(name, lockref, interval, stop):
"""
Renew the lock key in redis every `interval` seconds for as long
as `self._lock_renewal_thread.should_exit` is False.
"""
while not stop.wait(timeout=interval):
logger_for_refresh_thread.debug("Refreshing Lock(%r).", name)
lock: "Lock" = lockref()
if lock is None:
logger_for_refresh_thread.debug("Stopping loop because Lock(%r) was garbage collected.", name)
break
lock.extend(expire=lock._expire)
del lock
logger_for_refresh_thread.debug("Exiting renewal thread for Lock(%r).", name)
def _start_lock_renewer(self):
"""
Starts the lock refresher thread.
"""
if self._lock_renewal_thread is not None:
raise AlreadyStarted("Lock refresh thread already started")
logger_for_refresh_start.debug(
"Starting renewal thread for Lock(%r). Refresh interval: %s seconds.", self._name, self._lock_renewal_interval
)
self._lock_renewal_stop = threading.Event()
self._lock_renewal_thread = threading.Thread(
group=None,
target=self._lock_renewer,
kwargs={
'name': self._name,
'lockref': weakref.ref(self),
'interval': self._lock_renewal_interval,
'stop': self._lock_renewal_stop,
},
)
        self._lock_renewal_thread.daemon = True
self._lock_renewal_thread.start()
def _stop_lock_renewer(self):
"""
Stop the lock renewer.
This signals the renewal thread and waits for its exit.
"""
if self._lock_renewal_thread is None or not self._lock_renewal_thread.is_alive():
return
logger_for_refresh_shutdown.debug("Signaling renewal thread for Lock(%r) to exit.", self._name)
self._lock_renewal_stop.set()
self._lock_renewal_thread.join()
self._lock_renewal_thread = None
logger_for_refresh_exit.debug("Renewal thread for Lock(%r) exited.", self._name)
def __enter__(self):
acquired = self.acquire(blocking=True)
if not acquired:
raise AssertionError(f"Lock({self._name}) wasn't acquired, but blocking=True was used!")
return self
def __exit__(self, exc_type=None, exc_value=None, traceback=None):
self.release()
def release(self):
"""Releases the lock, that was acquired with the same object.
.. note::
If you want to release a lock that you acquired in a different place you have two choices:
* Use ``Lock("name", id=id_from_other_place).release()``
* Use ``Lock("name").reset()``
"""
if self._lock_renewal_thread is not None:
self._stop_lock_renewer()
logger_for_release.debug("Releasing Lock(%r).", self._name)
error = self.unlock_script(client=self._client, keys=(self._name, self._signal), args=(self._id, self._signal_expire))
if error == 1:
raise NotAcquired(f"Lock({self._name}) is not acquired or it already expired.")
elif error:
raise RuntimeError(f"Unsupported error code {error} from EXTEND script.")
def locked(self):
"""
Return true if the lock is acquired.
Checks that lock with same name already exists. This method returns true, even if
lock have another id.
"""
return self._client.exists(self._name) == 1
reset_all_script = None
def reset_all(redis_client):
"""
Forcibly deletes all locks if its remains (like a crash reason). Use this with care.
:param redis_client:
An instance of :class:`~StrictRedis`.
"""
Lock.register_scripts(redis_client)
reset_all_script(client=redis_client) # noqa
Usage:
def sell2():
    # Data inconsistency caused by concurrency across multiple threads
    goods_list = [(1, 10), (2, 20), (3, 30)]
    with db.atomic() as txn:
        # Oversell scenario
        for goods_id, num in goods_list:
            from inventory_srv.test.py_redis_lock import Lock as PyLock
            redis_client = redis.Redis(host="1.1.1.1", port=6301)
            # auto_renewal=True: a daemon thread keeps extending the 15s expiry while the lock is held
            # (note: the library already prefixes names with "lock:", so this key ends up as "lock:lock:goods_...")
            lock = PyLock(redis_client, f"lock:goods_{goods_id}", auto_renewal=True, expire=15)
            lock.acquire()
            # Query the stock
            goods_inv = Inventory.get(Inventory.goods == goods_id)
            print(f"Goods {goods_id}: selling {num} items")
            time.sleep(20)  # deliberately longer than the 15s expiry, to show that renewal keeps the lock alive
            if goods_inv.stocks < num:
                print(f"Goods {goods_id}: insufficient stock")
                txn.rollback()
                lock.release()  # release the lock
                break
            else:
                # Let the database update based on its own current value; can this statement alone handle concurrency?
                query = Inventory.update(stocks=Inventory.stocks - num).where(Inventory.goods == goods_id)
                ok = query.execute()
                if ok:
                    print("Update succeeded")
                else:
                    print("Update failed")
                lock.release()  # release the lock
if __name__ == '__main__':
t1 = threading.Thread(target=sell2)
t2 = threading.Thread(target=sell2)
t1.start()
t2.start()
t1.join()
t2.join()
While testing, keep an eye on the TTL of the lock key in Redis.
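For example, a quick way to watch it from Python (key name assumed to match the demo above, where the library adds its own "lock:" prefix):

r = redis.Redis(host="1.1.1.1", port=6301)
print(r.ttl("lock:lock:goods_1"))  # seconds until expiry; with auto_renewal it should keep jumping back up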
When integrating this into a service, it is recommended to connect to Redis through a connection pool:
pool = redis.ConnectionPool(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB)
REDIS_CLIENT = redis.StrictRedis(connection_pool=pool)
# REDIS_CLIENT is already a client instance - pass it to the Lock directly instead of constructing a new redis.Redis(...) each time
Go third-party library: redsync
A small test that simulates purchasing a product:
package main
import (
"fmt"
goredislib "github.com/go-redis/redis/v8"
"github.com/go-redsync/redsync/v4"
"github.com/go-redsync/redsync/v4/redis/goredis/v8"
"sync"
"time"
)
func main() {
// Create a pool with go-redis (or redigo) which is the pool redisync will
// use while communicating with Redis. This can also be any pool that
// implements the `redis.Pool` interface.
client := goredislib.NewClient(&goredislib.Options{
Addr: "121.40.213.174:6301",
})
pool := goredis.NewPool(client)
rs := redsync.New(pool)
// Obtain a new mutex by using the same name for all instances wanting the
// same lock.
	gNum := 2          // number of concurrent buyers (goroutines)
	mutexname := "421" // lock name (the goods id)
	// sync.WaitGroup and goroutines are basic Go knowledge, not explained here
var wg sync.WaitGroup
wg.Add(gNum)
for i := 0; i < gNum; i++ {
go func() {
defer wg.Done()
mutex := rs.NewMutex(mutexname)
			fmt.Println("acquiring lock")
			if err := mutex.Lock(); err != nil {
				panic(err)
			}
			fmt.Println("lock acquired")
			time.Sleep(time.Second * 5)
			fmt.Println("releasing lock")
			if ok, err := mutex.Unlock(); !ok || err != nil {
				panic("unlock failed")
			}
			fmt.Println("lock released")
}()
}
wg.Wait()
}
redsync - RedLock (the "red lock")
This concept comes up when Redis is deployed as a cluster of multiple nodes.
(Diagram from the original article omitted.)
The distributed-lock algorithm over multiple Redis nodes (RedLock) effectively guards against single points of failure.
Assume there are 5 completely independent Redis master servers.
1. Record the current timestamp.
2. The client tries, in order, to acquire the lock on every Redis server using the same key and value. The per-node acquisition timeout is much shorter than the lock's expiry time, so the client does not spend too long waiting on a Redis node that is already down and instead moves on to the next instance. For example: with a TTL of 5s, allow at most 1s per lock; if a node does not grant the lock within one second, give up on that node and try the next one.
3. The client takes the time after it has attempted all the locks it can get and subtracts the timestamp from step 1. Only if this elapsed time is smaller than the TTL and at least 3 Redis instances granted the lock does the acquisition count as successful.
4. If the lock was acquired, its real validity time is the TTL minus the elapsed time from step 3. For example: with a TTL of 5s and 2s spent acquiring all the locks, the lock is really valid for 3s (strictly, clock drift should be subtracted as well).
5. If the client fails to acquire the lock for whatever reason, it starts unlocking all the Redis instances, because it may already hold fewer than 3 locks and these must be released so they do not get in the way of other clients.
A schematic of the algorithm appeared here in the original article; a rough code sketch of the same steps follows.
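A hedged, simplified sketch of steps 1-5 in plain redis-py against a hypothetical list of five independent nodes (real implementations such as redsync or redlock-py handle per-node socket timeouts, retries and drift more carefully):

import time
import uuid
import redis

# hypothetical node addresses; a real deployment would also set socket_timeout on each client
NODES = [redis.Redis(host=h, port=6379) for h in ("10.0.0.1", "10.0.0.2", "10.0.0.3", "10.0.0.4", "10.0.0.5")]
TTL_MS = 5000                       # lock expiry on each node, in milliseconds
PER_NODE_TIMEOUT = 1.0              # give each node at most 1 second (much smaller than the TTL)
DRIFT_MS = int(TTL_MS * 0.01) + 2   # rough allowance for clock drift

def redlock_acquire(key):
    value = str(uuid.uuid4())       # one value shared across all nodes, unique to this client
    start = time.monotonic()        # step 1: record the start time
    granted = 0
    for node in NODES:              # step 2: try every node in order with the same key/value
        try:
            node_start = time.monotonic()
            if node.set(key, value, nx=True, px=TTL_MS) and (time.monotonic() - node_start) < PER_NODE_TIMEOUT:
                granted += 1
        except redis.RedisError:
            pass                    # a dead node simply counts as "not granted"
    elapsed_ms = (time.monotonic() - start) * 1000
    validity_ms = TTL_MS - elapsed_ms - DRIFT_MS             # step 4: remaining useful lifetime
    if granted >= len(NODES) // 2 + 1 and validity_ms > 0:   # step 3: quorum reached within the TTL
        return value, validity_ms
    for node in NODES:              # step 5: failed - release whatever was acquired
        try:
            if node.get(key) == value.encode():              # non-atomic check-and-delete, fine for a sketch
                node.delete(key)
        except redis.RedisError:
            pass
    return None, 0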
What is clock drift?
If a Redis server's machine clock jumps forward, the key can expire too early. For example, client 1 acquires the lock and the key is supposed to expire at 12:02, but the Redis server's clock is 2 minutes ahead of the client's, so the key already expires at 12:00. If client 1 has not released the lock by then, several clients can end up holding the same lock at once.
Is the RedLock algorithm an asynchronous algorithm?
It can be treated as a synchronous algorithm: even though the processes (different machines) have no synchronized clock, time flows at roughly the same rate on each of them, and clock drift is small relative to the TTL, so it can be neglected. (This is not fully rigorous; the algorithm does have to account for clock drift, and if two machines were on opposite ends of the earth the drift could be very large.)
RedLock: retrying after failure
When a client fails to acquire the lock, it should retry after a random delay, and ideally send the SET commands to all Redis instances concurrently at the same moment. A client that did acquire the lock should release it promptly once its task is done, to save time for everyone else.
RedLock: releasing the lock
Because the release checks whether the lock's value is the one this client set, and only deletes it if so, releasing is very simple: just send the release command to every instance, without worrying about whether each individual release succeeds.
RedLock caveats (safety arguments):
1. Suppose the client acquires the lock on all instances with the same key and expiry time (TTL). Because the SET commands reach each instance at different times, the keys do not expire simultaneously. If the first SET happens at T1 and the last after T2, the minimum time for which this client effectively holds the lock is TTL - (T2 - T1) - clock drift.
2. Success is judged by N/2 + 1 instances (i.e. more than half), because if fewer than half were enough, several clients could all "successfully" acquire the lock at the same time, which would defeat the lock.
3. If the time a client spends locking the majority of instances is greater than or close to the lock's expiry time, the lock is considered invalid and the client unlocks those Redis instances without executing the business logic. The lock is valid only if a majority of the locks were acquired within the TTL; otherwise it is invalid.
Properties that keep the system live:
1. Locks release themselves automatically (via expiry).
2. When acquisition fails (less than a majority) or the task is finished, the client releases the locks actively instead of waiting for them to expire.
3. The interval before a client retries (from the first failure to the second attempt) is larger than the time the first acquisition attempt took.
4. Retries are bounded by a maximum number of attempts.
RedLock: performance and crash recovery
1. If Redis has no persistence, then after client A successfully acquires the lock and all the Redis nodes restart, client B can acquire the same lock again, which violates the lock's mutual exclusion.
2. Enabling AOF persistence makes things better: Redis expiry is based on the Unix timestamp, so after a restart the keys still expire at the scheduled time and the business is unaffected. However, AOF syncs to disk once per second by default, so a power loss within that second can lose data, and an immediate restart can then break mutual exclusion. Using Always (sync every write command to disk) would avoid that but degrades performance sharply, so there is a trade-off between full lock safety and performance.
3. A way to keep both lock safety and good performance, even across a power loss, is to keep the default once-per-second disk sync and, whenever a Redis node goes down for any reason, wait one TTL before restarting it (so-called delayed restart). The downside is that the service is effectively paused during that TTL.
Summary:
1. The TTL must be longer than the normal business execution time + the time spent acquiring the lock on all Redis servers + clock drift.
2. The time spent acquiring the lock across all Redis servers must be much smaller than the TTL, and the number of locks acquired must be more than half of the total: N/2 + 1.
3. The timeout for acquiring the lock on each individual Redis instance must be much smaller than the TTL.
4. After failing to acquire all the locks, retries must be limited to a certain number of attempts.
5. After a Redis node crashes (whether one node or all of them), delay its restart by one TTL.
6. A multi-node Redis lock should be implemented on top of the single-node distributed-lock algorithm, combining the two.