对zk 操作的方式有一下几种:
- 基于java的 shell命令,zkCli.sh
- 原生的java api
- zkClient
- Curator
- spring cloud zookeeper
原生API
代码展示
- ZKConstant 常量
public class ZKConstant {
public static final String CONNET_STR = "192.168.56.101:2181,192.168.56.102:2181,192.168.56.103:2181";
public static final int SESSION_TIMEOUT = 5000;
}
- CreateSession
public class CreateSession_API {
private static ZooKeeper zk1;
private static CountDownLatch connectSemaphore = new CountDownLatch(1);
public static void main(String[] args) throws Exception {
// createSession();
createSessionWithSID();
}
public static void createSession() throws Exception {
//Zookeeper是API提供的1个类,我们连接zk集群,进行相应的znode操作,都是通过ZooKeeper的实例进行,这个实例就是zk client,和命令行客户端是同样的角色
//Zookeeper实例的创建需要传递3个参数
//connectString 代表要连接zk集群服务,通过逗号分隔
// 注册watcher事件
zk1 = new ZooKeeper(ZKConstant.CONNET_STR, ZKConstant.SESSION_TIMEOUT, new Watcher() {
public void process(WatchedEvent watchedEvent) {
// 这个方法只会调用一次,在这个session建立完成调用
if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
connectSemaphore.countDown();
System.out.println("event:" + watchedEvent);
System.out.println("receive session established.");
}
}
});
System.out.println(zk1.getState());
connectSemaphore.await();
System.out.println("zk session established");
}
// 重复使用上次session, 利用sessionId和passwd
public static void createSessionWithSID() throws Exception {
zk1 = new ZooKeeper(ZKConstant.CONNET_STR, ZKConstant.SESSION_TIMEOUT,
new MyWatcher());
connectSemaphore.await();
long sessionId = zk1.getSessionId();
byte[] passwd = zk1.getSessionPasswd();
zk1 = new ZooKeeper(ZKConstant.CONNET_STR, ZKConstant.SESSION_TIMEOUT,
new MyWatcher(),
1l, "test".getBytes());
zk1 = new ZooKeeper(ZKConstant.CONNET_STR, ZKConstant.SESSION_TIMEOUT,
new MyWatcher(),
sessionId,
passwd);
Thread.sleep(Integer.MAX_VALUE);
}
static class MyWatcher implements Watcher {
@Override
public void process(WatchedEvent watchedEvent) {
// 只注册一次
System.out.println("receive watched event:" + watchedEvent);
if (Event.KeeperState.SyncConnected == watchedEvent.getState()) {
connectSemaphore.countDown();
}
}
}
}
- CreateNode
public class CreateNode_API {
private static ZooKeeper zk1;
private static CountDownLatch connectSemaphore = new CountDownLatch(1); // 同步计数器
public static void main(String[] args) throws Exception {
// createNodeASync();
createNodeSync();
}
public static void createNodeSync() throws Exception {
ZooKeeper zookeeper = new ZooKeeper(ZKConstant.CONNET_STR,
ZKConstant.SESSION_TIMEOUT, //
new MyWatcher());
connectSemaphore.await();
// 创建临时节点
String path1 = zookeeper.create("/zk-test-ephemeral-",
"".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL);
System.out.println("Success create znode: " + path1);
String path2 = zookeeper.create("/zk-test-ephemeral-",
"".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println("Success create znode: " + path2);
}
public static void createNodeASync() throws Exception {
ZooKeeper zk1 = new ZooKeeper(ZKConstant.CONNET_STR, ZKConstant.SESSION_TIMEOUT,
new MyWatcher());
connectSemaphore.await();
// 创建临时节点
zk1.create("/zk-test-eph-", "".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL,
new IStringCallback(), "I am context1.");
zk1.create("/zk-test-eph-", "".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL,
new IStringCallback(), "i am context2");
// 创建临时有序节点
zk1.create("/zk-test-eph-", "".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
new IStringCallback(), "i am context3");
Thread.sleep(Integer.MAX_VALUE);
}
static class MyWatcher implements Watcher {
@Override
public void process(WatchedEvent watchedEvent) {
// 建立连接成功回调
if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
connectSemaphore.countDown();
}
}
}
// 创建节点成功回调
static class IStringCallback implements AsyncCallback.StringCallback {
@Override
public void processResult(int rc, String path, Object ctx, String name) {
System.out.println("create path result: [" + rc + "," + path + "," + ctx + ", real path name:" + name);
}
}
}
- DeleteNode
public class DeleteNode_API {
private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
private static ZooKeeper zk;
public static void main(String[] args) throws Exception {
deleteNodeSync();
}
public static void deleteNodeSync() throws Exception {
String path = "/zk_book";
zk = new ZooKeeper(ZKConstant.CONNET_STR, ZKConstant.SESSION_TIMEOUT,
new DeleteWatcher());
connectedSemaphore.await();
zk.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
zk.delete(path, -1);
Thread.sleep(Integer.MAX_VALUE);
}
static class DeleteWatcher implements Watcher {
@Override
public void process(WatchedEvent watchedEvent) {
if (Event.KeeperState.SyncConnected == watchedEvent.getState() && watchedEvent.getPath() == null) {
connectedSemaphore.countDown();
}
}
}
}
- ExistsNode
public class ExistsNode_API {
private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
private static ZooKeeper zk;
public static void main(String[] args) throws Exception {
String path = "/zk-book";
zk = new ZooKeeper(ZKConstant.CONNET_STR,
ZKConstant.SESSION_TIMEOUT, //
new MyWatcher());
connectedSemaphore.await();
// 对path 路劲进行监听
zk.exists(path, true);
zk.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk.setData(path, "123".getBytes(), -1);
zk.create(path+"/c1", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk.setData(path + "/c1", "000".getBytes(), -1);
zk.delete(path + "/c1", -1);
zk.delete(path, -1);
Thread.sleep(Integer.MAX_VALUE);
}
static class MyWatcher implements Watcher {
public void process(WatchedEvent watchedEvent) {
try {
if (Event.KeeperState.SyncConnected == watchedEvent.getState()) {
if (Event.EventType.None == watchedEvent.getType() && null == watchedEvent.getPath()) {
connectedSemaphore.countDown();
} else if (Event.EventType.NodeCreated == watchedEvent.getType()) {
System.out.println("node (" + watchedEvent.getPath() + ") created ");
zk.exists(watchedEvent.getPath(), true);
} else if (Event.EventType.NodeDeleted == watchedEvent.getType()) {
System.out.println("node (" + watchedEvent.getPath() + ") deleted ");
zk.exists(watchedEvent.getPath(), true);
} else if (Event.EventType.NodeDataChanged == watchedEvent.getType()) {
System.out.println("node (" + watchedEvent.getPath() + ") dataChanged");
zk.exists(watchedEvent.getPath(), true);
}
}
} catch (Exception e) {
}
}
}
}
- GetData
public class GetData_API {
private static ZooKeeper zk1;
private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
private static Stat stat = new Stat();
static String path = "/zk-book";
public static void main(String[] args) throws Exception {
sync_setData();
}
public static void sync_setData() throws Exception {
String path = "/zk-book";
zk1 = new ZooKeeper(ZKConstant.CONNET_STR,
ZKConstant.SESSION_TIMEOUT, //
new MyWatcher());
connectedSemaphore.await();
Stat stat = zk1.setData(path, "haha".getBytes(), -1);
System.out.println(stat.getCzxid() + "," + stat.getMzxid() + "," +
stat.getVersion());
Stat stat2 = zk1.setData(path, "haha".getBytes(), -1);
System.out.println(stat.getCzxid() + "," + stat.getMzxid() + "," +
stat.getVersion());
try {
// 指定version, 需要正确的version才可以通过
zk1.setData(path, "456".getBytes(), stat.getVersion());
} catch (KeeperException e) {
System.out.println("Error: " + e.code() + "," + e.getMessage());
}
Thread.sleep(Integer.MAX_VALUE);
}
public static void async_setData() throws Exception {
String path = "/zk-book";
zk1 = new ZooKeeper(ZKConstant.CONNET_STR,
ZKConstant.SESSION_TIMEOUT, //
new MyWatcher());
connectedSemaphore.await();
zk1.setData(path, "456".getBytes(), -1, new AsyncCallback.StatCallback() {
public void processResult(int i, String s, Object o, Stat stat) {
if (i == 0) {
System.out.println("SUCCESS");
}
}
}, null);
Thread.sleep(Integer.MAX_VALUE);
}
public static void sync_getChildren() throws Exception {
String path = "/zk-book";
zk1 = new ZooKeeper(ZKConstant.CONNET_STR,
ZKConstant.SESSION_TIMEOUT, //
new MyWatcher());
connectedSemaphore.await();
// zk1.delete(path+"/c1", 0);
zk1.delete(path, 0);
zk1.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
zk1.create(path + "/c1", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
List<String> childrenList = zk1.getChildren(path, true);
System.out.println(childrenList);
zk1.create(path + "/c2", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL);
Thread.sleep(Integer.MAX_VALUE);
}
public static void async_getChildren() throws Exception {
String path = "/zk-book";
zk1 = new ZooKeeper(ZKConstant.CONNET_STR,
ZKConstant.SESSION_TIMEOUT, //
new MyWatcher());
connectedSemaphore.await();
zk1.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
zk1.create(path + "/c1", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
// 只会响应一次
zk1.getChildren(path, true, new AsyncCallback.Children2Callback() {
public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
System.out.println("Get Children znode result: [response code: " + rc + ", param path: " + path
+ ", ctx: " + ctx + ", children list: " + children + ", stat: " + stat);
}
}, null);
zk1.create(path + "/c2", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL);
Thread.sleep(Integer.MAX_VALUE);
}
public static void sync_getData() throws Exception {
String path = "/zk-book";
zk1 = new ZooKeeper(ZKConstant.CONNET_STR,
ZKConstant.SESSION_TIMEOUT, //
new MyWatcher());
connectedSemaphore.await();
zk1.create(path, "123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println(new String(zk1.getData(path, true, stat)));
System.out.println(stat.getCzxid() + "," + stat.getMzxid() + "," + stat.getVersion());
zk1.setData(path, "456".getBytes(), -1);
Thread.sleep(Integer.MAX_VALUE);
}
public static void async_getData() throws Exception {
zk1 = new ZooKeeper(ZKConstant.CONNET_STR,
ZKConstant.SESSION_TIMEOUT, //
new MyWatcher());
connectedSemaphore.await();
zk1.create(path, "123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
zk1.getData(path, true, new IDataCallback(), null);
zk1.setData(path, "456".getBytes(), -1);
Thread.sleep(Integer.MAX_VALUE);
}
static class MyWatcher implements Watcher {
public void process(WatchedEvent watchedEvent) {
if (Event.KeeperState.SyncConnected == watchedEvent.getState()) {
if (Event.EventType.None == watchedEvent.getType() && null == watchedEvent.getPath()) {
connectedSemaphore.countDown();
} else if (watchedEvent.getType() == Event.EventType.NodeDataChanged) {
try {
zk1.getData(watchedEvent.getPath(), true, new IDataCallback(), null);
} catch (Exception e) {
}
}
}
}
}
static class IDataCallback implements AsyncCallback.DataCallback {
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
System.out.println(rc + ", " + path + ", " + new String(data));
System.out.println("--" + stat.getCzxid() + "," +
stat.getMzxid() + "," +
stat.getVersion());
}
}
}
PS: 若你觉得可以、还行、过得去、甚至不太差的话,可以“关注”或者“点赞”一下,就此谢过!