大数据之flume source、interceptor、sink自定义

依赖:

<groupId>org.apache.flume

<artifactId>flume-ng-core

<version>1.9.0</version>

一、interceptor自定义 
public class MyInterceptorimplements Interceptor {

public void initialize() {

}

public Eventintercept(Event event) {

byte[] body = event.getBody();

      if ((body[0] >='A' && body[0] <='Z') || (body[0] >='a' && body[0] <='z')) {

event.getHeaders().put("type","letter");

      }else if (body[0] >='0' && body[0] <='9') {

event.getHeaders().put("type","number");

      }

return event;

    }

public Listintercept(List list) {

for (Event e : list

) {

intercept(e);

        }

return list;

    }

public void close() {

}

/*

静态内部类

*/

    public static class MyBuilderimplements Interceptor.Builder {

public Interceptorbuild() {

return new MyInterceptor();

        }

public void configure(Context context) {

}

}

二、source自定义 
/*

自定义source

使用flume接收数据,并给每条数据添加前缀,输出控制台,前缀可从flume配置

*/

public class MySourceextends AbstractSourceimplements Configurable, PollableSource {

private StringpreString;

    public Statusprocess()throws EventDeliveryException {

SimpleEvent event =new SimpleEvent();

        //给event设置数据

        event.setBody((preString+"hello").getBytes());

        //将数据放入channel中

        ChannelProcessor processor = getChannelProcessor();

        processor.processEvent(event);

return null;

    }

public long getBackOffSleepIncrement() {

return 0;

    }

public long getMaxBackOffSleepInterval() {

return 0;

    }

public void configure(Context context) {

//初始化context,从配置文件读取数据

      preString = context.getString("prefix","test");

    }

}

三、sink自定义 
public class MySinkextends AbstractSinkimplements Configurable {

private Stringsuffix;

    Loggerlogger = LoggerFactory.getLogger(MySink.class);

    public Statusprocess()throws EventDeliveryException {

Status status = Status.READY;

        Channel channel = getChannel();

        Transaction transaction = channel.getTransaction();

        try {

transaction.begin();

            Event event = channel.take();

            byte[] body = event.getBody();

            logger.info(Arrays.toString(body) +suffix);

            //事务提交

            transaction.commit();

        }catch (Exception e) {

status = Status.BACKOFF;

            //事务回滚

            transaction.rollback();

        }finally {

//事务关闭

            transaction.close();

        }

return status;

    }

public void configure(Context context) {

suffix = context.getString("suffix","test");

    }

}

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容