需求:日志文件路径格式为 `/data/logs/{instanceId}/xxx.log`,保证倒数第二级目录为实例 ID(instanceId)。
Flume 使用 TAILDIR source 采集,要求事件按不同的 instanceId 写入(sink)到不同的 HDFS 目录下。
思路:先通过 TAILDIR source 的 `fileHeader` 配置,将日志文件的完整路径写入 Event header;再用自定义拦截器从该路径中截取出 instanceId,写入 header 的 `instanceId` 字段,供 HDFS sink 的路径通过 `%{instanceId}` 引用。
- 自定义拦截器(源码需声明 `package com.rec;`,与下文 `interceptors.i1.type` 中配置的全限定类名保持一致)
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.util.List;
import java.util.Map;
/**
 * Flume interceptor that derives an instance ID from the tailed file's path and stores it in an
 * event header, so that a downstream sink can route events by instance.
 *
 * <p>Expected path layout: {@code /data/logs/{instanceId}/xxx.log}. The second-to-last path
 * segment (the name of the file's parent directory) is taken as the instance ID. The path itself
 * is read from the header written by the TAILDIR source when {@code fileHeader = true}.
 *
 * <p>Both header keys are configurable on the interceptor:
 * <ul>
 *   <li>{@code fileHeaderKey} — header holding the full path (default {@code fileName})</li>
 *   <li>{@code instanceIdHeaderKey} — header the instance ID is written to
 *       (default {@code instanceId})</li>
 * </ul>
 */
public class EventInterceptor implements Interceptor {

    /** Default header key holding the full file path; matches {@code fileHeaderKey = fileName}. */
    private static final String DEFAULT_FILE_HEADER_KEY = "fileName";

    /** Default header key the extracted instance ID is written to. */
    private static final String DEFAULT_INSTANCE_ID_HEADER_KEY = "instanceId";

    private final String fileHeaderKey;
    private final String instanceIdHeaderKey;

    /** Creates an interceptor using the default header keys ({@code fileName} → {@code instanceId}). */
    public EventInterceptor() {
        this(DEFAULT_FILE_HEADER_KEY, DEFAULT_INSTANCE_ID_HEADER_KEY);
    }

    /**
     * Creates an interceptor with custom header keys.
     *
     * @param fileHeaderKey header key that holds the full path of the source file
     * @param instanceIdHeaderKey header key the extracted instance ID is written to
     */
    public EventInterceptor(String fileHeaderKey, String instanceIdHeaderKey) {
        this.fileHeaderKey = fileHeaderKey;
        this.instanceIdHeaderKey = instanceIdHeaderKey;
    }

    /** Required by {@link Interceptor}; this interceptor holds no resources to set up. */
    @Override
    public void initialize() {
        // no-op
    }

    /**
     * Adds the instance-ID header to a single event.
     *
     * <p>If the path header is absent or empty, or the path has no usable parent-directory
     * segment, the event is returned unchanged.
     *
     * @param event the event to enrich; its header map is modified in place
     * @return the same event instance (never dropped)
     */
    @Override
    public Event intercept(Event event) {
        Map<String, String> headers = event.getHeaders();
        String instanceId = extractInstanceId(headers.get(fileHeaderKey));
        if (instanceId != null) {
            headers.put(instanceIdHeaderKey, instanceId);
        }
        return event;
    }

    /**
     * Adds the instance-ID header to every event in the batch.
     *
     * @param list the batch; each event is modified in place
     * @return the same list (no event is ever dropped)
     */
    @Override
    public List<Event> intercept(List<Event> list) {
        for (Event event : list) {
            intercept(event);
        }
        return list;
    }

    /** Required by {@link Interceptor}; nothing to release. */
    @Override
    public void close() {
        // no-op
    }

    /**
     * Returns the second-to-last '/'-separated segment of {@code path}, or {@code null} if there
     * is none.
     *
     * <p>An empty segment yields {@code null}. That happens, for example, with {@code /xxx.log} or
     * {@code a//xxx.log}. Without this guard an empty instance ID would be written and events would
     * land in the sink's base directory.
     */
    private static String extractInstanceId(String path) {
        if (path == null || path.isEmpty()) {
            return null;
        }
        String[] segments = path.split("/");
        if (segments.length < 2) {
            return null;
        }
        String candidate = segments[segments.length - 2];
        return candidate.isEmpty() ? null : candidate;
    }

    /**
     * Builder instantiated by Flume via {@code interceptors.<name>.type = ...EventInterceptor$Builder}.
     * Must be a static nested class so Flume can create it reflectively without an outer instance.
     */
    public static class Builder implements Interceptor.Builder {

        private String fileHeaderKey = DEFAULT_FILE_HEADER_KEY;
        private String instanceIdHeaderKey = DEFAULT_INSTANCE_ID_HEADER_KEY;

        /**
         * Reads optional {@code fileHeaderKey} / {@code instanceIdHeaderKey} from the interceptor's
         * context, falling back to the defaults so existing configurations keep working.
         */
        @Override
        public void configure(Context context) {
            fileHeaderKey = context.getString("fileHeaderKey", DEFAULT_FILE_HEADER_KEY);
            instanceIdHeaderKey =
                    context.getString("instanceIdHeaderKey", DEFAULT_INSTANCE_ID_HEADER_KEY);
        }

        @Override
        public Interceptor build() {
            return new EventInterceptor(fileHeaderKey, instanceIdHeaderKey);
        }
    }
}
使用 Maven 打包,将生成的 jar 放入 Flume 安装目录下的 `lib` 目录中。
properties 关键配置如下:
## TAILDIR source: write the full path of the tailed file into each event's header
rec.sources.r1.fileHeader = true
## Header key for that path; it must match the key the interceptor reads (fileName)
rec.sources.r1.fileHeaderKey = fileName
## Attach the custom interceptor that derives instanceId from the file path
rec.sources.r1.interceptors = i1
## Fully qualified name of the interceptor's nested Builder class ('$' marks a nested class).
## NOTE(review): the Java source in this document shows no package declaration; it must declare
## `package com.rec;` for this class name to resolve at runtime.
rec.sources.r1.interceptors.i1.type = com.rec.EventInterceptor$Builder
## %{xxx} is replaced by the value of event header xxx (here: instanceId), so each instance's
## events are written under their own HDFS directory.
## NOTE(review): events that end up without an instanceId header are not routed per instance —
## confirm how the HDFS sink expands a missing %{instanceId} before relying on it.
rec.sinks.k1.hdfs.path = hdfs://192.168.204.31:8020/flume/rec/%{instanceId}