producer开发

本章着重讨论kafka的producer设计以及基于java版本producer的开发与使用

producer概览

kafka producer就是负责向kafka写入数据的应用程序。
子0.9.0.0版本起,kafka发布了java版本producer供用户使用,但作为一个比较完善的生态系统,kafka必然要支持多种语言的producer,这其中比较著名的当属C/C++平台上的producer库librkafka(准确的说,这些库也同时包含了对consumer的支持,统称他们是clients似乎更加合理一些),而向python,go或.net这种主流的编程语言也有对应的producer库。当前apache kafka支持的第三方ciients库的完整列表如下:

  • C/C++
  • python
  • golang
  • erlang
  • .NET
  • clojure
  • ruby
  • Node.js
  • proxy(HTTP REST等)
  • perl
  • stdin/stdout
  • PHP
  • rust
  • alternative java
  • storm
  • scala DSL
  • ...

值得注意的是,上面这些第三方库基本上都是由非Apache Kafka社区的人维护的,其中一些比较大的库已经由Confluent公司的人参与研发和维护(前面提到过的Confluent公司发布的企业级产品包含了librdkafka库支持C++等平台),但如果用户下载的是Apache Kafka,默认是不包含这些库的,需要额外单独下载对应的库。关于这些第三方库的详细信息以及下载地址,请访问https://cwiki.apache.org/confluence/display/KAFKA/Clients

另外,Apache Kafka还封装了一套二进制通信协议,用于对外提供各种各样的服务。对于producer而言,用户几乎可以直接使用任意编程语言按照该协议的格式进行编程,从而实现向Kafka发送消息。实际上内置的Java版本producer和上面列出的所有第三方库在底层都是相同的实现原理,只是在易用性和性能方面有所差别而已。这组协议本质上为不同的协议类型分别定义了专属的紧凑二进制字节数组格式,然后通过Socket发送给合适的broker,之后等待broker处理完成后返还响应(response)给producer。这样设计的好处在于具有良好的统一性——即所有的协议类型都是统一格式的,并且由于是自定义的二进制格式,这套协议并不依赖任何外部序列化框架,从而显得非常轻量级,而且也有很好的扩展性。

第6章将详细讨论这套二进制通信协议的设计与使用。在这里只需要知道producer底层是由它们实现的即可。

Kafka producer在设计上要比consumer简单一些,因为它不涉及复杂的组管理操作,即每个producer都是独立进行工作的,与其他producer实例之间没有关联,因此它受到的牵绊自然也要少得多,实现起来也要简单得多。目前producer的首要功能就是向某个topic的某个分区发送一条消息,所以它首先需要确认到底要向topic的哪个分区写入消息——这就是分区器(partitioner)要做的事情。Kafka producer提供了一个默认的分区器。对于每条待发送的消息而言,如果该消息指定了key,那么该partitioner会根据key的哈希值来选择目标分区;若这条消息没有指定key,则partitioner使用轮询的方式确认目标分区——这样可以最大限度地确保消息在所有分区上的均匀性。当然producer的API赋予了用户自行指定目标分区的权力,即用户可以在消息发送时跳过partitioner直接指定要发送到的分区。

另外,producer也允许用户实现自定义的分区策略而非使用默认的partitioner,这样用户可以很灵活地根据自身的业务需求确定不同的分区策略。后面章节中会详细讨论如何自定义分区策略。

有了partitioner的帮助,我们就可以确信具有相同key的所有消息都会被路由到相同的分区中。这有助于实现一些特定的业务需求,比如可以利用局部性原理,将某些producer发送的消息固定地发送到相同机架上的分区从而减少网络传输的开销等。当然了,如前所述,如果没有指定key,那么所有消息会被均匀地发送到所有分区,而这通常也是最合理的分区策略。

在确认了目标分区后,producer要做的第二件事情就是要寻找这个分区对应的leader,也就是该分区leader副本所在的Kafka broker。前面章节中提到了每个topic分区都由若干个副本组成,其中的一个副本充当leader的角色,也只有leader才能够响应clients发送过来的请求,而剩下的副本中有一部分副本会与leader副本保持同步,即所谓的ISR。因此在发送消息时,producer也就有了多种选择来实现消息发送。比如不等待任何副本的响应便返回成功,或者只是等待leader副本响应写入操作之后再返回成功等。不同的选择也有着不同的优缺点,我们会在后续章节中讨论如何选择不同的策略。

Java版本producer的工作原理如图4.1所示。


