最近我读了kakfa这本书,自己阅读后的感受总结下,希望可以帮到大家,如果有写的不正确的地方欢迎大家指正。
1.生产者主要功能
kafka生产者主要功能是发送消息,发送消息主要包含4部分:
(1)配置相关参数,设置生产者实例。
(2)构建待发送消息
(3)发送消息
(4)关闭生产者实例
消息发送是将准备好的消息,在需要经过拦截器,序列化器,分区器等一系列操作才发往borker。
首先是消息准备:
生产者构建对象ProducerRecord主要包含了以下内容
struct {
Topic: 主题
Partition:分区
Headers:消息头部
Key:消息键
Value:消息值
Timestamp 消息时间错
}
主题:指的就是topic,是消息的第一次逻辑划分。
分区:消息的第二次逻辑蜂划分(后面章节会讲到)
Headers:消息头部:消息头部主要用来设置应用相关信息。
Key:用来指定消息的键,计算分区号让消息进入指定的分区。可以进行消息的二次分类,相同消息的key可以分配到同一个分区中。Key还可以支持日志的压缩功能。
Value:代表发送的消息内容,
Timestamp:代表消息的发送时间。
2.生产者主要功能
一般是设置,broker地址,客户端参数等等,kafka生产者线程安全。
3.发送消息
在发送消息前首先需要构建ProducerRecord对象,对于发送消息的结构体中其实只有topic和value为必填参数其他的参数均为选填参数。其他参数即可根据业务述求决定,例如:同分区消息有序,时间戳消息放等等。
在构建完消息后,紧接着的就是发送消息。
Kafka的消息发送主要分为:发送即忘(fire-and-forget),同步(sync),异步(async)
发送即忘(fire-and-forget):只关注忘消息队列中发送消息,不关注是否到达,性能是最高的,可靠性是最差的。
Fakfa的生产者不是void方法,而是可以通过get的方式得到发送的结果。Send方法本事是异步的,调用完send方法后,调用get方法是阻塞等待borker的相应。
生产者在发送消息的时候一版会出现可重试的异常和不可重试的异常。可重试异常,例如:网络超时,分区被leader副本不可用,这个异常的副本需要下线,新的leader副本选举成功等等。不可重试的异常例如:消息体太大,消息队列会直接丢弃。
异步发送消息:异步发送消息指定了callback的方式,消息的响应可以使用消息是否发送成功。
4.生产者原理
生产者客户端是由主线程和sender线程协调运行,主线程负责创建消息,然后通过拦截器,序列化器之后追加到消息追加器中,sender现成主要将消息追加器中的消息发送到kafka中。
主线程发送来的消息都会被追加到RecordAccumulator中的某个双端队列,RecordAccumulator中维护了一个双端队列,队列的内容是ProducerBatch,消息写入缓存时,增加到消息队列尾部,sender线程读取时候从从双端队列头部读取,ProducerRecord是生产者发送的消息,ProducerBatch是指一批消息的批次,ProducerRecord被包含在ProducerBatch中。当一条消息流入RecordAccumulator时候,首先寻找消息分区的双端队列(没有则新建),在从这个双端队列尾部获取一个ProducerBatch(没有则创建),查看这个ProducerBatch中是否可以写入ProducerRecord,如果可以则写入,如果不可以写创建一个ProducerBatch。
Sender线程从RecordAccumulator中获取消息之后,会进一步的奖分区,productbatch转换为node,list<productBatch>