创建docker nerwork
docker network create -d bridge mynetwork
docker compose安装Kafka集群版
- 调整IP地址KAFKA_CFG_ADVERTISED_LISTENERS
- 调整volumes文件挂载地址
docker-compose.yml
version: "3"
services:
kafka1:
image: 'bitnami/kafka:3.3.1'
network_mode: mynetwork
container_name: kafka11
user: root
ports:
- 9192:9092
- 9193:9093
environment:
### 通用配置
# 允许使用kraft,即Kafka替代Zookeeper
- KAFKA_ENABLE_KRAFT=yes
# kafka角色,做broker,也要做controller
- KAFKA_CFG_PROCESS_ROLES=broker,controller
# 指定供外部使用的控制类请求信息
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
# 定义kafka服务端socket监听端口
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
# 定义安全协议
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
# 使用Kafka时的集群id,集群内的Kafka都要用这个id做初始化,生成一个UUID即可
- KAFKA_KRAFT_CLUSTER_ID=LelM2dIFQkiUFvXCEcqRWA
# 集群地址
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka11:9093,2@kafka22:9093,3@kafka33:9093
# 允许使用PLAINTEXT监听器,默认false,不建议在生产环境使用
- ALLOW_PLAINTEXT_LISTENER=yes
# 设置broker最大内存,和初始内存
- KAFKA_HEAP_OPTS=-Xmx512M -Xms256M
# 不允许自动创建主题
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
### broker配置
# 定义外网访问地址(宿主机ip地址和端口)
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://10.10.201.68:9192
# broker.id,必须唯一
- KAFKA_BROKER_ID=1
volumes:
- /Users/wenyaobo/Src/kafka_data/kafka1/kafka/kraft:/bitnami/kafka
#extra_hosts:
#- "kafka1:云服务器IP"
#- "kafka2:云服务器IP"
#- "kafka3:云服务器IP"
kafka2:
image: 'bitnami/kafka:3.3.1'
network_mode: mynetwork
container_name: kafka22
user: root
ports:
- 9292:9092
- 9293:9093
environment:
### 通用配置
# 允许使用kraft,即Kafka替代Zookeeper
- KAFKA_ENABLE_KRAFT=yes
# kafka角色,做broker,也要做controller
- KAFKA_CFG_PROCESS_ROLES=broker,controller
# 指定供外部使用的控制类请求信息
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
# 定义kafka服务端socket监听端口
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
# 定义安全协议
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
# 使用Kafka时的集群id,集群内的Kafka都要用这个id做初始化,生成一个UUID即可
- KAFKA_KRAFT_CLUSTER_ID=LelM2dIFQkiUFvXCEcqRWA
# 集群地址
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka11:9093,2@kafka22:9093,3@kafka33:9093
# 允许使用PLAINTEXT监听器,默认false,不建议在生产环境使用
- ALLOW_PLAINTEXT_LISTENER=yes
# 设置broker最大内存,和初始内存
- KAFKA_HEAP_OPTS=-Xmx512M -Xms256M
# 不允许自动创建主题
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
### broker配置
# 定义外网访问地址(宿主机ip地址和端口)
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://10.10.201.68:9292
# broker.id,必须唯一
- KAFKA_BROKER_ID=2
volumes:
- /Users/wenyaobo/Src/kafka_data/kafka2/kafka/kraft:/bitnami/kafka
kafka3:
image: 'bitnami/kafka:3.3.1'
network_mode: mynetwork
container_name: kafka33
user: root
ports:
- 9392:9092
- 9393:9093
environment:
### 通用配置
# 允许使用kraft,即Kafka替代Zookeeper
- KAFKA_ENABLE_KRAFT=yes
# kafka角色,做broker,也要做controller
- KAFKA_CFG_PROCESS_ROLES=broker,controller
# 指定供外部使用的控制类请求信息
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
# 定义kafka服务端socket监听端口
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
# 定义安全协议
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
# 使用Kafka时的集群id,集群内的Kafka都要用这个id做初始化,生成一个UUID即可
- KAFKA_KRAFT_CLUSTER_ID=LelM2dIFQkiUFvXCEcqRWA
# 集群地址
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka11:9093,2@kafka22:9093,3@kafka33:9093
# 允许使用PLAINTEXT监听器,默认false,不建议在生产环境使用
- ALLOW_PLAINTEXT_LISTENER=yes
# 设置broker最大内存,和初始内存
- KAFKA_HEAP_OPTS=-Xmx512M -Xms256M
# 不允许自动创建主题
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
### broker配置
# 定义外网访问地址(宿主机ip地址和端口)
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://10.10.201.68:9392
# broker.id,必须唯一
- KAFKA_BROKER_ID=3
volumes:
- /Users/wenyaobo/Src/kafka_data/kafka3/kafka/kraft:/bitnami/kafka
docker-compose.yml文件目录下执行
docker-compose -f docker-compose.yml up
docker compose安装Kafka可视化页面
version: "3"
services:
kafka-ui:
image: provectuslabs/kafka-ui:latest
network_mode: mynetwork
container_name: kafka-ui
restart: always
ports:
- 9091:8080
volumes:
- /Users/wenyaobo/Src/kafka_ui/etc/localtime:/etc/localtime
environment:
# 集群名称
- KAFKA_CLUSTERS_0_NAME=local
# 集群地址
- KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka11:9092,kafka22:9092,kafka33:9092
docker-compose.yml文件目录下执行
docker-compose -f docker-compose.yml up
kafka操作
#进入容器
docker exec -it kafka11 /bin/bash
#进入目录
cd /opt/bitnami/kafka/bin/
#创建topic
kafka-topics.sh --create --bootstrap-server kafka11:9092,kafka22:9092,kafka33:9092 --replication-factor 3 --partitions 3 --topic test
Created topic test.
#查看所有Topic
kafka-topics.sh --list --bootstrap-server kafka11:9092,kafka22:9092,kafka33:9092
test
#查看topic详情
kafka-topics.sh --describe --bootstrap-server kafka11:9092,kafka22:9092,kafka33:9092 --topic test
Topic: test TopicId: yiKjk9VTTZqVolLOEbZrbw PartitionCount: 3 ReplicationFactor: 1 Configs: min.insync.replicas=1,cleanup.policy=delete,retention.ms=86400000,retention.bytes=-1
Topic: test Partition: 0 Leader: 3 Replicas: 3 Isr: 3
Topic: test Partition: 1 Leader: 1 Replicas: 1 Isr: 1
Topic: test Partition: 2 Leader: 2 Replicas: 2 Isr: 2
# 启动一个生产者(输入消息)
kafka-console-producer.sh --broker-list kafka11:9092,kafka22:9092,kafka33:9092 --topic test
[等待输入自己的内容 出现>输入即可]
>i am a new msg !
>i am a good msg ?
# 启动一个消费者(等待消息)
# 注意这里的--from-beginning,每次都会从头开始读取,你可以尝试去掉和不去掉看下效果
kafka-console-consumer.sh --bootstrap-server kafka11:9092,kafka22:9092,kafka33:9092 --topic test --from-beginning
[等待消息]
i am a new msg !
i am a good msg ?