生产者客户端:主线程和 Sender 线程
1、主线程:由 KafkaProducer 创建消息,通过拦截器、序列化器和分区器后缓存到消息累加器(RecordAccumulator,也称消息收集器)中。
2、Sender 线程:从 RecordAccumulator 中获取消息,发送到 Kafka
概要:
一、拦截器:发送前准备:过滤、修改消息,发送回调前:统计
二、序列化器:对象转换成字节数组发送给 Kafka
三、分区器:根据 key 计算 partition
一、拦截器
1、作用发送前准备:过滤、修改消息,发送回调前:统计
2、实现 org.apache.kafka.clients.producer. ProducerInterceptor 接口,包含3个方法:
2、何时KafkaProducer调拦截器:
1)序列化和计算分区前调拦截器 onSend() 修改(一般不修改topic、key 和 partition等)
2)消息被应答前(Acknowledgement)或发送失败时调拦截器onAcknowledgement() ,优先于用户设定Callback前执行。
ps:运行在 Producer I/O线程中,实现越简单越好,否则影响消息发送速度
3)close() 关闭拦截器时清理
二、序列化器
1、作用:对象转换成字节数组发送给 Kafka。消费者用反序列化器转成对象(对应序列化器)
2、实现org.apache.kafka.common.serialization.Serializer 接口,3个方法:配置、序列化、关闭
configure():创建 KafkaProducer 实例时调,确定编码类型。
serialize:编解码,如几种序列化器都无法满足,可用 Avro、JSON、Thrift、ProtoBuf 和 Protostuff 等通用序列化工具或自定义
三、分区器
1、作用:根据 key 计算 partition。如果=指定partition 字段,不需分区器
2、实现:默认分区器是 org.apache.kafka.clients.producer.internals.DefaultPartitioner,实现 org.apache.kafka.clients.producer.Partitioner 接口,2个方法
3、partition() :哈希计算分区号,参数:主题、键、序列化后键、值、序列化后值,集群元数据。默认分区器 DefaultPartitioner 中,key null,轮询发往各个分区
4、close() :关闭分区器回收资源,空方法(默认分区器中)。
5、自定义分区器,实现某系列key都发到同一分区 有序消费,实现Partitioner 接口。
https://mp.weixin.qq.com/s/C6dfvzFkNDYgiNeZ4eWPBQ