Flume TaildirSource源码浅析

背景

在日常工作中,可能会有这种需求,类似于监控一个目录下新文件的产生,并且这些文件会实时的追加内容,例如ngnix的切割日至,或者某些服务器上的仿真日志等。

实现

在之前公司工作中,有过类似需求,也自己实现过,后来Flume 1.7正式发布了 TaildirSource。我们来看下Flume内部是怎么实现这个功能的。

源码

首先先思考一下大体的实现思路是怎样的,最简单的方法是记录下该目录下每个文件上一次的读取位置。在读取之后更新最新的读取位置。



在process方法中,


Paste_Image.png

通过对每个符合要求的文件进行处理,其中的updateTailFiles方法是获取当前的更新的inode 列表,具体的内部实现是判断文件最后的更新时间之类的一大坨,看看注释就好了,

主要逻辑不难, 但它是怎么容错的呢,在Flume宕机重启之后,是如何知道上一次传输的位置的呢,这里,Flume将相应的记录位置保存在文件中,来看源码。



其中的有两个定时的单线程executor service,会定时保存当前的位置,重启时会load这个文件,这样其实会有一个小问题,就是当channel已经处理过event,然后在两次executor service启动期间,系统宕机了,这样再重启之后呢,会有这部分数据的重传。也就是说这里保证的at least once,
还有一个可能出现不一致的点,existingInodes 是一个copyOnWriteList, 在executor service 运行过程中会有不一致的情况。
如果要保证exactly once, 要怎么做呢。如果是比较简单的实现方式的话,如果是我个人来做,会为每个event分配个递增的id,通过在保存在channel端最新的处理的event的id来比较,如果event的id比channel端的id旧,那就丢弃,否则就更新channel的id。
可能是flume这样的日志传输工具都不是为了金钱交易的场景设计的,所以就没有严格的执行exactly once语义,个人猜测啊。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 135,399评论 19 139
  • 博客原文 翻译作品,水平有限,如有错误,烦请留言指正。原文请见 官网英文文档 引言 概述 Apache Flume...
    rabbitGYK阅读 13,939评论 13 34
  • ¥开启¥ 【iAPP实现进入界面执行逐一显】 〖2017-08-25 15:22:14〗 《//首先开一个线程,因...
    小菜c阅读 11,725评论 0 17
  • Flume架构与实践 Flume是一款在线数据采集的系统,典型的应用场景是作为数据的总线,在线的进行日志的采集、分...
    mike_zhangliang阅读 6,365评论 0 2
  • 介绍 概述 Apache Flume是为有效收集聚合和移动大量来自不同源到中心数据存储而设计的可分布,可靠的,可用...
    ximengchj阅读 8,881评论 0 13