生产者客户端架构
生产者客户端发送详细说明
生产者由两个线程运行,分别是主线程和Sender线程,主线程创建消息将消息放到是消息收集器,消息收集器将消息按照分区维护在不同Deque中,Deque中存的是ProducerBatch,包含多个消息,当ProducerBatch满了时,会唤醒Sender线程。当Sender被唤醒后,会将消息按照nodeId进行聚合,这样的话同一节点下的分区会被封装成一个Request发送到服务器,当发送后会将Request会被放到InFlightRequests中,表示没有得到响应的请求,如果InFlightRequests过大,达到了max.in.flight.requests.per.connection,那么将无法像这个节点发送请求
生产者产生速度比发送到服务端快会出现什么问题?
会将生产者的缓存堆满,此时生产者会阻塞一定时间,超时还没有发送的话会抛出异常。
生产者发送消息图解

inFlightRequests中的数据会按照先进先出的方式进行处理,服务端怎样保证消息处理顺序?
服务端在收到客户端消息时,会将连接进行mute,等处理完消息后会进行unMute,在mute期间收到同一连接的请求是不会进行处理的
元数据更新
当客户端没有需要使用的元数据信息或者超过了一定时间没有获取过元数据,就会发起元数据更新。当需要更新元数据信息时,会挑选出leastLoadedNode,然后发起请求获取元数据。leastLoadedNode是所有Node中没有收到响应的请求数最少的那一个。
生产者重要的参数
acks,生产者发送消息后,需要多少副本收到消息才认为写入成功。acks=1,只要写入leader就认为成功,acks=0,只要发送后就认为成功,acks=-1,等待ISR中所有副本都成功后才认为成功
linger.ms 多久发一次消息
消费者
消费者订阅方式
subscribe和assign两种方式进行订阅,subscribe方法由Consumer自动分配消费的分区并且能实现负载均衡和故障自动转移,assign模式由用户自己控制
消费者获取消息流程
kafka在首次poll时,首先获取coordinator,然后进行rebalance,之后consumer加入消费者组,然后获取到自己的分区和分区之前消费的位移,如果获取不到会通过配置方式拿到最新或者最旧的位移,获取到消息后会更新本地保存的位移值,位移值保存在subscriptions。消费完成后,提交位移时,从subscriptions拿出当前消费到的位移值进行提交,位移提交,会将消费位移发送到kafka内部主题_consumer_offsets中
再均衡原理
首先向负载最小的节点发送FindCoordinatorRequest,节点拿到Consumer的groupId,然后使用groupId的hashCode对_consumer_offsets取余得到位移主题的partitionId,然后查找partitionId的leader副本所对应的节点id,这个节点id就是groupCoordinator的id,随后返回给Consumer。之后Consumer发送joinGroupRequest请求到coordinator,coordinatro选择一个客户端为消费者组的leader,然后Consumer投票分区策略,选出分区策略后将策略返回给Consumer,leader Consumer收到的消息包含其它消费者的信息,然后通过分区策略进行分区,之后将分区结果发到服务器,服务器发送响应给各个Consumer,获取到对应的分区,之后Consumer会通过OffsetFetchRequest获取分区的位移。
_consumer_offsets的写入
_consumer_offsets位移主题,客户端通过发送OffsetCommitRequest进行提交位移,主要包含groupId,topic,patition,offset信息,可以通过topic,groupId,partition唯一确定一个offset,可以通过日志压缩讲同一个key下的位移值只保留一个,key中包含groupId,topic,patition,version
_consumer_offsets的读取
coordinator会读取位移主题中的数据并写入到元信息中
日志存储
日志架构

日志结构
log是一个文件夹,命名为topic-partition,文件夹中包含多个logSegment文件,每个logSegment包含日志文件和偏移量索引文件和时间戳索引文件,文件的命名都是以logSegment的baseOffset来命名的
消息压缩机制
kafka会将多条消息一起压缩,在broker保存,consumer拉取时都是压缩状态,
详细说一下kafka服务端的索引
日志索引,kafka使用ConcurrentSkipListMap来保存各个日志分段,用baseOffset作为key,这样能快速定位到消息所在的日志分段
偏移量索引,保存在mappedByteBuffer中,内容是相对偏移量和消息的物理地址,相对偏移量是单调递增的,可以使用二分查找来加快确定物理地址
时间戳索引,保存的是时间戳和对应的相对偏移量,可以使用二分查找来加快确定相对偏移量
消息定位
kafka用跳表保存各个日志分段,每个日志分段的key是baseOffset,这样可以根据指定偏移量快速获取到日志分段,然后就能找到偏移量索引。之后在偏移量索引中使用二分查找找到消息所在的物理地址,然后从物理位置顺序查找直到找到要拉取的消息
日志是怎样删除的
日志删除,kafka的日志管理器有一个专门的任务周期性的删除文件,基于时间,删除过期的logSegment,基于日志大小,根据日志总大小是否超过阈值,基于日志起始偏移量,
副本
简单说一下什么是副本
一个分区中包含一个或多个副本,其中一个为leader,其它的为follower,各个副本位于不同的broker,只有leader提供服务,follower只负责同步数据,所有副本统称为AR,与leader保持同步状态的副本集合叫做ISR ,LEO标识分区最后一条消息的下一个位置,每个副本都有自己的LEO,ISR中最小的LEO即为HW,俗称高水位,消费者只能拉倒HW之前的消息,leader epoch防止在极端情况下,比如多个副本被选主后又立刻宕机的情况下造成数据截断,从而丢失数据或者数据不一致
磁盘IO流程

服务器
处理流程
Acceptor建立连接然后将连接传到processor,之后由processor来监听连接的读写事件,发现读事件时,将事件转换为读请求放到requestQueue,之后handler轮训获取requestQueue中的数据,获取到请求话将请求传给kafkaApis进行处理,kafkaApis处理完后将响应写到responseQueue,之后processor轮训responseQueue将响应返回给客户端
为什么是轮训而不是长轮训?
每次轮训后更新请求处理的频率等统计信息

写入消息
如果ack = 0,io线程解析出数据后就响应客户端,如果ack=1,那么等io线程将消息写入到本地好就响应,如果ack=-1,那么要等ISR中其他的副本都获取到此消息后才返回

拉取消息

控制器的选举,kafka集群中的多个broker通过zk创建临时节点的方式选主,选主成功后将/controller_epoch下的值加1,表示当前控制器的版本
控制器的用处,监听分区变化,监听主题变化,监听broker变化,更新集群元数据信息