DataX 增量同步数据

全量数据导出请查看DataX mongodb导出数据到mysql

Datax UDF手册

datax.py mongodb2mysql_inc.json

{
  "job": {
    "setting": {
      "speed": {
        "channel": 4
      }
    },
    "content": [{
      "reader": {
        "name": "mongodbreader",
        "parameter": {
          "address": ["*.*.*.*:27017"],
          "userName": "DataXTest",
          "userPassword": "123456",
          "dbName": "weixin",
          "collectionName": "fileids_wxpy",
          "column": [{
            "index":0,
            "name": "_id",
            "type": "string"
          }, {
            "index":1,
            "name": "crawler_time",
            "type": "string"
          }, {
            "index":2,
            "name": "file_url",
            "type": "string"
          }, {
            "index":3,
            "name": "flag",
            "type": "string"
          }, {
            "index":4,
            "name": "logo_url",
            "type": "string"
          }, {
            "index":5,
            "name": "source",
            "type": "string"
          }, {
            "index":6,
            "name": "update_date",
            "type": "string"
          }, {
            "index":7,
            "name": "update_time",
            "type": "long"
          }, {
            "index":8,
            "name": "wx_id",
            "type": "string"
          }, {
            "index":9,
            "name": "wx_name",
            "type": "string"
          }]
        }
      },
       "writer": {
                    "name": "mysqlwriter", 
                    "parameter": {
                        "column": [
                        "id",
                        "crawler_time",
                        "file_url",
                        "flag",
                        "logo_url",
                        "source",
                        "update_date",
                        "update_time",
                        "wx_id",
                        "wx_name"
            ], 
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://*.*.*.*:3306/weixin?characterEncoding=utf8", 
                                "table": ["fileids_wxpy"]
                            }
                        ], 
                        "password": "123456", 
                        "username": "root"
                    }
                },
                "transformer": [
                    {
                        "name": "dx_filter",
                        "parameter": 
                            {
                            "columnIndex":1,
                            "paras":["<","1560493441"]
                            }  
                    }
                ]
    }]
  }
}

运行

# python 环境为2.7
python datax.py mongodb2mysql_inc.json

运行结果

2019-06-14 15:22:58.886 [job-0] INFO  JobContainer - PerfTrace not enable!
2019-06-14 15:22:58.886 [job-0] INFO  StandAloneJobContainerCommunicator - Total 53 records, 18669 bytes | Speed 93B/s, 0 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 0.000s | Transfermor Success 51848 records | Transformer Error 0 records | Transformer Filter 51795 records | Transformer usedTime 0.000s | Percentage 100.00%
2019-06-14 15:22:58.887 [job-0] INFO  JobContainer - 
任务启动时刻                    : 2019-06-14 15:19:37
任务结束时刻                    : 2019-06-14 15:22:58
任务总计耗时                    :                201s
任务平均流量                    :               93B/s
记录写入速度                    :              0rec/s
读出记录总数                    :                  53
读写失败总数                    :                   0

2019-06-14 15:22:58.887 [job-0] INFO  JobContainer - 
Transformer成功记录总数         :               51848
Transformer失败记录总数         :                   0
Transformer过滤记录总数         :               51795

扩展: 定时同步实现

  • mysql_max_timestamp2csv.json
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://*.*.*.*:x:3306/weixin?characterEncoding=utf8"], 
                                "querySql": [
                                    "SELECT max(crawler_time) FROM fileids_wxpy"
                                ]
                            }
                        ], 
                        "password": "123456", 
                        "username": "root"
                    }
                },
                "writer": {
                    "name": "txtfilewriter",
                    "parameter": {
                        "fileName": "mysql_max_timestamp_result",
                        "fileFormat": "csv",
                        "path": "/root/datax/bin",
                        "writeMode": "truncate"
                    }
                }
            }
        ],
        "setting": { 
            "speed": {
                    "channel": 2
                }
        }
    }
}
  • datax.py mongodb2mysql_inc.json(这里与上面的同名文件只有过滤条件时间戳不同,此处固定为"timestamp",方便shell脚本替换更新)
{
  "job": {
    "setting": {
      "speed": {
        "channel": 4
      }
    },
    "content": [{
      "reader": {
        "name": "mongodbreader",
        "parameter": {
          "address": ["*.*.*.*:27017"],
          "userName": "DataXTest",
          "userPassword": "123456",
          "dbName": "weixin",
          "collectionName": "fileids_wxpy",
          "column": [{
            "index":0,
            "name": "_id",
            "type": "string"
          }, {
            "index":1,
            "name": "crawler_time",
            "type": "string"
          }, {
            "index":2,
            "name": "file_url",
            "type": "string"
          }, {
            "index":3,
            "name": "flag",
            "type": "string"
          }, {
            "index":4,
            "name": "logo_url",
            "type": "string"
          }, {
            "index":5,
            "name": "source",
            "type": "string"
          }, {
            "index":6,
            "name": "update_date",
            "type": "string"
          }, {
            "index":7,
            "name": "update_time",
            "type": "long"
          }, {
            "index":8,
            "name": "wx_id",
            "type": "string"
          }, {
            "index":9,
            "name": "wx_name",
            "type": "string"
          }]
        }
      },
       "writer": {
                    "name": "mysqlwriter", 
                    "parameter": {
                        "column": [
                        "id",
                        "crawler_time",
                        "file_url",
                        "flag",
                        "logo_url",
                        "source",
                        "update_date",
                        "update_time",
                        "wx_id",
                        "wx_name"
            ], 
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://*.*.*.*:3306/weixin?characterEncoding=utf8", 
                                "table": ["fileids_wxpy"]
                            }
                        ], 
                        "password": "123456", 
                        "username": "root"
                    }
                },
                "transformer": [
                    {
                        "name": "dx_filter",
                        "parameter": 
                            {
                            "columnIndex":1,
                            "paras":["<","timestamp"]
                            }  
                    }
                ]
    }]
  }
}
  • cron_datax_mongodb2mysql.sh
