java 连接kafka失败
现象:一直报错
[Consumer clientId=consumer-1, groupId=testGroup] Connection to node -1 could not be established. Broker may not be available
分析过程:
1.我们使用的是镜像搭建,然后我这边在java层写了个简单的demo。通过spring boot的jar包。spring-kafka 这个包
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.9.5</version>
</dependency>
构架了个简单的消息demo。
但是发现启动一直报错。
ailed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
看到日志中还包含有:
[Consumer clientId=consumer-1, groupId=testGroup] Connection to node -1 could not be established. Broker may not be available.
2.于是怀疑kafka有问题。
然后开始对kafa服务进行测试。
创建topic
bin/kafka-topics.sh --bootstrap-server 192.168.XX.XX:9092 --create --topic st_topic --replication-factor 1 --partitions 1
查看topic
bin/kafka-topics.sh --list --bootstrap-server 192.168.XX.XX:9092
发送消息
bin/kafka-console-producer.sh --broker-list 192.168.XX.XX:9092 --topic st_topic
消费消息
bin/kafka-console-consumer.sh --topic st_topic --from-beginning --bootstrap-server 192.168.XX.XX:9092 --group newGroup
查看消费组列表
bin/kafka-consumer-groups.sh --list --bootstrap-server 192.168.XX.XX:9092
查看消费组详情
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.XX.XX:9092 --group newGroup -describe
进行了一系列的操作,发现没问题
3.然后通过查询百度,发现统一的说改配置:
advertised.listeners=PLAINTEXT://192.168.XX.XX:9092
listeners=PLAINTEXT://192.168.XX.XX:9092
这个地址一定要配置成主机ip。
对应着镜像的配置:
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP:监听器名称和安全协议的映射配置。每个监听器的名称只能在map中出现一次。
KAFKA_CFG_INTER_BROKER_LISTENER_NAME:用于配置broker之间通信使用的监听器名称
KAFKA_CFG_LISTENERS:用于配置broker监听的URI以及监听器名称列表
KAFKA_CFG_ADVERTISED_LISTENERS:将该地址发布到zookeeper供客户端使用
- name: KAFKA_CFG_LISTENERS
value: 'BROKER://0.0.0.0:9092,EXTERNAL://0.0.0.0:9095'
- name: KAFKA_CFG_ADVERTISED_LISTENERS
value: 'BROKER://$(POD_IP):9092'
- name: KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP
value: 'CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT,EXTERNAL:PLAINTEXT'
- name: KAFKA_CFG_INTER_BROKER_LISTENER_NAME
value: BROKER
- name: KAFKA_CFG_CONTROLLER_LISTENER_NAMES
类似这样的配置。修改了,但是没有效果。
4.然后通过telelnet命令来检测端口是否开通发现没有问题。
5.最后在想着是否配置有问题,去掉了多余的java中kafka的配置,然后
再去测试,发现还是有问题。
6.最后想着不用spring-kafka,直接使用kafka-client包,来做测试,发送信息成功。
最后想了下,可能是版本的问题。查看spring-kafka的版本,对应着kafka-client的版本太低,最终确定就是版本的问题。