依赖:
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.9.0</version>
一、interceptor自定义
public class MyInterceptorimplements Interceptor {
public void initialize() {
}
public Eventintercept(Event event) {
byte[] body = event.getBody();
if ((body[0] >='A' && body[0] <='Z') || (body[0] >='a' && body[0] <='z')) {
event.getHeaders().put("type","letter");
}else if (body[0] >='0' && body[0] <='9') {
event.getHeaders().put("type","number");
}
return event;
}
public Listintercept(List list) {
for (Event e : list
) {
intercept(e);
}
return list;
}
public void close() {
}
/*
静态内部类
*/
public static class MyBuilderimplements Interceptor.Builder {
public Interceptorbuild() {
return new MyInterceptor();
}
public void configure(Context context) {
}
}
二、source自定义
/*
自定义source
使用flume接收数据,并给每条数据添加前缀,输出控制台,前缀可从flume配置
*/
public class MySourceextends AbstractSourceimplements Configurable, PollableSource {
private StringpreString;
public Statusprocess()throws EventDeliveryException {
SimpleEvent event =new SimpleEvent();
//给event设置数据
event.setBody((preString+"hello").getBytes());
//将数据放入channel中
ChannelProcessor processor = getChannelProcessor();
processor.processEvent(event);
return null;
}
public long getBackOffSleepIncrement() {
return 0;
}
public long getMaxBackOffSleepInterval() {
return 0;
}
public void configure(Context context) {
//初始化context,从配置文件读取数据
preString = context.getString("prefix","test");
}
}
三、sink自定义
public class MySinkextends AbstractSinkimplements Configurable {
private Stringsuffix;
Loggerlogger = LoggerFactory.getLogger(MySink.class);
public Statusprocess()throws EventDeliveryException {
Status status = Status.READY;
Channel channel = getChannel();
Transaction transaction = channel.getTransaction();
try {
transaction.begin();
Event event = channel.take();
byte[] body = event.getBody();
logger.info(Arrays.toString(body) +suffix);
//事务提交
transaction.commit();
}catch (Exception e) {
status = Status.BACKOFF;
//事务回滚
transaction.rollback();
}finally {
//事务关闭
transaction.close();
}
return status;
}
public void configure(Context context) {
suffix = context.getString("suffix","test");
}
}