Kafka消息队列-从开始到上线

运行环境

操作系统:Windows 10 ;
Linux发行版:CentOS Linux release 7.6.1810 (Core)
JDK版本:1.8.0_221

说在前面

kafka作为开源的消息队列软件,版本迭代很快,就跟那个ElasticSearch一样。版本迭代一快,版本向下的兼容性就做的不好。这是一个比较令人头疼的问题。
我们自己的项目本身不可能随着这些开源软件,随时去升级版本。所以只能要去解决这种版本兼容的问题了。
所以,我打算从不同的版本来切入,梳理出不同的解决方案来。
我这里要说的就两个版本:一个是老版本 kafka_2.11-0.10.1.1 ,一个是当前的最新稳定版 kafka_2.13-2.6.1。

一、Kafka新版本:kafka_2.13-2.6.1

kafka版本是大前提,在这个大前提下,我们的运行环境如下:

spring boot 版本:2.0.9.RELEASE
spring-kafka版本: 2.1.12.RELEASE(该版本是由spring boot管理的)
kafka-clients版本:1.0.2(该版本是由spring boot管理的)

我上面写的这些当然都是我自己测试通过的。按道理来说,spring相关技术 应该使用与kafka版本匹配的版本。
我这里先贴一个官方的版本图表:


image.png

很显然,我的spring版本和kafka集群版本不匹配:

  • spring版本要跟随升级吗?不可能的,项目开发了这么多年了,怎么可能因为接入一个消息队列,就去升级呢。
  • kafka版本降级吗?也不可能了,为了适配,spring boot需要降级,但是即使降级了,官方也把这个低版本淘汰掉了。
  • kafka-clients和spring-kafka版本降级吗?也是不行的,因为当前版本spring boot加入了自动装配kafka,导致了spring boot为了兼容,也要降级。回到了上面的问题。

综合上面的分析,只能选择我上面的版本组合了。

好确定完版本之后,我们从后端往前端来说:

1. kafka理论知识

Kafka就是一种发布-订阅模式的消息队列。
在发布-订阅消息系统中,消息被持久化到一个topic中。与点对点消息系统不同的是,消费者可以订阅一个或多个topic,消费者可以消费该topic中所有的数据,同一条数据可以被多个消费者消费,数据被消费后不会立马删除。在发布-订阅消息系统中,消息的生产者称为发布者,消费者称为订阅者。该模式的示例图如下:


image.png

2. kafka安装配置

> tar -xzf kafka_2.13-2.6.1.tgz
> cd kafka_2.13-2.7.0
  • 配置conf/server.properties
    这里只说最少配置,为了保证基本的使用。
############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
# 监听配置,为了外部机器远程访问
listeners=PLAINTEXT://:9092

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
# 还是为了外部机器远程访问
advertised.listeners=PLAINTEXT://10.72.49.135:9092

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
# 消息数据存放的地方
log.dirs=/opt/kafka/newer/kafka_2.13-2.6.1/data/kafka-logs

3. kafka命令操作

有了上面的配置,我们就可以开始启动服务了。

  • 运行kafka需要使用Zookeeper,所以你需要先启动Zookeeper,如果你没有Zookeeper,你可以使用kafka自带打包和配置好的Zookeeper。
> nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
  • 现在启动kafka服务
> nohup bin/kafka-server-start.sh config/server.properties &
  • 创建一个名为“test”的Topic,只有一个分区和一个备份
> bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
  • 创建好之后,可以通过运行以下命令,查看已创建的topic信息:
> bin/kafka-topics.sh --list --zookeeper localhost:2181
  • Kafka提供了一个命令行的工具,可以从输入文件或者命令行中读取消息并发送给Kafka集群。每一行是一条消息。运行producer(生产者),然后在控制台输入几条消息到服务器。
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
  • Kafka也提供了一个消费消息的命令行工具,将存储的信息输出出来。
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
  • 查看kafka版本号:
# 进入kafka安装目录:
> cd /opt/cloudera/parcels/KAFKA/lib/kafka
# 执行语句: 
> find ./libs/ -name \*kafka_\* | head -1 | grep -o '\kafka[^\n]*'

4. spring boot 中开发

经过上述一顿猛如虎的操作,我们就来到IDEA中,通过spring boot 来连接并操作kafka集群。

  • 引入spring-kafka依赖
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
  • application.yml配置kafka相关参数
