在Kafka集群(Cluster)中,一个Kafka节点就是一个Broker,消息由Topic来承载,可以存储在1个或多个Partition中。发布消息的应用为Producer、消费消息的应用为Consumer,多个Consumer可以促成Consumer Group共同消费一个Topic中的消息。
概念/对象 | 简单说明 |
---|---|
Broker | Kafka节点 |
Topic | 主题,用来承载消息 |
Partition | 分区,用于主题分片存储 |
Producer | 生产者,向主题发布消息的应用 |
Consumer | 消费者,从主题订阅消息的应用 |
Consumer Group | 消费者组,由多个消费者组成 |
准备工作
准备3台Debian服务器,并配置好静态IP、主机名
服务器名 | IP | 说明 |
---|---|---|
kafka01 | 10.200.14.48 | Kafka节点1 |
kafka02 | 10.200.14.55 | Kafka节点2 |
kafka03 | 10.200.103.44 | Kafka节点3 |
软件版本说明
项 | 说明 |
---|---|
Linux Server | Debian 9 |
Kafka | 2.3.0 |
ZooKeeper节点信息如下,相关部署见这篇文章
节点名 | IP | 说明 |
---|---|---|
zk01 | 10.200.14.48 | ZooKeeper节点 |
zk02 | 10.200.14.55 | ZooKeeper节点 |
zk03 | 10.200.103.44 | ZooKeeper节点 |
部署过程
注意:以下内容,如果没有特殊指定在哪个机器上操作,就是代表需要在所有的机器上操作
#创建应用目录
mkdir /usr/kafka
#创建Kafka数据目录
mkdir /kafka
mkdir /kafka/logs
chmod 777 -R /kafka
#下载安装包
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.3.0/kafka_2.12-2.3.0.tgz
#解压到应用目录
tar -zvxf kafka_2.12-2.3.0.tgz -C /usr/kafka
注:kafka_2.12-2.3.0.tgz 其中2.12是Scala编译器的版本,2.3.0才是Kafka的版本
#进入应用目录
cd /usr/kafka/kafka_2.12-2.3.0/
#修改配置文件
vi config/server.properties
配置日志目录、指定ZooKeeper服务器
# A comma separated list of directories under which to store log files
log.dirs=/kafka/logs
# root directory for all kafka znodes.
zookeeper.connect=10.200.14.48:2181,10.200.14.55:2181,10.200.103.44:2181
分节点配置
-
Kafka01 (IP:10.200.14.48)
broker.id=0 #listeners=PLAINTEXT://:9092 listeners=PLAINTEXT://10.200.14.48:9092
-
Kafka02(IP:10.200.14.55)
broker.id=1 #listeners=PLAINTEXT://:9092 listeners=PLAINTEXT://10.200.14.55:9092
-
Kafka03(IP:10.200.103.44)
broker.id=2 #listeners=PLAINTEXT://:9092 listeners=PLAINTEXT://10.200.103.44:9092
防火墙设置
iptables -I INPUT -p tcp --dport 9092 -j ACCEPT
iptables-save
分别启动Kafka
#进入kafka根目录
cd /usr/kafka/kafka_2.12-2.3.0/
#启动
bin/kafka-server-start.sh config/server.properties &
#启动成功输出示例(最后几行)
[2020-12-25 17:07:24,959] INFO [SocketServer brokerId=0] Started data-plane processors for 1 acceptors (kafka.network.SocketServer)
[2020-12-25 17:07:24,964] INFO Kafka version: 2.3.0 (org.apache.kafka.common.utils.AppInfoParser)
[2020-12-25 17:07:24,965] INFO Kafka commitId: fc1aaa116b661c8a (org.apache.kafka.common.utils.AppInfoParser)
[2020-12-25 17:07:24,965] INFO Kafka startTimeMs: 1608887244959 (org.apache.kafka.common.utils.AppInfoParser)
[2020-12-25 17:07:24,966] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
Kafka集群测试
1. 创建Topic
在kafka01(Broker)上创建测试Tpoic:test-kafka
,这里我们指定了3个副本、1个分区
bin/kafka-topics.sh --create --bootstrap-server 10.200.14.48:9092 --replication-factor 3 --partitions 1 --topic test-kafka
Topic在kafka01上创建后也会同步到集群中另外两个Broker:kafka02、kafka03
2. 查看Topic
# 我们可以通过命令列出指定Broker的
bin/kafka-topics.sh --list --bootstrap-server 10.200.103.44:9092
3. 发送消息
这里我们向Broker(id=0)的Topic=test-kafka发送消息
bin/kafka-console-producer.sh --broker-list 10.200.14.48:9092 --topic test-kafka
#消息内容
> test message by 10.200.14.48
4. 消费消息
在Kafka02上消费Broker03的消息
bin/kafka-console-consumer.sh --bootstrap-server 10.200.14.55:9092 --topic test-kafka --from-beginning
在Kafka03上消费Broker02的消息
bin/kafka-console-consumer.sh --bootstrap-server 10.200.103.44:9092 --topic test-kafka --from-beginning
然后均能收到消息
test message by 10.200.14.48
这是因为这两个消费消息的命令是建立了两个不同的Consumer。如果我们启动Consumer指定Consumer Group Id就可以作为一个消费组协同工,同一分区同一topic的消息同时只会被消费者组中的一个Consumer消费到
bin/kafka-console-consumer.sh --bootstrap-server 10.200.14.55:9092 --topic test-kafka --from-beginning --group test_group
bin/kafka-console-consumer.sh --bootstrap-server 10.200.103.44:9092 --topic test-kafka --from-beginning --group test_group
Kafka常用配置项说明
配置项 | 默认值/示例值 | 说明 |
---|---|---|
broker.id | 0 | Broker唯一标识 |
listeners | PLAINTEXT://192.168.88.53:9092 | 监听信息,PLAINTEXT表示明文传输 |
log.dirs | kafka/logs | kafka数据存放地址,可以填写多个。用”,”间隔 |
message.max.bytes | message.max.bytes | 单个消息长度限制,单位是字节 |
num.partitions | 1 | 默认分区数 |
log.flush.interval.messages | Long.MaxValue | 在数据被写入到硬盘和消费者可用前最大累积的消息的数量 |
log.flush.interval.ms | Long.MaxValue | 在数据被写入到硬盘前的最大时间 |
log.flush.scheduler.interval.ms | Long.MaxValue | 检查数据是否要写入到硬盘的时间间隔。 |
log.retention.hours | 24 | 控制一个log保留时间,单位:小时 |
zookeeper.connect | 192.168.88.21:2181 | ZooKeeper服务器地址,多台用”,”间隔 |