These are study notes and will keep being updated as I learn more; for reference only.
Scenario: MySQL to HDFS; HDFS to Doris.
1. Reference configuration file: MySQL to HDFS
{
  "job": {
    "setting": {
      "speed": {
        "channel": 3
      }
    },
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "${username}",
            "password": "${password}",
            "column": [
              "id"
            ],
            "connection": [
              {
                "table": [
                  "${table}"
                ],
                "jdbcUrl": [
                  "jdbc:mysql://${host}:3306/${database}?useSSL=false&allowPublicKeyRetrieval=true"
                ]
              }
            ]
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "defaultFS": "hdfs://${hdfs_host}:${hdfs_port}",
            "fileType": "text",
            "path": "${path}",
            "fileName": "${table}",
            "column": [
              {"name": "id", "type": "bigint"}
            ],
            "writeMode": "append",
            "fieldDelimiter": "\t",
            "compress": "gzip",
            "hadoopConfig": {
              "dfs.replication": "1"
            }
          }
        }
      }
    ]
  }
}
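A minimal run sketch for this job, assuming DataX is installed under /opt/datax and the config above is saved as job/mysql2hdfs.json (these paths and all parameter values are hypothetical examples); datax.py substitutes each ${key} placeholder in the JSON with the value passed via -p "-Dkey=value":

python /opt/datax/bin/datax.py \
-p "-Dusername=root -Dpassword=123456 -Dhost=hadoop102 -Ddatabase=test -Dtable=user_info -Dhdfs_host=hadoop102 -Dhdfs_port=8020 -Dpath=/origin_data/test/user_info" \
/opt/datax/job/mysql2hdfs.json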
2. Reference configuration file: HDFS to Doris
{
  "job": {
    "setting": {
      "speed": {
        "channel": 3
      }
    },
    "content": [
      {
        "reader": {
          "name": "hdfsreader",
          "parameter": {
            "path": "${hive_path}/${hive_table}/dt=${day}",
            "defaultFS": "hdfs://${hdfs_host}:${hdfs_port}",
            "fileType": "orc",
            "column": [
              {
                "index": 0,
                "name": "agg_time",
                "type": "string"
              },
              {
                "name": "dt",
                "type": "string",
                "value": "${day}"
              }
            ],
            "fieldDelimiter": "\t",
            "encoding": "UTF-8",
            "nullFormat": "\\N"
          }
        },
        "writer": {
          "name": "doriswriter",
          "parameter": {
            "loadUrl": [
              "${doris_host}:8030"
            ],
            "column": [
              "agg_time",
              "time_day"
            ],
            "username": "${doris_user}",
            "password": "${doris_password}",
            "postSql": [],
            "preSql": [],
            "flushInterval": 30000,
            "connection": [
              {
                "jdbcUrl": "jdbc:mysql://${doris_host}:9030/${doris_db}",
                "table": [
                  "${doris_table}"
                ],
                "selectedDatabase": "${doris_db}"
              }
            ]
          }
        }
      }
    ]
  }
}
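A matching run sketch, again with hypothetical paths and values; the partition date ${day} is computed as yesterday with GNU date, which is the handling described in note 1 below:

day=$(date -d '-1 day' +%Y-%m-%d)  # today's run processes yesterday's partition
python /opt/datax/bin/datax.py \
-p "-Dhive_path=/warehouse/test -Dhive_table=ads_agg -Dday=${day} -Dhdfs_host=hadoop102 -Dhdfs_port=8020 -Ddoris_host=hadoop102 -Ddoris_user=root -Ddoris_password=123456 -Ddoris_db=test -Ddoris_table=ads_agg" \
/opt/datax/job/hdfs2doris.json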
Notes:
1. The Hive partition date (dt) is extracted here and loaded into Doris as a time field (time_day). It is handled by passing the target date in as a parameter: for example, if today's run processes yesterday's data, the date passed in is yesterday's (see the run sketch above).
2. The port in Doris's loadUrl is the FE HTTP port (used for Stream Load); only the port in the jdbcUrl below it is the MySQL-protocol query port.
3. The data here is written in append mode.
4. The JSON must be well formed, otherwise DataX reports an error.
5. If the official DataX release package is missing some of the readers or writers you need, build DataX from source instead (see the build sketch after this list).
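For note 5, a build-from-source sketch; the clone URL and Maven command are the standard ones from the DataX repository README:

git clone https://github.com/alibaba/DataX.git
cd DataX
mvn -U clean package assembly:assembly -Dmaven.test.skip=true
# after a successful build, the packaged distribution is under target/datax/datax/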
References
1. DataX source code
2. Case study: scheduling a DataX job that reads a Hive partitioned table with DolphinScheduler
3. Doris official site
4. Handling the "Content-Length header already present" error when writing to Doris