Kafka拦截器(interceptor)功能

Producer拦截器(interceptor)是在Kafka 0.10版本被引入的,主要用于实现clients端的定制化控制逻辑。
对于producer而言,interceptor使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。同时,producer允许用户指定多个interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain)。Intercetpor的实现接口是org.apache.kafka.clients.producer.ProducerInterceptor,其定义的方法包括:
(1)configure(configs)
获取配置信息和初始化数据时调用。
(2)onSend(ProducerRecord):
该方法封装进KafkaProducer.send方法中,即它运行在用户主线程中。Producer确保在消息被序列化以及计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算
(3)onAcknowledgement(RecordMetadata, Exception):
该方法会在消息被应答或消息发送失败时调用,并且通常都是在producer回调逻辑触发之前。onAcknowledgement运行在producer的IO线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢producer的消息发送效率
(4)close:
关闭interceptor,主要用于执行一些资源清理工作
如前所述,interceptor可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。另外倘若指定了多个interceptor,则producer将按照指定顺序调用它们,并仅仅是捕获每个interceptor可能抛出的异常记录到错误日志中而非在向上传递。这在使用过程中要特别留意。

拦截器案例:
1)需求:
实现一个简单的双interceptor组成的拦截链。第一个interceptor会在消息发送前将时间戳信息加到消息value的最前部;第二个interceptor会在消息发送后更新成功发送消息数或失败发送消息数。
答案略

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 拦截器原理 Producer拦截器(interceptor)是在Kafka 0.10版本被引入的,主要用于实现cl...
    ZFH__ZJ阅读 371评论 0 2
  • 个人专题目录[https://www.jianshu.com/p/140e2a59db2c] 1. 拦截器原理 P...
    Java及SpringBoot阅读 1,610评论 0 3
  • 本小节我们来讨论Kafka生产者是如何发送消息到Kafka的, Kafka项目有一个生产者客户端,我们可以通过这个...
    Terminalist阅读 3,070评论 0 3
  • 遇到这样两个互相掐的同桌,我右边的那个不知道怎么回事一天的放屁。 我左边的那个忍无可忍,阴阳怪气来了一句:“一个傻...
    和鹊阅读 258评论 0 0
  • “别哭了……”,这话一出,对方肯定已是泪水滂沱,劝的人,也是事后诸葛亮,束手无策。 其实,有眼泪的...
    韩倾阅读 254评论 0 0