单点内部机制
- broker内部进程接收数据,是顺序追加的append,最终持久化到磁盘,磁盘顺序读写的效率要高于随机读写的速度
- 持久化到磁盘的数据有两种数据结构,一种是data,一种是index(offset 和 timestamp) 详见:Kafka的Offset、Index(三)
- 写磁盘的粒度越小,则速度越慢,反之越快
- 把单机 的持久化可靠性转向集群多机方式
- consumer pull数据时,分为步骤1,先请求内核;步骤2,内核将请求转发到broker进程(知道了offset及具体的数据);步骤3,根据索引数据就可以知道具体的数据位置,调用内核的sendfile(in,offset,out); 步骤4,发送给consumer(数据来源与pagecache如果有,如果pagecache没有,则由磁盘中构建,再由sendfile发送出去)
ACK机制
Kafka的ack机制,指的是producer的消息发送确认机制,这直接影响到Kafka集群的吞吐量和消息可靠性。而吞吐量和可靠性就像硬币的两面,两者不可兼得,只能平衡。
分区可靠性机制
当producer向leader发送数据时,可以通过request.required.acks参数来设置数据可靠性的级别:
- 1(默认):这意味着producer在ISR中的leader已成功收到的数据并得到确认后发送下一条message。如果leader宕机了,则会丢失数据。
- 0:这意味着producer无需等待来自broker的确认而继续发送下一批消息。这种情况下数据传输效率最高,但是数据可靠性确是最低的。
- -1:producer需要等待ISR中的所有follower都确认接收到数据后才算一次发送完成,可靠性最高。但是这样也不能保证数据不丢失,比如当ISR中只有leader时(前面ISR那一节讲到,ISR中的成员由于某些情况会增加也会减少,最少就只剩一个leader),这样就变成了acks=1的情况。
如果要提高数据的可靠性,在设置request.required.acks=-1的同时,也要min.insync.replicas这个参数(可以在broker或者topic层面进行设置)的配合,这样才能发挥最大的功效。min.insync.replicas这个参数设定ISR中的最小副本数是多少,默认值为1,当且仅当request.required.acks参数设置为-1时,此参数才生效。如果ISR中的副本数少于min.insync.replicas配置的数量时,客户端会返回异常:org.apache.kafka.common.errors.NotEnoughReplicasExceptoin: Messages are rejected since there are fewer in-sync replicas than required。
-
ack=-1的场景
- ack=-1的场景,follower与leader之间维持“心跳”,默认阈值是10s,超出阈值则认为其下线了,数据拉取时,该下线的副本中不会有后续的数据append。
同步(Kafka默认为同步,即producer.type=sync)的发送模式,replication.factor>=2且min.insync.replicas>=2的情况下,不会丢失数据。
有两种典型情况。acks=-1的情况下(如无特殊说明,以下acks都表示为参数request.required.acks),数据发送到leader, ISR的follower全部完成数据同步后,leader此时挂掉,那么会选举出新的leader,数据不会丢失。
acks=-1的情况下,数据发送到leader后 ,部分ISR的副本同步,leader此时挂掉。比如follower1和follower2都有可能变成新的leader, producer端会得到返回异常,producer端会重新发送数据,数据可能会重复。这需要HW协同处理,详见HW小节
当然上图中如果在leader crash的时候,follower2还没有同步到任何数据,而且follower2被选举为新的leader的话,这样消息就不会重复。
-
ack=1的场景
- ack=1的场景,分区leader副本已经成功写入数据时,则相应生产者成功消息。此时follower副本还来得及同步新增的数据,如上图中的数据5和6,在follower副本中没有,就在这时consumer开始拉取数据,只能获取follower副本中的数据,即木桶短板效应(HW)详见图ack=1的场景_1.png
- LEO,LogEndOffset的缩写,表示每个partition的log最后一条Message的位置
- HW是HighWatermark的缩写,是指consumer能够看到的此partition的位置
注:Kafka只处理fail/recover问题,不处理Byzantine(拜占庭)问题。
HW探讨
考虑上图ack=-1场景场景演示_3.png(即acks=-1,部分ISR副本同步)中的另一种情况,如果在Leader挂掉的时候,follower1同步了消息4,5,follower2同步了消息4,与此同时follower2被选举为leader,那么此时follower1中的多出的消息5该做如何处理呢?
这里就需要HW的协同配合了。如前所述,一个partition中的ISR列表中,leader的HW是所有ISR列表里副本中最小的那个的LEO。类似于木桶原理,水位取决于最低那块短板。
如上图,某个topic的某partition有三个副本,分别为A、B、C。A作为leader肯定是LEO最高,B紧随其后,C机器由于配置比较低,网络比较差,故而同步最慢。这个时候A机器宕机,这时候如果B成为leader,假如没有HW,在A重新恢复之后会做同步(makeFollower)操作,在宕机时log文件之后直接做追加操作,而假如B的LEO已经达到了A的LEO,会产生数据不一致的情况,所以使用HW来避免这种情况。A在做同步操作的时候,先将log文件截断到之前自己的HW的位置,即3,之后再从B中拉取消息进行同步。
如果失败的follower恢复过来,它首先将自己的log文件截断到上次checkpointed时刻的HW的位置,之后再从leader中同步消息。leader挂掉会重新选举,新的leader会发送“指令”让其余的follower截断至自身的HW的位置然后再拉取新的消息。
当ISR中的个副本的LEO不一致时,如果此时leader挂掉,选举新的leader时并不是按照LEO的高低进行选举,而是按照ISR中的顺序选举。至于选举就自行度娘吧
消息传输机制
接下来讨论的是Kafka如何确保消息在producer和consumer之间传输。有以下三种可能的传输保障(delivery guarantee):
- At most once: 消息可能会丢,但绝不会重复传输
- At least once:消息绝不会丢,但可能会重复传输
- Exactly once:每条消息肯定会被传输一次且仅传输一次
Kafka的消息传输保障机制非常直观。当producer向broker发送消息时,一旦这条消息被commit,由于副本机制(replication)的存在,它就不会丢失。但是如果producer发送数据给broker后,遇到的网络问题而造成通信中断,那producer就无法判断该条消息是否已经提交(commit)。虽然Kafka无法确定网络故障期间发生了什么,但是producer可以retry多次,确保消息已经正确传输到broker中,所以目前Kafka实现的是at least once。
consumer从broker中读取消息后,可以选择commit,该操作会在Zookeeper中存下该consumer在该partition下读取的消息的offset。该consumer下一次再读该partition时会从下一条开始读取。如未commit,下一次读取的开始位置会跟上一次commit之后的开始位置相同。当然也可以将consumer设置为autocommit,即consumer一旦读取到数据立即自动commit。如果只讨论这一读取消息的过程,那Kafka是确保了exactly once, 但是如果由于前面producer与broker之间的某种原因导致消息的重复,那么这里就是at least once。
考虑这样一种情况,当consumer读完消息之后先commit再处理消息,在这种模式下,如果consumer在commit后还没来得及处理消息就crash了,下次重新开始工作后就无法读到刚刚已提交而未处理的消息,这就对应于at most once了。
读完消息先处理再commit。这种模式下,如果处理完了消息在commit之前consumer crash了,下次重新开始工作时还会处理刚刚未commit的消息,实际上该消息已经被处理过了,这就对应于at least once。
要做到exactly once就需要引入消息去重
机制。
消息去重
如上一节所述,Kafka在producer端和consumer端都会出现消息的重复,这就需要去重处理。
Kafka文档中提及GUID(Globally Unique Identifier)的概念,通过客户端生成算法得到每个消息的unique id,同时可映射至broker上存储的地址,即通过GUID便可查询提取消息内容,也便于发送方的幂等性保证,需要在broker上提供此去重处理模块,目前版本尚不支持。
针对GUID, 如果从客户端的角度去重,那么需要引入集中式缓存,必然会增加依赖复杂度,另外缓存的大小难以界定。
不只是Kafka, 类似RabbitMQ以及RocketMQ这类商业级中间件也只保障at least once, 且也无法从自身去进行消息去重。所以我们建议业务方根据自身的业务特点进行去重,比如业务消息本身具备幂等性,或者借助Redis等其他产品进行去重处理。
一致性
1,强一致性,所有节点必须全部存活,一致性破坏可用性
2-1,最终一致性,[网络到分布式],过半通过
,最常用的分布一致性解决方案
2-2,
ISR(in-sync replicas),连通性&活跃性
OSR(outof-sync replicas),超过阈值时间(10秒),没有"心跳"
AR(Assigned replicas),面向分区的副本集合,创建topic的时候给出分区的副本数,那么controller在创建的时候就已经分配了broker和分区的对应关系,并得到了该分区的broker集合
AR=ISR+OSR
总结
另外一个trade off:
不要强调单机磁盘的可靠性,转向异地多机的同步如果拿磁盘做持久化,在做trade off
优先pagecache或者绝对磁盘在多机集群分布式的时候
tradeoff
强一致,最终一致性(过半,ISR)
———————————————————
坐标帝都,白天上班族,晚上是知识的分享者
如果读完觉得有收获的话,欢迎点赞加关注