数据迁移工具DataX入门

一、DataX是什么

DataX是阿里巴巴开源的离线数据同步工具,实现了包括主流RDBMS数据库、NoSQL、大数据计算系统在内的多种异构数据源之间高效进行数据同步的功能。

二、为什么要使用DataX

DataX设计理念

为了解决异构数据源的同步问题,DataX将复杂的网状同步链路优化成了星型数据链路,由DataX作为中间传输载体来负责连接各种数据源,以此来降低整个异构数据源同步链路的复杂度。当需要新接入一个数据源的时候,只需要考虑将该新的数据源对接到DataX即可,就能跟已有的所有数据源无缝同步。

DataX架构设计

DataX由FrameWork+Plugin的形式构建,数据源的读取和写入分别Reader和Writer实现:

  • Reader,数据采集模块,负责采集数据源中的数据,并将数据发送给FrameWork;
  • Writer,数据写入模块,负责从Framework中取数据,并将数据写入到数据源中;
  • Framework,用于连接Reader和Writer,作为以上两者的数据传输通道,处理缓冲、流量控制、并发、数据转换等核心问题。
DataX工作原理

DataX的工作模式是单机多线程形式,不支持分布式的方式,这是它和其它数据同步工具的重要区别之一。

  1. 每一个数据同步作业,我们称之为Job,在DataX收到一个Job之后,就启动一个进程来完成整个作业的过程。如果有多个数据同步作业需要同时执行,那就要么排队等待,要么再启动一个DataX进程。Job模块是作业的中枢管理节点,承担了数据清理、子任务切分、TaskGroup管理等功能。

  2. Job启动后,会根据不同的源端切分策略,将Job切分为多个小的任务Task,以便于多线程并发执行它们。Task便是DataX中的最小执行单元,每一个Task都负责一部分数据的同步工作。

  3. 切分好Task之后,Job会调用Scheduler模块,根据配置的并发任务数将Task重新组合,组装成TaskGroup,每一个TaskGroup负责以一定的并发度来运行分配好的所有Task,默认情况下的并发度为5。

  4. 每一个Task都有TaskGroup负责启动和控制,Task启动后,会按照上图中介绍的Reader-Channel-Writer来完成其对应的数据同步工作。

  5. Job负责监控并等待多个TaskGroup任务完成后就退出,否则异常退出。

DataX还具有如下的核心优势:

  1. 可靠的数据质量监控,能解决数据传输过程中类型失真问题,提供作业全链路流量和数据量的监控,并提供脏数据探测功能;
  2. 丰富的数据转换功能,作为一个服务于大数据的ETL工具,除了提供数据快照搬迁功能,还可以让数据在传输过程中轻松完成数据脱敏、数据补全、数据过滤等数据转换功能,还可以让用户自定义转换函数使用;
  3. 精准的速率控制,提供了通道控制、记录流控制、字节流控制三种流控模式,可以随意控制作业速率,让作业在存储介质可承受的范围内达到最佳的同步速率;
  4. 强劲的同步性能;
  5. 健壮的容错机制,在遇到外部因素干扰(网络、数据源不稳定)情况下,会自动进行线程级别、进程级别、作业级别的局部或者全局重试,保证用户作业的稳定运行;
  6. 极简使用体验,提供Linux和Windows版本,下载即可使用,作业过程中,会打印大量的关键信息,包括传输速度、插件性能、进程的CPU、JVM和GC情况等,任务结束后还会打印该任务的总体运行情况。

和其它大数据ETL工具相比:

功能 DataX Sqoop
运行模式 单进程多线程 MR分布式
MySQL读写 单机压力大,读写粒度容易控制 MR模式重,出错后处理麻烦
Hive读写 单机压力大 很好
文件格式 orc支持 orc不支持,可添加
分布式 不支持,可通过调度框架规避 支持
流控 无,需要定制
统计信息 无,分布式的数据搜集不方便
数据校验 无,分布式的数据搜集不方便
监控 无,需要定制

如果需要支持的数据源比较多,建议使用DataX,如果数据来源比较单一,且只是要导入到HDFS,流程很简单,可以考虑使用Sqoop。

三、如何使用DataX

3.1 下载和安装

  • 准备好Linux服务器;
  • 安装Java运行环境;
  • 安装Python运行环境;
  • 下载Datax工具包,解压缩到合适的目录;

进入到datax的bin目录下,运行自带的示例:

python datax.py /root/datax/datax/job/job.json

运行后控制台显示运行成功即表示DataX安装完成。

3.2 Job描述介绍

当你决定使用某个Reader和Writer之后,我们可以通过命令来获取一个模板Json:

# 指定需要的Reader和Writer来获取模板Json
python datax.py -r streamreader -w streamwriter

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.

# 指定需要的Reader和Writer的文档给地址
Please refer to the streamreader document:
     https://github.com/alibaba/DataX/blob/master/streamreader/doc/streamreader.md