producer首先使用一个线程(用户主线程,也就是用户启动producer的线程)将待发送的消息封装进一个ProducerRecord类实例,然后将其序列化之后发送给partitioner,再由后者确定了目标分区后一同发送到位于producer程序中的一块内存缓冲区中。而producer的另一个工作线程(I/O发送线程,也称Sender线程)则负责实时地从该缓冲区中提取出准备就绪的消息封装进一个批次(batch),统一发送给对应的broker。整个producer的工作流程大概就是这样的。

构造producer

producer程序实例

首先,下面给出了一份可运行的producer程序代码清单。这份代码实现了最简单的功能:构造一条消息,然后发送给Kafka。在运行这个producer程序之前,要保证启动一个最小规模的Kafka单机或集群环境。Kafka运行环境搭建方法请参考第3章。

package com.huxi.kafkaapi;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class ProducerTest {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // 必须指定
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 必须指定
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 必须指定

        props.put("acks", "-1");
        props.put("retries", 3);
        props.put("batch.size", 323840);
        props.put("linger.ms", 10);
        props.put("buffer.memory", 33554432);
        props.put("max.block.ms", 3000);

        Producer<String, String> producer = new KafkaProducer<>(props);
        for(int i = 0; i < 100; i++)
            producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));

        producer.close();
    }
}

构造一个producer实例大致需要以下5个步骤。

  1. 构造一个java.util.Properties对象,然后至少指定bootstrap.servers、key.serializer和value.serializer这3个属性。在上面的代码清单中这3个属性后面都追加了注释,表明这是必须要指定的参数,它们没有默认值。

  2. 使用上一步中创建的Properties实例构造KafkaProducer对象。

  3. 构造待发送的消息对象ProducerRecord,指定消息要被发送到的topic、分区以及对应的key和value。注意,分区和key信息可以不用指定,由Kafka自行确定目标分区。

  4. 调用KafkaProducer的send方法发送消息。

  5. 关闭KafkaProducer。

1. 构造Properties对象

