DataX同步ES数据到CK

前言

存在一些业务场景,需要离线同步数据到异构数据库,DataX算是一个不错的选择。不过开源版本只支持单进程,多线程,如果是需要多进程,需要业务在reader层面查询的时候就规划好对应进程需要读取的数据分片。

1.png

2.png

实践

当前例子为 elasticsearch 同步数据到 clickhouse
1,下载DataX源码编译(参考https://github.com/alibaba/DataX/blob/master/userGuid.md
git clone https://github.com/alibaba/DataX.git
mvn -U clean package assembly:assembly -Dmaven.test.skip=true
编译生成文件 datax.tar.gz

3.png

2,下载DataX-elasticsearch源码(https://github.com/Kestrong/datax-elasticsearch 这个插件可用支持nested数据结构)编译
git clone https://github.com/Kestrong/datax-elasticsearch
mvn -U clean package assembly:assembly -Dmaven.test.skip=true
生成插件文件

4.png

3,将打包好的elasticsearchreader插件发到datax/plugin/reader目录下,压缩并上传到远程机器。


5.png

4,校验datax是否可用
cd datax/bin
python datax.py -r streamreader -w streamwriter 如果生成了模板文件则表示datax编译包正常。

5,上传配置文件 es2ck_demo.json

{"job":{"setting":{"speed":{"channel":40}},"content":[{"reader":{"name":"elasticsearchreader","parameter":{"endpoint":"http://es_ip:19200","accessId":"elastic","accessKey":"xxxxx","index":"oss_storage_cap*","type":"_doc","searchType":"dfs_query_then_fetch","headers":{},"scroll":"3m","discovery":"true","search":[{"query":{"match_all":{}}}],"table":{"name":"es_view_oss_storage_cap","column":[{"name":"user"},{"name":"objectAddr"},{"name":"collectStamp"},{"name":"objectBuckets","alias":"buckets"}]}}},"writer":{"name":"clickhousewriter","parameter":{"batchByteSize":134217728,"batchSize":65536,"column":["user","objectAddr","collectStamp","buckets"],"connection":[{"jdbcUrl":"jdbc:clickhouse://ck_ip:28123/object_log","table":["oss_storage_capacity_all"]}],"dryRun":false,"password":"xxxx","postSql":[],"preSql":[],"username":"default","writeMode":"insert"}}}]}}

6,如果数据量很大,可以指定一个时间段,先同步一批

##es 查看指定时间段内的数据
GET oss_storage_cap*/_count
{"query":{"bool":{"filter":{"range":{"insert_time":{"from":"2020-05-28T00:00:00.000+0800","to":"2020-05-28T23:59:59.000+0800","include_lower":true,"include_upper":true,"boost":1.0}}}}}}
 
 
##修改配置
"search": [{
    "size": 10000, ##这个参数比较重要,要不然读取速度会很慢
    "query": {
        "bool": {
            "filter": {
                "range": {
                    "insert_time": {
                        "from": "2020-05-28T00:00:00.000+0800",
                        "to": "2020-05-28T23:59:59.000+0800",
                        "include_lower": true,
                        "include_upper": true,
                        "boost": 1.0
                    }
                }
            }
        }
    }
}],

可以看到只同步了指定时间段内的数据


1.png
2.png

6,开启同步
执行 python ../bin/datax.py es2ck_demo.json


3.png

Tips

1,由于es的scroll分页查询,默认查询的条数是number_of_shards*size 建议手动设置size大小,否则同步效果速度很慢,例"search":[{"size":10000,"query":{"match_all":{}}}]
2,Query_Then_Fetch vs DFS-Query_Then_Fetch https://www.elastic.co/cn/blog/understanding-query-then-fetch-vs-dfs-query-then-fetch dfs方式在做查询更精确,但是如果是导出Query_Then_Fetch速度应该更快

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容