消费者客户端向协调者发送了加入组请求和同步组请求,那么在协调者在接收到两种请求之后,协调者是怎么处理的喃?
kafka服务端是用Scala语言写的,Scala不懂。所以只能看看原理。
消费者在发送加入组请求和同步组请求到协调者后,协调者都不会马上返回响应给消费者。加入组请求的响应结果必须等到协调者收集完消费组中所有消费者的加入组请求后才会发送加入组请求响应给消费者。同步组请求的响应结果必须等到主消费者发送同步组请求后才会发送同步组请求响应后消费者。
为了保证协调者端的高性能,协调者在处理请求时有的不能立即返回响应结果给消费者,但这并不意味协调者对每个请求的处理都是阻塞的。那么既要做到不阻塞请求,又做到必须返回响应给客户端。那么协调者的解决方案是:在处理每个请求时,定义一个发送响应结果的回调方法。当协调者发现请求可以完成时,调用回调方法把结果发送给消费者。协调者发送响应结果给客户端,会创建一个和请求对象互相关联的响应结果对象,确保客户端发送给协调者的请求和协调者返回给客户端的响应结果在同一个网络通道中完成。
消费者和消费组元数据
协调者是同一个消费组下所有消费者的协调节点,那么同一个协调者可能会是多个消费组的协调者节点。所以协调者会管理多个消费组,因此在协调者的元数据中会有两种元数据信息:消费组的元数据和消费者的元数据。其中消费组的元数据包含了该消费组下的所有消费者的元数据。
消费者元数据
消费者元数据中包含了和消费者发送加入组请求时一样的参数:成员编号,消费组编号,协议元数据集,会话超时时间,最近一次发送心跳时间,发送加入组请求响应的回调方法和发送同步组请求响应的回调方法,还有一个重要的变量是记录当前消费者分配了哪些分区。
消费组元数据
一个消费组元数据管理了所有消费者的元数据。在消费组元数据必须存在的前提下添加或更新消费者元数据。协调者会从消费组的所有消费者中选择一个最大的会话超时时间作为再平衡操作的超时时间。每个消费者加入消费组都会经过加入组请求和同步组请求,而协调者在处理两种请求时都会改变消费组的状态:稳定状态,准备再平衡状态,等待同步状态,离开状态。稳定状态分为两种:1,刚刚创建消费组元数据,并且没有其中没有管理任何消费者。2,完成一次分区分配后,消费组管理的所有消费者都收到了分区。
发送加入组响应
协调者返回加入组响应给消费组下所有消费者之前,将消费组的状态改为等待同步状态,然后一次性一起发送响应结果给每个消费者。但是协调者处理消费者发送的加入组请求不是同时进行的,这说明协调者在处理某些消费者的协调者的加入组请求时并不会立即返回响应。
发送同步组响应
普通消费在收到加入组响应会立即发送同步组请求。主消费要在计算完分配分区才会发送同步组请求,协调者也没有同时处理所有消费者的同步组请求,但是最后会同时发送同步组响应给所有的消费者。
协调者在返回同步组响应给消费者之前,会先把消费组的分配结果以普通消息的形式持久化到内部主题中(_consumer_offsets)。这样在协调节点出现问题进行故障迁移时,新的协调者就可以从内部主题中获取到消费组分配结果。一个协调者充当了多个消费组的协调者节点,使用消费组缓存保存管理所有的消费组元数据,迁移协调者时读取了内部主题的消费组分配结果,重新加载到消费组缓存中。要查询消费组元数据时会首先从消费组缓存中读取。
协调者将消费组分配结果保存到内部主题之后,才会调用回调方法发送同步组响应给消费者。协调者保存消费组分配结果是副本管理器的追加消息流程来处理的。
疑问:kafka是怎么做到延迟发送响应给消费者客户端的?怎么判断已经处理完成?
这就需要引出延迟的加入组操作。
延迟的加入组
消费者处理不同消费者的加入组请求,不能立即返回加入组响应给每个消费者。于是协调者就创建一个延迟操作,表示协调者会延迟发送加入组响应给消费者。由于消费组中包含了多个消费者,消费者都会发送加入组请求,协调者不会为每个消费者的加入组请求都创建一个延迟操作对象,而是在消费组的状态从稳定状态转化准备再平衡状态时才会创建延迟操作对象。消费组的初始状态是稳定稳定状态,第一个消费者发送加入组请求,协调者更改消费组的状态为准备再平衡,并创建一个延迟操作对象,当第二个消费者发送加入组请求,因为现在消费组的状态为准备再平衡,所以这时不会创建延迟操作对象。由此可见,在处理加入组请求逻辑时:一个消费组只对应一个延迟操作对象。
成员编号
协调者处理消费者的加入组请求,如果消费者设置的成员编号未知,协调者会为消费者指定一个新的成员编号,然后创建消费者成员元数据,并加入到消费组元数据中;如果消费者成员编号是已知的,说明消费组元数据中已经存在对应的消费者成员元数据,只需更新已有的成员元数据。
协调者为消费者分配的成员编号,会作为加入组响应结果返回给消费者。消费者发送同步请求时必须指定这个分配的成员编号,这样才能找到对应的消费者元数据。如果消费者要重新加入消费组,再次发送加入组请求也要带上分配的成员编号。
协调者为消费者分配了成员编号,协调者的消费组的元数据中会一直保持这个消费者的信息。成员编号用来标识一个消费者,消费者一旦获取了成员编号,后续的所有请求动作都应该带上这个成员编号。
协调者中的消费组元数据管理了消费者的元数据,协调者处理请求时(加入组请求和同步组请求),都需要操作消费组元数据,对消费者的元数据进行增删查改。在协调者侧消费组元数据只有一份,所以为了线程安全,需要对消费组元数据加锁。
准备再平衡
消费组的状态从稳定进入准备再平衡,表示准备开始再平衡操作。一次再平衡操作只会由一个消费者发起,并只会创建一个延迟的操作对象。创建完延迟的操作对象(延迟的加入组请求)后立即尝试看能不能完成。如果不能及时完成,就放入延迟缓存中,延迟缓存中会提供检查延迟操作能否完成的方法,并且保证在指定时间内还未完成的延迟操作,必须强制完成超时的延迟操作。
延迟操作和延迟缓存
延迟操作添加到延迟缓存,需要指定相关的信息:
1,延迟操作需要指定一个超时时间,表示在指定时间内未完成时会被强制完成。
2,加入延迟缓存需要指定一个键:比如,延迟加入组的键就是消费组的编号。
3,协调者创建延迟操作后,都会马上尝试完成延迟操作(延迟操作越早完成越好)。延迟操作都有依赖条件,任何可能改变依赖条件的事件都应该执行尝试完成延迟操作。比如:消费者申请加入消费组而发送的加入组请求,会创建延迟的加入。那么后面的消费者再发送加入组请求时,都应该去尝试完成这个延迟加入的延迟操作对象。
4,有依赖事件发生时去尝试完成延迟操作,怎么判断该延迟操作能不能完成?查看延迟加入操作能不能完成根据状态数据:消费组元数据(消费组元数据包换了消费组下所有的消费者元数据信息)消费组中的所有消费者成员是否都发送或重新发送了加入组请求。
延迟操作的重要方法:
tryComplete()尝试完成,如果不能完成,返回false,表示延迟操作还不能完成。
onComplete()延迟操作完成时的回调方法,完成:正常主动完成和超时被动完成。
onExpiration()延迟操作超时的回调方法,前面一直尝试完成都不能完成,在指定的超时时间过后就会强制完成,调用此方法,之后会调用onComplete()方法。
延迟缓存的重要方法:
tryCompleteElsewatch(operation,key):尝试完成延迟操作,如果不能完成就以指定的键监控这个延迟操作。加入到延迟缓存中。
checkAndComplete(key):检查并尝试完成指定键的延迟操作。
以上延迟缓存的两个方法都会调用延迟操作的tryComplete()方法。
超时时间:延迟加入的超时时间是所有消费者最大的会话时间,因为延迟加入针对一个消费组级别的;延迟心跳的超时时间是对应消费者的超时时间,因为协调者针对每个消费者都会创建一个延迟心跳操作。
尝试完成延迟的加入操作
延迟操作能否完成的判断条件:消费组元数据的notYetRejoinedMembers()方法的返回值是否为空。notYetRejoinedMembers()收集消费组中awaitingJoinCallback值对象为空的消费者元数据。协调者开始处理消费者发送的加入组请求,最开始就会设置awaitingJoinCallback的值为发送响应的回调方法。因此被协调者开始处理的消费者awaitingJoinCallback不为空。notYetRejoinedMembers()方法就不会选择此消费者。消费组元数据保存了消费组管理的所有消费者元数据,只要消费组中有消费者没有发送加入组请求,notYetRejoinedMembers()就会返回数据(不为空),就不能完成延迟操作。
总结:notYetRejoinedMembers()表示还没有重新发送加入组请求的消费者,如果返回有数据说明还有消费者没发送加入组请求,不能结束延迟操作。
协调者处理第一个发送加入组请求的步骤:
1,协调者处理第一个消费者发送的加入组请求,创建消费者元数据,加入消费组元数据。
2,消费组从最开始的稳定状态转化为准备再平衡状态。
3,因为是第一个消费者发送加入组请求,所以协调者为消费组创建一个延迟的加入组对象,并会马上通过延迟缓存尝试完成这个延迟的加入组对象。
4,由于现在消费组中的所有消费者(就只有一个消费者)的awaitingJoinCallback值对象不为空,notYetRejoinedMembers()返回空,满足完成延迟操作的条件。
5,调用可以完成的延迟操作的回调方法事,首先会把消费组的状态改为等待同步。
6,返回加入组响应给所有的消费者(只有一个消费者)。
疑问:协调者在处理第一个消费者的加入组请求,经历了以上的步骤,在以上步骤处理的同时,完全有可能会有第二个,第三个消费者发送加入组请求到协调者上。那么在以上步骤中完全没有看到协调者处理这些消费者的请求。为什么喃?
回答:那是因为在协调者处理第一个消费者的加入组请求时,就已经对消费组元数据进行加锁同步操作,以防止一次处理多个消费者的加入组请求。所有在后面获取消费组元数据中的消费者时只会获取到一个消费者。也就满足了完成延迟操作的条件。
当协调者处理完第一个消费者的加入组请求,并释放了对消费组元数据的锁后,消费者接收到了加入组响应。这是消费组的状态已经从稳定状态到准备再平衡状态到了等待同步状态。消费者收到了加入组响应后,因为只有一个消费者,所以它也是主消费者,就会马上执行分区分配工作,之后会发送同步组请求到协调者。因为协调者已经释放了对消费组元数据的锁,那么在第一个消费者在做这些动作的情况的时候就会出现两种情况:
第一:消费组元数据的锁释放后,在第一个消费者发送同步组请求到协调者之前,其他的消费者已经发送了加入组请求到协调者,这时协调者就会处理新消费者的加入组请求。
说明:协调者在返回加入组响应给消费者之前就已经把消费组的状态改为了等待同步状态。收到加入组响应的消费者就会开始执行分配分区动作(只有一个消费者,那它就是主消费者),这时还没有发送同步组请求给协调者,就有其他的消费者发送了加入组请求给协调者了。这个时候其实第一个消费者执行的分区分配算法是无意义的,但是第一个消费者并不知道,并且会继续执行。并且执行完成后会立即发送同步组请求给协调者,其实这时发送同步组请求给协调者,协调者是不会接受的,因为协调者已经接受了其他消费者的加入组请求,已经把消费组的状态改为了准备再平衡。协调者会返回一个错误码给发送同步组请求的消费者,让其重新发送加入组请求。
疑问:协调者接收并开始处理了其他消费者的加入组请求,那么处理流程会不会和第一个发送加入组请求的消费者流程一样?
回答:不会,因为协调者接收到其他消费者的加入组请求后,就会创建消费者元数据,并且加入到消费组元数据中。在notYetRejoinedMembers()判断的时候,第一个发送加入组请求的消费者的awaitingJoinCallback为空(第一个消费者完成了加入组的延迟操作对象后就会把awaitingJoinCallback重置为空),会被notYetRejoinedMembers()方法选出来,这样就并不满足完成延迟操作的条件,就不会执行返回加入组响应给消费者。
第二:消费组元数据的锁释放后,第一个消费者很快发送同步组请求到协调者,协调者就处理这个消费者的同步组请求,这时又会对消费组元数据加锁。协调者就不会处理其他消费者发送来的加入组请求。
说明:第一个消费者发送了同步组请求后,因为对消费组元数据要进行加锁,所以协调者只会处理此消费者的同步组请求,并会发送同步组响应给消费者。这时就会把消费组的状态改为稳定状态。并释放消费组的锁。这时协调者就可以去处理其他消费者的加入组请求了。协调者处理第二消费者的加入组请求和第一个消费者类似:消费组的状态从稳定状态转换到准备再平衡状态,并创建一个延迟对象,并通过延迟缓存马上尝试完成这个延迟对象,但是这时因为消费组中有两个消费者元数据,并且第一个消费者还没有重新发送加入组请求(这个时候第一个消费者认为自己已经完成了加入同步完成了)。这个时候在调用notYetRejoinedMembers()方法,第一个发送加入组请求的消费者的awaitingJoinCallback为空。notYetRejoinedMembers()返回不为空,不能完成延迟操作。这样协调者第二个消费者的加入组请求结束了,但是延迟操作对象还没有完成,会被加入到延迟缓存中监控起来,后续要完成延迟操作有两种方式:外部事件触发(有消费者发送加入组请求就去尝试一次看是否能完成);超时触发(超过消费组中所有消费者最大的超时时间)。如果第一个消费者在规定时间内又再次发送了加入组请求,那么就会再次去尝试完成延迟操作,这个时候就会满足完成延迟操作的条件,完成这样延迟操作,从延迟缓存中移除,调用延迟操作完成的回调方法,返回加入组响应给两个消费者。如果第一个消费者在超时的时候还没有发送加入组请求,协调者会在延迟操作超时后,强制完成延迟操作,这时也会调用延迟操作的回调方法,返回加入组响应给第二个消费者(因为第一个消费者一直没有发送加入组请求,回调方法是空,所以不会发送响应)。
协调者在处理第一个消费者的“加入组请求”时,创建的延迟操作对象会立即完成 。 但处理第二个消费者的“加入组请求”时,消费组中已经存在第一个消费者的成员元数据,此时消费组中总共有两个消费者了 。 但是第一个消费者成员元数据的回调方法在协调者返回“加入组响应”给它时,就被重置为空了 。必须等待第一个消费者再次发送加入组请求是才会再次尝试完成延迟操作。
总结:其实消费组在处理消费者加入同步请求时,是通过消费者不断的发送请求后,才达到终于的稳定。
消费组在稳定状态下,当有新消费者发送加入组请求,这时将会把消费组的状态改为准备再平衡,消费组中原有的消费者通过心跳的方式(协调者将加入组响应返回给消费者后会马上对改消费者启动心跳监控机制)感知需要重新发送加入组请求。
消费组在等待同步状态(协调者在发送加入组响应之前就更新消费组的状态为等待同步)下,当有新消费者发送加入组请求,这时将会把消费组的状态改为准备再平衡,由于消费组的状态未稳定,原有的消费者不会有心跳任务,这时协调者采用返回错误码的方式返回到消费者,即消费者在发送同步组请求时,返回错误码给消费者,让其重新发送加入组请求。这里有一个特殊的地方,协调者在返回同步组响应给消费者之前,需要保存消费组的分配结果到内部主题中,这个方法不会对消费组的元数据进行操作,所以会释放消费组元数据的锁。协调者就有机会处理其他消费者的加入组请求和同步组请求,看看协调者怎么来处理:现在消费组还是处于等待同步状态,按照等待同步状态的情况来处理。
参考资料:
Kafka技术内幕:图文详解Kafka源码设计与实现