- zkClient:开源的zk客户端,在原生API基础上封装,是一个更易于使用的zookeeper客户端
- 创建会话(同步,重试)
- 创建节点(同步,递归创建)
- 删除节点(同步,递归删除)
- 代码示例
import net.lc7.model.User;
import net.lc7.util.ZkPropertiesUtil;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;

import java.util.List;

/**
 * Small demo of basic ZooKeeper node operations through the zkClient library:
 * create, read, list children, delete, update and watch the {@code /user} node.
 *
 * <p>{@link #main(String[])} only runs {@link #updateNodeData()}; the other
 * helpers are kept as ready-to-call examples.
 *
 * @Author: Jason.zhu
 * @Create: 2019/05/22 17:37
 */
public class ZkClientMain {

    private static ZkClient zkClient;

    /**
     * Connects to the server configured in {@code ZkPropertiesUtil}, runs the
     * update example and always closes the client afterwards.
     *
     * @param args unused
     * @throws Exception if connecting or the example operation fails
     */
    public static void main(String[] args) throws Exception {
        String serverAddress = ZkPropertiesUtil.getZkServerIp();
        int sessionTimeout = 10000;
        int connectionTimeout = ZkPropertiesUtil.getZktimeout();
        zkClient = new ZkClient(serverAddress, sessionTimeout, connectionTimeout,
                new SerializableSerializer());
        try {
            updateNodeData();
        } finally {
            // Release the ZooKeeper session even when the example throws.
            zkClient.close();
        }
    }

    /**
     * Creates the persistent node {@code /user} holding a serialized {@link User}
     * and prints the created path.
     */
    private static void createNode() {
        User user = User.builder().age(18).id(1).name("jason").build();
        String node = zkClient.create("/user", user, CreateMode.PERSISTENT);
        System.out.println(node);
    }

    /**
     * Reads the {@link User} stored in {@code /user} together with its {@link Stat}
     * and prints both.
     */
    private static void getNodeData() {
        Stat stat = new Stat();
        User user1 = zkClient.readData("/user", stat);
        System.out.println("name : " + user1.getName());
        System.out.println("stat : " + stat);
    }

    /**
     * Creates three persistent children ({@code /user/1} .. {@code /user/3}) and
     * prints each created path.
     */
    private static void createChildrenNode() {
        String node = zkClient.create("/user/1", "user1", CreateMode.PERSISTENT);
        System.out.println(node);
        node = zkClient.create("/user/2", "user2", CreateMode.PERSISTENT);
        System.out.println(node);
        node = zkClient.create("/user/3", "user3", CreateMode.PERSISTENT);
        System.out.println(node);
    }

    /**
     * Prints the names of the children of {@code /user}, if that node exists.
     */
    private static void getChildNode() {
        String node = "/user";
        if (zkClient.exists(node)) {
            List<String> childNodes = zkClient.getChildren(node);
            childNodes.forEach(System.out::println);
        }
    }

    /**
     * Deletes {@code /user} including all of its children, if the node exists.
     */
    private static void delNode() {
        String node = "/user";
        if (zkClient.exists(node)) {
            // zkClient.delete(node) would only remove the node itself and cannot
            // be used while the node still has children.
            zkClient.deleteRecursive(node);
        }
    }

    /**
     * Overwrites the data of {@code /user} with a new {@link User}.
     */
    private static void updateNodeData() {
        User user = User.builder().age(11).name("wangwu").id(12).build();
        String node = "/user";
        zkClient.writeData(node, user);
    }

    /**
     * Subscribes to child changes of {@code /user}, printing the parent path and
     * the current child list on every event, then blocks the calling thread.
     *
     * @throws InterruptedException if the blocking sleep is interrupted
     */
    private static void watchChildChange() throws InterruptedException {
        String node = "/user";
        zkClient.subscribeChildChanges(node, (parentPath, currentChildren) -> {
            System.out.println(parentPath);
            System.out.println(currentChildren);
        });
        Thread.sleep(Long.MAX_VALUE);
    }

    /**
     * Subscribes to data changes and deletion of {@code /user}, then blocks the
     * calling thread until it is interrupted.
     */
    private static void watchNode() {
        String node = "/user";
        zkClient.subscribeDataChanges(node, new IZkDataListener() {
            @Override
            public void handleDataChange(String s, Object o) throws Exception {
                System.out.println("node name : " + s + ",updated !!");
                User user = (User) o;
                System.out.println("node new value : " + user);
            }

            @Override
            public void handleDataDeleted(String s) throws Exception {
                System.out.println("node name : " + s + ", deleted !!");
            }
        });
        try {
            Thread.sleep(Long.MAX_VALUE);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can observe the interruption
            // instead of silently losing it.
            Thread.currentThread().interrupt();
        }
    }
}