Please refer to the streamwriter document:
     https://github.com/alibaba/DataX/blob/master/streamwriter/doc/streamwriter.md

# 说明定制好的Json如何启动
Please save the following configuration as a json file and  use
     python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json
to run the job.

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "streamreader",
                    "parameter": {
                        # 指定列信息
                        "column": [],
                        # 指定记录条数
                        "sliceRecordCount": ""
                    }
                },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "encoding": "",
                        # 是否将数据打印到控制台
                        "print": true
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                # 指定并发度,可以是channel、bytes、records三种类型
                "channel": ""
            }
        }
    }
}

大体分为四个部分:

  • Job描述
  • Reader描述和配置
  • Writer描述和配置
  • Job参数配置

我们复制如上json内容,新建一个stream.json放到/root/zx-test下面,内容设置如下:

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "streamreader",
                    "parameter": {
                        "column": [
                            {
                                "type":"string",
                                "value":"zx-test"
                            },
                            {
                                "type":"string",
                                "value":"999"
                            }
                        ],
                        "sliceRecordCount": "10"
                    }
                },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "encoding": "UTF-8",
                        "print": true
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}

然后输入命令执行:

python datax.py /root/zx-test/stream.json

...
zx-test 999
zx-test 999
zx-test 999
zx-test 999
zx-test 999
zx-test 999
zx-test 999
zx-test 999
zx-test 999
zx-test 999
2023-01-14 11:00:32.389 [taskGroup-0] INFO  TaskGroupContainer - taskGroup[0] taskId[0] is successed, used[101]ms
2023-01-14 11:00:32.390 [taskGroup-0] INFO  TaskGroupContainer - taskGroup[0] completed it's tasks.
2023-01-14 11:00:42.283 [job-0] INFO  StandAloneJobContainerCommunicator - Total 10 records, 100 bytes | Speed 10B/s, 1 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 0.000s | Percentage 100.00%
2023-01-14 11:00:42.284 [job-0] INFO  AbstractScheduler - Scheduler accomplished all tasks.
2023-01-14 11:00:42.284 [job-0] INFO  JobContainer - DataX Writer.Job [streamwriter] do post work.
2023-01-14 11:00:42.284 [job-0] INFO  JobContainer - DataX Reader.Job [streamreader] do post work.
2023-01-14 11:00:42.284 [job-0] INFO  JobContainer - DataX jobId [0] completed successfully.
...

3.3 mysql-hdfs案例

前提条件:

  1. 准备一个MySQL数据库;
  2. 准备一个健康的HDFS集群;

首先,我们创建一张数据表,并创建一些数据:

CREATE TABLE `zx_user` (
  `user_id` bigint(20) NOT NULL COMMENT '用户ID',
  `user_name` varchar(30) DEFAULT NULL COMMENT '用户姓名',
  `age` int(11) DEFAULT NULL COMMENT '用户年龄',
  `user_email` varchar(50) DEFAULT NULL COMMENT '用户邮箱',
  `create_by` varchar(100) DEFAULT NULL,
  `create_date` datetime DEFAULT NULL,
  `update_by` varchar(100) DEFAULT NULL,
  `update_date` datetime DEFAULT NULL,
  `deleted` int(11) DEFAULT '0' COMMENT '0-未删除;1-已删除',
  PRIMARY KEY (`user_id`),
  UNIQUE KEY `zx_user_un` (`user_id`),
  UNIQUE KEY `zx_user_un2` (`user_email`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

INSERT INTO zx_user (user_id,user_name,age,user_email,create_by,create_date,update_by,update_date,deleted) VALUES
     (1,'Jone',18,'test1@baomidou.com',NULL,NULL,NULL,NULL,0),
     (2,'Jack',20,'test2@baomidou.com',NULL,NULL,NULL,NULL,0),
     (5,'Anna',27,'test5@baomidou.com',NULL,NULL,NULL,NULL,0),
     (7,'Anna',27,'test7@baomidou.com','System','2021-06-17 14:36:03','System','2021-06-17 14:54:51',0);

然后我们需要获取一个mysql-hdfs案例的json示例:python datax.py -r mysqlreader -w hdfswriter,并根据实际情况进行修改:

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "column": [
                            "user_id",
                            "user_name",
                            "age",
                            "user_email"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://x.x.x.x:3306/zhangxun"
                                ],
                                "table": [
                                    "zx_user"
                                ]
                            }
                        ],
                        "password": "******",
                        "username": "root",
                        "where": ""
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [
                            {
                                "name":"user_id",
                                "type":"BIGINT"
                            },
                            {
                                "name":"user_name",
                                "type":"STRING"
                            },
                            {
                                "name":"age",
                                "type":"INT"
                            },
                            {
                                "name":"user_email",
                                "type":"STRING"
                            }
                        ],
                        "compress": "NONE",
                        "defaultFS": "hdfs://x.x.x.x:9000",
                        "fieldDelimiter": ",",
                        "fileName": "zx_user",
                        "fileType": "text",
                        "path": "/zx/datax",
                        "writeMode": "append"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}

