DataX简介
DataX是什么?它是干什么用的?下面是官方给的介绍。
DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各种异构数据源之间高效的数据同步功能。
DataX本身作为数据同步框架,将不同数据源的同步抽象为从源头数据源读取数据的Reader插件,以及向目标端写入数据的Writer插件,理论上DataX框架可以支持任意数据源类型的数据同步工作。同时DataX插件体系作为一套生态系统, 每接入一套新数据源该新加入的数据源即可实现和现有的数据源互通。
设计理念
传统的异构数据库之间的同步使用的是一对一的同步策略,实现起来极其复杂,如下图左边的同步模型。DataX的出现打破了这种观念,使用中心化的方式,将复杂的网状的同步链路变成了星型数据链路,如下图右边的同步模型。DataX作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步,加大了代码的复用性。
DataX本身采用Framework + plugin的架构,将数据源读取和写入抽象成为Reader/Writer插件。
- Framework:Framework用于连接Reader和Writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。
- Reader:DataX采集模块,负责将数据源的数据输入到DataX的Framework。
- Writer:负责从DataX的Framework中拉取数据,写入到对应的目的地。
DataX提供了丰富的插件,足够覆盖大多数场景,此外DataX还支持自定义插件,以满足个性化需求。DataX自定义插件开发指南
DataX核心架构
DataX 3.0 开源版本支持单机多线程模式完成同步作业运行(从源码上看是支持集群运行的)。
- DataX完成单个数据同步的作业,我们称之为Job,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。
- DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。
- 切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。
- 每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作(从源码里面看其实是先启动Writer,再启动Reader)。
- DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0。(通过该退出值可以判断程序是否异常,可以做一些相应的操作,如:邮件告警)
安装部署
准备工作
- 下载:http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
- 在本地或者服务器上解压
- 需要python 2.6+环境(DataX的启动脚本是python文件,也可以自己将里面执行jar包的命令拿出来)
测试
python bin/datax.py job/job.json
出现下图内容说明DataX功能测试完成
开始开发
下面以比较简单的mysql -> mysql为例讲解.
- 官方提供的文档默认是使用table模式的。该模式配置起来比较麻烦,下面例子采用querySql模式,使用起来简单易懂。
- writeMode:包含insert, replace, update,其中update采用的是INSERT... ON DUPLICATE KEY UPDATE 的方式,意思是当insert已经存在的记录时,执行update
- 在writer阶段可以执行preSql,表示在插入数据之前需要执行的操作
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"connection": [
{
"querySql": ["SELECT * FROM table1"],
"jdbcUrl": ["jdbc:mysql://localhost:3306/datax?useSSL=false&useUnicode=true&characterEncoding=UTF-8"]
}
],
"password": "root",
"username": "root"
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"column": ["*"],
"writeMode": "update",
"preSql": [
"delete from table2"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://localhost:3306/datax?useSSL=false&useUnicode=true&characterEncoding=UTF-8",
"table": ["table2"]
}
],
"password": "root",
"username": "root"
}
}
}
],
"setting": {
"speed": {
"byte": 1048576,
"channel": 5
}
}
}
}
验证
执行DataX启动脚本以验证结果:bin/datax.py job/test.josn