本文在分布式自增序列的实现(一) ---分布式序号生成器基础上成文,因此直接上解决办法,省去问题的讨论。请先阅读分布式自增序列的实现(一) ---分布式序号生成器。
我们在第一篇提到使用zookeeper的持久化序列node来自动生成分布式序列id,本文将继续讨论使用zookeeper客户单curator提供的DistributedAtomicLong功能实现分布式自增序列的实现。zookeeper是高性能的分布式是协调器,它本身就是为分布式而生的,从名称上看DistributedAtomicLong, 就类似java自带的AtomicLong,只不是java是的不是分布式的,而DistributedAtomicLong是分布式的。
DistributedAtomicLong功能介绍
官方文档http://curator.apache.org/curator-recipes/distributed-atomic-long.html
A counter that attempts atomic increments. It first tries using optimistic locking. If that fails, an optional InterProcessMutex is taken. For both optimistic and mutex, a retry policy is used to retry the increment
可以看到它是有符号的长整型,基本满足我们的大部分需求,如果长度不够可以通过两个字段结合实现更大的序列范围。从介绍可以看到,首先使用乐观锁,如果乐观锁失败,就使用Curator提供的InterProcessMutex锁。InterProcessMutex是Curator基于zookeeper提供的分布式锁。
具体代码实现
我直接使用的Curator提供的CuratorFramework 连接zookeeper,然后创建DistributedAtomicLong 对象, 有10个线程同时调用DistributedAtomicLong获取sequencem, 同时运行两个实例,也就是有20个线程,没10个线程使用一个DistributedAtomicLong 对象获取sequence。
完整代码在这里,欢迎加星,fork。 pom文件代码中有。
本程序启动时会自动连接zookeeper, 请先配置好自己的zookeeper服务器ip和端口等信息,并保证zookeeper启动着。
本示例代码使用了DistributedAtomicLong,请参考官方文档http://curator.apache.org/curator-recipes/distributed-atomic-long.html.
package com.yq;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.atomic.AtomicValue;
import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong;
import org.apache.curator.retry.ExponentialBackoffRetry;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
/**
* Simple to Introduction
* className: ZKDistributedAtomicLongSequenceApp
*
* @author EricYang
* @version 2018/12/01 22:43
*/
@Slf4j
public class ZKDistributedAtomicLongSequenceApp {
private static final String COUNTER_ZNODE = "/yqlock_pathDistAtomicLong";
//like this "127.0.0.1:2181,"192.168.1.132:2181";
private static final String ZK_SERVERS = "127.0.0.1:2181";
static final int SESSION_OUTTIME = 15000;
public static void main(String[] args) throws Exception {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
CuratorFramework cf = CuratorFrameworkFactory.builder()
.connectString(ZK_SERVERS)
.sessionTimeoutMs(SESSION_OUTTIME)
.retryPolicy(retryPolicy)
.build();
cf.start();
final CountDownLatch countdown = new CountDownLatch(1);
final CyclicBarrier cyclicBarrier = new CyclicBarrier(11);
DistributedAtomicLong distAtomicLong = new DistributedAtomicLong(cf, COUNTER_ZNODE, retryPolicy);
for(int i = 0; i < 10; i++){
new Thread(new Runnable() {
@Override
public void run() {
try {
//这里使用CountDownLatch,是为了保证10个线程同时启动,每个县被创建的线程都在await,等10个创建完成后,在主线程调用了countDown
countdown.await();
AtomicValue<Long> sequence = distAtomicLong.increment();
if (sequence.succeeded()) {
Long seq = sequence.postValue();
log.info("threadId={}, sequence={}", Thread.currentThread().getId(), seq);
} else {
log.warn("threadId={}, no sequence", Thread.currentThread().getId());
}
cyclicBarrier.await();
} catch (Exception e) {
log.error("acquire section exception.", e);
}
}
},"t" + i).start();
}
Thread.sleep(300);
//10个线程开始执行
countdown.countDown();
log.info("countDown");
cyclicBarrier.await();
log.info("End");
}
}