如果对数据做了更改或是有新数据加入,若再执行全部导入,就有点得不偿失了 。
这时就要用到jdbc 的两个属性 statefile(状态文件) 和 schedule(计划任务时间),并且 sql 语句也要改成动态的 ,改动如下:
"statefile" : "statefile-article.json",
"schedule" : "0 0-59 0-23 ? * *",
"sql" : [
{
"statement" : "select *, id as _id from article where update_time > ?",
"parameter" : [ "$metrics.lastexecutionstart" ]
}
],
改动后的完整文件 mysql-article.sh
#!/bin/sh
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
bin=${DIR}/../bin
lib=${DIR}/../lib
echo '
{
"type" : "jdbc",
"jdbc" : {
"url" : "jdbc:mysql://localhost:3306/test",
"user" : "root",
"password" : "123456",
"statefile" : "statefile-article.json",
"schedule" : "0 0-59 0-23 ? * *",
"sql" : [
{
"statement" : "select *, id as _id from article where update_time > ?",
"parameter" : [ "$metrics.lastexecutionstart" ]
}
],
"index" : "jdbctest",
"type" : "article",
"index_settings" : {
"analysis" : {
"analyzer" : {
"ik" : {
"tokenizer" : "ik"
}
}
}
},
"type_mapping": {
"article" : {
"properties" : {
"id" : {
"type" : "integer",
"index" : "not_analyzed"
},
"subject" : {
"type" : "string",
"analyzer" : "ik"
},
"author" : {
"type" : "string",
"analyzer" : "ik"
},
"create_time" : {
"type" : "date"
},
"update_time" : {
"type" : "date"
}
}
}
}
}
}
' | java \
-cp "${lib}/*" \
-Dlog4j.configurationFile=${bin}/log4j2.xml \
org.xbib.tools.Runner \
org.xbib.tools.JDBCImporter
运行该文件 :/usr/local/elasticsearch-2.2.0/jdbc2.2/bin/mysql-article.sh
可以看到 命令行端 被占用,一直在运行,并且在 mysql-article.sh 的同级目录下生成了一个 statefile-article.json 的文件,sql 语句中需要的数据 lastexecutionstart 就保存在该文件中
现在我们来改动一下MySQL 中的数据,增加一条数据,并修改一条 id 等于 5 的数据
INSERT INTO article() VALUES(NULL,'测试JDBC','jam00','2016-11-01 13:34:15','2016-11-01 13:34:15');
UPDATE article SET `subject`='测试JDBC-改动' WHERE id=5;
最多等一分钟,再看看ES 中的数据。
curl -XGET 'http://localhost:9200/jdbctest/article/_search?pretty' -d '{
"sort": {
"id": { "order": "desc" }
}
}'
# 返回
...
"hits" : [ {
"_index" : "jdbctest",
"_type" : "article",
"_id" : "6",
"_score" : null,
"_source" : {
"id" : 6,
"subject" : "测试JDBC",
"author" : "jam00",
"create_time" : "2016-11-01T13:34:15.000+08:00",
"update_time" : "2016-11-01T13:34:15.000+08:00"
},
"sort" : [ 6 ]
}, {
"_index" : "jdbctest",
"_type" : "article",
"_id" : "5",
"_score" : null,
"_source" : {
"id" : 5,
"subject" : "测试JDBC-改动",
"author" : "tommy",
"create_time" : "2016-10-31T17:52:07.000+08:00",
"update_time" : "2016-11-01T13:35:41.000+08:00"
},
"sort" : [ 5 ]
}
...
测试成功。
为了让 mysql-article.sh 后台执行,我们可以使用 nohup 命令
nohup /usr/local/elasticsearch-2.2.0/jdbc2.2/bin/mysql-article.sh &
当我们想停止执行的时候。
ps aux |grep jdbc2.2
#返回
root 26118 0.0 0.1 106092 1212 pts/0 S 14:03 0:00 /bin/sh /usr/local/elasticsearch-2.2.0/jdbc2.2/bin/mysql-article.sh
root 26123 11.0 4.4 1079192 44932 pts/0 Sl 14:03 0:00 java -cp /usr/local/elasticsearch-2.2.0/jdbc2.2/bin/../lib/* -Dlog4j.configurationFile=/usr/local/elasticsearch-2.2.0/jdbc2.2/bin/../bin/log4j2.xml org.xbib.tools.Runner org.xbib.tools.JDBCImporter
# 使用 kill 命令关闭进程, 26123 就是上面一句返回的进程号,不用杀掉 26118 ,杀掉26123 这个进程,26118 进程会自动关闭
kill -9 26123
至此,MySQL 数据源的 增量索引和更新就完成了。