spring:
  kafka:
    bootstrap-servers: 10.72.49.135:9092
    producer:
      acks: 1
      retries: 0
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring:
          json:
            default:
            trusted:
              packages: com.ncs.entity.group
      max-poll-records: 100
  • 配置生产者
    关键点,就是要使用KafkaTemplate来生产消息。
    还有就是,在配置中使用了JsonSerializer,所以我们放入消息队列中的数据是自动转化为json格式的。
    而数据格式是放在消息的头部header中的。这对于我们消费消息很有用。
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
    public void sendMsg() {
            ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, pushInfo);
            future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
                @Override
                public void onFailure(Throwable ex) {
                    log.info("Topic: {} - 生产者 发送消息失败:{}", topic, ex.getMessage());
                }

                @Override
                public void onSuccess(SendResult<String, Object> result) {
                    //log.info("Topic: {} - 生产者 发送消息成功: {}",topic, result.getProducerRecord().value());
                }
            });
        }
    }
}

  • 配置消费者
    我们这里直接就可以拿到java类型的数据。这是因为,我们配置了JsonDeserializer,把json格式的数据直接反序列化,但是,反序列化需要一个java类型,这个java类型哪里来的呢?就是从消息头中获取到的。
public class KafkaConsumer {
    @KafkaListener(topics = "test2",groupId = "test2-group")
    public void cosumeMsg(PushInfo pushInfo) {
        log.info("消费者消费了:"+pushInfo.toString());
    }
}

看起来基于spring boot来开发kafka是很简单的。因为spring boot把很多底层的操作都做了封装。

二、Kafka老版本:kafka_2.11-0.10.1.1

我们来看老版本的kafka集群。关键的问题就是要处理兼容性。
上面kafka版本依然是大前提,在这个大前提下,我们的运行环境如下:

spring boot 版本:2.0.9.RELEASE
spring-kafka版本: 2.1.12.RELEASE(该版本是由spring boot管理的)
kafka-clients版本:1.0.2(该版本是由spring boot管理的)

你没有看错,我还是这些版本,没有变。改变版本是不可能的了。原因去前面再看看吧。我们只能从前端程序调用部分来调整兼容性了。

好确定完版本之后,我们从后端往前端来说:

1. kafka理论知识

理论部分,我就不再重复了。

2. kafka安装配置

前面有讲,还是不赘述了。

3. kafka命令操作

前面都有讲,这里不赘述。只说因为版本差异,不同的操作部分。

  • 创建一个名为“test”的Topic,只有一个分区和一个备份
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

4. spring boot 中开发

经过上述一顿猛如虎的操作,我们就来到IDEA中,通过spring boot 来连接并操作kafka集群。

  • 引入spring-kafka依赖
    不赘述了。
  • application.yml配置kafka相关参数,这里是有差异的。
spring:
  kafka:
    bootstrap-servers: 10.72.49.135:9092
    producer:
      acks: 1
      retries: 0
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 这里的序列化器依然可以使用JsonSerializer,但是要禁止在消息头部添加类型信息
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      # 这里配置不添加类型信息头,要不然,老版本kafka集群不支持,会报错。
      properties:
        spring:
          json:
            add:
              type:
                headers: false
    consumer:
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 这里的反序列化器就不能使用JsonDeserializer,需要使用StringDeserializer,但是貌似也没有起作用。
#      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
        spring:
          json:
            default:
#              value:
#                type: com.ncs.entity.group.BrwRateStatis
            trusted:
              packages: com.ncs.entity.group
      max-poll-records: 100
  • 配置生产者
    和前面一毛一样,不赘述了,只是配置中禁止添加类型信息消息头,这里生产的消息,底层应该有不同。
  • 配置消费者
    和前面一毛一样,不再赘述。
  • 配置kafka
    你发现没有,消费者在消费json格式的数据的时候,是需要类型信息的,但是由于老版本的原因,我们已经禁用了。那么消费怎么能够成功转换呢?这里,我们就需要进行自定义配置kafka了。
// 1.启用这些自定义的kakfa配置
@Configuration
@EnableKafka
public class KafkaConfig {
    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        factory.setMessageConverter(new StringJsonMessageConverter());//2.关键点,从消息队列中取出string数据,然后按照json格式解析,并放入到对象中。这里,spring-kafka会拿到消费者参数里的类型信息,然后,根据这个类型信息转换json数据。
        return factory;

    }
    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<Integer,String>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.72.49.135:9092");
        //3. 要配置这些信息,貌似因为自定义了这写配置,就把application.yml中的相关配置给架空了。
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}

