zk实现分布式锁

在一些并发请求的时候,需要保证数据的准确性,在同一时刻只能允许一个请求对同一条数据进行修改操作。在之前的单机应用当中,很容易想到利用jdk的锁来实现,例如 synchronized或者lock 。但是在如今业务复杂的分布式系统中jdk的锁并不适用,所以必须要要用分布式锁。

常见的几种分布式锁的实现方案,Redis、Mysql 、zookeeper;

本文主要讲的是如何使用zk实现分布式锁:

     ZooKeeper是一个开源的分布式协调服务,他为分布式应用提供了高效且可靠的分布式协调服务,提供了诸如统一命名空间服务,配置服务和分布式锁等分布式基础服务。 

    ZooKeeper的数据模型是内存中的一个ZNode数,由斜杠(/)进行分割的路径,就是一个ZNode,每个ZNode上除了保存自己的数据内容,还保存一系列属性信息。

    ZooKeeper中的数据节点分为两种:持久节点和临时节点。所谓的持久节点是指一旦这个ZNode创建成功,除非主动进行ZNode的移除操作,节点会一直保存在ZooKeeper上;而临时节点的生命周期是跟客户端的(Session)会话相关联的,一旦客户端会话失效,这个会话上的所有临时节点都会被自动移除。

具体思路:

1、首先zookeeper中我们可以创建一个/distributed_lock持久化节点
2、然后再在/distributed_lock节点下创建自己的临时顺序节点,比如:/distributed_lock/task_00000000008
3、获取所有的/distributed_lock下的所有子节点,并排序
4、判读自己创建的节点是否最小值(第一位)
5、如果是,则获取得到锁,执行自己的业务逻辑,最后删除这个临时节点。
6、如果不是最小值,则需要监听自己创建节点前一位节点的数据变化,并阻塞。
7、当前一位节点被删除时,我们需要通过递归来判断自己创建的节点是否在是最小的,如果是则执行5);如果不是则执行6)(就是递归循环的判断)

一、导入依赖

<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.0.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>zk_lock</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>zk_lock</name>
    <description>zk_lock</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.7.9</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency><dependency>
        <groupId>com.baomidou</groupId>
        <artifactId>mybatis-plus-boot-starter</artifactId>
        <version>3.3.1</version>
    </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.22</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-validation</artifactId>
        </dependency>
        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.10</version>
        </dependency>
    </dependencies>

主要包括了zk客户端的依赖,mybatis-plus的依赖。

二、数据的配置

#数据库
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/db3?characterEncoding=utf8&verifyServerCertificate=false&useSSL=true&serverTimezone=Asia/Shanghai
spring.datasource.username=root
spring.datasource.password=root

三、具体代码实现

分布式锁工具类

package com.example.zk_lock.util;

import lombok.extern.slf4j.Slf4j;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import org.apache.zookeeper.CreateMode;

import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * @author wls
 */
@Slf4j
public class DistributedLockUtil {

    /**
     * 常量
     */
    private static final String CONNECTION_STRING = "localhost:2181";
    /**
     * 父节点
     */
    private static final String LOCK_NODE = "/distributed_lock";
    private static final String CHILDREN_NODE = "/lock_";

    public static DistributedLockUtil distributedLockUtil;
    public  static ZkClient zkClient;

    /**
     * 创建zkClient
     */
    public DistributedLockUtil() {
        distributedLockUtil = this;
        // 连接到Zookeeper
        zkClient = new ZkClient(new ZkConnection(CONNECTION_STRING));

        //创建节点
        if (!zkClient.exists(LOCK_NODE)) {
            zkClient.create(LOCK_NODE, "分布式锁节点", CreateMode.PERSISTENT);
        }
    }

    /**
     * 获取锁
     *
     * @return 锁名字
     */
    public static String getLock() {
        try {
            // 1.在Zookeeper指定节点下创建临时顺序节点
            String lockName = zkClient.createEphemeralSequential(LOCK_NODE + CHILDREN_NODE, "");
            // 尝试获取锁
            acquireLock(lockName);
            return lockName;
        } catch (Exception e) {
            e.printStackTrace();
        }

        return null;
    }

