9. sharding-jdbc源码之最大努力型事务

阿飞Javaer,转载请注明原创出处,谢谢!

BASE Transaction

  • Best efforts delivery transaction (已经实现).
  • Try confirm cancel transaction (待定).

Sharding-JDBC由于性能方面的考量,决定不支持强一致性分布式事务。

最大努力送达型事务

在分布式数据库的场景下,相信对于该数据库的操作最终一定可以成功,所以通过最大努力反复尝试送达操作。

最大努力送达型事务的架构图

最大努力送达型事务的架构图

摘自sharding-jdbc使用指南☞事务支持

执行过程有以下几种情况:

  1. 执行成功--如图所示,执行结果事件->监听执行事件->执行成功->清理事务日志
  2. 执行失败,同步重试成功--如图所示,执行结果事件->监听执行事件->执行失败->重试执行->执行成功->清理事务日志
  3. 执行失败,同步重试失败,异步重试成功--如图所示,执行结果事件->监听执行事件->执行失败->重试执行->执行失败->"异步送达作业"重试执行->执行成功->清理事务日志
  4. 执行失败,同步重试失败,异步重试失败,事务日志保留----如图所示,执行结果事件->监听执行事件->执行失败->重试执行->执行失败->"异步送达作业"重试执行->执行失败->... ...

说明:不管执行结果如何,执行前事件都会记录事务日志;执行事件类型包括3种:BEFORE_EXECUTEEXECUTE_FAILUREEXECUTE_SUCCESS;另外,这里的"同步"不是绝对的同步执行,而是通过google-guava的EventBus发布事件后,在监听端判断是EXECUTE_FAILURE事件,最多重试syncMaxDeliveryTryTimes次;后面对BestEffortsDeliveryListener的源码分析有介绍;这里的"异步"通过外挂实现,在后面的文章10. sharding-jdbc源码之异步送达JOB会有分析;

适用场景

  • 根据主键删除数据。
  • 更新记录永久状态,如更新通知送达状态。

使用限制

  • 使用最大努力送达型柔性事务的SQL需要满足幂等性。
  • INSERT语句要求必须包含主键,且不能是自增主键。
  • UPDATE语句要求幂等,不能是UPDATE xxx SET x=x+1
  • DELETE语句无要求。

开发示例

// 1. 配置SoftTransactionConfiguration
SoftTransactionConfiguration transactionConfig = new SoftTransactionConfiguration(dataSource);
// 配置相关请看后面的备注
transactionConfig.setXXX();

// 2. 初始化SoftTransactionManager
SoftTransactionManager transactionManager = new SoftTransactionManager(transactionConfig);
transactionManager.init();

// 3. 获取BEDSoftTransaction
BEDSoftTransaction transaction = (BEDSoftTransaction) transactionManager.getTransaction(SoftTransactionType.BestEffortsDelivery);

// 4. 开启事务
transaction.begin(connection);

// 5. 执行JDBC
/* 
    code here
*/
* 
// 6.关闭事务
transaction.end();

备注:SoftTransactionConfiguration支持的配置以及含义请参考sharding-jdbc使用指南☞事务支持,这段开发示例的代码也摘自这里;也可参考sharding-jdbc-transaction模块中com.dangdang.ddframe.rdb.transaction.soft.integrate.SoftTransactionTest如何使用柔性事务,但是这里的代码需要稍作修改,否则只是普通的执行逻辑,不是sharding-jdbc的执行逻辑

@Test
public void bedSoftTransactionTest() throws SQLException {
    SoftTransactionManager transactionManagerFactory = new SoftTransactionManager(getSoftTransactionConfiguration(getShardingDataSource()));
    // 初始化柔性事务管理器
    transactionManagerFactory.init();
    BEDSoftTransaction transactionManager = (BEDSoftTransaction) transactionManagerFactory.getTransaction(SoftTransactionType.BestEffortsDelivery);
    transactionManager.begin(getShardingDataSource().getConnection());
    // 执行INSERT SQL(DML类型),如果执行过程中异常,会在`BestEffortsDeliveryListener`中重试
    insert();
    transactionManager.end();
}

private void insert() {
    String dbSchema = "insert into transaction_test(id, remark) values (2, ?)";
    try (
            // 将.getConnection("db_trans", SQLType.DML)移除,这样的话,得到的才是ShardingConnection 
            Connection conn = getShardingDataSource().getConnection();
            PreparedStatement preparedStatement = conn.prepareStatement(dbSchema)) {
        preparedStatement.setString(1, "JUST TEST IT .");
        preparedStatement.executeUpdate();
    } catch (final SQLException e) {
        e.printStackTrace();
    }
}

核心源码分析

