kafka 0.11 之前的版本中使用high watermark实现每个分区多副本之间的数据同步,保证多副本之间数据一致性,不过不能保证数据丢失以及数据重复
名词解析
- leader:Kafka的topic每个分区包含一个leader副本和若干个followers副本,Kafka producer和Kafka consumer的请求由leader来响应处理
- follower:通过fetch的方式从leader处获取数据进行同步
- isr:leader以及符合replica.lag.time.max.ms参数要求的follower列表
- logEndOffset:保存当前副本的可写入数据的偏移量,logEndOffset从0开始计数,如当前副本有2个数据,则logEndOffset = 2
- highWatermark:1. leader通过其自身的highWatermark记录数据同步到所有isr的最慢进度,所有isr的highWatermark等于最慢的isr的highWatermark 2. 所有leader和followers在故障恢复后会把其自身的logEndOffset设为其自身的high watermark,进行数据截断
- remoteLogEndOffset:leader副本中保存的关于followers的logEndOffset
相关计算方式
leader.logEndOffset = leader.logEndOffset + n (leader收到producer的n条数据后写入到本地)
leader.highWatermark = min(leader.logEndOffset, leader.remoteLogEndOffset)
follower.logEndOffset = follower.logEndOffset + n (follower fetch到leader的n条数据后写入到本地)
follower.highWatermark = min(leader.highWatermark, follower.logEndOffset)
leader.remoteLogEndOffset = follower.logEndOffset
同步流程之一:正常同步
- 初始状态,leader.logEndOffset、leader.highWatermark、follower.logEndOffset、follower.highWatermark、leader.remoteLogEndOffset均相同,处于数据完全同步的状态,假设均为0
- leader收到n条数据,假设n > 0(若n == 0则不发生任何变化), leader将其按顺序写入本地,并进行以下计算:
leader.logEndOffset = 0 + n = n
leader.highWatermark = min(leader.logEndOffset(n), leader.remoteLogEndOffset(0)) = 0
- 第一轮同步:follower向leader fetch数据,fetch参数中包含fetch.offset(该值等于follower.logEndOffset),leader收到fetch请求后进行以下计算:
leader.remoteLogEndOffset = follower.fetchOffset = follower.logEndOffset = 0
leader.highWatermark= min(leader.logEndOffset(n), leader.remoteLogEndOffset(0)) = 0
- follower获取到从leader处同步到的从fetchOffset开始的n条数据(包含数据以及leader.highWatermark),然后把数据按顺序写入到本地,并且进行以下计算:
follower.logEndOffset = 0 + n = n
follower.highWatermark = min(leader.highWatermark(0), follower.logEndOffset(n)) = 0
此时:
leader.logEndOffset == follower.logEndOffset == n
leader.highWatermark == follower.highWatermark == 0
- leader收到m条数据,m >= 0(在这期间leader可能没有接收到数据也可能有接收到数据,并且事实上leader接收数据与follower同步数据之间是异步的),并进行以下计算:
leader.logEndOffset = n + m
leader.highWatermark = min(leader.logEndOffset(n + m), leader.remoteLogEndOffset(0)) = 0
- 第二轮同步:follower又向leader fetch数据,并进行以下计算:
leader.remoteLogEndOffset = follower.fetchOffset = follower.logEndOffset = n
leader.highWatermark = min(leader.logEndOffset(n + m), leader.remoteLogEndOffset(n)) = n
- follower获取到从leader处同步到的从fetchOffset开始的m条数据并写入本地,并且进行以下计算:
follower.logEndOffset = n + m
follower.highWatermark = min(leader.highWatermark(n), follower.logEndOffset(n + m)) = n
此时:
leader.logEndOffset == follower.logEndOffset == n + m
leader.highWatermark == follower.highWatermark == n
==此时在第二轮同步中leader.highWatermark和follower.highWatermark更新到了第一轮leader收到的数据偏移量,因此说highWatermark的更新是延后一轮的==
如果m == 0,则此时又回到完全同步的状态; 如果m > 0,则watermark又开始下一轮的向logEndOffset追赶~
同步流程之二:无producer数据更新
如果leader收到follower的fetch请求后发现没有符合fetch offset的数据,则会放置该请求(不马上返回响应给follower),然后默认等待500ms后才会返回空数据给follower,或者在这期间收到了producer的新数据也会马上返回,其余流程跟流程一一致。
同步流程之三:leader掉线
leader掉线后,会在isr中选举出新的leader,其先前的follower.highWatermark变成leader.highWatermark,follower.logEndOffset变成leader.logEndOffset。等原来掉线的leader恢复后,会变成follower,并且其原来的leader.highWatermark变成follower.highWatermark,==leader.logEndOffset变成folloer.logEndOffset并且重置为其highWatermark的值,并且其他follower的logEndOffset比新leader多出的部分会被截取掉,使其跟新leader保持logEndOffset一致,防止数据不一致==。在这个时候,处于掉线状态的isr(如旧leader)无法被截取掉多出的数据,所以在副本掉线恢复时,引入了使其logEndOffset恢复至其highWatermark的机制,防止此副本数据不一致。
同步流程之四:follower掉线
follower掉线恢复后仍然是follower,并且其follower.logEndOffset重置为其follower.highWatermark,会导致其logEndOffset比其highWatermark多出的那一部分数据丢失,不过可以重新从leader同步
同步流程之五:partition数据丢失
在kafka中有两个参数声明同步的副本数,producer端的ack和broker端的min.insync.replicas。
ack有三个选项,ack=0表示producer不需要接收到broker的响应,只需要发出数据给leader就认为已经成功。ack=1表示当数据写入leader时(leader的logEndOffset更新后)就给producer返回成功。ack=all表示需要所有isr都写入成功后(以leader.highWatermark为依据)才给producer返回成功。ack默认值为1
min.insync.replicas默认为1,表示当producer.ack=all时,至少需要同步多少个副本才算写入成功(以leader.highWatermark为依据),如果失败则数据最终不会写入到kafka并返回失败给producer org.apache.kafka.common.errors.NotEnoughReplicasExceptoin。
丢失原因1
ack=0,且leader没有成功写入数据
丢失原因2
ack=1,leader掉线了,某个follower还没有fetch到最新的数据然后就被选举成了新的leader,其他follower从该新leader的logEndOffset处开始同步,并移除他们有的而新leader中没有的数据。
ack=0时也有这个问题。
丢失原因3
ack=all,leader掉线了,某个follower已写入最新的logEndOffset但highWatermark没有更新到最新,然后该follower也掉线了,并且在该follower掉线恢复后成了新的leader(在恢复期间没有其他产生新的leader),这时候由于logEndOffset恢复至其highWatermark值导致数据丢失。
ack=0或1时也有这个问题。
同步流程之六:数据重复
ack=all, 在没有完成所有isr同步时,leader掉线了,此时producer认为写入失败,producer会重新往leader发送数据,而在旧leader掉线前已经写入了数据的follower成为新的leader,导致producer第二次发送的数据和第一次的数据重复写入该新leader并把重复数据同步到其他followers。