java 连接kafka失败问题处理

java 连接kafka失败

现象:一直报错

[Consumer clientId=consumer-1, groupId=testGroup] Connection to node -1 could not be established. Broker may not be available

分析过程:

1.我们使用的是镜像搭建,然后我这边在java层写了个简单的demo。通过spring boot的jar包。spring-kafka 这个包

<dependency>

<groupId>org.springframework.kafka</groupId>

<artifactId>spring-kafka</artifactId>

<version>2.9.5</version>

</dependency>

构架了个简单的消息demo。

但是发现启动一直报错。

ailed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata

看到日志中还包含有:

[Consumer clientId=consumer-1, groupId=testGroup] Connection to node -1 could not be established. Broker may not be available.

2.于是怀疑kafka有问题。

然后开始对kafa服务进行测试。

创建topic

bin/kafka-topics.sh --bootstrap-server 192.168.XX.XX:9092 --create --topic st_topic --replication-factor 1 --partitions 1

查看topic

bin/kafka-topics.sh --list --bootstrap-server 192.168.XX.XX:9092

发送消息

bin/kafka-console-producer.sh --broker-list 192.168.XX.XX:9092  --topic st_topic

消费消息

bin/kafka-console-consumer.sh --topic st_topic --from-beginning --bootstrap-server 192.168.XX.XX:9092 --group newGroup

查看消费组列表

bin/kafka-consumer-groups.sh --list --bootstrap-server 192.168.XX.XX:9092

查看消费组详情

bin/kafka-consumer-groups.sh --bootstrap-server 192.168.XX.XX:9092 --group newGroup -describe

进行了一系列的操作,发现没问题

3.然后通过查询百度,发现统一的说改配置:

advertised.listeners=PLAINTEXT://192.168.XX.XX:9092

listeners=PLAINTEXT://192.168.XX.XX:9092

这个地址一定要配置成主机ip。

对应着镜像的配置:

KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP:监听器名称和安全协议的映射配置。每个监听器的名称只能在map中出现一次。

KAFKA_CFG_INTER_BROKER_LISTENER_NAME:用于配置broker之间通信使用的监听器名称

KAFKA_CFG_LISTENERS:用于配置broker监听的URI以及监听器名称列表

KAFKA_CFG_ADVERTISED_LISTENERS:将该地址发布到zookeeper供客户端使用

- name: KAFKA_CFG_LISTENERS

          value: 'BROKER://0.0.0.0:9092,EXTERNAL://0.0.0.0:9095'

        - name: KAFKA_CFG_ADVERTISED_LISTENERS

          value: 'BROKER://$(POD_IP):9092'

        - name: KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP

          value: 'CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT,EXTERNAL:PLAINTEXT'

        - name: KAFKA_CFG_INTER_BROKER_LISTENER_NAME

          value: BROKER

        - name: KAFKA_CFG_CONTROLLER_LISTENER_NAMES

类似这样的配置。修改了,但是没有效果。

4.然后通过telelnet命令来检测端口是否开通发现没有问题。

5.最后在想着是否配置有问题,去掉了多余的java中kafka的配置,然后

再去测试,发现还是有问题。

6.最后想着不用spring-kafka,直接使用kafka-client包,来做测试,发送信息成功。

最后想了下,可能是版本的问题。查看spring-kafka的版本,对应着kafka-client的版本太低,最终确定就是版本的问题。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容