分布式系统和服务器集群
布式和集群是不⼀样的,分布式⼀定是集群,但是集群不⼀定是分布式。
因为集群就是多个实例⼀起⼯作,分布式将⼀个系统拆分之后那就是多个实例;集群并不⼀定是分布式,因为复制型的集群不是拆分⽽是复制,没有体现出整个系统的功能是分散在各个节点上实现的。
第一部分:一致性Hash算法
Hash算法,⽐如说在安全加密领域MD5、SHA等加密算法,在数据存储和查找⽅⾯有Hash表等, 以上都应⽤到了Hash算法。
为什么需要使⽤Hash?
Hash算法较多的应⽤在数据存储和查找领域,最经典的就是Hash表,它的查询效率⾮常之⾼,其中的哈希算法如果设计的⽐较ok的话,那么Hash表的数据查询时间复杂度可以接近于O(1)。
示例
需求:提供⼀组数据 1,5,7,6,3,4,8,对这组数据进⾏存储,然后随便给定⼀个数n,请你判断n是否存在于刚才的数据集中?
顺序查找法:通过循环来完成,⽐较原始,效率也不⾼
public static void main(String[] args) {
int key = 5;
int a[] = new int[]{1,5,7,6,3,4,8};
for (int i = 0; i < a.length; i++) {
if(a[i]==5){
System.out.println("找到了,下标="+i);
return;
}
}
}
二分查找:排序之后折半查找,相对于顺序查找法会提⾼⼀些效率,但是效率也并不是特别好
public static void main(String[] args) {
int key = 1;
int a[] = new int[]{1,3,4,5,6,7,8};
int height = a.length-1;
int low = 0;
while (height >= low){
int index = (height-low)/2+low;
System.out.println(index);
if(a[index] == key){
System.out.println("找到了,下标="+index);
break;
}
if(a[index]>key){
height = index;
}
if(a[index]<key){
low = index+1;
}
}
}
直接寻址法:直接把数据和数组的下标绑定到⼀起,查找的时候,直接array[n]就取出了数据。优点:速度快,一次查找得到结果。缺点:只适用于关键字的全域比较小,而且没有两个元素的关键字完全相同。
public static void main(String[] args) {
int key = 3;
int a[] = new int[]{1,3,4,5,6,7,8};
int b[] = new int[1000];
for (int i = 0; i < a.length; i++) {
b[a[i]] = i+1;
}
if(b[key] == 0) {
System.out.println(key+"不存在");
} else {
System.out.println("找到了,下标="+(b[key]-1));
}
}
除留余数法:可能会出现Hash冲突
public static void main(String[] args) {
int key = 7;
int a[] = new int[]{1,3,4,5,6,7,8};
int b[] = new int[5];
for (int i = 0; i < a.length; i++) {
int hashCode = a[i]%b.length;
b[hashCode] = i+1;
}
if(b[key%b.length] == 0) {
System.out.println(key+"不存在");
} else {
System.out.println("找到了,下标="+(b[key%b.length]-1));
}
}
由此衍生出开放寻址法和拉链法。
开放地址法:
根据以上hash函数计算数组下标时,当遇到数据存放的冲突时就需要重新找到数组的其他位置。关于开放地址法通常需要有三种方法:线性探测、二次探测、再哈希法。
线性探测:
线性探测就是使用算术取余的方法计算余数,当产生冲突时就通过线性递增的方法进行探测,一直到数组的位置为空,插入数据项即可。
private static int b[] = new int[5];
public static void main(String[] args) {
int a[] = new int[]{1,3,4,5,6};
for (int i = 0; i < a.length; i++) {
int hashCode = getHashCode(a[i]);
b[hashCode] = i+1;
}
for (int i = 0; i < b.length; i++) {
System.out.println(b[i]);
}
}
private static int getHashCode(int i) {
int hashCode = i%b.length;
while(b[hashCode]!=0) {
hashCode++;
hashCode%=b.length;
}
return hashCode;
}
二次探测:
在线性探测过程中会产生数据聚集问题,当数据聚集越来越大时,数据经哈希化后就需要插在聚集的后端。这样会使得效率变得很低。二次探测是防止聚集产生的一种尝试,相隔比较远的单元进行探测,而不是线性一个个的探测。
二次探测是过程是x+1,x+4,x+9,以此类推。二次探测的步数是原始位置相隔的步数的平方。
从数组的x[0]开始探测,第一次探测x[1]不为空,继续探测x[4],依然不为空继续探测,x[9]不为空依次继续,直至不超过数组边界。
再哈希法:
再哈希是把关键字用不同的哈希函数再做一遍哈希化,用这个结果作为步长,对指定的关键字,探测的步长是不变的,可以说不同的关键字可以使用不同的步长,并且步长可以控制。一般来说,再哈希函数可以采用以下这种:
stepSize=constant-(key%constant);
虽然不同的关键字可能会映射到相同的数组单元,但是可能会有不一样的探测步长。假设可以使用步长1~5进行探测。步长是不能为零的,不然就会形成死循环。再哈希的实现方式可以在线性探测的基础上添加一个再哈希函数即可,对应在delete和find方法里面修改相关操作。
//再哈希函数
public int hashFunction2(int key)
{
return 5-key%5;//此处设置为探测步长为1~5
}
...
//在delete方法里添加以下语句
int stepSize=hashFunction2(key);
...
//将++hahshVal替换成以下
hashVal+=stepSize;
...
拉链地址法:
把具有相同散列地址的关键字(同义词)值放在同一个单链表中,称为同义词链表。
有m个散列地址就有m个链表,同时用指针数组A[0,1,2…m-1]存放各个链表的头指针,凡是散列地址为i的记录都以结点方式插入到以A[i]为指针的单链表中。A中各分量的初值为空指针。当新来的元素映射到冲突的数组位置时,就会插入到链表的头部。
简单总结
所以,Hash表的查询效率⾼不⾼取决于Hash算法,hash算法能够让数据平均分布,既能够节省空间⼜能提⾼查询效率。hashcode其实也是通过⼀个Hash算法得来的。
第 1 节 Hash算法应⽤场景
Hash算法在分布式集群架构中的应⽤场景
Hash算法在很多分布式集群产品中都有应⽤,⽐如分布式集群架构Redis、Hadoop、ElasticSearch,Mysql分库分表,Nginx负载均衡等
主要的应⽤场景归纳起来两个
- 请求的负载均衡(⽐如nginx的ip_hash策略)
Nginx的IP_hash策略可以在客户端ip不变的情况下,将其发出的请求始终路由到同⼀个⽬标服务器上,实现会话粘滞,避免处理session共享问题。我们可以对ip地址或者sessionid进⾏计算哈希值,哈希值与服务器数量进⾏取模运算,得到的值就是当前请求应该被路由到的服务器编号,如此,同⼀个客户端ip发送过来的请求就可以路由到同⼀个⽬标服务器,实现会话粘滞。 - 分布式存储
以分布式内存数据库Redis为例,集群中有redis1,redis2,redis3 三台Redis服务器
那么,在进⾏数据存储时,<key1,value1>数据存储到哪个服务器当中呢?针对key进⾏hash处理hash(key1)%3=index, 使⽤余数index锁定存储的具体服务器节点。
第 2 节 普通Hash算法存在的问题
普通Hash算法存在⼀个问题,以ip_hash为例,假定下载⽤户ip固定没有发⽣改变,现在tomcat3出现了问题,down机了,服务器数量由3个变为了2个,之前所有的求模都需要重新计算。
如果在真实⽣产情况下,后台服务器很多台,客户端也有很多,那么影响是很⼤的,缩容和扩容都会存在这样的问题,⼤量⽤户的请求会被路由到其他的⽬标服务器处理,⽤户在原来服务器中的会话都会丢失。
第 3 节 ⼀致性Hash算法
⼀致性哈希算法思路如下:
⾸先有⼀条直线,直线开头和结尾分别定为为1和2的32次⽅减1,这相当于⼀个地址,对于这样⼀条线,弯过来构成⼀个圆环形成闭环,这样的⼀个圆环称为hash环。我们把服务器的ip或者主机名求hash值然后对应到hash环上,那么针对客户端⽤户,也根据它的ip进⾏hash求值,对应到环上某个位置,然后如何确定⼀个客户端路由到哪个服务器处理呢?按照顺时针⽅向找最近的服务器节点
假如将服务器3下线,服务器3下线后,原来路由到3的客户端重新路由到服务器4,对于其他客户端没有影响只是这⼀⼩部分受影响(请求的迁移达到了最⼩,这样的算法对分布式集群来说⾮常合适的,避免了⼤量请求迁移 )
增加服务器5之后,原来路由到3的部分客户端路由到新增服务器5上,对于其他客户端没有影响只是这⼀⼩部分受影响(请求的迁移达到了最⼩,这样的算法对分布式集群来说⾮常合适的,避免了⼤量请求迁移 )
1)如前所述,每⼀台服务器负责⼀段,⼀致性哈希算法对于节点的增减都只需重定位环空间中的一小部分数据,具有较好的容错性和可扩展性。但是,⼀致性哈希算法在服务节点太少时,容易因为节点分部不均匀⽽造成数据倾斜问题。例如系统中只有两台服务器,其环分布如下,节点2只能负责⾮常⼩的⼀段,⼤量的客户端请求落在了节点1上,这就是数据(请求)倾斜问题
2)为了解决这种数据倾斜问题,⼀致性哈希算法引⼊了虚拟节点机制,即对每⼀个服务节点计算多个哈希,每个计算结果位置都放置⼀个此服务节点,称为虚拟节点。具体做法可以在服务器ip或主机名的后⾯增加编号来实现。比如,可以为每台服务器计算三个虚拟节点,于是可以分别计算 “节点1的ip#1”、“节点1的ip#2”、“节点1的ip#3”、“节点2的ip#1”、“节点2的ip#2”、“节点2的ip#3”的哈希值,于是形成六个虚拟节点,当客户端被路由到虚拟节点的时候其实是被路由到该虚拟节点所对应的真实节点
第 4 节 ⼿写实现⼀致性Hash算法
- 普通Hash算法
public static void main(String[] args) {
// 客户端ip地址
String[] ipAddressList = new String[]{"10.78.12.3","113.25.63.1","126.12.3.8"};
// 服务器数量
int serverCount = 5;
for (int i = 0; i < ipAddressList.length; i++) {
int hashCode = Math.abs(ipAddressList[i].hashCode());
int index = hashCode%serverCount;
System.out.println("客户端:" + ipAddressList[i] + " 被路由到服务器编号为:" + index);
}
}
- ⼀致性Hash算法实现(不含虚拟节点)
public static void main(String[] args) {
// 服务器节点
SortedMap<Integer, String> hashServerMap = new TreeMap<>();
// 服务器地址
String[] tomcatServers = new String[]{"123.111.0.0","123.101.3.1","111.20.35.2","123.98.26.3"};
// 将服务器放入节点
for (String tomcatServer : tomcatServers) {
int hashCode = Math.abs(tomcatServer.hashCode());
hashServerMap.put(hashCode, tomcatServer);
System.out.println("服务器"+tomcatServer+"绑定到节点"+hashCode);
}
// 客户端ip地址
String[] clients = new String[]{"10.78.12.3","113.25.63.1","126.12.3.8"};
// 服务器数量
for (String client : clients) {
int hashcode = Math.abs(client.hashCode());
// 顺时针,往前找。大于等于hashcode的服务器节点全找出来
SortedMap<Integer, String> integerStringSortedMap = hashServerMap.tailMap(hashcode);
Integer firstKey = null;
if(integerStringSortedMap.isEmpty()){
// 往前找不到,就拿第一台机器。如同一个闭环
firstKey = hashServerMap.firstKey();
} else {
firstKey = integerStringSortedMap.firstKey();
}
System.out.println("==========>>>>客户端:" + client+"("+ hashcode + ") 被路由到服务器:" + hashServerMap.get(firstKey));
}
}
- ⼀致性Hash算法实现(含虚拟节点)
public static void main(String[] args) {
// 服务器节点
SortedMap<Integer, String> hashServerMap = new TreeMap<>();
// 定义针对每个真实服务器虚拟出来⼏个节点
int virtualCount = 3;
// 服务器地址
String[] tomcatServers = new String[]{"123.111.0.0","123.101.3.1","111.20.35.2","123.98.26.3"};
// 将服务器放入节点
for (String tomcatServer : tomcatServers) {
int hashCode = Math.abs(tomcatServer.hashCode());
hashServerMap.put(hashCode, tomcatServer);
System.out.println("服务器"+tomcatServer+"绑定到节点"+hashCode);
for (int i = 0; i < virtualCount; i++) {
int virtualHashCode = Math.abs((tomcatServer+"#"+i).hashCode());
System.out.println("服务器"+tomcatServer+"绑虚拟节点"+virtualHashCode);
}
}
// 客户端ip地址
String[] clients = new String[]{"10.78.12.3","113.25.63.1","126.12.3.8"};
// 服务器数量
for (String client : clients) {
int hashcode = Math.abs(client.hashCode());
// 顺时针,往前找。大于等于hashcode的服务器节点全找出来
SortedMap<Integer, String> integerStringSortedMap = hashServerMap.tailMap(hashcode);
Integer firstKey = null;
if(integerStringSortedMap.isEmpty()){
// 往前找不到,就拿第一台机器。如同一个闭环
firstKey = hashServerMap.firstKey();
} else {
firstKey = integerStringSortedMap.firstKey();
}
System.out.println("==========>>>>客户端:" + client+"("+ hashcode + ") 被路由到服务器:" + hashServerMap.get(firstKey));
}
}
第 5 节 Nginx 配置⼀致性Hash负载均衡策略
ngx_http_upstream_consistent_hash 模块是⼀个负载均衡器,使⽤⼀个内部⼀致性hash算法来选择合适的后端节点。
该模块可以根据配置参数采取不同的⽅式将请求均匀映射到后端机器,
consistent_hash $remote_addr:可以根据客户端ip映射
consistent_hash $request_uri:根据客户端请求的uri映射
consistent_hash $args:根据客户端携带的参数进⾏映
ngx_http_upstream_consistent_hash 模块是⼀个第三⽅模块,需要我们下载安装后使⽤
1)github下载nginx⼀致性hash负载均衡模块 https://github.com/replay/ngx_http_consistent_hash
2)将下载的压缩包上传到nginx服务器,并解压
3)我们已经编译安装过nginx,此时进⼊当时nginx的源码⽬录,执⾏如下命令
./configure —add-module=/root/ngx_http_consistent_hash-master
make
make install
4)Nginx就可以使⽤啦,在nginx.conf⽂件中配置
小贴士
哈希表的数组容量为什么最好是质数
答:好的HASH函数需要把原始数据均匀地分布到HASH数组里
用取余做HASH函数的情况,原始数据不大会是真正的随机的,可能有某些规律,比如大部分是偶数,这时候如果HASH数组容量是偶数,容易使原始数据HASH后不会均匀分布。
比如 2 4 6 8 10 12这6个数,如果对 6 取余 得到 2 4 0 2 4 0 只会得到3种HASH值,冲突会很多
如果对 7 取余 得到 2 4 6 1 3 5 得到6种HASH值,没有冲突
同样地,如果数据都是3的倍数,而HASH数组容量是3的倍数,HASH后也容易有冲突
用一个质数则会减少冲突的概率,因为质数只能被1和自身整除。比如长度为11,取模,11以内就不会有索引冲突;但是如果长度是12,取模,12以内2、3、4、6就会出现索引冲突。关于装填因子
hash表中的数据项和表长的比例叫做装填因子。当装填因子不太大时,聚集分布就会比较连贯。此时hash表肯定是某一个部分产生大量聚集,另外一部分还很稀疏。聚集会降低hash表的性能。二次探测产生的聚集
二次探测可以消除在线性探测中产生的聚集问题,但是二次探测还是会产生一种更明确更细的聚集。二次聚集的产生是在二次探测的基础上产生的现象。例如N个数据经hash函数计算后都映射到到数组下标10,探测第二个数字需要以一步长,第三个数字需要以4步长为单位,第四个数字则需要以九为步长。好在二次探测并不常用,解决聚集问题还是有一种更好的办法:再哈希法。
第二部分:集群时钟同步问题
第 1 节 时钟不同步导致的问题
时钟此处指服务器时间,如果集群中各个服务器时钟不⼀致势必导致⼀系列问题,试想 “集群是各个服务器⼀起团队化作战,⼤家⼯作都不在⼀个点上,岂不乱了套!”
举⼀个例⼦,电商⽹站业务中,新增⼀条订单,那么势必会在订单表中增加了⼀条记录,该条记录中应该会有“下单时间”这样的字段,往往我们会在程序中获取当前系统时间插⼊到数据库或者直接从数据库服务器获取时间。那我们的订单⼦系统是集群化部署,或者我们的数据库也是分库分表的集群化部署,然⽽他们的系统时钟缺不⼀致,⽐如有⼀台服务器的时间是昨天,那么这个时候下单时间就成了昨天,那我们的数据将会混乱!如下
第 2 节 集群时钟同步配置
-
集群时钟同步思路
分布式集群中各个服务器节点都可以连接互联网
分布式集群中某⼀个服务器节点可以访问互联⽹或者所有节点都不能够访问互联⽹
操作⽅式:
选取集群中的⼀个服务器节点A(172.17.0.17)作为时间服务器(整个集群时间从这台服务器同步,如果这台服务器能够访问互联⽹,可以让这台服务器和⽹络时间保持同步,如果不能就⼿动设置⼀个时间)
1)⾸先设置好A的时间
把A配置为时间服务器(修改/etc/ntp.conf⽂件)
1、如果有 restrict default ignore,注释掉它
2、添加如下⼏⾏内容
# 放开局域⽹同步功能,172.17.0.0是你的局域⽹⽹段
restrict 172.17.0.0 mask 255.255.255.0 nomodify notrap
# local clock
server 127.127.1.0
fudge 127.127.1.0 stratum 10
3、重启⽣效并配置ntpd服务开机⾃启动
service ntpd restart
chkconfig ntpd on
2)集群中其他节点就可以定时从A服务器同步时间了
ntpdate 172.17.0.17
小贴士
- ⽹络时间同步命令
Linux有定时任务,crond,可以使⽤linux的定时任务,每隔10分钟执⾏⼀次ntpdate命令
#使⽤ ntpdate ⽹络时间同步命令
ntpdate -u ntp.api.bz #从⼀个时间服务器同步时间
第三部分:分布式ID解决方案
为什么需要分布式ID(分布式集群环境下的全局唯⼀ID)
解决方案
- UUID(可以⽤)
UUID 是指Universally Unique Identifier,翻译为中⽂是通⽤唯⼀识别码。产⽣重复 UUID 并造成错误的情况⾮常低,是故⼤可不必考虑此问题。Java中得到⼀个UUID,可以使⽤java.util包提供的⽅法
public static void main(String[] args) {
// ex:92bb507d-8d8c-4ca2-a0bf-b7fe24ec2e32
System.out.println(java.util.UUID.randomUUID().toString());
}
- 独⽴数据库的⾃增ID(不建议用)
⽐如A表分表为A1表和A2表,那么肯定不能让A1表和A2表的ID⾃增,那么ID怎么获取呢?我们可以单独的创建⼀个Mysql数据库,在这个数据库中创建⼀张表,这张表的ID设置为⾃增,其他地⽅需要全局唯⼀ID的时候,就模拟向这个Mysql数据库的这张表中模拟插⼊⼀条记录,此时ID会⾃增,然后我们可以通过Mysql的select last_insert_id() 获取到刚刚这张表中⾃增⽣成的ID.
⽐如,我们创建了⼀个数据库实例global_id_generator,在其中创建了⼀个数据表,表结构如下
-- ----------------------------
-- Table structure for DISTRIBUTE_ID
-- ----------------------------
DROP TABLE IF EXISTS `DISTRIBUTE_ID`;
CREATE TABLE `DISTRIBUTE_ID` (
`id` bigint(32) NOT NULL AUTO_INCREMENT COMMENT '主键',
`createtime` datetime DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
当分布式集群环境中哪个应⽤需要获取⼀个全局唯⼀的分布式ID的时候,就可以使⽤代码连接这个
数据库实例,执⾏如下sql语句即可。
insert into DISTRIBUTE_ID(createtime) values(NOW());
select LAST_INSERT_ID();
注意:
1)这⾥的createtime字段⽆实际意义,是为了随便插⼊⼀条数据以⾄于能够⾃增id。
2)使⽤独⽴的Mysql实例⽣成分布式id,虽然可⾏,但是性能和可靠性都不够好,因为你需要代码连接到数据库才能获取到id,性能⽆法保障,另外mysql数据库实例挂掉了,那么就⽆法获取分布式id了。
3)有⼀些开发者⼜针对上述的情况将⽤于⽣成分布式id的mysql数据库设计成了⼀个集群架构,那么其实这种⽅式现在基本不⽤,因为过于麻烦了。
-
SnowFlake 雪花算法(可以⽤,推荐)
雪花算法是Twitter推出的⼀个⽤于⽣成分布式ID的策略。
雪花算法是⼀个算法,基于这个算法可以⽣成ID,⽣成的ID是⼀个long型,那么在Java中⼀个long型是8个字节,算下来是64bit,如下是使⽤雪花算法⽣成的⼀个ID的⼆进制形式示意:
- 第一个部分,是 1 个 bit:0,这个是无意义的。
因为二进制里第一个 bit 为如果是 1,那么都是负数,但是我们生成的 id 都是正数,所以第一个 bit 统一都是 0。 - 第二个部分是 41 个 bit:表示的是时间戳。
41 bit 可以表示的数字多达 2^41 - 1,也就是可以标识 2 ^ 41 - 1 个毫秒值,换算成年就是表示 69 年的时间。 - 第三个部分是 5 个 bit:表示的是机房 id,10001。
最多代表 2 ^ 5 个机房(32 个机房)。 - 第四个部分是 5 个 bit:表示的是机器 id,1 1001。
每个机房里可以代表 2 ^ 5 个机器(32 台机器) - 第五个部分是 12 个 bit:表示的序号,就是某个机房某台机器上这一毫秒内同时生成的 id 的序号,0000 00000000。
12 bit 可以代表的最大正整数是 2 ^ 12 - 1 = 4096,也就是说可以用这个 12 bit 代表的数字来区分同一个毫秒内的 4096 个不同的 id。
java实现示例
public class IdWorker {
//因为二进制里第一个 bit 为如果是 1,那么都是负数,但是我们生成的 id 都是正数,所以第一个 bit 统一都是 0。
//机器ID 2进制5位 32位减掉1位 31个
private long workerId;
//机房ID 2进制5位 32位减掉1位 31个
private long datacenterId;
//代表一毫秒内生成的多个id的最新序号 12位 4096 -1 = 4095 个
private long sequence;
//设置一个时间初始值 2^41 - 1 差不多可以用69年
private long twepoch = 1585644268888L;
//5位的机器id
private long workerIdBits = 5L;
//5位的机房id
private long datacenterIdBits = 5L;
//每毫秒内产生的id数 2 的 12次方
private long sequenceBits = 12L;
// 这个是二进制运算,就是5 bit最多只能有31个数字,也就是说机器id最多只能是32以内
private long maxWorkerId = -1L ^ (-1L << workerIdBits);
// 这个是一个意思,就是5 bit最多只能有31个数字,机房id最多只能是32以内
private long maxDatacenterId = -1L ^ (-1L << datacenterIdBits);
private long workerIdShift = sequenceBits;
private long datacenterIdShift = sequenceBits + workerIdBits;
private long timestampLeftShift = sequenceBits + workerIdBits + datacenterIdBits;
private long sequenceMask = -1L ^ (-1L << sequenceBits);
//记录产生时间毫秒数,判断是否是同1毫秒
private long lastTimestamp = -1L;
public long getWorkerId(){
return workerId;
}
public long getDatacenterId() {
return datacenterId;
}
public long getTimestamp() {
return System.currentTimeMillis();
}
public IdWorker(long workerId, long datacenterId, long sequence) {
// 检查机房id和机器id是否超过31 不能小于0
if (workerId > maxWorkerId || workerId < 0) {
throw new IllegalArgumentException(
String.format("worker Id can't be greater than %d or less than 0",maxWorkerId));
}
if (datacenterId > maxDatacenterId || datacenterId < 0) {
throw new IllegalArgumentException(
String.format("datacenter Id can't be greater than %d or less than 0",maxDatacenterId));
}
this.workerId = workerId;
this.datacenterId = datacenterId;
this.sequence = sequence;
}
// 这个是核心方法,通过调用nextId()方法,让当前这台机器上的snowflake算法程序生成一个全局唯一的id
public synchronized long nextId() {
// 这儿就是获取当前时间戳,单位是毫秒
long timestamp = timeGen();
if (timestamp < lastTimestamp) {
System.err.printf(
"clock is moving backwards. Rejecting requests until %d.", lastTimestamp);
throw new RuntimeException(
String.format("Clock moved backwards. Refusing to generate id for %d milliseconds",
lastTimestamp - timestamp));
}
// 下面是说假设在同一个毫秒内,又发送了一个请求生成一个id
// 这个时候就得把seqence序号给递增1,最多就是4096
if (lastTimestamp == timestamp) {
// 这个意思是说一个毫秒内最多只能有4096个数字,无论你传递多少进来,
//这个位运算保证始终就是在4096这个范围内,避免你自己传递个sequence超过了4096这个范围
sequence = (sequence + 1) & sequenceMask;
//当某一毫秒的时间,产生的id数 超过4095,系统会进入等待,直到下一毫秒,系统继续产生ID
if (sequence == 0) {
timestamp = tilNextMillis(lastTimestamp);
}
} else {
sequence = 0;
}
// 这儿记录一下最近一次生成id的时间戳,单位是毫秒
lastTimestamp = timestamp;
// 这儿就是最核心的二进制位运算操作,生成一个64bit的id
// 先将当前时间戳左移,放到41 bit那儿;将机房id左移放到5 bit那儿;将机器id左移放到5 bit那儿;将序号放最后12 bit
// 最后拼接起来成一个64 bit的二进制数字,转换成10进制就是个long型
return ((timestamp - twepoch) << timestampLeftShift) |
(datacenterId << datacenterIdShift) |
(workerId << workerIdShift) | sequence;
}
/**
* 当某一毫秒的时间,产生的id数 超过4095,系统会进入等待,直到下一毫秒,系统继续产生ID
* @param lastTimestamp
* @return
*/
private long tilNextMillis(long lastTimestamp) {
long timestamp = timeGen();
while (timestamp <= lastTimestamp) {
timestamp = timeGen();
}
return timestamp;
}
//获取当前时间戳
private long timeGen(){
return System.currentTimeMillis();
}
/**
* main 测试类
* @param args
*/
public static void main(String[] args) {
for (int i = 0; i < 22; i++) {
System.out.println(worker.nextId());
}
}
}
另外,⼀切互联⽹公司也基于上述的⽅案封装了⼀些分布式ID⽣成器,⽐如滴滴的tinyid(基于数据库实现)、百度的uidgenerator(基于SnowFlake)和美团的leaf(基于数据库和SnowFlake)等。
- 借助Redis的Incr命令获取全局唯⼀ID(推荐)
Redis Incr 命令将 key 中储存的数字值增⼀。如果 key 不存在,那么 key 的值会先被初始化为 0,然后再执⾏ INCR 操作。
127.0.0.1:6379> incr id
(integer) 1
127.0.0.1:6379> incr id
(integer) 2
第四部分:分布式调度问题(定时任务的分布式)
调度—>定时任务,分布式调度—>在分布式集群环境下定时任务这件事
Elastic-job(当当⽹开源的分布式调度框架)
第 1 节 定时任务的场景
定时任务形式:每隔⼀定时间/特定某⼀时刻执⾏
例如:
- 订单审核、出库
- 订单超时⾃动取消、⽀付退款
- 礼券同步、⽣成、发放作业
- 物流信息推送、抓取作业、退换货处理作业
- 数据积压监控、⽇志监控、服务可⽤性探测作业
- 定时备份数据
- ⾦融系统每天的定时结算
- 数据归档、清理作业
- 报表、离线数据分析作业
第 2 节 什么是分布式调度
什么是分布式任务调度?有两层含义
1)运⾏在分布式集群环境下的调度任务。同⼀个定时任务程序部署多份,只应该有⼀个定时任务在执⾏。
2)分布式调度—>定时任务的分布式—>定时任务的拆分(即为把⼀个⼤的作业任务拆分为多个⼩的作业任务,同时执⾏)
第 3 节 定时任务与消息队列的区别
共同点
- 异步处理
⽐如注册、下单事件 - 应⽤解耦
不管定时任务作业还是MQ都可以作为两个应⽤之间的⻮轮实现应⽤解耦,这个⻮轮可以中转数据,当然单体服务不需要考虑这些,服务拆分的时候往往都会考虑 - 流量削峰
双⼗⼀的时候,任务作业和MQ都可以⽤来扛流量,后端系统根据服务能⼒定时处理订单或者从MQ抓取订单抓取到⼀个订单到来事件的话触发处理,对于前端⽤户来说看到的结果是已经下单成功了,下单是不受任何影响的
不同点
- 定时任务作业是时间驱动,⽽MQ是事件驱动;
时间驱动是不可代替的,⽐如⾦融系统每⽇的利息结算,不是说利息来⼀条(利息到来事件)就算⼀下,⽽往往是通过定时任务批量计算;
所以,定时任务作业更倾向于批处理,MQ倾向于逐条处理;
第 4 节 定时任务的实现⽅式
定时任务的实现⽅式有多种。早期没有定时任务框架的时候,我们会使⽤JDK中的Timer机制和多线程机制(Runnable+线程休眠)来实现定时或者间隔⼀段时间执⾏某⼀段程序;后来有了定时任务框架,比如⼤名鼎鼎的Quartz任务调度框架,使⽤时间表达式(包括:秒、分、时、⽇、周、年)配置某⼀个任务什么时间去执行:
- 引入依赖
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>2.3.2</version>
</dependency>
- 定义⼀个job,需实现Job接⼝
public class DemoJob implements Job {
@Override
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
System.out.println("我是⼀个定时任务逻辑");
}
}
- 定时任务作业主调度程序
public class QuartzMain {
// 创建作业任务调度器(类似于公交调度站)
public static Scheduler createScheduler() throws SchedulerException {
SchedulerFactory schedulerFactory = new StdSchedulerFactory();
Scheduler scheduler = schedulerFactory.getScheduler();
return scheduler;
}
// 创建⼀个作业任务(类似于⼀辆公交⻋)
public static JobDetail createJob() {
JobBuilder jobBuilder = JobBuilder.newJob(DemoJob.class);
jobBuilder.withIdentity("jobName", "myJob");
JobDetail jobDetail = jobBuilder.build();
return jobDetail;
}
/**
* 创建作业任务时间触发器(类似于公交⻋出⻋时间表)
* cron表达式由七个位置组成,空格分隔
* 1、Seconds(秒) 0~59
* 2、Minutes(分) 0~59
* 3、Hours(⼩时) 0~23
* 4、Day of Month(天)1~31,注意有的⽉份不⾜31天
* 5、Month(⽉) 0~11,或者
* JAN,FEB,MAR,APR,MAY,JUN,JUL,AUG,SEP,OCT,NOV,DEC
* 6、Day of Week(周) 1~7,1=SUN或者 SUN,MON,TUE,WEB,THU,FRI,SAT
* 7、Year(年)1970~2099 可选项
* 示例:
* 0 0 11 * * ? 每天的11点触发执⾏⼀次
* 0 30 10 1 * ? 每⽉1号上午10点半触发执⾏⼀次
*/
public static Trigger createTrigger() {
// 创建时间触发器,按⽇历调度
CronTrigger trigger = TriggerBuilder.newTrigger()
.withIdentity("triggerName", "myTrigger")
.startNow()
.withSchedule(CronScheduleBuilder.cronSchedule("0/2 * * * * ? "))
.build();
// 创建触发器,按简单间隔调度
/*SimpleTrigger trigger1 = TriggerBuilder.newTrigger()
.withIdentity("triggerName","myTrigger")
.startNow()
.withSchedule(SimpleScheduleBuilder
.simpleSchedule()
.withIntervalInSeconds(3)
.repeatForever())
.build();*/
return trigger;
}
// 定时任务作业主调度程序
public static void main(String[] args) throws SchedulerException {
// 创建⼀个作业任务调度器(类似于公交调度站)
Scheduler scheduler = QuartzMain.createScheduler();
// 创建⼀个作业任务(类似于⼀辆公交⻋)
JobDetail job = QuartzMain.createJob();
// 创建⼀个作业任务时间触发器(类似于公交⻋出⻋时间表)
Trigger trigger = QuartzMain.createTrigger();
// 使⽤调度器按照时间触发器执⾏这个作业任务
scheduler.scheduleJob(job, trigger);
scheduler.start();
}
}
以上,是回顾⼀下任务调度框架Quartz的⼤致⽤法,那么在分布式架构环境中使⽤Quartz已经不能更好的满⾜我们需求,我们可以使⽤专业的分布式调度框架,这⾥我们推荐使⽤Elastic-job和XXL-JOB。
第 5 节 分布式调度框架Elastic-Job
5.1 Elastic-Job介绍
Elastic-Job是当当⽹开源的⼀个分布式调度解决⽅案,基于Quartz⼆次开发的,由两个相互独⽴的⼦项⽬Elastic-Job-Lite和Elastic-Job-Cloud组成。我们要学习的是 Elastic-Job-Lite,它定位为轻量级无中心化解决⽅案,使用Jar包的形式提供分布式任务的协调服务,而Elastic-Job-Cloud⼦项⽬需要结合Mesos以及Docker在云环境下使用。
Elastic-Job的github地址:https://github.com/elasticjob
主要功能介绍
- 弹性调度
- 支持任务在分布式场景下的分片和高可用
- 能够水平扩展任务的吞吐量和执行效率
- 任务处理能力随资源配备弹性伸缩
- 资源分配
- 在适合的时间将适合的资源分配给任务并使其生效
- 相同任务聚合至相同的执行器统一处理
- 动态调配追加资源至新分配的任务
- 作业治理
- 失效转移
- 错过作业重新执行
- 自诊断修复
- 作业依赖(TODO)
- 基于有向无环图(DAG)的作业间依赖
- 基于有向无环图(DAG)的作业分片间依赖
- 作业开放生态
- 可扩展的作业类型统一接口
- 丰富的作业类型库,如数据流、脚本、HTTP、文件、大数据等
- 易于对接业务作业,能够与 Spring 依赖注入无缝整合
- 可视化管控端
- 作业管控端
- 作业执行历史数据追踪
- 注册中心管理
快速入门
- 引入依赖
<dependency>
<groupId>org.apache.shardingsphere.elasticjob</groupId>
<artifactId>elasticjob-lite-core</artifactId>
<version>${latest.release.version}</version>
</dependency>
- 作业开发
public class MyJob implements SimpleJob {
@Override
public void execute(ShardingContext context) {
switch (context.getShardingItem()) {
case 0:
// do something by sharding item 0
break;
case 1:
// do something by sharding item 1
break;
case 2:
// do something by sharding item 2
break;
// case n: ...
}
}
}
- 作业配置
JobConfiguration jobConfig = JobConfiguration.newBuilder("MyJob", 3).cron("0/5 * * * * ?").build();
- 作业调度
public class MyJobDemo {
public static void main(String[] args) {
new ScheduleJobBootstrap(createRegistryCenter(), new MyJob(), createJobConfiguration()).schedule();
}
private static CoordinatorRegistryCenter createRegistryCenter() {
CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("zk_host:2181", "my-job"));
regCenter.init();
return regCenter;
}
private static JobConfiguration createJobConfiguration() {
// 创建作业配置
// ...
}
}
测试
1)可先启动⼀个进程,然后再启动⼀个进程(两个进程模拟分布式环境下,通⼀个定时任务部署了两份在⼯作)。
2)两个进程逐个启动,观察现象。
3)关闭其中执⾏的进程,观察现象。
Leader节点选举机制
每个Elastic-Job的任务执⾏实例App作为Zookeeper的客户端来操作ZooKeeper的znode
(1)多个实例同时创建/leader节点。
(2)/leader节点只能创建⼀个,后创建的会失败,创建成功的实例会被选为leader节点,执⾏任务。
5.4 Elastic-Job-Lite轻量级去中心化的特点
如何理解轻量级和去中⼼化?
5.5 任务分片
⼀个⼤的⾮常耗时的作业Job,比如:⼀次要处理⼀亿的数据,那这⼀亿的数据存储在数据库中,如果用⼀个作业节点处理⼀亿数据要很久,在互联⽹领域是不太能接受的,互联⽹领域更希望机器的增加去横向扩展处理能⼒。所以,ElasticJob可以把作业分为多个的task(每⼀个task就是⼀个任务分片),每⼀个task交给具体的⼀个机器实例去处理(⼀个机器实例是可以处理多个task的),但是具体每个task执行什么逻辑由我们自己来指定。
Strategy策略定义这些分⽚项怎么去分配到各个机器上去,默认是平均去分,可以定制, 比如某⼀个机器负载 比较高或者预配置比较高,那么就可以写策略。分片和作业本身是通过⼀个注册中心协调的,因为在分布式环境下,状态数据肯定集中到⼀点,才可以在分布式中沟通。
5.6 弹性扩容
新增加⼀个运⾏实例app3,它会⾃动注册到注册中⼼,注册中⼼发现新的服务上线,注册中⼼会通知ElasticJob 进⾏重新分⽚。
注意:
1)分片项也是⼀个JOB配置,修改配置,重新分片,在下⼀次定时运行之前会重新调用分片算法,那么这个分片算法的结果就是:哪台机器运⾏哪⼀个⼀片,这个结果存储到zk中的,主节点会把分片给分好放到注册中心去,然后执行节点从注册中心获取信息(执行节点在定时任务开启的时候获取相应的分片)。
2)如果所有的节点挂掉值剩下⼀个节点,所有分片都会指向剩下的⼀个节点,这也是ElasticJob的高可用。
第五部分:Session共享(一致性)问题
Session共享及Session保持或者叫做Session⼀致性
第 1 节 Session问题原因分析
出现这个问题的原因,从根本上来说是因为Http协议是⽆状态的协议。客户端和服务端在某次会话中产⽣的数据不会被保留下来,所以第⼆次请求服务端⽆法认识到你曾经来过, Http为什么要设计为⽆状态协议?早期都是静态⻚⾯⽆所谓有⽆状态,后来有动态的内容更丰富,就需要有状态,出现了两种⽤于保持Http状态的技术,那就是Cookie和Session。⽽出现上述不停让登录的问题,分析如下图:
场景:nginx默认轮询策略
第 2 节 解决Session⼀致性的⽅案
-
Nginx的 IP_Hash 策略(可以使⽤)
同⼀个客户端IP的请求都会被路由到同⼀个⽬标服务器,也叫做会话粘滞- 优点:
配置简单,不⼊侵应⽤,不需要额外修改代码 - 缺点:
服务器重启Session丢失
存在单点负载⾼的⻛险
单点故障问题
- 优点:
-
Session复制(不推荐)
多个tomcat之间通过修改配置⽂件,达到Session之间的复制- 优点:
不⼊侵应⽤
便于服务器⽔平扩展
能适应各种负载均衡策略
服务器重启或者宕机不会造成Session丢失 - 缺点:
性能低
内存消耗
不能存储太多数据,否则数据越多越影响性能
延迟性
- 优点:
-
Session共享,Session集中存储(推荐)
Session的本质就是缓存,那Session数据为什么不交给专业的缓存中间件呢?⽐如Redis
- 优点:
能适应各种负载均衡策略
服务器重启或者宕机不会造成Session丢失
扩展能⼒强
适合⼤集群数量使⽤ - 缺点:
对应⽤有⼊侵,引⼊了和Redis的交互代码
- 优点: