DataX 是一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。
在DataX的官网介绍文档中,其使用十分简单。下载安装包之后,使用python datax.py [demo.json]
命令即可进行数据同步。虽然其启动命令使用的是python脚本,但是看其安装包之后发现,只有启动的部分配置环境变量使用的是python,其余具体源码都是使用的java。既然底层是用java写的,所以萌发了使用Spring来时备份数据的想法。
一、DataX3.0基本结构
在之前的一篇博文中,在Intell Idea中启动了DataX,证明了使用Java项目引用DataX是可行的。下面简单分析一下DataX的源码。
DataX本身作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。
总的来说DataX项目由FrameWork(core包、common包和transformer包)以及ReadPlugin和WritePlugin组成,对于DataX所支持的数据库,都有一对XXXreader和XXXwriter包。下面就是从DataX的github项目clone下来的源码包的结构目录。
--DataX
----common
----core
----transformer
----XXXreader
----XXXwriter
----...
- Reader:Reader为数据采集模块,负责采集数据源的数据,将数据发送给Framework。
- Writer: Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
- Framework:Framework(core、common和transfer模块)用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。
二、基本框架模块代码解析
入口方法是core包下面的 com.alibaba.datax.core.Engine.main(String[] args)
方法,直接调用了Engine类的entry()
方法
2.1 Engine
启动类代码解析
-
entry()
方法
public static void entry(final String[] args) throws Throwable {
Options options = new Options();
options.addOption("job", true, "Job config.");
options.addOption("jobid", true, "Job unique id.");
options.addOption("mode", true, "Job runtime mode.");
String jobPath = cl.getOptionValue("job");
Configuration configuration = ConfigParser.parse(jobPath);
...
configuration.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, jobId);
...
ConfigurationValidate.doValidate(configuration);
Engine engine = new Engine();
engine.start(configuration);
}
entry()
方法主要作用:
1、 获取项目启动参数:job、jobid和mode;
2、 使用ConfigParser工具类从jobpath即传输任务的json文件获取configuration,并对jobId以及mode进行了验证,随后将configuration作为入参,调用Engine类的start()
方法。
-
start()
方法
public void start(Configuration allConf) {
ColumnCast.bind(allConf);
LoadUtil.bind(allConf);
boolean isJob = !("taskGroup".equalsIgnoreCase(allConf
.getString(CoreConstant.DATAX_CORE_CONTAINER_MODEL)));
...
if (isJob) {
allConf.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_MODE, RUNTIME_MODE);
container = new JobContainer(allConf);
instanceId = allConf.getLong(
CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, 0);
}
...
container.start();
}
start()
方法主要作用:
1、初始化一些配置,检查任务的model;
2、根据Configuration的内容创建JobContainer,真正启动任务是调用JobContainer的start()
方法。
2.2 JobContainer
类解析
-
start()
方法
public void start() {
this.userConf = this.configuration.clone();
this.preHandle();
this.init();
this.prepare();
this.totalStage = this.split();
this.schedule();
this.post();
this.postHandle();
this.invokeHooks();
}
从start()
方法可以看出datax进行数据备份的一系列流程,从预处理,初始化,到实际调用对应的reader和writer的插件,有兴趣的读者可以自行查看源代码。
三、打包DataX项目
解析了DataX的源代码之后,我们已经知道了从哪里可以调用DataX的备份功能。我们可以将从github上clone源码到本地,使用maven将项目打包放在本地仓库,有条件的话可以上传到私服。
在core包的pom.xml文件里面加上插件如下:
<!-- 打包源码 -->
<plugin>
<artifactId>maven-source-plugin</artifactId>
<version>2.1</version>
<configuration>
<attach>true</attach>
</configuration>
<executions>
<execution>
<phase>compile</phase>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
使用mvn clean install
命令将包安装到本地仓库,同理可以打包自己需要的reader和writer包,比如我想将Oracle的表导入到MySql数据库,那么我就需要打包oraclereader和mysqlwriter这两个包。
四、创建boot项目,使用maven包
依赖都已经准备好,下一步就可以来创建boot工程了,除了Springboot项目所需要的相关依赖以外,pom文件里面还要加上DataX的相关依赖如下:
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-core</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>mysqlreader</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>mysqlwriter</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
从第二节我们知道,入口是Engine
类的entry()
方法调用自身的start()
方法,但是entry()
方法的入参比较复杂,再加上在实际应用中,Datax的job.json任务文件里面配置的数据库的密码是加密的,所以我自定义了一个DataxUtil类,然后去调用Engine
类的start()
方法。
五、多线程时的错误
在使用定时任务的过程中,我发现当两个job在同一时刻开始,并且reader和writer不同的时候,会出现找不到对应的reader或者writer的异常。比如说 job1和job2都在凌晨01点整执行,job1是从Oracle备份数据到MySQL,而job2是从MySQL备份数据到MySQL,就会报找不到oraclereader plugin或者mysqlreader plugin的错误。如果两个job的reader和writer分别相同,比如说都是从Oracle备份到MySQL,或者都是是从MySQL备份到Oracle则可以正常运行。
经过一番查找,发现问题就出现在LoadUtil
这个类。在2.2节介绍的JobContainer
类的start()
方法里面调用了preHandle()
方法,preHandle()
方法里面使用了LoadUtil
来加载对应的reader和writer,而LoadUtil的loadJobPlugin()
方法线程不安全,从而导致了前一个job加载到一半的reader或者writer会被其他线程篡改,导致前一个job的reader或者writer不可用。
AbstractJobPlugin handler = LoadUtil.loadJobPlugin(handlerPluginType, handlerPluginName);
解决方法有两个:
1、强行将定时任务改成串行的,前一个job结束之后才调用下一个job;
2、修改LoadUtil类,保证loadJobPlugin()
方法的线程安全。
以下是LoadUtil类需要修改的部分:
- 删除以下两行
private static Configuration pluginRegisterCenter;
private static Map<String, JarLoader> jarLoaderCenter = new HashMap<String, JarLoader>();
替换成:
/**
* 所有插件配置放置在pluginRegisterCenter中,为区别reader、transformer和writer,还能区别
* 具体pluginName,故使用pluginType.pluginName作为key放置在该map中
*/
private static ThreadLocal<Configuration> pluginRegisterCenter = new InheritableThreadLocal<Configuration>();
/** jarLoader的缓冲 */
private final static Map<String, JarLoader> jarLoaderCenter = new ConcurrentHashMap<String, JarLoader>();
- 修改bind方法,改为:
public static void bind(final Configuration pluginConfigs) {
pluginRegisterCenter.set(pluginConfigs);
}
- 修改getPluginConf方法,改为:
private static Configuration getPluginConf(PluginType pluginType,
String pluginName) {
Configuration pluginConf = pluginRegisterCenter.get()
.getConfiguration(generatePluginKey(pluginType, pluginName));
if (null == pluginConf) {
throw DataXException.asDataXException(
FrameworkErrorCode.PLUGIN_INSTALL_ERROR,
String.format("DataX不能找到插件[%s]的配置.",
pluginName));
}
return pluginConf;
}
至此就将LoadUtil改成了线程安全的类。
不过估计在DataX设计之初,就是面向运维人员,所以才会使用python命令以使其运行对应job。故其可能在多线程支持方面可能会有欠缺,所以在自行使用的时候,最好是不要使用多线程。一是由于源码本身就不支持多线程,即使对目前暴露出的问题修改了部分源码,由于没有深入阅读源码,可能会在将来遇到其他方面的问题;二是复制效率问题,多个线程一起传输数据时,如果都是大量数据的传输,可能会对内存、IO、还有网络造成争用,造成其他问题。
六、spring项目代码
boot项目代码待上传。