本地使用Zookeeper伪集群模式,同步锁控制多个进程的同步状态。
1、获取锁
实现方式:
1.各个进程创建一个EPHEMERAL_SEQUENTIAL目录节点。
2.调用getChildren方法获取当前的目录节点列表中最小的目录节点,
如果是自己创建的,那么它就获得了这个锁;
如果不是自己创建的调用exists();并监控 Zookeeper 上目录节点列表的变化,
一直到自己创建的节点是列表中最小编号的目录节点就可以获得锁。
2、释放锁
实现方式:
1.获取到锁的Server删除自己所创建的目录节点。
3、代码示例
public class MainTest implements Runnable {
private static ZooKeeper zooKeeper;
private static String root = "/xmy";
private static String tempZnode = "/lock";
private static String myZnode = null;
private static String hostPort = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
public static void main(String[] args) throws Exception {
zooKeeper = new ZooKeeper(hostPort, 5000, null);
//创建一个EPHEMERAL_SEQUENTIAL目录节点
myZnode = zooKeeper.create(root + tempZnode, "value".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println("----------创建的目录名为:" + myZnode);
MainTest mainTest = new MainTest();
mainTest.tryLock();
}
/**
* 获取锁
*/
public void tryLock() throws Exception {
System.out.println("----------抢锁开始----------");
List<String> list = zooKeeper.getChildren(root, false);
String[] nodes = list.toArray(new String[list.size()]);
Arrays.sort(nodes);
if (myZnode.equals(root + "/" + nodes[0])) {
System.out.println("----------我抢到了锁----------");
Thread.sleep(60 * 1000);
System.out.println("----------执行1分钟任务结束----------");
doReleaseShared();
} else {
waitForLock(nodes[0]);
}
}
void waitForLock(String lower) throws Exception {
System.out.println("----------未获取到锁----------");
//监听
Watcher childrenWatcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("----------收到的消息: " + event.toString());
if (event.getType() == Event.EventType.NodeDeleted) {
try {
synchronized (this) {
notifyAll();
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
};
}
/**
* 释放锁
*/
public void doReleaseShared() throws Exception {
System.out.println("----------我释放了锁----------");
zooKeeper.delete(myZnode, -1);
}
@Override
public void run() {
try {
synchronized (this) {
while (true) {
wait();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
} finally {
this.close();
}
}
public synchronized void close() {
try {
zooKeeper.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}