1. We know Flume can monitor a log file in real time with tail -F <file path>, but tail -F has a problem: if Flume crashes while the log file is still being appended to, the only way to resume collection is to restart the service, and after the restart Flume reads the target file from the beginning again, so the same data gets collected twice! Is there a way to manage the file offset ourselves and avoid the duplicate collection caused by a service restart? There is.
2. We can solve this with a custom Source that tracks the file offset itself.
Add the dependency to pom.xml:
<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.6.0</version>
    </dependency>
</dependencies>
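Since flume-ng-core is already on the agent's classpath at runtime ($FLUME_HOME/lib), you may optionally mark the dependency as provided so it is not bundled into your jar; this is a common tweak, not something the example requires:

    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.6.0</version>
        <scope>provided</scope>
    </dependency>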
3. Create the custom Source class:
package com.atguigu.gmall.flume.source;

import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.Charset;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class MyFlumeSource extends AbstractSource implements EventDrivenSource, Configurable {

    private static final Logger logger = LoggerFactory.getLogger(MyFlumeSource.class);

    private String filePath;
    private String posiPath;
    private Long interval;
    private String charset;
    private FileRunner fileRunner;
    private ExecutorService executor;

    @Override
    public void configure(Context context) {
        // Read the custom properties from the agent configuration file;
        // the defaults guard against missing properties causing NPEs
        this.filePath = context.getString("filePath");
        this.posiPath = context.getString("posiPath");
        this.interval = context.getLong("interval", 1000L);
        this.charset = context.getString("charset", "UTF-8");
    }

    @Override
    public synchronized void start() {
        logger.info("MyFlumeSource start now..................");
        this.executor = Executors.newSingleThreadExecutor();
        // The ChannelProcessor is only available once the source starts
        ChannelProcessor channelProcessor = getChannelProcessor();
        logger.info("filePath={}, posiPath={}, interval={}, charset={}", filePath, posiPath, interval, charset);
        fileRunner = new FileRunner(filePath, posiPath, interval, charset, channelProcessor);
        executor.execute(fileRunner);
        super.start();
    }

    @Override
    public synchronized void stop() {
        fileRunner.setFlag(false);
        executor.shutdown();
        while (!this.executor.isTerminated()) {
            logger.debug("Waiting for exec executor service to stop");
            try {
                this.executor.awaitTermination(500L, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                logger.debug("Interrupted while waiting for exec executor service to stop. Just exiting.");
                Thread.currentThread().interrupt();
            }
        }
        super.stop();
    }

    private static class FileRunner implements Runnable {

        private String filePath;       // path of the log file to read
        private String posiPath;       // path of the file that stores the offset
        private Long interval;         // polling interval in milliseconds
        private String charset;        // character encoding of the events
        private Long offset = 0L;      // current read offset
        private RandomAccessFile raf;  // handle used to read the log file
        private File pfile;            // the offset file instance
        private ChannelProcessor channelProcessor;  // pushes events into the channel
        private volatile boolean flag = true;       // loop switch, flipped off in stop()

        private FileRunner(String filePath, String posiPath, long interval, String charset, ChannelProcessor channelProcessor) {
            this.filePath = filePath;
            this.posiPath = posiPath;
            this.interval = interval;
            this.charset = charset;
            this.channelProcessor = channelProcessor;
            pfile = new File(posiPath);
            if (!pfile.exists()) {
                try {
                    pfile.createNewFile();
                } catch (IOException e) {
                    logger.error("create position file Exception", e);
                }
            }
            try {
                // Restore the last saved offset, then seek to it so reading
                // resumes exactly where the previous run stopped
                String offsetStr = FileUtils.readFileToString(pfile);
                if (StringUtils.isNotBlank(offsetStr)) {
                    offset = Long.parseLong(offsetStr.trim());
                }
                raf = new RandomAccessFile(filePath, "r");
                raf.seek(offset);
            } catch (IOException e) {
                logger.error("read position file Exception", e);
            }
        }

        @Override
        public void run() {
            while (flag) {
                try {
                    String line = raf.readLine();
                    if (StringUtils.isNotBlank(line)) {
                        Event event = EventBuilder.withBody(line, Charset.forName(charset));
                        channelProcessor.processEvent(event);
                        // Record the latest offset only after the event has
                        // been handed to the channel
                        offset = raf.getFilePointer();
                        // Persist the offset so a restart resumes from here
                        FileUtils.writeStringToFile(pfile, offset.toString());
                    } else {
                        // No new data yet; wait before polling again
                        Thread.sleep(interval);
                    }
                } catch (Exception e) {
                    logger.error("read line error", e);
                }
            }
        }

        public void setFlag(boolean flag) {
            this.flag = flag;
        }
    }
}
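One caveat: RandomAccessFile.readLine() decodes each raw byte as ISO-8859-1, so non-ASCII lines (e.g. UTF-8 Chinese logs) can arrive mangled before EventBuilder re-encodes them. A minimal sketch of a workaround you could drop into FileRunner; the helper name readLineWithCharset is my own, not part of the class above:

    // Sketch: readLine() zero-extends each raw byte to a char, which is
    // effectively ISO-8859-1 decoding. Recover the original bytes and
    // re-decode them with the charset from the Flume configuration.
    private static String readLineWithCharset(RandomAccessFile raf, String charset) throws IOException {
        String raw = raf.readLine();
        if (raw == null) {
            return null;
        }
        return new String(raw.getBytes("ISO-8859-1"), charset);
    }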
4. Package this class into a jar and upload it to $FLUME_HOME/lib.
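For example (the jar name below depends on your own artifactId and version, so adjust it accordingly):

    mvn clean package
    cp target/my-flume-source-1.0.jar $FLUME_HOME/lib/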
5. Next, create the agent configuration file; here it is named test-source-offset.conf.
# Name the source, channel and sink of this agent
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# Configure the source; the type must be the fully qualified class name
a1.sources.r1.type = com.atguigu.gmall.flume.source.MyFlumeSource
# The log file to collect
a1.sources.r1.filePath = /opt/module/test.log
# Where the offset is persisted
a1.sources.r1.posiPath = /opt/module/posi.txt
# Polling interval in milliseconds
a1.sources.r1.interval = 1000
# Character encoding of the events
a1.sources.r1.charset = UTF-8
# Print events to the console for testing
a1.sinks.k1.type = logger
# Buffer events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
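Note that the memory channel itself can drop buffered events if the agent dies; since the whole point here is surviving a crash, you could swap in Flume's file channel for end-to-end durability. The directory paths below are illustrative:

    a1.channels.c1.type = file
    a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint
    a1.channels.c1.dataDirs = /opt/module/flume/data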
6. Testing
6.1 Create the file /opt/module/test.log.
6.2 Start the Flume agent:
bin/flume-ng agent -n a1 -c conf -f /opt/module/test-source-offset.conf -Dflume.root.logger=INFO,console
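Then append a few lines to the log and watch the offset file grow; after killing and restarting the agent, collection should resume from the saved offset instead of the beginning of the file:

    echo "hello flume" >> /opt/module/test.log
    cat /opt/module/posi.txt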