创建Spring项目使用DataX进行定时数据库备份

​ DataX 是一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。

在DataX的官网介绍文档中,其使用十分简单。下载安装包之后,使用python datax.py [demo.json]命令即可进行数据同步。虽然其启动命令使用的是python脚本,但是看其安装包之后发现,只有启动的部分配置环境变量使用的是python,其余具体源码都是使用的java。既然底层是用java写的,所以萌发了使用Spring来时备份数据的想法。

一、DataX3.0基本结构

在之前的一篇博文中,在Intell Idea中启动了DataX,证明了使用Java项目引用DataX是可行的。下面简单分析一下DataX的源码。

DataX本身作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。

datax

总的来说DataX项目由FrameWork(core包、common包和transformer包)以及ReadPlugin和WritePlugin组成,对于DataX所支持的数据库,都有一对XXXreader和XXXwriter包。下面就是从DataX的github项目clone下来的源码包的结构目录。

--DataX
----common
----core
----transformer
----XXXreader
----XXXwriter
----...
  • Reader:Reader为数据采集模块,负责采集数据源的数据,将数据发送给Framework。
  • Writer: Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
  • Framework:Framework(core、common和transfer模块)用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。

二、基本框架模块代码解析

入口方法是core包下面的 com.alibaba.datax.core.Engine.main(String[] args)方法,直接调用了Engine类的entry()方法

2.1 Engine启动类代码解析
  • entry()方法
public static void entry(final String[] args) throws Throwable {
        Options options = new Options();
        options.addOption("job", true, "Job config.");
        options.addOption("jobid", true, "Job unique id.");
        options.addOption("mode", true, "Job runtime mode.");
        String jobPath = cl.getOptionValue("job");
        Configuration configuration = ConfigParser.parse(jobPath);
        ...
        configuration.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, jobId);
        ...
        ConfigurationValidate.doValidate(configuration);
        Engine engine = new Engine();
        engine.start(configuration);
}

entry()方法主要作用:
1、 获取项目启动参数:job、jobid和mode;
2、 使用ConfigParser工具类从jobpath即传输任务的json文件获取configuration,并对jobId以及mode进行了验证,随后将configuration作为入参,调用Engine类的start()方法。

  • start()方法
public void start(Configuration allConf) {
        ColumnCast.bind(allConf);
        LoadUtil.bind(allConf);

        boolean isJob = !("taskGroup".equalsIgnoreCase(allConf
                .getString(CoreConstant.DATAX_CORE_CONTAINER_MODEL)));
         ...
        if (isJob) {
            allConf.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_MODE, RUNTIME_MODE);
            container = new JobContainer(allConf);
            instanceId = allConf.getLong(
                    CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, 0);

        }
        ...
        container.start();
    }

start()方法主要作用:
1、初始化一些配置,检查任务的model;
2、根据Configuration的内容创建JobContainer,真正启动任务是调用JobContainer的start()方法。

2.2 JobContainer类解析
  • start()方法
   public void start() {
            this.userConf = this.configuration.clone();
            this.preHandle();
            this.init();
            this.prepare();
            this.totalStage = this.split();
            this.schedule();
            this.post();
            this.postHandle();
            this.invokeHooks();
        }

start()方法可以看出datax进行数据备份的一系列流程,从预处理,初始化,到实际调用对应的reader和writer的插件,有兴趣的读者可以自行查看源代码。

三、打包DataX项目

解析了DataX的源代码之后,我们已经知道了从哪里可以调用DataX的备份功能。我们可以将从github上clone源码到本地,使用maven将项目打包放在本地仓库,有条件的话可以上传到私服。

在core包的pom.xml文件里面加上插件如下:

            <!-- 打包源码 -->
            <plugin>
                <artifactId>maven-source-plugin</artifactId>
                <version>2.1</version>
                <configuration>
                    <attach>true</attach>
                </configuration>
                <executions>
                    <execution>
                        <phase>compile</phase>
                        <goals>
                            <goal>jar</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

使用mvn clean install命令将包安装到本地仓库,同理可以打包自己需要的reader和writer包,比如我想将Oracle的表导入到MySql数据库,那么我就需要打包oraclereader和mysqlwriter这两个包。

四、创建boot项目,使用maven包

