分布式锁场景

以一下代码为例
    int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));// jedis.get("stock")
    if (stock > 0) {
        int realstock = stock - 1;
        stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)
        System.out.println("扣减成功,剩余库存:" + realstock);
    } else {
        System.out.println("扣减失败,库存不足");
    }
如果是高并发场景,存在线程安全问题,如果直接使用synchronized锁锁住
    synchronized (this) {
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));// jedis.get("stock")
        if (stock > 0) {
            int realstock = stock - 1;
            stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)
            System.out.println("扣减成功,剩余库存:" + realstock);
        } else {
            System.out.println("扣减失败,库存不足");
        }
    }
在单机环境,没有问题,但是系统一般是集群架构部署,如果多个请求来,分发到不同tomcat进程上去,仍然会出问题。
一个最基础的分布式锁
    String lockkey ="lock:product_101";//模拟商品id
    Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockkey,"hhh");
    if (!result) {
        return "error_code";
    }
    int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));// jedis.get("stock")
    if (stock > 0) {
        int realstock = stock - 1;
        stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)
        System.out.println("扣减成功,剩余库存:" + realstock);
    } else {
        System.out.println("扣减失败,库存不足");
    }
    stringRedisTemplate.delete("lockkey");
直接删锁场景如果出现异常,就会一直不释放锁导致死锁,就需要加try{}finally{}保证删锁逻辑执行
    String lockkey ="lock:product_101";//模拟商品id
    Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockkey,"hhh");
    if (!result) {
        return "error_code";
    }
    try{
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));// jedis.get("stock")
        if (stock > 0) {
            int realstock = stock - 1;
            stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)
            System.out.println("扣减成功,剩余库存:" + realstock);
        } else {
            System.out.println("扣减失败,库存不足");
        }
    }finally{
        stringRedisTemplate.delete("lockkey");
    }
同时为了防止宕机,需要加一个超时时间
    String lockkey ="lock:product_101";//模拟商品id
    Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockkey,"hhh");
    stringRedisTemplate.expire(lockKey,10, TimeUnit.SECONDS);//设置10s超时-错误示例,与上一步操作有原子性
    if (!result) {
        return "error_code";
    }
    try{
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));// jedis.get("stock")
        if (stock > 0) {
            int realstock = stock - 1;
            stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)
            System.out.println("扣减成功,剩余库存:" + realstock);
        } else {
            System.out.println("扣减失败,库存不足");
        }
    }finally{
        stringRedisTemplate.delete("lockkey");
    }
潜在问题

Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockkey,"hhh");
stringRedisTemplate.expire(lockKey,10, TimeUnit.SECONDS);
有原子性问题:
 如果在setIfAbsent和expire之间发生 Redis 崩溃、JVM 崩溃、网络中断,会导致锁没有设置过期时间,变成永久锁(死锁);
 并发竞争时可能失效,线程1setIfAbsent成功,但还未执行expire,线程B此时也能 setIfAbsent 成功(因为锁还未设置TTL),导致多个线程同时持有锁。
ps:TTL 表示一个键的剩余存活时间(单位:秒),它的作用是让Redis自动删除过期的键,避免内存泄漏或在分布式锁中,防止锁被永久占用(死锁)。

原子性问题解决
    String lockkey ="lock:product_101";//模拟商品id
    stringRedisTemplate.opsForValue().setIfAbsent(lockkey,"hhh",10,TimeUnit.SECONDS);
    if (!result) {
        return "error_code";
    }
    try{
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));// jedis.get("stock")
        if (stock > 0) {
            int realstock = stock - 1;
            stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)
            System.out.println("扣减成功,剩余库存:" + realstock);
        } else {
            System.out.println("扣减失败,库存不足");
        }
    }finally{
        stringRedisTemplate.delete("lockkey");
    }
