环境
- 操作系统:CentOS 7.5
- 运行环境:Java 1.8
- zookeeper-3.4.12
- kafka_2.12-1.1.0
网络结构
- host1 172.18.0.11
- host2 172.18.0.12
- host3 172.18.0.13
修改host1、host2和host3机器上的配置文件/etc/hosts,新增内容如下:
172.18.0.11 host1
172.18.0.12 host2
172.18.0.13 host3
zookeeper集群
host1:2181
host2:2181
host3:2181
下载
- 下载kafka_2.12-1.1.0.tgz
- 分别上传到host1、host2和host3机器上的/usr/local文件夹下
- 分别解压文件tar -zxvf kafka_2.12-1.1.0.tgz,解压后的文件夹为 kafka_2.12-1.1.0
配置
进入到config目录下,修改server.properties,修改内容如下:
# host1机器
broker.id=0
listeners=PLAINTEXT://host1:9092
advertised.listeners=PLAINTEXT://host1:9092
zookeeper.connect=host1:2181,host2:2181,host3:2181
# host2机器
broker.id=1
listeners=PLAINTEXT://host2:9092
advertised.listeners=PLAINTEXT://host2:9092
zookeeper.connect=host1:2181,host2:2181,host3:2181
# host3机器
broker.id=2
listeners=PLAINTEXT://host3:9092
advertised.listeners=PLAINTEXT://host3:9092
zookeeper.connect=host1:2181,host2:2181,host3:2181
修改producer.properties文件,内容如下:
bootstrap.servers=host1:9092,host2:9092,host3:9092
修改consumer.properties文件,内容如下:
group.id=mygroup
bootstrap.servers=host1:9092,host2:9092,host3:9092
启动
# 前台启动,关闭使用Ctrl+C组合键
./bin/kafka-server-start.sh ./config/server.properties
# 后台启动
./bin/kafka-server-start.sh -daemon ./config/server.properties
关闭
./bin/kafka-server-stop.sh
topic
查看topic
./bin/kafka-topics.sh --list --zookeeper host1:2181,host2:2181,host3:2181
创建topic
./bin/kafka-topics.sh --create --zookeeper host1:2181,host2:2181,host3:2181 --replication-factor 2 --partitions 1 --topic mytopic
- --replication-factor 2 # 复制两份
- --partitions 1 # 创建1个分区
查看topic状态
./bin/kafka-topics.sh --describe --zookeeper host1:2181,host2:2181,host3:2181 --topic mytopic
删除topic
./bin/kafka-topics.sh --delete --zookeeper host1:2181,host2:2181,host3:2181 --topic mytopic
consumer
订阅消息
# 旧的订阅消息方式采用--zookeeper
./bin/kafka-console-consumer.sh --zookeeper host1:2181,host2:2181,host3:2181 --topic mytopic
./bin/kafka-console-consumer.sh --zookeeper host1:2181,host2:2181,host3:2181 --topic mytopic --from-beginning
# 新的订阅方式采用--bootstrap-server
./bin/kafka-console-consumer.sh --bootstrap-server host1:9092,host2:9092,host3:9092 --topic mytopic
./bin/kafka-console-consumer.sh --bootstrap-server host1:9092,host2:9092,host3:9092 --topic mytopic --from-beginning
producer
发布消息
./bin/kafka-console-producer.sh --broker-list host1:9092,host2:9092,host3:9092 --topic mytopic