通过3. sharding-jdbc源码之路由&执行中对ExecutorEngine的分析可知,sharding-jdbc在执行SQL前后,分别调用EventBusInstance.getInstance().post()提交了事件,那么调用EventBusInstance.getInstance().register()的地方,就是柔性事务处理的地方,通过查看源码的调用关系可知,只有SoftTransactionManager.init()调用了EventBusInstance.getInstance().register(),所以柔性事务实现的核心在SoftTransactionManager这里;

柔性事务管理器

柔性事务实现在SoftTransactionManager中,核心源码如下:

public final class SoftTransactionManager {

    // 柔性事务配置对象   
    @Getter
    private final SoftTransactionConfiguration transactionConfig;
    
    /**
     * Initialize B.A.S.E transaction manager.
     * @throws SQLException SQL exception
     */
    public void init() throws SQLException {
        // 初始化注册最大努力送达型柔性事务监听器;
        EventBusInstance.getInstance().register(new BestEffortsDeliveryListener());
        if (TransactionLogDataSourceType.RDB == transactionConfig.getStorageType()) {
            // 如果事务日志数据源类型是关系型数据库,则创建事务日志表transaction_log
            createTable();
        }
        // 内嵌的最大努力送达型异步JOB任务,依赖当当开源的elastic-job
        if (transactionConfig.getBestEffortsDeliveryJobConfiguration().isPresent()) {
            new NestedBestEffortsDeliveryJobFactory(transactionConfig).init();
        }
    }
    
    // 从这里可知创建的事务日志表表名是transaction_log(所以需要保证每个库中用户没有自定义创建transaction_log表)
    private void createTable() throws SQLException {
        String dbSchema = "CREATE TABLE IF NOT EXISTS `transaction_log` ("
                + "`id` VARCHAR(40) NOT NULL, "
                + "`transaction_type` VARCHAR(30) NOT NULL, "
                + "`data_source` VARCHAR(255) NOT NULL, "
                + "`sql` TEXT NOT NULL, "
                + "`parameters` TEXT NOT NULL, "
                + "`creation_time` LONG NOT NULL, "
                + "`async_delivery_try_times` INT NOT NULL DEFAULT 0, "
                + "PRIMARY KEY (`id`));";
        try (
                Connection conn = transactionConfig.getTransactionLogDataSource().getConnection();
                PreparedStatement preparedStatement = conn.prepareStatement(dbSchema)) {
            preparedStatement.executeUpdate();
        }
    }

从这段源码可知,柔性事务的几个重点如下,接下来一一根据源码进行分析;

  • 事务日志存储器;
  • 最大努力送达型事务监听器;
  • 异步送达JOB任务;

1.事务日志存储器

柔性事务日志接口类为TransactionLogStorage.java,有两个实现类:

  1. RdbTransactionLogStorage:关系型数据库存储柔性事务日志;
  2. MemoryTransactionLogStorage:内存存储柔性事务日志;

1.1.1事务日志核心接口

TransactionLogStorage中几个重要接口在两个实现类中的实现:

  • void add(TransactionLog):Rdb实现就是把事务日志TransactionLog 插入到transaction_log表中,Memory实现就是把事务日志保存到ConcurrentHashMap中;
  • void remove(String id):Rdb实现就是从transaction_log表中删除事务日志,Memory实现从ConcurrentHashMap中删除事务日志;
  • void increaseAsyncDeliveryTryTimes(String id):异步增加送达重试次数,即TransactionLog中的asyncDeliveryTryTimes+1;Rdb实现就是update transaction_log表中async_delivery_try_times字段加1;Memory实现就是TransactionLog中重新给asyncDeliveryTryTimes赋值new AtomicInteger(transactionLog.getAsyncDeliveryTryTimes()).incrementAndGet()
  • findEligibleTransactionLogs(): 查询需要处理的事务日志,条件是:①异步处理次数async_delivery_try_times小于参数最大处里次数maxDeliveryTryTimes,②transaction_type是BestEffortsDelivery,③系统当前时间与事务日志的创建时间差要超过参数maxDeliveryTryDelayMillis,每次最多查询参数size条;Rdb实现通过sql从transaction_log表中查询,Memory实现遍历ConcurrentHashMap匹配符合条件的TransactionLog;
  • boolean processData():Rdb实现执行TransactionLog中的sql,如果执行过程中抛出异常,那么调用increaseAsyncDeliveryTryTimes()增加送达重试次数并抛出异常,如果执行成功,删除事务日志,并返回true;Memory实现直接返回false(因为processData()的目的是执行TransactionLog中的sql,而Memory类型无法触及数据库,所以返回false)

1.1.2事务日志存储核心源码

RdbTransactionLogStorage.java实现源码:

public final class RdbTransactionLogStorage implements TransactionLogStorage {
    
