本文是基于上一篇布隆过滤器的进阶:https://www.jianshu.com/p/e4773b69319d
前言
在上一篇文章中,我们利用了redis实现了分布式的布隆过滤器,基本能满足大部分的需求。但是近期在修改代码的时候,想到高并发的情形下是否会出现错判,于是仔细考虑了实现原理、分析scrapyredis源码,觉得加上分布式锁会更好。
主要是为了在拓展中学习,深挖可能出现的问题。
布隆过滤器相关的代码都推在github上,并近期依然在改进优化,喜欢的小伙伴可以starts支持一下,有疑问可以发issue~
github地址:https://github.com/Sssmeb/BloomFilter/blob/master/RedisBloomFilter.py
锁和分布式锁
一般来说,在对数据进行加锁时,程序首先需要通过获取锁来得到对数据进行排他性访问的能力,然后才能对数据执行一系列操作,最后还要将锁释放给其他程序。
对于能够被多个线程访问的共享内存数据结构来说,这种 “获取锁、执行操作、释放锁” 动作非常常见。
Redis使用WATCH命令代替对数据进行加锁,因为WATCH只会在数据被其他客户端抢先修改了的情况下通知执行了这个命令的客户端,而不会阻止其他客户端对数据进行修改,所以WATCH被称为乐观锁。
悲观锁:例如我们熟悉的mysql就是悲观锁。即加锁后,其他客户端只能等待本次操作结束后,释放锁再进行操作。
分布式锁是由不同机器上的不同客户端进行获取和释放的。不在操作系统级别、编程语言级别使用锁是为了对存储的数据进行排他性访问,客户端需要访问一个锁,这个锁必须定义在一个可以让所有客户端都看得见的访问,而这个访问就是数据库(Redis)本身。
如果只是依靠watch、multi、exec组成的事务,当客户端负载(数量)不断增加时,事务会因执行失败而反复进行重试,所以这样的做法并不完美。
分布式锁的实现
注意事项
因为客户端在使用锁的过程中也可能因为某种原因下线(如客户端主机关机死机等),所以为了防止客户端在取得锁之后崩溃,导致锁一直处于 被获取 的状态,锁应该要实现由超时限制特性:未在指定时间内完成操作,锁将自动释放。
注意可能出错的场景:
- 持有锁的进程被自动释放掉锁后,进程本身并不知晓这点,甚至还可能会错误地释放了其他进程持有的锁。
- 某个持有锁的进程崩溃,其他锁不知道哪个进程持有锁,也不知道它已经崩溃,导致白白浪费等待时间
- 在一个进程持有的锁过期后,其他多个进程同时尝试去获取锁,并且都成功获取了。
- 上述一和三同时发生,多个进程获得了锁,并且以为自己是唯一一个获得锁的进程。
redis构建分布式锁
基本功能实现
为了对数据进行排他性访问,程序首先要获取锁。
SETNX只会在键不存在的情况下为键设置值,而锁要做的就是将一个随机生成的128位UUID设置位键的值,并使用这个值来防止锁被其他进程取得。如果程序在尝试获取锁的时候失败,那么它将不断地进行重试,直到成功地取得锁或者超过给定的时限为止。
def acquire_lock(conn, lockname, acquire_timeout=10):
# 128位随机标识符
identifier = str(uuid.uuid4())
end = time.time() + acquire_timeout
# 在规定时间内 不断重试
while time.time() < end:
if conn.setnx('lock:' + lockname, identifier):
return identifier
time.sleep(0.001)
return False
为什么需要一个唯一标识?
考虑没有唯一标识的场景下:
A客户端使用完锁,准备释放的过程中,锁过期(在有设时间限制的场景下)
由于A客户端的锁已经过期,所以另一个B客户端申请锁成功
A继续释放锁,由于没有唯一标识,自身的锁已过期,所以将B客户端的锁释放掉了。
B客户端却不知道自己的锁已经被释放了,继续执行程序。
锁释放时,先使用WATCH监视代表锁的键,接着检查键目前的值是否和加锁时设置的值相同,并在确认后删除(可以防止错误地释放同一个锁多次)。
def release_lock(conn, lockname, identifier):
pipe = conn.pipeline(True)
lockname = 'lock:' + lockname
while True:
try:
pipe.watch(lockname)
# 判断标志是否相同
if str(pipe.get(lockname), encoding='utf-8') == identifier:
pipe.multi()
pipe.delete(lockname)
pipe.execute()
return True
# 不同则直接退出 return False
pipe.unwatch()
break
except redis.exceptions.WatchError:
pass
return False
注:一般来说此处的while循环只会在极少数的情况用到,主要为了后期为锁加上超时限制后,锁可能会混合使用两个版本的锁。
通过以上锁可以在高负载情况下,有效的避免WATCH实现因为竞争过多而导致延迟剧增甚至无法执行的问题、减少重试次数、降低延迟时间、提升性能并能自定制合适的粒度锁。
性能优化——超时限制
为了给锁加上超时限制特性,程序在取得锁之后,调用expire命令来为锁设置过期时间,使得Redis可以自动删除超时的锁。为了确保锁在客户端已经崩溃(在setnx和expire直接崩溃是最糟糕的)的情况下,仍然能够自动被释放,客户端会在尝试获取锁失败之后,检查锁的超时时间,并为未设置超时时间的锁设置超时时间。因此锁总会带有超时时间,并最终因为超时而自动被释放,使得其他客户端可以继续尝试获取已被释放的锁。
def acquire_lock_with_timeout(conn, lockname, acquire_timeout=10, lock_timeout=10):
# 128位随机标识符
identifier = str(uuid.uuid4())
lockname = 'lock:' + lockname
lock_timeout = int(math.ceil(lock_timeout) # 确保传给exprie是整数
end = time.time() + acquire_timeout
while time.time() < end:
if conn.setnx(lockname, identifier):
conn.expire(lockname, lock_timeout)
return identifier
elif not conn.ttl(lockname): # 为没有设置超时时间的锁设置超时时间
conn.expire(lockname, lock_timeout)
time.sleep(0.001)
return False
新的函数增加了超时限制特性,确保了锁总会在有需要的时候被释放,而不会被某个客户端一直把持着。且可以直接复用之前的释放锁函数。
布隆过滤器 + 分布式锁
只需要将上述的分布式锁在add、is_exists操作前加锁,操作结束后释放锁即可。
分布式布隆过滤器为什么需要锁?
在高并发的场景下,可能出现一个客户端在进行add操作的同时,另一个客户端在进行is_exists,有可能出现错判的情况。
虽然redis 是单线程的,但由于每次add或者is_exists判断的时候需要对多个位进行操作,即循环发送 逐位 操作指令才能完成一次add或is_exists操作。
所以有可能出现一个客户端在进行add的位数组操作循环中,另一个客户端进行is_exists,导致错判
此时在redis中能采用的方法有:
- 事务。使用watch、multi、exec等。
- 非事务型流水线。(将一系列命令打包再一次性发送给redis)
- 分布式锁。
三种方法的优劣分别是:
- 采用事务实现操作简单,只需要在原代码头尾加上事务相关的代码即可。但事务有一个明显的缺点是 乐观锁 带来失败重试导致效率降低。在本场景下不明显,但是例如商品秒杀活动场景,乐观锁会造成大量的失败重试。
- 非事务型流水线。能解决普通分布式过滤器的需求,但是对于scrapyredis的去重过滤仍有可能出错(需要结合源码)
- 分布式锁能保证数据的安全性。但实现相对麻烦,需要处理的情况较多。
这里我们使用分布式锁。
具体代码参看github:https://github.com/Sssmeb/BloomFilter/blob/master/RedisBloomFilter.py
内含scrapyredis库(使用布隆过滤器版本)
引用
《redis实战》