在传统业务中我们的数据库都是单机的,数据库本身就提供了ACID。但随着业务的增长我们需要分库分表将数据放到多个数据库中,这个时候单个库的事务就无法满足需求了,就需要理解和掌握分布式事务了。
什么是分布式事务
分布式事务就是指事务的参与者、支持事务的服务器、资源服务器以及事务管理器分别位于不同的分布式系统的不同节点之上。简单的说,就是一次大的操作由不同的小操作组成,这些小的操作分布在不同的服务器上,且属于不同的应用,分布式事务需要保证这些小操作要么全部成功,要么全部失败。本质上来说,分布式事务就是为了保证不同数据库的数据一致性。
打个比方,一个电商系统的付款服务和订单的服务是两个应用,并且付款表和订单表不在一个数据库。那么如何保证用户付完款之后就能给用户创建好订单,并且创建订单失败就会给用户退款。而且在一个大型电商系统里每一个数据库都不止一个节点,又如何保证每个节点的数据都是一致的呢。
这个时候加州大学的计算机科学家 Eric Brewer 提出了CAP定理来奠定了分布式系统设计的基础。
CAP理论
- C-Consistent,操作成功后,所有节点看到的数据都是一样的。对于数据分布在不同节点上的数据上来说,如果在某个节点更新了数据,那么在其他节点如果都能读取到这个最新的数据,那么就称为强一致,如果有某个节点没有读取到,那就是分布式不一致。
- A-Availability,可用性,服务全部一致可用,在规定时间内完成合理的响应。可用性的两个关键一个是合理的时间,一个是合理的响应。合理的时间指的是请求不能无限被阻塞,应该在合理的时间给出返回。合理的响应指的是系统应该明确返回结果并且结果是正确的。
- P-Partition tolerance,分区容错性。指分布式系统在遇到某个节点故障后,仍能够对外提供服务。也就是说在分布式集群中某一个节点出现故障,但是整个集群都正常的。
CAP理论有一个很关键的定理:一个分布式系统最多只能同时满足一致性(Consistency)、可用性(Availability)和分区容错性(Partition tolerance)这三项中的两项。也就是说一个分布式系统是不可能同时满足强一致、高可用、分区容错性的,通常系统设计的时候会牺牲某一个指标来实现另外两个指标。
CA without P:如果不要求P,则C(强一致性)和A(可用性)是可以保证的。但放弃P的同时也就意味着放弃了系统的扩展性,也就是分布式节点受限,没办法部署子节点,这是违背分布式系统设计的初衷的。传统的关系型数据库RDBMS:Oracle、MySQL就是CA。
CP without A:如果不要求A,相当于每个请求都需要在服务器之间保持强一致,而P(分区)会导致同步时间无限延长(也就是等待数据同步完才能正常访问服务),一旦发生网络故障或者消息丢失等情况,就要牺牲用户的体验,等待所有数据全部一致了之后再让用户访问系统。设计成CP的系统其实不少,最典型的就是分布式数据库,如Zookeeper、HBase等。
AP wihtout C:要高可用并允许分区,则需放弃一致性。一旦分区发生,节点之间可能会失去联系,为了高可用,每个节点只能用本地数据提供服务,而这样会导致全局数据的不一致性。但系统依然能正常提供服务。
需要说一下在分布式系统中网络无法100%可靠,分区其实是一个必然现象,如果我们选择了CA而放弃了P,那么当发生分区现象时,为了保证一致性,这个时候必须拒绝请求,但是A又不允许,所以分布式系统理论上不可能选择CA架构,只能选择CP或者AP架构。
BASE理论
eBay的架构师Dan Pritchett源于对大规模分布式系统的实践总结,在ACM上发表文章提出BASE理论。
BASE理论是对CAP理论的延伸,核心思想是即使无法做到强一致性(Strong Consistency,CAP的一致性就是强一致性),但应用可以采用适合的方式达到最终一致性(Eventual Consitency)。BASE是Basically Available(基本可用)、Soft state(软状态)和Eventually consistent(最终一致性)三个短语的缩写。
- Basically Available(基本可用): 指分布式系统在出现不可预知故障的时候,允许损失部分可用性。比如电商大促,服务降级的体现。
- Soft-state(软状态):指允许系统中的数据存在中间状态,并认为该中间状态的存在不会影响系统的整体可用性。
- Eventually consistent(最终一致):强调的是所有的数据更新操作,在经过一段时间的同步之后,最终都能够达到一个一致的状态。
在BASE中用软状态和最终一致,保证了延迟后的一致性。BASE和 ACID 是相反的,它完全不同于ACID的强一致性模型,而是通过牺牲强一致性来获得可用性,并允许数据在一段时间内是不一致的,但最终达到一致状态。
分布式事务解决方案
有了上面分布式理论基础之后就有了常用的几种解决方案,在说到解决方案之前不得不提醒一下:在业务规模不大的情况下设计系统尽量去规避分布式事务问题。比如电商系统的早期阶段,单机架构就能满足业务就不要用微服务的架构,或者说微服务架构早期可以把服务拆分的更粗一些来避免跨库业务。因为无论是那种解决方案都要花费不少的时间成本。
通过XA协议实现两阶段提交
XA是由X/Open组织提出的分布式事务规范,整体是由一个事务管理器(TM)和多个资源管理器(RM)组成,RM一般就是指我们的数据库而TM相当于程序中的数据源。整个事务过程分为两个阶段提交,prepare和commit。
- 第一阶段TM要求所有的RM进行数据库预提交操作,所有RM都OK了才会进入第二阶段,只要一个RM返回失败就会全部回滚并终止。
- 第二阶段TM要求所有的RM提交数据,要注意的是后面commit如果出错的话并不会回滚已经提交的commit。
MySQL从5.0.3开始支持XA分布式事务,且只有InnoDB存储引擎支持。在我们J2EE项目中可用使用Atomikos(充当TM)来做XA分布式事务。这里用springboot举例:
- 引入atomikos包
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jta-atomikos</artifactId>
</dependency>
- 配置数据源和事务管理器,需要注意的是数据源要使用MysqlXADataSource,事务管理器需要使用JtaTransactionManager
import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.icatch.jta.UserTransactionManager;
import com.mysql.cj.jdbc.MysqlXADataSource;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.jta.JtaTransactionManager;
import javax.sql.DataSource;
import javax.transaction.UserTransaction;
@Configuration
public class DBConfig {
@Bean("db20")
public DataSource db20(){
MysqlXADataSource mysqlXADataSource = new MysqlXADataSource();
mysqlXADataSource.setUser("root");
mysqlXADataSource.setPassword("123456");
mysqlXADataSource.setUrl("jdbc:mysql://192.168.3.20:3306/shard_order?serverTimezone=UTC&useSSL=false&useUnicode=true&characterEncoding=UTF-8");
AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();
atomikosDataSourceBean.setXaDataSource(mysqlXADataSource);
return atomikosDataSourceBean;
}
@Bean("db21")
public DataSource db21(){
MysqlXADataSource mysqlXADataSource = new MysqlXADataSource();
mysqlXADataSource.setUser("root");
mysqlXADataSource.setPassword("123456");
mysqlXADataSource.setUrl("jdbc:mysql://192.168.3.21:3306/shard_order?serverTimezone=UTC&useSSL=false&useUnicode=true&characterEncoding=UTF-8");
AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();
atomikosDataSourceBean.setXaDataSource(mysqlXADataSource);
return atomikosDataSourceBean;
}
@Bean("xaTransaction")
public JtaTransactionManager jtaTransactionManager(){
UserTransaction userTransaction = new UserTransactionImp();
UserTransactionManager userTransactionManager = new UserTransactionManager();
return new JtaTransactionManager(userTransaction,userTransactionManager);
}
}
- 在service层调用
@Service
public class OrderService {
@Transactional(transactionManager = "xaTransaction")
public void insertTest(@Qualifier("db20") DataSource dataSource20,
@Qualifier("db21")DataSource dataSource21){
JdbcTemplate jdbc195 = new JdbcTemplate(dataSource20);
String sql1 = "INSERT INTO `order_info_1`(`id`, `order_amount`, `order_status`, `user_id`) VALUES (5, 5.00, 1, 4)";
int i = jdbc195.update(sql1);
System.out.println("**************影响的行数:"+i);
JdbcTemplate jdbc197 = new JdbcTemplate(dataSource21);
String sql2 = "INSERT INTO `order_info_1`(`id`, `order_amount`, `order_status`, `user_id`) VALUES (6, 6.00, 2, 4);";
int i1 = jdbc197.update(sql2);
System.out.println("**************影响的行数:" + i1);
}
}
这样就写好了一个分布式事务,上面两个sql只要其中一个失败就会回滚。
可以看到使用XA协议的方案做分布式事务非常简单,对代码完全没有侵入性。而且主流的数据库和数据库中间件Sharding-JDBC、MyCat等都默认支持XA协议,但XA协议有一个缺点就是性能比较低,通常会比本地事务性能差十倍。
通过XA协议实现两阶段提交其实是CAP里的CA,它的特点是强一致性,但是可以通过Mysql等数据库的主从复制来满足P。XA的适用场景是一些并发量不是很高的业务,在业务从小规模到中等规模过度的时候可以选择XA来做分布式事务。
TCC事务补偿机制
TCC分别对应Try、Confirm和Cancel三种操作实现的。最早是由Pat Helland于2007年发表的一篇名为《Life beyond Distributed Transactions:an Apostate’s Opinion》的论文提出。
- Try阶段:尝试执行,完成所有业务检查(一致性),预留必须业务资源(准隔离性)
- Confirm阶段:确认执行真正执行业务,不作任何业务检查,只使用Try阶段预留的业务资源,Confirm操作满足幂等性。要求具备幂等设计,Confirm失败后需要进行重试。
- Cancel阶段:取消执行,释放Try阶段预留的业务资源。
使用方法其实就是针对每一个操作都需要提前注册一个与其对应的补偿操作,在执行失败后按照失败节点向前补偿,撤销之前的操作。
举个例子:A和B两家银行,一个用户从A到B进行转账,A减1000成功,B加1000成功整个事务结束,如果B没有加成功则通知A执行加1000进行补偿操作。
代码实现如下:
@Service
public class TransferAccountService {
@Autowired
@Qualifier("db20JdbcTemplate")
private JdbcTemplate db20JdbcTemplate;
@Autowired
@Qualifier("db21JdbcTemplate")
private JdbcTemplate db21JdbcTemplate;
@Transactional(transactionManager = "db20TransactionManager")
public void transfer() {
//银行A开始转账
int ares = db20JdbcTemplate.update("update user_account set account=account-1000 where user_id=2");
int res = 0;
try {
//第一步try 验证银行A是否转账成功
if (ares == 0) {
return;
}
// int i = 1/0; // 运行点A发生异常
//第二步Confirm 银行B开始转账
res = db21JdbcTemplate.update("update user_account set account=account+1000 where user_id=2");
//int i = 1/0; // 运行点B发生异常
} catch (Exception e) {
e.printStackTrace();
//第三步Cancel 一旦转账失败进行补偿
//这个补偿要判断好银行B的转账是否操作成功,如果在运行点A发的异常就说明转账没有成功这个时候才需要补偿,如果是在运行点B发生的异常那么再补偿就是过度补偿
if (res == 0) {
//银行B没有转账成功补偿银行A
db20JdbcTemplate.update("update user_account set account=account+1000 where user_id=2");
}
}
}
TCC其实很好理解就是在程序里每一个分布式逻辑都按照T,C,C三个步骤去做对应的处理即可,不过TCC有一个很大的问题就是代码入侵性很强复杂度很高。要做本身业务的基础上做一些额外的事,并且对程序员和测试的要求会比较高。try的粒度很有讲究一定要判断是否需要Cancel,不然就会发生过度补偿的问题。
TCC相比XA协议不需要TM事务管理器来统一管理事务,性能会比XA协议要好很多。
使用本地消息表实现最终一致性
本地消息表这个方案最初是ebay提出,是基于BASE理论设计的,是最终一致性模型。
此方案也很好理解,核心是将需要分布式处理的任务通过消息日志的方式来异步执行。消息日志可以存储到数据库或消息队列,再通过业务规则自动或人工发起重试。人工重试更多的是应用于支付场景,通过对账系统对事后问题的处理。
举一个支付订单的场景:
首先需要在用户扣除账户金额的同时将所支付的订单消息存入到消息表中。
@Transactional(transactionManager = "db20TransactionManager")
public int payment(int uid, int order_id, int amount) {
//查询用户账户信息
List<Map<String,Object>> userAccountList = db20JdbcTemplate.queryForList("select id,user_id,account from user_account where user_id = ?",uid);
if (userAccountList.size() == 0) {
return 1;
}
UserAccount userAccount = new UserAccount();
try {
BeanUtils.populate(userAccount,userAccountList.get(0));
}catch (Exception e){
e.printStackTrace();
return 1;
}
int account = userAccount.getAccount();
if (account < amount) {
return 2;
}
//更新用户账户金额
userAccount.setAccount(account - amount);
db20JdbcTemplate.update("UPDATE `user_account` SET `account` = ? WHERE `user_id` = ?;", userAccount.getAccount(), userAccount.getUser_id());
PayMsg payMsg = new PayMsg();
payMsg.setId(1001);
payMsg.setOrder_id(order_id);
payMsg.setStatus(0);//0-未发送,1-发送成功,2-超次数
payMsg.setFail_count(0);
//写入本地消息表
db20JdbcTemplate.update("INSERT INTO `pay_msg`(`id`, `order_id`, `status`, `fail_count`) VALUES (?, ?, ?, ?)", payMsg.getId(), payMsg.getOrder_id(), payMsg.getStatus(), payMsg.getFail_count());
return 0;
}
然后不断轮询消息表的数据去更改订单状态,这里是通过http的方式去调用更改订单状态的接口,也可以通过RPC的方式。
@Scheduled(cron = "0/5 * * * * ?")
public void orderNotify() throws Exception{
List<PayMsg> payMsgList =new ArrayList<>();
//查询未处理成功的消息
List<Map<String,Object>> mapList = db20JdbcTemplate.queryForList("select id,order_id,status,fail_count from pay_msg where status = 0");
if(mapList.isEmpty()){
return;
}
for(Map<String,Object> map : mapList){
PayMsg payMsg = new PayMsg();
BeanUtils.populate(payMsg,map);
payMsgList.add(payMsg);
}
for (PayMsg payMsg: payMsgList) {
int order_id = payMsg.getOrder_id();
//调用订单接口来更改消息状态,这里会有重试,最多重试五次
CloseableHttpClient httpClient = HttpClientBuilder.create().build();
HttpGet httpGet = new HttpGet("http://localhost:8080/handleorder?id="+order_id);
CloseableHttpResponse httpResponse = httpClient.execute(httpGet);
String response = EntityUtils.toString(httpResponse.getEntity());
System.out.println("************调用结果:"+response);
if("success".equals(response)){
payMsg.setStatus(1);
}else{
int count = payMsg.getFail_count();
payMsg.setFail_count(count+1);
if(count+1>5){
payMsg.setStatus(2);
}
}
db20JdbcTemplate.update("update pay_msg set status=?,fail_count=? where id=?",payMsg.getStatus(),payMsg.getFail_count(),payMsg.getId());
}
}
这样一个简单的分布式事务方案就OK了,要注意的是要保证业务的幂等性。相比XA和TCC,本地消息表的方案代码入侵性比TCC更少一些,也因为只会用到本地事务会比XA的性能更好一点,适用于对一致性要求不高(不需要很及时)的场景。
以上所有的示例代码可以看这里https://github.com/burgleaf/distributed-transaction