之前自己写过一篇入门文章kafka简单入门及与spring boot整合,主要是结合kafka官方的文档入门,学习了一些基本概念,以及在spring boot中简单使用。这次主要是学习如何使用kafka,尤其是自定义的一些配置。自己看了一些文档感觉都不是特别的清晰,而且github上的demo又过于简略,所以自己决定来踩踩坑。spring kafka文档,这个是spring的感觉相对还是不错的,比spring boot文档中kafka的内容要详细很多。今天这次学习主要是使用kafka作为消息队列使用,不涉及关于"流"方面的内容。
一、项目准备
快速创建一个spring boot项目,选择需要的依赖web、kafka等等。我因为使用的还是以前的项目,主要就是postgresql、web、mybatis、kafka还有mybatis的generator插件等等。根据数据库表生成对应的model。创建相应的controller、service。
先本地启动kafka,因为kafka依赖于zookeeper,所以先启动zookeeper
./zookeeper-server-start.sh ../config/zookeeper.properties
然后启动kafka
./kafka-server-start.sh ../config/server.properties
接着在相关的 service注入KafkaTemplate,以便发送消息。KafkaTemplate有多个重载的send方法,虽然方法参数不同,但是最终执行的都是doSend方法,而这个方法的参数为ProducerRecord<?,?>,其实就是将重载方法的参数都转换成ProducerRecord<?,?>而已,看下KafkaTemplate的代码:
@Override
public ListenableFuture<SendResult<K, V>> send(String topic, @Nullable V data) {
ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, data);
return doSend(producerRecord);
}
@Override
public ListenableFuture<SendResult<K, V>> send(String topic, K key, @Nullable V data) {
ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, key, data);
return doSend(producerRecord);
}
@Override
public ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, @Nullable V data) {
ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, partition, key, data);
return doSend(producerRecord);
}
@Override
public ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key,
@Nullable V data) {
ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, partition, timestamp, key, data);
return doSend(producerRecord);
}
@Override
public ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record) {
return doSend(record);
}
@SuppressWarnings("unchecked")
@Override
public ListenableFuture<SendResult<K, V>> send(Message<?> message) {
ProducerRecord<?, ?> producerRecord = this.messageConverter.fromMessage(message, this.defaultTopic);
if (!producerRecord.headers().iterator().hasNext()) { // possibly no Jackson
byte[] correlationId = message.getHeaders().get(KafkaHeaders.CORRELATION_ID, byte[].class);
if (correlationId != null) {
producerRecord.headers().add(KafkaHeaders.CORRELATION_ID, correlationId);
}
}
return doSend((ProducerRecord<K, V>) producerRecord);
}
这一点知道就可以了,现在也没必要去深究,目前还是学习如何使用为目的。
二、使用自动配置
目前我的项目有3张表,3个对应的model以及对应的controller、service、mapper。现在假设一个场景:在成功向数据库写入一条数据之后,使用kafka发送一条消息,消息内容就是对应的model,即写入数据库的数据,然后消费者接到消息后做进一步处理(进一步处理是啥我也不清楚....)。
所以我需要一个consumer来接收消息。接下来定一个MessageConsumer用来接收kafka发送的消息,因为有3个不同的model,所以我定义3个不同的Topic来区分。代码如下:
@Slf4j
@Component
@KafkaListener(topics = {"USER_TOPIC","ORDER_TOPIC","PRODUCT_TOPIC"})
public class MessageConsumer {
@KafkaHandler
public void listen(ConsumerRecord<?,?> record) {
log.info(">>>> kafka consumer record message={} <<<<",record.toString());
}
}
@KafkaListener注解加在类上或者方法上都是可以的,个人感觉没什么区别。这里topics是一个数组,监听所有Topic的消息。不过如果@KafkaListener加在类上,监听方法则需要再加上@KafkaHandler注解。这里接收参数使用的是ConsumerRecord<?,?>,对应上面提到的ProducerRecord<?,?>。都是将消息进行了一层包装而已。
基本上一个简单的消息发送和接收就完成了,发送消息时带上自定义的Topic和对应的数据就可以了。到现在还没有做任何和kafka相关的配置内容,但是这样启动项目肯定是会报错的,因为没有配置消费者的groupId。当然如果不配置也是可以的,不过需要在@KafkaListener增加一个id属性,我选择在配置文件指定一个groupId,配置文件增加一个配置:
spring.kafka.consumer.group-id=custom_kafka
接着再启动项目,并在写入数据库之后使用KafkaTemplate发送消息,service代码如下:
@Override
public Map<String, Object> save(Product product) {
Map<String,Object> resultMap = new HashMap<>();
int insert = productMapper.insert(product);
if (insert == 1) {
kafkaTemplate.send(TOPIC,product);
resultMap.put("success",true);
resultMap.put("message","save success and send to MQ succeed");
} else {
// 保存失败
resultMap.put("message","save failed");
}
return resultMap;
}
项目启动后使用rest client调用接口,结果抛出异常,异常信息如下:
java.lang.ClassCastException: com.ypc.kafka.entity.Product cannot be cast to java.lang.String
at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28) ~[kafka-clients-2.0.1.jar:na]
at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:65) ~[kafka-clients-2.0.1.jar:na]
at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:55) ~[kafka-clients-2.0.1.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:841) ~[kafka-clients-2.0.1.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:803) ~[kafka-clients-2.0.1.jar:na]
根据上面的代码可知发送的消息为一个Product对象,而异常信息表示是Product不能转成String。看到应该很清楚了就是序列化的问题,而且kafka默认的kafka默认的序列化就是StringSerializer。而且根据项目启动时kafka的一些信息也可以得出这个结论。
既然kafka默认不配置不行,那就需要进行自己的配置。
三、自定义配置
既然要修改默认序列化配置,那么我们需要知道有哪些,在"org.apache.kafka.common.serialization"下有9个序列化相关的类,但是除了StringSerializer之外其他的序列化基本都是基础数据类型,如果是自定义类型还是没办法使用。其实每次说到序列化我一般就会想到使用json,因为json使用非常的广泛,后面网上找了一下spring提供了对json序列化的支持,所以配置文件添加对key和value该项配置:
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
接着我们再次启动项目,并调用接口测试接收消息,依然报错,这个真的有点出乎意料,异常信息如下:
rg.springframework.kafka.KafkaException: No method found for class java.lang.String
at org.springframework.kafka.listener.adapter.DelegatingInvocableHandler.getHandlerForPayload(DelegatingInvocableHandler.java:170) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
at org.springframework.kafka.listener.adapter.DelegatingInvocableHandler.getMethodNameFor(DelegatingInvocableHandler.java:279) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
at org.springframework.kafka.listener.adapter.HandlerAdapter.getMethodAsString(HandlerAdapter.java:67) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
No method found for class java.lang.String??只能跟踪源码了。看DelegatingInvocableHandler的invoke方法的代码:
public Object invoke(Message<?> message, Object... providedArgs) throws Exception { //NOSONAR
Class<? extends Object> payloadClass = message.getPayload().getClass();
InvocableHandlerMethod handler = getHandlerForPayload(payloadClass);
Object result = handler.invoke(message, providedArgs);
Expression replyTo = this.handlerSendTo.get(handler);
return new InvocationResult(result, replyTo, this.handlerReturnsMessage.get(handler));
}
protected InvocableHandlerMethod getHandlerForPayload(Class<? extends Object> payloadClass) {
InvocableHandlerMethod handler = this.cachedHandlers.get(payloadClass);
if (handler == null) {
handler = findHandlerForPayload(payloadClass);
if (handler == null) {
throw new KafkaException("No method found for " + payloadClass);
}
this.cachedHandlers.putIfAbsent(payloadClass, handler); //NOSONAR
setupReplyTo(handler);
}
return handler;
}
Message中封装的就是发送的消息,即payload,因为前面将Product序列化成了json,即String类型,所以得到的payloadClass变量为String类型,接着getHandlerForPayload方法从cachedHandlers没有获取到对应的handler,findHandlerForPayload方法返回结果依然为null。难道是因为Consumer里面没有定义接收String类型参数的方法吗??测试一下吧,修改MessageConsumer代码如下:
@Slf4j
@Component
@KafkaListener(topics = {"USER_TOPIC","ORDER_TOPIC","PRODUCT_TOPIC"})
public class MessageConsumer {
@KafkaHandler
public void listen(ConsumerRecord<?,?> record) {
System.out.println(record);
log.info(">>>> kafka consumer record message={} <<<<",record.toString());
}
@KafkaHandler
public void listen(String message) {
log.info(">>>> kafka consumer String message={} <<<<",message);
}
}
添加一个专门接收String类型参数的方法,然后再次测试,测试结果成功:
反思一下:其实之所以遇到上面这个问题,有自己先入为主的思想,以为监听的时候接收的数据类型都可以包装在ConsumerRecord<?,?>对象中,然后从这个对象中获取具体的消息体即可。但是自己忘了一个前提条件,就是这个项目中@KafkaListener注解是在类上,而不是方法上,有兴趣的可以去测试一下,对比一下@KafkaListener在类上和在方法上的区别。我测试发现如果@KafkaListener加在方法上使用ConsumerRecord<?,?>是可以接收任何消息的,因为这时候使用的是默认的handler。另外如果@KafkaListener在方法上和在类上的消费者同时存在,在类上的消费者是无法接收到消息的。也就是优先使用@KafkaListene在方法上的消费者。
根据上面遇到的情况,我觉得可以重载多个不同的方法针处理不同的消息类型,比如我有Order、User、Product三种不同的类型,那么我可以定义三个不同的方法来处理这些数据。不过因为对数据进行了json格式序列化,所以如果要接收这三种类型的数据,还需要对其进行反序列化处理,所以配置文件增加反序列化配置:
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
然后MessageConsumer增加两个方法,代码如下:
@Slf4j
@Component
@KafkaListener(topics = {"USER_TOPIC","ORDER_TOPIC","PRODUCT_TOPIC"})
public class MessageConsumer {
@KafkaHandler
public void user(User user) {
// 处理 user ...
log.info(">>>> kafka consumer UserType message={} <<<<",user.toString());
}
@KafkaHandler
public void order(Order order) {
// 处理 order ...
log.info(">>>> kafka consumer OrderType message={} <<<<",order.toString());
}
@KafkaHandler
public void listen(ConsumerRecord<?,?> record) {
log.info(">>>> kafka consumer record message={} <<<<",record.toString());
}
@KafkaHandler
public void listen(String message) {
log.info(">>>> kafka consumer String message={} <<<<",message);
}
}
之所以没有定义接收Product类型方法,我还是想通过ConsumerRecord<?,?>这个对象来处理所有未知数据类型。
启动项目,调用Order类型的接口,测试能否接收到数据。调用之后报错,报错信息如下:
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition ORDER_TOPIC-0 at offset 0. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalArgumentException: The class 'com.ypc.kafka.entity.Order' is not in the trusted packages: [java.util, java.lang]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
意思就是我定义的实体类不在反序列信任的包内,默认只信任java.util, java.lang这两个包中的类,所以需要将实体类的包添加为可信任,或者使用"*"将所有的都添加为可信任的。所以需要再添加一个配置
spring.kafka.consumer.properties.spring.json.trusted.packages=com.ypc.kafka.entity
然后再次测试,没有报错结果如下:
通过上面的这些配置,一步步实现了发送数据的序列化和反序列化,也完成了通过具体数据类型来执行不同的方法。当然也不是非要这么做,其实也可以通过监听器监听的Topic来实现,即不同的数据类型使用不同的Topic,比如Order类型那么只监听该类型的Topic就行了,但是前提是@KafkaListener需要在方法上。
接下来就是关于通用类型的处理,在MessageConsumer中我只写了处理Order和User这两种类型的方法,之所以没有写Product的类型,我是想准备一个通用的方法来处理未知类型(假设Product类型未知),但是ConsumerRecord<?,?>类型接收是不可行的,这个上面已经说过了。那我使用一个Object类型是否可行呢?添加一个方法:
@KafkaHandler
public void object(Object object) {
log.info(">>>> kafka consumer UnknownType message={} <<<<",object.toString());
}
然后调用Product相关接口测试一下,结果报错:
可以接收到Product类型,而且根据输出结果可以看出消息其实还是封装在ConsumerRecord里面的,是不是我上面使用ConsumerRecord的姿势不正确啊???使用Object真的可行吗,当然不行,至少现在是不可行的。如果我再调用一下Order的接口,结果报错:
org.springframework.kafka.KafkaException: Ambiguous methods for payload type: class com.ypc.kafka.entity.Order: object and order
也就是说其实识别出来这个类型为Order了,但是因为有了Object的存在导致它不知道该使用哪个方法来处理这个数据。如果只定义一个方法,即所有类型都使用Object这样是没问题的。那么能不能指定一个默认的handler,即如果类型确定的情况下由相应的方法处理,不确定的情况则由公用方法处理。@KafkaHandler正好有一个属性就是设置是否为默认处理器的,所以Object方法修改如下:
@KafkaHandler(isDefault = true)
public void object(Object object) {
log.info(">>>> kafka consumer UnknownType message={} <<<<",object.toString());
}
再次使用Order和Product的接口进行测试,结果如下:
从日志输出结果来看是没有问题了,即可以接收其他类型数据,且不影响已知的类型数据处理。但是因为这个Object之包含消息体本身,所以很难进一步处理,所以我觉得还是回到使用ConsumerRecord<?,?>这个路子上比较好,将MessageConsumer代码最终修改如下:
@Slf4j
@Component
@KafkaListener(topics = {"USER_TOPIC","ORDER_TOPIC","PRODUCT_TOPIC"})
public class MessageConsumer {
@KafkaHandler
public void user(User user) {
// 处理 user ...
log.info(">>>> kafka consumer UserType message={} <<<<",user.toString());
}
@KafkaHandler
public void order(Order order) {
// 处理 order ...
log.info(">>>> kafka consumer OrderType message={} <<<<",order.toString());
}
@KafkaHandler(isDefault = true)
public void listen(ConsumerRecord<?, ?> record) {
log.info(">>>> kafka consumer CommonType message={} <<<<",record.toString());
}
}
四、使用java配置的方式
因为spring推荐使用java配置的方式,所以根据将相关配置转成java代码就行了,代码如下:
@Configuration
//@EnableKafka
public class KafkaConfig {
@Bean
public DefaultKafkaHeaderMapper defaultKafkaHeaderMapper() {
DefaultKafkaHeaderMapper defaultKafkaHeaderMapper = new DefaultKafkaHeaderMapper();
defaultKafkaHeaderMapper.addTrustedPackages("com.ypc.kafka.entity");
return defaultKafkaHeaderMapper;
}
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG,"custom_kafka");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(JsonDeserializer.TRUSTED_PACKAGES,"com.ypc.kafka.entity");
return props;
}
@Bean
public ProducerFactory<String, Object> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public ConsumerFactory<?,?> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<String, Object>(producerFactory(),true);
}
}
我看的官方文档说需要在配置类加上@EnableKafka注解,但是我测试的时候发现不加也是可以的,这个不知道是什么情况。
配置文件如下:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=custom_kafka
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.add.type.headers=true
#
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
# kafka额外配置
#spring.kafka.consumer.properties.spring.json.value.default.type=com.ypc.kafka.entity.Order
#spring.kafka.producer.properties.spring.json.type.mapping=order:com.ypc.kafka.entity.Order,user:com.ypc.kafka.entity.User
spring.kafka.consumer.properties.spring.json.trusted.packages=com.ypc.kafka.entity
#spring.kafka.consumer.properties.spring.json.type.mapping=order:com.ypc.kafka.entity.Order,user:com.ypc.kafka.entity.User
有些配置自己是在java代码中没有设置的,比如自动提交、添加json类型到header等等。
五、小结
其实这次学习也算是从0入门的一个过程,即从默认配置一点点增加、修改,然后测试,内容并不算多。这次相对上次有几个不同之处:一是使用注解不同,这次@KafkaListener注解在类上,且使用了@KafkaHandler注解;二是一个监听器监听多个Topic,且有多个类型的handler。当然其中涉及到自定义序列化和反序列化的问题是这次学习的主要内容,现在看来确实挺简单,但是实际中还是遇到了很多的坑。在上面的配置文件中还有两个遗留问题,没有解决,一是spring.kafka.producer.properties.spring.json.add.type.headers=true
这个配置的使用和意义,自己没有去查询具体head的内容,因为是二进制数据,debug的时候没有看到具体信息。二是:
spring.kafka.producer.properties.spring.json.type.mapping=
spring.kafka.consumer.properties.spring.json.type.mapping=
这两个配置的使用问题,这个让我有点迷惑,等有时间自己再试着看看。这次的代码提交到我的github,如果有什么问题欢迎交流。