应用往Kafka写数据的原因有很多:用户行为分析、日志存储、异步通信等。多样化的使用场景带来了多样化的需求:消息是否能丢失?是否容忍重复?消息的吞吐量?消息的延迟?
kafka介绍
Kafka属于Apache组织,是一个高性能跨语言分布式发布订阅消息队列系统[7]。它的主要特点有:
以时间复杂度O(1)的方式提供消息持久化能力,并对大数据量能保证常数时间的访问性能;
高吞吐率,单台服务器可以达到每秒几十万的吞吐速率;
支持服务器间的消息分区,支持分布式消费,同时保证了每个分区内的消息顺序;
轻量级,支持实时数据处理和离线数据处理两种方式。
1.1. 主要功能
根据官网的介绍,ApacheKafka®是一个分布式流媒体平台,它主要有3种功能:
1:发布和订阅消息流,这个功能类似于消息队列,这也是kafka归类为消息队列框架的原因
2:以容错的方式记录消息流,kafka以文件的方式来存储消息流
3:可以在消息发布的时候进行处理
1.2. 使用场景
1:在系统或应用程序之间构建可靠的用于传输实时数据的管道,消息队列功能
2:构建实时的流数据处理程序来变换或处理数据流,数据处理功能
kafka生产者
首先,创建ProducerRecord必须包含Topic和Value,key和partition可选。然后,序列化key和value对象为ByteArray,并发送到网络。
接下来,消息发送到partitioner。如果创建ProducerRecord时指定了partition,此时partitioner啥也不用做,简单的返回指定的partition即可。如果未指定partition,partitioner会基于ProducerRecord的key生成partition。producer选择好partition后,增加record到对应topic和partition的batch record。最后,专有线程负责发送batch record到合适的Kafka broker。
当broker收到消息时,它会返回一个应答(response)。如果消息成功写入Kafka,broker将返回RecordMetadata对象(包含topic,partition和offset);相反,broker将返回error。这时producer收到error会尝试重试发送消息几次,直到producer返回error。
实例化producer后,接着发送消息。这里主要有3种发送消息的方法:
立即发送:只管发送消息到server端,不care消息是否成功发送。大部分情况下,这种发送方式会成功,因为Kafka自身具有高可用性,producer会自动重试;但有时也会丢失消息;
同步发送:通过send()方法发送消息,并返回Future对象。get()方法会等待Future对象,看send()方法是否成功;
异步发送:通过带有回调函数的send()方法发送消息,当producer收到Kafka broker的response会触发回调函数
以上所有情况,一定要时刻考虑发送消息可能会失败,想清楚如何去处理异常。
通常我们是一个producer起一个线程开始发送消息。为了优化producer的性能,一般会有下面几种方式:单个producer起多个线程发送消息;使用多个producer。
kafka消费者
kafka的消费模式总共有3种:最多一次,最少一次,正好一次。为什么会有这3种模式,是因为客户端处理消息,提交反馈(commit)这两个动作不是原子性。
1.最多一次:客户端收到消息后,在处理消息前自动提交,这样kafka就认为consumer已经消费过了,偏移量增加。
2.最少一次:客户端收到消息,处理消息,再提交反馈。这样就可能出现消息处理完了,在提交反馈前,网络中断或者程序挂了,那么kafka认为这个消息还没有被consumer消费,产生重复消息推送。
3.正好一次:保证消息处理和提交反馈在同一个事务中,即有原子性。
本文从这几个点出发,详细阐述了如何实现以上三种方式。
At-most-once(最多一次)
设置enable.auto.commit为ture
设置 auto.commit.interval.ms为一个较小的时间间隔.
client不要调用commitSync(),kafka在特定的时间间隔内自动提交。
At-least-once(最少一次)
方法一
设置enable.auto.commit为false
client调用commitSync(),增加消息偏移;
方法二
设置enable.auto.commit为ture
设置 auto.commit.interval.ms为一个较大的时间间隔.
client调用commitSync(),增加消息偏移;
Exactly-once(正好一次)
3.1 思路
如果要实现这种方式,必须自己控制消息的offset,自己记录一下当前的offset,对消息的处理和offset的移动必须保持在同一个事务中,例如在同一个事务中,把消息处理的结果存到mysql数据库同时更新此时的消息的偏移。
3.2 实现
设置enable.auto.commit为false
保存ConsumerRecord中的offset到数据库
当partition分区发生变化的时候需要rebalance,有以下几个事件会触发分区变化
1 consumer订阅的topic中的分区大小发生变化
2 topic被创建或者被删除
3 consuer所在group中有个成员挂了
4 新的consumer通过调用join加入了group
此时 consumer通过实现ConsumerRebalanceListener接口,捕捉这些事件,对偏移量进行处理。
consumer通过调用seek(TopicPartition, long)方法,移动到指定的分区的偏移位置。
Broker
Kafka是一个高吞吐量分布式消息系统,采用Scala和Java语言编写,它提供了快速、可扩展的、分布式、分区的和可复制的日志订阅服务。它由Producer、Broker、Consumer三部分构成.
Producer向某个Topic发布消息,而Consumer订阅某个Topic的消息。 一旦有某个Topic新产生的消息,Broker会传递给订阅它的所有Consumer,每个Topic分为多个分区,这样的设计有利于管理数据和负载均衡。
Broker:消息中间件处理结点,一个Kafka节点就是一个broker,多个broker可以组成一个Kafka集群。
Controller:中央控制器Control,负责管理分区和副本状态并执行管理着这些分区的重新分配。(里面涉及到partition leader 选举)
ISR:同步副本组
Topic
在Kafka中,消息是按Topic组织的.
Partition:topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列。
Segment:partition物理上由多个segment组成
offset:每个partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到partition中. partition中的每个消息都有一个连续的序列号叫做offset,用于partition唯一标识一条消息.
topic中partition存储分布
在Kafka文件存储中,同一个topic下有多个不同partition,每个partition为一个目录,partiton命名规则为topic名称+有序序号,第一个partiton序号从0开始,序号最大值为partitions数量减1。
cleaner-offset-checkpoint:存了每个log的最后清理offset
meta.properties: broker.id 信息
recovery-point-offset-checkpoint:表示已经刷写到磁盘的记录。recoveryPoint以下的数据都是已经刷 到磁盘上的了。
replication-offset-checkpoint: 用来存储每个replica的HighWatermark的(high watermark (HW),表示已经被commited的message,HW以下的数据都是各个replicas间同步的,一致的。)
partiton中文件存储方式
每个partion(目录)由多个大小相等segment(段)数据文件中。但每个段segment file消息数量不一定相等,这种特性方便old segment file快速被删除。
每个partiton只需要支持顺序读写就行了,segment文件生命周期由服务端配置参数决定。
partiton中segment文件存储结构
partion中segment file组成和物理结构。
segment file组成:由2大部分组成,分别为index file和data file,此2个文件一一对应,成对出现,后缀".index"和“.log”分别表示为segment索引文件、数据文件.
segment文件命名规则:partion全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值。数值最大为64位long大小,19位数字字符长度,没有数字用0填充。
以一对segment file文件为例,说明segment中index<—->data file对应关系物理结构如下
Index文件存储大量元数据,指向对应log文件中message的物理偏移地址。
log数据文件存储大量消息
其中以Index文件中元数据3,497为例,依次在数据文件中表示第3个message(在全局partiton表示第368772个message)、以及该消息的物理偏移地址为497。
下面看看segment data file的内部
segment data file由许多message组成,下面详细说明message物理结构如下:
2.4 在partition中如何通过offset查找message
例如读取offset=368776的message,需要通过下面2个步骤查找。
第一步查找segment file
上述图2为例,其中00000000000000000000.index表示最开始的文件,起始偏移量(offset)为0.第二个文件00000000000000368769.index的消息量起始偏移量为368770 = 368769 + 1.同样,第三个文件00000000000000737337.index的起始偏移量为737338=737337 + 1,其他后续文件依次类推,以起始偏移量命名并排序这些文件,只要根据offset 二分查找文件列表,就可以快速定位到具体文件。
当offset=368776时定位到00000000000000368769.index|log
第二步通过segment file查找message
通过第一步定位到segment file,当offset=368776时,依次定位到00000000000000368769.index的元数据物理位置(这个较小,可以放在内存中,直接操作)和00000000000000368769.log的物理偏移地址,然后再通过00000000000000368769.log 顺序查找 直到offset=368776为止。
从上述图2.3节可知这样做的优点,segment index file采取稀疏索引存储方式,它减少索引文件大小,通过map可以直接内存操作,稀疏索引为数据文件的每个对应message设置一个元数据指针,它比稠密索引节省了更多的存储空间,但查找起来需要消耗更多的时间。
2.5 读写message总结
写message
消息从java堆转入page cache(即物理内存)。
由异步线程刷盘,消息从page cache刷入磁盘。
读message
消息直接从page cache转入socket发送出去。
当从page cache没有找到相应数据时,此时会产生磁盘IO,从磁
盘Load消息到page cache,然后直接从socket发出去
Kafka高效文件存储设计特点
topic中一个parition大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完文件,减少磁盘占用。
通过索引信息可以快速定位message和确定response的最大大小。
通过index元数据全部映射到memory,可以避免segment file的IO磁盘操作。
通过索引文件稀疏存储,可以大幅降低index文件元数据占用空间大小。
Kafka消费者
消费组与分区重平衡
可以看到,当新的消费者加入消费组,它会消费一个或多个分区,而这些分区之前是由其他消费者负责的;另外,当消费者离开消费组(比如重启、宕机等)时,它所消费的分区会分配给其他分区。这种现象称为重平衡(rebalance)。重平衡是Kafka一个很重要的性质,这个性质保证了高可用和水平扩展。不过也需要注意到,在重平衡期间,所有消费者都不能消费消息,因此会造成整个消费组短暂的不可用。而且,将分区进行重平衡也会导致原来的消费者状态过期,从而导致消费者需要重新更新状态,这段期间也会降低消费性能。后面我们会讨论如何安全的进行重平衡以及如何尽可能避免。
消费者通过定期发送心跳(hearbeat)到一个作为组协调者(group coordinator)的broker来保持在消费组内存活。这个broker不是固定的,每个消费组都可能不同。当消费者拉取消息或者提交时,便会发送心跳。
如果消费者超过一定时间没有发送心跳,那么它的会话(session)就会过期,组协调者会认为该消费者已经宕机,然后触发重平衡。可以看到,从消费者宕机到会话过期是有一定时间的,这段时间内该消费者的分区都不能进行消息消费;通常情况下,我们可以进行优雅关闭,这样消费者会发送离开的消息到组协调者,这样组协调者可以立即进行重平衡而不需要等待会话过期。
在0.10.1版本,Kafka对心跳机制进行了修改,将发送心跳与拉取消息进行分离,这样使得发送心跳的频率不受拉取的频率影响。另外更高版本的Kafka支持配置一个消费者多长时间不拉取消息但仍然保持存活,这个配置可以避免活锁(livelock)。活锁,是指应用没有故障但是由于某些原因不能进一步消费。
1.3. 详细介绍
Kafka目前主要作为一个分布式的发布订阅式的消息系统使用,下面简单介绍一下kafka的基本机制
1.3.1 消息传输流程
Producer即生产者,向Kafka集群发送消息,在发送消息之前,会对消息进行分类,即Topic,上图展示了两个producer发送了分类为topic1的消息,另外一个发送了topic2的消息。
Topic即主题,通过对消息指定主题可以将消息分类,消费者可以只关注自己需要的Topic中的消息
Consumer即消费者,消费者通过与kafka集群建立长连接的方式,不断地从集群中拉取消息,然后可以对这些消息进行处理。
从上图中就可以看出同一个Topic下的消费者和生产者的数量并不是对应的。
1.3.2 kafka服务器消息存储策略
谈到kafka的存储,就不得不提到分区,即partitions,创建一个topic时,同时可以指定分区数目,分区数越多,其吞吐量也越大,但是需要的资源也越多,同时也会导致更高的不可用性,kafka在接收到生产者发送的消息之后,会根据均衡策略将消息存储到不同的分区中。
在每个分区中,消息以顺序存储,最晚接收的的消息会最后被消费。
kafka中的message以topic的形式存在,topic在物理上又分为很多的partition,partition物理上由很多segment组成,segment是存放message的真正载体。
下面具体介绍下segment文件:
(1) 每个partition(目录)相当于一个巨型文件被平均分配到多个大小相等segment(段)数据文件中。但每个段segment file消息数量不一定相等,这种特性方便old segment file快速被删除。
(2) 每个partiton只需要支持顺序读写就行了,segment文件生命周期由服务端配置参数决定。
(3) segment file组成:由2大部分组成,分别为index file和data file,此2个文件一一对应,成对出现,后缀”.index”和“.log”分别表示为segment索引文件、数据文件.
(4) segment文件命名规则:partion全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值。数值最大为64位long大小,19位数字字符长度,没有数字用0填充。
segment中index<—->data file对应关系物理结构如下:
index与log映射关系
.index文件存放的是message逻辑相对偏移量(相对offset=绝对offset-base offset)与在相应的.log文件中的物理位置(position)。但.index并不是为每条message都指定到物理位置的映射,而是以entry为单位,每条entry可以指定连续n条消息的物理位置映射(例如:假设有20000~20009共10条消息,.index文件可配置为每条entry
指定连续10条消息的物理位置映射,该例中,index entry会记录偏移量为20000的消息到其物理文件位置,一旦该条消息被定位,20001~20009可以很快查到。)。每个entry大小8字节,前4个字节是这个message相对于该log segment第一个消息offset(base offset)的相对偏移量,后4个字节是这个消息在log文件中的物理位置。
1.3.3 与生产者的交互
生产者在向kafka集群发送消息的时候,可以通过指定分区来发送到指定的分区中
也可以通过指定均衡策略来将消息发送到不同的分区中
如果不指定,就会采用默认的随机均衡策略,将消息随机的存储到不同的分区中
1.3.4 与消费者的交互
在消费者消费消息时,kafka使用offset来记录当前消费的位置
在kafka的设计中,可以有多个不同的group来同时消费同一个topic下的消息,如图,我们有两个不同的group同时消费,他们的的消费的记录位置offset各不项目,不互相干扰。
对于一个group而言,消费者的数量不应该多于分区的数量,因为在一个group中,每个分区至多只能绑定到一个消费者上,即一个消费者可以消费多个分区,一个分区只能给一个消费者消费
因此,若一个group中的消费者数量大于分区数量的话,多余的消费者将不会收到任何消息。
Kafka安装与使用
1: 下载:
sudo wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.6.0/kafka_2.12-2.6.0.tgz
tar -xzf kafka_2.12-2.6.0.tgz
cd kafka_2.11-1.0.0
kafka运行需要java环境:
sudo apt-get install openjdk-8-jdk
2: 运行:
cd进入kafka解压目录,启动zookeeper,输入:
bin/zookeeper-server-start.sh config/zookeeper.properties
启动kafka服务:
bin/kafka-server-start.sh config/server.properties
3:创建一个topic
Kafka通过topic对同一类的数据进行管理,同一类的数据使用同一个topic可以在处理数据时更加的便捷, 在kafka解压目录打开终端,输入
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
在创建topic后可以通过输入
bin/kafka-topics.sh --list --zookeeper localhost:2181
来查看已经创建的topic
4:创建一个消息消费者:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
5: 创建一个消息生产者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
用回车(Enter)发送消息
6: 设置多个代理集群
到目前为止,我们一直在使用单个代理,这并不好玩。对 Kafka来说,单个代理只是一个大小为一的集群,除了启动更多的代理实例外,没有什么变化。 为了深入了解它,让我们把集群扩展到三个节点(仍然在本地机器上)。
首先,为每个代理创建一个配置文件 (在Windows上使用copy 命令来代替):
1> cp config/server.properties config/server-1.properties
2> cp config/server.properties config/server-2.properties
现在编辑这些新文件并设置如下属性:
1 config/server-1.properties:
2 broker.id=1
3 listeners=PLAINTEXT://:9093
4 log.dir=/tmp/kafka-logs-1
6 config/server-2.properties:
7 broker.id=2
8 listeners=PLAINTEXT://:9094
9 log.dir=/tmp/kafka-logs-2
broker.id属性是集群中每个节点的名称,这一名称是唯一且永久的。我们必须重写端口和日志目录,因为我们在同一台机器上运行这些,我们不希望所有的代理尝试在同一个端口注册,或者覆盖彼此的数据。
我们已经建立Zookeeper和一个单节点了,现在我们只需要启动两个新的节点:
1 > bin/kafka-server-start.sh config/server-1.properties &
2 ...
3 > bin/kafka-server-start.sh config/server-2.properties &
4 ...
现在创建一个副本为3的新topic:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
Good,现在我们有一个集群,但是我们怎么才能知道那些代理在做什么呢?运行"describe topics"命令来查看:
1 > bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
2 Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
3 Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
以下是对输出信息的解释。第一行给出了所有分区的摘要,下面的每行都给出了一个分区的信息。因为我们只有一个分区,所以只有一行。
“leader”是负责给定分区所有读写操作的节点。每个节点都是随机选择的部分分区的领导者。
“replicas”是复制分区日志的节点列表,不管这些节点是leader还是仅仅活着。
“isr”是一组“同步”replicas,是replicas列表的子集,它活着并被指到leader。
请注意,在示例中,节点1是该主题中唯一分区的领导者。
我们可以在已创建的原始主题上运行相同的命令来查看它的位置:
1 > bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
2 Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
3 Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
这没什么大不了,原来的主题没有副本且在服务器0上。我们创建集群时,这是唯一的服务器。
让我们发表一些信息给我们的新topic:
1 > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
2 ...
3 my test message 1
4 my test message 2
5 ^C
现在我们来消费这些消息:
1 > bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
2 ...
3 my test message 1
4 my test message 2
5 ^C
让我们来测试一下容错性。 Broker 1 现在是 leader,让我们来杀了它:
1 > ps aux | grep server-1.properties
2 7564 ttys002 0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java...
3 > kill -9 7564
在 Windows 上用:
1 > wmic process where "caption = 'java.exe' and commandline like '%server-1.properties%'" get processid
2 ProcessId
3 6016
4 > taskkill /pid 6016 /f
领导权已经切换到一个从属节点,而且节点1也不在同步副本集中了:
1 > bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
2 Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
3 Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 1,2,0 Isr: 2,0
不过,即便原先写入消息的leader已经不在,这些消息仍可用于消费:
1 > bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
2 ...
3 my test message 1
4 my test message 2
5 ^C
python操作kafka
参考文档: https://kafka-python.readthedocs.io/en/master/usage.html, https://zhuanlan.zhihu.com/p/38330574
1 安装:
pip3 install kafka-python
消费者:
from kafka import KafkaConsumer
from kafka import TopicPartition
import json
consumer = KafkaConsumer('my_topic', bootstrap_servers=['127.0.0.1:9092'])
consumer = KafkaConsumer('my_topic', group_id="group2", bootstrap_servers=['127.0.0.1:9092'])
第1个参数为 topic的名称
group_id : 指定此消费者实例属于的组名,可以不指定
bootstrap_servers : 指定kafka服务器
auto_offset_reset='earliest':读取目前最早可读的消息
consumer.seek(TopicPartition(topic='test', partition=0), 5) #重置偏移量,从第5个偏移量消费
consumer = KafkaConsumer(group_id="group2", bootstrap_servers=['127.0.0.1:9092'], consumer_timeout_ms=1000)
consumer.assign([TopicPartition(topic='my_topic', partition=0)])
print(type(consumer))
若不指定 consumer_timeout_ms,默认一直循环等待接收,若指定,则超时返回,不再等待
consumer_timeout_ms : 毫秒数
手动指定分区:consumer.assign([TopicPartition(topic='my_topic', partition=0)])
订阅多个topic, consumer.subscribe(topics=[])
consumer = KafkaConsumer(group_id='group2', bootstrap_servers=['127.0.0.1:9092'])
consumer.subscribe(topics=['my_topic', 'topic_1'])
# 或者使用正则订阅一类的topic
consumer.subscribe(pattern='^topic.*')
print(len(consumer))
for msg in consumer:
print(msg)
生产者:
from kafka import KafkaProducer
from kafka.errors import KafkaError
import json
producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])
future = producer.send('topic_1', key='my_key', value='hello, python', partition=0)
try:
record_metadata = future.get(timeout=10)
except KafkaError:
# Decide what to do if produce request failed...
log.exception()
# Successful result returns assigned partition and offset
print(record_metadata.topic)
print(record_metadata.partition)
print(record_metadata.offset)
producer.send函数为发送消息
第1个参数为 topic名称,必须指定
key : 键,必须是字节字符串,可以不指定(但key和value必须指定1个),默认为None
value : 值,必须是字节字符串,可以不指定(但key和value必须指定1个),默认为None
partition : 指定发送的partition,由于kafka默认配置1个partition,固为0
future.get函数等待单条消息发送完成或超时,经测试,必须有这个函数,不然发送不出去,或用time.sleep代替
转成json格式发送
producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'], value_serializer=lambda m: json.dumps(m).encode('ascii'))
future = producer.send('topic_1', value= {'my_key' : 'hello, python'}, partition=0)
压缩发送消息 compression_type='gzip'
producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'], compression_type='gzip')
future = producer.send('topic_1', key=b'my_key', value=b'hello, python', partition=0)
生产者运行结果:
my_topic
0
6
消费者运行结果:
ConsumerRecord(topic='my_topic', partition=0, offset=6, timestamp=1604989500586, timestamp_type=0, key=b'my_key', value=b'my_value', headers=[], checksum=None, serialized_key_size=6, serialized_value_size=8, serialized_header_size=-1)
topic
partition
offset : 这条消息的偏移量
timestamp : 时间戳
timestamp_type : 时间戳类型
key : key值,字节类型
value : value值,字节类型
checksum : 消息的校验和
serialized_key_size : 序列化key的大小
serialized_value_size : 序列化value的大小,可以看到value=None时,大小为-1
参考文档:
https://kafka.apachecn.org/
https://kafka-python.readthedocs.io/en/master/usage.html
https://www.pythonf.cn/read/115538
https://zhuanlan.zhihu.com/p/38330574