    private final DataSource dataSource;
    
    @Override
    public void add(final TransactionLog transactionLog) {
        // 保存事务日志到rdb中的sql
        String sql = "INSERT INTO `transaction_log` (`id`, `transaction_type`, `data_source`, `sql`, `parameters`, `creation_time`) VALUES (?, ?, ?, ?, ?, ?);";
        try (
            Connection conn = dataSource.getConnection();
            PreparedStatement preparedStatement = conn.prepareStatement(sql)) {
            ... ...
            preparedStatement.executeUpdate();
        } catch (final SQLException ex) {
            throw new TransactionLogStorageException(ex);
        }
    }
    
    @Override
    public void remove(final String id) {
        // 根据id删除事务日志的sql
        String sql = "DELETE FROM `transaction_log` WHERE `id`=?;";
        try (
            Connection conn = dataSource.getConnection();
            PreparedStatement preparedStatement = conn.prepareStatement(sql)) {
            preparedStatement.setString(1, id);
            preparedStatement.executeUpdate();
        } catch (final SQLException ex) {
            throw new TransactionLogStorageException(ex);
        }
    }
    
    @Override
    public List<TransactionLog> findEligibleTransactionLogs(final int size, final int maxDeliveryTryTimes, final long maxDeliveryTryDelayMillis) {
        List<TransactionLog> result = new ArrayList<>(size);
        // 执行该sql查询需要处理的事务日志,最多取size条;
        String sql = "SELECT `id`, `transaction_type`, `data_source`, `sql`, `parameters`, `creation_time`, `async_delivery_try_times` FROM `transaction_log` WHERE `async_delivery_try_times`<? AND `transaction_type`=? AND `creation_time`<? LIMIT ?;";
        try (Connection conn = dataSource.getConnection()) {
            try (PreparedStatement preparedStatement = conn.prepareStatement(sql)) {
                ... ...
                preparedStatement.setLong(3, System.currentTimeMillis() - maxDeliveryTryDelayMillis);
                ... ...
            }
        } catch (final SQLException ex) {
            throw new TransactionLogStorageException(ex);
        }
        return result;
    }
    
    @Override
    public void increaseAsyncDeliveryTryTimes(final String id) {
        // 更新处理次数+1
        String sql = "UPDATE `transaction_log` SET `async_delivery_try_times`=`async_delivery_try_times`+1 WHERE `id`=?;";
        try (
            ... ...
        } catch (final SQLException ex) {
            throw new TransactionLogStorageException(ex);
        }
    }
    
    @Override
    public boolean processData(final Connection connection, final TransactionLog transactionLog, final int maxDeliveryTryTimes) {
        try (
            Connection conn = connection;
            // 执行TransactionLog中的sql
            PreparedStatement preparedStatement = conn.prepareStatement(transactionLog.getSql())) {
            for (int parameterIndex = 0; parameterIndex < transactionLog.getParameters().size(); parameterIndex++) {
                preparedStatement.setObject(parameterIndex + 1, transactionLog.getParameters().get(parameterIndex));
            }
            preparedStatement.executeUpdate();
        } catch (final SQLException ex) {
            如果抛出异常,表示执行sql失败,那么把增加处理次数并把异常抛出去;
            increaseAsyncDeliveryTryTimes(transactionLog.getId());
            throw new TransactionCompensationException(ex);
        }
        // 如果没有抛出异常,表示执行sql成功,那么删除该事务日志;
        remove(transactionLog.getId());
        return true;
    }
}

1.1.3事务日志存储样例

id transction_type data_source sql parameters creation_time async_delivery_try_times
85c141c4-1b8f-4e54-9010-0cc661bb1864 BestEffortsDelivery db_trans insert into transaction_test(id, remark) values (3, ?) ["TEST BY AFEI."] 1517899200989 0

transaction_log中存储的事务日志样例:

id transction_type data_source sql parameters creation_time async_delivery_try_times
85c141c4-1b8f-4e54-9010-0cc661bb1864 BestEffortsDelivery db_trans insert into transaction_test(id, remark) values (3, ?) ["TEST BY AFEI."] 1517899200989 0

1.2最大努力送达型事务监听器

核心源码如下:

/**
 * Best efforts delivery B.A.S.E transaction listener.
 * 
 * @author zhangliang
 */
@Slf4j
public final class BestEffortsDeliveryListener {
    
