以一下代码为例
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));// jedis.get("stock")
if (stock > 0) {
int realstock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)
System.out.println("扣减成功,剩余库存:" + realstock);
} else {
System.out.println("扣减失败,库存不足");
}
如果是高并发场景,存在线程安全问题,如果直接使用synchronized锁锁住
synchronized (this) {
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));// jedis.get("stock")
if (stock > 0) {
int realstock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)
System.out.println("扣减成功,剩余库存:" + realstock);
} else {
System.out.println("扣减失败,库存不足");
}
}
在单机环境,没有问题,但是系统一般是集群架构部署,如果多个请求来,分发到不同tomcat进程上去,仍然会出问题。
一个最基础的分布式锁
String lockkey ="lock:product_101";//模拟商品id
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockkey,"hhh");
if (!result) {
return "error_code";
}
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));// jedis.get("stock")
if (stock > 0) {
int realstock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)
System.out.println("扣减成功,剩余库存:" + realstock);
} else {
System.out.println("扣减失败,库存不足");
}
stringRedisTemplate.delete("lockkey");
直接删锁场景如果出现异常,就会一直不释放锁导致死锁,就需要加try{}finally{}保证删锁逻辑执行
String lockkey ="lock:product_101";//模拟商品id
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockkey,"hhh");
if (!result) {
return "error_code";
}
try{
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));// jedis.get("stock")
if (stock > 0) {
int realstock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)
System.out.println("扣减成功,剩余库存:" + realstock);
} else {
System.out.println("扣减失败,库存不足");
}
}finally{
stringRedisTemplate.delete("lockkey");
}
同时为了防止宕机,需要加一个超时时间
String lockkey ="lock:product_101";//模拟商品id
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockkey,"hhh");
stringRedisTemplate.expire(lockKey,10, TimeUnit.SECONDS);//设置10s超时-错误示例,与上一步操作有原子性
if (!result) {
return "error_code";
}
try{
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));// jedis.get("stock")
if (stock > 0) {
int realstock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)
System.out.println("扣减成功,剩余库存:" + realstock);
} else {
System.out.println("扣减失败,库存不足");
}
}finally{
stringRedisTemplate.delete("lockkey");
}
潜在问题
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockkey,"hhh");
stringRedisTemplate.expire(lockKey,10, TimeUnit.SECONDS);
有原子性问题:
如果在setIfAbsent和expire之间发生 Redis 崩溃、JVM 崩溃、网络中断,会导致锁没有设置过期时间,变成永久锁(死锁);
并发竞争时可能失效,线程1setIfAbsent成功,但还未执行expire,线程B此时也能 setIfAbsent 成功(因为锁还未设置TTL),导致多个线程同时持有锁。
ps:TTL 表示一个键的剩余存活时间(单位:秒),它的作用是让Redis自动删除过期的键,避免内存泄漏或在分布式锁中,防止锁被永久占用(死锁)。
原子性问题解决
String lockkey ="lock:product_101";//模拟商品id
stringRedisTemplate.opsForValue().setIfAbsent(lockkey,"hhh",10,TimeUnit.SECONDS);
if (!result) {
return "error_code";
}
try{
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));// jedis.get("stock")
if (stock > 0) {
int realstock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)
System.out.println("扣减成功,剩余库存:" + realstock);
} else {
System.out.println("扣减失败,库存不足");
}
}finally{
stringRedisTemplate.delete("lockkey");
}
高并发场景有严重问题
如果线程1执行一段时间超过了超时,然后线程2执行,接着线程1执行到删除锁,删除的是线程2的,极端情况下,分布式锁可能不会生效。
因此每个线程进来,生成一个唯一的id,在结束的时候,判断id是不是同一个if(clientId.equals(stringRedisTemplate.opsForValue().get(lockkey))
String lockkey ="lock:product_101";//模拟商品id
String clientId = UUID.randomUUID().toString();
stringRedisTemplate.opsForValue().setIfAbsent(lockkey,clientId,10,TimeUnit.SECONDS);
if (!result) {
return "error_code";
}
try{
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));// jedis.get("stock")
if (stock > 0) {
int realstock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)
System.out.println("扣减成功,剩余库存:" + realstock);
} else {
System.out.println("扣减失败,库存不足");
}
}finally{
if(clientId.equals(stringRedisTemplate.opsForValue().get(lockkey)){
stringRedisTemplate.delete("lockkey");
}
}
依然有问题,如果执行过程中,执行if到删锁逻辑间,出现网络延迟,然后超时,分布式锁失效,高并发场景下其它线程可以加锁,等网络恢复,执行删锁逻辑把别的线程给删了
锁续命
锁续命:主线程抢到锁,执行业务,弄个分线程,在分线程弄个定时任务(小于超时),每过一段时间,主线程还加着锁,就把超时时间重新设回原值。
redisson的锁续命
两个线程,默认锁过期时间是30秒(可配置),成功获取锁后,Redisson 会启动一个 后台守护线程(Watchdog),每隔10秒(默认) 检查锁是否仍被当前线程持有。
Watchdog 每隔 10秒 执行一次检查:
如果锁仍被当前线程持有 → 把锁的过期时间 重置为30秒(重新计时)。
如果锁已被释放或线程已丢失锁 → 停止续期,锁最终自动过期
释放锁时停止续期:调用 unlock() 时,Redisson 会删除 Redis 中的锁(确保其他线程可以获取)。停止 Watchdog 线程(不再续期)
异常情况:1.客户端崩溃 → Watchdog 线程停止,锁最终自动过期(不会死锁)2.Redis 故障 → 续期失败,锁按原定时间过期。
redisson锁续命.png
所有线程都来抢锁,lua脚本保证原子性,只有一个线程可以抢锁成功,抢锁成功后,后台开启一个分线程锁续命;其它线程没有抢到锁,while循环自旋间歇性尝试加锁,不会死循环不断尝试加锁,而是阻塞等待,阻塞等待的初始时间是第一次尝试加锁返回的ttl时间,如果抢到锁的线程执行比较快,并快速把锁释放,其它线程会通过redis的发布订阅进行唤醒(没加锁的线程会通过Redis 的发布/订阅(Pub/Sub)机制监听一个特定的频道,当执行unlock方法时,会往队列发一条消息,监听的线程发现有消息了,调用onMessage方法,之后把阻塞唤醒,然后while循环)。
onMessage:Redis 发布/订阅(Pub/Sub)机制中的一个回调方法,当客户端订阅的频道收到消息时,这个方法会被自动调用,作用:
接收订阅频道的消息通知
触发锁竞争的重试机制
减少无效的轮询请求可能存在的问题:lock相当于设置key,redis异步同步给从节点,主节点正在同步给从节点,结果宕机,从节点会选举新的主节点,新的从节点没有key,再来一个线程,也可以加锁成功。(在主节点同步锁信息到从节点之前,如果主节点宕机,可能导致锁失效,出现多个客户端同时持有锁的情况)
Redis 主从同步机制:
主节点(Master):负责处理写操作(如SETNX加锁),并异步复制数据到从节点。
从节点(Slave):接收主节点的数据同步,但不直接处理写请求。
故障转移(Failover):当主节点宕机时,哨兵(Sentinel)会选举一个从节点成为新的主节点。
问题点:
主节点写入成功后,不会等待从节点确认,而是直接返回客户端成功(异步);
如果主节点在同步完成前宕机,从节点可能丢失部分数据(如未同步的锁信息)。
RedLock
一般使用奇数个redis节点,节点之间对等(主从、集群、哨兵等关系)。
加锁是类似于SETNX一个key,节点执行的结果要返回给客户端,客户端需要收到超过半数节点的OK,才会认为加锁成功,执行加锁逻辑。
redLock.png
一般节点要配从节点,不然宕机半数后永远加不上锁,但这样还是存在上述锁失效问题。
如果不设置从节点,一般不是每条命令持久化,一秒持久化一次,兼顾内存操作以提高性能,当节点加锁正好在这一秒之内,宕机或重启后,仍会锁失效。
引入redisson
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<scope>3.6.5</scope>
</dependency>
String lockkey ="lock:product_101";//模拟商品id
//获取锁对象
RLock redissonLock = redisson.getLock(lockKey);
//加分布式锁
redissonLock.lock();//.setIfAbsent(lockKey, clientId, 30, TimeUnit.SECONDS);
try{
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));// jedis.get("stock")
if (stock > 0) {
int realstock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)
System.out.println("扣减成功,剩余库存:" + realstock);
} else {
System.out.println("扣减失败,库存不足");
}
}finally{
//释放分布式锁
redissonLock.unlock();
}
lua脚本
Redis在2.6推出了脚本功能,允许开发者使用Lua语言编写脚本传到Redis中执行。使用脚本的好处如下:
1.减少网络开销:本来5次网络请求的操作,可以用一个请求完成,原先5次请求的逻辑放在redis服务器上完成。使用脚本,减少了网络往返时延。这点跟管道类似。
2.原子操作:Redis会将整个脚本作为一个整体执行,中间不会被其他命令插入。管道不是原子的,不过redis的批量操作命令(类似mset)是原子的(串行执行)。
3.替代redis的事务功能:redis自带的事务功能很鸡肋,而redis的lua脚本几乎实现了常规的事务功能,官方推荐如果要使用redis的事务功能可以用redis lua替代。从Redis2.6.0版本开始,通过内置的Lua解释器,可以使用EVAL命令对Lua脚本进行求值。EVAL命令的格式如下:EVAL script numkeys key [key ....] arg [arg ....]
EVAL:执行脚本的客户端命令
script:一段Lua脚本程序,如return
eval "return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}" 2 key1 key2 first second
2:key的个数,对应KEYS[1],KEYS[2]
key1:KEYS[1]
key2:KEYS[2]
first:ARGV[1]
second:ARGV[2]
jedis.set("product_stock_10016", "15"); //初始化商品10016的库存
String script = " local count = redis.call('get', KEYS[1]) " +
" local a = tonumber(count) " +
" local b = tonumber(ARGV[1]) " +
" if a >= b then " +
" redis.call('set', KEYS[1], a-b) " +
" return 1 " +
" end " +
" return 0 ";
;
Object obj = jedis.eval(script, Arrays.asList("product_stock_10016"), Arrays.asList("10"));
/**
*redis.call('get', KEYS[1]):调用get方法,获取KEYS[1]对应的值,这里是15
* KEYS[1]对应Arrays.asList("product_stock_10016")的第一个;
* ARGV[1]对应 Arrays.asList("10")的第一个。
* " local a = tonumber(count) " :把15转为number类型
* " local b = tonumber(ARGV[1]) ":把10拿过来转为number类型
*" if a >= b then ":如果库存大于扣减数量
*" redis.call('set', KEYS[1], a-b) ":设置扣减后的数量
*" return 1 ":返回1
*" end " :条件结束
*" return 0 ":否则返回0
*优点:命令不可切割,当成一行执行
*/