项目背景
由于项目由之前单一地区的推广还算顺利,因此后面规划是面向全国范围推广,之前的小步快走的开发模式决定了初期的项目架构使用了mysql的单库,随着全国范围的推广肯定单库的高可用模式会面临问题,因此要求进行数据的分库分表拆分,项目一开始主键使用的自增主键,自增主键的N多好处不用再强调(索引查找效率、范围查找,巴拉巴拉)但是如何分库分表以后就会面临自增主键的不适用,因此考虑引入分布式id生成组件"leaf",开源的有很多百度,滴滴,美团都有。最终比较后选择了美团。
源码分析的意义与价值
如果不能把控的中间件引入是灾难性的,因此在引入leaf之前功课也是要做足的,包括后面可能存在的定制化需求,当初选择阿里开源的RocketMq的原因也是因为是java语言,团队成员起码可以有些问题从源码入手。
好了开始分析源码,leaf提供了两种id生成方式一种是基于mysql分段双buffer模式,一种是基于zookeeper的。
分段id生成器
宏观上先说明一下,这里不得不引申一下一个概念方便理解,就是并发控制的本质到底是什么,并发控制的本质个人总结如下,多个线程(可能来自一个java进程也就是同一个java虚拟机,也可能来自于不同的机器上面不同的虚拟机)那这里面其实就是两个不同概念,一个是并发控制,另一个是分布式并发控制,他们的道理类似,在实现上都是通过锁定一块共享区域某一个共享区域,往往中间件作者为了性能会采用内存来进行锁定,比如之前文章的redis分布式锁,比如Zookeeper里面系统并发控制也是通过DataTree里面dataNode的parent的对象头进行锁定的。这些无外乎要么利用的redis的单线程的特性,亦或者利用了zookeeper一主多从情况下的写入安全性来完成的各种并发控制。
美团实现的分段id生成器使用的则是mysql数据库完成数据的共享,并通过update语句加锁完成并发控制。接下来分析一下分段id生成的核心代码就一个类。
package com.sankuai.inf.leaf.segment;
import com.sankuai.inf.leaf.IDGen;
import com.sankuai.inf.leaf.common.Result;
import com.sankuai.inf.leaf.common.Status;
import com.sankuai.inf.leaf.segment.dao.IDAllocDao;
import com.sankuai.inf.leaf.segment.model.*;
import org.perf4j.StopWatch;
import org.perf4j.slf4j.Slf4JStopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
public class SegmentIDGenImpl implements IDGen {
private static final Logger logger = LoggerFactory.getLogger(SegmentIDGenImpl.class);
/**
* IDCache未初始化成功时的异常码
*/
private static final long EXCEPTION_ID_IDCACHE_INIT_FALSE = -1;
/**
* key不存在时的异常码
*/
private static final long EXCEPTION_ID_KEY_NOT_EXISTS = -2;
/**
* SegmentBuffer中的两个Segment均未从DB中装载时的异常码
*/
private static final long EXCEPTION_ID_TWO_SEGMENTS_ARE_NULL = -3;
/**
* 最大步长不超过100,0000
*/
private static final int MAX_STEP = 1000000;
/**
* 一个Segment维持时间为15分钟
* 这里指的是一个系统默认认为的合理时间,主要用于调整buffer里面步长的大小,如果当前次更新距离上次更新时间超过15分钟的话
* 那么步长就会动态调整为二分之一之前的长度,如果说当次更新时间距离上次更新时间未超过15分钟那么说明系统压力大,那么就适当调整步长到2倍直到最大步长
* MAX_STEP
*/
private static final long SEGMENT_DURATION = 15 * 60 * 1000L;
/**
* 用来更新本地的buffer的线程池,用来更新每个tag对应的segmentBufferr里面备用的segment
*/
private ExecutorService service = new ThreadPoolExecutor(5, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new UpdateThreadFactory());
/**
* 初始化状态,主要用来标记mysql中tag是否初次被同步进入内存中
*/
private volatile boolean initOK = false;
/**
* 用来保存每个tag对应的segmentBuffer,业务通过tag进行隔离,并且此处使用了并发安全的容器,主要是防止在刷新tag的时候出现线程不安全的问题
*/
private Map<String, SegmentBuffer> cache = new ConcurrentHashMap<String, SegmentBuffer>();
/**
* 主要是用来与mysql打交道,加载tag,加载step,更新maxid
*/
private IDAllocDao dao;
/**
* 作者比较优秀,为了刷新线程起一个比较好听的名字特意写了个一个工厂,哈哈并且内部做了一个线程计数的变量
*/
public static class UpdateThreadFactory implements ThreadFactory {
private static int threadInitNumber = 0;
private static synchronized int nextThreadNum() {
return threadInitNumber++;
}
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "Thread-Segment-Update-" + nextThreadNum());
}
}
/**
* 暴露给外部调用用来初始化分段id生成器的功能,主要包括更新所有的tag进入到内存中,并且启动一个单线程的守护线程去做定时刷新这些tag的操作,
* 间隔60秒,这里之所以用单线程的线程池我个人的判断是为了充分利用阻塞的特性,因为在极端的情况下60秒加载不完那么就阻塞着在哪里,当然,绝大多数业务
* 一分钟肯定是能够加载完的。
*/
@Override
public boolean init() {
logger.info("Init ...");
// 确保加载到kv后才初始化成功
updateCacheFromDb();
initOK = true;
updateCacheFromDbAtEveryMinute();
return initOK;
}
/**
* 刷新缓存的方法。单线程每隔60秒刷新一次tag,与mysql 同步一次tag的信息
*/
private void updateCacheFromDbAtEveryMinute() {
ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("check-idCache-thread");
t.setDaemon(true);
return t;
}
});
service.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
updateCacheFromDb();
}
}, 60, 60, TimeUnit.SECONDS);
}
/**
* 从mysql中同步tag的信息进入内存中,这里作者做的也很巧妙,并不着急立马就去加载segment我们看到SegmentBuffer里面有一个初始化是否成功的标志字段
* initOk 他标志着目前这个segmentBuffer是否可用,但是这个方法里面默认是false的,作者在这里巧妙的利用了懒加载的方式,将max的值的更改延后,因为我们思考一种弄场景
* leaf在美团中可能是全集团公用的,可能部署了上百个节点,那么很有可能这些服务会面临重启,如果每次重启都会默认更新mysql的话,一方面会浪费非常多的step的id,另外一方面很有可能
* 就算浪费了id也可能会用不到,因此这里面用户使用了懒加载的思想只是先进行占位,当用户在真正使用的时候再去查询并填充segment并更新mysql,因此这里面有个细节就是
* 如果系统极端在乎平滑性,那么在leaf在对外提供服务前,先手动调用一次,以确保segment被填充完善,降低延时性。
*/
private void updateCacheFromDb() {
logger.info("update cache from db");
StopWatch sw = new Slf4JStopWatch();
try {
List<String> dbTags = dao.getAllTags();
if (dbTags == null || dbTags.isEmpty()) {
return;
}
List<String> cacheTags = new ArrayList<String>(cache.keySet());
Set<String> insertTagsSet = new HashSet<>(dbTags);
Set<String> removeTagsSet = new HashSet<>(cacheTags);
//db中新加的tags灌进cache
for(int i = 0; i < cacheTags.size(); i++){
String tmp = cacheTags.get(i);
if(insertTagsSet.contains(tmp)){
insertTagsSet.remove(tmp);
}
}
for (String tag : insertTagsSet) {
SegmentBuffer buffer = new SegmentBuffer();
buffer.setKey(tag);
Segment segment = buffer.getCurrent();
segment.setValue(new AtomicLong(0));
segment.setMax(0);
segment.setStep(0);
cache.put(tag, buffer);
logger.info("Add tag {} from db to IdCache, SegmentBuffer {}", tag, buffer);
}
//cache中已失效的tags从cache删除
for(int i = 0; i < dbTags.size(); i++){
String tmp = dbTags.get(i);
if(removeTagsSet.contains(tmp)){
removeTagsSet.remove(tmp);
}
}
for (String tag : removeTagsSet) {
cache.remove(tag);
logger.info("Remove tag {} from IdCache", tag);
}
} catch (Exception e) {
logger.warn("update cache from db exception", e);
} finally {
sw.stop("updateCacheFromDb");
}
}
/**
* 主力接口,用于对外界提供id
* 判断当前tag缓存是否已经就绪,如果未就绪直接报错,因此要求调用方应该先调用init(),进行基础环境的就绪
* 缓存就绪成功,从缓存中查看客户端请求的key是否存在,不存在的可能有两种,一种是mysql中没有,这个需要等大概60秒才会刷新,因此在leaf使用过程中应该提前就绪好mysql,让让多个leaf服务都能刷新到相应的key
* 另外一种可能就是mysql中也没有,当然也会造成cache中没有,两种情况造成的缓存中没有,系统都会返回key不存在,id生成失败
* 如果缓存中也恰好查到了有key,那么就会因为懒加载的原因造成可能segmentBuffer没有初始化,(任何事情都有两面性)
* 我们看到美团的处理方式是通过锁定对应的segmentBuffer的对象头,可以说也是无所不用其极的减低锁粒度,不得不说一句nice
* 另外我们看到使用了双重检查,防止并发问题,这里多啰嗦一句为什么回出现并发问题,两个线程都到了synchronized的临界区后,一个线程拿到了buffer的头锁,进入可能去更新mysql了,如果他执行完他会放开头锁
* 但是如果不通过判断那么他也会继续执行更新mysql的操作,因此造成不满足我们预期的事情发生了,所以这里通过一个initok的一个标志进行双重判定,那么就算是第二个线程进入后因为第一个线程退出前就更新了linitok为true
* 所以第二个线程进来后还是不能更新mysql就安全出去临界区了。
*
*/
@Override
public Result get(final String key) {
if (!initOK) {
return new Result(EXCEPTION_ID_IDCACHE_INIT_FALSE, Status.EXCEPTION);
}
if (cache.containsKey(key)) {
SegmentBuffer buffer = cache.get(key);
if (!buffer.isInitOk()) {
synchronized (buffer) {
if (!buffer.isInitOk()) {
try {
updateSegmentFromDb(key, buffer.getCurrent());
logger.info("Init buffer. Update leafkey {} {} from db", key, buffer.getCurrent());
buffer.setInitOk(true);
} catch (Exception e) {
logger.warn("Init buffer {} exception", buffer.getCurrent(), e);
}
}
}
}
return getIdFromSegmentBuffer(cache.get(key));
}
return new Result(EXCEPTION_ID_KEY_NOT_EXISTS, Status.EXCEPTION);
}
/**
* 这个方法主要是用来更新并填充好,指定key对应的SegmentBuffer
* StopWatch 是一个计时器,作者考虑到这个方法的性能问题,因此加了一个监控
* 先是判断指定的segmentBuffer是否初始化完成,如果没有初始化完成也就是说没有向数据库去申请id段,那么就去取申请并填充进segmentBuffer
* 如果是已经初始化完成了,第二个分支其实特定指的是第二次申请
*/
public void updateSegmentFromDb(String key, Segment segment) {
StopWatch sw = new Slf4JStopWatch();
SegmentBuffer buffer = segment.getBuffer();
LeafAlloc leafAlloc;
//第一次申请id段
if (!buffer.isInitOk()) {
leafAlloc = dao.updateMaxIdAndGetLeafAlloc(key);
buffer.setStep(leafAlloc.getStep());
buffer.setMinStep(leafAlloc.getStep());//leafAlloc中的step为DB中的step
} else if (buffer.getUpdateTimestamp() == 0) {
//第二次申请id段,因为之前的第一次申请动作谈不上更新,因此在第二次的时候将更新时间进行填充
leafAlloc = dao.updateMaxIdAndGetLeafAlloc(key);
buffer.setUpdateTimestamp(System.currentTimeMillis());
buffer.setStep(leafAlloc.getStep());
buffer.setMinStep(leafAlloc.getStep());//leafAlloc中的step为DB中的step
} else {
//第N次申请id段动态优化步长,我们看到有一个指定的时间以15分钟为例,如果两次领取间隔少于15分钟那么就将step拉大一倍,但是不会超过系统默认的10W的step
// 这样做的好处其实也是降低mysql压力
//如果两次申请的超过30分钟那么就将步长调整为原来的一半,但是不会小于最小步长
long duration = System.currentTimeMillis() - buffer.getUpdateTimestamp();
int nextStep = buffer.getStep();
if (duration < SEGMENT_DURATION) {
if (nextStep * 2 > MAX_STEP) {
//do nothing
} else {
nextStep = nextStep * 2;
}
} else if (duration < SEGMENT_DURATION * 2) {
//do nothing with nextStep
} else {
nextStep = nextStep / 2 >= buffer.getMinStep() ? nextStep / 2 : nextStep;
}
logger.info("leafKey[{}], step[{}], duration[{}mins], nextStep[{}]", key, buffer.getStep(), String.format("%.2f",((double)duration / (1000 * 60))), nextStep);
LeafAlloc temp = new LeafAlloc();
temp.setKey(key);
temp.setStep(nextStep);
leafAlloc = dao.updateMaxIdByCustomStepAndGetLeafAlloc(temp);
buffer.setUpdateTimestamp(System.currentTimeMillis());
buffer.setStep(nextStep);
buffer.setMinStep(leafAlloc.getStep());//leafAlloc的step为DB中的step
}
// must set value before set max
/**
* 此处很坑,这是第一版本留下的无效注释
* https://github.com/Meituan-Dianping/Leaf/issues/16
* 可以不用强制要求的,因为是单线程更新,并且buffer还没有就绪因此不存在优先可见的问题
*/
long value = leafAlloc.getMaxId() - buffer.getStep();
segment.getValue().set(value);
segment.setMax(leafAlloc.getMaxId());
segment.setStep(buffer.getStep());
sw.stop("updateSegmentFromDb", key + " " + segment);
}
/**
* 核心处理方法
* 通过读写锁提升并发,读锁主要负责id的自增,但是如果只是自增那么靠automic操作就够,因此还涉及到segment的切换,因此此处使用了读写锁进行分离
* 当需要切换segment的时候读锁也会被挂起来,因为如果不挂起的话会出现脏读。
* 方法的核心思想总结如下
* 通过while循环,死循环的去取数据,先是拿到读锁,此处总结一下 JUC包里面的读写锁的特性,读读可并行,读写不可并行,写写不可并行。
* 在这里从概念上先完成梳理
* 1、备用buffer的更新是由单线程完成的,这里面是通过cas更新ThreadRunning实现的,因此备用buffer的更新是安全的
* 2、id的自增是通过AutomicLong实现的因此也不存在自增时候的线程安全问题
* 3、主备buffer的切换是由读写锁来进行控制的,读锁生效时候时候要么能够自增成功则返回,要么自增不成功,线程开始抢写锁,如果抢上,那么新来的读锁请求就会被挂起,
* 直到写锁完成buffer的切换,然后通过while循环自增后返回id
*/
public Result getIdFromSegmentBuffer(final SegmentBuffer buffer) {
while (true) {
buffer.rLock().lock();
try {
final Segment segment = buffer.getCurrent();
if (!buffer.isNextReady() && (segment.getIdle() < 0.9 * segment.getStep()) && buffer.getThreadRunning().compareAndSet(false, true)) {
service.execute(new Runnable() {
@Override
public void run() {
Segment next = buffer.getSegments()[buffer.nextPos()];
boolean updateOk = false;
try {
updateSegmentFromDb(buffer.getKey(), next);
updateOk = true;
logger.info("update segment {} from db {}", buffer.getKey(), next);
} catch (Exception e) {
logger.warn(buffer.getKey() + " updateSegmentFromDb exception", e);
} finally {
if (updateOk) {
buffer.wLock().lock();
buffer.setNextReady(true);
buffer.getThreadRunning().set(false);
buffer.wLock().unlock();
} else {
buffer.getThreadRunning().set(false);
}
}
}
});
}
long value = segment.getValue().getAndIncrement();
if (value < segment.getMax()) {
return new Result(value, Status.SUCCESS);
}
} finally {
buffer.rLock().unlock();
}
waitAndSleep(buffer);
buffer.wLock().lock();
try {
//这里进行这么判断是因为可能有多个写锁排队在这里,一个写锁更新成了后,那么后面的线程直接取就好,不需要走后续的修改操作了。
final Segment segment = buffer.getCurrent();
long value = segment.getValue().getAndIncrement();
if (value < segment.getMax()) {
return new Result(value, Status.SUCCESS);
}
//检查备用buffer是否完成了准备,如果准备完成则进行切换,如果未准备完成则抛出异常代表主buffer还有从buffer都没有准备好,系统暂时不可用。
//产生的原因可能是刷新线程池阻塞,这可能性还是蛮小的,这也是为什么中间件作者在 update代码段加入 stopwatch监控的原因吧。
if (buffer.isNextReady()) {
buffer.switchPos();
buffer.setNextReady(false);
} else {
logger.error("Both two segments in {} are not ready!", buffer);
return new Result(EXCEPTION_ID_TWO_SEGMENTS_ARE_NULL, Status.EXCEPTION);
}
} finally {
buffer.wLock().unlock();
}
}
}
/**
* 让当前线程进入死循环等待,为了降低无效的cpu轮训如果循环次数超过一万后就休眠10ms
*/
private void waitAndSleep(SegmentBuffer buffer) {
int roll = 0;
while (buffer.getThreadRunning().get()) {
roll += 1;
if(roll > 10000) {
try {
TimeUnit.MILLISECONDS.sleep(10);
break;
} catch (InterruptedException e) {
logger.warn("Thread {} Interrupted",Thread.currentThread().getName());
break;
}
}
}
}
public List<LeafAlloc> getAllLeafAllocs() {
return dao.getAllLeafAllocs();
}
public Map<String, SegmentBuffer> getCache() {
return cache;
}
public IDAllocDao getDao() {
return dao;
}
public void setDao(IDAllocDao dao) {
this.dao = dao;
}
}
核心类的源码分析都记录在注释之中了,下面贴一张图方便理解,来自于美团团队
雪花id生成器
美团使用的是zookeeper实现的,zookeeper在此中间件中扮演的角色总结如下,用来存储每个分布式节点的ip,port信息,主要是每天机器的时间戳,这个信息用于保障每个节点生成的id不会出现回拨现象,出现回拨的原因分析如下
41bit的计算方式是机器的当前时间减去系统初始化的时间的时间戳,leaf给的是 :Thu Nov 04 2010 09:42:54 GMT+0800 (中国标准时间) 1288834974657L的差值填充41bit的空间,从id的组成我们可以知道10bit的工作机器id不同就能保障一定的程度id不同性,但是人如果41bit的位置出现时间回拨后那么单一worker生成的id就可能会出现重复的id。
源码分析
这个类主要是处理zk的连接,以及数据存储策略,包括如果分配workerid,以及workerid也会存储在每个节点的本地化文件中,从代码中我们可以知道,每个zk的path是这样的
/snowflake/leafname/forever/ip:port-0000000001 后面的数字就是workerid由zk分配的
public class SnowflakeZookeeperHolder {
private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeZookeeperHolder.class);
private String zk_AddressNode = null;//保存自身的key ip:port-000000001
private String listenAddress = null;//保存自身的key ip:port
private int workerID;
private static final String PREFIX_ZK_PATH = "/snowflake/" + PropertyFactory.getProperties().getProperty("leaf.name");
private static final String PROP_PATH = System.getProperty("java.io.tmpdir") + File.separator + PropertyFactory.getProperties().getProperty("leaf.name") + "/leafconf/{port}/workerID.properties";
private static final String PATH_FOREVER = PREFIX_ZK_PATH + "/forever";//保存所有数据持久的节点
private String ip;
private String port;
private String connectionString;
private long lastUpdateTime;
public SnowflakeZookeeperHolder(String ip, String port, String connectionString) {
this.ip = ip;
this.port = port;
this.listenAddress = ip + ":" + port;
this.connectionString = connectionString;
}
/**
* 启动应用使用zk的curator客户端连接zk,判断leaf指定业务的根节点是否存在,leaf中使用的划分逻辑如下
* /snowflake/leafname/forever/ip:port-0000000001
* /snowflake/leafname(不同业务可以使用不同名字)订单,用户/forever/ip:port(这里指的是提供id生产服务的机器)-0000000001(顺序节点序号)
* 里面的内容存的是endpoint内容{"ip","xxx.xxx.xxx.xxx","port":"8080","timestamp":"timestamp"}
* 先扫描业务目录,如果业务目录不存在那么说明第一次启动这个业务,因此当前主机要把自己的信息保存进去,默认id为0,这里面可能会存在潜在的并发问题??
*/
public boolean init() {
try {
CuratorFramework curator = createWithOptions(connectionString, new RetryUntilElapsed(1000, 4), 10000, 6000);
curator.start();
Stat stat = curator.checkExists().forPath(PATH_FOREVER);
if (stat == null) {
//不存在根节点,机器第一次启动,创建/snowflake/ip:port-000000000,并上传数据
zk_AddressNode = createNode(curator);
//worker id 默认是0
//guozc 潜在并发问题??两个节点同时去创建节点都成功了,因为worker默认是0可能会造成本地文件存储的id为0,极端情况下?
updateLocalWorkerID(workerID);
//定时上报本机时间给forever节点
//定时任务每三秒钟上报一下本机信息,里面关键信息是每次上报的时间戳,防止数显时钟回拨
ScheduledUploadData(curator, zk_AddressNode);
return true;
} else {
//业务目录存在那么就检查是否有自己的节点
Map<String, Integer> nodeMap = Maps.newHashMap();//ip:port->00001
Map<String, String> realNode = Maps.newHashMap();//ip:port->(ipport-000001)
//存在根节点,先检查是否有属于自己的根节点
List<String> keys = curator.getChildren().forPath(PATH_FOREVER);
for (String key : keys) {
String[] nodeKey = key.split("-");
realNode.put(nodeKey[0], key);
nodeMap.put(nodeKey[0], Integer.parseInt(nodeKey[1]));
}
Integer workerid = nodeMap.get(listenAddress);
if (workerid != null) {
//有自己的节点,zk_AddressNode=ip:port
zk_AddressNode = PATH_FOREVER + "/" + realNode.get(listenAddress);
workerID = workerid;//启动worder时使用会使用
if (!checkInitTimeStamp(curator, zk_AddressNode)) {
throw new CheckLastTimeException("init timestamp check error,forever node timestamp gt this node time");
}
//准备创建临时节点
doService(curator);
updateLocalWorkerID(workerID);
LOGGER.info("[Old NODE]find forever node have this endpoint ip-{} port-{} workid-{} childnode and start SUCCESS", ip, port, workerID);
} else {
//表示新启动的节点,创建持久节点 ,不用check时间
String newNode = createNode(curator);
zk_AddressNode = newNode;
String[] nodeKey = newNode.split("-");
workerID = Integer.parseInt(nodeKey[1]);
doService(curator);
updateLocalWorkerID(workerID);
LOGGER.info("[New NODE]can not find node on forever node that endpoint ip-{} port-{} workid-{},create own node on forever node and start SUCCESS ", ip, port, workerID);
}
}
} catch (Exception e) {
LOGGER.error("Start node ERROR {}", e);
try {
Properties properties = new Properties();
properties.load(new FileInputStream(new File(PROP_PATH.replace("{port}", port + ""))));
workerID = Integer.valueOf(properties.getProperty("workerID"));
LOGGER.warn("START FAILED ,use local node file properties workerID-{}", workerID);
} catch (Exception e1) {
LOGGER.error("Read file error ", e1);
return false;
}
}
return true;
}
private void doService(CuratorFramework curator) {
ScheduledUploadData(curator, zk_AddressNode);// /snowflake_forever/ip:port-000000001
}
private void ScheduledUploadData(final CuratorFramework curator, final String zk_AddressNode) {
Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "schedule-upload-time");
thread.setDaemon(true);
return thread;
}
}).scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
updateNewData(curator, zk_AddressNode);
}
}, 1L, 3L, TimeUnit.SECONDS);//每3s上报数据
}
/**
* 检查zookeeper中数据是否小于当前机器的系统时间,因为每个机器在zk中都有一个自己的节点用于存储endpoint数据
*/
private boolean checkInitTimeStamp(CuratorFramework curator, String zk_AddressNode) throws Exception {
byte[] bytes = curator.getData().forPath(zk_AddressNode);
Endpoint endPoint = deBuildData(new String(bytes));
//该节点的时间不能小于最后一次上报的时间
return !(endPoint.getTimestamp() > System.currentTimeMillis());
}
/**
* 创建持久顺序节点 ,并把节点数据放入 value
*
* @param curator
* @return
* @throws Exception
*/
private String createNode(CuratorFramework curator) throws Exception {
try {
return curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(PATH_FOREVER + "/" + listenAddress + "-", buildData().getBytes());
} catch (Exception e) {
LOGGER.error("create node error msg {} ", e.getMessage());
throw e;
}
}
private void updateNewData(CuratorFramework curator, String path) {
try {
if (System.currentTimeMillis() < lastUpdateTime) {
return;
}
curator.setData().forPath(path, buildData().getBytes());
lastUpdateTime = System.currentTimeMillis();
} catch (Exception e) {
LOGGER.info("update init data error path is {} error is {}", path, e);
}
}
/**
* 构建需要上传的数据
*
* @return
*/
private String buildData() throws JsonProcessingException {
Endpoint endpoint = new Endpoint(ip, port, System.currentTimeMillis());
ObjectMapper mapper = new ObjectMapper();
String json = mapper.writeValueAsString(endpoint);
return json;
}
/**
* 将json字符串转换为endpoint对象
*/
private Endpoint deBuildData(String json) throws IOException {
ObjectMapper mapper = new ObjectMapper();
Endpoint endpoint = mapper.readValue(json, Endpoint.class);
return endpoint;
}
/**
* 在节点文件系统上缓存一个workid值,zk失效,机器重启时保证能够正常启动
*
* @param workerID
*/
private void updateLocalWorkerID(int workerID) {
File leafConfFile = new File(PROP_PATH.replace("{port}", port));
boolean exists = leafConfFile.exists();
LOGGER.info("file exists status is {}", exists);
if (exists) {
try {
FileUtils.writeStringToFile(leafConfFile, "workerID=" + workerID, false);
LOGGER.info("update file cache workerID is {}", workerID);
} catch (IOException e) {
LOGGER.error("update file cache error ", e);
}
} else {
//不存在文件,父目录页肯定不存在
try {
boolean mkdirs = leafConfFile.getParentFile().mkdirs();
LOGGER.info("init local file cache create parent dis status is {}, worker id is {}", mkdirs, workerID);
if (mkdirs) {
if (leafConfFile.createNewFile()) {
FileUtils.writeStringToFile(leafConfFile, "workerID=" + workerID, false);
LOGGER.info("local file cache workerID is {}", workerID);
}
} else {
LOGGER.warn("create parent dir error===");
}
} catch (IOException e) {
LOGGER.warn("craete workerID conf file error", e);
}
}
}
private CuratorFramework createWithOptions(String connectionString, RetryPolicy retryPolicy, int connectionTimeoutMs, int sessionTimeoutMs) {
return CuratorFrameworkFactory.builder().connectString(connectionString)
.retryPolicy(retryPolicy)
.connectionTimeoutMs(connectionTimeoutMs)
.sessionTimeoutMs(sessionTimeoutMs)
.build();
}
/**
* 上报数据结构
*/
static class Endpoint {
private String ip;
private String port;
private long timestamp;
public Endpoint() {
}
public Endpoint(String ip, String port, long timestamp) {
this.ip = ip;
this.port = port;
this.timestamp = timestamp;
}
public String getIp() {
return ip;
}
public void setIp(String ip) {
this.ip = ip;
}
public String getPort() {
return port;
}
public void setPort(String port) {
this.port = port;
}
public long getTimestamp() {
return timestamp;
}
public void setTimestamp(long timestamp) {
this.timestamp = timestamp;
}
}
public String getZk_AddressNode() {
return zk_AddressNode;
}
public void setZk_AddressNode(String zk_AddressNode) {
this.zk_AddressNode = zk_AddressNode;
}
public String getListenAddress() {
return listenAddress;
}
public void setListenAddress(String listenAddress) {
this.listenAddress = listenAddress;
}
public int getWorkerID() {
return workerID;
}
public void setWorkerID(int workerID) {
this.workerID = workerID;
}
public static void main(String[] args) {
try {
System.out.println(Integer.parseInt("0000000008"));
} catch (Exception e) {
e.printStackTrace();
}
}
}
另外一个核心类是雪花的实现细节,包括了一个id是如何生成的,还有就是时间是如何更新到zk的,还有就是人如果当前毫秒中id分配不够了leaf是如何处理的
/**
* 使用位运算拼接一个long类型的id出来,主要是利用时间做高41位的内容,中间是10bit的机器id,也基本够用了,一个服务的id生成理论上不会超过1023个服务节点
* 最后的12bit用来做递增
*/
public class SnowflakeIDGenImpl implements IDGen {
@Override
public boolean init() {
return true;
}
private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeIDGenImpl.class);
//开始时间戳
//个数字可以指定为任意小于当前时间的数字,这样就能让竞争对手无法知道我们的id信息了,这就解决了基于mysql的segement方案造成的容易被竞争对手监控的问题了,因为有时间维度的参与,对手不知道我们每时每刻的id发放信息
private final long twepoch;
//wokerid占用的位数
private final long workerIdBits = 10L;
//worker最大的id1023
private final long maxWorkerId = ~(-1L << workerIdBits);//最大能够分配的workerid =1023
//每毫秒的id数字最大值
private final long sequenceBits = 12L;
//workerid的偏移量
private final long workerIdShift = sequenceBits;
//时间戳位移数
private final long timestampLeftShift = sequenceBits + workerIdBits;
//每个毫秒生成id数的掩码,用来进行与运算提高运算效率,他让自己的高位是1,其他都是0那么在与的时候如果没满则是sequence 如果是0说明与的那个数字二进制后12位全是0了,也就是满了,因此会休息一个死循环的时间然后继续生成id
private final long sequenceMask = ~(-1L << sequenceBits);
//工作节点的id
private long workerId;
//每个毫秒自增id数字
private long sequence = 0L;
//保存上次生成id时候的时间戳
private long lastTimestamp = -1L;
//new一个随机函数对象,多线程公用,并发性问题交给了synchronized关键字,并且公用对象后降低了new的成本
private static final Random RANDOM = new Random();
public SnowflakeIDGenImpl(String zkAddress, int port) {
//Thu Nov 04 2010 09:42:54 GMT+0800 (中国标准时间)
this(zkAddress, port, 1288834974657L);
}
/**
* @param zkAddress zk地址
* @param port snowflake监听端口
* @param twepoch
*
* 初始化应用,主要是SnowflakeZookeeperHolder 里面的初始化,包括节点的创建,或者数据的同步,还有主要是完成时间的检查,方式工作节点始终回拨
* 并且将workerid进行赋值,方便生成id时候使用
*/
public SnowflakeIDGenImpl(String zkAddress, int port, long twepoch) {
this.twepoch = twepoch;
Preconditions.checkArgument(timeGen() > twepoch, "Snowflake not support twepoch gt currentTime");
final String ip = Utils.getIp();
SnowflakeZookeeperHolder holder = new SnowflakeZookeeperHolder(ip, String.valueOf(port), zkAddress);
LOGGER.info("twepoch:{} ,ip:{} ,zkAddress:{} port:{}", twepoch, ip, zkAddress, port);
boolean initFlag = holder.init();
if (initFlag) {
workerId = holder.getWorkerID();
LOGGER.info("START SUCCESS USE ZK WORKERID-{}", workerId);
} else {
Preconditions.checkArgument(initFlag, "Snowflake Id Gen is not init ok");
}
Preconditions.checkArgument(workerId >= 0 && workerId <= maxWorkerId, "workerID must gte 0 and lte 1023");
}
/**
* 核心方法这个就是获得雪花id的方法,因为是按照时间轴进行发布的,因此不存在不同的业务key的隔离,因为所有的业务的id都不会重复,(就是这么的任性)
* 先取到系统时间戳,然后跟对象中的 lastTimestamp比较如果系统时间比对象时间回拨了5毫秒那么久稍作休息wait一下,也就是等待两倍的毫秒数,因为左移动1二进制翻一倍,
* 如果线程醒过来后还是有偏移量那么就返回错误。如果偏移量超过5毫秒,那么代表着偏移量太大,那么就返回错误,
* 如果对象中的 lastTimestamp 与当前机器中系统时间一样,这里面说明一下,这种情况下肯定是比较高的并发情况下的必然了,因为每次发放id后对象时间都会被置为当时取的系统时间
* 也就是一个毫秒中会发憷多个id,那么处理逻辑就是给 sequence不停的加一,这里面的与其实就是2的12次方-1,也就是整了个sequence的最大值,这样出现0代表 sequence + 1变成了
* 2的12次方-1了,那么也就是意味着并发真的很大,一毫秒中的id被打光了,那么系统就调用 tilNextMillis 进行死循环的等待,因为这种等待是毫秒级的,所以使用了while循环
* 如果是新的毫秒是就生成一个随机数,作为sequence的新值,紧接着对lastTimestamp赋值,然后利用位运算生成一个long的id进行返回
*/
@Override
public synchronized Result get(String key) {
long timestamp = timeGen();
if (timestamp < lastTimestamp) {
long offset = lastTimestamp - timestamp;
if (offset <= 5) {
try {
wait(offset << 1);
timestamp = timeGen();
if (timestamp < lastTimestamp) {
return new Result(-1, Status.EXCEPTION);
}
} catch (InterruptedException e) {
LOGGER.error("wait interrupted");
return new Result(-2, Status.EXCEPTION);
}
} else {
return new Result(-3, Status.EXCEPTION);
}
}
if (lastTimestamp == timestamp) {
sequence = (sequence + 1) & sequenceMask;
if (sequence == 0) {
//seq 为0的时候表示是下一毫秒时间开始对seq做随机
sequence = RANDOM.nextInt(100);
timestamp = tilNextMillis(lastTimestamp);
}
} else {
//如果是新的ms开始
sequence = RANDOM.nextInt(100);
}
lastTimestamp = timestamp;
long id = ((timestamp - twepoch) << timestampLeftShift) | (workerId << workerIdShift) | sequence;
return new Result(id, Status.SUCCESS);
}
/**
* 通过死循环的形式确保时间进行了后移,因为最多也就是停留一毫秒,所以使用死循环的形式代价更低
*/
protected long tilNextMillis(long lastTimestamp) {
long timestamp = timeGen();
while (timestamp <= lastTimestamp) {
timestamp = timeGen();
}
return timestamp;
}
protected long timeGen() {
return System.currentTimeMillis();
}
public long getWorkerId() {
return workerId;
}
public static void main(String[] args) {
Date date = new Date("Mon 6 Jan 1997 13:3:00");
long id = ((System.currentTimeMillis()- date.getTime()) << 22L) | (10 << 12L) | (1) & ~(-1L << 12L);
System.out.println(id);
System.out.println(10 << 12L);
System.out.println(Long.toBinaryString(id));
}
}