定义锁根路径:/locks ,过期时间
获得锁:
1先创建父节点locks,当前线程创建临时顺序节点(/locks/non000001)
2 获取/locks下所有孩子节点(get /locks)并排序,最小的节点排最前面
3 如果子节点为空说明会话断开了,节点被删除,返回获取锁失败
4 如果当前节点=最小节点,则返回获取锁成功,并新建一个线程来判定锁过期。
5 如果当前节点不是最小节点,则设置监听比自己次小节点的删除事件,然后挂起当前线程。(公平锁),如果要实现非公平锁,则设置监听最小节点。
6 当最小编号的线程获取锁,处理完业务则删除自己对应的zk节点,删除后会激活比自己大一号的节点线程从阻塞变为运行,被激活的线程是当前节点最小的了,然后就可以获取到锁。
代码实现:
package geektime.spring.springbucks.waiter;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* @author liangxianliang
* @create 2020-01-09 21:54
*/
public class ZookeeperLockTest {
private static final Logger LOG = LoggerFactory.getLogger(ZookeeperLockTest.class);
private static final ThreadLocal<String> THREAD_LOCAL = new ThreadLocal<>();
private static final ThreadLocal<ZooKeeper> ZK_THREAD_LOCAL = new ThreadLocal<>();
/** 锁的根路径 **/
private static final String LOCK_ROOT_PATH = "/locks";
/** 锁后缀 **/
private static final String LOCK_SUFFIX = "_NO_";
/** 创建根节点同步锁 **/
private static final String CREATE_ROOT_LOCK = "LOCK";
/** 公平锁 **/
private static final boolean LOCK_FAIR = false;
private final static byte[] BUF = new byte[0];
static int LOCK_EXPIRES = 10;
static int LOCK_WAITTIME = 10 ;
/**
* 获得所有
* @param key 路径不含‘/’
* @return boolean
*/
public static boolean tryLock(String key) {
return tryLock(key, LOCK_EXPIRES, LOCK_WAITTIME);
}
/**
* 获得锁
* @param key 键/路径
* @param expire 过期时间
* @param wait 等待时间
* @return boolean
*/
public static boolean tryLock(String key, long expire, long wait) {
ZooKeeper zooKeeper = getZooKeeper();
ZK_THREAD_LOCAL.set(zooKeeper);
return tryLock(zooKeeper, key, expire, wait);
}
/**
* 获得锁
* @param zooKeeper zk连接
* @param key 路径
* @param expire 过期时间
* @param wait 等待时间
* @return boolean
*/
private static boolean tryLock(ZooKeeper zooKeeper, String key, long expire, long wait) {
expire = expire * 1000;
wait = wait * 1000;
final String currNode;
String path = LOCK_ROOT_PATH + "/" + key + LOCK_SUFFIX;
try {
currNode = zooKeeper.create(path, BUF, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
//步骤一
List<String> nodes = zooKeeper.getChildren(LOCK_ROOT_PATH, false);
//过滤掉集合中不是当前业务的临时节点
nodes = nodes.stream().filter(o -> o.startsWith(key)).collect(Collectors.toList());
nodes.sort(String::compareTo);
//如果集合为空说明当前创建节点的session在步骤一处已经断开,并且创建的节点已经被zk服务器删除, 此种情况比较极端
if (nodes.size() == 0) {
return false;
}
//最小的节点就是自己创建的节点表示拿到锁
if (currNode.endsWith(nodes.get(0))) {
runExpireThread(zooKeeper, currNode, expire);
return true;
}
//没有拿到锁
CountDownLatch countDownLatch = new CountDownLatch(1);
//非公平锁
if(LOCK_FAIR){
for (int i = 0; i < nodes.size(); i++) {
String node = nodes.get(i);
if (currNode.endsWith(node)) {
runExpireThread(zooKeeper, currNode, expire);
return true;
}
Stat stat = zooKeeper.exists(LOCK_ROOT_PATH + "/" + node, new LockWatcher(countDownLatch));
if (stat != null) {
delPath(zooKeeper);
//等待锁超时
if(!countDownLatch.await(wait, TimeUnit.MILLISECONDS)){
return tryLock(zooKeeper, key, expire, wait);
}
}
}
}else{
for (int i = 0; i < nodes.size(); i++) {
String node = nodes.get(i);
if (currNode.endsWith(node)) {
runExpireThread(zooKeeper, currNode, expire);
return true;
}
//当前节点的前一个节点
if (currNode.endsWith(nodes.get(i + 1))) {
Stat stat = zooKeeper.exists(LOCK_ROOT_PATH + "/" + node, new LockWatcher(countDownLatch));
if (stat != null) {
// 等待锁超时,如果是公平锁,等待时间是默认等待时间的2倍,防止因为拿锁的线程处理业务时间太久
// 导致当前线程等待超时
if(!countDownLatch.await(wait * 2, TimeUnit.MILLISECONDS)){
delPath(zooKeeper);
return false;
}
return true;
}
}
}
}
} catch (KeeperException | InterruptedException e) {
LOG.error("create '{}' node fail.", key, e);
}
return false;
}
/**
* 释放锁
*/
public static void unLock() {
ZooKeeper zooKeeper = ZK_THREAD_LOCAL.get();
delPath(zooKeeper);
close(ZK_THREAD_LOCAL.get());
THREAD_LOCAL.remove();
ZK_THREAD_LOCAL.remove();
}
/**
* 创建分布式锁的根路径
*/
private static void createLockRootPath() {
ZooKeeper zooKeeper = getZooKeeper();
try {
Stat stat = zooKeeper.exists(LOCK_ROOT_PATH, false);
if (stat == null) {
synchronized (CREATE_ROOT_LOCK) {
stat = zooKeeper.exists(LOCK_ROOT_PATH, false);
if (stat == null) {
LOG.info("create lock root path '{}'", LOCK_ROOT_PATH);
zooKeeper.create(LOCK_ROOT_PATH, BUF, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
String path = LOCK_ROOT_PATH + "/key" + LOCK_SUFFIX;
Stat stats = zooKeeper.exists(path, false);
zooKeeper.create(path, BUF, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
}
}
}
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}
/**
* 获得zooke会话连接
* @return org.apache.zookeeper.ZooKeeper
*/
private static ZooKeeper getZooKeeper() {
final CountDownLatch countDownLatch = new CountDownLatch(1);
try {
final ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:2181", 30000, null);
zooKeeper.register((watchedEvent) -> {
switch (watchedEvent.getState()) {
case Expired:
close(zooKeeper);
break;
case SyncConnected:
countDownLatch.countDown();
default:
}
});
if(!countDownLatch.await(3000, TimeUnit.MILLISECONDS)){
close(zooKeeper);
throw new RuntimeException("wait for creating zookeeper connection timeout, timeout is [3000]");
}
return zooKeeper;
} catch (Exception e) {
throw new RuntimeException("create Zookeeper instance fail.", e);
}
}
/**
* 启动一个线程来判断锁的过期时间,方式业务假死,zk不断开导致死锁
* @param zooKeeper zk连接
* @param currNode 当前节点
*/
private static void runExpireThread(final ZooKeeper zooKeeper, String currNode, long expire){
THREAD_LOCAL.set(currNode);
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(() -> {
try {
Thread.sleep(expire * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
LOG.info("等待了{}秒, 主动结束.", expire);
delPath(zooKeeper);
});
}
/**
* 删除创建的路径
* @param zooKeeper zk连接
*/
private static void delPath(ZooKeeper zooKeeper) {
try {
//无论节点是否存在,直接执行删除操作
zooKeeper.delete(THREAD_LOCAL.get(), -1);
} catch (Exception e){
LOG.error("lock expire, delete lock");
}
}
/**
* 断开连接
* @param zooKeeper zk连接
*/
private static void close(ZooKeeper zooKeeper) {
if (zooKeeper != null) {
try {
zooKeeper.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 监听节点删除事件
*/
static class LockWatcher implements Watcher {
private CountDownLatch latch;
public LockWatcher(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void process(WatchedEvent event) {
if(event.getType() == Event.EventType.NodeDeleted){
latch.countDown();
}
}
}
public static void main(String[] args) throws Exception {
//createLockRootPath();
int count = 30;
ExecutorService executorService = Executors.newFixedThreadPool(count);
for( int i = 0; i < count; i++) {
executorService.execute(() -> {
try {
if(ZookeeperLockTest.tryLock("lock")){
System.out.println("我拿到了锁");
System.out.println(Thread.currentThread().getName() + ": 我要等待6秒,测试其他超时是否有效.");
Thread.sleep(1000);
System.out.println("我的等待结束了,其他人可以过来拿锁了.");
}else{
System.out.println("没有拿到锁");
}
} catch (Exception e) {
e.printStackTrace();
}finally{
ZookeeperLockTest.unLock();
}
});
}
executorService.shutdown();
}
}