运行环境
操作系统:Windows 10 ;
Linux发行版:CentOS Linux release 7.6.1810 (Core)
JDK版本:1.8.0_221
说在前面
kafka作为开源的消息队列软件,版本迭代很快,就跟那个ElasticSearch一样。版本迭代一快,版本向下的兼容性就做的不好。这是一个比较令人头疼的问题。
我们自己的项目本身不可能随着这些开源软件,随时去升级版本。所以只能要去解决这种版本兼容的问题了。
所以,我打算从不同的版本来切入,梳理出不同的解决方案来。
我这里要说的就两个版本:一个是老版本 kafka_2.11-0.10.1.1 ,一个是当前的最新稳定版 kafka_2.13-2.6.1。
一、Kafka新版本:kafka_2.13-2.6.1
kafka版本是大前提,在这个大前提下,我们的运行环境如下:
spring boot 版本:2.0.9.RELEASE
spring-kafka版本: 2.1.12.RELEASE(该版本是由spring boot管理的)
kafka-clients版本:1.0.2(该版本是由spring boot管理的)
我上面写的这些当然都是我自己测试通过的。按道理来说,spring相关技术 应该使用与kafka版本匹配的版本。
我这里先贴一个官方的版本图表:
很显然,我的spring版本和kafka集群版本不匹配:
- spring版本要跟随升级吗?不可能的,项目开发了这么多年了,怎么可能因为接入一个消息队列,就去升级呢。
- kafka版本降级吗?也不可能了,为了适配,spring boot需要降级,但是即使降级了,官方也把这个低版本淘汰掉了。
- kafka-clients和spring-kafka版本降级吗?也是不行的,因为当前版本spring boot加入了自动装配kafka,导致了spring boot为了兼容,也要降级。回到了上面的问题。
综合上面的分析,只能选择我上面的版本组合了。
好确定完版本之后,我们从后端往前端来说:
1. kafka理论知识
Kafka就是一种发布-订阅模式的消息队列。
在发布-订阅消息系统中,消息被持久化到一个topic中。与点对点消息系统不同的是,消费者可以订阅一个或多个topic,消费者可以消费该topic中所有的数据,同一条数据可以被多个消费者消费,数据被消费后不会立马删除。在发布-订阅消息系统中,消息的生产者称为发布者,消费者称为订阅者。该模式的示例图如下:
2. kafka安装配置
- 下载kafka;
https://kafka.apache.org/
> tar -xzf kafka_2.13-2.6.1.tgz
> cd kafka_2.13-2.7.0
- 配置conf/server.properties
这里只说最少配置,为了保证基本的使用。
############################# Socket Server Settings #############################
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
# 监听配置,为了外部机器远程访问
listeners=PLAINTEXT://:9092
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
# 还是为了外部机器远程访问
advertised.listeners=PLAINTEXT://10.72.49.135:9092
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
# 消息数据存放的地方
log.dirs=/opt/kafka/newer/kafka_2.13-2.6.1/data/kafka-logs
3. kafka命令操作
有了上面的配置,我们就可以开始启动服务了。
- 运行kafka需要使用Zookeeper,所以你需要先启动Zookeeper,如果你没有Zookeeper,你可以使用kafka自带打包和配置好的Zookeeper。
> nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
- 现在启动kafka服务
> nohup bin/kafka-server-start.sh config/server.properties &
- 创建一个名为“test”的Topic,只有一个分区和一个备份
> bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
- 创建好之后,可以通过运行以下命令,查看已创建的topic信息:
> bin/kafka-topics.sh --list --zookeeper localhost:2181
- Kafka提供了一个命令行的工具,可以从输入文件或者命令行中读取消息并发送给Kafka集群。每一行是一条消息。运行producer(生产者),然后在控制台输入几条消息到服务器。
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
- Kafka也提供了一个消费消息的命令行工具,将存储的信息输出出来。
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
- 查看kafka版本号:
# 进入kafka安装目录:
> cd /opt/cloudera/parcels/KAFKA/lib/kafka
# 执行语句:
> find ./libs/ -name \*kafka_\* | head -1 | grep -o '\kafka[^\n]*'
4. spring boot 中开发
经过上述一顿猛如虎的操作,我们就来到IDEA中,通过spring boot 来连接并操作kafka集群。
- 引入spring-kafka依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
- application.yml配置kafka相关参数
spring:
kafka:
bootstrap-servers: 10.72.49.135:9092
producer:
acks: 1
retries: 0
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
consumer:
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
properties:
spring:
json:
default:
trusted:
packages: com.ncs.entity.group
max-poll-records: 100
- 配置生产者
关键点,就是要使用KafkaTemplate来生产消息。
还有就是,在配置中使用了JsonSerializer,所以我们放入消息队列中的数据是自动转化为json格式的。
而数据格式是放在消息的头部header中的。这对于我们消费消息很有用。
public class KafkaProducer {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
public void sendMsg() {
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, pushInfo);
future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onFailure(Throwable ex) {
log.info("Topic: {} - 生产者 发送消息失败:{}", topic, ex.getMessage());
}
@Override
public void onSuccess(SendResult<String, Object> result) {
//log.info("Topic: {} - 生产者 发送消息成功: {}",topic, result.getProducerRecord().value());
}
});
}
}
}
- 配置消费者
我们这里直接就可以拿到java类型的数据。这是因为,我们配置了JsonDeserializer,把json格式的数据直接反序列化,但是,反序列化需要一个java类型,这个java类型哪里来的呢?就是从消息头中获取到的。
public class KafkaConsumer {
@KafkaListener(topics = "test2",groupId = "test2-group")
public void cosumeMsg(PushInfo pushInfo) {
log.info("消费者消费了:"+pushInfo.toString());
}
}
看起来基于spring boot来开发kafka是很简单的。因为spring boot把很多底层的操作都做了封装。
二、Kafka老版本:kafka_2.11-0.10.1.1
我们来看老版本的kafka集群。关键的问题就是要处理兼容性。
上面kafka版本依然是大前提,在这个大前提下,我们的运行环境如下:
spring boot 版本:2.0.9.RELEASE
spring-kafka版本: 2.1.12.RELEASE(该版本是由spring boot管理的)
kafka-clients版本:1.0.2(该版本是由spring boot管理的)
你没有看错,我还是这些版本,没有变。改变版本是不可能的了。原因去前面再看看吧。我们只能从前端程序调用部分来调整兼容性了。
好确定完版本之后,我们从后端往前端来说:
1. kafka理论知识
理论部分,我就不再重复了。
2. kafka安装配置
前面有讲,还是不赘述了。
3. kafka命令操作
前面都有讲,这里不赘述。只说因为版本差异,不同的操作部分。
- 创建一个名为“test”的Topic,只有一个分区和一个备份
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
4. spring boot 中开发
经过上述一顿猛如虎的操作,我们就来到IDEA中,通过spring boot 来连接并操作kafka集群。
- 引入spring-kafka依赖
不赘述了。 - application.yml配置kafka相关参数,这里是有差异的。
spring:
kafka:
bootstrap-servers: 10.72.49.135:9092
producer:
acks: 1
retries: 0
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# 这里的序列化器依然可以使用JsonSerializer,但是要禁止在消息头部添加类型信息
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
# 这里配置不添加类型信息头,要不然,老版本kafka集群不支持,会报错。
properties:
spring:
json:
add:
type:
headers: false
consumer:
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 这里的反序列化器就不能使用JsonDeserializer,需要使用StringDeserializer,但是貌似也没有起作用。
# value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
properties:
spring:
json:
default:
# value:
# type: com.ncs.entity.group.BrwRateStatis
trusted:
packages: com.ncs.entity.group
max-poll-records: 100
- 配置生产者
和前面一毛一样,不赘述了,只是配置中禁止添加类型信息消息头,这里生产的消息,底层应该有不同。 - 配置消费者
和前面一毛一样,不再赘述。 - 配置kafka
你发现没有,消费者在消费json格式的数据的时候,是需要类型信息的,但是由于老版本的原因,我们已经禁用了。那么消费怎么能够成功转换呢?这里,我们就需要进行自定义配置kafka了。
// 1.启用这些自定义的kakfa配置
@Configuration
@EnableKafka
public class KafkaConfig {
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
factory.setMessageConverter(new StringJsonMessageConverter());//2.关键点,从消息队列中取出string数据,然后按照json格式解析,并放入到对象中。这里,spring-kafka会拿到消费者参数里的类型信息,然后,根据这个类型信息转换json数据。
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<Integer,String>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.72.49.135:9092");
//3. 要配置这些信息,貌似因为自定义了这写配置,就把application.yml中的相关配置给架空了。
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
return props;
}
}
做完这些,我们又可以愉快的玩耍了。
附录
说完上面的这些,其实,就可以完结了。但是,由于刚入门的一个误区,所以,还要另外讲一下schema的知识。
1. Kafka Schema Registry
首先了解一下相关的理论部分。
基本上,对于Kafka Producers和Kafka Consumer,Kafka的Schema Registry都存储Avro Schemas。
- 它提供了一个用于管理Avro架构的RESTful界面。
- 它允许存储版本化模式的历史记录。
- 此外,它还支持检查Kafka的架构兼容性。
-
使用Avro Schema,我们可以配置兼容性设置以支持Kafka模式的发展。
2. apache avro
基本上,Kafka Avro序列化项目提供了序列化程序。在Avro和Kafka Schema Registry的帮助下,使用Kafka Avro序列化的Kafka Producers和Kafka Consumer都处理模式管理以及记录的序列化。
3. 报错1:Magic v1 does not support record headers
在使用spring boot 2.0.9 连接kafka 0.10.1.1 的时候,报这个错误。显然是因为kafka client的版本比kafka集群高,导致版本不兼容。深层原因是:高版本的客户端使用了JSON serializer,这个方法在record headers添加了类型信息,但是< 0.11.0.0的kafka不支持,所以就报错了。
解决方式有两种:
1.关闭在头部添加类型信息;
2.对kafka客户端进行降级;
4. 如果kafka集群中没有topic,那么客户端会自动创建topic。
5. kafka client的版本问题
目前的现状是,kafka官方团队在维护java client,除此之外的其他语言的客户端,都是由志愿者在维护开发。官方原文:Kafka exposes all its functionality over a language independent protocol which has clients available in many programming languages. However only the Java clients are maintained as part of the main Kafka project, the others are available as independent open source projects. A list of non-Java clients is available here.(译文:kafka公开了其所有的功能协议,与语言无关。只有java客户端作为kafka项目的一部分进行维护,其他的作为开源的项目提供,这里提供了非java客户端的列表。)
6. 高版本spring-kafka 兼容 老版本 kafka_0.10.1.1,反序列化json的方案
由于老版本kafka中,不支持消息头中添加类型信息,所以,我们再反序列化的时候,就要想办法获取到这个java类型信息。
- 配置文件中配置默认的反序列化的类型
在application.yml中配置默认的反序列化器中的目标类型,JsonDeserializer反序列化器在找不到目标类型的时候,会使用默认的类型来转换。缺点是只能支持一种数据类型的消费,不够灵活。 - 配置DefaultKafkaConsumerFactory
@KafkaListener配置containerFactory参数,然后,在KafkaConfig配置类中配置,关键点在于,在consumerFactory中配置JsonDeserializer反序列化器,并传入需要转换的目标类型UserInfo。很显然,这种方式的缺点就是,只能支持一种类型的消费。不够灵活。 - 使用avro的配置方式;
这种方式就不尝试了,成本有点高。但是,肯定可以实现的。 - 使用ConsumerRecord<Integer, String>接收参数
这种方案,我原以为可以实现,但是,由于泛型的类型擦除,并不能实现。
但是,按照类型推断来说,本可以实现的。但是,这里是形参,在运行期间,已经没有相关的类型信息了,理论上,还是不可行。 - 利用StringJsonMessageConverter实现
这是最好的方案,官方推荐。满足灵活需求,可以支持多种消息类型的消费。原理就是,在转换json的过程中,spring-kafka会拿到消息数据,同时,也可以拿到@KafkaListener监听方法中的参数类型信息,根据这两个数据,就可以完成json转换为java类型的对象了。 - 使用自定义的JsonDeserializer
这个方法也是可以实现的,我没有尝试。理论上是可行的,因为在JsonDeserializer的public T deserialize(String topic, Headers headers, byte[] data) {}
关键方法中,我们可以拿到消息数据,也可以拿到topic信息,有了这两个参数,我们可以根据topic来转换为指定的类型对象。