Datax 按照时间戳的定时增量抽取脚本

使用技术: Datax /crontab /python

大概看了下python的语法,因为需要增量做数据抽取,手上暂时没有趁手的兵器,就先用datax吧。 网上有其他方案https://blog.csdn.net/quadimodo/article/details/82186788,貌似写死的执行时间,没采用这种方法。

1. 前提步骤:

安装datax及配置

datax目录:/home/datax/datax/

脚本目录:/home/datax/jobs

log目录:/home/datax/jobs/log/

2. 编写基于时间戳的执行脚本(python脚本)

python脚本语言,写起来也比较方便快捷,我是使用vim编写的,不能写中文,没再继续研究。

脚本思路:

1) 如果使用datax去做增量,需要根据某个时间去判断,那需要将时间传给实际执行的datax.py脚本中去执行,datax支持脚本变量

  1. 基于时间戳做增量,时间戳是一个参数并且是一个可变量,我的思路是将这个文件放到一个X.record的文件中记录时间,本次执行的时候获得上次执行时间lastExecuteTime和当前时间currentTime,抽取的数据就是介于此二者之间的数据。(此处有个小坑,就是datax脚本里面根据时间选择的时候应该是大于等于lastExecuteTime还是小于等于currentTime,数量大的时候抽取数据是不一样的,建议大于等于lastExecuteTime 且小于currentTime)

3) 定时任务使用linux系统的crontab做定时,时间不能设置过短。


[root@localhost ~]# cat /home/datax/jobTimer/dataxScheduler.py

#encoding="utf-8"

# two args , first: datax config file path, logfile path

import time

import sys

import os

print("going to execute")

configFilePath = sys.argv[1]

logFilePath = sys.argv[2]

lastTimeExecuteRecord = sys.argv[3]

print "=============================================="

print "configFilePath      :", configFilePath

print "configFilePath      :", logFilePath

print "lastTimeExecute File :",lastTimeExecuteRecord

print "=============================================="

lastExecuteTime=""

try:

    fo = open(lastTimeExecuteRecord, "r")

    lastExecuteTime = fo.read()

except IOError:

    lastExecuteTime = '1970-01-01 00:00:00'

print("last time execute time:  " + lastExecuteTime)

currentTime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())

print("currentTime is        :"+ currentTime)

#os.system("python /home/datax/datax/bin/datax.py " + configFilePath + " --lastTime" +  lastExecuteTime + " --currentTime" + currentTime + " >> " + logFilePath)

script2execute  = "python /home/datax/datax/bin/datax.py %s -p \"-DlastTime='%s' -DcurrentTime='%s'\" >> %s"%( configFilePath, lastExecuteTime, currentTime,logFilePath)

print("to be excute script:"+script2execute)

os.system(script2execute)

print("script execute ending")

# update timestamp to file

fo = open(lastTimeExecuteRecord, "w+")

fo.write(currentTime)

fo.close()

print "ending---",lastTimeExecuteRecord

3. 编写完后修改datax的执行配置json文件


{

"job": {

"setting": {

"speed":{

"byte": 10485760,

"channel": "5"

}

},

"content": [

{

"reader": {

"name":"oraclereader",

"parameter": {

"column": ["...."],  // 自定义

"connection": [

{

"jdbcUrl":["jdbc:oracle:thin:@。。。"], // 自定义

"table": ["table_test"]// 自定义

}

],

"where": " copy_time >= to_date('${lastTime}', 'yyyy-MM-dd HH24:mi:ss') and copy_time< to_date('${currentTime}', 'yyyy-MM-dd HH24:mi:ss')",  // 读取增量数据的逻辑

"password":"***", // 自定义

"username":"***" // 自定义

}

},

"writer": {

"name":"oraclewriter",

"parameter":{

"preSql": [

"delete from ***"

],

"column":[ "。。。"],// 自定义

                       "connection":[

                           {

                               "jdbcUrl":"。。。",// 自定义

                               "table":[

                                   "。。。"// 自定义

                               ]

                           }

                       ],

                       "password":"***",// 自定义

                       "username":"***"// 自定义

                   }

}}]

}}

4. 执行


python /home/datax/jobTimer/dataxScheduler.py \

  '/home/datax/jobs/test_job.json'  \

'/home/datax/jobs/log/test_job.log'  \

'/home/datax/jobTimer/record/test_job.record'

执行过后: test_job.record中文件内容为


[root@xxxx jobs]# cat /home/datax/jobTimer/record/test_job.record

2019-04-18 09:40:01

5. 这个方案的不足之处和改进方案

1) 由于datax没法做数据update,对于不存在update和delete的场景,使用这个方案还可以

2) 有些业务数据表的数据更新时间可能不准确,使用触发器的方式产生新的时间戳会更可靠点

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容