年前临放假我们一起学习了在Redis中如何实现分布式锁,过年期间在家里好吃好喝好玩完毕,现在,该继续学习啦!
不知诸位还是否记得上次我们说的《沙滩 - 脚印》那个例子,在Zookeeper中,实现分布式锁原理也差不多。如果你不知道,快回头先看看分布式锁之Redis实现
如果您对zookeeper还不熟悉,需要先去了解相关背景知识。
一、Zookeeper特性
在开始之前,我们重温一下zookeeper中的一些概念性知识。
1、数据节点
zookeeper的视图结构和文件系统类似,存储于内存。其中, 每个节点称为数据节点znode。每个znode可以存储数据,也可以挂载子节点。
比如在Dubbo中,我们将服务的信息注册到zookeeper中。就是由一个个的节点和子节点组成。
[zk: localhost:2181(CONNECTED) 1] ls /dubbo/com.viewscenes.netsupervisor.service.InfoUserService
[consumers, configurators, routers, providers]
[zk: localhost:2181(CONNECTED) 2]
或者,我们将它理解为Windows系统中的文件夹,意思差不多的。
2、Watcher
Watcher(事件监听器),我们可以注册watcher监控znode的变化。比如znode删除、数据发生变化、子节点发生变化等。当触发这些事件时,客户端就会得到通知。
Watcher机制是Zookeeper实现分布式协调服务的重要特性。
3、节点类型
在zookeeper中,数据节点有着不同的类型。
- 持久节点(PERSISTENT)
- 持久顺序节点(PERSISTENT_SEQUENTIAL)
- 临时节点(EPHEMERAL)
- 临时顺序节点(EPHEMERAL_SEQUENTIAL)
持久节点,一旦被创建,除非主动进行删除操作,否则这个节点会一直保持。
临时节点,与客户端session绑定,一旦session失效,节点就会被自动删除。
有个重要的信息是,对于持久节点和临时节点,同一个znode下,节点的名称是唯一的! 就像在windows中,我们不可能在同一目录,创建两个相同名字的文件夹。记住它,这个实现分布式锁的基础。
二、实现
基于上面zookeeper的特性,对于分布式锁,我们就可以梳理出这样一条思路。
1、加锁,申请创建临时节点。
2、创建成功,则加锁成功;完成自己的业务逻辑后,删除此节点,释放锁。
3、创建失败,则证明节点已存在,当前锁被别人持有;注册watcher,监听数据节点的变化。
4、监听到数据节点被删除后,证明锁已被释放。重复步骤1,再次尝试加锁。
1、初始化
在构造方法中,我们先将zookeeper的客户端和锁的节点路径设置一下。
public class ZookeeperLock implements Lock {
Logger logger = LoggerFactory.getLogger(this.getClass());
private String root_path = "/zookeeper";
private String current_path;
private ZooKeeper zooKeeper;
public ZookeeperLock(String lock_name,ZooKeeper zooKeeper){
current_path = root_path+"/"+lock_name;
this.zooKeeper = zooKeeper;
}
}
2、加锁
上面我们说了,加锁就是创建节点的过程。代码也很简单。
public class ZookeeperLock implements Lock {
/**
* 加锁
* 失败后调用waitForRelease方法等待。
*/
public void lock() {
if (tryLock()){
logger.info("获取锁成功!");
}else {
waitForRelease();
}
}
/**
* 尝试加锁
* 创建临时节点,创建成功则加锁成功,返回true
* @return
*/
public boolean tryLock() {
try {
zooKeeper.create(current_path, "1".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
logger.info("创建节点成功!");
return true;
} catch (Exception e) {
logger.error("创建节点失败!{}",e.getMessage());
}
return false;
}
}
3、等待锁释放
waitForRelease就是等待锁释放的方法。这里先判断一次锁的数据节点是否还存在,如果不存在,再次调用lock方法尝试加锁;如果存在,则通过countDownLatch来等待,直到触发NodeDeleted事件。
public class ZookeeperLock implements Lock {
/**
* 注册Watcher 监听znode节点删除事件
*/
private void waitForRelease(){
CountDownLatch countDownLatch = new CountDownLatch(1);
Watcher watcher = watchedEvent -> {
Watcher.Event.EventType type = watchedEvent.getType();
//触发NodeDeleted事件,跳过await方法
if (Watcher.Event.EventType.NodeDeleted.equals(type)){
countDownLatch.countDown();
}
};
try {
//判断当前锁的数据节点是否存在
Stat exists = zooKeeper.exists(current_path, watcher);
if (exists==null){
lock();
}else {
countDownLatch.await();
lock();
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
4、释放锁
我们把锁的数据节点删除之后,就释放了锁。
public class ZookeeperLock implements Lock {
public void unlock() {
try {
zooKeeper.delete(current_path,-1);
} catch (Exception e) {
logger.error("删除节点失败:{}",e.getMessage());
}
logger.info("释放锁成功!");
}
}
5、测试
public class ZkTest1 {
static final Logger logger = LoggerFactory.getLogger(ZkTest1.class);
static final int thread_count = 100;
static int num = 0;
public static void main(String[] args) throws IOException, InterruptedException {
CountDownLatch c1 = new CountDownLatch(1);
ZooKeeper zooKeeper = new ZooKeeper("192.168.139.131:2181", 99999, watchedEvent -> {
if (Watcher.Event.KeeperState.SyncConnected == watchedEvent.getState()) {
c1.countDown();
}
});
c1.await();
long start = System.currentTimeMillis();
ExecutorService executorService = Executors.newFixedThreadPool(thread_count);
CountDownLatch countDownLatch = new CountDownLatch(thread_count);
for (int i=0;i<thread_count;i++){
executorService.execute(() -> {
Lock lock = new ZookeeperLock("lock",zooKeeper);
try {
lock.lock();
num++;
}finally {
if (lock!=null){
lock.unlock();
}
}
countDownLatch.countDown();
});
}
countDownLatch.await();
long end = System.currentTimeMillis();
logger.info("线程数量为:{},统计NUM数值为:{},共耗时:{}",thread_count,num,(end-start));
executorService.shutdown();
}
}
大家可以运行测试一下,重点可以看看输出的日志,来帮助我们理解整个代码流程。这种方式有很大的性能问题,当请求数高了之后,会变得非常非常慢。
15:07:51.713 [main] INFO - 线程数量为:100,统计NUM数值为:100,共耗时:1625
15:07:25.056 [main] INFO - 线程数量为:1000,统计NUM数值为:1000,共耗时:125062
如上,在笔者虚拟机环境中的测试结果。问题出在哪呢?
三、改进版实现
我们说上面的代码有较大的性能问题,事实上造成这种问题的原因,有个专业名词:惊群效应。
惊群问题是计算机科学中,当许多进程等待一个事件,事件发生后这些进程被唤醒,但只有一个进程能获得CPU执行权,其他进程又得被阻塞,这造成了严重的系统上下文切换代价。
就好比,一只凶恶的大灰狼,跑进羊群。虽然一次只会有一只羊被吃掉,但可爱的羊儿们为了保住自己的小命,都会四处奔逃。没有被吃掉的羊儿,继续埋头吃草...直到下一只狼的到来,历史总是惊人的相似。
回到我们上面的代码中,多个线程都会注册Watcher事件,等待锁释放;当触发数据节点删除事件,线程被唤醒,然后争先抢后的尝试加锁。只有一个线程加锁成功,其余的线程继续阻塞,等待唤醒...
我们怎么改进这个问题呢?
1、临时顺序节点
上面我们说zookeeper数据节点分为4个类型,其中有一个就是临时顺序节点。
首先,它是一个临时节点;其次,它的节点名称是有顺序的。比如,我们在/lock节点创建几个临时顺序节点,它看起来是这样的:
[zk: localhost:2181(CONNECTED) 2] create -s -e /lock/test 1234
Created /lock/test0000000230
[zk: localhost:2181(CONNECTED) 3] create -s -e /lock/test 1234
Created /lock/test0000000231
[zk: localhost:2181(CONNECTED) 4] create -s -e /lock/test 1234
Created /lock/test0000000232
[zk: localhost:2181(CONNECTED) 5] create -s -e /lock/test 1234
Created /lock/test0000000233
[zk: localhost:2181(CONNECTED) 6] create -s -e /lock/test 1234
Created /lock/test0000000234
[zk: localhost:2181(CONNECTED) 7] create -s -e /lock/test 1234
Created /lock/test0000000235
[zk: localhost:2181(CONNECTED) 8] ls /lock
[test0000000230, test0000000231, test0000000232, test0000000233, test0000000234, test0000000235]
[zk: localhost:2181(CONNECTED) 9]
可以看到,我们创建的/lock/test节点,都被加上了一个自增的序号。比如test0000000230、test0000000231
这样。这个自增序号是zookeeper内部机制保证的,我们暂且不用管它怎么生成。
2、实现原理
基于zookeeper临时顺序节点的特性,针对惊群效应,业界又改进了一种实现方法。它的思路是这样的:
1、加锁,在/lock锁节点下创建临时顺序节点并返回,比如:test0000000235
2、获取/lock节点下全部子节点,并排序。
3、判断当前线程创建的节点,是否在全部子节点中顺序最小。
4、如果是,则代表获取锁成功;完成自己的业务逻辑后释放锁。
5、如果不是最小,找到比自己小一位的节点,比如test0000000234;对它进行监听。
6、当上一个节点删除后,证明前面的客户端已释放锁;然后尝试去加锁,重复以上步骤。
3、实现
初始化
public class ZkClientLock implements Lock {
private Logger logger = LoggerFactory.getLogger(ZkClientLock.class);
private String lock_path = "/lock";
private ZkClient client;
private CountDownLatch countDownLatch;
private String beforePath;// 当前请求的节点前一个节点
private String currentPath;// 当前请求的节点
public ZkClientLock(ZkClient client){
this.client = client;
}
}
加锁
我们看加锁的过程,主要是判断自己是否处在全部子节点的第一位,是的话加锁成功;否则找到自己的上一个节点,调用waitForRelease方法等待锁释放。
public class ZkClientLock implements Lock {
/**
* 加锁
* 如未成功获取锁,则等待锁释放后再次尝试加锁
*/
public void lock() {
if (tryLock()){
logger.info("获取锁成功");
}else {
waitForRelease();
lock();
}
}
/**
* 当前节点排在全部子节点的第一位,则加锁成功
* 否则找出前一个节点
* @return
*/
public boolean tryLock() {
if (currentPath==null || currentPath.length()==0){
currentPath = client.createEphemeralSequential(lock_path+"/","lock");
logger.info("当前锁路径为:{}",currentPath);
}
//获取全部子节点并排序
List<String> children = client.getChildren(lock_path);
Collections.sort(children);
//当前节点如果排在第一位,返回成功
if (currentPath.equals(lock_path+"/"+children.get(0))){
return true;
}else {
//找出上一个节点
String sequenceNodeName = currentPath.substring(lock_path.length()+1);
int i = Collections.binarySearch(children, sequenceNodeName);
beforePath = lock_path + '/' + children.get(i - 1);
}
return false;
}
}
等待锁释放
这里就是对上一个节点进行监听,节点被删除后,方法返回。
public class ZkClientLock implements Lock {
/**
* 等待锁释放
* 对上一个节点进行监听,节点删除后返回
*/
private void waitForRelease(){
IZkDataListener listener = new IZkDataListener() {
public void handleDataDeleted(String dataPath) throws Exception {
if (countDownLatch != null) {
countDownLatch.countDown();
}
}
public void handleDataChange(String dataPath, Object data) throws Exception {}
};
this.client.subscribeDataChanges(beforePath, listener);
if (this.client.exists(beforePath)) {
countDownLatch = new CountDownLatch(1);
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
this.client.unsubscribeDataChanges(beforePath, listener);
}
}
释放锁
public void unlock() {
client.delete(currentPath);
logger.info("释放锁成功");
}
这种方式改进之后,性能会得到大幅度提升。同样的测试方法,得到结果如下:
15:19:19.327 [main] INFO - 线程数量为:100,统计NUM数值为:100,共耗时:636
15:18:58.728 [main] INFO - 线程数量为:1000,统计NUM数值为:1000,共耗时:5398
四、Curator
我们上面写的两个锁实现,并不能用于生产环境中。还需要考虑很多细节才行,比如锁的可重入性、锁超时。
Curator是什么,想必不用再介绍。如果在生产环境中,使用到zookeeper分布式锁,笔者推荐使用开源组件,除非自己写的比人家的要好,哈哈。
首先,引入它的Maven坐标。
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.1</version>
</dependency>
1、用法
Curator帮我们封装了zookeeper分布式锁的实现逻辑,用起来非常简单。
首先,连接到zookeeper客户端。然后创建一个互斥锁的实例,调用即可。
public static void main(String[] args) throws Exception {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.newClient(
"192.168.139.131:2181",
999999,
3000,
retryPolicy);
client.start();
InterProcessMutex lock = new InterProcessMutex(client, "/lock");
//加锁
lock.acquire();
//解锁
lock.release();
}
用同样的测试方法,得到结果如下:
15:43:34.306 [main] INFO - 线程数量为:100,统计NUM数值为:100,共耗时:606
15:43:02.427 [main] INFO - 线程数量为:1000,统计NUM数值为:1000,共耗时:6785
2、InterProcessMutex
我们先看下InterProcessMutex类有哪些属性。
public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex> {
//内部锁的实例对象
private final LockInternals internals;
//锁的基本路径
private final String basePath;
//线程和锁的映射
private final ConcurrentMap<Thread, InterProcessMutex.LockData> threadData;
//锁的名称前缀
private static final String LOCK_NAME = "lock-";
}
这里的重点是threadData,它是一个ConcurrentMap对象,保存着当前线程和锁的映射关系,是实现可重入锁的基础。我们再看下LockData这个类。
private static class LockData {
//当前线程
final Thread owningThread;
//当前锁的节点
final String lockPath;
//锁的次数
final AtomicInteger lockCount;
private LockData(Thread owningThread, String lockPath) {
this.lockCount = new AtomicInteger(1);
this.owningThread = owningThread;
this.lockPath = lockPath;
}
}
接着再看InterProcessMutex的构造方法。
public InterProcessMutex(CuratorFramework client, String path) {
this(client, path, new StandardLockInternalsDriver());
}
StandardLockInternalsDriver是锁的驱动类,它主要就是创建锁的节点和判断当前节点是不是处于第1 位。接着看,就是初始化一些属性。
InterProcessMutex(CuratorFramework client, String path, String lockName,
int maxLeases, LockInternalsDriver driver) {
this.threadData = Maps.newConcurrentMap();
this.basePath = PathUtils.validatePath(path);
this.internals = new LockInternals(client, driver, path, lockName, maxLeases);
}
3、加锁
加锁的方式有两种,有超时时间的和不带超时时间的。
lock.acquire();
lock.acquire(5000, TimeUnit.SECONDS);
不过没关系, 它们都会调用到internalLock
方法。
public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex> {
//加锁
private boolean internalLock(long time, TimeUnit unit) throws Exception {
//当前线程
Thread currentThread = Thread.currentThread();
InterProcessMutex.LockData lockData = this.threadData.get(currentThread);
//当前线程有锁的信息,锁次数加1,返回
if (lockData != null) {
lockData.lockCount.incrementAndGet();
return true;
} else {
//尝试加锁,返回当前锁的节点路径
String lockPath = this.internals.attemptLock(time, unit, this.getLockNodeBytes());
if (lockPath != null) {
//将当前线程和锁的关系缓存到lockData
InterProcessMutex.LockData newLockData =
new InterProcessMutex.LockData(currentThread, lockPath);
this.threadData.put(currentThread, newLockData);
return true;
} else {
return false;
}
}
}
}
以上代码我们分为两部分来看。
- 获取当前线程,从threadData缓存中获取锁的相关数据lockData。
- 如果,lockData不为空,说明当前线程是一个重入锁,则锁次数加1,返回。
- lockData为空就尝试去加锁。返回当前锁的节点路径,创建LockData 实例,放入到threadData缓存中。
然后我们接着看attemptLock
方法,它是如何尝试加锁的。
public class LockInternals {
//尝试加锁
//如果加锁成功,则返回当前锁的节点路径
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {
//方法执行开始时间
long startMillis = System.currentTimeMillis();
//锁超时时间
Long millisToWait = unit != null ? unit.toMillis(time) : null;
//锁节点数据
byte[] localLockNodeBytes = this.revocable.get() != null ? new byte[0] : lockNodeBytes;
int retryCount = 0;
//是否已经持有锁
boolean hasTheLock = false;
boolean isDone = false;
while(!isDone) {
isDone = true;
try {
//创建锁
ourPath = this.driver.createsTheLock(this.client, this.path, localLockNodeBytes);
//加锁
hasTheLock = this.internalLockLoop(startMillis, millisToWait, ourPath);
} catch (NoNodeException var14) {
if (!this.client.getZookeeperClient().getRetryPolicy().
allowRetry(retryCount++, System.currentTimeMillis() - startMillis,
RetryLoop.getDefaultRetrySleeper())) {
throw var14;
}
isDone = false;
}
}
return hasTheLock ? ourPath : null;
}
}
以上代码看着多,其实也不复杂,如果已经持有锁,就返回锁的节点路径。我们重点看两个地方。
- 创建锁
创建锁就是在zookeeper中创建临时顺序节点,返回锁的节点名称。
public class StandardLockInternalsDriver implements LockInternalsDriver {
//创建锁
public String createsTheLock(CuratorFramework client,
String path, byte[] lockNodeBytes) throws Exception {
String ourPath;
if (lockNodeBytes != null) {
//lockNodeBytes默认为空
} else {
ourPath = (String)((ACLBackgroundPathAndBytesable)client.create().
creatingParentContainersIfNeeded().
withProtection().
withMode(CreateMode.EPHEMERAL_SEQUENTIAL)).
forPath(path);
}
return ourPath;
}
}
创建后锁的节点路径 = UUID + lock + zookeeper自增序列。它看起来是这样的:
_c_d5815293-f5b1-484d-b789-9f11df69149c-lock-0000006168
- 循环加锁
创建锁的节点之后,不断的循环去加锁,直到加锁成功或者锁超时退出。
public class LockInternals {
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {
//是否已经持有锁
boolean haveTheLock = false;
boolean doDelete = false;
try {
if (this.revocable.get() != null) {
((BackgroundPathable)this.client.getData().usingWatcher(this.revocableWatcher)).forPath(ourPath);
}
//如果没有持有锁,就一直循环
while(this.client.getState() == CuratorFrameworkState.STARTED && !haveTheLock) {
//获取所有子节点并排序
List<String> children = this.getSortedChildren();
//获取当前锁的自增序列
String sequenceNodeName = ourPath.substring(this.basePath.length() + 1);
//判断当前锁是否处于第1位
PredicateResults predicateResults = this.driver.getsTheLock(this.client,
children, sequenceNodeName, this.maxLeases);
//处于第1位,返回true
if (predicateResults.getsTheLock()) {
haveTheLock = true;
} else {
//获取上一个序列路径
String previousSequencePath = this.basePath + "/" + predicateResults.getPathToWatch();
synchronized(this) {
try {
//对上一个序列路径进行监听,并等待
((BackgroundPathable)this.client.getData().usingWatcher(this.watcher)).forPath(previousSequencePath);
if (millisToWait == null) {
this.wait();
} else {
millisToWait = millisToWait - (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
if (millisToWait > 0L) {
this.wait(millisToWait);
} else {
doDelete = true;
break;
}
}
} catch (NoNodeException var19) {
}
}
}
}
} catch (Exception var21) {
ThreadUtils.checkInterrupted(var21);
doDelete = true;
throw var21;
} finally {
if (doDelete) {
this.deleteOurPath(ourPath);
}
}
return haveTheLock;
}
}
以上代码比较长,但逻辑比较清晰。我们重点看while循环内的代码。
- 获取所有子节点并排序。
- 获取当前锁的节点名称,判断是否处于全部子节点的第1位。
- 如果是,则返回true,退出循环。
- 如果不是,则对上一个序列节点进行监听,并等待。
- 如果没有锁超时时间,则一直等待;反之等待millisToWait时间后,退出循环,并删除当前锁的节点,返回false。
4、解锁
既然是可重入锁,解锁的时候必然先判断锁的重入次数。当次数为0时,删除zookeeper中的锁节点信息等。
public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex> {
//解锁
public void release() throws Exception {
//获取当前线程的锁相关信息lockData
Thread currentThread = Thread.currentThread();
InterProcessMutex.LockData lockData = this.threadData.get(currentThread);
if (lockData == null) {
throw new IllegalMonitorStateException("You do not own the lock: " + this.basePath);
} else {
//锁次数递减1
int newLockCount = lockData.lockCount.decrementAndGet();
if (newLockCount <= 0) {
if (newLockCount < 0) {
throw new IllegalMonitorStateException("....");
} else {
try {
//移除Watch、删除锁节点、移除线程和锁的映射关系
this.client.removeWatchers();
this.revocable.set((Object)null);
this.deleteOurPath(lockPath);
} finally {
this.threadData.remove(currentThread);
}
}
}
}
}
}
至此,Curator中关于互斥锁的实现逻辑我们已经分析完了。可以看到,它的实现原理跟我们自己改进后的代码实现基本上是相同的。不过,多了锁的可重入和锁超时。