dataX中的writeModel

本文中的writeModel主要是Mysql、Oracle等传统关系数据库中的writeMode。dataX导入到hive是直接写文件,不会支持这些writeModel。

预备知识

Mysql中的ON DUPLICATE KEY UPDATE

使用 ON DUPLICATE KEY UPDATE语句的时候,如果你插入的记录导致主键或唯一索引重复,那么Mysql就会认为该条记录存在,则执行update语句而不是insert语句;反之则执行insert语句而不是更新语句。
新建表user,id作为user的主键,并插入一条id = 1的数据

CREATE TABLE `user` (
  `id` int(12) NOT NULL ,
  `username` varchar(32) DEFAULT NULL,
  `password` varchar(32) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB;

INSERT INTO user VALUES (1,'GJMZ1','123456');

执行以下语句之后user表中然后只有一条语句,只是将password改为'1234567',这就是ON DUPLICATE KEY UPDATE的功能。

INSERT INTO user VALUES (1,'GJMZ1','1234567')ON DUPLICATE KEY UPDATE username = 'GJMZ1',password='1234567';

如果去掉表中的主键,执行上面两条sql之后,表中会有两行数据,因为执行ON DUPLICATE KEY UPDATE语句时,mysql是通过主键或者唯一索引来判断两条数据是否重复。

writeModel的用法

writeModel控制写入数据到目标表采用 insert into 或者 replace into 或者 ON DUPLICATE KEY UPDATE 语句。

  • insert:将数据源表的数据直接写的到目的表,主要用于全量的导入。实现原理是直接采用insert into;
  • replace和update:如果目标表中包含待写入的数据则更新该行数据,主要用于增量导入。实现原理:在mysql中用ON DUPLICATE KEY UPDATE语句,其他数据库中用replace into.

相关源码

以MysqlWriter为例进行说明。
流程很简单,设置的时候用户配置的每一个字段都要更新,以"?"的方式写进sql中,最后利用PrepareStatement#setXXX进行设置。根据三种模式拼接相应的sql即可,最后将sql保存在Cofiguration#insertOrReplaceTemplate中。

// OriginalConfPretreatmentUtil#dealWriteMode
public static void dealWriteMode(Configuration originalConfig, DataBaseType dataBaseType) {
        List<String> columns = originalConfig.getList(Key.COLUMN, String.class);
        String jdbcUrl = originalConfig.getString(String.format("%s[0].%s",
                Constant.CONN_MARK, Key.JDBC_URL, String.class));
        // 默认为:insert 方式
        String writeMode = originalConfig.getString(Key.WRITE_MODE, "INSERT");
        List<String> valueHolders = new ArrayList<String>(columns.size());
        for (int i = 0; i < columns.size(); i++) {
            valueHolders.add("?");
        }

        boolean forceUseUpdate = false;
        //ob10的处理
        if (dataBaseType == DataBaseType.MySql && isOB10(jdbcUrl)) {
            forceUseUpdate = true;
        }

        String writeDataSqlTemplate = WriterUtil.getWriteTemplate(columns, valueHolders, writeMode,dataBaseType, forceUseUpdate);
        LOG.info("Write data [\n{}\n], which jdbcUrl like:[{}]", writeDataSqlTemplate, jdbcUrl);
        originalConfig.set(Constant.INSERT_OR_REPLACE_TEMPLATE_MARK, writeDataSqlTemplate);
}
    
// WriterUtil#getWriteTemplate
public static String getWriteTemplate(List<String> columnHolders, List<String> valueHolders, String writeMode, DataBaseType dataBaseType, boolean forceUseUpdate) {
        boolean isWriteModeLegal = writeMode.trim().toLowerCase().startsWith("insert")
                || writeMode.trim().toLowerCase().startsWith("replace")
                || writeMode.trim().toLowerCase().startsWith("update");

        if (!isWriteModeLegal) {
            throw DataXException.asDataXException(DBUtilErrorCode.ILLEGAL_VALUE,
                    String.format("您所配置的 writeMode:%s 错误. 因为DataX 目前仅支持replace,update 或 insert 方式. 请检查您的配置并作出修改.", writeMode));
        }
        // && writeMode.trim().toLowerCase().startsWith("replace")
        String writeDataSqlTemplate;
        if (forceUseUpdate ||
                ((dataBaseType == DataBaseType.MySql || dataBaseType == DataBaseType.Tddl) && writeMode.trim().toLowerCase().startsWith("update"))
                ) {
            //update只在mysql下使用

            writeDataSqlTemplate = new StringBuilder()
                    .append("INSERT INTO %s (").append(StringUtils.join(columnHolders, ","))
                    .append(") VALUES(").append(StringUtils.join(valueHolders, ","))
                    .append(")")
                    .append(onDuplicateKeyUpdateString(columnHolders))
                    .toString();
        } else {

            //这里是保护,如果其他错误的使用了update,需要更换为replace
            if (writeMode.trim().toLowerCase().startsWith("update")) {
                writeMode = "replace";
            }
            writeDataSqlTemplate = new StringBuilder().append(writeMode)
                    .append(" INTO %s (").append(StringUtils.join(columnHolders, ","))
                    .append(") VALUES(").append(StringUtils.join(valueHolders, ","))
                    .append(")").toString();
        }

        return writeDataSqlTemplate;
    }
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。