使用技术: Datax /crontab /python
大概看了下python的语法,因为需要增量做数据抽取,手上暂时没有趁手的兵器,就先用datax吧。 网上有其他方案https://blog.csdn.net/quadimodo/article/details/82186788,貌似写死的执行时间,没采用这种方法。
1. 前提步骤:
安装datax及配置
datax目录:/home/datax/datax/
脚本目录:/home/datax/jobs
log目录:/home/datax/jobs/log/
2. 编写基于时间戳的执行脚本(python脚本)
python脚本语言,写起来也比较方便快捷,我是使用vim编写的,不能写中文,没再继续研究。
脚本思路:
1) 如果使用datax去做增量,需要根据某个时间去判断,那需要将时间传给实际执行的datax.py脚本中去执行,datax支持脚本变量
- 基于时间戳做增量,时间戳是一个参数并且是一个可变量,我的思路是将这个文件放到一个X.record的文件中记录时间,本次执行的时候获得上次执行时间lastExecuteTime和当前时间currentTime,抽取的数据就是介于此二者之间的数据。(此处有个小坑,就是datax脚本里面根据时间选择的时候应该是大于等于lastExecuteTime还是小于等于currentTime,数量大的时候抽取数据是不一样的,建议大于等于lastExecuteTime 且小于currentTime)
3) 定时任务使用linux系统的crontab做定时,时间不能设置过短。
[root@localhost ~]# cat /home/datax/jobTimer/dataxScheduler.py
#encoding="utf-8"
# two args , first: datax config file path, logfile path
import time
import sys
import os
print("going to execute")
configFilePath = sys.argv[1]
logFilePath = sys.argv[2]
lastTimeExecuteRecord = sys.argv[3]
print "=============================================="
print "configFilePath :", configFilePath
print "configFilePath :", logFilePath
print "lastTimeExecute File :",lastTimeExecuteRecord
print "=============================================="
lastExecuteTime=""
try:
fo = open(lastTimeExecuteRecord, "r")
lastExecuteTime = fo.read()
except IOError:
lastExecuteTime = '1970-01-01 00:00:00'
print("last time execute time: " + lastExecuteTime)
currentTime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
print("currentTime is :"+ currentTime)
#os.system("python /home/datax/datax/bin/datax.py " + configFilePath + " --lastTime" + lastExecuteTime + " --currentTime" + currentTime + " >> " + logFilePath)
script2execute = "python /home/datax/datax/bin/datax.py %s -p \"-DlastTime='%s' -DcurrentTime='%s'\" >> %s"%( configFilePath, lastExecuteTime, currentTime,logFilePath)
print("to be excute script:"+script2execute)
os.system(script2execute)
print("script execute ending")
# update timestamp to file
fo = open(lastTimeExecuteRecord, "w+")
fo.write(currentTime)
fo.close()
print "ending---",lastTimeExecuteRecord
3. 编写完后修改datax的执行配置json文件
{
"job": {
"setting": {
"speed":{
"byte": 10485760,
"channel": "5"
}
},
"content": [
{
"reader": {
"name":"oraclereader",
"parameter": {
"column": ["...."], // 自定义
"connection": [
{
"jdbcUrl":["jdbc:oracle:thin:@。。。"], // 自定义
"table": ["table_test"]// 自定义
}
],
"where": " copy_time >= to_date('${lastTime}', 'yyyy-MM-dd HH24:mi:ss') and copy_time< to_date('${currentTime}', 'yyyy-MM-dd HH24:mi:ss')", // 读取增量数据的逻辑
"password":"***", // 自定义
"username":"***" // 自定义
}
},
"writer": {
"name":"oraclewriter",
"parameter":{
"preSql": [
"delete from ***"
],
"column":[ "。。。"],// 自定义
"connection":[
{
"jdbcUrl":"。。。",// 自定义
"table":[
"。。。"// 自定义
]
}
],
"password":"***",// 自定义
"username":"***"// 自定义
}
}}]
}}
4. 执行
python /home/datax/jobTimer/dataxScheduler.py \
'/home/datax/jobs/test_job.json' \
'/home/datax/jobs/log/test_job.log' \
'/home/datax/jobTimer/record/test_job.record'
执行过后: test_job.record中文件内容为
[root@xxxx jobs]# cat /home/datax/jobTimer/record/test_job.record
2019-04-18 09:40:01
5. 这个方案的不足之处和改进方案
1) 由于datax没法做数据update,对于不存在update和delete的场景,使用这个方案还可以
2) 有些业务数据表的数据更新时间可能不准确,使用触发器的方式产生新的时间戳会更可靠点