特别说明: 本人平时混迹于 B 站,不咋回复这里的评论,有问题可以到 B 站视频评论区留言找我
视频地址: https://space.bilibili.com/31137138/favlist?fid=326428938
课件说明: 本次提供的课件是 Spring Cloud Netflix 版微服务架构指南,如果有兴趣想要学习 Spring Cloud Alibaba 版,可以前往 http://www.qfdmy.com 查看相关课程资源
案例代码: https://github.com/topsale/hello-spring-cloud-netflix
什么是分布式协调
分布式协调技术主要用来解决分布式环境当中多个进程之间的同步控制,让他们有序的去访问某种临界资源,防止造成"脏数据"的后果。
在这图中有三台机器,每台机器各跑一个应用程序。然后我们将这三台机器通过网络将其连接起来,构成一个系统来为用户提供服务,对用户来说这个系统的架构是透明的,他感觉不到我这个系统是一个什么样的架构。那么我们就可以把这种系统称作一个分布式系统。
在这个分布式系统中如何对进程进行调度,我假设在第一台机器上挂载了一个资源,然后这三个物理分布的进程都要竞争这个资源,但我们又不希望他们同时进行访问,这时候我们就需要一个协调器,来让他们有序的来访问这个资源。这个协调器就是我们经常提到的那个锁,比如说"进程-1"在使用该资源的时候,会先去获得锁,"进程1"获得锁以后会对该资源保持独占,这样其他进程就无法访问该资源,"进程1"用完该资源以后就将锁释放掉,让其他进程来获得锁,那么通过这个锁机制,我们就能保证了分布式系统中多个进程能够有序的访问该临界资源。那么我们把这个分布式环境下的这个锁叫作分布式锁。这个分布式锁也就是我们分布式协调技术实现的核心内容。
什么是分布式锁
为了防止分布式系统中的多个进程之间相互干扰,我们需要一种分布式协调技术来对这些进程进行调度。而这个分布式协调技术的核心就是来实现这个分布式锁。
为什么需要分布式锁
- 成员变量 A 存在 JVM1、JVM2、JVM3 三个 JVM 内存中
- 成员变量 A 同时都会在 JVM 分配一块内存,三个请求发过来同时对这个变量操作,显然结果是不对的
- 不是同时发过来,三个请求分别操作三个不同 JVM 内存区域的数据,变量 A 之间不存在共享,也不具有可见性,处理的结果也是不对的
注:该成员变量 A 是一个有状态的对象
如果我们业务中确实存在这个场景的话,我们就需要一种方法解决这个问题,这就是分布式锁要解决的问题
分布式锁应该具备哪些条件
- 在分布式系统环境下,一个方法在同一时间只能被一个机器的一个线程执行
- 高可用的获取锁与释放锁
- 高性能的获取锁与释放锁
- 具备可重入特性(可理解为重新进入,由多于一个任务并发使用,而不必担心数据错误)
- 具备锁失效机制,防止死锁
- 具备非阻塞锁特性,即没有获取到锁将直接返回获取锁失败
分布式锁的实现有哪些
- Memcached:利用 Memcached 的
add
命令。此命令是原子性操作,只有在key
不存在的情况下,才能add
成功,也就意味着线程得到了锁。 - Redis:和 Memcached 的方式类似,利用 Redis 的
setnx
命令。此命令同样是原子性操作,只有在key
不存在的情况下,才能set
成功。 - Zookeeper:利用 Zookeeper 的顺序临时节点,来实现分布式锁和等待队列。Zookeeper 设计的初衷,就是为了实现分布式锁服务的。
- Chubby:Google 公司实现的粗粒度分布式锁服务,底层利用了 Paxos 一致性算法。
Redis 实现分布式锁
分布式锁实现的三个核心要素:
加锁
最简单的方法是使用 setnx
命令。key
是锁的唯一标识,按业务来决定命名。比如想要给一种商品的秒杀活动加锁,可以给 key
命名为 “lock_sale_商品ID” 。而 value
设置成什么呢?我们可以姑且设置成 1
。加锁的伪代码如下:
setnx(lock_sale_商品ID,1)
当一个线程执行 setnx
返回 1
,说明 key
原本不存在,该线程成功得到了锁;当一个线程执行 setnx
返回 0
,说明 key
已经存在,该线程抢锁失败。
解锁
有加锁就得有解锁。当得到锁的线程执行完任务,需要释放锁,以便其他线程可以进入。释放锁的最简单方式是执行 del
指令,伪代码如下:
del(lock_sale_商品ID)
释放锁之后,其他线程就可以继续执行 setnx
命令来获得锁。
锁超时
锁超时是什么意思呢?如果一个得到锁的线程在执行任务的过程中挂掉,来不及显式地释放锁,这块资源将会永远被锁住(死锁),别的线程再也别想进来。所以,setnx
的 key
必须设置一个超时时间,以保证即使没有被显式释放,这把锁也要在一定时间后自动释放。setnx
不支持超时参数,所以需要额外的指令,伪代码如下:
expire(lock_sale_商品ID, 30)
综合伪代码如下:
if(setnx(lock_sale_商品ID,1) == 1){
expire(lock_sale_商品ID,30)
try {
do something ......
} finally {
del(lock_sale_商品ID)
}
}
存在什么问题
以上伪代码中存在三个致命问题
-
setnx
和expire
的非原子性
设想一个极端场景,当某线程执行 setnx
,成功得到了锁:
setnx
刚执行成功,还未来得及执行 expire
指令,节点 1 挂掉了。
这样一来,这把锁就没有设置过期时间,变成死锁,别的线程再也无法获得锁了。
怎么解决呢?setnx
指令本身是不支持传入超时时间的,set
指令增加了可选参数,伪代码如下:
set(lock_sale_商品ID,1,30,NX)
这样就可以取代 setnx
指令。
-
del
导致误删
又是一个极端场景,假如某线程成功得到了锁,并且设置的超时时间是 30 秒。
如果某些原因导致线程 A 执行的很慢很慢,过了 30 秒都没执行完,这时候锁过期自动释放,线程 B 得到了锁。
随后,线程 A 执行完了任务,线程 A 接着执行 del
指令来释放锁。但这时候线程 B 还没执行完,线程A实际上 删除的是线程 B 加的锁
。
怎么避免这种情况呢?可以在 del
释放锁之前做一个判断,验证当前的锁是不是自己加的锁。至于具体的实现,可以在加锁的时候把当前的线程 ID 当做 value
,并在删除之前验证 key
对应的 value
是不是自己线程的 ID。
加锁:
String threadId = Thread.currentThread().getId()
set(key,threadId ,30,NX)
解锁:
if(threadId .equals(redisClient.get(key))){
del(key)
}
但是,这样做又隐含了一个新的问题,判断和释放锁是两个独立操作,不是原子性。
- 出现并发的可能性
还是刚才第二点所描述的场景,虽然我们避免了线程 A 误删掉 key
的情况,但是同一时间有 A,B 两个线程在访问代码块,仍然是不完美的。怎么办呢?我们可以让获得锁的线程开启一个守护线程,用来给快要过期的锁“续航”。
当过去了 29 秒,线程 A 还没执行完,这时候守护线程会执行 expire
指令,为这把锁“续命”20 秒。守护线程从第 29 秒开始执行,每 20 秒执行一次。
当线程 A 执行完任务,会显式关掉守护线程。
另一种情况,如果节点 1 忽然断电,由于线程 A 和守护线程在同一个进程,守护线程也会停下。这把锁到了超时的时候,没人给它续命,也就自动释放了。
什么是 Zookeeper
ZooKeeper 是一种分布式协调服务,用于管理大型主机。在分布式环境中协调和管理服务是一个复杂的过程。ZooKeeper 通过其简单的架构和 API 解决了这个问题。ZooKeeper 允许开发人员专注于核心应用程序逻辑,而不必担心应用程序的分布式特性。
Zookeeper 的数据模型
Zookeeper 的数据模型是什么样子呢?它很像数据结构当中的树,也很像文件系统的目录。
树是由节点所组成,Zookeeper 的数据存储也同样是基于节点,这种节点叫做 Znode
但是,不同于树的节点,Znode 的引用方式是路径引用,类似于文件路径:
/动物/猫
/汽车/宝马
这样的层级结构,让每一个 Znode 节点拥有唯一的路径,就像命名空间一样对不同信息作出清晰的隔离。
Znode 包含哪些元素
- data:Znode 存储的数据信息。
- ACL:记录 Znode 的访问权限,即哪些人或哪些 IP 可以访问本节点。
- stat:包含 Znode 的各种元数据,比如事务 ID、版本号、时间戳、大小等等。
- child:当前节点的子节点引用
这里需要注意一点,Zookeeper 是为读多写少的场景所设计。Znode 并不是用来存储大规模业务数据,而是用于存储少量的状态和配置信息,每个节点的数据最大不能超过 1MB
。
Zookeeper 的基本操作
- 创建节点
create
- 删除节点
delete
- 判断节点是否存在
exists
- 获得一个节点的数据
getData
- 设置一个节点的数据
setData
- 获取节点下的所有子节点
getChildren
这其中,exists
,getData
,getChildren
属于读操作。Zookeeper 客户端在请求读操作的时候,可以选择是否设置 Watch
Zookeeper 的事件通知
我们可以把 Watch 理解成是注册在特定 Znode 上的触发器。当这个 Znode 发生改变,也就是调用了 create
,delete
,setData
方法的时候,将会触发 Znode 上注册的对应事件,请求 Watch 的客户端会接收到异步通知。
具体交互过程如下:
- 客户端调用
getData
方法,watch
参数是true
。服务端接到请求,返回节点数据,并且在对应的哈希表里插入被 Watch 的 Znode 路径,以及 Watcher 列表。
- 当被 Watch 的 Znode 已删除,服务端会查找哈希表,找到该 Znode 对应的所有 Watcher,异步通知客户端,并且删除哈希表中对应的 Key-Value。
Zookeeper 的一致性
Zookeeper 身为分布式系统协调服务,如果自身挂了如何处理呢?为了防止单机挂掉的情况,Zookeeper 维护了一个集群。如下图:
Zookeeper Service 集群是一主多从结构。
在更新数据时,首先更新到主节点(这里的节点是指服务器,不是 Znode),再同步到从节点。
在读取数据时,直接读取任意从节点。
为了保证主从节点的数据一致性,Zookeeper 采用了 ZAB 协议,这种协议非常类似于一致性算法 Paxos 和 Raft。
什么是 ZAB
Zookeeper Atomic Broadcast,有效解决了 Zookeeper 集群崩溃恢复,以及主从同步数据的问题。
ZAB 协议定义的三种节点状态
Looking :选举状态。
Following :Follower 节点(从节点)所处的状态。
Leading :Leader 节点(主节点)所处状态。
最大 ZXID
最大 ZXID 也就是节点本地的最新事务编号,包含 epoch 和计数两部分。epoch 是纪元的意思,相当于 Raft 算法选主时候的 term。
- ZAB 的崩溃恢复
假如 Zookeeper 当前的主节点挂掉了,集群会进行崩溃恢复。ZAB 的崩溃恢复分成三个阶段:
Leader election
选举阶段,此时集群中的节点处于 Looking 状态。它们会各自向其他节点发起投票,投票当中包含自己的服务器 ID 和最新事务 ID(ZXID)。
接下来,节点会用自身的 ZXID 和从其他节点接收到的 ZXID 做比较,如果发现别人家的 ZXID 比自己大,也就是数据比自己新,那么就重新发起投票,投票给目前已知最大的 ZXID 所属节点。
每次投票后,服务器都会统计投票数量,判断是否有某个节点得到半数以上的投票。如果存在这样的节点,该节点将会成为准 Leader,状态变为 Leading。其他节点的状态变为 Following。
Discovery
发现阶段,用于在从节点中发现最新的 ZXID 和事务日志。或许有人会问:既然 Leader 被选为主节点,已经是集群里数据最新的了,为什么还要从节点中寻找最新事务呢?
这是为了防止某些意外情况,比如因网络原因在上一阶段产生多个 Leader 的情况。
所以这一阶段,Leader 集思广益,接收所有 Follower 发来各自的最新 epoch 值。Leader 从中选出最大的 epoch,基于此值加 1,生成新的 epoch 分发给各个 Follower。
各个 Follower 收到全新的 epoch 后,返回 ACK 给 Leader,带上各自最大的 ZXID 和历史事务日志。Leader 选出最大的 ZXID,并更新自身历史日志。
Synchronization
同步阶段,把 Leader 刚才收集得到的最新历史事务日志,同步给集群中所有的 Follower。只有当半数 Follower 同步成功,这个准 Leader 才能成为正式的 Leader。
自此,故障恢复正式完成。
- ZAB 的数据写入
Broadcast
ZAB 的数据写入涉及到 Broadcast 阶段,简单来说,就是 Zookeeper 常规情况下更新数据的时候,由 Leader 广播到所有的 Follower。其过程如下:
- 客户端发出写入数据请求给任意 Follower。
- Follower 把写入数据请求转发给 Leader。
- Leader 采用二阶段提交方式,先发送 Propose 广播给 Follower。
- Follower 接到 Propose 消息,写入日志成功后,返回 ACK 消息给 Leader。
- Leader 接到半数以上ACK消息,返回成功给客户端,并且广播 Commit 请求给 Follower
ZAB 协议既不是强一致性,也不是弱一致性,而是处于两者之间的单调一致性(顺序一致性)。它依靠事务 ID 和版本号,保证了数据的更新和读取是有序的。
Zookeeper 的应用场景
分布式锁
这是雅虎研究员设计 Zookeeper 的初衷。利用 Zookeeper 的临时顺序节点,可以轻松实现分布式锁。
服务注册和发现
利用 Znode 和 Watcher,可以实现分布式服务的注册和发现。最著名的应用就是阿里的分布式 RPC 框架 Dubbo。
共享配置和状态信息
Redis 的分布式解决方案 Codis,就利用了 Zookeeper 来存放数据路由表和 codis-proxy 节点的元信息。同时 codis-config 发起的命令都会通过 ZooKeeper 同步到各个存活的 codis-proxy。
此外,Kafka、HBase、Hadoop,也都依靠 Zookeeper 同步节点信息,实现高可用。
Zookeeper 实现分布式锁
Znode 的四种类型
Zookeeper 的数据存储结构就像一棵树,这棵树由节点组成,这种节点叫做 Znode。
- 持久节点(PERSISTENT)
默认的节点类型。创建节点的客户端与 Zookeeper 断开连接后,该节点依旧存在。
- 持久节点顺序节点(PERSISTENT_SEQUENTIAL)
所谓顺序节点,就是在创建节点时,Zookeeper 根据创建的时间顺序给该节点名称进行编号:
- 临时节点(EPHEMERAL)
和持久节点相反,当创建节点的客户端与 Zookeeper 断开连接后,临时节点会被删除:
- 临时顺序节点(EPHEMERAL_SEQUENTIAL)
顾名思义,临时顺序节点结合和临时节点和顺序节点的特点:在创建节点时,Zookeeper 根据创建的时间顺序给该节点名称进行编号;当创建节点的客户端与 Zookeeper 断开连接后,临时节点会被删除。
Zookeeper 分布式锁的原理
Zookeeper 分布式锁恰恰应用了临时顺序节点。具体如何实现呢?让我们来看一看详细步骤:
- 获取锁
首先,在 Zookeeper 当中创建一个持久节点 ParentLock。当第一个客户端想要获得锁时,需要在 ParentLock 这个节点下面创建一个临时顺序节点 Lock1。
之后,Client1 查找 ParentLock 下面所有的临时顺序节点并排序,判断自己所创建的节点 Lock1 是不是顺序最靠前的一个。如果是第一个节点,则成功获得锁。
这时候,如果再有一个客户端 Client2 前来获取锁,则在 ParentLock 下载再创建一个临时顺序节点 Lock2。
Client2 查找 ParentLock 下面所有的临时顺序节点并排序,判断自己所创建的节点 Lock2 是不是顺序最靠前的一个,结果发现节点 Lock2 并不是最小的。
于是,Client2 向排序仅比它靠前的节点 Lock1 注册 Watcher,用于监听 Lock1 节点是否存在。这意味着 Client2 抢锁失败,进入了等待状态。
这时候,如果又有一个客户端 Client3 前来获取锁,则在 ParentLock 下载再创建一个临时顺序节点 Lock3。
Client3 查找 ParentLock 下面所有的临时顺序节点并排序,判断自己所创建的节点 Lock3 是不是顺序最靠前的一个,结果同样发现节点 Lock3 并不是最小的。
于是,Client3 向排序仅比它靠前的节点 Lock2 注册 Watcher,用于监听 Lock2 节点是否存在。这意味着 Client3 同样抢锁失败,进入了等待状态。
这样一来,Client1 得到了锁,Client2 监听了 Lock1,Client3 监听了 Lock2。这恰恰形成了一个等待队列,
- 释放锁
释放锁分为两种情况:
- 任务完成,客户端显示释放
当任务完成时,Client1 会显示调用删除节点 Lock1 的指令。
- 任务执行过程中,客户端崩溃
获得锁的 Client1 在任务执行过程中,如果崩溃,则会断开与 Zookeeper 服务端的链接。根据临时节点的特性,相关联的节点 Lock1 会随之自动删除。
由于 Client2 一直监听着 Lock1 的存在状态,当 Lock1 节点被删除,Client2 会立刻收到通知。这时候 Client2 会再次查询 ParentLock 下面的所有节点,确认自己创建的节点 Lock2 是不是目前最小的节点。如果是最小,则 Client2 顺理成章获得了锁。
同理,如果 Client2 也因为任务完成或者节点崩溃而删除了节点 Lock2,那么 Client3 就会接到通知。
最终,Client3 成功得到了锁。
实战 Redisson 实现分布式锁
Redisson 目前是官方唯一推荐的 Java 版的分布式锁并支持 Redlock
什么是 Redisson
Redisson 是一个在 Redis 的基础上实现的 Java 驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的 Java 常用对象,还提供了许多分布式服务。其中包括 (BitSet
, Set
, Multimap
, SortedSet
, Map
, List
, Queue
, BlockingQueue
, Deque
, BlockingDeque
, Semaphore
, Lock
, AtomicLong
, CountDownLatch
, Publish / Subscribe
, Bloom filter
, Remote service
, Spring cache
, Executor service
, Live Object service
, Scheduler service
) Redisson 提供了使用 Redis 的最简单和最便捷的方法。Redisson 的宗旨是促进使用者对 Redis 的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上
Redisson 应用场景
- Distributed Java applications 分布式 Java 应用,Redisson 为 Java 上的分布式应用程序提供了基于 Redis 的对象,集合,锁,同步器和服务的分布式实现
- Caching 缓存,Redisson 为 Java 提供了不同的基于 Redis 的缓存实现,如 JCache API,Hibernate 二级缓存,Spring Cache 和应用程序级别缓存
- Data Source Caching 数据源缓存,Redisson 支持适用 Read-Through,Write-Behind 策略对数据库。Web 服务器或者其他任何数据源进行基于 Redis 的读写缓存
- Distributed Java tasks Scheduling and execution 分布式 Java 任务调度和执行,在某些点上对 Java 的任务处理可以被拆分并且并行处理。Redisson 提供了 ExecutorService 和 ScheduledExecutorService 的分布式实现
- MapReduce Redisson 提供基于 Java 的 MapReduce 编程模型去处理存储在 Redis 中的大型数据
- Easy Java Redis client 简单的 Java Redis 客户端,Redission 可以被用来作为 Java Redis 客户端,没有学习难度,有了它 就可以不需要去知道所有 Redis 命令 就开始使用 Redis
- Web session clustering web 会话集群,负载均衡用户所有会话。Redisson 提供了基于 Tomcat 会话管理和 Spring Session 实现
- Distributed locks with Redis 实现 可重入锁、读写锁、公平锁、信号量、CountDownLatch 等很多种复杂的锁的语义,满足我们对分布式锁的不同层次的需求,这一点来说 ZK 分布式锁就显得匮乏一些了
Redisson 结构
Redisson 作为独立节点可以用于独立执行其他节点发布到分布式执行服务和分布式调度任务服务里的远程任务。
Redisson 操作对象
-
RedissonObject
,操作通用对象
package com.funtl.hello.spring.cloud.commons.redisson.operation;
import org.redisson.api.RBucket;
import org.redisson.api.RedissonClient;
import org.redisson.spring.starter.RedissonProperties;
import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;
public class RedissonObject {
/**
* 数据缓存时间,默认 30 分钟
*/
private static final Long DATA_VALID_TIME = 1000 * 60 * 30L;
@Resource
private RedissonClient redissonClient;
@Resource
private RedissonProperties redissonProperties;
/**
* 获取对象值
*
* @param name
* @param <T>
* @return
*/
public <T> T getValue(String name) {
RBucket<T> bucket = redissonClient.getBucket(name);
return bucket.get();
}
/**
* 获取对象空间
*
* @param name
* @param <T>
* @return
*/
public <T> RBucket<T> getBucket(String name) {
return redissonClient.getBucket(name);
}
/**
* 设置对象的值
*
* @param name 键
* @param value 值
* @return
*/
public <T> void setValue(String name, T value) {
setValue(name, value, DATA_VALID_TIME);
}
/**
* 设置对象的值
*
* @param name 键
* @param value 值
* @param time 缓存时间 单位毫秒 -1 永久缓存
* @return
*/
public <T> void setValue(String name, T value, Long time) {
RBucket<Object> bucket = redissonClient.getBucket(name);
if (time == -1) {
bucket.set(value);
} else {
bucket.set(value, time, TimeUnit.MILLISECONDS);
}
}
/**
* 如果值已经存在则则不设置
*
* @param name 键
* @param value 值
* @param time 缓存时间 单位毫秒
* @return true 设置成功,false 值存在,不设置
*/
public <T> Boolean trySetValue(String name, T value, Long time) {
RBucket<Object> bucket = redissonClient.getBucket(name);
boolean b;
if (time == -1) {
b = bucket.trySet(value);
} else {
b = bucket.trySet(value, time, TimeUnit.MILLISECONDS);
}
return b;
}
/**
* 如果值已经存在则则不设置
*
* @param name 键
* @param value 值
* @return true 设置成功,false 值存在,不设置
*/
public <T> Boolean trySetValue(String name, T value) {
return trySetValue(name, value, DATA_VALID_TIME);
}
/**
* 删除对象
*
* @param name 键
* @return true 删除成功,false 不成功
*/
public Boolean delete(String name) {
return redissonClient.getBucket(name).delete();
}
}
-
RedissonBinary
,操作对象二进制
package com.funtl.hello.spring.cloud.commons.redisson.operation;
import org.redisson.api.RBinaryStream;
import org.redisson.api.RListMultimap;
import org.redisson.api.RedissonClient;
import javax.annotation.Resource;
import java.io.InputStream;
import java.io.OutputStream;
public class RedissonBinary {
@Resource
private RedissonClient redissonClient;
/**
* 获取输出流
*
* @param name
* @return
*/
public OutputStream getOutputStream(String name) {
RListMultimap<Object, Object> listMultimap = redissonClient.getListMultimap("");
RBinaryStream binaryStream = redissonClient.getBinaryStream(name);
return binaryStream.getOutputStream();
}
/**
* 获取输入流
*
* @param name
* @return
*/
public InputStream getInputStream(String name) {
RBinaryStream binaryStream = redissonClient.getBinaryStream(name);
return binaryStream.getInputStream();
}
/**
* 获取输入流
*
* @param name
* @return
*/
public InputStream getValue(String name, OutputStream stream) {
try {
RBinaryStream binaryStream = redissonClient.getBinaryStream(name);
InputStream inputStream = binaryStream.getInputStream();
byte[] buff = new byte[1024];
int len;
while ((len = inputStream.read(buff)) != -1) {
stream.write(buff, 0, len);
}
return binaryStream.getInputStream();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* 获取对象空间
*
* @param name
* @return
*/
public RBinaryStream getBucket(String name) {
return redissonClient.getBinaryStream(name);
}
/**
* 设置对象的值
*
* @param name 键
* @param value 值
* @return
*/
public void setValue(String name, InputStream value) {
try {
RBinaryStream binaryStream = redissonClient.getBinaryStream(name);
binaryStream.delete();
OutputStream outputStream = binaryStream.getOutputStream();
byte[] buff = new byte[1024];
int len;
while ((len = value.read(buff)) != -1) {
outputStream.write(buff, 0, len);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* 删除对象
*
* @param name 键
* @return true 删除成功,false 不成功
*/
public Boolean delete(String name) {
RBinaryStream binaryStream = redissonClient.getBinaryStream(name);
return binaryStream.delete();
}
}
-
RedissonCollection
,操作集合
package com.funtl.hello.spring.cloud.commons.redisson.operation;
import org.redisson.api.RList;
import org.redisson.api.RMap;
import org.redisson.api.RSet;
import org.redisson.api.RedissonClient;
import javax.annotation.Resource;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
public class RedissonCollection {
/**
* 数据缓存时间,默认 30 分钟
*/
private static final Long DATA_VALID_TIME = 1000 * 60 * 30L;
@Resource
private RedissonClient redissonClient;
/**
* 获取map集合
*
* @param name
* @param <K>
* @param <V>
* @return
*/
public <K, V> RMap<K, V> getMap(String name) {
return redissonClient.getMap(name);
}
/**
* 设置map集合
*
* @param name
* @param data
* @param time 缓存时间,单位毫秒 -1永久缓存
* @return
*/
public void setMapValues(String name, Map data, Long time) {
RMap map = redissonClient.getMap(name);
Long dataValidTime = DATA_VALID_TIME;
if (time != -1) {
map.expire(dataValidTime, TimeUnit.MILLISECONDS);
}
map.putAll(data);
}
/**
* 设置map集合
*
* @param name
* @param data
* @return
*/
public void setMapValues(String name, Map data) {
setMapValues(name, data, DATA_VALID_TIME);
}
/**
* 获取List集合
*
* @param name
* @return
*/
public <T> RList<T> getList(String name) {
return redissonClient.getList(name);
}
/**
* 设置List集合
*
* @param name
* @param data
* @param time 缓存时间,单位毫秒 -1永久缓存
* @return
*/
public void setListValues(String name, List data, Long time) {
RList list = redissonClient.getList(name);
Long dataValidTime = DATA_VALID_TIME;
if (time != -1) {
list.expire(dataValidTime, TimeUnit.MILLISECONDS);
}
list.addAll(data);
}
/**
* 设置List集合
*
* @param name
* @param data
* @return
*/
public void setListValues(String name, List data) {
setListValues(name, data, DATA_VALID_TIME);
}
/**
* 获取set集合
*
* @param name
* @return
*/
public <T> RSet<T> getSet(String name) {
return redissonClient.getSet(name);
}
/**
* 设置set集合
*
* @param name
* @param data
* @param time 缓存时间,单位毫秒 -1永久缓存
* @return
*/
public void setSetValues(String name, Set data, Long time) {
RSet set = redissonClient.getSet(name);
Long dataValidTime = DATA_VALID_TIME;
if (time != -1) {
set.expire(dataValidTime, TimeUnit.MILLISECONDS);
}
set.addAll(data);
}
/**
* 设置set集合
*
* @param name
* @param data
* @return
*/
public void setSetValues(String name, Set data) {
setSetValues(name, data, DATA_VALID_TIME);
}
}
-
RedissonConfiguration
,使用 Java 配置的方式注入工具类
package com.funtl.hello.spring.cloud.commons.configuration;
import com.funtl.hello.spring.cloud.commons.redisson.operation.RedissonBinary;
import com.funtl.hello.spring.cloud.commons.redisson.operation.RedissonCollection;
import com.funtl.hello.spring.cloud.commons.redisson.operation.RedissonObject;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RedissonConfiguration {
@Bean
@ConditionalOnMissingBean(RedissonObject.class)
public RedissonObject redissonObject() {
return new RedissonObject();
}
@Bean
@ConditionalOnMissingBean(RedissonBinary.class)
public RedissonBinary redissonBinary() {
return new RedissonBinary();
}
@Bean
@ConditionalOnMissingBean(RedissonCollection.class)
public RedissonCollection redissonCollection() {
return new RedissonCollection();
}
}
通过 RLock 对象操作分布式锁
注意: 此处新建一个名为
provider-item-service
的服务提供者,复制之前创建的provider-admin-service
项目并修改相关配置即可
- 创建测试表和数据,其中
num
表示库存的数量,这里演示多 JVM 环境下对临界资源的控制(解决超卖问题)
create table tb_item (id int not null primary key,name varchar(100),num int not null);
insert into tb_item(id, name,num) values(1000000, 'Apple', 3);
- 测试代码如下
package com.funtl.hello.spring.cloud.provider.controller;
import com.funtl.hello.spring.cloud.provider.domain.TbItem;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
@Slf4j
@RestController
public class ProviderItemController {
@Resource
private RedissonClient redissonClient;
@PostMapping(value = "num/local")
public String testNumLock(TbItem tbItem) {
// 加锁,此处根据商品名称加锁
RLock lock = redissonClient.getLock(tbItem.getName());
lock.lock();
log.info("Thread {} 拿到了 {} 的锁", Thread.currentThread().getId(), tbItem.getName());
try {
// 阻塞模拟业务操作时间
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 解锁
lock.unlock();
log.info("Thread {} 释放了 {} 的锁", Thread.currentThread().getId(), tbItem.getName());
return "ok";
}
}
- 使用 PostMan 发起多个测试请求,控制台输出如下
Thread 104 拿到了 Apple 的锁
Thread 104 释放了 Apple 的锁
Thread 106 拿到了 Apple 的锁
Thread 106 释放了 Apple 的锁
- 如果在业务处理过程中服务宕机,Redis 会自动释放锁
通过自定义注解操作分布式锁
从上面的代码可以看出这是一个典型的环绕切面,我们可以使用 AOP 思想将交叉业务剥离出来,采用注解的方式切面操作分布式锁
- 增加 AOP 依赖
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjrt</artifactId>
</dependency>
- 创建一个名为
RedissonLockModel
的枚举类型,用于设定各种锁的模式
package com.funtl.hello.spring.cloud.commons.redisson.enums;
public enum RedissonLockModel {
/**
* 可重入锁:某个线程已经获得某个锁,可以再次获取锁而不会出现死锁
*/
REENTRANT,
/**
* 公平锁:加锁前先查看是否有排队等待的线程,有的话优先处理排在前面的线程
*/
FAIR,
/**
* 联锁:可以把一组锁当作一个锁来加锁和释放
* 基于 Redis 的分布式 RedissonMultiLock 对象将多个 RLock 对象分组,并将它们作为一个锁处理。
* 每个 RLock 对象可能属于不同的 Redisson 实例
*/
MULTIPLE,
/**
* 红锁:用于解决异步数据丢失和脑裂问题
* 假设有多个 Redis 节点,这些节点之间既没有主从,也没有集群关系。
* 客户端用相同的 key 和随机值在多个节点上请求锁,请求锁的超时时间应小于锁自动释放时间。
* 当超过半数 Redis 上请求到锁的时候,才算是真正获取到了锁。
* 如果没有获取到锁,则把部分已锁的 Redis 释放掉
*/
REDLOCK,
/**
* 读锁(共享锁):共享用于不更改或不更新数据的操作(只读操作),如 SELECT 语句
* 如果事务 T 对数据 A 加上共享锁后,则其他事务只能对 A 再加共享锁,不能加排他锁。
* 获准共享锁的事务只能读数据,不能修改数据
*/
READ,
/**
* 写锁(排他锁):用于数据修改操作,例如 INSERT、UPDATE 或 DELETE。确保不会同时同一资源进行多重更新
* 如果事务 T 对数据 A 加上排他锁后,则其他事务不能再对 A 加任何类型的锁。
* 获准排他锁的事务既能读数据,又能修改数据。
* 我们在操作数据库的时候,可能会由于并发问题而引起的数据的不一致性(数据冲突)
*/
WRITE,
/**
* 自动模式,当参数只有一个使用 REENTRANT 参数多个 REDLOCK
*/
AUTO
}
- 创建一个名为
RedissonLockException
自定义分布式锁异常类
package com.funtl.hello.spring.cloud.commons.redisson.excepiton;
public class RedissonLockException extends RuntimeException {
public RedissonLockException() {
}
public RedissonLockException(String message) {
super(message);
}
public RedissonLockException(String message, Throwable cause) {
super(message, cause);
}
public RedissonLockException(Throwable cause) {
super(cause);
}
public RedissonLockException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}
- 创建一个名为
RedissonLock
自定义注解
package com.funtl.hello.spring.cloud.commons.redisson.annotation;
import com.funtl.hello.spring.cloud.commons.redisson.enums.RedissonLockModel;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RedissonLock {
/**
* 锁的模式:如果不设置,自动模式,当参数只有一个使用 REENTRANT 参数多个 MULTIPLE
*
* @return
*/
RedissonLockModel lockModel() default RedissonLockModel.AUTO;
/**
* 如果 keys 有多个,如果不设置则使用联锁
*
* @return
*/
String[] keys() default {};
/**
* key 的静态常量:当 key 的 spel 的值是 List,数组时使用 + 号连接将会被 spel 认为这个变量是个字符串,只能产生一把锁,达不到我们的目的
* 而我们如果又需要一个常量的话这个参数将会在拼接在每个元素的后面
*
* @return
*/
String keyConstant() default "";
/**
* 锁超时时间,默认 30000 毫秒
*
* @return
*/
long lockWatchdogTimeout() default 0;
/**
* 等待加锁超时时间,默认 10000 毫秒 -1 则表示一直等待
*
* @return
*/
long attemptTimeout() default 0;
}
- 创建一个名为
RedissonLockAop
用于实现RedissonLock
自定义注解
package com.funtl.hello.spring.cloud.commons.redisson.aop;
import com.funtl.hello.spring.cloud.commons.redisson.annotation.RedissonLock;
import com.funtl.hello.spring.cloud.commons.redisson.enums.RedissonLockModel;
import com.funtl.hello.spring.cloud.commons.redisson.excepiton.RedissonLockException;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.redisson.RedissonMultiLock;
import org.redisson.RedissonRedLock;
import org.redisson.api.RLock;
import org.redisson.api.RReadWriteLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.LocalVariableTableParameterNameDiscoverer;
import org.springframework.core.annotation.Order;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
@Slf4j
@Aspect
@Order(-10)
public class RedissonLockAop {
/**
* 等待加锁超时时间 -1 一直等待
*/
private static final Long ATTEMPT_TIMEOUT = 10000L;
/**
* 看门狗
* 在一个分布式环境下,多个服务实例请求获取锁,其中服务实例 A 成功获取到了锁,在执行业务逻辑的过程中,服务实例突然挂掉了可以采用锁超时机制解决
* 如果服务实例 A 没有宕机但是业务执行还没有结束,锁释放掉了就会导致线程问题(误删锁)。此时就一定要实现自动延长锁有效期的机制
* 看门狗的主要作用:只要这台服务实例没有挂掉,并且没有主动释放锁,看门狗都会每隔十秒给你续约一下,保证锁一直在你手中
*/
private static final Long LOCK_WATCH_DOG_TIMEOUT = 30000L;
private RedissonLockModel lockModel;
@Autowired
private RedissonClient redissonClient;
@Pointcut("@annotation(redissonLock)")
public void controllerAspect(RedissonLock redissonLock) {
}
/**
* 通过 Spring SpEL 获取参数
*
* @param key 定义的 key值以 # 开头 例如:#user
* @param parameterNames 形参
* @param values 形参值
* @param keyConstant key的常亮
* @return
*/
public List<String> getVauleBySpel(String key, String[] parameterNames, Object[] values, String keyConstant) {
List<String> keys = new ArrayList<>();
if (!key.contains("#")) {
String s = "redisson:lock:" + key + keyConstant;
log.info("没有使用 SpEL 表达式 value -> {}", s);
keys.add(s);
return keys;
}
// SpEL 解析器
ExpressionParser parser = new SpelExpressionParser();
// SpEL 上下文
EvaluationContext context = new StandardEvaluationContext();
for (int i = 0; i < parameterNames.length; i++) {
context.setVariable(parameterNames[i], values[i]);
}
Expression expression = parser.parseExpression(key);
Object value = expression.getValue(context);
if (value != null) {
if (value instanceof List) {
List value1 = (List) value;
for (Object o : value1) {
keys.add("redisson:lock:" + o.toString() + keyConstant);
}
} else if (value.getClass().isArray()) {
Object[] obj = (Object[]) value;
for (Object o : obj) {
keys.add("redisson:lock:" + o.toString() + keyConstant);
}
} else {
keys.add("redisson:lock:" + value.toString() + keyConstant);
}
}
log.info("SpEL 表达式 key={}, value={}", key, keys);
return keys;
}
@Around("controllerAspect(redissonLock)")
public Object aroundAdvice(ProceedingJoinPoint proceedingJoinPoint, RedissonLock redissonLock) throws Throwable {
String[] keys = redissonLock.keys();
if (keys.length == 0) {
throw new RuntimeException("keys 不能为空");
}
String[] parameterNames = new LocalVariableTableParameterNameDiscoverer().getParameterNames(((MethodSignature) proceedingJoinPoint.getSignature()).getMethod());
Object[] args = proceedingJoinPoint.getArgs();
long attemptTimeout = redissonLock.attemptTimeout();
if (attemptTimeout == 0) {
attemptTimeout = ATTEMPT_TIMEOUT;
}
long lockWatchdogTimeout = redissonLock.lockWatchdogTimeout();
if (lockWatchdogTimeout == 0) {
lockWatchdogTimeout = LOCK_WATCH_DOG_TIMEOUT;
}
RedissonLockModel lockModel = redissonLock.lockModel();
if (lockModel.equals(RedissonLockModel.AUTO)) {
RedissonLockModel lockModel1 = lockModel;
if (lockModel1 != null && !lockModel1.equals(RedissonLockModel.AUTO)) {
lockModel = lockModel1;
} else if (keys.length > 1) {
lockModel = RedissonLockModel.REDLOCK;
} else {
lockModel = RedissonLockModel.REENTRANT;
}
}
if (!lockModel.equals(RedissonLockModel.MULTIPLE) && !lockModel.equals(RedissonLockModel.REDLOCK) && keys.length > 1) {
throw new RuntimeException("参数有多个, 锁模式为 -> " + lockModel.name() + ".无法锁定");
}
log.info("锁模式 -> {}, 等待锁定时间 -> {} 秒.锁定最长时间 -> {} 秒", lockModel.name(), attemptTimeout / 1000, lockWatchdogTimeout / 1000);
boolean res = false;
RLock rLock = null;
// 一直等待加锁
switch (lockModel) {
case FAIR:
rLock = redissonClient.getFairLock(getVauleBySpel(keys[0], parameterNames, args, redissonLock.keyConstant()).get(0));
break;
case REDLOCK:
List<RLock> rLocks = new ArrayList<>();
for (String key : keys) {
List<String> vauleBySpel = getVauleBySpel(key, parameterNames, args, redissonLock.keyConstant());
for (String s : vauleBySpel) {
rLocks.add(redissonClient.getLock(s));
}
}
RLock[] locks = new RLock[rLocks.size()];
int index = 0;
for (RLock r : rLocks) {
locks[index++] = r;
}
rLock = new RedissonRedLock(locks);
break;
case MULTIPLE:
rLocks = new ArrayList<>();
for (String key : keys) {
List<String> vauleBySpel = getVauleBySpel(key, parameterNames, args, redissonLock.keyConstant());
for (String s : vauleBySpel) {
rLocks.add(redissonClient.getLock(s));
}
}
locks = new RLock[rLocks.size()];
index = 0;
for (RLock r : rLocks) {
locks[index++] = r;
}
rLock = new RedissonMultiLock(locks);
break;
case REENTRANT:
List<String> vauleBySpel = getVauleBySpel(keys[0], parameterNames, args, redissonLock.keyConstant());
//如果spel表达式是数组或者LIST 则使用红锁
if (vauleBySpel.size() == 1) {
rLock = redissonClient.getLock(vauleBySpel.get(0));
} else {
locks = new RLock[vauleBySpel.size()];
index = 0;
for (String s : vauleBySpel) {
locks[index++] = redissonClient.getLock(s);
}
rLock = new RedissonRedLock(locks);
}
break;
case READ:
RReadWriteLock rwlock = redissonClient.getReadWriteLock(getVauleBySpel(keys[0], parameterNames, args, redissonLock.keyConstant()).get(0));
rLock = rwlock.readLock();
break;
case WRITE:
RReadWriteLock rwlock1 = redissonClient.getReadWriteLock(getVauleBySpel(keys[0], parameterNames, args, redissonLock.keyConstant()).get(0));
rLock = rwlock1.writeLock();
break;
}
// 执行 AOP
if (rLock != null) {
try {
if (attemptTimeout == -1) {
res = true;
// 一直等待加锁
rLock.lock(lockWatchdogTimeout, TimeUnit.MILLISECONDS);
} else {
res = rLock.tryLock(attemptTimeout, lockWatchdogTimeout, TimeUnit.MILLISECONDS);
}
if (res) {
Object obj = proceedingJoinPoint.proceed();
return obj;
} else {
throw new RedissonLockException("获取锁失败");
}
} finally {
if (res) {
rLock.unlock();
}
}
}
throw new RedissonLockException("获取锁失败");
}
}
- 在
RedissonConfiguration
中注入RedissonLockAop
@Bean
@ConditionalOnMissingBean(RedissonLockAop.class)
public RedissonLockAop redissonLockAop() {
return new RedissonLockAop();
}
- 在 Controller 层使用注解的方式加锁
@RedissonLock(keys = "#tbItem.name", lockModel = RedissonLockModel.AUTO)
@PostMapping(value = "num/annotation")
public String testNumAnnotation(TbItem tbItem) {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "ok";
}
- 使用 PostMan 测试,控制台打印结果如下
锁模式 -> REENTRANT, 等待锁定时间 -> 10 秒.锁定最长时间 -> 30 秒
SpEL 表达式 key=#tbItem.name, value=[redisson:lock:Apple]
注意: 记得同时通过 Redis 客户端工具观察数据变化
特别说明: 本人平时混迹于 B 站,不咋回复这里的评论,有问题可以到 B 站视频评论区留言找我
视频地址: https://space.bilibili.com/31137138/favlist?fid=326428938
课件说明: 本次提供的课件是 Spring Cloud Netflix 版微服务架构指南,如果有兴趣想要学习 Spring Cloud Alibaba 版,可以前往 http://www.qfdmy.com 查看相关课程资源
案例代码: https://github.com/topsale/hello-spring-cloud-netflix