利用zookeeper实现分布式队列

一、zookeeper介绍

        zookeeper是源代码开放的分布式协调服务,由雅虎创建,是Google的开源实现。zookeeper是一个高性能的分布式数据一致性解决方案,它将那些复杂的、容易出错的分布式一致性服务封装起来,构成一个高效可靠的原语集,并提供一系列简单易用的接口给用户使用。

        其本身提供了一致性保证,特点如下:

        (1)顺序一致性:客户端的更新顺序与它们被发送的顺序一致。

        (2)原子性:更新操作要么成功,要么失败,没有第三种结果。

        (3)单系统镜像:无论客户端连接到哪一个服务器,他都将看到相同的zookeeper视图。

        (4)可靠性:一旦一个更新操作被应用,那么在客户端再次更新之前,其值不会再改变。

二、应用场景

        zookeeper可以应用于很多的分布式服务场景,包括:集群管理,master选举,发布/订阅,分布式锁,分布式队列,分布式命名服务,服务注册于发现,负载均衡等等。下面一个例子介绍zookeeper如何实现分布式队列。

三、zookeeper分布式队列实现

        zookeeper分布式队列的实现完成以下几个要素:

        (1)、数据入队,在一个节点下创建有序子节点,节点中设置需要入队的数据,完成数据的入队操作。

        (2)、数据出队,取出该节点下的所有子节点,如果数量不为0,取出一个子节点,并将子节点删除。

        (3)、提供判断是否有数据等的api。

        下面为具体代码实现

1、DistributedSimpleQueue类


public class DistributedSimpleQueue{

        protected final ZkClient zkClient;

        protected final String root;

        protected static final String Node_NAME = "n_";

        public DistributedSimpleQueue(ZkClient zkClient, String root) {

                this.zkClient = zkClient;

                this.root = root;

         }

        public int size() {

            return zkClient.getChildren(root).size();

        }

        public boolean isEmpty() {

            return zkClient.getChildren(root).size() == 0;

        } 

        public boolean offer(T element) throws Exception{ 

                String nodeFullPath = root .concat( "/" ).concat( Node_NAME );

                 try { 

                         zkClient.createPersistentSequential(nodeFullPath , element); 

                 }catch (ZkNoNodeException e) { 

                         zkClient.createPersistent(root); 

                         offer(element); 

                 } catch (Exception e) { 

                     throw ExceptionUtil.convertToRuntimeException(e); 

                 } 

                 return true; 

             }

            @SuppressWarnings("unchecked")

            public T poll() throws Exception {

                   try {

                           List  list = zkClient.getChildren(root);

                           if (list.size() == 0) {

                                return null;

                            }

                            Collections.sort(list, new Comparator() {

                                    public int compare(String lhs, String rhs) {

                                            return getNodeNumber(lhs, Node_NAME).compareTo(getNodeNumber(rhs, Node_NAME));

                                    }

                            });

                            for ( String nodeName : list ){

                                    String nodeFullPath = root.concat("/").concat(nodeName);

                                    try {

                                            T node = (T) zkClient.readData(nodeFullPath);

                                            zkClient.delete(nodeFullPath);

                                            return node;

                                       } catch (ZkNoNodeException e) {

                                                // ignore

                                        }

                            }

                            return null;

                    } catch (Exception e) {

                            throw ExceptionUtil.convertToRuntimeException(e);

                    }

            }

            private String getNodeNumber(String str, String nodeName) {

                    int index = str.lastIndexOf(nodeName);

                    if (index >= 0) {

                        index += Node_NAME.length();

                        return index <= str.length() ? str.substring(index) : "";

                    }

                    return str;

            }

}

2、阻塞队列实现类

public class DistributedBlockingQueue  extends DistributedSimpleQueue{ 

         public DistributedBlockingQueue(ZkClient zkClient, String root) { 

                 super(zkClient, root);

        } 

        @Override

        public T poll() throws Exception {

                while (true){

                        final CountDownLatch latch = new CountDownLatch(1);

                        final IZkChildListener childListener = new IZkChildListener() {

                                public void handleChildChange(String parentPath, List currentChilds) throws Exception {

                                        latch.countDown();

                                }

                        };

                        zkClient.subscribeChildChanges(root, childListener);

                        try{

                                T node = super.poll();

                                if ( node != null ){

                                        return node;

                                }else{

                                        latch.await();

                                }

                        }finally{

                                zkClient.unsubscribeChildChanges(root, childListener);

                        }

                }

            }

}

        阻塞队列的实现利用了CountDownLatch 的特性。当子节点数量为0时,即队列中没有元素,这是线程在此等待,同时监听子节点的变化,如果有数据入队,则从等待返回,取出数据。

3、测试类

public class TestDistributedBlockingQueue {

            public static void main(String[] args) {

                    ScheduledExecutorService delayExector = Executors.newScheduledThreadPool(1);

                    int delayTime = 5;

                    ZkClient zkClient = new ZkClient("192.168.1.105:2181", 5000, 5000, new SerializableSerializer());

                    final DistributedBlockingQueuequeue = new DistributedBlockingQueue(zkClient,"/Queue");

                    final User user1 = new User();

                    user1.setId("1");

                    user1.setName("xiao wang");

                    final User user2 = new User();

                    user2.setId("2");

                    user2.setName("xiao wang");

                    try {

                            delayExector.schedule(new Runnable() {

                                    public void run() {

                                         try {

                                                queue.offer(user1);

                                                queue.offer(user2);

                                            } catch (Exception e) {

                                                    e.printStackTrace();

                                            }

                                        }

                               }, delayTime , TimeUnit.SECONDS);

                               System.out.println("ready poll!");

                                User u1 = (User) queue.poll();

                                User u2 = (User) queue.poll();

                                if (user1.getId().equals(u1.getId()) && user2.getId().equals(u2.getId())){

                                        System.out.println("Success!");

                                }

                    } catch (Exception e) {

                            e.printStackTrace();

                    } finally{

                                delayExector.shutdown();

                    try {

                                delayExector.awaitTermination(2, TimeUnit.SECONDS);

                    } catch (InterruptedException e) {

                    }

            }

        }

}

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 135,026评论 19 139
  • 1. Java基础部分 基础部分的顺序:基本语法,类相关的语法,内部类的语法,继承相关的语法,异常的语法,线程的语...
    子非鱼_t_阅读 31,785评论 18 399
  • ... 一、相关概念 中间件:为分布式系统提供协调服务的组件,如专门用于计算服务的机器就是一个计算型中间件,还有专...
    帅可儿妞阅读 496评论 0 0
  • 一、基本数据类型 注释 单行注释:// 区域注释:/* */ 文档注释:/** */ 数值 对于byte类型而言...
    龙猫小爷阅读 4,292评论 0 16
  • 石min阅读 125评论 0 0