zookeeper可实现简单的分布式队列。
curator实现了先入先出的分布式消息队列,它采用的是zookeeper的持久化有序节点。
DistributedQueue是最普通的一种队列(FIFO)
废话不多说,直接上代码
//序列化数据
QueueSerializer queueSerializer = new QueueSerializer<String>() {
@Override
public byte[] serialize(String item) {
return item.getBytes(StandardCharsets.UTF_8);
}
@Override
public String deserialize(byte[] bytes) {
return new String(bytes,StandardCharsets.UTF_8);
}
};
//消费者
QueueConsumer<String> consumer = new QueueConsumer<String>() {
@Override
public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
System.out.println("state changed");
}
@Override
public void consumeMessage(String s) {
//do someThing
System.out.println("消费数据:" + s);
}
};
/**
* client:客户端
* consumer:消费者,它可以接收队列的数据。
* queueSerializer:对队列中的对象的序列化和反序列化
* queuePath:PERSISTENT-SEQUENTIAL:持久化有序节点
* lockPath:TRANSIENT-SEQUENTIAL:消费者加锁, 当消费者消费数据时持有锁,这样其它消费者不能消费此消息,性能损失。
*/
DistributedQueue queue = QueueBuilder.builder(client, consumer, queueSerializer, "/example/customize-queue")
.lockPath("/lockPath-")
.buildQueue();
queue.start();
for (int i = 0; i < 10; i++) {
queue.put("test-" + i);//生产数据
}
Thread.sleep(60000);
CloseableUtils.closeQuietly(queue);
CloseableUtils.closeQuietly(client);
其他分布式队列
- DistributedQueue:是最普通的一种队列;queue.put(" test-" + i);
- DistributedIdQueue:可以为队列中的每一个元素设置一个ID。 可以通过ID把队列中 任意的元素移除,queue.put(" test-" + i, "Id" + i);
- DistributedPriorityQueue:优先级队列对队列中的元素按照优先级进行排序。 Priority 越小, 元素月靠前, 越先被消费掉,queue.put("test-" + i, priority)
- DistributedDelayQueue:延迟队列,消费者隔一段时间才能收到元素, queue.put(aMessage, delayUntilEpoch);
最后,我想说的是,zookeeper虽然可最分布式队列,但是官方不推荐。主要原因在于在高并发常见下,效果不好。具体情况要看自己的业务常见。
官方解释:http://curator.apache.org/curator-recipes/distributed-queue.html