下面将详细展开每一步要做的事情。首先要构造一个Properties对象,在这一步中有3个参数或属性是必须要指定的。如果我们翻开Kafka官网中producer的参数列表(详见https://kafka.apache.org/documentation/#producerconfigs)会发现这3个参数是没有默认值的。它们分别如下。

bootstrap.servers

该参数指定了一组host:port对,用于创建向Kafka broker服务器的连接,比如k1:9092,k2:9092,k3:9092。上面的代码清单中指定了localhost:9092,producer使用时需要替换成实际的broker列表。如果Kafka集群中机器数很多,那么只需要指定部分broker即可,不需要列出所有的机器。因为不管指定几台机器,producer都会通过该参数找到并发现集群中所有的broker。为该参数指定多台机器只是为了故障转移使用。这样即使某一台broker挂掉了,producer重启后依然可以通过该参数指定的其他broker连入Kafka集群。

另外,如果broker端没有显式配置listeners使用IP地址,那么最好将该参数也配置成主机名,而不是IP地址。因为Kafka内部使用的就是FQDN(Fully Qualified Domain Name)。

key.serializer

被发送到broker端的任何消息的格式都必须是字节数组,因此消息的各个组件必须首先做序列化,然后才能发送到broker。该参数就是为消息的key做序列化之用的。这个参数指定的是实现了org.apache.kafka.common.serialization.Serializer接口的类的全限定名称。Kafka为大部分的初始类型(primitive type)默认提供了现成的序列化器。上面的代码清单中使用了org.apache.kafka.common.serialization.StringSerializer,该类会将一个字符串类型转换成字节数组。这个参数也揭示了一个事实,那就是用户可以自定义序列化器,只要实现Serializer接口即可。

需要注意的是,即使producer程序在发送消息时不指定key,这个参数也是必须要设置的,否则程序会抛出ConfigException异常,提示“key.serializer”参数无默认值,必须要配置。

value.serializer

和key.serializer类似,只是它被用来对消息体(即消息value)部分做序列化,将消息value部分转换成字节数组。上面的代码清单中该参数指定了与key.serializer相同的值,即都使用StringSerializer。当然了,value.serializer也可以设置成与key.serializer不同的值。
一定要注意的是,这两个参数都必须是全限定类名。

2. 构造KafkaProducer对象

设置了这3个属性之后,下面就要构造KafkaProducer对象了。KafkaProducer是producer的主入口,所有的功能基本上都是由KafkaProducer来提供的。创建KafkaProducer实例很简单,只需要下面一句命令即可:


image.png

创建producer时也可以同时指定key和value的序列化类,比如这样:


image.png

如果采用这样的方式创建producer,那么就不需要显式地在properties中指定key和value的序列化类了。

构造ProduceRecord对象

构造好kafkaProdcer实例后,下一步就是构造消息实例。java版本producer使用ProducerRecord类来表示每条消息。创建prodducerRecord也很简答,最简单的形式就是指定topic和value,如下:

image.png

当然,producerRecord还支持指定更多的消息消息,比如可以控制该消息直接被发往的分布以及消息的时间戳,
具体API格式请参见https://kafka.apache.org/10/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html

不过这里要注意的是,一定要谨慎指定时间戳,因为在目前的kafka设计中,时间戳索引文件中的索引都是按照时间戳顺序排列的,所以如果在producer端随意指定时间戳,会导致该信息的时间序列混乱,这样在根据时间戳查询位移的功能时不会找到这条消息。同时kafka的消息留存策略也会受到影响,因此最好还是然让kafka自行来指定戳比较好。

发送消息

发送消息的主方法是send,producer在地城完全的实现了异步化,并且通过java的tuture同时实现了同步和异步发送 + 回调两种发送方式。

异步发送

实际上所有的写入操作都是异步的。java版本的producer的send方法会返回一个java future对象供用户稍后获取发送结果,这就是所谓的回调机制。代码如下:

producer.send(record,new CallBack(){
    @Override
    public void onCompletion(RecordMetadata metadata,Exception exception){
            if(exception == null){
            //消息发送成功
            }else{
            //执行错误处理逻辑
            }
    }
});

上面代码中的CallBack就是发送消息后的回调类,实现方法是onCompletion。该方法的两个输入参数metadata,和exception会同时非空,也就是说至少一个是null。当消息发送成功时,exception是null;繁殖,若消息发送失败,metadata就是null。因此在写producer程序时,最好写if判断。

另外,上面的Callbacl实际上是一个java接口,用户可以创建自定义的CallBack实现类来处理消息发送后的逻辑,只需要该具体类实现org.apache.kafka.clients.producer.CallBack接口即可

同步发送

同步发送和异步发送其实就是通过java的future来区分的,调用Future.get()无限等待结果返回,即实现同步发送的效果

ProduceRecord<String,String> record = new ProducerRecord<>("test",Integer.toString(i));
producer.send(record).get();

使用Future.get会一直等待下去直至kafka broker将发送结果返回给producer程序。当结果从broker处返回时gett方法要么返回发送结果要么跑出异常交由producer自行处理。如果没有错误,get方法将返回对应的RecordMetadata实例(包含了已发送消息的所有元数据信息)包括消息发送的topic、分区以及该消息在对应分区的位移信息。

不管是同步发送还是异步发送,发送都有可能失败,导致返回异常错误。当前kafka的错误类型包含了两类:可重试异常和不可重试异常。常见的可重试异常吐下。

  • LeaderNotAvailableException: 分区的副本不可用,这通常出现在leader换届选举期间,因此通常是顺时的异常,重试之后可以自行恢复

  • NotControllerException :controller当前不可用(controller是kafka集群中非常重要的角色)。这通常表名controller在经历新一轮的选举,这也是可以通过重试机制进行恢复的。

  • NetworkException:网络瞬时故障导致的异常,可重试

对于可重试的异常,如果在producer程序中配置了重试次数,那么只要在规定的重试次数内自行恢复了,便不会出现在onCompletion的exception中。不过若超过了重试次数仍旧没有成功,则会被封装进exception中。此时就需要producer程序自行处理这种异常。

所有可重试异常都继承自org.apache.kafka.common.errors.RetriableException抽象类。理论上讲所有未继承自RetriableException类的其他异常都属于不可重试异常,这类异常通常都表明了一些非常严重或kafka无法处理的问题,与producer相关的如下。

  • RecordTooLargeException: 发送的消息尺寸过大,超过了规定的大小上限。显然这种异常无论如何重试都是无法成功的。

  • SerializationException:序列化失败异常,这也是无法恢复的。

  • KafkaException:其他类型的异常

所有这些不可重试异常一旦被捕获都会被封装进Future的计算结果并返回给producer程序,用户需要自行处理这些异常。由于不可重试异常和可重试异常在producer程序端可能有不同的处理逻辑,因此可以使用下面的代码进行区分:

producer.send(record,new Callback()){
     @Override
     puvlic void onCompletion(RecordMetadata metadata,Exception exception){
            if(exception == null){
                  //消息发送成
            }else {
                  if(exception instanceof RetriableException){
                        //处理可重试异常
                  }else{
                        //处理不可重试异常
                  }
            }
      }
}

5.关闭producer

producer程序结束时一定要关闭producer!,这一点怎么强调都不为过。毕竟producer程序运行时占用了系统资源(比如创建了额外的线程、申请了很多内存以及创建了多个socket连接等)因此必须要显式地调用KafkaProducer.close方法关闭producer。不管发送是成功还是失败,只要producer程序完成了既定的工作,就应该被关闭。

如果是调用普通的无参数close方法,producer会被允许先处理之前的发送请求后再关闭,即所谓的“优雅”关闭退出(graceful shutdown);同时,kafkaProducer还提供了一个带超时的参数close(timeout)。如果调用此方法,prodcuer会等待timeout时间来完成所有处理中的请求,然后强行退出。这就是说,若timeout超时,则producer会强制结束,并立即丢弃所有未发送以及应答的发送请求。在某种程度上,这会给用户一种错觉:仿佛producer端的程序丢失了重要的消息。因此在实际场景中一定要谨慎使用带超时的close方法。

producer 主要参数

除了前面的boostrap.servers , key.serializer 和value.serializer之外,java版本producer还提供了很多其他重要的参数。详细的参数列表以及含义和默认值可以访问https://kafka.apache.org/documentation/#producer-configs

acks

acks参数用于控制producer生产消息的持久性(durability)。对于producer而言,kafka在乎的 是“已提交”消息的持久性。一旦消息成功提交,那么只要有任何一个保存了该消息的副本存活,这条消息就会被视为不可丢失的。经常碰到用户抱怨kafka的producer会丢消息,其实这里混淆了一个概念,即那些所谓的已丢失的消息其实并没有被成功写入kafka。换句话说,他们并没有被成功提交,因此kafka对这些消息的持久性不做任何保证——当然,producer API确实提供了毁掉机制供用户处理发送失败的情况。
具体来说,当producer发送一条消息给kafka集群时,这条消息会被发送到指定topic分区leader所在的broker上,producer等待从该leader broker返回消息的写入结果(当然并不是无限等待,是由超时时间的)以确定消息被成功提交。这一切完成后producer可以继续发送新的消息。kafka能够保证的是consumer永远不会读取到尚未提交完成的消息——这和关系型数据库类似,即在大部分情况下,某个事物的SQL查询都不会看到另一个事物中尚未提交的数据。
显然,leader broker何时发送写入结果返回给producer就是一个需要仔细考虑的问题了,
它也会直接影响到消息的持久性甚至是producer端的吞吐量:producer端越快的接收到leader broker响应,它就越快地发送下一条消息,即吞吐量也就越大。producer端的acls参数就是用来控制做这件事的。acks指定了在给producer发送响应前,leader broker必须要确保已成功写入消息的副本数。当前acks有3个取值:0、1和all

  • acks = 0:表示producer完全不理睬leader端的处理结果,此时,producer发送消息后立即开启下一条消息的发送,根本不等待leaderr broker端返回结果。由于不接收发送结果,因此在这种情况下producer.send的回调也就完全失去了作用,即用户无法通过回调机制感知任何发送过程中的失败,所以acks = 0时producer并不保证消息会被成功发送。但凡事有利有弊,由于不需要等待响应结果,通常这种设置下producer的吞吐量是最高的。

  • acks = all或者-1:表示当消息发送时,leader broker不仅会将消息写入本地日志,同时还会等待ISR中所有其他副本都成功写入他们各自的本地日志后,才发送响应结果给producer。显然当设置acks=all时,只要ISR中至少有一个副本处于存活的状态,那么这条消息就肯定不会丢失,因而可以达到最高的消息持久性,但通常这种设置下producer的吞吐量也是最低的。

  • acks = 1:是0和all的折中方案,也就是默认的参数值。producer发送消息后leader boker仅将该消息写入本地日志,然后便发送响应结果给producer,而无需等待ISR中其他副本写入该消息。那么此时只要该leader broker一直存活,kafka就能够保证这条消息的持久性,同时也保证了producer的吞吐量。

总结一下,acks参数控制producer实现不同程度的消息持久性,他有3个取值,对应的优缺点以及使用的场景如下:


image.png

在producer程序中设置acks非常简单,只需要在构造kafkaProducer的Properties对象中增加“acks”属性即可:

props.put("acks","1");
//或者
props.put(ProducerConfig.ACKS_CONFIG,"1");

值得注意的是,该参数的类型是字符串,因此必须要写成“1”而不是1,否则程序会报错,提示你没有指定正确的参数类型。

buffer.memory

...待续

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

友情链接更多精彩内容