依赖都已经准备好,下一步就可以来创建boot工程了,除了Springboot项目所需要的相关依赖以外,pom文件里面还要加上DataX的相关依赖如下:

        <dependency>
            <groupId>com.alibaba.datax</groupId>
            <artifactId>datax-core</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.datax</groupId>
            <artifactId>mysqlreader</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.datax</groupId>
            <artifactId>mysqlwriter</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>

从第二节我们知道,入口是Engine类的entry()方法调用自身的start()方法,但是entry()方法的入参比较复杂,再加上在实际应用中,Datax的job.json任务文件里面配置的数据库的密码是加密的,所以我自定义了一个DataxUtil类,然后去调用Engine类的start()方法。

五、多线程时的错误

在使用定时任务的过程中,我发现当两个job在同一时刻开始,并且reader和writer不同的时候,会出现找不到对应的reader或者writer的异常。比如说 job1和job2都在凌晨01点整执行,job1是从Oracle备份数据到MySQL,而job2是从MySQL备份数据到MySQL,就会报找不到oraclereader plugin或者mysqlreader plugin的错误。如果两个job的reader和writer分别相同,比如说都是从Oracle备份到MySQL,或者都是是从MySQL备份到Oracle则可以正常运行。
经过一番查找,发现问题就出现在LoadUtil这个类。在2.2节介绍的JobContainer类的start()方法里面调用了preHandle()方法,preHandle()方法里面使用了LoadUtil来加载对应的reader和writer,而LoadUtil的loadJobPlugin()方法线程不安全,从而导致了前一个job加载到一半的reader或者writer会被其他线程篡改,导致前一个job的reader或者writer不可用。

AbstractJobPlugin handler = LoadUtil.loadJobPlugin(handlerPluginType, handlerPluginName);

解决方法有两个:
1、强行将定时任务改成串行的,前一个job结束之后才调用下一个job;
2、修改LoadUtil类,保证loadJobPlugin()方法的线程安全。

以下是LoadUtil类需要修改的部分:

  • 删除以下两行
private static Configuration pluginRegisterCenter;
private static Map<String, JarLoader> jarLoaderCenter = new HashMap<String, JarLoader>();

替换成:

/**
* 所有插件配置放置在pluginRegisterCenter中,为区别reader、transformer和writer,还能区别
* 具体pluginName,故使用pluginType.pluginName作为key放置在该map中
*/
private static ThreadLocal<Configuration> pluginRegisterCenter = new InheritableThreadLocal<Configuration>();

/** jarLoader的缓冲 */
private final static Map<String, JarLoader> jarLoaderCenter = new ConcurrentHashMap<String, JarLoader>();
  • 修改bind方法,改为:
    public static void bind(final Configuration pluginConfigs) {
        pluginRegisterCenter.set(pluginConfigs);
    }
  • 修改getPluginConf方法,改为:
private static Configuration getPluginConf(PluginType pluginType,
                                               String pluginName) {
        Configuration pluginConf = pluginRegisterCenter.get()
                .getConfiguration(generatePluginKey(pluginType, pluginName));

        if (null == pluginConf) {
            throw DataXException.asDataXException(
                    FrameworkErrorCode.PLUGIN_INSTALL_ERROR,
                    String.format("DataX不能找到插件[%s]的配置.",
                            pluginName));
        }

        return pluginConf;
    }

至此就将LoadUtil改成了线程安全的类。

不过估计在DataX设计之初,就是面向运维人员,所以才会使用python命令以使其运行对应job。故其可能在多线程支持方面可能会有欠缺,所以在自行使用的时候,最好是不要使用多线程。一是由于源码本身就不支持多线程,即使对目前暴露出的问题修改了部分源码,由于没有深入阅读源码,可能会在将来遇到其他方面的问题;二是复制效率问题,多个线程一起传输数据时,如果都是大量数据的传输,可能会对内存、IO、还有网络造成争用,造成其他问题。

六、spring项目代码

boot项目代码待上传。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,911评论 5 460
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,014评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 142,129评论 0 320
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,283评论 1 264
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,159评论 4 357
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,161评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,565评论 3 382
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,251评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,531评论 1 292
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,619评论 2 310
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,383评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,255评论 3 313
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,624评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,916评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,199评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,553评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,756评论 2 335

推荐阅读更多精彩内容