Taildir Source 源代码解析

flume1.7.0推出了taildirSource组件。主要功能是监测变化的文件。
优化了以前exec 模式下,tail -f 文件的问题。
Flume Document Url:
使用者:
https://github.com/apache/flume/blob/trunk/flume-ng-doc/sphinx/FlumeUserGuide.rst
开发者:
https://github.com/apache/flume/blob/trunk/flume-ng-doc/sphinx/FlumeDeveloperGuide.rst

主要介绍TailDir Source

类图如下:

后续自己画一个新的。

image.png
TailDirSource类

TAILDIR 入口类,通过配置参数匹配日志文件,获取日志文件更新内容并且将已经读取的偏移量记录到特定的文件当中(position file)中,完成文件的持续读取。

configure方法

首先来看configure方法,他是通过读取配置文件完成,接下来操作所需信息的初始化工作。

/**
     * param: 初始化配置参数
     **/
    @Override
    public synchronized void configure(Context context) {

        /**
         * 通过空格把groups 分割出来:
         *  a1.sources.r1.filegroups = f1 f2
         *  result[]-> [f1,f2]
         *
         * */
        String fileGroups = context.getString(FILE_GROUPS);
        Preconditions.checkState(fileGroups != null, "Missing param: " + FILE_GROUPS);
        /**
         *
         *  返回一个group对应FilePath的Map<String,String>
         *  result-> <f1,/var/log/test1/example.log>
         *           <f2,/var/log/test2/.*log.*>
         */
        filePaths = selectByKeys(context.getSubProperties(FILE_GROUPS_PREFIX),
                fileGroups.split("\\s+"));
        /**
         * 判断是否为空
         */
        Preconditions.checkState(!filePaths.isEmpty(),
                "Mapping for tailing files is empty or invalid: '" + FILE_GROUPS_PREFIX + "'");
        /**
         * 获取当前用户主目录
         */
        String homePath = System.getProperty("user.home").replace('\\', '/');
        /**
         *  获取positionFile 路径,带默认值
         *
         * a1.sources.r1.positionFile = /var/log/flume/taildir_position.json
         *
         * xxx_agent.sources.r1.positionFile = ../../position/taildir_position.json
         *
         * result->  /var/log/flume/taildir_position.json
         */
        positionFilePath = context.getString(POSITION_FILE, homePath + DEFAULT_POSITION_FILE);

        /**
         *  positionFile路径
         */
        Path positionFile = Paths.get(positionFilePath);

        try {
            /**
             * 创建目录目录名,上级目录如果缺失一起创建,/var/log/flume
             */
            Files.createDirectories(positionFile.getParent());
        } catch (IOException e) {
            throw new FlumeException("Error creating positionFile parent directories", e);
        }

        /**
         * a1.sources.r1.headers.f1.headerKey1 = value1
         * a1.sources.r1.headers.f2.headerKey1 = value2
         * a1.sources.r1.headers.f2.headerKey2 = value2-2
         *
         *
         * 用于发送EVENT的header信息添加值
         * 返回table 结构
         * <f1,headerKey1,value1>
         * <f2,headerKey1,value2>
         * <f2,headerKey2,value2-2>
         */
        headerTable = getTable(context, HEADERS_PREFIX);

        /**
         * 批量大小
         */
        batchSize = context.getInteger(BATCH_SIZE, DEFAULT_BATCH_SIZE);
        /**
         * 从头还是从尾部读取,默认false
         */
        skipToEnd = context.getBoolean(SKIP_TO_END, DEFAULT_SKIP_TO_END);
        /**
         * 是否加偏移量,剔除行标题
         */
        byteOffsetHeader = context.getBoolean(BYTE_OFFSET_HEADER, DEFAULT_BYTE_OFFSET_HEADER);
        /**
         * idleTimeout日志文件在idleTimeout间隔时间,没有被修改,文件将被关闭
         */
        idleTimeout = context.getInteger(IDLE_TIMEOUT, DEFAULT_IDLE_TIMEOUT);
        /**
         * Interval time (ms) to write the last position of each file on the position file.
         *
         * writePosInterval,TaildirSource读取每个监控文件都在位置文件中记录监控文件的已经读取的偏移量,
         * writePosInterval 更新positionFile的间隔时间
         * */
        writePosInterval = context.getInteger(WRITE_POS_INTERVAL, DEFAULT_WRITE_POS_INTERVAL);
        /**
         *
         * Listing directories and applying the filename regex pattern
         * may be time consuming for directories containing thousands of files.
         * Caching the list of matching files can improve performance.
         * The order in which files are consumed will also be cached.
         * Requires that the file system keeps track of modification times with at least a 1-second granularity.
         *
         * 是否开启matcher cache
         *
         * */
        cachePatternMatching = context.getBoolean(CACHE_PATTERN_MATCHING,
                DEFAULT_CACHE_PATTERN_MATCHING);
        /**
         *  The increment for time delay before reattempting to poll for new data,
         *  when the last attempt did not find any new data.
         *
         *  当最后一次尝试没有找到任何新数据时,推迟变量长的时间再次轮训查找。
         */
        backoffSleepIncrement = context.getLong(PollableSourceConstants.BACKOFF_SLEEP_INCREMENT,
                PollableSourceConstants.DEFAULT_BACKOFF_SLEEP_INCREMENT);
        /**
         *  The max time delay between each reattempt to poll for new data,
         *  when the last attempt did not find any new data.
         *  当最后一次尝试没有找到任何新数据时,每次重新尝试轮询新数据之间的最大时间延迟
         */
        maxBackOffSleepInterval = context.getLong(PollableSourceConstants.MAX_BACKOFF_SLEEP,
                PollableSourceConstants.DEFAULT_MAX_BACKOFF_SLEEP);
        /**
         * Whether to add a header storing the absolute path filename.
         *
         * 是否添加头部存储绝对路径
         *
         * */
        fileHeader = context.getBoolean(FILENAME_HEADER,
                DEFAULT_FILE_HEADER);
        /**
         * Header key to use when appending absolute path filename to event header.
         *
         * 当fileHeader为TURE时使用。
         * */
        fileHeaderKey = context.getString(FILENAME_HEADER_KEY,
                DEFAULT_FILENAME_HEADER_KEY);

        if (sourceCounter == null) {
            sourceCounter = new SourceCounter(getName());
        }
    }
