1.下载docker desktop
- 此处不再赘述
2.下载zookeeper镜像和kafka镜像
docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka
3.启动zookeeper
docker run -d --name zookeeper --publish 2181:2181 --volume /etc/localtime:/etc/localtime wurstmeister/zookeeper
4.获取zookeeper IPAddress
docker inspect zookeeper
# 找到对应的IPAddress
返回数据如下(截取部分数据),取IPAddress即可
"Networks": {
"bridge": {
"IPAMConfig": null,
"Links": null,
"Aliases": null,
"NetworkID": "ff561b4cef4f5443e27b7ad6c805a6ae94e818bc6e1a92fb4e0c8970f80b4cd8",
"EndpointID": "e9795ecf718d7785b01f23ba813aa0b7ae8a2d21ac7dcf5ce86e9ccf0e5959d5",
"Gateway": "172.17.0.1",
"IPAddress": "172.17.0.2",
"IPPrefixLen": 16,
"IPv6Gateway": "",
"GlobalIPv6Address": "",
"GlobalIPv6PrefixLen": 0,
"MacAddress": "02:42:ac:11:00:02",
"DriverOpts": null
}
}
5.启动kafka
- kafka启动依赖于zookeeper,需要设置几个环境变量,其中KAFKA_ZOOKEEPER_CONNECT的值即为第四步获取的IPAddress。
KAFKA_ADVERTISED_HOST_NAME 为kafka本机的地址
KAFKA_ADVERTISED_PORT为kafka的端口号
docker run -d --name kafka --publish 9092:9092 \
--link zookeeper \
--env KAFKA_ZOOKEEPER_CONNECT=172.17.0.2:2181 \
--env KAFKA_ADVERTISED_HOST_NAME=127.0.0.1 \
--env KAFKA_ADVERTISED_PORT=9092 \
--volume /etc/localtime:/etc/localtime \
wurstmeister/kafka:latest
6.kafka topic
1.进入kafka容器,进入目录
docker exec -it kafka /bin/bash
cd /opt/kafka/bin
/opt/kafka_2.13-2.7.0/bin # ls
connect-distributed.sh kafka-dump-log.sh kafka-streams-application-reset.sh
connect-mirror-maker.sh kafka-features.sh kafka-topics.sh
connect-standalone.sh kafka-leader-election.sh kafka-verifiable-consumer.sh
kafka-acls.sh kafka-log-dirs.sh kafka-verifiable-producer.sh
kafka-broker-api-versions.sh kafka-mirror-maker.sh trogdor.sh
kafka-configs.sh kafka-preferred-replica-election.sh windows
kafka-console-consumer.sh kafka-producer-perf-test.sh zookeeper-security-migration.sh
kafka-console-producer.sh kafka-reassign-partitions.sh zookeeper-server-start.sh
kafka-consumer-groups.sh kafka-replica-verification.sh zookeeper-server-stop.sh
kafka-consumer-perf-test.sh kafka-run-class.sh zookeeper-shell.sh
kafka-delegation-tokens.sh kafka-server-start.sh
kafka-delete-records.sh kafka-server-stop.sh
# 查看topic列表
kafka-topics.sh --list --zookeeper 172.17.0.2:2181
# 创建topic
kafka-topics.sh --create --zookeeper 172.17.0.2:2181 --replication-factor 1 --partitions 1 --topic test1
Created topic test1.
GUI工具
- Idea插件 zookeeper,kafkalytic
错误排查
- kafka docker容器启动后自动退出,很有可能是设置的环境变量有问题,需要按照第四步先通过docker inspect zookeeper获取ip地址,然后在kafka容器启动时通过环境变量赋值
- 进入命令行后发现如下报错,原因是KAFKA_ADVERTISED_HOST_NAME设置错误
[2021-08-26 15:33:13,854] WARN [Controller id=1001, targetBrokerId=1001] Connection to node 1001 (/172.17.0.2:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
springboot 集成kafka
pom.xml依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- Inherit dependency versions and plugin defaults from Spring Boot 2.5.4 -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.4</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.janseitse.kaf</groupId>
<artifactId>kafk</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>kafk</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<!-- Embedded web server + Spring MVC -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Spring for Apache Kafka: provides @KafkaListener, KafkaTemplate, etc.
     Version is managed by the spring-boot-starter-parent above. -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- Compile-time only annotation processor; excluded from the fat jar below -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- Embedded Kafka broker support for integration tests -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<!-- Lombok is compile-time only; keep it out of the repackaged jar -->
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
application.properties
# Broker address published by the docker container (KAFKA_ADVERTISED_HOST_NAME/PORT)
spring.kafka.bootstrap-servers=127.0.0.1:9092
# Commit the offset immediately when the listener calls Acknowledgment.acknowledge()
spring.kafka.listener.ack-mode=manual_immediate
# Disable auto-commit so offsets are only committed via the manual acknowledgment above
spring.kafka.consumer.enable-auto-commit=false
# When a group has no committed offset yet, start reading from the earliest message
spring.kafka.consumer.auto-offset-reset=earliest
@KafkaListener
// Multiple consumer groups may each consume the same topic, but within one group
// only a single consumer receives a given message. With idIsGroup = true the
// listener id ("group1") is also used as the consumer group name.
@KafkaListener(topics = "topic-order1", idIsGroup = true, id = "group1")
public void listen(ConsumerRecord<?, ?> consumerRecord, Acknowledgment acknowledgment) {
    // Log the record coordinates and payload (removed an unused `key` local).
    System.out.println("group1");
    System.out.println(consumerRecord.topic());
    System.out.println(consumerRecord.partition());
    System.out.println(consumerRecord.offset());
    System.out.println(consumerRecord.value());
    // Manual ack is required: ack-mode=manual_immediate and auto-commit is disabled.
    acknowledgment.acknowledge();
}
// Second consumer group ("group2") listening on a different topic. Because it is a
// separate group, its offsets are tracked independently of group1's.
@KafkaListener(topics = "topic-order", idIsGroup = true, id = "group2")
public void listen2(ConsumerRecord<?, ?> consumerRecord, Acknowledgment acknowledgment) {
    // Log the record coordinates and payload (removed an unused `key` local).
    System.out.println("group2");
    System.out.println("group2 topic is:" + consumerRecord.topic());
    System.out.println("group2 partition is:" + consumerRecord.partition());
    System.out.println("group2 offset is:" + consumerRecord.offset());
    System.out.println("group2 value is:" + consumerRecord.value());
    // Manual ack is required: ack-mode=manual_immediate and auto-commit is disabled.
    acknowledgment.acknowledge();
}