1.前言
datax是阿里出品,最初是为了解决淘宝数据交换的问题,据说淘宝有30%的数据交换是通过datax完成的。
2.介绍
DataX 是一个开源异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。
Data目前已经支持常用的插件体系,主流的RDBMS,NOSQL,大数据计算系统都已接入。
3.源码解析
从github上clone源码到本地,源码地址:https://github.com/alibaba/DataX。
DataX源码由Framework(core包,common包和transformer包)及 plugin(ReadPlugin和WritePlugin)组成。
Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。
Reader:Reader为数据采集模块,负责采集数据源的数据,将数据发送给Framework。Writer: Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
3.1入口类Engine
entry()方法:
主要用于获取项目启动参数:job,jobid,mode;
注意:mode分为单机模式和分布式模式,这里指定为standalone 单机模式。
jobid默认值为-1,只有在standalone模式下使用,非 standalone 模式必须提供有效的jobid值。
public static void entry(String jobPath)throws Throwable {
String jobIdString ="-1";
// 指定单机还是分布式模式运行
RUNTIME_MODE ="standalone";
Configuration configuration = ConfigParser.parse(jobPath);
......
configuration.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, jobId);
......
ConfigurationValidate.doValidate(configuration);
Engine engine =new Engine();
engine.start(configuration);
}
start()方法:
主要用于初始化配置,检查job的model信息。
public void start(Configuration allConf) {
// 绑定column转换信息
ColumnCast.bind(allConf);
/**
* 初始化PluginLoader,可以获取各种插件配置
*/
LoadUtil.bind(allConf);
......
Configuration jobInfoConfig = allConf.getConfiguration(CoreConstant.DATAX_JOB_JOBINFO);
//初始化PerfTrace
PerfTrace perfTrace = PerfTrace.getInstance(isJob, instanceId, taskGroupId, priority, traceEnable);
perfTrace.setJobInfo(jobInfoConfig, perfReportEnable, channelNumber);
container.start();
}
3.2 jobContainer容器
job实例运行在jobContainer容器中,它是所有任务的master,负责初始化、拆分、调度、运行、回收、监控和汇报。
start()方法:
jobContainer主要负责的工作全部在start()里面,包括init、prepare、split、scheduler、post以及destroy和statistics。
public void start() {
LOG.info("DataX jobContainer starts job.");
this.preHandle();
this.init();
this.prepare();
this.totalStage =this.split();
this.schedule();
this.post();
this.postHandle();
this.invokeHooks();
}
init()方法:reader和writer的初始化
private void init() {
......
JobPluginCollector jobPluginCollector =new DefaultJobPluginCollector(
this.getContainerCommunicator());
//必须先Reader ,后Writer
this.jobReader =this.initJobReader(jobPluginCollector);
this.jobWriter =this.initJobWriter(jobPluginCollector);
}
schedule()方法:
任务调度器schedule首先完成的工作是把上一步reader和writer split的结果整合到具体taskGroupContainer中。
private void schedule() {
/**
* 通过获取配置信息得到每个taskGroup需要运行哪些tasks任务
*/
List taskGroupConfigs = JobAssignUtil.assignFairly(this.configuration,
this.needChannelNumber, channelsPerTaskGroup);
......
AbstractScheduler scheduler;
scheduler = initStandaloneScheduler(this.configuration);
scheduler.schedule(taskGroupConfigs);
......
/** * 检查任务执行情况 */this.checkLimit();
this.checkLimit();
}
post()方法: 启动各类数据库插件的读写任务。
private void post() {
this.postJobWriter();
this.postJobReader();
}
4. Spring Boot集成DataX
在springboot项目上,通过POM文件引入datax相关jar包
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-core</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
同时需要引入数据源读取和写入相关的Reader/Writer插件
<dependency>
<groupId>com.oracle</groupId>
<artifactId>ojdbc6</artifactId>
<version>11.2.0.3</version>
<scope>system</scope>
<systemPath>${basedir}/src/main/lib/ojdbc6-11.2.0.3.jar</systemPath>
</dependency>
<dependency>
<groupId>com.microsoft.sqlserver</groupId>
<artifactId>sqljdbc4</artifactId>
<version>4.0</version>
<scope>system</scope>
<systemPath>${basedir}/src/main/lib/sqljdbc4-4.0.jar</systemPath>
</dependency>
这里引入mysql 及oracle数据源对应的插件