    /**
     * 尝试获取锁
     *
     * @throws InterruptedException 异常
     */
    public static void acquireLock(String lockName) throws InterruptedException {
        // 2.获取lock节点下的所有子节点
        List<String> childrenList = zkClient.getChildren(LOCK_NODE);
        // 3.对子节点进行排序,获取最小值
        childrenList.sort(new Comparator<String>() {
            @Override
            public int compare(String o1, String o2) {
                return Integer.parseInt(o1.split("_")[1]) - Integer.parseInt(o2.split("_")[1]);
            }
        });
        // 4.判断当前创建的节点是否在第一位
        int lockPosition = childrenList.indexOf(lockName.split("/")[lockName.split("/").length - 1]);
        if (lockPosition < 0) {
            // 不存在该节点
            throw new ZkNodeExistsException("不存在的节点:" + lockName);
        }
        if (lockPosition == 0) {
            // 获取到锁
            log.info("获取到锁:" + lockName);
            return;
        }
        // 未获取到锁,阻塞
        log.info("...... 未获取到锁,阻塞等待 。。。。。。");
        // 5.如果未获取得到锁,监听当前创建的节点前一位的节点
        final CountDownLatch latch = new CountDownLatch(1);
        IZkDataListener listener = new IZkDataListener() {
            /**
             * 当被删除时的监听事件
             * @param dataPath 节点
             * @throws Exception 异常
             */
            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                // 6.前一个节点被删除,当不保证轮到自己
                log.info("。。。。。。前一个节点被删除  。。。。。。");
                acquireLock(lockName);
                latch.countDown();
            }
            @Override
            public void handleDataChange(String dataPath, Object data) {
                //节点被改变
            }
        };
        try {
            //监听前一个节点
            zkClient.subscribeDataChanges(LOCK_NODE + "/" + childrenList.get(lockPosition - 1), listener);
            //阻塞
            latch.await();
        } finally {
            log.info("。。。。。取消订阅。。。。。。");
            //取消监听
            zkClient.unsubscribeDataChanges(LOCK_NODE + "/" + childrenList.get(lockPosition - 1), listener);
        }

    }

    /**
     * 释放锁(删除节点)
     *
     * @param lockName 锁名字
     */
    public static void releaseLock(String lockName) {
        zkClient.delete(lockName);
    }

    public static void closeZkClient() {
        zkClient.close();
    }
}



具体业务实现

/**
     * 库存递减
     *
     * @param id  id
     * @param num 数量
     * @return
     */
    @Override
    public boolean killGoods(Long id, Integer num) {

        String lock = DistributedLockUtil.getLock();
        if (Objects.nonNull(lock)) {
            Goods goods = this.getById(id);
            if (goods.getQuantity() <= 0) {
                //库存数量不足,释放锁
                DistributedLockUtil.releaseLock(lock);
                return false;
            }
            log.info("库存数量======" + goods.getQuantity());
            //将库存减操作
            goods.setQuantity(goods.getQuantity() - 1);
            this.updateById(goods);
            DistributedLockUtil.releaseLock(lock);
            return true;
        }
        return false;
    }
**
 * @author wenlinshan
 * @version 1.0
 * @date 2021/6/16 11:06
 * @desc
 */
@RequestMapping
@RestController
public class GoodsController {
    @Resource
    private GoodsService goodsService;

    @GetMapping("test")
    public String createOrderTest() {
        if (!goodsService.killGoods(1405065181720055809L, 1)) {
            return "库存不足";
        }
        return "创建订单成功";
    }

    @GetMapping("close")
    public String closeZk(){
        DistributedLockUtil.closeZkClient();
        return "关闭成功";
    }

}

至此一个简单的分布式锁的demo已经实现。

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容