Kafka系列 - 生产者详解

前言

一个消息队列,必然存在着生产者和消费者,而生产者(Producer)负责向Kafka服务节点(Broker)。

从一个示例开始

public class KafkaProducerDemo {
    private static final String brokerList = "localhost:9092";
    private static final String topic = "topic-demo";
    
    public static Properties initConfig() {
        Properties props = new Properties();
        props.put("bootstrap.server", brokerList);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("client.id", "id.demo");
        return props;
    }

    public static void main(String[] args) throw Exception {
        Properties props = initConfig();
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        ProducerRecord<String, String> record = new ProducerRecord<topic, "Message Test">;
        producer.send(record);
    }
}

1. 生产者客户端

1.1 生产者创建及参数说明

创建Kafka生产者客户端KafkaProducer有3个参数必填项:
1)bootstrap.servers:指定生产者客户端连接Kafka集群的地址列表,多个以逗号隔开(如:127.0.0.1:9092,127.0.0.1:9093)。连接Kafka集群并不需要配置所有的broker地址,因为生产者能从broker获取到其他broker的信息,一般至少设置两个,一个broker宕机时也仍然能连接到Kafka集群。
2)key.serializer:将Key(可以用key来计算分区号,从而将消息归类到某个指定分区)序列化成字节数组。
3)value.serializer:将value序列化成字节数组。

1.2 消息发送

根据参数构建完生产者后,就是创建消息对象 ProducerRecord,属性如下:

public class ProducerRecord<K, V> {
    private final String topic;
    private final Integer partitoin;
    private final Headers headers; 
    private final K key;
    private final V value;
    private final Long timestamp; 
}
  • 其中 topicvalue 为必填项,其余属性可选填
  • key 用来指定消息的键,属于消息的附加信息,可以用来计算分区号,让消息可以发往特定的分区,除了 topic 外消息的二次归类,即同一个key的消息会被划分到同一个分区

发送消息的三种模式

发送消息的方法本身是异步的,同步只是在调用方法后对后续操作进行了阻塞

发送消息方法如下:

public class KafkaProducer<K, V> implements Producer<K, V> {
    // ...
    public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
        return this.send(record, (Callback)null);
    }

    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
        return this.doSend(interceptedRecord, callback);
    }
    // ...
}
  • 发后即忘
producer.send(record);
  • 同步
Future<RecordMetadata> future = producer.send(record);
// 调用 get方法阻塞等待响应,从而达到同步效果
RecordMetadata metadata = future.get(); 
  • 异步
    利用send()方法的Callback,在Kafka返回响应时调用该函数实现异步的操作,如:
producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            log.error("send message exception:{}", exception);
        } else {
            log.info("send message success, topic:{} - partition:{} - offset:{}", metadata.topic(), metadata.partition(), metadata.offset());
        }
    }
});

通常KafkaProducer不会只负责发送单条消息,一般是发送多条消息。对于同一分区的不同消息,先发送的消息,回调也会先执行,即回调函数的执行是分区有序的。

关于close方法
发送完消息后,需要调用KafkaProducer.close()方法回收资源。
close()方法会阻塞等待之前所有的发送请求完成后再关闭KafkaProducer。
也提供了带有超时时间的close方法,在超过等待时间后会强行关闭KafkaProducer,一般不建议使用。

1.3 序列化

生产者需要用序列化器(Serializer)把对象转换成字节数组才能通过网络发送给Kafka,同样的,消费者需要用反序列化器(Deserializer)把从Kafka中收到的字节数组转换成相应的对象。

序列化器均实现接口:

public interface Serializer<T> extends Closeable {
   
    default void configure(Map<String, ?> configs, boolean isKey) {
    }

    byte[] serialize(String var1, T var2);

    default byte[] serialize(String topic, Headers headers, T data) {
        return this.serialize(topic, data);
    }

    default void close() {
    }
}

1)configure()方法在KafkaProducer创建时调用,用来配置当前类,如编码类型的确定。
2)serialize()方法用来执行序列化操作。
3)close()方法用来关闭当前的序列化器(一般是个空方法),该方法可能会被KafkaProducer调用多次,实现的话需要保证方法的幂等性。

默认实现的序列化器如:
org.apache.kafka.common.serialization.StringSerializer

1.4 分区器

消息经过send()方法发往broker的过程中,有可能需要经过拦截器(Interceptor)、序列化器(Serializer)和分区器(Partitioner)才会被送达broker。

如果消息ProducerRecord 指定了 partition 字段,就不需要分区器进行分区,因为分区已被 partition 指定。

分区器默认需要实现接口:

public interface Partitioner extends Configurable, Closeable {
    int partition(String var1, Object var2, byte[] var3, Object var4, byte[] var5, Cluster var6);

    void close();

    default void onNewBatch(String topic, Cluster cluster, int prevPartition) {
    }
}

默认实现:org.apache.kafka.clients.producer.internals.DefaultPartitioner

public class DefaultPartitioner implements Partitioner {
    // ...
    public void configure(Map<String, ?> configs) {
    }

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        return this.partition(topic, key, keyBytes, value, valueBytes, cluster, cluster.partitionsForTopic(topic).size());
    }

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster, int numPartitions) {
        return keyBytes == null ? this.stickyPartitionCache.partition(topic, cluster) : Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    public void close() {
    }
}

默认分区器DefaultPartitioner的分区规则:

  • 如果 key 不为null,则对 key 进行哈希,最终根据得到的哈希值来计算分区号,拥有相同 key 的消息会被写入同一个分区(计算得到的分区号会是所有分区中的任意一个,与key为null是轮询可用分区有差别)。
  • 如果 key 为null,消息会已轮询的方式发往 topic 内的各个可用分区
1.5 拦截器

生产者拦截器可以用来在消息发送前做一些准备工作,如按指定规则过滤不符合要求的消息,对消息内容进行加工处理等;也可以用来在发送回调逻辑前做一些操作,比如统计消息发送的成功率。

public interface ProducerInterceptor<K, V> extends Configurable {
    ProducerRecord<K, V> onSend(ProducerRecord<K, V> var1);

    void onAcknowledgement(RecordMetadata var1, Exception var2);

    void close();
}

1)将消息序列化和计算分区之前会调用拦截器的onSend()方法对消息进行相应的定制化操作(一般不要对topic、key 和 partition进行修改)。
2)在消息被应答(Acknowledgement)之前或消息发送失败时调用生产者拦截器的 onAcknowledgement()方法,且优先于Callback。该方法运行在Producer的IO线程,应尽量简单,不然会影响消息发送效率。

2. 原理分析

2.1 生产者客户端架构
生产者客户端整体架构(图片来源https://blog.csdn.net/LINBE_blazers/article/details/104072886

整个生产者客户端由主线程和Sender线程协调运行。主线程中创建消息,然后通过拦截器、序列化器和分区器处理后缓存到消息累加器(RecordAccumulator),Sender线程负责从消息累加器中获取消息并发送到broker。

RecordAccumulator用来缓存消息便于Sender线程可以批量发送,减少网络传输的资源消耗,从而提高性能。如果生产者发送消息的速度超过Sender发送到服务器的速度,会导致生产者空间不足,将会阻塞一段时间后,抛出异常,与参数max.block.ms有关,默认60秒。而buffer.memory参数则可以设置缓存空间大小,默认为32MB。

RecordAccumulator中为每个分区维护了一个双端队列,队列中的内容是ProducerBatch,即Deque<ProduderBatch>,创建消息写入到尾部,发送消息从头部读取。ProducerBatch是消息发送的一个批次,里面包含了一个或多个ProducerRecord。

Sender从RecordAccumulator中获取到缓存的消息之后,会进一步将<分区,Dequeue<ProducerBatch>>
转换为<Node,List<ProruderBatch>>,Node表示的是kafka集群的broker节点。这里是一个概念的转变,对于网络连接来说,生产者客户端与具体broker节点建立的连接,也就是向具体的broker节点发送消息而不关心具体分区。而对于KafkaProducer来说,它只关心向哪个分区发送消息。所以这里做一个从应用逻辑层面到网络IO层面的转换。

请求在发送给Kafka之前还会保存到InFlightRequests中,形式为: Map<NodeId,Dequeue<Request>>
主要作用是缓存了已经发出去但是还未收到响应的请求。InFlightRequests通过配置参数max.flight.requests.per.connection可以限制每个链接最多缓存数量,默认值为5,即每个链接最多只能缓存5个未响应的请求,超过该参数之后就不能继续像这个连接发送请求。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,539评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,911评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,337评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,723评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,795评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,762评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,742评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,508评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,954评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,247评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,404评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,104评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,736评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,352评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,557评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,371评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,292评论 2 352