温故知新:初识DataX

本文为学习笔记,会随着学习深入持续更新,仅供参考

一、datax干嘛的和之前的seatunnel使用上有什么区别

DataX 是阿里巴巴开发的一个开源数据同步工具。
DataX :更加专注于批量数据的同步和迁移,适用于数据仓库建设、数据备份和数据迁移等场景。主要支持批处理模式,通过配置 JSON 文件来定义数据同步任务。不依赖于其他大数据计算框架。
SeaTunnel :侧重于实时数据处理和批处理任务,适用于实时 ETL、数据流处理和数据分析等场景。支持流处理和批处理两种模式,更适合实时数据处理任务。基于 Apache Spark 和 Apache Flink 构建,具有分布式计算的优势和扩展性。


二、dataX实现数据全量同步举例

1. 安装 DataX

首先,需要安装 DataX。可以从 GitHub 上下载 DataX 源码或预编译的二进制包:

git clone https://github.com/alibaba/DataX.git

进入 DataX 目录并编译:

cd DataX
mvn clean package assembly:assembly -Dmaven.test.skip=true

2. 配置 JSON 文件

DataX 使用 JSON 文件来配置数据同步任务。以 MySQL 到 Hadoop 的数据同步为例,首先需要创建一个 JSON 配置文件,例如 mysql_to_hadoop.json
以下是一个简单的配置示例:

{
  "job": {
    "setting": {
      "speed": {
        "channel": 3  // 并行度,可以根据需要调整
      }
    },
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "your_mysql_username",
            "password": "your_mysql_password",
            "connection": [
              {
                "table": ["your_table_name"],
                "jdbcUrl": ["jdbc:mysql://your_mysql_host:3306/your_database"]
              }
            ]
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "defaultFS": "hdfs://your_hadoop_cluster",
            "fileType": "text",
            "path": "/user/hadoop/your_hdfs_path/",
            "fileName": "your_file_name",
            "column": [
              {"name": "column1", "type": "string"},
              {"name": "column2", "type": "int"},
              {"name": "column3", "type": "date"}
            ],
            "writeMode":"truncate",
            "fieldDelimiter": "\t"
          }
        }
      }
    ]
  }
}

3. 配置项说明

  • reader 部分:配置 MySQL 数据源的连接信息,包括用户名、密码、表名和 JDBC URL。
  • writer 部分:配置 Hadoop HDFS 的目标路径、文件类型(如 text、orc、parquet 等)、文件名、字段信息及写入模式。
    setting:任务的全局设置。
    在 DataX 的 hdfswriter 配置中,writeMode 参数用于指定写入数据到目标 HDFS 时的模式
    append:追加模式。如果目标路径下已经存在文件,将在文件末尾追加新数据,而不会覆盖原有数据。
    nonConflict:非冲突模式。如果目标路径下已经存在文件,任务会报错并终止,以避免覆盖现有文件。
    truncate:截断模式。如果目标路径下已经存在文件,将删除现有文件,然后写入新数据
    hadoopConfig:传递 Hadoop 相关配置项,如果配置了 dfs.replication 为 1,表示每个文件块只保留一个副本。生产环境可以相应提高。
    参数详细说明

4. 执行同步任务

在命令行中执行以下命令运行 DataX 任务:

python {DATAX_HOME}/bin/datax.py /path/to/your/mysql_to_hadoop.json

5. 检查同步结果

同步完成后,可以通过 HDFS 命令行或其他工具检查数据是否成功写入到指定的 HDFS 路径。

hdfs dfs -ls /user/hadoop/your_hdfs_path/
hdfs dfs -cat /user/hadoop/your_hdfs_path/your_file_name

三、DataX实现数据增量同步举例

a. 时间戳字段或自增字段:通过数据表中的时间戳字段(如 last_update_time)或自增字段(如 id),在每次同步时记录最后同步的位置,下次同步时只同步新增加或更新的数据。

b. 支持的插件:一些 DataX 插件支持增量同步,如 MySQL Reader 和 Writer 插件,通过配置 SQL 查询条件来实现增量同步。

示例配置

还是以mysql到hdfs为例。

1. 配置文件

{
  "job": {
    "setting": {
      "speed": {
        "channel": 3
      }
    },
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "${username}",
            "password": "${password}",
            "column": [
              "id", "name", "update_time"
            ],
            "splitPk": "id",
            "connection": [
              {
                "table": [
                  "${table}"
                ],
                "jdbcUrl": [
                  "jdbc:mysql://${host}:${port}/${database}"
                ]
              }
            ],
            "where": "update_time > '${last_update_time}'"
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "defaultFS": "hdfs://${hdfs_host}:${hdfs_port}",
            "fileType": "text",
            "path": "/path/to/hdfs/${table}",
            "fileName": "${table}",
            "column": [
              {"name": "id", "type": "bigint"},
              {"name": "name", "type": "string"},
              {"name": "update_time", "type": "timestamp"}
            ],
            "writeMode": "append",
            "fieldDelimiter": "\t",
            "compress": "gzip",
            "hadoopConfig": {
              "dfs.replication": "1"
            }
          }
        }
      }
    ]
  }
}

