背景
在日常工作中,可能会有这种需求,类似于监控一个目录下新文件的产生,并且这些文件会实时的追加内容,例如ngnix的切割日至,或者某些服务器上的仿真日志等。
实现
在之前公司工作中,有过类似需求,也自己实现过,后来Flume 1.7正式发布了 TaildirSource。我们来看下Flume内部是怎么实现这个功能的。
源码
首先先思考一下大体的实现思路是怎样的,最简单的方法是记录下该目录下每个文件上一次的读取位置。在读取之后更新最新的读取位置。
在process方法中,
通过对每个符合要求的文件进行处理,其中的updateTailFiles方法是获取当前的更新的inode 列表,具体的内部实现是判断文件最后的更新时间之类的一大坨,看看注释就好了,
主要逻辑不难, 但它是怎么容错的呢,在Flume宕机重启之后,是如何知道上一次传输的位置的呢,这里,Flume将相应的记录位置保存在文件中,来看源码。
其中的有两个定时的单线程executor service,会定时保存当前的位置,重启时会load这个文件,这样其实会有一个小问题,就是当channel已经处理过event,然后在两次executor service启动期间,系统宕机了,这样再重启之后呢,会有这部分数据的重传。也就是说这里保证的at least once,
还有一个可能出现不一致的点,existingInodes 是一个copyOnWriteList, 在executor service 运行过程中会有不一致的情况。
如果要保证exactly once, 要怎么做呢。如果是比较简单的实现方式的话,如果是我个人来做,会为每个event分配个递增的id,通过在保存在channel端最新的处理的event的id来比较,如果event的id比channel端的id旧,那就丢弃,否则就更新channel的id。
可能是flume这样的日志传输工具都不是为了金钱交易的场景设计的,所以就没有严格的执行exactly once语义,个人猜测啊。