分布式解决方案
一、分布式锁
1.1 分布式锁介绍
由于传统的锁(synchronized、Lock)是基于Tomacat服务器内部对象的,搭建了集群之后两个Tomcat服务器就没办法获取到同一个锁资源(对象),导致锁失效。所以要使用分布式锁来处理。
1.2 分布式锁解决方案
1.2.1 搭建环境
Step1:创建SpringBoot工程
Step2:编写抢购业务
@RestController
public class SecondKillController {
// 1. 准备商品库存(模拟数据库)
public static Map<String, Integer> itemStock = new HashMap<>();
// 2. 准备商品订单
public static Map<String, Integer> itemOrder = new HashMap<>();
// 3. 使用静态代码块来初始化库存和订单
static {
itemStock.put("牙刷", 10000);
itemOrder.put("牙刷", 0);
}
@RequestMapping("/kill")
public String kill(String item) {
// 4. 减库存
Integer stockNum = itemStock.get(item);
if (stock <= 0) {
return "商品库存不足";
}
Thread.sleep(100);
itemStock.put(item, stockNum - 1);
// 5. 创建订单
Thread.sleep(100);
itemOrder.put(item, itemOrder.get(item) + 1);
// 6. 返回信息
return "抢购成功!!" + item + ":剩余库存为:" + itemStock.get(item) + ";订单数为:" + itemOrder.get(item);
}
}
Step3:下载ab压力测试,可以指定并发数和请求数
到apache官网中下载。下载完成后解压,用cmd进入到解压路径
ab -n 请求数 -c 并发数 访问的路径
Step4:测试,会发现有超卖的问题,即库存数加订单数大于10000
1.2.2 Zookeeper实现分布式锁的原理
因为Zookeeper使用的是临时有序节点,当客户端断开与Zookeeper的连接之后就会自动删除临时节点,不会出现业务代码出现异常后无法释放锁的情况
1.2.3 Zookeeper实现分布式锁
Step1:导入依赖
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.1</version>
</dependency>
Step2:编写配置类,用于连接Zookeeper
@Configuration
public class ZkConfig {
@Bean
public CuratorFramework getCf() {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 2); // 指定重试策略(当前是每3000ms重试1次, 一共重试2次)
CuratorFramework cf = CuratorFrameworkFactory.builder()
// 指定要连接的所有Zookeeper节点
.connectString("192.168.199.109:2821, 192.168.199.109:2822, 192.168.199.109:2823")
.retryPolicy(retryPolicy)
.build();
cf.start(); // 开启后才能连接
return cf;
}
}
Step3:在抢购业务代码中加上分布式锁
@RestController
public class SecondKillController {
// 1. 准备商品库存(模拟数据库)
public static Map<String, Integer> itemStock = new HashMap<>();
// 2. 准备商品订单
public static Map<String, Integer> itemOrder = new HashMap<>();
// 3. 使用静态代码块来初始化库存和订单
static {
itemStock.put("牙刷", 10000);
itemOrder.put("牙刷", 0);
}
@Autowired
private CuratorFramework cf; // 注入Zookeeper连接
@RequestMapping("/kill")
public String kill(String item) {
// Zookeeper中提供的实现分布式锁的API, 创建时指定Zookeeper连接和要在哪个节点下创建临时有序节点
// 指定的节点不一定要存在, 如果不存在Zookeeper会先帮我们创建出节点
InterProcessMutex lock = new InterProcessMutex(cf, "/lock");
// 加锁......
lock.acquire();
// 4. 减库存
Integer stockNum = itemStock.get(item);
if (stock <= 0) {
return "商品库存不足";
}
Thread.sleep(100);
itemStock.put(item, stockNum - 1);
// 5. 创建订单
Thread.sleep(100);
itemOrder.put(item, itemOrder.get(item) + 1);
// 释放锁......
lock.release();
// 6. 返回信息
return "抢购成功!!" + item + ":剩余库存为:" + itemStock.get(item) + ";订单数为:" + itemOrder.get(item);
}
}
Step4:再次使用ab压力测试,发现超卖问题得到解决
补充:lock还有一种指定超时等待的加锁方式
@RequestMapping("/kill")
public String kill(String item) {
InterProcessMutex lock = new InterProcessMutex(cf, "/lock");
// 加锁......
if (lock.acquire(1, TimeUnit.SECONDS)) { // 指定排队多久就放弃获得锁资源
/**
* 减库存, 加订单的业务
*/
} else {
return "没有抢到商品";
}
}
1.2.4 Redis实现分布式锁的原理
使用Redis的好处是轻量级
1.2.5 Redis实现分布式锁
Step1:导入依赖和修改配置文件,连接Redis
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
spring:
redis:
hots: 192.168.199.109
port: 6379
Step2:编写工具类,手动编写Redis加锁和释放锁的方式
@Component
public class RedisLockUtil {
@Autowired
private StringRedisTemplate template;
public boolean lock(String key, String value, int second) {\
// 使用setnxex方法, 在创建key的时候指定上key的生存时间
return template.opsForValue().setIfAbsent(key, value, second, TimeUnit.SECONDS);
}
public void unlock(String key) {
// 删除key即释放锁
template.delete(key);
}
}
Step3:在抢购业务代码中加上Redis实现的分布式锁
@RestController
public class SecondKillController {
// 1. 准备商品库存(模拟数据库)
public static Map<String, Integer> itemStock = new HashMap<>();
// 2. 准备商品订单
public static Map<String, Integer> itemOrder = new HashMap<>();
// 3. 使用静态代码块来初始化库存和订单
static {
itemStock.put("牙刷", 10000);
itemOrder.put("牙刷", 0);
}
@Autowired
private RedisLockUtil redisLock; // 注入Redis实现分布式锁的工具类
@RequestMapping("/kill")
public String kill(String item) {
// 加锁...... value随便写就可以
if (redisLock.lock(item, System.currentTimeMillis() + "", 1) {
// 4. 减库存
Integer stockNum = itemStock.get(item);
if (stock <= 0) {
return "商品库存不足";
}
Thread.sleep(100);
itemStock.put(item, stockNum - 1);
// 5. 创建订单
Thread.sleep(100);
itemOrder.put(item, itemOrder.get(item) + 1);
// 释放锁......
redisLock.unlock(item);
// 6. 返回信息
return "抢购成功!!" + item + ":剩余库存为:" + itemStock.get(item) + ";订单数为:" + itemOrder.get(item);
} else {
return "没有抢到商品";
}
}
}
Step4:使用ab压力测试
两者比较:Redis执行会快一些,因为Redis实现的原理是当key已经存在就马上放弃获取锁资源;而Zookeeper会排队等待获取锁资源,并且达到了我们指定的等待时间后才放弃获取
二、分布式任务
2.1 分布式任务介绍
存在的问题:
- 如果服务中存在定时任务(Quartz),在搭建集群之后,集群中的每个服务都会存在定时任务,而定时任务是到时间就会自动执行,所以分布式会导致定时任务重复执行的问题
- 如果服务中有一个比较大的定时任务,需要我们手动进行拆分成多个小任务,那么我们应该如何去分配指定的服务器来执行任务
- 当执行定时任务的服务出现故障时,需要如何处理
2.2 分布式任务解决方案
2.2.1 Elastic-Job介绍
官网:http://elasticjob.io/index_zh.html
是由当当网基于Quartz + Zookeeper的二次开发产品,可以帮助我们解决分布式任务中存在的问题:
- 基于Zookeeper的分布式锁,保证只有一个服务去执行定时任务
- 基于Zookeeper实现了注册中心,自动帮助我们去调度指定的服务来执行拆分好的定时任务
- 基于Zookeeper实现了注册中心,基于心跳的方式,自动去监测服务的健康情况
2.2.2 Elastic-Job实现分布式任务
Step1:创建SpringBoot工程作为服务
Step2:导入依赖
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-spring</artifactId>
<version>2.1.5</version>
</dependency>
Step3:创建Elastic-Job配置类,配置Zookeeper实现的注册中心的信息
@Configuration
public class ElasticJobConfig {
/**
* 注册中心
*/
@Bean
public CoordinatorRegistryCenter center() {
// ZookeeperConfiguration里第一个参数是指定连接Zookeeper的信息, 第二个参数是指定创建elastic-job使用的znode名称
CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(
new ZookeeperConfiguration(
"192.168.199.109:2181,192.168.199.109:2182,192.168.199.109:2183",
"elastic-job-demo"));
// 初始化注册中心后再返回
regCenter.init();
return regCenter;
}
}
Step4:在服务中创建一个定时任务(Quartz)
@Component
public calss MyElasticJob implements SimpleJob {
@Override
public void execute(ShardingContext context) {
switch (context.getShardingItem()) {
// 对任务进行拆分
case 0:
System.out.println("执行0任务!!");
case 1:
System.out.println("执行1任务!!");
case 2:
System.out.println("执行2任务!!");
// case n: ...
}
}
}
Step5:在配置类中创建调度器,配置执行定时任务的周期,并开始调度任务
/**
* 调度器(传入Job和注册中心)
*/
@Bean
public SpringJobScheduler scheduler(MyElasticJob job, CoordinatorRegistryCenter center) {
// 1. 定义作业配置的核心信息(包含触发器)
JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration
// "名字随意", "Cron表达式", 指定当前定时任务被拆分成了几个小任务
.newBuilder("demoSimpleJob", "*/2 * * * * ?", 3)
.shardingItemParameters("0=A,1=B,2=C").build();
// 2. 定义SIMPLE类型配置
SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig)
.build();
// 3. 定义Lite作业根配置
LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration
.newBuilder(simpleJobConfig).build();
// 4. 定义SpringJobScheduler
SpringJobScheduler scheduler = new SpringJobScheduler(job, center, simpleJobRootConfig);
// 调度器初始化后再返回
scheduler.init()
return scheduler;
}
Step6:复制一份该服务的工程,修改端口号,搭建该服务的集群
Step7:测试。将两个工程都启动,可以看到Elastic-Job自动帮我们将拆分好的任务分配给不同的工程来执行;当我们突然关闭其中一个工程之后,所有任务都会自动交给另一个工程执行。
三、分布式事务
3.1 分布式事务介绍
-
执行一个功能时,会涉及到调用多个服务,多个服务需要分别去操作数据库。传统的事务操作是根据Connection对象来决定的,在搭建了分布式之后,两个服务肯定拿不到同一个Connection对象,所以传统的事务操作失效。
-
当后期数据量增大的时候,如果涉及到数据库的分库分表,一个服务要调用多个数据库,肯定也不是同一个Connection对象,所以传统的事务操作同样会失效
3.2 Base理论
CAP理论:C:一致性,A:可用性,P分区容错性。分布式环境下只能三者取其二,并且P是必须要满足的。
- Eureka:AP,保证了可用性,舍弃了一致性。
- Zookeeper:CP,每一个节点必须能够找到Master之后才能对外提供服务,保证了一致性,舍弃了可用性。
Base理论:BA:基本可用,S:中间状态,E:最终一致性。
- Base理论是基于CAP理论演化而来的,是对CAP理论中的一致性和可用性的一个权衡的结果。
- 核心思想:我们无法做到强一致性,但是每一个应用都可以根据自身的业务特点,采用一些适当的方式来权衡,最终达到一致性。
- BA:分布式系统中一些服务出现了问题,允许损失掉部分服务的可用性,保证核心功能的高可用。比如:双十一并发量特别大,就会允许修改配送地址等一些服务损失可用性,但是要保证下订单的服务是高可用的。
- S:允许整个系统存在一个中间状态,并不会影响正常的去使用整个系统,允许数据的同步存在延迟。比如:双十一并发量特别大,不可能直接操作数据库,都是操作缓存,也不能保证缓存中的数据和数据库中的数据时刻一致,它们的同步会存在一个延迟。
- E:整个系统中的所有数据经过一定的时间后,最终能够达到一致的状态,不需要保证数据的强一致性(即时刻保持一致)。
3.3 分布式事务解决方案
事务分为两种:柔性事务(基于Base理论),刚性事务(传统的ACID)
3.3.1 2PC(两段式提交)
2PC两段式提交分为两个阶段:
- 准备阶段。参与者需要开启事务,执行SQL,保证数据库的日志中已经存在相应的数据,执行SQL成功后参与者会向TransactionManager发送准备OK的通知。
- 提交阶段。当TransactionManager收到了所有参与者的通知之后,向所有参与者发送Commit请求,通知所有的参与者一起提交事务。
两段式提交存在的问题:
- 执行的性能很低,一般是传统事务的10倍以上。
- TransactionManager是没有超时机制的,如果其中一个参与者出现了问题,一直没有给TransactionManager发送OK信号,则TransactionManager也无法向所有的参与者发送Commit请求。
- TransactionManager存在单点故障问题,如果TransactionManager宕机,则整个两段式提交不可用。
3.3.2 3PC(三段式提交)
三段式提交在二段式提交的基础上引入了超时机制,即参与者在长时间没有响应过后,TransactionManager会让所有的参与者回滚事务;并且在二段提交的基础上又多了一个步骤,在提交事务之前再询问一次参与者数据库中的日志信息是否已经完善。
3.3.3 TCC机制
TCC:
T:Try。尝试去执行具体业务代码(锁定数据库)。(比如我们看到的 "下单中 ing")
C:Comfirm。Try成功了,执行修改数据库。
C:Cancel。Try失败了,回退数据库。
3.3.4 RabbitMQ分布式事务
RabbitMQ在发送消息时有comfirm机制,可以保证消息一定能发送到RabbitMQ中;同时在RabbitMQ的消费者中有手动ACK机制,可以保证消费者一定能消费到RabbitMQ中的消息。(Base理论中的 E:最终一致性)
3.3.5 LCN实现分布式事务
基于3PC三段式提交和TCC实现的
LCN的官网:http://txlcn.org/zh-cn/docs/start.html
Step1:创建SpringBoot工程,用作TransactionManager
Step1.1:添加依赖
<dependency>
<groupId>com.codingapi.txlcn</groupId>
<artifactId>txlcn-tm</artifactId>
<version>5.0.2.RELEASE</version>
</dependency>
Step1.2:编写yml配置文件
server:
port: 8080
spring:
# 连接MySQL
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql:///lcn?serverTimezone=UTC # 库名是"lcn"
username: root
password: 1234
# 连接Redis
redis:
host: 192.168.199.109
port: 6379
# 指定当前工程作为TransactionManager的端口号
tx-lcn:
manager:
port: 8070
Step1.3:在启动类中添加注解 @EnableTransactionManagerServer
Step1.4:在我们所连接的数据库中创建TransactionManager的信息记录表(LCN的官网有创建方式)
Step2:创建SpringBoot工程,用作参与者,订单服务(调用商品服务,操作订单,必须商品服务中减库存成功才能创建订单)
Step2.1:导入依赖
<dependency>
<groupId>com.codingapi.txlcn</groupId>
<artifactId>txlcn-tc</artifactId>
<version>5.0.2.RELEASE</version>
</dependency>
<dependency>
<groupId>com.codingapi.txlcn</groupId>
<artifactId>txlcn-txmsg-netty</artifactId>
<version>5.0.2.RELEASE</version>
</dependency>
Step2.2:编写参与者的yml配置文件
server:
port: 8081 # 注意: 端口号不要冲突
spring:
# 连接MySQL
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql:///lcn?serverTimezone=UTC # 库名是"lcn"
username: root
password: 1234
# 连接Redis
redis:
host: 192.168.199.109
port: 6379
# 指定要连接的TransactionManager的端口号
tx-lcn:
client: # 表明当前服务作为参与者
port: 8070
Step2.3:在启动类中添加 @EnableDistributedTransaction注解
Step2.4:编写Controller、Service、Mapper来调用商品服务,并在订单表中创建订单
注意:在Service的方法上添加 @Transactional 和 @LcnTransaction 注解才能使LCN生效
@Override
@Transactional
@LcnTransaction
public void createOrder() {
restTemplate.getForObject("http://localhost:8082/item", String.class);
int i = 1/0; // 加入异常
orderMapper.add();
}
Step3:创建SpringBoot工程,用作参与者,商品服务(操作商品库存订单)
具体配置与Step2相同,只是Controller、Service、Mapper中实现的具体功能不同
Step4:测试。访问订单服务的Controller方法,查看数据库,是否数据库中的订单表和商品表都回滚