高并发场景有严重问题

 如果线程1执行一段时间超过了超时,然后线程2执行,接着线程1执行到删除锁,删除的是线程2的,极端情况下,分布式锁可能不会生效。
 因此每个线程进来,生成一个唯一的id,在结束的时候,判断id是不是同一个if(clientId.equals(stringRedisTemplate.opsForValue().get(lockkey))

    String lockkey ="lock:product_101";//模拟商品id
    String clientId = UUID.randomUUID().toString();
    stringRedisTemplate.opsForValue().setIfAbsent(lockkey,clientId,10,TimeUnit.SECONDS);
    if (!result) {
        return "error_code";
    }
    try{
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));// jedis.get("stock")
        if (stock > 0) {
            int realstock = stock - 1;
            stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)
            System.out.println("扣减成功,剩余库存:" + realstock);
        } else {
            System.out.println("扣减失败,库存不足");
        }
    }finally{
        if(clientId.equals(stringRedisTemplate.opsForValue().get(lockkey)){
            stringRedisTemplate.delete("lockkey");
        }
    }

 依然有问题,如果执行过程中,执行if到删锁逻辑间,出现网络延迟,然后超时,分布式锁失效,高并发场景下其它线程可以加锁,等网络恢复,执行删锁逻辑把别的线程给删了

锁续命

 锁续命:主线程抢到锁,执行业务,弄个分线程,在分线程弄个定时任务(小于超时),每过一段时间,主线程还加着锁,就把超时时间重新设回原值。

redisson的锁续命

 两个线程,默认锁过期时间是30秒(可配置),成功获取锁后,Redisson 会启动一个 后台守护线程(Watchdog),每隔10秒(默认) 检查锁是否仍被当前线程持有。
 Watchdog 每隔 10秒 执行一次检查:
  如果锁仍被当前线程持有 → 把锁的过期时间 重置为30秒(重新计时)。
  如果锁已被释放或线程已丢失锁 → 停止续期,锁最终自动过期
 释放锁时停止续期:调用 unlock() 时,Redisson 会删除 Redis 中的锁(确保其他线程可以获取)。停止 Watchdog 线程(不再续期)
 异常情况:1.客户端崩溃 → Watchdog 线程停止,锁最终自动过期(不会死锁)2.Redis 故障 → 续期失败,锁按原定时间过期。


redisson锁续命.png

 所有线程都来抢锁,lua脚本保证原子性,只有一个线程可以抢锁成功,抢锁成功后,后台开启一个分线程锁续命;其它线程没有抢到锁,while循环自旋间歇性尝试加锁,不会死循环不断尝试加锁,而是阻塞等待,阻塞等待的初始时间是第一次尝试加锁返回的ttl时间,如果抢到锁的线程执行比较快,并快速把锁释放,其它线程会通过redis的发布订阅进行唤醒(没加锁的线程会通过Redis 的发布/订阅(Pub/Sub)机制监听一个特定的频道,当执行unlock方法时,会往队列发一条消息,监听的线程发现有消息了,调用onMessage方法,之后把阻塞唤醒,然后while循环)。

 onMessage:Redis 发布/订阅(Pub/Sub)机制中的一个回调方法,当客户端订阅的频道收到消息时,这个方法会被自动调用,作用:
  接收订阅频道的消息通知
  触发锁竞争的重试机制
  减少无效的轮询请求

  可能存在的问题:lock相当于设置key,redis异步同步给从节点,主节点正在同步给从节点,结果宕机,从节点会选举新的主节点,新的从节点没有key,再来一个线程,也可以加锁成功。(在主节点同步锁信息到从节点之前,如果主节点宕机,可能导致锁失效,出现多个客户端同时持有锁的情况)
  Redis 主从同步机制:
   主节点(Master):负责处理写操作(如SETNX加锁),并异步复制数据到从节点。
   从节点(Slave):接收主节点的数据同步,但不直接处理写请求。
   故障转移(Failover):当主节点宕机时,哨兵(Sentinel)会选举一个从节点成为新的主节点。
  问题点:
   主节点写入成功后,不会等待从节点确认,而是直接返回客户端成功(异步);
   如果主节点在同步完成前宕机,从节点可能丢失部分数据(如未同步的锁信息)。

RedLock

  一般使用奇数个redis节点,节点之间对等(主从、集群、哨兵等关系)。
  加锁是类似于SETNX一个key,节点执行的结果要返回给客户端,客户端需要收到超过半数节点的OK,才会认为加锁成功,执行加锁逻辑。


redLock.png

  一般节点要配从节点,不然宕机半数后永远加不上锁,但这样还是存在上述锁失效问题。
  如果不设置从节点,一般不是每条命令持久化,一秒持久化一次,兼顾内存操作以提高性能,当节点加锁正好在这一秒之内,宕机或重启后,仍会锁失效。

引入redisson
    <dependency>
        <groupId>org.redisson</groupId>
        <artifactId>redisson</artifactId>
        <scope>3.6.5</scope>
    </dependency>
    String lockkey ="lock:product_101";//模拟商品id
    //获取锁对象
    RLock redissonLock = redisson.getLock(lockKey);
    //加分布式锁
    redissonLock.lock();//.setIfAbsent(lockKey, clientId, 30, TimeUnit.SECONDS);
    try{
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));// jedis.get("stock")
        if (stock > 0) {
            int realstock = stock - 1;
            stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)
            System.out.println("扣减成功,剩余库存:" + realstock);
        } else {
            System.out.println("扣减失败,库存不足");
        }
    }finally{
        //释放分布式锁
        redissonLock.unlock();
    }
