阿里开源数据同步工具——DataX源码揭秘

1.前言

datax是阿里出品,最初是为了解决淘宝数据交换的问题,据说淘宝有30%的数据交换是通过datax完成的。

2.介绍

DataX 是一个开源异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。

Data目前已经支持常用的插件体系,主流的RDBMS,NOSQL,大数据计算系统都已接入。


3.源码解析

从github上clone源码到本地,源码地址:https://github.com/alibaba/DataX

DataX源码由Framework(core包,common包和transformer包)及 plugin(ReadPlugin和WritePlugin)组成。

Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。

Reader:Reader为数据采集模块,负责采集数据源的数据,将数据发送给Framework。Writer: Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。

3.1入口类Engine

  \bullet entry()方法:

  主要用于获取项目启动参数:job,jobid,mode;

  注意:mode分为单机模式和分布式模式,这里指定为standalone 单机模式。

            jobid默认值为-1,只有在standalone模式下使用,非 standalone 模式必须提供有效的jobid值。

public static void entry(String jobPath)throws Throwable {

        String jobIdString ="-1";

// 指定单机还是分布式模式运行

        RUNTIME_MODE ="standalone";

Configuration configuration = ConfigParser.parse(jobPath);

......

configuration.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, jobId);

......

ConfigurationValidate.doValidate(configuration);

Engine engine =new Engine();

engine.start(configuration);

}

\bullet start()方法:

主要用于初始化配置,检查job的model信息。

public void start(Configuration allConf) {

// 绑定column转换信息

    ColumnCast.bind(allConf);

/**

* 初始化PluginLoader,可以获取各种插件配置

*/

    LoadUtil.bind(allConf);

......

Configuration jobInfoConfig = allConf.getConfiguration(CoreConstant.DATAX_JOB_JOBINFO);

//初始化PerfTrace

    PerfTrace perfTrace = PerfTrace.getInstance(isJob, instanceId, taskGroupId, priority, traceEnable);

perfTrace.setJobInfo(jobInfoConfig, perfReportEnable, channelNumber);

container.start();

}

3.2 jobContainer容器

job实例运行在jobContainer容器中,它是所有任务的master,负责初始化、拆分、调度、运行、回收、监控和汇报。

\bullet start()方法:

jobContainer主要负责的工作全部在start()里面,包括init、prepare、split、scheduler、post以及destroy和statistics。

public void start() {

LOG.info("DataX jobContainer starts job.");

this.preHandle();

this.init();

this.prepare();

this.totalStage =this.split();

this.schedule();

this.post();

this.postHandle();

this.invokeHooks();

}

\bullet init()方法:reader和writer的初始化

private void init() {

......

JobPluginCollector jobPluginCollector =new DefaultJobPluginCollector(

this.getContainerCommunicator());

//必须先Reader ,后Writer

this.jobReader =this.initJobReader(jobPluginCollector);

this.jobWriter =this.initJobWriter(jobPluginCollector);

}

\bullet schedule()方法:

任务调度器schedule首先完成的工作是把上一步reader和writer split的结果整合到具体taskGroupContainer中。

private void schedule() {

/**

* 通过获取配置信息得到每个taskGroup需要运行哪些tasks任务

*/

    List taskGroupConfigs = JobAssignUtil.assignFairly(this.configuration,

this.needChannelNumber, channelsPerTaskGroup);

......

AbstractScheduler scheduler;

scheduler = initStandaloneScheduler(this.configuration);

scheduler.schedule(taskGroupConfigs);

......

/** * 检查任务执行情况 */this.checkLimit();

this.checkLimit();

}


\bullet post()方法: 启动各类数据库插件的读写任务。

private void post() {

this.postJobWriter();

this.postJobReader();

}

4. Spring Boot集成DataX

在springboot项目上,通过POM文件引入datax相关jar包

<dependency>

<groupId>com.alibaba.datax</groupId>

<artifactId>datax-core</artifactId>

<version>0.0.1-SNAPSHOT</version>

</dependency>

同时需要引入数据源读取和写入相关的Reader/Writer插件

<dependency>

<groupId>com.oracle</groupId>

<artifactId>ojdbc6</artifactId>

<version>11.2.0.3</version>

<scope>system</scope>

<systemPath>${basedir}/src/main/lib/ojdbc6-11.2.0.3.jar</systemPath>

</dependency>

<dependency>

<groupId>com.microsoft.sqlserver</groupId>

<artifactId>sqljdbc4</artifactId>

<version>4.0</version>

<scope>system</scope>

<systemPath>${basedir}/src/main/lib/sqljdbc4-4.0.jar</systemPath>

</dependency>

这里引入mysql 及oracle数据源对应的插件

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,761评论 5 460
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,953评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,998评论 0 320
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,248评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,130评论 4 356
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,145评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,550评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,236评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,510评论 1 291
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,601评论 2 310
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,376评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,247评论 3 313
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,613评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,911评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,191评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,532评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,739评论 2 335

推荐阅读更多精彩内容