Start方法

创建初始化后的变量创建了ReliableTaildirEventReader对象,并启动两个线程池,分别是监控日志文件,记录日志文件读取的偏移量。后续会介绍ReliableTaildirEventReaderidleFileCheckerRunnablePositionWriterRunnable做了什么。

/**
     * describe:  创建初始化后的变量创建了ReliableTaildirEventReader对象,
     * 并启动两个线程池,分别是监控日志文件,记录日志文件读取的偏移量
     **/
    @Override
    public synchronized void start() {
        logger.info("{} TaildirSource source starting with directory: {}", getName(), filePaths);
        try {
            /**
             * 通过configure()初始化后的变量创建了ReliableTaildirEventReader对象
             */
            reader = new ReliableTaildirEventReader.Builder()
                    .filePaths(filePaths)
                    .headerTable(headerTable)
                    .positionFilePath(positionFilePath)
                    .skipToEnd(skipToEnd)
                    .addByteOffset(byteOffsetHeader)
                    .cachePatternMatching(cachePatternMatching)
                    .annotateFileName(fileHeader)
                    .fileNameHeader(fileHeaderKey)
                    .build();
        } catch (IOException e) {
            throw new FlumeException("Error instantiating ReliableTaildirEventReader", e);
        }
        /**
         * 创建线程池监控日志文件。
         */
        idleFileChecker = Executors.newSingleThreadScheduledExecutor(
                new ThreadFactoryBuilder().setNameFormat("idleFileChecker").build());
        idleFileChecker.scheduleWithFixedDelay(new idleFileCheckerRunnable(),
                idleTimeout, checkIdleInterval, TimeUnit.MILLISECONDS);
        /**
         * 创建线程池记录日志文件读取的偏移量。
         */
        positionWriter = Executors.newSingleThreadScheduledExecutor(
                new ThreadFactoryBuilder().setNameFormat("positionWriter").build());
        positionWriter.scheduleWithFixedDelay(new PositionWriterRunnable(),
                writePosInitDelay, writePosInterval, TimeUnit.MILLISECONDS);

        super.start();
        logger.debug("TaildirSource started");
        sourceCounter.start();
    }

process方法

process时Source主要工作方法,框架会根据返回状态,不断调取该方法,完成日志文件的传输。

    /**
     * @describe: process方法记录了TailDirSource类中主要的逻辑,
     * 获取每个监控的日志文件,调用tailFileProcess获取每个日志文件的更新数据,
     * 并将每条记录转换为Event(具体细节要看ReliableTaildirEventReader的readEvents方法)
     * 并读取解析而为了只关注需要关注的文件
     **/
    @Override
    public Status process() {
        Status status = Status.READY;
        try {
            /**
             * 清空记录存在inode的list
             */
            existingInodes.clear();
            /**
             * 调用ReliableTaildirEventReader对象的updateTailFiles方法获取要监控的日志文件。
             */
            existingInodes.addAll(reader.updateTailFiles());
            for (long inode : existingInodes) {
                /**
                 * 获取具体tailFile对象
                 */
                TailFile tf = reader.getTailFiles().get(inode);
                /**
                 * 是否需要tail
                 */
                if (tf.needTail()) {

                    /**
                     *  获取每个日志文件的更新数据,并发送,其中包括文件规则是否满足
                     */
                    tailFileProcess(tf, true);
                }
            }
            closeTailFiles();
            try {
                TimeUnit.MILLISECONDS.sleep(retryInterval);
            } catch (InterruptedException e) {
                logger.info("Interrupted while sleeping");
            }
        } catch (Throwable t) {
            logger.error("Unable to tail files", t);
            status = Status.BACKOFF;
        }
        return status;
    }

接下来主要介绍ReliableTaildirEventReader类

ReliableTaildirEventReader

明天继续写……

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,772评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,458评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,610评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,640评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,657评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,590评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,962评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,631评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,870评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,611评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,704评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,386评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,969评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,944评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,179评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,742评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,440评论 2 342

推荐阅读更多精彩内容

  • 一、Python简介和环境搭建以及pip的安装 4课时实验课主要内容 【Python简介】: Python 是一个...
    _小老虎_阅读 5,718评论 0 10
  • feisky云计算、虚拟化与Linux技术笔记posts - 1014, comments - 298, trac...
    不排版阅读 3,813评论 0 5
  • 简介 Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume...
    达微阅读 667评论 0 2
  • 这里主要介绍几种常见的日志的source来源,包括监控文件型,监控文件内容增量,TCP和HTTP。 Spool类型...
    里仁有邻阅读 1,077评论 0 1
  • 人生之路,不会笔直向前的,总有崎岖泥泞,总有坎坷艰辛;我们前进的方向不会永远是正确的,总会做一些错事,总会走一些弯...
    运和阅读 319评论 0 2