2. 使用脚本自动更新时间戳

为了实现真正的增量同步,需要在每次同步完成后更新 last_update_time。可以通过脚本来实现:

#!/bin/bash

# MySQL 配置信息
MYSQL_HOST="your_mysql_host"
MYSQL_PORT="your_mysql_port"
MYSQL_DATABASE="your_database"
MYSQL_USERNAME="your_username"
MYSQL_PASSWORD="your_password"
# HDFS 配置信息
HDFS_HOST="your_hdfs_host"
HDFS_PORT="your_hdfs_port"
# 定义变量
LAST_UPDATE_FILE="last_update_time.txt"
JOB_TEMPLATE="job_template.json"
TABLES=("table1" "table2" "table3")  # 需要同步的表名列表
# 读取最后一次同步的时间戳
if [ ! -f "$LAST_UPDATE_FILE" ]; then
  echo "1970-01-01 00:00:00" > $LAST_UPDATE_FILE
fi
LAST_UPDATE_TIME=$(cat $LAST_UPDATE_FILE)

# 遍历所有表并生成配置文件
for TABLE in "${TABLES[@]}"; do
  JOB_FILE="job_${TABLE}.json"
  sed -e "s/\${username}/${MYSQL_USERNAME}/g" \
      -e "s/\${password}/${MYSQL_PASSWORD}/g" \
      -e "s/\${host}/${MYSQL_HOST}/g" \
      -e "s/\${port}/${MYSQL_PORT}/g" \
      -e "s/\${database}/${MYSQL_DATABASE}/g" \
      -e "s/\${table}/${TABLE}/g" \
      -e "s/\${last_update_time}/${LAST_UPDATE_TIME}/g" \
      -e "s/\${hdfs_host}/${HDFS_HOST}/g" \
      -e "s/\${hdfs_port}/${HDFS_PORT}/g" \
      $JOB_TEMPLATE > $JOB_FILE
  # 执行 DataX 同步任务
  python /path/to/datax/bin/datax.py $JOB_FILE

  # 检查是否成功
  if [ $? -ne 0 ]; then
    echo "DataX job for table ${TABLE} failed!"
    exit 1
  fi
done
# 更新 last_update_time
NEW_LAST_UPDATE_TIME=$(date '+%Y-%m-%d %H:%M:%S')
echo $NEW_LAST_UPDATE_TIME > $LAST_UPDATE_FILE
echo "DataX incremental sync completed successfully."

四、其他问题

1、如果想设置mysql和hdfs中的字段不一致怎么设置?
  • reader

    • column:定义从 MySQL 读取的列名。这些列名需要与数据库中的实际列名一致。
  • writer

    • column:定义写入 HDFS 的列名和数据类型。这里可以与 reader 中的列名不同,实现字段名的映射和数据类型的转换。
注意事项
  1. 字段顺序:确保 readerwriter 中的列顺序一致,以保证数据正确映射。例如,reader 读取的第一个列 id 将映射到 writer 的第一个列 user_id
  2. 数据类型转换:DataX 在处理数据时会尝试进行类型转换,但为了避免数据丢失或格式错误,尽量确保源数据类型和目标数据类型兼容。例如,将 MySQL 的 DECIMAL 转换为 HDFS 的 DOUBLE
  3. 数据验证:在数据同步过程中,可以通过设置 errorLimit 来控制允许的错误数量和比例,以便及时发现和处理数据转换中的问题。

参考链接

1、官方文档

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,029评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,395评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,570评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,535评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,650评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,850评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,006评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,747评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,207评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,536评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,683评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,342评论 4 330
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,964评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,772评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,004评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,401评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,566评论 2 349

推荐阅读更多精彩内容

  • github:https://github.com/alibaba/DataX[https://github.co...
    yichen_china阅读 2,840评论 0 0
  • 一、DataX Web是什么 DataX web是在DataX的基础上开发的分布式的数据同步工具,方便DataX的...
    文景大大阅读 14,700评论 1 12
  • 概览 DataX 是一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS...
    tracy_668阅读 691评论 0 1
  • 1 DataX 1.1 引言 有个项目的数据量高达五千万,但是因为报表那块数据不太准确,业务库和报表库又是跨库操作...
    上善若泪阅读 821评论 0 1
  • DataX介绍: DataX 是阿里开源的一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Or...
    达微阅读 4,644评论 0 10