优雅地使用 DataX 和 Python 同步数据

0. 前置条件

  • MySQL数据库
  • Python环境

1. 下载 DataX 工具 (可选)

cd /opt/
wget https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202308/datax.tar.gz
tar -zxvf datax.tar.gz

2. 配置项

3. 执行流程-DataX

  1. 由xxl-job或其他调度工具发起同步
  2. 同步任务 读取作业信息
  3. 数据库 读取对应 param_server_id, from_server_id, to_server_id 的连接方式
  4. (如有) 读取动态参数. 例如 param_sql 为
        select max(update_time) as start_time from ods_order
    
    则解析运行参数 run_param = {'start_time':'2026-01-01 11:11:11'}
  5. 生成最终取数sql, 例如 from_sql 为
        select a,b,c,d from erp_order where update_time>'{start_time}'
    
    则生成最终执行sql
        select a,b,c,d from erp_order where update_time>'2026-01-01 11:11:11'
    
  6. 用上述参数生成 datax json 配置文件到临时目录
  7. 生成 datax 命令行并执行任务
  8. 解析输出结果, 获取同步行数. 或读取报错信息
  9. 记录日志

4. 执行流程-Pandas

  1. 由xxl-job或其他调度工具发起同步
  2. 同步任务 读取作业信息
  3. 数据库 读取对应 param_server_id, from_server_id, to_server_id 的连接方式
  4. (如有) 读取动态参数. 例如 param_sql 为
        select max(update_time) as start_time from ods_order
    
    则解析运行参数 run_param = {'start_time':'2026-01-01 11:11:11'}
  5. 生成最终取数sql, 例如 from_sql 为
        select a,b,c,d from erp_order where update_time>'{start_time}'
    
    则生成最终执行sql
        select a,b,c,d from erp_order where update_time>'2026-01-01 11:11:11'
    
  6. 执行 before_write
  7. 读取 最终执行sql 结果到 DataFrame
  8. 写入目标数据
  9. 执行 after_write
  10. 记录日志

5. 完整部署步骤

5.1. 下载代码

  • git clone https://github.com/ts7ming/CheapETL
  • (或 git clone https://gitee.com/ts7ming/CheapETL)

5.2. 准备环境

  • 在MySQL执行 etl.sql

  • 创建 settings.py

    DS_CONFIG = {
        'conn_type': 'mysql',
        'host': 'localhost', 
        'username': 'root', 
        'password': 'qiming', 
        'port': '3306', 
        'db_name': 'dw'
    }
    WORK_DIR = '/app/CheapETL/'
    DATAX_PY = '/opt/datax/bin/data.py'
    PY_PATH = 'python3'
    

5.3. 添加数据源

  • 在 MySQL etl_server 表添加数据源id和连接信息
    • 如果用datax写入 doris, 需要单独新建数据源id, port值为 fe_port,be_port 例如 9030,8030

5.4. 配置同步任务

  • 在 MySQL etl_job_sync 表添加同步配置

5.5. 执行同步任务

  • 通过 xxl-job, crontab 或其他方式执行 python3 script_path sync_id param
    • 数据量小的同步: script_path = /app/CheapETL/sync.py
    • 数据量大的同步: script_path = /app/CheapETL/sync_datax.py
    • sync_id = etl_job_sync.id
    • param是指定参数, 优先级高于 etl_job_sync.param_sql
  • 例如: python3 /app/CheapETL/sync_datax.py 2001
  • 例如: python3 /app/CheapETL/sync.py 2002 --start_date "$(date -d '-1 day' +%Y-%m-%d)" --end_date "$(date +%Y-%m-%d)"
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容