引言
最近公司准备使用elasticsearch(es),将数据库的记录同步到es中,往外提供查询的功能,而同步工具在选择上有主要几种:
1:elasticsearch-jdbc:作者蛮努力的,源源不断的更新,Java写的
2:elasticsearch-river:太监了,已经两三年没更新了
3:Go-mysql-elastic:国人写的,还未稳定
4:logstash-input-jdbc:Ruby写的,且是官方推荐
由于本人是写Java出身,所以尽量选择Java的同步工具,所以Go、Ruby就没有做更多考虑,而river的话,也是好久没更新了,所以最终选择下来准备使用elasticsearch-jdbc,说干就干,咋就撸起袖子开干了
调研及安装
elasticsearch-jdbc的安装不复杂,把zip包拉下来,放到一个环境跑起来就行了,具体可以去度娘及Google都行
问题
环境跑起来后,就看看功能是不是满足要求,还不错,除了物理删除支持不是很好外,其他的增量数据,全量数据都能进行同步,但是问题来了,用jconsole连上环境却发现,这个线程数源源不断的增加,如下:
我们知道线程其实是很消耗系统资源的,内存,Cpu等等,如果这个线程数量一致增加,那最终是OOM的结果,这样的结果是不能进行生产上运行的,最终我们要解决这个问题的,故苦逼的排查开始了:
1:查文档,翻看issue,我擦,还真有同道中人,有一个国人用中文提了一个问题,和我碰到的问题是一样的;
作者也回复了,这个issue会在最新的版本中修复掉,但是他最新版本发布是啥时候,不知道啊,只能自己排查了。
2:用jstack来看了当前线程的状态,发现是wait-condition,wait。其他详细信息没有了,我们知道线程的状态一般出现wait-condition都是因为网络请求过去了,但是却一直没有返回,或者写文件未关闭,只能是拉源码自己看了
3:elasticsearch-jdbc的源码还是比较精简的,其就是一个source,slink,其中source是从RDMS里面读取数据,而slink是将数据写入到Es中,而这两个都是会有网络请求的数据,于是开始注代码
把source的读取注释掉,如下:
这个是嵌入了source的拉取和slink的推送(不太清楚作者的命名是怎么想的),只要把beforeFetch(),fetch()注释掉,source就不起作用了。
把afterFetch()注释掉,slink就不起作用了,然而发现没什么用,线程数还是源源不断的增加
4:只能啃代码了
elasticsearch-jdbc的增量同步的思路是提交一个Cron表达式或者时间间隔,然后每次都是创建一个同步对象来进行同步的,如下:
而JDBCImporter是每一次调度都会进行一次新建,所以相当于每一次进行同步该对象都是新的。仔细看了JDBCImporter的代码
仔细看看prepare()方法
executorService由于JDBCImporter的新创建调度,又会进行重新一次设置,最终的结果就是每一次调度executorService都是最新的,而同步完成后,却没有释放掉该线程池,最终修改也比较简单。
在finally中,把这个新建的线程池关闭掉,并且设置为null值,让他等待Gc就行了。修改完重新打包部署,观察下线程数,不再继续增加了
最终代码上传在:
https://github.com/linking12/YuGong
总结
尽管java的gc让程序员不再关注对象的消亡,但是对线程池这类的一些公共资源的一些维护还是需要慎重
另外就是想表达的是:我们使用任何一个开源项目还是首先的了解下他的一些设计思路,最好是阅读一遍源码