在一些并发请求的时候,需要保证数据的准确性,在同一时刻只能允许一个请求对同一条数据进行修改操作。在之前的单机应用当中,很容易想到利用jdk的锁来实现,例如 synchronized或者lock 。但是在如今业务复杂的分布式系统中jdk的锁并不适用,所以必须要要用分布式锁。
常见的几种分布式锁的实现方案,Redis、Mysql 、zookeeper;
本文主要讲的是如何使用zk实现分布式锁:
ZooKeeper是一个开源的分布式协调服务,他为分布式应用提供了高效且可靠的分布式协调服务,提供了诸如统一命名空间服务,配置服务和分布式锁等分布式基础服务。
ZooKeeper的数据模型是内存中的一个ZNode数,由斜杠(/)进行分割的路径,就是一个ZNode,每个ZNode上除了保存自己的数据内容,还保存一系列属性信息。
ZooKeeper中的数据节点分为两种:持久节点和临时节点。所谓的持久节点是指一旦这个ZNode创建成功,除非主动进行ZNode的移除操作,节点会一直保存在ZooKeeper上;而临时节点的生命周期是跟客户端的(Session)会话相关联的,一旦客户端会话失效,这个会话上的所有临时节点都会被自动移除。
具体思路:
1、首先zookeeper中我们可以创建一个/distributed_lock持久化节点
2、然后再在/distributed_lock节点下创建自己的临时顺序节点,比如:/distributed_lock/task_00000000008
3、获取所有的/distributed_lock下的所有子节点,并排序
4、判读自己创建的节点是否最小值(第一位)
5、如果是,则获取得到锁,执行自己的业务逻辑,最后删除这个临时节点。
6、如果不是最小值,则需要监听自己创建节点前一位节点的数据变化,并阻塞。
7、当前一位节点被删除时,我们需要通过递归来判断自己创建的节点是否在是最小的,如果是则执行5);如果不是则执行6)(就是递归循环的判断)
一、导入依赖
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.0.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>zk_lock</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>zk_lock</name>
<description>zk_lock</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.7.9</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency><dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.3.1</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.22</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
</dependency>
</dependencies>
主要包括了zk客户端的依赖,mybatis-plus的依赖。
二、数据的配置
#数据库
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/db3?characterEncoding=utf8&verifyServerCertificate=false&useSSL=true&serverTimezone=Asia/Shanghai
spring.datasource.username=root
spring.datasource.password=root
三、具体代码实现
分布式锁工具类
package com.example.zk_lock.util;
import lombok.extern.slf4j.Slf4j;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import org.apache.zookeeper.CreateMode;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/**
* @author wls
*/
@Slf4j
public class DistributedLockUtil {
/**
* 常量
*/
private static final String CONNECTION_STRING = "localhost:2181";
/**
* 父节点
*/
private static final String LOCK_NODE = "/distributed_lock";
private static final String CHILDREN_NODE = "/lock_";
public static DistributedLockUtil distributedLockUtil;
public static ZkClient zkClient;
/**
* 创建zkClient
*/
public DistributedLockUtil() {
distributedLockUtil = this;
// 连接到Zookeeper
zkClient = new ZkClient(new ZkConnection(CONNECTION_STRING));
//创建节点
if (!zkClient.exists(LOCK_NODE)) {
zkClient.create(LOCK_NODE, "分布式锁节点", CreateMode.PERSISTENT);
}
}
/**
* 获取锁
*
* @return 锁名字
*/
public static String getLock() {
try {
// 1.在Zookeeper指定节点下创建临时顺序节点
String lockName = zkClient.createEphemeralSequential(LOCK_NODE + CHILDREN_NODE, "");
// 尝试获取锁
acquireLock(lockName);
return lockName;
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
/**
* 尝试获取锁
*
* @throws InterruptedException 异常
*/
public static void acquireLock(String lockName) throws InterruptedException {
// 2.获取lock节点下的所有子节点
List<String> childrenList = zkClient.getChildren(LOCK_NODE);
// 3.对子节点进行排序,获取最小值
childrenList.sort(new Comparator<String>() {
@Override
public int compare(String o1, String o2) {
return Integer.parseInt(o1.split("_")[1]) - Integer.parseInt(o2.split("_")[1]);
}
});
// 4.判断当前创建的节点是否在第一位
int lockPosition = childrenList.indexOf(lockName.split("/")[lockName.split("/").length - 1]);
if (lockPosition < 0) {
// 不存在该节点
throw new ZkNodeExistsException("不存在的节点:" + lockName);
}
if (lockPosition == 0) {
// 获取到锁
log.info("获取到锁:" + lockName);
return;
}
// 未获取到锁,阻塞
log.info("...... 未获取到锁,阻塞等待 。。。。。。");
// 5.如果未获取得到锁,监听当前创建的节点前一位的节点
final CountDownLatch latch = new CountDownLatch(1);
IZkDataListener listener = new IZkDataListener() {
/**
* 当被删除时的监听事件
* @param dataPath 节点
* @throws Exception 异常
*/
@Override
public void handleDataDeleted(String dataPath) throws Exception {
// 6.前一个节点被删除,当不保证轮到自己
log.info("。。。。。。前一个节点被删除 。。。。。。");
acquireLock(lockName);
latch.countDown();
}
@Override
public void handleDataChange(String dataPath, Object data) {
//节点被改变
}
};
try {
//监听前一个节点
zkClient.subscribeDataChanges(LOCK_NODE + "/" + childrenList.get(lockPosition - 1), listener);
//阻塞
latch.await();
} finally {
log.info("。。。。。取消订阅。。。。。。");
//取消监听
zkClient.unsubscribeDataChanges(LOCK_NODE + "/" + childrenList.get(lockPosition - 1), listener);
}
}
/**
* 释放锁(删除节点)
*
* @param lockName 锁名字
*/
public static void releaseLock(String lockName) {
zkClient.delete(lockName);
}
public static void closeZkClient() {
zkClient.close();
}
}
具体业务实现
/**
* 库存递减
*
* @param id id
* @param num 数量
* @return
*/
@Override
public boolean killGoods(Long id, Integer num) {
String lock = DistributedLockUtil.getLock();
if (Objects.nonNull(lock)) {
Goods goods = this.getById(id);
if (goods.getQuantity() <= 0) {
//库存数量不足,释放锁
DistributedLockUtil.releaseLock(lock);
return false;
}
log.info("库存数量======" + goods.getQuantity());
//将库存减操作
goods.setQuantity(goods.getQuantity() - 1);
this.updateById(goods);
DistributedLockUtil.releaseLock(lock);
return true;
}
return false;
}
**
* @author wenlinshan
* @version 1.0
* @date 2021/6/16 11:06
* @desc
*/
@RequestMapping
@RestController
public class GoodsController {
@Resource
private GoodsService goodsService;
@GetMapping("test")
public String createOrderTest() {
if (!goodsService.killGoods(1405065181720055809L, 1)) {
return "库存不足";
}
return "创建订单成功";
}
@GetMapping("close")
public String closeZk(){
DistributedLockUtil.closeZkClient();
return "关闭成功";
}
}
至此一个简单的分布式锁的demo已经实现。