分布式锁的三种实现方式
基于数据库
- 新建一张表,每次insert 一条记录,利用唯一约束,释放锁删除此记录即可。
- for update 利用行级锁。
强依赖数据库,一但数据库不可用则系统不可用。
一单获取锁失败,则直接返回失败,线程不会进入等待队列。
Redis
推荐redission,提供丰富的工具类,支持LUA脚本,支持spring框架等等(太多,大家可以度娘下)。
我见过很多的应用中都是一种写法
jedis.set(key, value, "NX", "PX", expireTime);
这种写法有什么问题?当多个线程同时获取锁失败时,未获取到锁的线程依然不能进入等待队列,直接返回失败,很多童鞋使用了强大的武器for循环,而在redission中的lock就利用了redis的订阅功能实现的线程的等待和通知,有兴趣可以参考https://redisson.org/
zookeeper
今天这里主要分析zookeeper的实现方式和细节,以帮助大家在应用、以及在面试过程当中能够很好的理解和回答分布式锁的实现过程。
- DEMO
package com.jyly.mydubbo.zk;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
/**
* @author 咖啡爷爷
*
*/
public class Lock {
// 根节点目录
private String ROOT_LOCK = "/locks";
CountDownLatch countDownLatch = null;
ZkClient zkClient = null;
final LockContext context = new LockContext();
TreeSet<String> treeSet = null;
public static String lockKey = "node";
public Lock() {
// init zkclient
zkClient = new ZkClient("xxx.xxx.xxx.xx:2181");
}
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
new Thread(new Runnable() {
@Override
public void run() {
Lock lock = new Lock();
try {
if (lock.tryLock(lockKey)) {
System.out.println(Thread.currentThread().getName() + ">>>>获取到锁");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
System.out.println(Thread.currentThread().getName() + ">>>>释放锁");
}
}
}).start();
}
}
/**
* 获取分布式锁
* @param key
* @return
*/
public boolean tryLock(String key) {
try {
if (!zkClient.exists(ROOT_LOCK)) {
zkClient.createPersistent(ROOT_LOCK);
}
String seq = zkClient.createEphemeralSequential(ROOT_LOCK.concat("/").concat(key), null);
context.set(seq.substring(seq.lastIndexOf("/") + 1, seq.length()));
if (isMinNode()) {
return true;
} else {
addListenPreNode();
countDownLatch = new CountDownLatch(1);
for(;;) {
countDownLatch.await();
if(isMinNode()) {
return true;
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
// 判断当前是否是最小节点
private boolean isMinNode() {
treeSet = new TreeSet<>();
for (String children : zkClient.getChildren(ROOT_LOCK)) {
treeSet.add(children);
}
String minNode = treeSet.first();
if (context.get().equals(minNode)) {
return true;
}
return false;
}
/**
* 添加监听前驱节点
*/
private void addListenPreNode() {
zkClient.subscribeDataChanges(ROOT_LOCK.concat("/").concat(treeSet.lower(context.get())), new IZkDataListener() {
public void handleDataChange(String arg0, Object arg1) throws Exception {
}
public void handleDataDeleted(String arg0) throws Exception {
countDownLatch.countDown();
}
});
}
/**
* 解锁释放节点
*/
public void unlock() {
zkClient.delete(ROOT_LOCK.concat("/").concat(context.get()));
}
}
class LockContext {
ThreadLocal<String> lockContext = new ThreadLocal<>();
public String get() {
return lockContext.get();
}
public void set(String seq) {
lockContext.set(seq);
}
}
-
zk里面的目录结构