做完这些,我们又可以愉快的玩耍了。

附录

说完上面的这些,其实,就可以完结了。但是,由于刚入门的一个误区,所以,还要另外讲一下schema的知识。

1. Kafka Schema Registry

首先了解一下相关的理论部分。
基本上,对于Kafka Producers和Kafka Consumer,Kafka的Schema Registry都存储Avro Schemas。

  • 它提供了一个用于管理Avro架构的RESTful界面。
  • 它允许存储版本化模式的历史记录。
  • 此外,它还支持检查Kafka的架构兼容性。
  • 使用Avro Schema,我们可以配置兼容性设置以支持Kafka模式的发展。


    image.png
2. apache avro

基本上,Kafka Avro序列化项目提供了序列化程序。在Avro和Kafka Schema Registry的帮助下,使用Kafka Avro序列化的Kafka Producers和Kafka Consumer都处理模式管理以及记录的序列化。

3. 报错1:Magic v1 does not support record headers

在使用spring boot 2.0.9 连接kafka 0.10.1.1 的时候,报这个错误。显然是因为kafka client的版本比kafka集群高,导致版本不兼容。深层原因是:高版本的客户端使用了JSON serializer,这个方法在record headers添加了类型信息,但是< 0.11.0.0的kafka不支持,所以就报错了。
解决方式有两种:
1.关闭在头部添加类型信息;
2.对kafka客户端进行降级;

4. 如果kafka集群中没有topic,那么客户端会自动创建topic。
5. kafka client的版本问题

目前的现状是,kafka官方团队在维护java client,除此之外的其他语言的客户端,都是由志愿者在维护开发。官方原文:Kafka exposes all its functionality over a language independent protocol which has clients available in many programming languages. However only the Java clients are maintained as part of the main Kafka project, the others are available as independent open source projects. A list of non-Java clients is available here.(译文:kafka公开了其所有的功能协议,与语言无关。只有java客户端作为kafka项目的一部分进行维护,其他的作为开源的项目提供,这里提供了非java客户端的列表。)

6. 高版本spring-kafka 兼容 老版本 kafka_0.10.1.1,反序列化json的方案

由于老版本kafka中,不支持消息头中添加类型信息,所以,我们再反序列化的时候,就要想办法获取到这个java类型信息。

  • 配置文件中配置默认的反序列化的类型
    在application.yml中配置默认的反序列化器中的目标类型,JsonDeserializer反序列化器在找不到目标类型的时候,会使用默认的类型来转换。缺点是只能支持一种数据类型的消费,不够灵活。
  • 配置DefaultKafkaConsumerFactory
    @KafkaListener配置containerFactory参数,然后,在KafkaConfig配置类中配置,关键点在于,在consumerFactory中配置JsonDeserializer反序列化器,并传入需要转换的目标类型UserInfo。很显然,这种方式的缺点就是,只能支持一种类型的消费。不够灵活。
  • 使用avro的配置方式;
    这种方式就不尝试了,成本有点高。但是,肯定可以实现的。
  • 使用ConsumerRecord<Integer, String>接收参数
    这种方案,我原以为可以实现,但是,由于泛型的类型擦除,并不能实现。
    但是,按照类型推断来说,本可以实现的。但是,这里是形参,在运行期间,已经没有相关的类型信息了,理论上,还是不可行。
  • 利用StringJsonMessageConverter实现
    这是最好的方案,官方推荐。满足灵活需求,可以支持多种消息类型的消费。原理就是,在转换json的过程中,spring-kafka会拿到消息数据,同时,也可以拿到@KafkaListener监听方法中的参数类型信息,根据这两个数据,就可以完成json转换为java类型的对象了。
  • 使用自定义的JsonDeserializer
    这个方法也是可以实现的,我没有尝试。理论上是可行的,因为在JsonDeserializer的public T deserialize(String topic, Headers headers, byte[] data) {}关键方法中,我们可以拿到消息数据,也可以拿到topic信息,有了这两个参数,我们可以根据topic来转换为指定的类型对象。
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,294评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,780评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,001评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,593评论 1 289
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,687评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,679评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,667评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,426评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,872评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,180评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,346评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,019评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,658评论 3 323
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,268评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,495评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,275评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,207评论 2 352

推荐阅读更多精彩内容