python2 /root/datax/bin/datax.py /root/datax/bin/mysql_max_timestamp2csv.json
# $?是shell变量,表示"最后一次执行命令"的退出状态.0为成功,非0为失败, -ne 为不等于
if [ $? -ne 0 ]; then
  echo "minute_data_sync.sh error, can not get max_time from target db!"
  exit 1
fi
# 找到 DataX 写入的文本文件,并将内容读取到一个变量中
RESULT_FILE=`ls /root/datax/bin/mysql_max_timestamp_result_*`
MAX_TIME=`cat $RESULT_FILE`
echo "$RESULT_FILE   $MAX_TIME"
# 如果最大时间戳不为 null 的话, 修改全部同步的配置,进行增量更新;
if [ "$MAX_TIME" != "null" ]; then
  # 设置增量更新过滤条件
  WHERE="$MAX_TIME"
  # 将timestamp字符串替换为上次同步的最大时间戳
  sed "s/timestamp/$WHERE/g" mongodb2mysql_inc.json > mongodb2mysql_inc_tmp.json
  #echo "增量更新"
  python2 /root/datax/bin/datax.py /root/datax/bin/mongodb2mysql_inc_tmp.json
  # 删除临时文件
  rm ./mongodb2mysql_inc_tmp.json
else
  # echo "全部更新"
  python2 /root/datax/bin/datax.py /root/datax/bin/mongodb2mysql.json
fi
  • 通过linux 自带的crontab进行定时管理
30 22 * * * cd /root/datax/bin && sh cron_datax_mongodb2mysql.sh >>/root/datax/bin/cron_datax_mongodb2mysql.log
  • 定时运行日志
vim /root/datax/bin/cron_datax_mongodb2mysql.log
······
2019-06-14 17:14:36.178 [job-0] INFO  JobContainer - PerfTrace not enable!
2019-06-14 17:14:36.178 [job-0] INFO  StandAloneJobContainerCommunicator - Total 65 records, 22919 bytes | Speed 114B/s, 0 records/s | Error 1 records, 350 bytes |  All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 171.039s | Transfermor Success 52013 records | Transformer Error 0 records | Transformer Filter 51948 records | Transformer usedTime 0.000s | Percentage 100.00%
2019-06-14 17:14:36.179 [job-0] INFO  JobContainer -
任务启动时刻                    : 2019-06-14 17:11:13
任务结束时刻                    : 2019-06-14 17:14:36
任务总计耗时                    :                202s
任务平均流量                    :              114B/s
记录写入速度                    :              0rec/s
读出记录总数                    :                  65
读写失败总数                    :                   1

2019-06-14 17:14:36.179 [job-0] INFO  JobContainer -
Transformer成功记录总数         :               52013
Transformer失败记录总数         :                   0
Transformer过滤记录总数         :               51948

总结

  • 优点: 就不说了,太多了
  • 缺点:缺乏对增量更新的内置支持,但因为DataX的灵活架构,可以通过shell脚本等方式方便实现增量同步

对于DataX中支持querySql语法的源数据库推荐参考下文使用 DataX 增量同步数据,从数据源头过滤数据,可以很好的提高同步效率

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,634评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,951评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,427评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,770评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,835评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,799评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,768评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,544评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,979评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,271评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,427评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,121评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,756评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,375评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,579评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,410评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,315评论 2 352

推荐阅读更多精彩内容