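Business scenario
An external device periodically writes text files into a local directory. Each line of a file is pipe-delimited (|), for example: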
2|235|872|6|460078003|0|||02000bda||
2|235|852|3|460078004|0|||02000bda||
1|235|772|1|460078005|0|||02000bda||
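Flume has to load the file contents into Kafka and use the value of the 5th column of each source line as the key of the corresponding Kafka message.
Complete Flume configuration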
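To make the target value concrete, here is a minimal JDK-only sketch, independent of Flume, that splits one sample line on the pipe and returns the 5th column (index 4). The class name FifthColumn is hypothetical. Note the -1 limit passed to String.split. Without it, trailing empty strings are dropped, so the line would no longer split into its 11 fields.

```java
public class FifthColumn {
    // Returns the 5th pipe-separated column (index 4), or null if the line is too short.
    static String fifthColumn(String line) {
        // "\\|" escapes the pipe, which is a regex metacharacter;
        // limit -1 keeps trailing empty fields such as the final "||".
        String[] fields = line.split("\\|", -1);
        return fields.length < 5 ? null : fields[4];
    }

    public static void main(String[] args) {
        String line = "2|235|872|6|460078003|0|||02000bda||";
        System.out.println(line.split("\\|", -1).length); // 11
        System.out.println(fifthColumn(line));            // 460078003
    }
}
```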
agent.sources = r1
agent.channels = c1
agent.sinks = s1
agent.sources.r1.type = spooldir
agent.sources.r1.spoolDir = /tmp/flume
agent.sources.r1.interceptors = i1
agent.sources.r1.interceptors.i1.type = regex_extractor
agent.sources.r1.interceptors.i1.regex = .*\\|.*\\|.*\\|.*\\|(.*)\\|.*\\|.*\\|.*\\|.*\\|.*\\|.*
agent.sources.r1.interceptors.i1.serializers = s1
agent.sources.r1.interceptors.i1.serializers.s1.name = key
agent.sinks.s1.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.s1.topic = test
agent.sinks.s1.brokerList = host1:6667
agent.sinks.s1.requiredAcks = 1
agent.sinks.s1.batchSize = 20
agent.channels.c1.type = memory
agent.channels.c1.capacity = 1000
agent.sinks.s1.channel = c1
agent.sources.r1.channels = c1
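Because the agent is configured from a properties file, which Flume reads with standard java.util.Properties escaping, each \\| in the regex line becomes \| once loaded, which is a regex-escaped literal pipe. The JDK-only sketch below (class and method names are hypothetical) loads the regex line the same way and applies the resulting pattern to the three sample lines. It also shows a limitation of the greedy .* pattern: it only works because every line has exactly 10 pipes, and a line with fewer fields does not match at all.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RegexKeyCheck {
    // The regex line exactly as written in the Flume config file.
    // In this Java literal every "\\\\" stands for the two characters "\\" in the file.
    static final String CONFIG_LINE =
        "agent.sources.r1.interceptors.i1.regex = "
        + ".*\\\\|.*\\\\|.*\\\\|.*\\\\|(.*)\\\\|.*\\\\|.*\\\\|.*\\\\|.*\\\\|.*\\\\|.*";

    // Loads the line with java.util.Properties, which turns "\\|" into "\|".
    static String loadRegex() {
        Properties props = new Properties();
        try {
            props.load(new StringReader(CONFIG_LINE));
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected for a StringReader
        }
        return props.getProperty("agent.sources.r1.interceptors.i1.regex");
    }

    // Applies the pattern to one line and returns group 1, or null if there is no match.
    static String extractKey(Pattern pattern, String line) {
        Matcher matcher = pattern.matcher(line);
        return matcher.find() ? matcher.group(1) : null;
    }

    public static void main(String[] args) {
        String regex = loadRegex();
        System.out.println(regex); // the pattern after properties unescaping
        Pattern pattern = Pattern.compile(regex);
        String[] lines = {
            "2|235|872|6|460078003|0|||02000bda||",
            "2|235|852|3|460078004|0|||02000bda||",
            "1|235|772|1|460078005|0|||02000bda||"
        };
        for (String line : lines) {
            System.out.println(extractKey(pattern, line));
        }
    }
}
```

Running main should print the unescaped pattern followed by 460078003, 460078004 and 460078005, one per line.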
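Configuration notes
The key to this setup is Flume's regex extractor interceptor (regex_extractor). The rest of this section explains in detail how this interceptor works.
It relies on regex capture groups: the value matched by each group defined in regex is stored as an event header value, and the header key for that value is the name configured for the corresponding serializer (here serializers.s1.name = key). The name key matters because the Kafka sink uses an event header named key as the key of the Kafka message.
The following code simulates what Flume's regex extractor interceptor does: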
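import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class RegexGroupDemo {
    public static void main(String[] args) {
        Pattern regex = Pattern.compile("(.*)\\|.*\\|.*\\|(.*)\\|.*\\|.*\\|.*\\|.*\\|.*\\|.*\\|.*");
        Matcher matcher = regex.matcher("999|235|872|888|460077171338003|0|||02000bda||");
        if (matcher.find()) {
            // group 0 is the whole match; capture groups are numbered from 1
            for (int group = 0, count = matcher.groupCount(); group < count; group++) {
                int groupIndex = group + 1;
                System.out.println(matcher.group(groupIndex));
            }
        }
    }
}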
Output:
999
888
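So the interceptor first collects the values of all groups, pairs them in order with the entries listed in serializers, and then uses the name configured for each serializer (for example serializers.s1.name) as the header key.
For example, with the configuration below and a source line 999|235|872|888|460077171338003|0|||02000bda||, the event headers will contain two key-value pairs: key1->999 and key2->888.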
agent.sources.r1.interceptors.i1.regex = (.*)\\|.*\\|.*\\|(.*)\\|.*\\|.*\\|.*\\|.*\\|.*\\|.*\\|.*
agent.sources.r1.interceptors.i1.serializers = s1 s2
agent.sources.r1.interceptors.i1.serializers.s1.name = key1
agent.sources.r1.interceptors.i1.serializers.s2.name = key2
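The pairing of groups with serializer names can be checked with a small JDK-only sketch. It models the serializers list as an ordered list of header names and stores each matched string unchanged, which approximates Flume's default pass-through serializer. HeaderPairing and extractHeaders are hypothetical names. The last line also shows why the regex must start directly with (.*): a stray leading . consumes the first character, so key1 would become 99.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class HeaderPairing {
    // Pairs capture group i (1-based) with headerNames.get(i - 1),
    // mimicking the order of "serializers = s1 s2".
    static Map<String, String> extractHeaders(Pattern regex, List<String> headerNames, String body) {
        Map<String, String> headers = new LinkedHashMap<>();
        Matcher matcher = regex.matcher(body);
        if (matcher.find()) {
            int count = Math.min(matcher.groupCount(), headerNames.size());
            for (int i = 1; i <= count; i++) {
                headers.put(headerNames.get(i - 1), matcher.group(i));
            }
        }
        return headers;
    }

    public static void main(String[] args) {
        String body = "999|235|872|888|460077171338003|0|||02000bda||";
        List<String> names = Arrays.asList("key1", "key2"); // serializers.s1.name, serializers.s2.name
        Pattern regex = Pattern.compile("(.*)\\|.*\\|.*\\|(.*)\\|.*\\|.*\\|.*\\|.*\\|.*\\|.*\\|.*");
        System.out.println(extractHeaders(regex, names, body));   // {key1=999, key2=888}
        Pattern withDot = Pattern.compile("." + regex.pattern());
        System.out.println(extractHeaders(withDot, names, body)); // {key1=99, key2=888}
    }
}
```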
Source code analysis (RegexExtractorInterceptor.intercept)
public Event intercept(Event event) {
    Matcher matcher = regex.matcher(
        new String(event.getBody(), Charsets.UTF_8)); // event.getBody() is the message body
    Map<String, String> headers = event.getHeaders();
    if (matcher.find()) {
        for (int group = 0, count = matcher.groupCount(); group < count; group++) {
            int groupIndex = group + 1;
            if (groupIndex > serializers.size()) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Skipping group {} to {} due to missing serializer",
                        group, count);
                }
                break;
            }
            NameAndSerializer serializer = serializers.get(group);
            if (logger.isDebugEnabled()) {
                logger.debug("Serializing {} using {}", serializer.headerName,
                    serializer.serializer);
            }
            headers.put(serializer.headerName, // the value set in serializers.xxx.name
                serializer.serializer.serialize(matcher.group(groupIndex))); // matcher.group(groupIndex) is the value captured by this group
        }
    }
    return event;
}
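Two consequences of this loop are easy to miss: groups beyond the number of serializers are skipped (the break), and a line that does not match leaves the headers untouched, so no key header is set for that message. The JDK-only sketch below mirrors the loop on a plain Map instead of a Flume Event; InterceptLoopDemo and its intercept method are hypothetical, and the serializer step is reduced to storing the matched string as-is.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class InterceptLoopDemo {
    // Mirrors the loop in intercept(): group index group+1 is paired with
    // headerNames.get(group); groups without a matching name are skipped.
    static void intercept(Pattern regex, List<String> headerNames, String body, Map<String, String> headers) {
        Matcher matcher = regex.matcher(body);
        if (matcher.find()) {
            for (int group = 0, count = matcher.groupCount(); group < count; group++) {
                int groupIndex = group + 1;
                if (groupIndex > headerNames.size()) {
                    break; // corresponds to "Skipping group ... due to missing serializer"
                }
                // put() overwrites any existing header with the same name
                headers.put(headerNames.get(group), matcher.group(groupIndex));
            }
        }
        // no match: headers are left unchanged
    }

    public static void main(String[] args) {
        Pattern twoGroups = Pattern.compile("(.*)\\|.*\\|.*\\|(.*)\\|.*\\|.*\\|.*\\|.*\\|.*\\|.*\\|.*");
        String body = "999|235|872|888|460077171338003|0|||02000bda||";

        // Two groups but only one serializer name: group 2 is skipped.
        Map<String, String> h1 = new HashMap<>();
        intercept(twoGroups, Arrays.asList("key"), body, h1);
        System.out.println(h1); // {key=999}

        // A line with too few fields does not match, so no "key" header is added.
        Map<String, String> h2 = new HashMap<>();
        intercept(twoGroups, Arrays.asList("key"), "999|235", h2);
        System.out.println(h2); // {}
    }
}
```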