81 zookeeper 分布式锁

1,分布式锁实现本质?

  1,重试策略
  2,超时控制;
  3,续命设计,续命如何避免死锁
  4,性能优化
   5,高可用
  6,公平性

2,分布式有哪些应用场景?

   1,集群环境中保证定时任务执行的幂等性问题 基于zk案例
         幂等性:执行的结果保证唯一不重复
       当我们定时任务集群的情况下,有可能会同时重复执行定时任务。
        解决思路: 多个jvm集群的定时任务 在触发的时候,获取分布式锁。如何                                        
         获取到分布式的jvm,就能够执行定时任务,没有获取到分布式锁的jvm就 
         不能执行定时任务。
image.png

2,用户下单库存超卖问题,防止超卖Redis解决。

3,锁的特性:互斥,悲观与乐观锁,信号量回顾。

      JUC锁:

悲观锁: 没有获取到锁的线程会直接阻塞等待。
将一个线程从用户态切换到内核态; 后期唤醒的成本非常高,内核态切换到用户态
乐观锁:
乐观: 没有获取到锁的线程不会一直阻塞等待,而是通过自旋的形式。比如cas;缺点:比较消耗cpu资源。
阻塞式和非阻塞式:
阻塞式锁: 没有获取到锁的线程就一直组撒;
非租塞式锁: 没有获取到锁的线程就不会组撒。
锁重入性: 当前线程获取到锁后,不断的实现复用。
公平性与非公平锁:
默认使用synchronized非公平性锁,靠cpu 争抢获取锁。
公平锁: 根据获取锁的时间排列,依赖于链表结构。

重量级锁: 实际上是悲观锁,没有获取锁的线程直接变为阻塞状态。
悲观锁: 当没有获取锁的线程,会阻塞。
乐观锁: 通过cas自旋,一直不断死循环重试获取锁,缺点消耗cpu资源。
阻塞与非租塞:阻塞;没有获取到锁就会阻塞,非租塞:没有获取到锁的线程不会阻塞。
重量级锁: 没有获取到锁的线程直接阻塞,没有自旋。
公平锁与非公平锁:synchronized默认情况下属于非公平性锁,根据争抢的形式实现。
公平性锁: 根据获取锁的时间排列获取锁。

Synchronized锁与分布式锁区别:

Synchronized锁适合于的单个jvm中保证线程安全问题
分布式锁适合于多个jvm之间跨网络通讯方式实现分布式锁。
Synchronized 锁属于传统锁,适合于单机jvm版本使用;
分布式锁适合于跨jvm之间通讯保证共享数据安全问题;

4,简单回顾zookeeper 使用基本特性

官方文档这样解释zookeeper,他是一个分布式服务框架,是Apache Hadoop 的一个子项目,他主要是用来解决解决分布式应用中经常遇到的一些数据管理问题,如: 统一命名服务,状态同步服务,集群管理,分布式应用配置项的管理等等
上面的解释有些抽象,简单的说:就是zookeeper=文件系统+监听通知机制。


image.png

路径的名称是不能重复的。
zookeeper的作用:
服务注册和订阅(共同节点);
分布式通知(监听znode)分布式配置中心。
服务命名(znode特性)
数据订阅,发布(watcer)
分布式锁(临时节点)

Zookeeper 节点类型:

持久化节点(zk断开节点还在)
持久化顺序编号目录节点
临时目录节点(客户端断开后节点就删除了)
临时目录编号目录节点:
Watch 当节点的内容发生变化的时候 可以实现通知给其他监听者。
临时与持久节点的区别:
临时节点当我们断开连接后,该临时节点自动删除
持久节点当我们断开连接以后,持久化到硬盘
事件通知Watch:

5,基于zookeeper 如何实现分布式锁原理

---临时节点加事件通知

6, 分布式锁如何避免羊群效应

7,zookeeper 如何避免死锁的问题 ?

8,站在ZAB 协议角度分析ZK实现分布式锁

9,ZK实现分布式锁如何自旋

10 Zk实现分布式锁如何续命问题

