中间件:Kafka
关键字:Kafka文件机制,Kafka分区,Kafka数据可靠性,Kafka Ack等
注:本文是作者学习Kafka时得笔记和经验总结
Kafka特性总览:
- 消息队列的性能好坏,其文件存储机制设计是衡量一个消息队列服务技术水平和最关键指标之一。
-
Kafka的特性:
- 高吞吐量、低延迟 :kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒,每个topic可以分多个partition, consumer group 对partition进行consume操作。
- 可扩展性:kafka 分区扩展,consumer 扩展。
- 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失。
- 容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)。
- 高并发:支持数千个客户端同时读写。
-
Kafka得使用场景:
- 日志收集:一个公司用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Solr,ES等。
- 消息系统:解耦生产者和消费者,降低系统压力、缓存消息等。
- 流式处理:比如Spark streaming和Storm
-
Kafka得设计思想:
-
Kafka的集群拓扑结构 :
-
Kakfa Broker Leader的选举:
① 👍Kakfa Broker集群受Zookeeper管理。所有的Kafka Broker节点一起去Zookeeper上注册一个临时节点,因为只有一个Kafka Broker会注册成功,其他的都会失败,所以这个成功在Zookeeper上注册临时节点的这个Kafka Broker会成为Kafka Broker Controller,其他的Kafka broker叫Kafka Broker follower。(这个过程叫Controller在ZooKeeper注册Watch)
② 👍这个Controller会监听其他的Kafka Broker的所有信息,如果这个kafka broker controller宕机了,在zookeeper上面的那个临时节点就会消失,所有的broker又会重复去注册临时节点。
③ 👍leader会追踪和维护ISR中所有follower的滞后状态。如果滞后太多(数量滞后和时间滞后两个维度,replica.lag.time.max.ms和replica.lag.max.message可配置),leader会把该replica从ISR中移除 ( follower 在 replica.lag.time.max.ms 时间内一直落后 leader replica.lag.max.messages 条消息的时候才会被踢出)。被移除ISR的replica一直在追赶leader。leader写入数据后并不会commit,只有ISR列表中的所有folower同步之后才会commit,把滞后的follower移除ISR主要是避免写消息延迟。
④ 👍设置ISR主要是为了broker宕掉之后,重新选举partition的leader从ISR列表中选择。
⑤ 👍👍 所以,如果一个broker挂了,controller会读取该broker上所有得分区在zookeeper上状态,并选取ISR列表中得一个replica作为leader(如果ISR列表中的replica全挂,选一个不在ISR列表中的replica作为leader; 如果该partition的所有的replica都宕机了,则将新的leader设置为-1,等待恢复,等待ISR中的任一个Replica“活”过来,并且选它作为Leader;或选择第一个“活”过来的Replica(不一定是ISR中)作为Leader)。另外,controller还会通知 zookeeper 这个broker 宕机了,zookeeper会通知其他得broker。 - 👍Consumer Rebalance的触发条件 :
① Consumer增加或删除会触发 Consumer Group的Rebalance
② Broker的增加或者减少会触发 Consumer Rebalance(分区副本发生改变) - 👍Kafka工作流程-高级消费者和低级消费者:
① 高级消费:
Ⅰ 自动负载均衡,高阶消费者为了简化编程,封装了一系列 API,这套 API 会均匀地将分区分配给消费者 线程,消费者消费哪个分区不由消费者决定,而是由高阶 API 决定,如果有消费者线程挂 掉了,高阶 API 会检测到,进而进行重新分配。高阶消费者 API 将大部分功能已经实现, 因此,编程者编写高阶消费者的难度也随之降低,不需要关注分区的分配,只需要关注业务逻辑就行
Ⅱ 自动提交offset:自动提交时,假设 1s 提交一次 offset 的更新,设当前 offset=10,当消费者消费了 0.5s 的数据,offset 移动了 15,由于提交间隔为 1s,因此这一 offset 的更新并不会被提交,这时候我们写的消费者挂掉,重启后,消费者会去 ZooKeeper 上获取读取位置,获取到的 offset 仍为 10,它就会重复消费,这就是一个典型的重复消费问题。
② 低级消费:对于低阶消费者就不再有分区到消费者之间的 API 中间层了,由消费者直接找到分区进行消费,即消费者通过 ZooKeeper 找到指定分区的 Leader 在哪个 broker 上。首先,在 ZooKeeper 中能够找到 Kafka 所有 topic 的分区列表,并且可以找到指定分区的 Leader 在哪个 broker 上。 - 👍Topic & Partition:
① Topic相当于传统消息系统MQ中的一个队列queue,producer端发送的message必须指定是发送到哪个topic,但是不需要指定topic下的哪个partition,因为kafka会把收到的message进行load balance,均匀的分布在这个topic下的不同的partition上( hash(message) % [broker数量] )。
② 物理存储上,这个topic会分成一个或多个partition,每个partiton相当于是一个子queue。在物理结构上,每个partition对应一个物理的目录(文件夹),文件夹命名是[topicname][partition][序号],一个topic可以有无数多的partition,根据业务需求和数据量来设置。
③ 在kafka配置文件中可随时更改num.partitions参数来配置更改topic的partition数量,也可以通过命令修改分区数。
④ 当add a new partition的时候,partition里面的message不会重新进行分配,原来的partition里面的message数据不会变,新加的这个partition刚开始是空的,随后进入这个topic的message就会重新参与所有partition的load balance。 - 👍Partition Replica :
① 每个partition可以在其他的kafka broker节点上存副本,以便某个kafka broker节点宕机不会影响这个kafka集群。存replica副本的方式是按照kafka broker的顺序存。
② replica副本数目不能大于kafka broker节点的数目,否则报错。这里的replica数其实就是partition的副本总数,其中包括一个leader,其他的就是copy副本 - 👍Topic分配partition和partition replica的算法 :
① 将Broker(size=n)和待分配的Partition排序。
② 将第 i 个Partition分配到第(i%n)个Broker上。
③ 将第 i 个Partition的第 j 个Replica分配到第((i + j) % n)个Broker上。
-
Kafka的集群拓扑结构 :
- 👍消息投递可靠性 :
-
Partition ack :producer 生产消息被认为完成时得确认值:
① 0:不等待broker得确认信息,延迟最小(消费可靠性不能保证)
② 1:leader收到了确认信息
③ -1:ISR列表中得所有replica都返回确认消息
-
Partition ack :producer 生产消息被认为完成时得确认值:
-
Kafka高吞吐量 :
- Kafka的高吞吐量体现在读写上,分布式并发的读和写都非常快,写的性能体现在以O(1)的时间复杂度进行顺序写入。读的性能体现在以O(1)的时间复杂度进行顺序读取,对topic进行partition分区,consume group中的consume线程可以以很高能性能进行顺序读。
-
批量发送 :
- Kafka支持以消息集合为单位进行批量发送,以提高push效率。
- push-and-pull : Kafka中的Producer和consumer采用的是push-and-pull模式,即Producer只管向broker push消息,consumer只管从broker pull消息,两者对消息的生产和消费是异步的。
- Kafka集群中broker之间的关系 : 不是主从关系,各个broker在集群中地位一样,我们可以随意的增加或删除任何一个broker节点。
- Producer采用异步push方式,极大提高Kafka系统的吞吐率(可以通过参数控制是采用同步还是异步方式)。
-
顺序保证性 :
- Kafka只能保证同一分区得消息顺序性,并且需要消费端单线程消费。正常情况下,消费端会多线程处理业务逻辑来提供系统性能,若是多线程消费还想保证消息得顺序性,可以使用spring-kafka来实现,读者具体可自行查阅 spring-kafka得用法。
kafka文件机制:
-
Partition Owner registry :
- 用来标记partition正在被哪个consumer消费.临时znode。此节点表达了"一个partition"只能被group下一个consumer消费,同时当group下某个consumer失效,那么将会触发负载均衡(即:让partitions在多个consumer间均衡消费,接管那些"游离"的partitions)。
- 👍持久化 :
- kafka使用文件存储消息(append only log),这就直接决定kafka在性能上严重依赖文件系统的本身特性.且无论任何OS下,对文件系统本身的优化是非常艰难的.文件缓存/直接内存映射等是常用的手段.
- 因为kafka是对日志文件进行append操作,因此磁盘检索的开支是较小的;同时为了减少磁盘写入的次数,broker会将消息暂时buffer起来,当消息的个数(或尺寸)达到一定阀值时,再flush到磁盘,这样减少了磁盘IO调用的次数.
- 对于kafka而言,较高性能的磁盘,将会带来更加直接的性能提升。
-
性能 :
- 除磁盘IO之外,我们还需要考虑网络IO,这直接关系到kafka的吞吐量问题.kafka并没有提供太多高超的技巧;对于producer端,可以将消息buffer起来,当消息的条数达到一定阀值时,批量发送给broker;对于consumer端也是一样,批量fetch多条消息.不过消息量的大小可以通过配置文件来指定. 对于kafka broker端,sendfile系统调用可以潜在的提升网络IO的性能 : 将文件的数据映射到系统内存中,socket直接读取相应的内存区域即可,而无需进程再次copy和交换( 这里涉及到"磁盘IO数据"/"内核内存"/"进程内存"/"网络缓冲区",多者之间的数据copy)。
- 其实对于producer/consumer/broker三者而言,CPU的开支应该都不大, 因此启用消息压缩机制是一个良好的策略;压缩需要消耗少量的CPU资源,不过对于kafka而言,网络IO更应该需要考虑.可以将任何在网络上传输的消息都经过压缩. kafka支持gzip/snappy 等多种压缩方式.
- 👍负载均衡 :
- kafka集群中的任何一个broker,都可以向producer提供metadata信息, 这些metadata中包含"集群中存活的servers列表"、"partitions leader列表" 等信息(请参看zookeeper中的节点信息)。
- 当producer获取到metadata信息之后, producer将会和Topic下所有partition leader保持socket连接;消息由producer直接通过socket发送到broker,中间不会经过任何"路由层"。
- 异步发送,将多条消息暂且在客户端buffer起来,并将他们批量发送到broker;小数据IO太多,会拖慢整体的网络延迟,批量延迟发送事实上提升了网络效率;不过这也有一定的隐患,比如当producer失效时,那些尚未发送的消息将会丢失。
-
Topic模型 :
- 在kafka中,partition中的消息只有一个consumer在消费,且不存在消息状态的控制,也没有复杂的消息确认机制,可见kafka broker端是相当轻量级的.当消息被consumer接收之后,consumer可以在本地保存最后消息的offset,并间歇性的向zookeeper提交offset.由此可见,consumer客户端也很轻量级。松弛得控制就会引起重复消费与消息丢失等问题。
- kafka中consumer负责维护消息的消费记录,而broker则不关心这些,这种设计不仅提高了consumer端的灵活性,也适度的减轻了broker端设计的复杂度;这是和众多JMS prodiver的区别.
- 👍log :
- 每个log entry格式为"4个字节的数字N表示消息的长度" + "N个字节的消息内容"; 每个日志都有一个offset来唯一的标记一条消息, offset的值为8个字节的数字,表示此消息在此partition中所处的起始位置.
- 每个partition在物理存储层面由多个log file组成(称为segment) .segment file的命名为"最小offset".kafka.例如"00000000000.kafka";其中"最小offset"表示此segment中起始消息的offset.
- 获取消息时,需要指定offset和最大chunk尺寸,offset用来表示消息的起始位置,chunk size用来表示最大获取消息的总长度(间接的表示消息的条数). 根据offset,可以找到此消息所在segment文件,然后根据segment的最小offset取差值,得到它在file中的相对位置,直接读取输出即可.
-
Partition Owner registry :
- 用来标记partition正在被哪个consumer消费.临时znode。此节点表达了"一个partition"只能被group下一个consumer消费,同时当group下某个consumer失效,那么将会触发负载均衡(即:让partitions在多个consumer间均衡消费,接管那些"游离"的partitions)。例如:当consumer启动时,所触发的操作 :
A) 首先进行"Consumer id Registry";
B) 然后在"Consumer id Registry"节点下注册一个watch用来监听当前group中其他consumer的"leave"和"join";只要此znode path下节点列表变更,都会触发此group下consumer的负载均衡.(比如一个consumer失效,那么其他consumer接管partitions).
C) 在"Broker id registry"节点下,注册一个watch用来监听broker的存活情况;如果broker列表变更,将会触发所有的groups下的consumer重新balance.
👍总结👍 :
① Producer端使用zookeeper用来"发现"broker列表,以及和Topic下每个partition leader建立socket连接并发送消息.
② Broker端使用zookeeper用来注册broker信息,以及监测partition leader存活性.
③ Consumer端使用zookeeper用来注册consumer信息,其中包括consumer消费的partition列表等,同时也用来发现broker列表,并和partition leader建立socket连接,并获取消息。
-
Leader的选择 :
- 必须保证,一旦一个消息被提交了,但是leader down掉了,新选出的leader必须可以提供这条消息。大部分的分布式系统采用了多数投票法则选择新的leader,对于多数投票法则,就是根据所有副本节点的状况动态的选择最适合的作为leader.Kafka并不是使用这种方法。
- 👍Kafka动态维护了一个同步状态的副本的集合(a set of in-sync replicas),简称ISR,在这个集合中的节点都是和leader保持高度一致的,任何一条消息必须被这个集合中的每个节点读取并追加到日志中了,才会通知外部这个消息已经被提交了。因此这个集合中的任何一个节点随时都可以被选为leader.ISR在ZooKeeper中维护。ISR中有f+1个节点,就可以允许在f个节点down掉的情况下不会丢失消息并正常提供服。ISR的成员是动态的,如果一个节点被淘汰了,当它重新达到“同步中”的状态时,他可以重新加入ISR.这种leader的选择方式是非常快速的,适合kafka的应用场景。
-
副本管理 :
- 👍Kafka尽量的使所有分区均匀的分布到集群所有的节点上而不是集中在某些节点上,另外主从关系也尽量均衡,这样每个节点都会担任一定比例的分区的leader.
- 优化leader的选择过程也是很重要的,它决定了系统发生故障时的空窗期有多久。Kafka选择一个节点作为“controller”,当发现有节点down掉的时候它负责在有用分区的所有节点中选择新的leader,这使得Kafka可以批量的高效的管理所有分区节点的主从关系。如果controller down掉了,活着的节点中的一个会备切换为新的controller.
-
Leader与副本同步 :
- 👍对于某个分区来说,保存正分区的"broker"为该分区的"leader",保存备份分区的"broker"为该分区的"follower"。备份分区会完全复制正分区的消息,包括消息的编号等附加属性值。为了保持正分区和备份分区的内容一致,Kafka采取的方案是在保存备份分区的"broker"上开启一个消费者进程进行消费,从而使得正分区的内容与备份分区的内容保持一致。一般情况下,一个分区有一个“正分区”和零到多个“备份分区”。可以配置“正分区+备份分区”的总数量,关于这个配置,不同主题可以有不同的配置值。注意,生产者,消费者只与保存正分区的"leader"进行通信。
- Kafka允许topic的分区拥有若干副本,这个数量是可以配置的,你可以为每个topic配置副本的数量。Kafka会自动在每个副本上备份数据,所以当一个节点down掉时数据依然是可用的。
- 👍创建副本的单位是topic的分区,每个分区都有一个leader和零或多个followers.所有的读写操作都由leader处理,一般分区的数量都比broker的数量多的多,各分区的leader均匀的分布在brokers中。所有的followers都复制leader的日志,日志中的消息和顺序都和leader中的一致。followers向普通的consumer那样从leader那里拉取消息并保存在自己的日志文件中。
- 👍Kafka判断一个节点是否活着有两个条件 :
① 节点必须可以维护和ZooKeeper的连接,Zookeeper通过心跳机制检查每个节点的连接。
② 如果节点是个follower,他必须能及时的同步leader的写操作,延时不能太久。 - 👍只有当消息被所有的副本加入到日志中时,才算是“committed”,只有committed的消息才会发送给consumer,这样就不用担心一旦leader down掉了消息会丢失。Producer也可以选择是否等待消息被提交的通知,这个是由参数acks决定的。
- 👍👍partiton中文件存储方式 :
- 每个partion(目录)相当于一个巨型文件被平均分配到多个大小相等segment(段)数据文件中。但每个段segment file消息数量不一定相等,这种特性方便old segment file快速被删除。
- 每个partiton只需要支持顺序读写就行了,segment文件生命周期由服务端配置参数决定。
- 这样做的好处就是能快速删除无用文件,有效提高磁盘利用率。
- 👍👍partiton中segment文件存储结构 :
- producer发message到某个topic,message会被均匀的分布到多个partition上(随机或根据用户指定的回调函数进行分布),kafka broker收到message往对应partition leader的最后一个segment上添加该消息,当某个segment上的消息条数达到配置值或消息发布时间超过阈值时,segment上的消息会被flush到磁盘,只有flush到磁盘上的消息consumer才能消费,segment达到一定的大小后将不会再往该segment写数据,broker会创建新的segment。
- 每个part在内存中对应一个index,记录每个segment中的第一条消息偏移。
- segment file组成:由2大部分组成,分别为index file和data file,此2个文件一 一对应,成对出现,后缀".index"和“.log”分别表示为segment索引文件、数据文件.
- segment文件命名规则:partion全局的第一个segment从0开始,后续每个segment文件名为上一个全局partion的最大offset(偏移message数)。数值最大为64位long大小,19位数字字符长度,没有数字用0填充。
- 每个segment中存储很多条消息,消息id由其逻辑位置决定,即从消息id可直接定位到消息的存储位置,避免id到位置的额外映射。
-
下图所示segment文件列表形象说明了上述2个规则:
关键字解释说明 :
① 8 byte offset在parition(分区)内的每条消息都有一个有序的id号,这个id号被称为偏移(offset),它可以唯一确定每条消息在parition(分区)内的位置。即offset表示partiion的第多少message
② 4 byte message sizemessage大小
③ 4 byte CRC32用crc32校验message
④ 1 byte “magic"表示本次发布Kafka服务程序协议版本号
⑤ 1 byte “attributes"表示为独立版本、或标识压缩类型、或编码类型。
⑥ 4 byte key length表示key的长度,当key为-1时,K byte key字段不填
⑦ K byte key可选
⑧ value bytes payload表示实际消息数据。 - 👍segment index file采取稀疏索引存储方式,它减少索引文件大小,通过mmap可以直接内存操作,稀疏索引为数据文件的每个对应message设置一个元数据指针,它 比稠密索引节省了更多的存储空间,但查找起来需要消耗更多的时间。
8.👍kafka会记录offset到zk中。但是,zk client api对zk的频繁写入是一个低效的操作。0.8.2 kafka引入了native offset storage,将offset管理从zk移出,并且可以做到水平扩展。其原理就是利用了kafka的compacted topic,offset以consumer group,topic与partion的组合作为key直接提交到compacted topic中。同时Kafka又在内存中维护了三元组来保存最新的offset信息,consumer来取最新offset信息的时候直接内存里拿即可。当然,kafka允许你快速的checkpoint最新的offset信息到磁盘上。
-
Partition Replication原则 :
-
Kafka高效文件存储设计特点 :
① Kafka把topic中一个parition大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完文件,减少磁盘占用。
② 通过索引信息可以快速定位message和确定response的最大大小。
③ 通过index元数据全部映射到memory,可以避免segment file的IO磁盘操作。
④ 通过索引文件稀疏存储,可以大幅降低index文件元数据占用空间大小。
-
Kafka高效文件存储设计特点 :
- 👍高效的数据传输 :
- 发布者每次可发布多条消息(将消息加到一个消息集合中发布),consumer每次迭代消费一条消息。
- 不创建单独的cache,使用系统的page cache。发布者顺序发布,订阅者通常比发布者滞后一点点,直接使用Linux的page cache效果比较好,同时减少了cache管理及垃圾收集的开销。
- page cache中的每个文件都是一棵基数树(radix tree,本质上是多叉搜索树),树的每个节点都是一个页。根据文件内的偏移量就可以快速定位到所在的页。
-
Kafka为什么不自己管理缓存,而非要用page cache?原因有如下三点:
① JVM中一切皆对象,数据的对象存储会带来所谓object overhead,浪费空间;
② 如果由JVM来管理缓存,会受到GC的影响,并且过大的堆也会拖累GC的效率,降低吞吐量;
③ 一旦程序崩溃,自己管理的缓存数据会全部丢失。 - producer生产消息时,会使用pwrite()系统调用【对应到Java NIO中是FileChannel.write() API】按偏移量写入数据,并且都会先写入page cache里。
- consumer消费消息时,会使用sendfile()系统调用【对应FileChannel.transferTo() API】,零拷贝地将数据从page cache传输到broker的Socket buffer,再通过网络传输。
- 同时,page cache中的数据会随着内核中flusher线程的调度以及对sync()/fsync()的调用写回到磁盘,就算进程崩溃,也不用担心数据丢失。
kafka分区的原因:
- 指明partition的情况下,直接将指明的值直接作为partition的值。
- 没有指明partition值但有key的值,将key的hash值与toptic的partition数进行取余得到partition值。
- 既没有partition值又没有key值的情况下,第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),将这个值与 topic 可用的 partition 总数取余得到 partition 值,也就是常说的 round-robin 算法。
kafka高吞吐量的原因:
-
kafka的消息是不断追加到文件中的,这个特性使kafka可以充分利用磁盘的顺序读写性能,顺序读写不需要硬盘磁头的寻道时间,只需很少的扇区旋转时间,所以速度远快于随机读写。生产者负责写入数据,Kafka会将消息持久化到磁盘,保证不会丢失数据,Kafka采用了两个技术提高写入的速度:
- 顺序写入:硬盘是机械结构,需要指针寻址找到存储数据的位置,所以,如果是随机IO,磁盘会进行频繁的寻址,导致写入速度下降。Kafka使用了顺序IO提高了磁盘的写入速度,Kafka会将数据顺序插入到文件末尾,消费者端通过控制偏移量来读取消息,这样做会导致数据无法删除,时间一长,磁盘空间会满,kafka提供了2种策略来删除数据:基于时间删除和基于partition文件的大小删除。
- Memory Mapped Files:这个和Java NIO中的内存映射基本相同,,mmf直接利用操作系统的Page来实现文件到物理内存的映射,完成之后对物理内存的操作会直接同步到硬盘。mmf通过内存映射的方式大大提高了IO速率,省去了用户空间到内核空间的复制。它的缺点显而易见--不可靠,当发生宕机而数据未同步到硬盘时,数据会丢失,Kafka提供了produce.type参数来控制是否主动的进行刷新,如果kafka写入到mmp后立即flush再返回给生产者则为同步模式,反之为异步模式。
-
零拷贝:平时从服务器读取静态文件时,服务器先将文件从磁盘复制到内核空间,再复制到用户空间,最后再复制到内核空间并通过网卡发送出去,而零拷贝则是直接从内核到内核再到网卡,省去了用户空间的复制。
- Kafka把所有的消息存放到一个文件中,当消费者需要数据的时候直接将文件发送给消费者,比如10W的消息共10M,全部发送给消费者,10M的消息在内网中传输是非常快的,假如需要1s,那么kafka的tps就是10w。Zero copy对应的是Linux中sendfile函数,这个函数会接受一个offsize来确定从哪里开始读取。现实中,不可能将整个文件全部发给消费者,他通过消费者传递过来的偏移量来使用零拷贝读取指定内容的数据返回给消费者。
kafka消息丢失的原因:
- 由于Producer端设置消息发送ack=0,表示发送完成即可,不会关心broker的确认回复。
- 高阶消费的时候,由于消息消费是自动提交的方法,由于自动提交时间默认是1秒,消费端拉取的消息还没有处理完,但是消费进度offset已经更新了,此时如果消费端挂了,可能存在1秒内的数据丢失。
- ack=all的时候并不能保证数据不会丢失,acks=all必须跟ISR列表里至少有2个以上的副本配合使用,起码是有一个Leader和一个Follower才可以。
kafka消息重复消费的原因:
- 由于使用高阶消费,自动提交过程,拉取的消息1秒内已经处理完,但是还没有触发提交的时间,此时消费进度offset没有更新,下次拉取时就会存在重复消费的请客。
👍kafka故障处理机制:
- Replica 消息图示:
- follower故障 : follower发生故障后会被临时踢出ISR,待该follower恢复后,follower会读取本地磁盘记录的上次的HW,并将log文件高于HW的部分截取掉,从HW开始向leader进行同步。等该follower的LEO大于等于该Partition的HW,即follower追上leader之后,就可以重新加入ISR了。
- leader故障 : leader发生故障之后,会从ISR中选出一个新的leader,之后,为保证多个副本之间的数据一致性,其余的follower会先将各自的log文件高于HW的部分截掉,然后从新的leader同步数据。
- 注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。
kafka broker 宕机 :
- 只需要新启动一个broker, 把broker.id设置为 损坏的那个broker的id, 就会自动复制过去丢失的数据。
kafka常用命令:
查询 :
## 查询集群描述
bin/kafka-topics.sh --describe --zookeeper
## topic列表查询
bin/kafka-topics.sh --list --zookeeper 127.0.0.1:2181
## topic列表查询(支持0.9版本+)
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
## 新消费者列表查询(支持0.9版本+)
bin/kafka-consumer-groups.sh --list --new-consumer --bootstrap-server localhost:9092
## 新消费者列表查询(支持0.10版本+)
bin/kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
## 显示某个消费组的消费详情(仅支持offset存储在zookeeper上的)
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group test
## 显示某个消费组的消费详情(0.9版本 - 0.10.1.0 之前)
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group test-consumer-group
## 显示某个消费组的消费详情(0.10.1.0版本+)
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
发送和消费
## 生产者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
## 消费者
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test
## 新生产者(支持0.9版本+)
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --producer.config config/producer.properties
## 新消费者(支持0.9版本+)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --new-consumer --from-beginning --consumer.config config/consumer.properties
## 高级点的用法
bin/kafka-simple-consumer-shell.sh --brist localhost:9092 --topic test --partition 0 --offset 1234 --max-messages 10
平衡Leader:
bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot
kafka自带压测工具:
bin/kafka-producer-perf-test.sh --topic test --num-records 100 --record-size 1 --throughput 100 --producer-props bootstrap.servers=localhost:9092
分区扩容 :
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic1 --partitions 2
☛ 本文部分内容参见 美团技术团队
- ☛ 文章要是勘误或者知识点说的不正确,欢迎评论,毕竟这也是作者通过阅读源码获得的知识,难免会有疏忽!
- ☛ 要是感觉文章对你有所帮助,不妨点个关注,或者移驾看一下作者的其他文集,也都是干活多多哦,文章也在全力更新中。
- ☛ 著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处!