分布式事务概述
说起分布式事务,是个让人又爱又恨的话题。恨他,因为这个世界性难题始终没有一个完美的解决方案。而爱他,因为他引出了一系列解巧妙决方案,不得不感叹一代又一代计算机人的智慧。这篇文章我们就来谈谈分布式事务的一些解决方案,其中互联网公司中最常用哪种?MQ又如何和事务联系起来呢?
模型
在正式开始介绍之前,我们先通过两张图例来看看什么是分布式事务的情况。
上图我们再熟悉不过,就是传统的本地事务,我们操作的数据库是一个单数据源,通过数据库提供的事务就可以轻松完成,业务上通过spring的声明式事务注解@Transitional就可以,可是如果多数据源的情况呢?
像这样其实就是分布式事务的一种情况了,跨库事务。很明显本地事务那套方式已经玩不起来了,要怎么办呢?当然是两阶段提交喽,别急,先卖个关子,后面我们详细来解释,下面再来看一种情况。
以上大概是互联网最常见的一种模型了,微服务间的分布式事务。这种情况甚至服务间存在于不同的jvm进程。这里要使用的方式就是大名鼎鼎的TCC了,这个我们后面再详细探讨具体的实现方式。
好了,了解了分布式事务产生的模型,我们就可以开始正式开始吹牛逼之旅了,等等,貌似还需要交代几个小小的理论。
BASE理论
为什么说分布式事务是个难题?因为要达到强一致性(数据更新后立即达到一致状态)是非常困难的,所以有了中间状态(软状态),即允许数据在某一状态下是不一致的,但是要尽可能保证最终一致性。这其实也就是BASE理论了,他的定义如下:
base理论
- 基本可用(Basically Availability)
指分布式系统出现不可预知错误的时候,允许损失部分可用性。 - 软状态(Soft State)
也就是中间状态,允许存在这种状态并认为不会影响系统的整体可用性,即允许不同节点在的数据传输之间存在延迟。 - 最终一致(Eventual Consistency)
在数据更新操作之后,在经过一定时间的同步之后,最终都能达到一个一致的状态。不需要保证系统的强一致性。
两阶段提交(Two Phase Commitment)
所谓两阶段提交,顾名思义,就是把事务的提交分成两个阶段,但是注意,这两个阶段是在一组操作中的,不要误以为是两组操作。可能这么说不是很明白,我们再来看张图例
还不明白也没关系,等介绍完了XA和TCC再回过头看看就GET了。好了,下面我们就从XA开始,正式开始来了解分布式事务的那些解决方案。
XA/JTA方案
XA是业界关于分布式管理的一个规范,而JTA是JAVA的一个实现。
在XA中,我们引入了一个中间协调者的角色。在第一阶段中,所有的参与者需要锁住要操作的资源,进行操作,然后通知协调者已经准备就绪,可以提交事务。
第二阶段时协调者收到了某个参与者发送的请求,得知了他们都已经达到了可以提交事务的状态,接着像所有参与者发送commit命令,事务提交。如果有一方参与者执行失败,那协调器就会发送rollback命令,各个参与者都回滚。
都说talk is cheap,show me the code,下面我们就来看看上述过程使用JTA实现的一组代码
boolean logXaCommands = true;
// 获得资源管理器操作接口实例 RM1
Connection conn1 = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "root");
XAConnection xaConn1 = new MysqlXAConnection((com.mysql.jdbc.Connection) conn1, logXaCommands);
XAResource rm1 = xaConn1.getXAResource();
// 获得资源管理器操作接口实例 RM2
Connection conn2 = DriverManager.getConnection("jdbc:mysql://localhost:3306/test1", "root", "root");
XAConnection xaConn2 = new MysqlXAConnection((com.mysql.jdbc.Connection) conn2, logXaCommands);
XAResource rm2 = xaConn2.getXAResource();
// AP请求TM执行一个分布式事务,TM生成全局事务id
byte[] gtrid = "g12345".getBytes();
int formatId = 1;
try {
// ==============分别执行RM1和RM2上的事务分支====================
// TM生成rm1上的事务分支id
byte[] bqual1 = "b00001".getBytes();
Xid xid1 = new MysqlXid(gtrid, bqual1, formatId);
// 执行rm1上的事务分支
rm1.start(xid1, XAResource.TMNOFLAGS);// One of TMNOFLAGS, TMJOIN,
// or TMRESUME.
PreparedStatement ps1 = conn1.prepareStatement("INSERT into user(name) VALUES ('tianshouzhi')");
ps1.execute();
rm1.end(xid1, XAResource.TMSUCCESS);
// TM生成rm2上的事务分支id
byte[] bqual2 = "b00002".getBytes();
Xid xid2 = new MysqlXid(gtrid, bqual2, formatId);
// 执行rm2上的事务分支
rm2.start(xid2, XAResource.TMNOFLAGS);
PreparedStatement ps2 = conn2.prepareStatement("INSERT into user(name) VALUES ('wangxiaoxiao')");
ps2.execute();
rm2.end(xid2, XAResource.TMSUCCESS);
// ===================两阶段提交================================
// phase1:询问所有的RM 准备提交事务分支
int rm1_prepare = rm1.prepare(xid1);
int rm2_prepare = rm2.prepare(xid2);
// phase2:提交所有事务分支
boolean onePhase = false; // TM判断有2个事务分支,所以不能优化为一阶段提交
// 所有事务分支都prepare成功,提交所有事务分支
if (rm1_prepare == XAResource.XA_OK && rm2_prepare == XAResource.XA_OK) {
rm1.commit(xid1, onePhase);
rm2.commit(xid2, onePhase);
} else {// 如果有事务分支没有成功,则回滚
rm1.rollback(xid1);
rm1.rollback(xid2);
}
} catch (XAException e) {
// 如果出现异常,也要进行回滚
e.printStackTrace();
}
再来看看atomikos的实现方式,atomikos的免费开源版也实现了XA
private static AtomikosDataSourceBean createAtomikosDataSourceBean(String dbName) {
// 连接池基本属性
Properties p = new Properties();
p.setProperty("url", "jdbc:mysql://localhost:3306/" + dbName);
p.setProperty("user", "root");
p.setProperty("password", "root");
// 使用AtomikosDataSourceBean封装com.mysql.jdbc.jdbc2.optional.MysqlXADataSource
AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
// atomikos要求为每个AtomikosDataSourceBean名称,为了方便记忆,这里设置为和dbName相同
ds.setUniqueResourceName(dbName);
ds.setXaDataSourceClassName("com.mysql.jdbc.jdbc2.optional.MysqlXADataSource");
ds.setXaProperties(p);
return ds;
}
public static void main(String[] args) throws Exception {
AtomikosDataSourceBean ds1 = createAtomikosDataSourceBean("db_user");
AtomikosDataSourceBean ds2 = createAtomikosDataSourceBean("db_account");
Connection conn1 = null;
Connection conn2 = null;
PreparedStatement ps1 = null;
PreparedStatement ps2 = null;
UserTransaction userTransaction = new UserTransactionImp();
try {
// 开启事务
userTransaction.begin();
// 执行db1上的sql
conn1 = ds1.getConnection();
ps1 = conn1.prepareStatement("INSERT into user(name) VALUES (?)", Statement.RETURN_GENERATED_KEYS);
ps1.setString(1, "tianshouzhi");
ps1.executeUpdate();
ResultSet generatedKeys = ps1.getGeneratedKeys();
int userId = -1;
while (generatedKeys.next()) {
userId = generatedKeys.getInt(1);// 获得自动生成的userId
}
// 模拟异常 ,直接进入catch代码块,2个都不会提交
// int i=1/0;
// 执行db2上的sql
conn2 = ds2.getConnection();
ps2 = conn2.prepareStatement("INSERT into account(user_id,money) VALUES (?,?)");
ps2.setInt(1, userId);
ps2.setDouble(2, 10000000);
ps2.executeUpdate();
// 两阶段提交
userTransaction.commit();
} catch (Exception e) {
try {
e.printStackTrace();
userTransaction.rollback();
} catch (SystemException e1) {
e1.printStackTrace();
}
} finally {
try {
ps1.close();
ps2.close();
conn1.close();
conn2.close();
ds1.close();
ds2.close();
} catch (Exception ignore) {
}
}
}
简单很多了对吧?其实它们的原理时十分类似的,所以如果有面试问到atomikos的具体实现方式,你懂得!
说了这么多,其实这种方式更多的是提供一种解决问题的思路,实际环境中时不太可能这么玩的,因为这种方式性能太差了,他需要锁住相应的资源,互联网项目中有着很高并发吞吐量,这种方式很明显不适合,所以还是得引入我们今天要讨论的第二种方式:TCC
TCC两阶段补偿性方案
根据上面的图例,也可以看出TCC就是 Try-Confirm-Cancel的简称,为了让大家更好的理解这种方案,我举个例子来说明。
假设我们需要一张从合肥飞往大理的机票,但是没有直飞,怎么办?难道一张一张买吗,更不用说还要留意中转时间是否合理等等问题,所以我们往往会选择一个机票预订平台,让他帮我们一次性购买这两张机票。
这就是一个典型的分布式事务的场景了,机票预订平台需要同时向两家航空公司(不同DB,不同SERVER)发送下单请求,要么同时成功,要么同时失败。很明显XA那种方案是完全不适用的,总不能把人家的表资源给锁了,谁会让你这么干。所以我们需要的是一种业务上的手段。
好,这种业务手段其实就是TCC,在第一阶段中,机票预订平台会像两家航空公司提供的API接口发送请求,预留我们需要的机票。第二阶段中, 如果预留操作任意一方不成功,就发送取消请求,订票不成功。如果成功呢?发送确认请求,完成下单操作。这样一来,就保证了预定机票这组操作要么同时成功,要么不成功。中间的预留就是中间状态,但是最终保证了数据的一致性。
你可能有这样的问题,要是确认阶段有一方失败怎么办?首先呢,这个失败几率不高,但是对于互联网公司来说,即使很低的几率对应的订单量可能也是非常庞大的,所以需要两方共同提供一定的机制,进一步提高订票的成功率。比如,机票预订平台在收到确认失败的消息时,可能会有一定的重试机制,若重试若干次时候依然不成功,才会认为是真正的失败。航空公司则会对接口保证幂等性,对网络超时失败的情况(订单其实已经生成)也要有一定的处置方式。
如果还是失败呢?BASE定理允许产生一定的不可用,所以我们要对这种情况进行补偿。通常使用日志或者MQ的方式进行补偿,甚至最后还是需要通过人工对账的方式。
说到这里已经不难看出,XA是一种资源层面的分布式事务,在两阶段提交的整个过程中,它都会一致持有资源的锁,是强一致性,而TCC则是业务层面的分布式事务,不会持有资源锁,保证的是最终一致性。最后再给大家提供一个实现了tcc的框架,有兴趣的话可以多看看,我们这里就不提供具体代码了。
MQ事务方案
终于到了最后一种方案了,其实也很简单,先看图
服务1先向MQ发送一条中间状态的prepare消息,此时这条消息不会被消费者收到。接着继续执行服务1中的业务逻辑,成功后再向MQ发送confirm消息,将这条消息从中间状态改为可被消费者接受的状态,消费者收到消息后执行己方业务逻辑,成功后向MQ发送ACK。
这样同样保证了分布式事务,且因为存在中间状态,所以保证的也是最终一致性。如果消费者一方收取消息出现异常或ack请求超时呢?MQ一般都有一定的消息补发重试机制,所以要做好接口的幂等优化。如果confirm请求失败呢?这时候消息队列需要像服务1对应的业务发送定时消息来确认当前状态,如果已经成功,再修改中间状态即可。
总结
无论哪种方案都不能十全十美的保证分布式事务,所以一定要做好补偿。总而言之,业界对于这一难题的解决方案都是柔性事务+补偿机制,强调的是最终一致性。要想保证强一致性又不影响性能,这就是一个世界性难题了。不过牛人辈出,说不定哪一天我们就能见到这样的方案了不是吗?