关于mysqlreader和hdfswriter的详细参数配置项可以参考官方对应插件的文档,上面写的都很详细。

然后我们执行命令开始Job:

python datax.py /root/zx-test/mysql2hdfs.json
...
2023-01-14 15:03:15.319 [job-0] INFO  JobContainer - DataX jobId [0] completed successfully.
...
2023-01-14 15:03:15.422 [job-0] INFO  JobContainer -
任务启动时刻                    : 2023-01-14 15:03:04
任务结束时刻                    : 2023-01-14 15:03:15
任务总计耗时                    :                 11s
任务平均流量                    :               10B/s
记录写入速度                    :              0rec/s
读出记录总数                    :                   4
读写失败总数                    :                   0

显示执行成功了,我们打开HDFS文件浏览器查看文件确实已经存在了:

mysql2hdfs结果

我们查看其中的内容为:

[root@bigdata01 hadoop]# hdfs dfs -cat /zx/datax/zx_user__d81bd99e_0d6f_45a1_9a80_04ca475fc83d
1,Jone,18,test1@baomidou.com
2,Jack,20,test2@baomidou.com
5,Anna,27,test5@baomidou.com
7,Anna,27,test7@baomidou.com

3.4 hdfs-mysql案例

在以上3.3案例的基础上,我们首先获取一个案例json,python datax.py -r hdfswriter -w mysqlreader,然后修改成需要的:

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "hdfsreader",
                    "parameter": {
                        "column": ["*"],
                        "defaultFS": "hdfs://x.x.x.x:9000",
                        "encoding": "UTF-8",
                        "fieldDelimiter": ",",
                        "fileType": "text",
                        "path": "/zx/datax/zx_user.txt"
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "column": [
                            "user_id",
                            "user_name",
                            "age",
                            "user_email"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://x.x.x.x:3306/zhangxun",
                                "table": [
                                    "zx_user"
                                ]
                            }
                        ],
                        "password": "******",
                        "preSql": [],
                        "session": [],
                        "username": "root",
                        "writeMode": "insert"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}

然后,我们准备一个zx_user.txt,编辑内容如下:

11,slide,21,slide@baomidou.com
21,mify,20,mify@baomidou.com
51,kitty,27,kitty@baomidou.com
[root@bigdata01 hadoop]# hdfs dfs -put /root/zx-test/zx_user.txt /zx/datax
[root@bigdata01 hadoop]# hdfs dfs -cat /zx/datax/zx_user.txt
11,slide,21,slide@baomidou.com
21,mify,20,mify@baomidou.com
51,kitty,27,kitty@baomidou.com

如此将需要同步到MySQL数据库的数据文件准备好了,然后,我们执行同步命令:

python datax.py /root/zx-test/hdfs2mysql.json
...
2023-01-14 17:22:04.815 [job-0] INFO  JobContainer - DataX Writer.Job [mysqlwriter] do post work.
2023-01-14 17:22:04.815 [job-0] INFO  JobContainer - DataX Reader.Job [hdfsreader] do post work.
2023-01-14 17:22:04.816 [job-0] INFO  JobContainer - DataX jobId [0] completed successfully.
...
任务启动时刻                    : 2023-01-14 17:21:53
任务结束时刻                    : 2023-01-14 17:22:04
任务总计耗时                    :                 11s
任务平均流量                    :                7B/s
记录写入速度                    :              0rec/s
读出记录总数                    :                   3
读写失败总数                    :                   0

Job显示执行成功了,我们查询下数据库表中的数据,发现数据为:

1   Jone    18  test1@baomidou.com                  0
2   Jack    20  test2@baomidou.com                  0
5   Anna    27  test5@baomidou.com                  0
7   Anna    27  test7@baomidou.com  System  2021-06-17 14:36:03 System  2021-06-17 14:54:51 0
11  slide   21  slide@baomidou.com                  0
21  mify    20  mify@baomidou.com                   0
51  kitty   27  kitty@baomidou.com                  0

如此表示导入成功了。

四、总结

DataX由于是阿里巴巴开源的,中文文档比较完善,各种插件的说明教程也很全,基本很容易上手,本文旨在帮助入门,一些高级主题还需要另外实验和学习。

参考文档:

alibaba/DataX: DataX是阿里云DataWorks数据集成的开源版本。 (github.com)

DataX/introduction.md at master · alibaba/DataX (github.com)

DataX/userGuid.md at master · alibaba/DataX (github.com)

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,761评论 5 460
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,953评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,998评论 0 320
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,248评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,130评论 4 356
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,145评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,550评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,236评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,510评论 1 291
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,601评论 2 310
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,376评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,247评论 3 313
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,613评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,911评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,191评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,532评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,739评论 2 335

推荐阅读更多精彩内容