一、前言
1.Kafka简介
Kafka是一个开源的分布式消息引擎/消息中间件,同时Kafka也是一个流处理平台。Kakfa支持以发布/订阅的方式在应用间传递消息,同时并基于消息功能添加了Kafka Connect、Kafka Streams以支持连接其他系统。
Kafka最核心的最成熟的还是他的消息引擎,所以Kafka大部分应用场景还是用来作为消息队列削峰平谷。另外,Kafka也是目前性能最好的消息中间件。
1.1 消息队列的两种模式
-
点对点模式(一对一,消费者主动拉取数据,消息收到后消息清除) 消息生产者生产消息发送到 Queue 中,然后消息消费者从 Queue 中取出并且消费消息。
消息被消费以后,queue 中不再有存储,所以消息消费者不可能消费到已经被消费的消息。 Queue 支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。
-
发布/订阅模式(一对多,消费者消费数据之后不会清除消息)
消息生产者(发布)将消息发布到 topic 中,同时有多个消息消费者(订阅)消费该消
息。和点对点方式不同,发布到 topic 的消息会被所有订阅者消费。备注:kafka储存消息数据在磁盘,默认存储7天
2、Kafka架构
- 1)Producer :消息生产者,就是向 kafka broker 发消息的客户端;
- 2)Consumer :消息消费者,向 kafka broker 取消息的客户端;
- 3)Consumer Group (CG):消费者组,由多个 consumer 组成。消费者组内每个消费者负 责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所 有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
生产者数量大于主题分区数时,资源是浪费的,生产者将被空置;同一个消费群组中,每个分区只会有一个消费者,而每个消费者可以消费多个分区。所以5个分区最多有5个消费者在消费,多余的消费者将空闲。
- 4)Broker :一台 kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker 可以容纳多个 topic。
- 5)Topic :可以理解为一个队列,生产者和消费者面向的都是一个 topic;
topic目的为了将消息数据分类
- 6)Partition:为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服务器)上, 一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列;
partition 同一个主题分布在不同的服务上生产消息时提高某一个topic的负载均衡和并发能力
- 7)Replica:副本,为保证集群中的某个节点发生故障时,该节点上的 partition 数据不丢失,且 kafka 仍然能够继续工作,kafka 提供了副本机制,一个 topic 的每个分区都有若干个副本, 一个 leader 和若干个 follower。
备份数据在不同机器上
- 8)leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对 象都是 leader。
- 9)follower:每个分区多个副本中的“从”,实时从 leader 中同步数据,保持和 leader 数据 的同步。leader 发生故障时,某个 follower 会成为新的 follower。
在Kafka集群(Cluster)中,一个Kafka节点就是一个Broker,消息由Topic来承载,可以存储在1个或多个Partition中。发布消息的应用为Producer、消费消息的应用为Consumer,多个Consumer可以促成Consumer Group共同消费一个Topic中的消息。
概念/对象 | 简单说明 |
---|---|
Broker | Kafka节点 |
Topic | 主题,用来承载消息 |
Partition | 分区,用于主题分片存储 |
Producer | 生产者,向主题发布消息的应用 |
Consumer | 消费者,从主题订阅消息的应用 |
Consumer Group | 消费者组,由多个消费者组成 |
3、准备工作
1、Kafka服务器
准备3台CentOS服务器,并配置好静态IP、主机名
服务器名 | IP | 说明 |
---|---|---|
default | 192.168.33.10 | Kafka节点1 |
web | 192.168.33.11 | Kafka节点2 |
node | 192.168.88.12 | Kafka节点3 |
软件版本说明
项 | 说明 |
---|---|
Linux Server | CentOS 7 |
Kafka | 2.3.0 |
2、ZooKeeper集群
Kakfa集群需要依赖ZooKeeper存储Broker、Topic等信息,这里我们部署三台ZK
服务器名 | IP | 说明 |
---|---|---|
default | 192.168.88.21 | ZooKeeper节点 |
web | 192.168.88.22 | ZooKeeper节点 |
node | 192.168.88.23 | ZooKeeper节点 |
部署过程参考:本博客《ZooKeeper集群部署》
二、部署过程
1、应用&数据目录
#创建应用目录
mkdir -p /usr/local/kafka
#创建Kafka数据目录
mkdir -p /usr/local/kafka/logs
2、下载&解压
Kafka官方下载地址:https://kafka.apache.org/downloads
下载的是2.3.0版本:kafka_2.12-2.3.0
#创建并进入下载目录
mkdir ~/downloads
cd ~/downloads
#下载安装包
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.3.0/kafka_2.12-2.3.0.tgz
#解压到应用目录
tar -zvxf kafka_2.12-2.3.0.tgz -C /usr/local/kafka
kafka_2.12-2.3.0.tgz 其中2.12是Scala编译器的版本,2.3.0才是Kafka的版本
3、Kafka节点配置
#进入应用目录
cd /usr/local/kafka/kafka_2.12-2.3.0/
#修改配置文件
vi config/server.properties
通用配置
配置日志目录、指定ZooKeeper服务器
# A comma separated list of directories under which to store log files
#kafka 运行日志存放的路径
log.dirs=/usr/local/kafka/logs
# root directory for all kafka znodes.
# 配置连接 Zookeeper 集群地址
zookeeper.connect=192.168.33.10:2181,192.168.33.11:2181,192.168.33.12:2181
分节点配置
- default
#broker 的全局唯一编号,不能重复
broker.id=0
- web
broker.id=1
- node
broker.id=2
4、防火墙配置
#开放端口
firewall-cmd --add-port=9092/tcp --permanent
#重新加载防火墙配置
firewall-cmd --reload
5、启动Kafka
#进入kafka根目录
cd /usr/local/kafka/kafka_2.12-2.3.0
#启动
sh bin/kafka-server-start.sh config/server.properties
#启动成功输出示例(最后几行)
[2019-06-26 21:48:57,183] INFO Kafka commitId: fc1aaa116b661c8a (org.apache.kafka.common.utils.AppInfoParser)
[2019-06-26 21:48:57,183] INFO Kafka startTimeMs: 1561531737175 (org.apache.kafka.common.utils.AppInfoParser)
[2019-06-26 21:48:57,185] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
# 后台启动
sh bin/kafka-server-start.sh -daemon config/server.properties
启动集群
依次在 default、web、node 节点上启动 kafka
三、Kafka测试
1、创建Topic
在kafka01(Broker)上创建测试Tpoic:test-ken-io,这里我们指定了3个副本、1个分区
sh bin/kafka-topics.sh --create --zookeeper 192.168.33.10:2181 --replication-factor 3 --partitions 1 --topic test-ken-io
选项说明:
--topic 定义 topic 名
--replication-factor 定义副本数
--partitions 定义分区数
[root@localhost kafka_2.12-2.3.0]# sh bin/kafka-topics.sh --create --zookeeper 192.168.33.10:2181 --replication-factor 4 --partitions 2 --topic safe
Error while executing topic command : Replication factor: 4 larger than available brokers: 3.
[2020-08-02 23:29:49,737] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 4 larger than available brokers: 3.
备注:本身集群只有3个brokers,创建副本replication-factor时不能创建超过brokers,同一台机器会导致出现相同的副本集没有意义。
Topic在default上创建后也会同步到集群中另外两个Broker:web、node
2、查看Topic
我们可以通过命令列出指定Broker的
sh bin/kafka-topics.sh --zookeeper 192.168.33.10:2181 --list
3、发送消息
这里我们向Broker(id=0)的Topic=test-ken-io发送消息
sh bin/kafka-console-producer.sh --broker- list 192.168.33.10:9092 --topic test-ken-io
#消息内容
> test by ken.io
4、消费消息
在Kafka02上消费Broker03的消息
sh bin/kafka-console-consumer.sh --zookeeper hadoop102:2181 --topic first
在Kafka03上消费Broker02的消息
bin/kafka-console-consumer.sh --bootstrap-server 192.168.88.52:9092 --topic test-ken-io --from-beginning
然后均能收到消息
test by ken.io
这是因为这两个消费消息的命令是建立了两个不同的Consumer
如果我们启动Consumer指定Consumer Group Id就可以作为一个消费组协同工,1个消息同时只会被一个Consumer消费到
bin/kafka-console-consumer.sh --bootstrap-server 192.168.88.53:9092 --topic test-ken-io --from-beginning --group testgroup_ken
bin/kafka-console-consumer.sh --bootstrap-server 192.168.88.52:9092 --topic test-ken-io --from-beginning --group testgroup_ken
四、备注
1、Kafka常用配置项说明
Kafka常用Broker配置说明:
配置项 | 默认值/示例值 | 说明 |
---|---|---|
broker.id | 0 | Broker唯一标识 |
listeners | PLAINTEXT://192.168.88.53:9092 | 监听信息,PLAINTEXT表示明文传输 |
log.dirs | kafka/logs | kafka数据存放地址,可以填写多个。用”,”间隔 |
message.max.bytes | message.max.bytes | 单个消息长度限制,单位是字节 |
num.partitions | 1 | 默认分区数 |
log.flush.interval.messages | Long.MaxValue | 在数据被写入到硬盘和消费者可用前最大累积的消息的数量 |
log.flush.interval.ms | Long.MaxValue | 在数据被写入到硬盘前的最大时间 |
log.flush.scheduler.interval.ms | Long.MaxValue | 检查数据是否要写入到硬盘的时间间隔。 |
log.retention.hours | 24 | 控制一个log保留时间,单位:小时 |
zookeeper.connect | 192.168.88.21:2181 | ZooKeeper服务器地址,多台用”,”间隔 |