// ---- User.java (imported above as net.lc7.model.User) ----

import lombok.Builder;
import lombok.Data;

import java.io.Serializable;

/**
 * Serializable value object stored in the ZooKeeper nodes of the demo above.
 * Accessors and the builder are generated by Lombok.
 */
@Data
@Builder
public class User implements Serializable {

    private int id;
    private int age;
    private String name;

    /** Returns the fields in the form {@code id=.., age=..,name=..}. */
    @Override
    public String toString() {
        return "id=" + id + ", age=" + age + ",name=" + name;
    }
}
- curator:开源的zk客户端,在原生API基础上封装,apache顶级项目
- Curator采用Fluent风格API
- Curator对zk进行基本操作代码示例:
import net.lc7.util.ZkPropertiesUtil; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.RetryNTimes; /** * @Description: Curator对zk进行基本操作,Curator采用流式风格API * @Author: Jason.zhu * @Create: 2019/05/24 17:55 */ public class CuratorClient { private static CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient( ZkPropertiesUtil.getZkServerIp(), new RetryNTimes(10, 5000)); private static String path = "/curator_node"; public static void main(String[] args) throws Exception { curatorFramework.start(); //create node String data = "curator_data"; curatorFramework.create().creatingParentsIfNeeded().forPath(path,data.getBytes()); //get Node and Data print("ls", "/"); print(curatorFramework.getChildren().forPath("/")); print("get", path); print(curatorFramework.getData().forPath(path)); //update node data String dataNew = "curator_data_new"; print("set", path, dataNew); curatorFramework.setData().forPath(path, dataNew.getBytes()); print("get", path); print(curatorFramework.getData().forPath(path)); //remove node print("delete", path); curatorFramework.delete().forPath(path); print("ls", "/"); print(curatorFramework.getChildren().forPath("/")); curatorFramework.close(); } private static void print(String... cmds) { StringBuilder text = new StringBuilder("$ "); for (String cmd : cmds) { text.append(cmd).append(" "); } System.out.println(text.toString()); } private static void print(Object result) { System.out.println( result instanceof byte[] ? 
new String((byte[]) result) : result); } } public class ZkPropertiesUtil { private static Properties zkProperties = new Properties(); static { InputStream is = ZkPropertiesUtil.class.getResourceAsStream("/zk.properties"); try { zkProperties.load(is); } catch (Exception e) { e.printStackTrace(); }finally { try { if (is != null) { is.close(); } } catch (IOException e) { e.printStackTrace(); } } } public static String getZkServerIp(){ return zkProperties.getProperty("zk.zkServerIps"); } public static int getZktimeout(){ return Integer.valueOf(zkProperties.getProperty("zk.timeout")); } public static void main(String[] args) { String source = "0101888"; System.out.println(source.substring(0, source.length() - 2)); } }
- Curator操作zk实现分布式锁
import net.lc7.util.ZkPropertiesUtil; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.locks.InterProcessMultiLock; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.retry.RetryNTimes; import java.util.concurrent.TimeUnit; /** * @Description: Curator操作zk实现分布式锁 * @Author: Jason.zhu * @Create: 2019/05/27 11:28 */ public class CuratorDistributeLock { private static String lockPath = "/lockPath"; public static void main(String[] args) throws Exception{ CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(ZkPropertiesUtil.getZkServerIp(), new RetryNTimes(10,5000)); curatorFramework.start(); System.out.println("zk client start successfully!!"); Thread.sleep(1000L); Thread t1 = new Thread(() -> { doWithLock(curatorFramework); }, "t1"); Thread t2 = new Thread(() -> { doWithLock(curatorFramework); }, "t2"); t1.start(); t2.start(); // curatorFramework.close(); } private static void doWithLock(CuratorFramework curatorFramework){ System.out.println("Client state : " + curatorFramework.getState()); InterProcessMutex lock = new InterProcessMutex(curatorFramework, lockPath); try { if(lock.acquire(10*1000, TimeUnit.SECONDS)){ System.out.println(Thread.currentThread().getName() + "hold lock!!"); Thread.sleep(5000L); System.out.println(Thread.currentThread().getName() + "release lock!!"); } }catch (Exception e){ e.printStackTrace(); }finally { try { lock.release(); }catch (Exception e){ e.printStackTrace(); } } } }
- Curator操作zk实现Leader选举
import net.lc7.util.ZkPropertiesUtil; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.leader.LeaderSelector; import org.apache.curator.framework.recipes.leader.LeaderSelectorListener; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.retry.RetryNTimes; import org.apache.curator.utils.EnsurePath; /** * @Description: Leader选举 * 当集群里的某个服务down机时,我们可能要从slave结点里选出一个作为新的master,这时就需要一套能在分布式环境中自动协调的Leader选举方法。 * Curator提供了LeaderSelector监听器实现Leader选举功能。同一时刻,只有一个Listener会进入takeLeadership()方法,说明它是当前的Leader。 * 注意:当Listener从takeLeadership()退出时就说明它放弃了“Leader身份”,这时Curator会利用Zookeeper再从剩余的Listener中选出一个新的Leader。 * autoRequeue()方法使放弃Leadership的Listener有机会重新获得Leadership,如果不设置的话放弃了的Listener是不会再变成Leader的。 * @Author: Jason.zhu * @Create: 2019/05/27 11:49 */ public class CuratorLeaderClient { private static String path = "/ensurePath"; public static void main(String[] args) throws InterruptedException { LeaderSelectorListener listener = new LeaderSelectorListener() { @Override public void takeLeadership(CuratorFramework curatorFramework) throws Exception { System.out.println(Thread.currentThread().getName() + " take leadership !!"); Thread.sleep(5000L); System.out.println(Thread.currentThread().getName() + " relinquish leadership !!"); } @Override public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) { System.out.println(connectionState.name() + " state changed !! 
" + connectionState.isConnected()); } }; new Thread(() -> { registerListener(listener); }).start(); new Thread(() -> { registerListener(listener); }).start(); new Thread(() -> { registerListener(listener); }).start(); new Thread(() -> { registerListener(listener); }).start(); new Thread(() -> { registerListener(listener); }).start(); Thread.sleep(Integer.MAX_VALUE); } public static void registerListener(LeaderSelectorListener listener){ CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(ZkPropertiesUtil.getZkServerIp() ,new RetryNTimes(10, 5000)); curatorFramework.start(); //ensure path try { new EnsurePath(path).ensure(curatorFramework.getZookeeperClient()); } catch (Exception e) { e.printStackTrace(); } //register listener LeaderSelector selector = new LeaderSelector(curatorFramework, path, listener); selector.autoRequeue(); selector.start(); } }
- Curator操作ZK实现监听功能
import net.lc7.util.ZkPropertiesUtil; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.retry.RetryNTimes; /** * @Description: Curator实现监听功能 * * Path Cache:监视一个路径下1)孩子结点的创建、2)删除,3)以及结点数据的更新。产生的事件会传递给注册的PathChildrenCacheListener。 * Node Cache:监视一个结点的创建、更新、删除,并将结点的数据缓存在本地。 * Tree Cache:Path Cache和Node Cache的“合体”,监视路径下的创建、更新、删除事件,并缓存路径下所有孩子结点的数据。 * * @Author: Jason.zhu * @Create: 2019/05/27 11:10 */ public class CuratorWatchClient { private static String watcherNode = "/watcherNode"; public static void main(String[] args) throws Exception { CuratorFramework curatorFrameworkClient = CuratorFrameworkFactory.newClient( ZkPropertiesUtil.getZkServerIp(), new RetryNTimes(10, 5000)); curatorFrameworkClient.start(); System.out.println("zk client start successfully!!"); //register watcher PathChildrenCache pathChildrenCache = new PathChildrenCache(curatorFrameworkClient, watcherNode, true); pathChildrenCache.getListenable().addListener((client, event) -> { ChildData data = event.getData(); if(null == data){ System.out.println("no data in event [" + event +"]"); }else { System.out.println( "Receive event :" + "type=["+ event.getType() +"], " + "path=[" + data.getPath() + "]," + "data=["+ data.getData() +"]," + "state=[" + data.getStat() + "]" ); } }); pathChildrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE); System.out.println(" Register zookeeper watcher successfully!! Please operate in terminal to show the listener function"); Thread.sleep(Integer.MAX_VALUE); } }
ZooKeeper(四):ZooKeeper 的 Java 客户端 zkClient 和 Curator
最后编辑于 :
©著作权归作者所有,转载或内容合作请联系作者
- 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
- 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
- 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...