Kafka消费打印消息格式的设置

通常在测试kafka时,会用kafka-console-consumer来查看消息是否能被消费。改名了为kafka官方提供的控制台消费消息工具,使用方法可通过直接抵用该命令(sh kafka-console-consumer.sh)仅用于测试场景。

默认情况下该命令在消费消息打印时,只会打印消息的值。

如果想要查看更多消息的信息,如createTime,key,或者想要更友好显示消息,则可以指定相关配置。

使用方法 sh kafka-console-consumer.sh ... --property print.key=true来打印消息的key值

当然还有其他的设置可以使用。 如下:

if (props.containsKey("print.timestamp"))
  printTimestamp = props.getProperty("print.timestamp").trim.equalsIgnoreCase("true")
if (props.containsKey("print.key"))
  printKey = props.getProperty("print.key").trim.equalsIgnoreCase("true")
if (props.containsKey("key.separator"))
  keySeparator = props.getProperty("key.separator").getBytes
if (props.containsKey("line.separator"))
  lineSeparator = props.getProperty("line.separator").getBytes
// Note that `toString` will be called on the instance returned by `Deserializer.deserialize`
if (props.containsKey("key.deserializer"))
  keyDeserializer = Some(Class.forName(props.getProperty("key.deserializer")).newInstance().asInstanceOf[Deserializer[_]])
// Note that `toString` will be called on the instance returned by `Deserializer.deserialize`
if (props.containsKey("value.deserializer"))
  valueDeserializer = Some(Class.forName(props.getProperty("value.deserializer")).newInstance().asInstanceOf[Deserializer[_]])

当然如果以上还不能完全满足诉求,就可以开发一个自定义的消息展示格式了

1.  实现继承MessageFormatter的类,并实现相关init,writeTo,close方法
2.  使用--format指定自定义的消息展示类名
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 大致可以通过上述情况进行排除 1.kafka服务器问题 查看日志是否有报错,网络访问问题等。 2. kafka p...
    生活的探路者阅读 7,638评论 0 10
  • 背景介绍 Kafka简介 Kafka是一种分布式的,基于发布/订阅的消息系统。主要设计目标如下: 以时间复杂度为O...
    高广超阅读 12,884评论 8 167
  • 一、基本概念 介绍 Kafka是一个分布式的、可分区的、可复制的消息系统。它提供了普通消息系统的功能,但具有自己独...
    ITsupuerlady阅读 1,652评论 0 9
  • 一、入门1、简介Kafka is a distributed,partitioned,replicated com...
    HxLiang阅读 3,444评论 0 9
  • 用后16小时,手机自拍,无滤镜,无美颜,光泽度依然很好 对于我这个平时只用过几块钱一片面膜的人来说,真的很期盼快点...
    晖_2033阅读 6,010评论 0 0