分布式锁的实现方案:
1,基于数据库实现 --淘汰;
2, 基于zookeeper
3,基于Redis setnx实现
4,Redis框架 Redisson,RedisLock

Zookeeper 简单实现分布式锁

zookeeper 实现分布式锁原理:
Zookeeper 实现分布式锁原理:
Zookeeper 节点路径不能允许重复 保证唯一性
临时节点+事件通知
1,获取锁的方法:
多个jvm 同时在zk上创建一个临时节点/lockPath
最终只能有一个jvm创建节点成功,如果能创建临时节点成功表示获取锁成功能执行业务逻辑。如果没有创建临时节点成功的jvm,则表示获取锁失败。获取锁失败后,可以采用不断重试策略,重试多次。。获取锁失败后,当前的jvm 就进入到阻塞状态。
2,释放锁的方法:
直接调用.close();释放锁
因为采用临时节点,当我们调用close()方法的时候该临时节点会自动删除。
其他没有获取锁的jvm,就会从新进入到获取锁的状态。
3,被唤醒的方法:
被阻塞的jvm(没有获取成功的jvm),采用事件监听的方式;监听到节点已经被删除的情况下,则开始从新进入到获取锁的状态。

在zookeeper 中,节点路径不不允许重复。
获取锁的原理:
当多个jvm同时获取锁的时候,实际上底层在zk同时创建一个临时节点路径/localpath,但是最终只能有有个jvm'能够创建该节点成功,如果成功的情况下,则认为获取锁成功,如果失败的情况下,则认为获取锁失败。可以采用重试的机制,重试多次开始失败的情况下,当前jvm会进入阻塞状态,等待获取锁的jvm释放锁唤醒。
释放锁定原理:
主动关闭当前会话连接,由于使用临时节点,则该节点自动会被删除。
唤醒原理:
正在等待的jvm 通过watch机制,收到该节点被释放以后,开始重新进入获取到锁流程。

Zookeeper常见的问题:

创建的是一个临时节点,主要调用close该临时节点会立即删除。强制直接关闭tomcat jvm进程,该临时节点不会立马删除 延迟。
zookeeper实现分布式锁的疑问:
业务超时,一直不释放锁如何处理:
可以采用续命设计,续命多次如果业务还是没有执行完毕的情况下,则认为锁超时的,应该主动释放该锁,防止其他jvm一直阻塞等待。

Zookeeper快速入门

pom

<!-- java语言连接zk -->
<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.8</version>
</dependency>

Zookeeper实现分布式锁代码

package com.taotao.myktdistributedluck.lock;

/**
 * @author wangjin
 * @title: Lock
 * @description; 项目
 * @date 2021/3/7 11:28
 */

public interface Lock {
    /**
     * 获取锁
     */
    void getLock() throws InterruptedException;
    /**
     * 释放锁
     */
    void unLock();
}




package com.taotao.myktdistributedluck.lock.impl;

import com.taotao.myktdistributedluck.lock.Lock;
import com.taotao.myktdistributedluck.utils.ZkClientUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;

/**
 * @author wangjin
 * @title: ZookeeperAbstractTemplateLock
 * @description; 模板引擎方法
 * @date 2021/3/7 11:30
 */
@Slf4j
public abstract class ZookeeperAbstractTemplateLock implements Lock {
    @Value("${server.port}")
    private String serverport;

    /**
     * 模板方法设计模式 定义共同的业务逻辑骨架,不同的算法交于实现类
     */
  /*  @Override
    public void getLock() throws InterruptedException {
        //1,调用tryLock方法获取锁
        if (tryLock()) {
            log.info("获取锁成功{}", serverport);
        } else {
            //2 获取锁失败,jvm阻塞
            waitLock();
            //3 唤醒之后从新进入到获取锁的状态
            getLock();
        }
    }
*/
    public abstract void waitLock() throws InterruptedException;

    public abstract boolean tryLock();

