本文为学习笔记,会随着学习深入持续更新,仅供参考
一、datax干嘛的和之前的seatunnel使用上有什么区别
DataX 是阿里巴巴开发的一个开源数据同步工具。
DataX :更加专注于批量数据的同步和迁移,适用于数据仓库建设、数据备份和数据迁移等场景。主要支持批处理模式,通过配置 JSON 文件来定义数据同步任务。不依赖于其他大数据计算框架。
SeaTunnel :侧重于实时数据处理和批处理任务,适用于实时 ETL、数据流处理和数据分析等场景。支持流处理和批处理两种模式,更适合实时数据处理任务。基于 Apache Spark 和 Apache Flink 构建,具有分布式计算的优势和扩展性。
二、dataX实现数据全量同步举例
1. 安装 DataX
首先,需要安装 DataX。可以从 GitHub 上下载 DataX 源码或预编译的二进制包:
git clone https://github.com/alibaba/DataX.git
进入 DataX 目录并编译:
cd DataX
mvn clean package assembly:assembly -Dmaven.test.skip=true
2. 配置 JSON 文件
DataX 使用 JSON 文件来配置数据同步任务。以 MySQL 到 Hadoop 的数据同步为例,首先需要创建一个 JSON 配置文件,例如 mysql_to_hadoop.json
。
以下是一个简单的配置示例:
{
"job": {
"setting": {
"speed": {
"channel": 3 // 并行度,可以根据需要调整
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "your_mysql_username",
"password": "your_mysql_password",
"connection": [
{
"table": ["your_table_name"],
"jdbcUrl": ["jdbc:mysql://your_mysql_host:3306/your_database"]
}
]
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"defaultFS": "hdfs://your_hadoop_cluster",
"fileType": "text",
"path": "/user/hadoop/your_hdfs_path/",
"fileName": "your_file_name",
"column": [
{"name": "column1", "type": "string"},
{"name": "column2", "type": "int"},
{"name": "column3", "type": "date"}
],
"writeMode":"truncate",
"fieldDelimiter": "\t"
}
}
}
]
}
}
3. 配置项说明
- reader 部分:配置 MySQL 数据源的连接信息,包括用户名、密码、表名和 JDBC URL。
-
writer 部分:配置 Hadoop HDFS 的目标路径、文件类型(如 text、orc、parquet 等)、文件名、字段信息及写入模式。
setting:任务的全局设置。
在 DataX 的 hdfswriter 配置中,writeMode 参数用于指定写入数据到目标 HDFS 时的模式
append:追加模式。如果目标路径下已经存在文件,将在文件末尾追加新数据,而不会覆盖原有数据。
nonConflict:非冲突模式。如果目标路径下已经存在文件,任务会报错并终止,以避免覆盖现有文件。
truncate:截断模式。如果目标路径下已经存在文件,将删除现有文件,然后写入新数据
hadoopConfig:传递 Hadoop 相关配置项,如果配置了 dfs.replication 为 1,表示每个文件块只保留一个副本。生产环境可以相应提高。
参数详细说明
4. 执行同步任务
在命令行中执行以下命令运行 DataX 任务:
python {DATAX_HOME}/bin/datax.py /path/to/your/mysql_to_hadoop.json
5. 检查同步结果
同步完成后,可以通过 HDFS 命令行或其他工具检查数据是否成功写入到指定的 HDFS 路径。
hdfs dfs -ls /user/hadoop/your_hdfs_path/
hdfs dfs -cat /user/hadoop/your_hdfs_path/your_file_name
三、DataX实现数据增量同步举例
a. 时间戳字段或自增字段:通过数据表中的时间戳字段(如 last_update_time
)或自增字段(如 id
),在每次同步时记录最后同步的位置,下次同步时只同步新增加或更新的数据。
b. 支持的插件:一些 DataX 插件支持增量同步,如 MySQL Reader 和 Writer 插件,通过配置 SQL 查询条件来实现增量同步。
示例配置
还是以mysql到hdfs为例。
1. 配置文件
{
"job": {
"setting": {
"speed": {
"channel": 3
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "${username}",
"password": "${password}",
"column": [
"id", "name", "update_time"
],
"splitPk": "id",
"connection": [
{
"table": [
"${table}"
],
"jdbcUrl": [
"jdbc:mysql://${host}:${port}/${database}"
]
}
],
"where": "update_time > '${last_update_time}'"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"defaultFS": "hdfs://${hdfs_host}:${hdfs_port}",
"fileType": "text",
"path": "/path/to/hdfs/${table}",
"fileName": "${table}",
"column": [
{"name": "id", "type": "bigint"},
{"name": "name", "type": "string"},
{"name": "update_time", "type": "timestamp"}
],
"writeMode": "append",
"fieldDelimiter": "\t",
"compress": "gzip",
"hadoopConfig": {
"dfs.replication": "1"
}
}
}
}
]
}
}
2. 使用脚本自动更新时间戳
为了实现真正的增量同步,需要在每次同步完成后更新 last_update_time
。可以通过脚本来实现:
#!/bin/bash
# MySQL 配置信息
MYSQL_HOST="your_mysql_host"
MYSQL_PORT="your_mysql_port"
MYSQL_DATABASE="your_database"
MYSQL_USERNAME="your_username"
MYSQL_PASSWORD="your_password"
# HDFS 配置信息
HDFS_HOST="your_hdfs_host"
HDFS_PORT="your_hdfs_port"
# 定义变量
LAST_UPDATE_FILE="last_update_time.txt"
JOB_TEMPLATE="job_template.json"
TABLES=("table1" "table2" "table3") # 需要同步的表名列表
# 读取最后一次同步的时间戳
if [ ! -f "$LAST_UPDATE_FILE" ]; then
echo "1970-01-01 00:00:00" > $LAST_UPDATE_FILE
fi
LAST_UPDATE_TIME=$(cat $LAST_UPDATE_FILE)
# 遍历所有表并生成配置文件
for TABLE in "${TABLES[@]}"; do
JOB_FILE="job_${TABLE}.json"
sed -e "s/\${username}/${MYSQL_USERNAME}/g" \
-e "s/\${password}/${MYSQL_PASSWORD}/g" \
-e "s/\${host}/${MYSQL_HOST}/g" \
-e "s/\${port}/${MYSQL_PORT}/g" \
-e "s/\${database}/${MYSQL_DATABASE}/g" \
-e "s/\${table}/${TABLE}/g" \
-e "s/\${last_update_time}/${LAST_UPDATE_TIME}/g" \
-e "s/\${hdfs_host}/${HDFS_HOST}/g" \
-e "s/\${hdfs_port}/${HDFS_PORT}/g" \
$JOB_TEMPLATE > $JOB_FILE
# 执行 DataX 同步任务
python /path/to/datax/bin/datax.py $JOB_FILE
# 检查是否成功
if [ $? -ne 0 ]; then
echo "DataX job for table ${TABLE} failed!"
exit 1
fi
done
# 更新 last_update_time
NEW_LAST_UPDATE_TIME=$(date '+%Y-%m-%d %H:%M:%S')
echo $NEW_LAST_UPDATE_TIME > $LAST_UPDATE_FILE
echo "DataX incremental sync completed successfully."
四、其他问题
1、如果想设置mysql和hdfs中的字段不一致怎么设置?
-
reader:
- column:定义从 MySQL 读取的列名。这些列名需要与数据库中的实际列名一致。
-
writer:
-
column:定义写入 HDFS 的列名和数据类型。这里可以与
reader
中的列名不同,实现字段名的映射和数据类型的转换。
-
column:定义写入 HDFS 的列名和数据类型。这里可以与
注意事项
-
字段顺序:确保
reader
和writer
中的列顺序一致,以保证数据正确映射。例如,reader
读取的第一个列id
将映射到writer
的第一个列user_id
。 -
数据类型转换:DataX 在处理数据时会尝试进行类型转换,但为了避免数据丢失或格式错误,尽量确保源数据类型和目标数据类型兼容。例如,将 MySQL 的
DECIMAL
转换为 HDFS 的DOUBLE
。 -
数据验证:在数据同步过程中,可以通过设置
errorLimit
来控制允许的错误数量和比例,以便及时发现和处理数据转换中的问题。
参考链接
1、官方文档