    @Subscribe
    @AllowConcurrentEvents
    // 从方法可知,只监听DML执行事件(DML即数据维护语言,包括INSERT, UPDATE, DELETE)
    public void listen(final DMLExecutionEvent event) {
        // 判断是否需要继续,判断逻辑为:事务存在,并且是BestEffortsDelivery类型事务
        if (!isProcessContinuously()) {
            return;
        }
        // 从柔性事务管理器中得到柔性事务配置
        SoftTransactionConfiguration transactionConfig = SoftTransactionManager.getCurrentTransactionConfiguration().get();
        // 得到配置的柔性事务存储器
        TransactionLogStorage transactionLogStorage = TransactionLogStorageFactory.createTransactionLogStorage(transactionConfig.buildTransactionLogDataSource());
        // 这里肯定是最大努力送达型事务(如果不是BEDSoftTransaction,isProcessContinuously()就是false)
        BEDSoftTransaction bedSoftTransaction = (BEDSoftTransaction) SoftTransactionManager.getCurrentTransaction().get();
        // 根据事件类型做不同处理
        switch (event.getEventExecutionType()) {
            case BEFORE_EXECUTE:
                // 如果执行前事件,那么先保存事务日志;
                //TODO for batch SQL need split to 2-level records
                transactionLogStorage.add(new TransactionLog(event.getId(), bedSoftTransaction.getTransactionId(), bedSoftTransaction.getTransactionType(), 
                        event.getDataSource(), event.getSql(), event.getParameters(), System.currentTimeMillis(), 0));
                return;
            case EXECUTE_SUCCESS: 
                // 如果执行成功事件,那么删除事务日志;
                transactionLogStorage.remove(event.getId());
                return;
            case EXECUTE_FAILURE: 
                boolean deliverySuccess = false;
                // 如果执行成功事件,最大努力送达型最多尝试3次(可配置,SoftTransactionConfiguration中的参数syncMaxDeliveryTryTimes);
                for (int i = 0; i < transactionConfig.getSyncMaxDeliveryTryTimes(); i++) {
                    // 如果在该Listener中执行成功,那么返回,不需要再尝试
                    if (deliverySuccess) {
                        return;
                    }
                    boolean isNewConnection = false;
                    Connection conn = null;
                    PreparedStatement preparedStatement = null;
                    try {
                        conn = bedSoftTransaction.getConnection().getConnection(event.getDataSource(), SQLType.DML);
                        // 通过执行"select 1"判断conn是否是有效的数据库连接;如果不是有效的数据库连接,释放掉并重新获取一个数据库连接;
                        if (!isValidConnection(conn)) {
                            bedSoftTransaction.getConnection().release(conn);
                            conn = bedSoftTransaction.getConnection().getConnection(event.getDataSource(), SQLType.DML);
                            isNewConnection = true;
                        }
                        preparedStatement = conn.prepareStatement(event.getSql());
                        //TODO for batch event need split to 2-level records
                        for (int parameterIndex = 0; parameterIndex < event.getParameters().size(); parameterIndex++) {
                            preparedStatement.setObject(parameterIndex + 1, event.getParameters().get(parameterIndex));
                        }
                        // 因为只监控DML,所以调用executeUpdate()
                        preparedStatement.executeUpdate();
                        // executeUpdate()后能执行到这里,说明执行成功;根据id删除事务日志;
                        deliverySuccess = true;
                        transactionLogStorage.remove(event.getId());
                    } catch (final SQLException ex) {
                        // 如果sql执行有异常,那么输出error日志
                        log.error(String.format("Delivery times %s error, max try times is %s", i + 1, transactionConfig.getSyncMaxDeliveryTryTimes()), ex);
                    } finally {
                        close(isNewConnection, conn, preparedStatement);
                    }
                }
                return;
            default: 
                // 值支持三种事件类型,对于其他值,抛出异常
                throw new UnsupportedOperationException(event.getEventExecutionType().toString());
        }
    }
    
}

BestEffortsDeliveryListener源码总结:

  • 执行前,插入事务日志;
  • 执行成功,则删除事务日志;
  • 执行失败,则最大努力尝试syncMaxDeliveryTryTimes次;

1.3 异步送达JOB任务

  • 部署用于存储事务日志的数据库。
  • 部署用于异步作业使用的zookeeper。
  • 配置YAML文件,参照示例文件config.yaml。
  • 下载并解压文件sharding-jdbc-transaction-async-job-$VERSION.tar,通过start.sh脚本启动异步作业。

异步送达JOB任务基于elastic-job,所以需要部署zookeeper;

异步送达JOB任务将在下一张详细讲解10. sharding-jdbc源码之异步送达JOB

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,133评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,682评论 3 390
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,784评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,508评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,603评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,607评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,604评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,359评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,805评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,121评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,280评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,959评论 5 339
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,588评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,206评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,442评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,193评论 2 367
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,144评论 2 352

推荐阅读更多精彩内容