    @Override
    public void unLock() {
        //关闭连接,释放锁
        ZkClientUtils.getZkClient().close();
    }
@Override
public  void getLock() throws InterruptedException {
        //调用trylock获取锁
    for (int i = 0; i < 5; i++) {
        boolean result=tryLock();
        if(result){
            return;
        }
    }
    waitLock();
    getLock();
}
}




package com.taotao.myktdistributedluck.lock.impl;

import com.taotao.myktdistributedluck.utils.ZkClientUtils;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.springframework.stereotype.Component;

import java.util.concurrent.CountDownLatch;

/**
 * @author wangjin
 * @title: ZookeeperTemplateLock
 * @description; 项目
 * @date 2021/3/7 11:38
 */
@Component
public class ZookeeperTemplateLock extends ZookeeperAbstractTemplateLock {
    //分布式锁的路径
    private String lock = "/lockPath";
    private CountDownLatch countDownLatch = new CountDownLatch(1);

    @Override
    public void waitLock() throws InterruptedException {
        //创建事件监听
        {
            IZkDataListener iZkDataListener = new IZkDataListener() {
                @Override
                public void handleDataChange(String s, Object o) throws Exception {

                }

                @Override
                public void handleDataDeleted(String s) throws Exception {
                    //当节点删除后我们应该从新唤醒
                    countDownLatch.countDown();
                }
            };
            ZkClientUtils.getZkClient().subscribeDataChanges(lock, iZkDataListener);
            try {
                //让当前线程阻塞
                countDownLatch.await();
            } catch (Exception e) {
                e.getCause();
            }
            //当我们节点被删除以后,我们应该重新唤醒,移除事件监听
            ZkClientUtils.getZkClient().unsubscribeDataChanges(lock, iZkDataListener);
        }

    }

    @Override
    public boolean tryLock() {
        /**
         * 1.获取锁方法:
         * 多个jvm同时在zk上创建一个临时节点/lockPath,
         * 最终只能够有一个jvm创建临时节点成功,如果能够创建
         * 临时节点成功jvm 表示获取锁成功能够正常执行业务逻辑,
         * 如果没有创建临时节点成功的jvm,则表示获取锁失败。
         * 获取锁失败之后,可以采用不断重试策略,重试多次
         * 获取锁失败之后,当前的jvm就进入到阻塞状态。
         */
        try {
            ZkClientUtils.newZkClient().createEphemeral(lock);
            return true;
        } catch (Exception e) {
            e.getCause();
            //获取锁失败其他jvm获取锁
            return false;
        }


    }
}


package com.taotao.myktdistributedluck;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.annotation.Schedules;

@EnableScheduling
@SpringBootApplication
public class MyktDistributedLuckApplication {

    public static void main(String[] args) {
        SpringApplication.run(MyktDistributedLuckApplication.class, args);
    }

}



package com.taotao.myktdistributedluck;


import com.taotao.myktdistributedluck.lock.impl.ZookeeperTemplateLock;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

/**
*@title: MayiktTask
*@description; 项目
*@author wangjin
*@date 2021/3/7 13:13
*/  
@Slf4j
@Component
public class MayiktTask {
    @Value("${server.port}")
    private String serverport;
     @Autowired
     private ZookeeperTemplateLock zookeeperTemplateLock;
    /**
     * 每隔2s执行定时任务
     */
    @Scheduled(cron = "0/2 * * * * *")
    public  synchronized  void taskService() throws InterruptedException {
        zookeeperTemplateLock.getLock();
        log.info("[{}]正在调用阿里云发送短信"+serverport);
        zookeeperTemplateLock.unLock();
    }
}

package com.taotao.myktdistributedluck.utils;

import org.I0Itec.zkclient.ZkClient;

/**
 * @ClassName ZkClientUtils
 * @Author 
 * @Version V1.0
 **/
public class ZkClientUtils {
    private static ZkClient zkClient = null;

    public static ZkClient getZkClient() {
        return zkClient;
    }

    public static ZkClient newZkClient() {
        if (zkClient != null) {
            zkClient.close();
        }
        zkClient = new ZkClient("127.0.0.1:2181", 5000);
        return zkClient;
    }
}

image.png
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容