lua脚本

 Redis在2.6推出了脚本功能,允许开发者使用Lua语言编写脚本传到Redis中执行。使用脚本的好处如下:
 1.减少网络开销:本来5次网络请求的操作,可以用一个请求完成,原先5次请求的逻辑放在redis服务器上完成。使用脚本,减少了网络往返时延。这点跟管道类似。
 2.原子操作:Redis会将整个脚本作为一个整体执行,中间不会被其他命令插入。管道不是原子的,不过redis的批量操作命令(类似mset)是原子的(串行执行)。
 3.替代redis的事务功能:redis自带的事务功能很鸡肋,而redis的lua脚本几乎实现了常规的事务功能,官方推荐如果要使用redis的事务功能可以用redis lua替代。

 从Redis2.6.0版本开始,通过内置的Lua解释器,可以使用EVAL命令对Lua脚本进行求值。EVAL命令的格式如下:EVAL script numkeys key [key ....] arg [arg ....]
 EVAL:执行脚本的客户端命令
 script:一段Lua脚本程序,如return
 eval "return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}" 2 key1 key2 first second
  2:key的个数,对应KEYS[1],KEYS[2]
  key1:KEYS[1]
  key2:KEYS[2]
  first:ARGV[1]
  second:ARGV[2]

jedis.set("product_stock_10016", "15");  //初始化商品10016的库存
String script = " local count = redis.call('get', KEYS[1]) " +
    " local a = tonumber(count) " +
    " local b = tonumber(ARGV[1]) " +
    " if a >= b then " +
    "   redis.call('set', KEYS[1], a-b) " +
    "   return 1 " +
    " end " +
    " return 0 ";
;
Object obj = jedis.eval(script, Arrays.asList("product_stock_10016"), Arrays.asList("10"));
/**
 *redis.call('get', KEYS[1]):调用get方法,获取KEYS[1]对应的值,这里是15
 *   KEYS[1]对应Arrays.asList("product_stock_10016")的第一个;
 *   ARGV[1]对应 Arrays.asList("10")的第一个。
 * " local a = tonumber(count) " :把15转为number类型
 * " local b = tonumber(ARGV[1]) ":把10拿过来转为number类型
 *" if a >= b then ":如果库存大于扣减数量
 *"   redis.call('set', KEYS[1], a-b) ":设置扣减后的数量
 *"   return 1 ":返回1
 *" end " :条件结束
 *" return 0 ":否则返回0
 *优点:命令不可切割,当成一行执行
 */
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容