flume1.7.0推出了taildirSource组件。主要功能是监测变化的文件。
优化了以前exec 模式下,tail -f 文件的问题。
Flume Document Url:
使用者:
https://github.com/apache/flume/blob/trunk/flume-ng-doc/sphinx/FlumeUserGuide.rst
开发者:
https://github.com/apache/flume/blob/trunk/flume-ng-doc/sphinx/FlumeDeveloperGuide.rst
主要介绍TailDir Source
类图如下:
后续自己画一个新的。
TailDirSource类
TAILDIR 入口类,通过配置参数匹配日志文件,获取日志文件更新内容并且将已经读取的偏移量记录到特定的文件当中(position file)中,完成文件的持续读取。
configure方法
首先来看configure方法,他是通过读取配置文件完成,接下来操作所需信息的初始化工作。
/**
* param: 初始化配置参数
**/
@Override
public synchronized void configure(Context context) {
/**
* 通过空格把groups 分割出来:
* a1.sources.r1.filegroups = f1 f2
* result[]-> [f1,f2]
*
* */
String fileGroups = context.getString(FILE_GROUPS);
Preconditions.checkState(fileGroups != null, "Missing param: " + FILE_GROUPS);
/**
*
* 返回一个group对应FilePath的Map<String,String>
* result-> <f1,/var/log/test1/example.log>
* <f2,/var/log/test2/.*log.*>
*/
filePaths = selectByKeys(context.getSubProperties(FILE_GROUPS_PREFIX),
fileGroups.split("\\s+"));
/**
* 判断是否为空
*/
Preconditions.checkState(!filePaths.isEmpty(),
"Mapping for tailing files is empty or invalid: '" + FILE_GROUPS_PREFIX + "'");
/**
* 获取当前用户主目录
*/
String homePath = System.getProperty("user.home").replace('\\', '/');
/**
* 获取positionFile 路径,带默认值
*
* a1.sources.r1.positionFile = /var/log/flume/taildir_position.json
*
* xxx_agent.sources.r1.positionFile = ../../position/taildir_position.json
*
* result-> /var/log/flume/taildir_position.json
*/
positionFilePath = context.getString(POSITION_FILE, homePath + DEFAULT_POSITION_FILE);
/**
* positionFile路径
*/
Path positionFile = Paths.get(positionFilePath);
try {
/**
* 创建目录目录名,上级目录如果缺失一起创建,/var/log/flume
*/
Files.createDirectories(positionFile.getParent());
} catch (IOException e) {
throw new FlumeException("Error creating positionFile parent directories", e);
}
/**
* a1.sources.r1.headers.f1.headerKey1 = value1
* a1.sources.r1.headers.f2.headerKey1 = value2
* a1.sources.r1.headers.f2.headerKey2 = value2-2
*
*
* 用于发送EVENT的header信息添加值
* 返回table 结构
* <f1,headerKey1,value1>
* <f2,headerKey1,value2>
* <f2,headerKey2,value2-2>
*/
headerTable = getTable(context, HEADERS_PREFIX);
/**
* 批量大小
*/
batchSize = context.getInteger(BATCH_SIZE, DEFAULT_BATCH_SIZE);
/**
* 从头还是从尾部读取,默认false
*/
skipToEnd = context.getBoolean(SKIP_TO_END, DEFAULT_SKIP_TO_END);
/**
* 是否加偏移量,剔除行标题
*/
byteOffsetHeader = context.getBoolean(BYTE_OFFSET_HEADER, DEFAULT_BYTE_OFFSET_HEADER);
/**
* idleTimeout日志文件在idleTimeout间隔时间,没有被修改,文件将被关闭
*/
idleTimeout = context.getInteger(IDLE_TIMEOUT, DEFAULT_IDLE_TIMEOUT);
/**
* Interval time (ms) to write the last position of each file on the position file.
*
* writePosInterval,TaildirSource读取每个监控文件都在位置文件中记录监控文件的已经读取的偏移量,
* writePosInterval 更新positionFile的间隔时间
* */
writePosInterval = context.getInteger(WRITE_POS_INTERVAL, DEFAULT_WRITE_POS_INTERVAL);
/**
*
* Listing directories and applying the filename regex pattern
* may be time consuming for directories containing thousands of files.
* Caching the list of matching files can improve performance.
* The order in which files are consumed will also be cached.
* Requires that the file system keeps track of modification times with at least a 1-second granularity.
*
* 是否开启matcher cache
*
* */
cachePatternMatching = context.getBoolean(CACHE_PATTERN_MATCHING,
DEFAULT_CACHE_PATTERN_MATCHING);
/**
* The increment for time delay before reattempting to poll for new data,
* when the last attempt did not find any new data.
*
* 当最后一次尝试没有找到任何新数据时,推迟变量长的时间再次轮训查找。
*/
backoffSleepIncrement = context.getLong(PollableSourceConstants.BACKOFF_SLEEP_INCREMENT,
PollableSourceConstants.DEFAULT_BACKOFF_SLEEP_INCREMENT);
/**
* The max time delay between each reattempt to poll for new data,
* when the last attempt did not find any new data.
* 当最后一次尝试没有找到任何新数据时,每次重新尝试轮询新数据之间的最大时间延迟
*/
maxBackOffSleepInterval = context.getLong(PollableSourceConstants.MAX_BACKOFF_SLEEP,
PollableSourceConstants.DEFAULT_MAX_BACKOFF_SLEEP);
/**
* Whether to add a header storing the absolute path filename.
*
* 是否添加头部存储绝对路径
*
* */
fileHeader = context.getBoolean(FILENAME_HEADER,
DEFAULT_FILE_HEADER);
/**
* Header key to use when appending absolute path filename to event header.
*
* 当fileHeader为TURE时使用。
* */
fileHeaderKey = context.getString(FILENAME_HEADER_KEY,
DEFAULT_FILENAME_HEADER_KEY);
if (sourceCounter == null) {
sourceCounter = new SourceCounter(getName());
}
}
Start方法
创建初始化后的变量创建了ReliableTaildirEventReader对象,并启动两个线程池,分别是监控日志文件,记录日志文件读取的偏移量。后续会介绍ReliableTaildirEventReader,idleFileCheckerRunnable,PositionWriterRunnable做了什么。
/**
* describe: 创建初始化后的变量创建了ReliableTaildirEventReader对象,
* 并启动两个线程池,分别是监控日志文件,记录日志文件读取的偏移量
**/
@Override
public synchronized void start() {
logger.info("{} TaildirSource source starting with directory: {}", getName(), filePaths);
try {
/**
* 通过configure()初始化后的变量创建了ReliableTaildirEventReader对象
*/
reader = new ReliableTaildirEventReader.Builder()
.filePaths(filePaths)
.headerTable(headerTable)
.positionFilePath(positionFilePath)
.skipToEnd(skipToEnd)
.addByteOffset(byteOffsetHeader)
.cachePatternMatching(cachePatternMatching)
.annotateFileName(fileHeader)
.fileNameHeader(fileHeaderKey)
.build();
} catch (IOException e) {
throw new FlumeException("Error instantiating ReliableTaildirEventReader", e);
}
/**
* 创建线程池监控日志文件。
*/
idleFileChecker = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("idleFileChecker").build());
idleFileChecker.scheduleWithFixedDelay(new idleFileCheckerRunnable(),
idleTimeout, checkIdleInterval, TimeUnit.MILLISECONDS);
/**
* 创建线程池记录日志文件读取的偏移量。
*/
positionWriter = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("positionWriter").build());
positionWriter.scheduleWithFixedDelay(new PositionWriterRunnable(),
writePosInitDelay, writePosInterval, TimeUnit.MILLISECONDS);
super.start();
logger.debug("TaildirSource started");
sourceCounter.start();
}
process方法
process时Source主要工作方法,框架会根据返回状态,不断调取该方法,完成日志文件的传输。
/**
* @describe: process方法记录了TailDirSource类中主要的逻辑,
* 获取每个监控的日志文件,调用tailFileProcess获取每个日志文件的更新数据,
* 并将每条记录转换为Event(具体细节要看ReliableTaildirEventReader的readEvents方法)
* 并读取解析而为了只关注需要关注的文件
**/
@Override
public Status process() {
Status status = Status.READY;
try {
/**
* 清空记录存在inode的list
*/
existingInodes.clear();
/**
* 调用ReliableTaildirEventReader对象的updateTailFiles方法获取要监控的日志文件。
*/
existingInodes.addAll(reader.updateTailFiles());
for (long inode : existingInodes) {
/**
* 获取具体tailFile对象
*/
TailFile tf = reader.getTailFiles().get(inode);
/**
* 是否需要tail
*/
if (tf.needTail()) {
/**
* 获取每个日志文件的更新数据,并发送,其中包括文件规则是否满足
*/
tailFileProcess(tf, true);
}
}
closeTailFiles();
try {
TimeUnit.MILLISECONDS.sleep(retryInterval);
} catch (InterruptedException e) {
logger.info("Interrupted while sleeping");
}
} catch (Throwable t) {
logger.error("Unable to tail files", t);
status = Status.BACKOFF;
}
return status;
}
接下来主要介绍ReliableTaildirEventReader类
ReliableTaildirEventReader
明天继续写……