概述
由于现有生产环境迁移到BClinux需要重新部署kafka集群,正好借此梳理一下kafka相关的知识点,以及部署流程。后续使用中的问题也会在文档后续记录
kafka诞生
- 背景:kafka的诞生是来自于LinkedIn(领英)公司面临数据集成的问题,因为其使用的多个系统都有自己的数据表示和传输方式。
- 解决的问题:为了解决实时数据流的处理问题,LinkedIn内部开发了Kafka。它的设计目标是提供一个高吞吐量、分布式、水平扩展和容错的系统。主要设计者包括Jay Kreps、Neha Narkhede和Jun Rao。
- 开源: 2011年,Kafka被捐献给Apache软件基金会并开源,受到开发者社区的关注并被众多大企业采纳。Kafka从一个消息队列发展为一个包含流处理库的完整流处理平台,使其在流数据处理领域中占据了主导地位。
- 商业版本: Confluent Platform ,Cloudera kafka , Hartonworks kafka
kafka 的特性
在kafka之前还使用ActiveMQ作为消息队列,ActiveMQ支持多种协议比如JMS、AMQP、MQTT等,不过ActiveMQ在吞吐量,延迟,持久性和可靠性上都不如kafka,下面是kafka的具体特性
- 高吞吐量:Kafka能够迅速处理大量消息,每秒可以处理数百万条,适合大数据场景。
- 可扩展性:随着业务的增长,Kafka允许你简单地添加更多的服务器到集群中,应对更大的负载。
- 持久性和可靠性:消息在Kafka中是持久化的,即使多个broker节点失败,消息也不会丢失。Kafka利用分布式和副本机制来增加数据的持久性和可靠性。
- 实时性:Kafka支持近实时的数据处理,使得消费者更快的获取数据。
- 分布式:Kafka设计为分布式结构,这意味着它在多台服务器上运行,确保数据的高可用性。
- 多消费者:多个应用或服务可以同时从Kafka的同一个topic读取数据,且互不干扰。
- 持久化存储:Kafka将数据持久化到磁盘,即使系统产生大量数据,也能保持长时间的稳定性能。
- 内建流处理:借助Kafka Streams 自身提供流数据处理功能,不依赖其他系统。
- 强顺序保证:在Kafka的每个partition数据分区内,消息都是有序的。
kafka相关概念
Kafka 主要支持两种消息传递模型
- 发布订阅模式 每个消费者都属于不同的消费者组,每个分区都可以被不同的消费者消费
- 点对点模式 所有的消费者都在一个消费者组里面,这样每个分区每次都只能被一个消费者消费
消息传递语义
- 最多一次:消息可能会丢失,永远不重复发送
- 最少一次:消息不会丢失,但是可能会重复
- 精确一次:保证消息被传递到服务端且在服务端不重复
topic主题
- Kafka中的主题是一个消息流的分类。比如说可以创建一个叫做"成绩"的主题来保存成绩数据。
- 主题在Kafka中是如何将消息进行分区和复制的关键。每个主题都可以被分割成多个分区,每个分区可以有多个副本。
- 生产者发布消息到特定的主题,而消费者从主题中订阅并读取消息。
broker 消息代理
- Kafka的服务节点被称为broker。
- broker负责维护发布到系统的数据。每个broker可以保存数据并为客户端提供读写服务。
- 在一个Kafka集群中,可能会有一个或多个brokers。让Kafka系统具有高容错性和高吞吐量。
partition分区
- 分区是最小的并行单位
- 个消费者可以消费多个分区,比如消费者1可以消费分区1和分区3
-
一个分区可以被多个消费者组里的消费者消费但是,一个分区不能同时被同一个消费者里的多个消费者消费,比如同在一个消费者组1里面的,消费者1和消费者2不能消费同一个分区
-
生产者在往同一分区发送消息时候,先发送的offset比后发送的offset小,消费者消费时会先消费offset小的消息。消费者按照分区里的存放顺序进行消费,分区内的消息顺序读取,但是不通分区的顺序不能保证
-
如果要保证所有消息的顺序有两种方法
- 可以设置一个分区但是会丢失扩展性和性能
- 支持通过设置相同的key,相同的key的消息会发送给同一个分区
record 消息记录
- 在Kafka中,消息是字节的数组,每个消息都被称为一个记录。
- 每个记录都包含一个key和一个value,都是字节的数组。key是可设置的。
- 记录还包含一个与之关联的时间戳。
kafka的基本原理
kafka 部署
下载:下载的时候页面提醒3.5.1是stable稳定版,这里我们就使用3.5.1即可,下载Source download,也就是源码包。因为kafka官网并不提供rpm包,为了后续的集群部署和维护我通常选择将源码构建成rpm包,具体构建步骤在我其他的文档里有写
-
网址:https://kafka.apache.org/downloads
创建Kafka账户与kafka安装目录
在三台服务器(10.10.1.7、10.10.1.8、10.10.1.9)上执行以下操作:
sudo useradd kafka
sudo passwd kafka
mkdir /opt/kafka
chown -R kafka:kafka /opt/kafka
在每台服务器上安装Kafka rpm包
rpm -ivh /opt/kafka/kafka.rpm
kafka主要有以下几个目录
bin #常用shell脚本
config #配置文件
libs #依赖包
logs #日志
site-docs #文档
配置Kafka
- 打开server.properties文件:
vim /etc/kafka/server.properties
更改以下事项
broker.id=0 #在每台服务器上设置不同的broker.id
port=9092
host.name=10.10.1.7 #新增,不同机器配置成自己ip
log.dirs=/opt/kafka-logs #日志位置自定义保证有权限就可
listeners=PLAINTEXT://10.10.1.7:9092 #监听器 指定本机的监听名称和端口
advertised.listeners=PLAINTEXT://10.10.1.7:9092 #对外发布的访问IP和端口,给客户端和zookeeper使用,如果存在内外网需要单独配置INTERNAL和EXTERNAL,这里不细讲了
zookeeper.connect=10.10.1.2:2181,10.10.1.3:2181,10.10.1.4:2181 #zookeeper集群地址
启动Kafka
在三台服务器上分别运行:
cd /opt/kafka/bin/
./kafka-server-start.sh ../config/server.properties #指定配置文件
验证Kafka部署
- 创建一个测试Topic:
[dccp_alarm@xz-csywgzt-yyfwq-4 bin]$ ./kafka-topics.sh --create --topic test --replication-factor 3 --partitions 3 --bootstrap-server 10.10.1.7:9092
Created topic test.
--replication-factor 3 ## 分区数量3
--partitions 3 ## 副本数量2
--zookeeper ## zookeeper地址 配置文件写了 可以不指定
--bootstrap-server 10.10.1.7:9092 ##指定至少一个Kafka broker的地址和端口
- 列出所有的Topic,确认你的Topic已创建:
./kafka-topics.sh --describe --topic test --bootstrap-server 10.10.1.7:9092
--describe 查看状态
Topic: test TopicId: mBvq8ul6QV68mtvDU5KpaA PartitionCount: 3 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: test Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
Topic: test Partition: 1 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
Topic: test Partition: 2 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
# PartitionCount: 3 分区数量3
# ReplicationFactor: 3 副本数量是3
# Partition: 0 分区0
# Leader: 1 leader是1
# Replicas: 1,0,2 三个副本分别是1,0,2
#Isr: 1,0,2 Isr表示同步状态正常的副本
创建生产者给消费者发送消息测试
#创建生产者,出现 > 后发送消息
./kafka-console-producer.sh --broker-list 10.241.106.57:9092 --topic test
#创建消费者 查看能否接受到消息
./kafka-console-consumer.sh --bootstrap-server 10.241.106.57:9092 --topic test
消费者端口可以正常接受消息即为正常
启动Kafka服务自动化
为了确保在服务器启动时Kafka自动运行,可以在每台服务器上运行:
sudo systemctl enable kafka
sudo systemctl start kafka
后续问题更新
。。。。。。