kafka客户端
Kafka除了提供内置Java客户端外,还提供了二进制连接协议,即向Kafka网络端口发送适当的字节序列,就可以实现从Kafka读取消息或写入消息。许多语言实现了Kafka连接协议,从而提供了许多非Java客户端,比如Python、Go、C++等。
https://cwiki.apache.org/confluence/display/KAFKA/Clients
Kafka生产者
不同的使用场景对生产者API的使用和配置会有直接影响。比如信用卡处理系统,不允许消息丢失、重复,可以接受的写入延迟最大为500ms,吞吐量需要每秒钟能够处理一百万个消息。在比如用户点击事件存储场景,允许丢失少量消息或出现少量重复消息,写入延迟可以高一些,吞吐量则取决于网站的使用频度。
Kafka发送消息主要步骤:
首先我们会创建一个ProducerRecord对象,ProducerRecord中包含了目标topic和消息内容,我们同时还可以指定键和分区。在发送ProducerRecord对象前,生产者会把键和值数据进行序列化,以便在网络中进行传输。
接下来,数据会发发送给分区器,如果我们制定了写入分区,则分区器不会做任何事情。如果没有指定分区,分区器会根据ProducerRecord对象的键选择一个分区。选择好分区后,生产者就知道该往哪个主题的哪个分区下发送这条记录了。紧接着,这条记录会被添加到一个记录批次里面,这个批次里面的消息会被发送到相同主题和分区上。生产者中会有一个独立的线程,将批次记录发送到相应的broker上。
broker在接收到这些消息后会返回一个响应信息,如果写入成功,则会返回一个RecordMetaData对象,它包含了主题和分区信息,以及在分区的偏移量信息。如果写入失败,则会返回一个错误信息。生产者在接收到错误信息后,会进行重新发送,如果重新发送几次之后仍然失败,则生产者返回错误信息。
创建Kafka生产者
在创建Kafka生产者对象时,我们可以设置一些属性。Kafka生产者有3个必选的属性。
- bootstrap.servers:指定broker的地址清单,地址格式为:host1:port1,host2:port2。清单里不需要包含所有地址,因为生产者会从给定的broker中找到其它broker信息(仍然建议设置两个以上,防止其中一台broker宕机)。
- key.serializer:指定消息键采用的序列化类型,key.serializer必须被设置为一个实现了org.apache.kafka.common.serialization.Serializer接口的类,生产者会使用这个类把键对象序列化成字节数组。Kafka客户端默认提供了ByteArraySerializer、StringSerializer和IntegerSerializer。需要注意,即便我们只发送value,也需要设置key.serializer。
- value.serializer:同key.serializer,value.serializer指定的类会将值序列化。
创建生产者对象:
Properties kafkaProps = new Properties();
kafkaProps.put(“bootstrap.servers”,”broker1:9092,broker2:9092”);
kafkaProps.put(“key.serializer”,”org.apache.kafka.common.serialization.StringSerializer”);
kafkaProps.put(“value.serializer”,”org.apache.kafka.common.serialization.StringSerializer”);
Producer<String,String> producer = new KafkaProducer<>(kafkaProps);
发送消息:
ProducerRecord<String,String> record = new ProducerRecord<>("my-demo-topic”,”message-key”,”message-value")
producer.send(record);
消息发送方式
Kafka生产者发送消息有两种方式:同步发送和异步发送。
同步发送
producer.send()方法会返回一个Future对象,如果我们调用Future对象的get()方法,则生产者会等待Kafka的相应。如果服务器返回错误,get()方法会抛出异常。如果没有发生错误,则会得到一个RecordMetaData对象。
Future<RecordMetaData> future = producer.send(record);//返回future对象
try {
RecordMetadata recordMetadata = future.get();//同步阻塞等待返回结果
System.out.println("topic:" + recordMetadata.topic() + ",partition:" + recordMetadata.partition() + "offset:" + recordMetadata.offset());//获取写入结果元数据信息
}catch (Exception e) {
e.printStackTrace();
}
异步发送
同步发送结果写入效率非常低,并且很多时候我们不需要等待RecordMetaData,这时候我们就可以采用异步发送了。这时候我们只需要不调用get()方法即可实现异步发送消息。对于异步发送,我们有可能需要知道当消息发送失败时,我们能够感知到。为了在异步发送消息的同时能够对异常消息进行处理,生产者提供了回调支持。
class ProducerCallBack implements Callback {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if(e != null) {
//出现异常,进行处理
e.printStackTrace();
}
}
}
producer.send(record,new ProducerCallBack());
我们的回调类需要实现org.apache.kafka.clients.producer.CallBack
接口,这个接口只有一个onCompletion方法。当Kafka返回一个错误信息时,omComplete方法中的异常对象会为一个非空值。
错误类型
KakfaProducer一般会发生两类错误。其中一类是可重试错误,这类错误可以通过重发消息来解决,比如连接错误、无主(no leader)错误等。KafkaProducer可以配置成自动重试,如果多次重试之后的结果仍然无法解决,则应用程序会收到一个重试异常。另一类是无法通过重试能够解决的异常,比如消息太大,对于这类异常Kafka不会进行重试,而是直接抛出异常。
需要注意的是,我们最好在producer.send()方法周围使用try-catch来捕捉一下异常。因为在消息发送之前,生产者还有可能发生其它异常,比如SerializationException(序列化失败)、BufferExhaustedException或TimeoutException(缓冲区已满)、又或者InterruptException(发送线程被终端)等。
序列化器
Kafka自带了字符串、整数和字节数组序列化器,但是还不能满足大部分场景。如果发送到Kafka的对象不是简单的字符串或整数,那么可以使用序列化框架来创建消息记录,比如Avro、Thrift、Protobuf,或者自定义序列器。
自定义序列化器
为了对Kafka序列化器有一个深入的了解,我们可以自定义一个序列化器,但是生成环境中,不建议使用自定义序列化器,除非场景非常特殊。
首先创建一个数据bean:
public class Customer {
private int customerId;
private String customerName;
public int getCustomerId() {
return customerId;
}
public String getCustomerName() {
return customerName;
}
public void setCustomerId(int customerId) {
this.customerId = customerId;
}
public void setCustomerName(String customerName) {
this.customerName = customerName;
}
}
创建序列化器,实现org.apache.kafka.common.serialization.Serializer
接口:
public class CustomerSerializer implements Serializer<Customer> {
@Override
public void configure(Map<String, ?> map, boolean b) {
}
@Override
public byte[] serialize(String s, Customer customer) {
byte[] serializedName;
if(customer == null) {
return null;
}else {
try {
if(customer.getCustomerName() != null) {
serializedName = customer.getCustomerName().getBytes("UTF-8");
} else {
serializedName = new byte[0];
}
//前四个字节存储id,后四个字节存储name长度,最后存储name
ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + serializedName.length);
buffer.putInt(customer.getCustomerId());
buffer.putInt(serializedName.length);
buffer.put(serializedName);
return buffer.array();
}catch (IOException ioe) {
throw new SerializationException("serializer error!");
}
}
}
@Override
public void close() {
}
}
接下来就可以直接使用这个序列化器了,首先将value.serializer
设置为该类,然后就能使用ProducerRecord<String,Customer>
了。
自定义序列化器有许多问题,比如版本变迁,需要考虑版本兼容,多团队写kafka需要使用相同的序列化器等等。所以还是推荐已经程序的序列化框架,比如Protobuf、Avro、JSON或Thrift等。
之后可以学习整理一下在Kafka中使用第三方序列化框架。
分区器
Kafka消息是一个个键值对,但是键可以为null。键的用途主要有两个:作为消息的附加信息和用来分区。
如果消息键为null,即new ProducerRedcord<>(“topic”,”value")
不设置key值,并且使用默认分区,那么所有消息会被随机发送到主题的各个可用分区上。分区器使用轮询算法将消息均衡地分布到各个可用分区上。
如果消息键不为null,并且使用了默认分区器,那么Kafka会使用自己的散列算法对键进行散列,然后根据散列值将消息映射到特定的分区上(所有相同的键都会映射到同一个分区上)。这里需要注意,这时候散列映射是对topic的所有分区,而不仅仅是可用的分区,如果这时候有分区不可用,就可能会发生错误。还需要知道,如果主题的分区数发生了改变,根据键的散列映射就可能发生变化。
自定义分区器
如果我们不想使用默认的分区器,也可以实现自己的分区器。
public class CustomPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic("testTopic");
int numPartitions = partitions.size();
//通过key的长度对分区数取模
if(keyBytes == null || !(key instanceof String))
throw new InvalidRecordException("key is null or not is string!");
String keyStr = String.valueOf(key);
return keyStr.length() % (numPartitions - 1);
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}
//通过producer参数设置分区类
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.learn.tc.kafka.demo.CustomPartitioner");
生产者配置
生产者除了上面三个必要配置参数,还有许多可配置的参数,它们大部分都有合理的默认值,所以不需要修改它们。但是有一些参数在内存使用、性能和可靠性方面对生产者影响比较大。下面来看一下:
配置项 | 类型 | 默认值 | 取值范围 | 描述 |
---|---|---|---|---|
acks | string | 1 | [all,-1,0,1] | 需要多少个分区副本接收到消息,才认为写入成功。acks=0:生产者不会等待服务器响应,如果消息丢失,生产者也无感知。因为无需等待相应,所以能够以网络支持的最大速度发送消息,从而达到很高的吞吐量。acks=1:集群leader接受到消息后,生产者就会接受到来自服务器的成功相应。如果leader接受到消息后挂掉了,还没有来得及向其它节点发送消息,则可能出现消息丢失的问题。acks=all:所有参与服务的节点都接受到消息时,生产者才会收到写入成功的相应。这是最安全的模式,不过它的延迟也相对高一些。该设置同acks=-1。 |
buffer.memory | long | 33554432(32M) | [0,…] | 生产者内存缓冲区大小,生产者使用它缓存要发送到服务器的消息。如果应用程序发送消息的速度超过发送到服务器的速度,会导致生产者空间不足。send()方法会阻塞到max.block.ms参数所配置时间,如果超过这个时间,则会抛出异常。 |
max.block.ms | long | 60000 | [0,...] | 阻塞等待最长时间,比如缓冲区已满,或者没有可用元数据时,这些方法就会阻塞。在阻塞达到max.block.ms之后就会抛出超时异常。 |
compression.type | string | none | [none,gzip,snappy,lz4] | 生产者发送到服务器的消息是否进行压缩,默认是不压缩的。可以选择gzip(客观的压缩比)、snappy(节省CPU)或lz4。可以降低网络传输开销和存储开销。 |
retries | int | 0 | [0,…,2147483647] | 当生产者接收到错误是临时性错误时,可以通过retries参数设置重发消息的次数。重试时间间隔由retry.backoff.ms参数指定,默认为100ms(这个时间最好测试一下节点恢复时间,总尝试次数时间大约节点恢复时间)。 |
batch.size | int | 16384(16k) | [0,...] | 当多个消息发送到同一个分区时,生产者会把他们放到同一个批次里。该参数指定了一个批次可以使用的内存大小。当被填满后,这个批次消息就会被发送出去(不一定等填满才发送,达到了linger.ms上限)。如果设置为0,则每条消息都会立即发送。它的大小需要你在内存、吞吐量和延迟上做出一个平衡。 |
linger.ms | long | 0 | [0,...] | 指定生产者在发送批次之前等待多长时间,KakfaProducer会在批次填满或linger.ms达到上限时把批次消息发送出去。默认为0,即只要有线程可用,就会将批次消息发送出去。 |
client.id | string | "" | 任意字符串,服务器会用它标识消息来源,还可用在日志和配额指标里。 | |
max.in.flight.requests.per.connection | int | 5 | [1,...] | 指定生产者在收到服务器响应之前可以发送多少个消息。值越高,占用内存越高,并且可能导致重排序风险(失败重试导致),但是可以得到很高的吞吐量。 |
timeout.ms | int | 30000 | [0,...] | 指定了broker等待副本同步返回确认消息的时间,与acks的配置相匹配(设置为大于1)。 |
request.timeout.ms | int | 30000 | [0,...] | 生产者发送数据时等待服务器返回相应时间。 |
max.request.size | int | 1048576(1M) | [0,...] | 控制生产者发送数据的大小,但这个是指生产者向服务器发送的批次的大小,即该批次下的消息总大小。(感觉和batch.size在使用上有点相似) |
receive.buffer.bytes | int | 32768(32k) | [-1,...] | 指定TCP socket接受数据包的缓冲区大小,如果被设置成-1,就是用操作系统的默认值。当生产者或消费者不在同一个数据中心的时候,可以适当调大,因为跨数据中心,网络传输有较高延迟和比较高低的延迟带宽。 |
Kafka消费者
消费者和消费者群组
消费Kafka中的数据,Kafka提供了消费者和消费者群组的概念。Kafka消费者从属于消费者群组,一个群组里的消费者订阅的是相同主题,也就是主题订阅是消费者群组级别的订阅,而不是单个消费者订阅。消费者群组中的每个消费者都会接受订阅主题的部分分区的数据,也就是说同一个分区中的消息只会被消费者群组中的一个消费者所消费。
为什么要使用消费者群组的概念
消费者群组是为了横向伸缩消费者处理能力主要方式,当一个消费者的处理能力跟不上生产者的生产速度时,这时候就需要增加消费者,从同一个主题中读取消息,对消息进行分流,从而扩展了数据处理能力。
消费者如何对主题消息进行分流
下面是一组主题T1和对应消费者群组的映射图:
当消费者群组中只有一个消费者时候,这个消费者会接受主题内所有分区的数据:
当消费者群组有两个消费者时候,这个两个消费者会平分主题内的所有分区:
当消费者个数与分区数相同时,每个消费者都会消费特定分区消息,即分区与消费者进行了一一映射:
当消费者个数超过了分区数时,这时候多于的消费者就会闲置:
从中可以看到,消费者群组中有效的消费者个数,是由消费主题决定的。往消费者群组里面增加消费者是横向伸缩消费能力的主要方式,由于Kafka消费者经常做一些高延迟的操作,这种情况下单个消费者无法跟上数据的生产速度,所以需要添加更多的消费者。这时候也就需要为主题创建大量的分区,在负载增长时可以加入更多的消费者。
Kafka还支持多个应用程序消费相同的主题,在这种情况下,每个应用程序都会获取到主题的所有消息,而不是部分消息。这时候只要保证每个应用程序有自己的消费者群组即可。
分区再均衡
当群组中新增消费者或移除消费者时,会触发分区重新分配,新增的消费者消费的分区原本是由其它分区消费的,而移除的消费者,原本由它处理分区会分配给其它消费者。除了新增或移除消费者外,新增分区也会导致再分区。
分区的所有权发生变更,这种行为称为再均衡。再均衡非常重要,因为它为消费者群组带来了高可用和伸缩型。但是再均衡会导致消费者群组一小段的时间不可用,并且当一个分区重新分配给一个消费者时,消费者的当前状态会丢失,它有可能还需要去刷新缓存,从而拖慢整个应用程序。所以我们应该尽量避免再均衡的问题。
Kafka集群如何感知消费者不可用
消费者是通过向群组协调器(Kafka为每个消费者群组都指定了一个broker)来维持它和群组的关系以及它对分区的所有权关系,只要消费者在规定时间内向发送心跳,就会被认为活跃的,说明它还在读取分区中的消息。消费者是利用轮询消息或提交偏移量时发送心跳的,如果在规定时间间隔内没有发送心跳,群组协调器就会认为该消费者已经死亡,就会触发一次再均衡。
需要注意,在0.10.1之后的版本里,Kafka引入了一个独立的心跳线程,可以在轮询消息的空挡时间发送心跳。这样发送心跳的频率与消息轮询的频率就进行了解耦。
创建Kafka消费者
创建Kakfa消费者需要创建KafkaConsumer对象,KafkaConsumer和KakfaProducer一样,也有一些必要属性。
- bootstrap.server:指定了Kakfa集群的连接字符串,用途和KakfaProducer一样。需要注意,在0.8.2版本中这个参数是zookeeper.connect,即Kafka集群所使用的zk集群地址。
- key.deserializer:与KafkaProducer的key.serializer一样,只不过是负责将key由字节转为java对象。
- value:deserializer:与KakfaProducer的value.serializer一样,只不过是负责将value由字节转为java对象。
- group.id:不是必须的字段,但是一般都会设置,这个是该消费者所属的消费者群组。
Properties props = new Properties();
props.put("bootstrap.servers","192.168.0.1:9092");
props.put("group.id","test");
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String,String> consumer = new KafkaConsumer<>(props);
创建完KafkaConsumer之后,接下来就需要订阅消费的主题了,它支持订阅一个主题列表,同时也支持正则表表达式,当有满足正则表达式的主题被创建后,消费者群组就会触发一次再均衡,来消费新的主题。
consumer.subscribe(Arrays.asList(“my-demo-topic”));
接下来就是轮询消息了,因为消费者是一个常驻任务,所以将其放在一个无限循环中。
try{
while (true) {
ConsumerRecords<String,String> records = consumer.poll(100);
for(ConsumerRecord<String,String> record : records) {
System.out.printf("offset = %d ,key = %s, value = %s%n",record.offset(),record.key(),record.value());
}
}
}finally{
consumer.close();
}
poll()方法就是对Kafka进行轮询(心跳发送也在这时候),poll()方法中是一个超时参数,当消费者缓冲区里面没有可用数据时,会发生阻塞,当到达超时时间后,无论缓冲区有没有数据都会返回。如果该参数设为0,pll()方法会立即返回,否则它会在超时时间内一直等待broker返回数据。
poll()方法会返回一个ConsumerRecords列表,每个ConsumerRecords代表一条记录,它记录了主题信息、分区、偏移量,以及对应的键值对。
最后在退出应用程序之前,调用close方法关闭消费者。网络连接和socket也会随之关闭,并且立触发发一次再均衡,而不是等待群组协调器发现死亡,从而节省了再分配的时间。
消费者配置
配置项 | 类型 | 默认值 | 取值范围 | 描述 |
---|---|---|---|---|
fetch.min.bytes | int | 1 | [0,...] | 指定了消费者从服务器获取记录的最小字节数,如果broker可用数据小于该值,那么broker会等到有足够数据时才会返回。能够降低消费者和broker的负载。 |
fetch.max.wait.ms | int | 500 | [0,…] | 指定了有足够数据时在返回消费者,而fetch.max.wait.ms则指定了等待时间。当kafka接收到请求后,或者达到feth.min.bytes指定大小返回,或者达到fetch.max.wait.ms指定的时间返回,看哪个先满足。 |
max.partition.fetch.bytes | int | 1048976(1M) | [0,...] | 指定了服务器从每个分区里返回给消费者的最大字节数,这个配置应该比broker能够接受的最大消息大,否则消费者可能无法读取这些大消息,导致消费之一直挂起重试。 |
session.timeout.ms | int | 10000 | 指定了消费者被认为死亡之前,能够与服务器断开连接的时间。即在超过该参数指定的时间外,没有发送心跳,就会被认为死亡,触发再均衡。该参数与heartbeat.interval.ms相关,因为它指定了发送心跳的时间间隔。 | |
heartbeat.interval.ms | int | 3000 | 用途如上所说,控制发送心跳的时间间隔。一般为session.timeout.ms的1/3。可以将它们设置的低一些,以便尽快触发再均衡。 | |
auto.offset.rest | string | latest | [latest,earliest,none] | 指定了消费者在读取一个没有偏移量的分区或者偏移量无效(消费者长时间失效,偏移量被删除)时,该从哪里读取。默认为latest,即最新记录开始读取。earliest为从起始位置读取。none为当找不到偏移量时抛出异常。 |
enable.auto.commit | boolean | true | 指定了消费者是否自动提交偏移量。为了避免数据丢失或出现重复读取数据,可以改为false,由自己控制提交偏移量。 | |
max.poll.records | int | 500 | 指定了单次调用poll()方法所能够返回最大数据量。 |
提交偏移量
Kafka不同其它的消息系统需要等待消费者确认才认为该消息被处理了,Kafka是采用偏移量来帮助消费者追踪分区内消息被处理的位置,消费者更新当前位置的操作在kafka中称为提交。
消费者会向一个叫做_consumer_offset的特殊topic中发送消息,消息包含了每个分区的偏移量。如果消费者处于一直运行状态,这个偏移量是没有用的,但是当消费者崩溃或者有新的消费者加入消费者群组是,就会触发再均衡。这时候每个消费者可能分配到新的分区,为了能够继续之前的工作,消费者需要读取每个分区的偏移量,然后从最后提交偏移量的位置继续工作。
当发生再均衡的时候,提交偏移量方式会对客户端影响很大,会导致重复消费消息或导致消息丢失。
如果最后提交偏移量的位置小于客户端最后处理消息的位置,那么会导致两个偏移量之间消息被重复处理。
如果最后提交偏移量的位置大于客户端最后处理消息的位置,那么会导致两个偏移量之间消息丢失。
针对客户端不同的需求,Kafka的Consumer提供了多种提交偏移量的API。
自动提交
Kafka提供了自动提交偏移量的方式,这也是最简单的方式。通过将enable.auto.commit(默认为true,老版本叫auto.commit.enable)设置为true,然后每隔auto.commit.interval.ms(默认为5s)会自动将从poll()方法接收到的最大偏移量提交上去。这种提交方式,也和消费者的其它东西一样,自动提交也是在轮询过程中完成的。每次轮询自动提交上去的偏移量,都是上一次poll方法返回的最大偏移量,它并不知道哪些消息真正被处理了,所以在每次调用之前,最好确保当前poll()方法返回的消息都被处理了,消费者也会在调用close()方法之前进行自动提交。
需要注意使用自动提交这种方式,一般不会有什么问题,不过在产生异常或者提前退出轮询时,就会产生我们上面说的两种情况,可能导致消息重复消费或消息丢失。如果是对消息处理严格的业务,建议不要使用这种方式。
我们可以通过缩小提交偏移量的时间来消除消息丢失的可能性,并在再均衡时减少重复消息的数量。
手动提交
消费者还提供了另外一种提交偏移量的方式,就是把enable.auto.commit设置为false,然后通过commitSync()方法提交偏移量。commitSync()方法是最可靠也是最简单的方式,它会把poll()方法返回的最新偏移量提交上去,如果提交成功立马返回,如果提交失败则会抛出异常。
需要注意的是commitSync()方法一定要在poll()方法返回的消息处理完成后在提交,否则还会有消息丢失的风险的。commitSync()能够有效避免消息丢失情况,但是还会存在消息重复处理的问题的,当发生再均衡时,从最近一批消息到发生再均衡之前的消息会被重复处理。
while (true) {
ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(100));
for(ConsumerRecord<String,String> record : records) {
//处理逻辑
}
try {
consumer.commitSync(); //处理完本批次消息在提交偏移量
}catch (Exception e) {
LOG.error("commit offset failed",e);
}
}
当使用commitSync()提交偏移量发生异常时,它会进行重试提交,直到提交成功。
通过commitSync()的方法名称就能直到,这是一个同步提交偏移量的方法,在结果返回之前,它会一直阻塞,这样会限制应用程序的吞吐量。虽然我们可以通过降低提交频率来提升吞吐量,但是当发生再均衡时,会导致增大重复处理消息数量。
异步提交
Kafka提供了commitAsync()方法来提交偏移量,commitAsync()在提交完偏移量后立马返回,并且比如偏移量提交失败,它也不会进行重复提交,因为下一批次更大的偏移量可能已经提交了,重复提交会导致覆盖更大的偏移量。不过commitAsync()方法提供回调,当broker做出相应时执行回调。回调一般用来记录异常和度量指标,如果我们重新提交偏移量,一定要注意提交顺序,因为有可能更大偏移量已经提交了,一般可以通过在回调加一个递增的序列号来维护异步提交的顺序。
while (true) {
ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(100));
for(ConsumerRecord<String,String> record : records) {
//处理逻辑
}
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
if(e != null) {
LOG.error("Commit offset fail.",e);
}
}
});
}
同步和异步组合提交
一般偶尔出现提交失败,不进行重试影响不大,因为提交失败时临时问题导致的,那么后续总有提交成功的时候。但是如果这时关闭消费者或者再均衡最后一次的提交,就需要确保提交成功。
try {
while (true) {
ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(100));
for(ConsumerRecord<String,String> record : records) {
//处理逻辑
}
consumer.commitAsync();
}
}catch (Exception e) {
e.printStackTrace();
}finally {
try {
consumer.commitSync();
}catch (Exception e) {
e.printStackTrace();
}
}
一般时候使用异步提交,来保证系统的吞吐量。如果直接关闭消费者,就需要使用commitSync()方法确保一定提交成功。
提交指定位置的偏移量
commitSync()和commitAsync()方法每次只会提交poll()下来消息的最大偏移量,有时候我们poll()下来大批量数据,为了避免再均衡引起的大量重复处理消息,可能想在中途过程中就提交偏移量。Kafka针对这种情况,允许我们在调用commitAsync()和commitSync()时传入希望提交的分区和对应的偏移量。
Map<TopicPartition,OffsetAndMetadata> currentOffsets = new HashMap<>();
int count = 0;
while (true) {
ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(100));
for(ConsumerRecord<String,String> record : records) {
//处理逻辑
currentOffsets.put(new TopicPartition(record.topic(),record.partition()),new OffsetAndMetadata(record.offset() + 1));
if(count % 100000 == 0) {
consumer.commitAsync(currentOffsets, new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
if(e != null) {
LOG.error("commit offset fail!");
}
}
});
}
}
}
之所以使用Map存储分区和offset,是因为一个消费者可能订阅了多个topic。
再均衡监听器
当发生再均衡时,我们可能有一些事情需要做,比如提交最后的偏移量。Kafka为我们提供了再均衡监听器,当发生再均衡时会调用监听器中的相应方法。再均衡监听器是在consumer订阅主题时传递进去的,你需要实现ConsumerRebalanceListener接口。
class RebalanceHandler implements ConsumerRebalanceListener {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> collection) {
//再均衡开始之前和消费者停止读取消息之后调用
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> collection) {
//再均衡分区重新分配之后和消费者开始消费之前调用
}
}
使用再均衡监听器来提交偏移量。
class RebalanceHandler implements ConsumerRebalanceListener {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> collection) {
//再均衡开始之前和消费者停止读取消息之后调用
consumer.commitSync(currentOffsets);
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> collection) {
//再均衡分区重新分配之后和消费者开始消费之前调用
}
}
consumer.subscribe(Arrays.asList("demo-topic"),new RebalanceHandler());
int count = 0;
while (true) {
ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(100));
for(ConsumerRecord<String,String> record : records) {
//处理逻辑
currentOffsets.put(new TopicPartition(record.topic(),record.partition()),new OffsetAndMetadata(record.offset() + 1));
if(count % 100000 == 0) {
consumer.commitAsync(currentOffsets, new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
if(e != null) {
LOG.error("commit offset fail!");
}
}
});
}
}
}
}
从特定偏移量处理记录
Kafka提供从指定位置读取消息的API以便我们使用,如果想要从开始分区的起始位置开始读取,或者直接跳到分区的末尾开始读取消息,可以使用seekToBeginning(Collection<TopicPartition> tp)
方法和seekToEnd(Collection<TopicPartition> tp)方法。
consumer.seekToBeginning(Arrays.asList(new TopicPartition("demo-topic",0))); //指定topic和分区
consumer.seekToEnd(Arrays.asList(new TopicPartition("demo-topic",1)));
Kafka也为我们提供了查找特定偏移量的API,它有许多用途,比如向前或向后偏移几个记录等等。seek()方法能够用来指定特定的偏移量,我们只需要指定topic、分区以及具体的偏移量即可。
consumer.seek(new TopicPartition(“demo-topic”,3),1500); //主题demo-topic的分区3,从偏移量1500开始读取
使用seek()方法指定偏移量读取,一般是将偏移量存储到Kafka之外了。因为Kafka的提交偏移量无论怎么优化都有可能导致重读消息的风险(再均衡监听器只能对正常服务的消费者起作用,如果消费者挂掉后,是触发不到的),因为消费数据和提交偏移量本身不是一个原子操作。这时候我们可以能借助外部存储系统,比如我们将从Kafka处理之后的数据写到Mysql,然后也将偏移量写到Mysql中,然后通过事务将二者关联起来,这样要么都处理成功,要么都处理失败。
class RebalanceHandler implements ConsumerRebalanceListener {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> collection) {
//即将失去分区前提交事务,确保记录成功保存
//commitDBTransaction();
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
//从数据库获取偏移量,再分配到新分区后,使用seek()定位
for(TopicPartition partition : partitions){
//consumer.seek(partition,getOffsetFromDB(partition));
}
}
}
consumer.subscribe(Arrays.asList("demo-topic"),new RebalanceHandler());
//调用一次pool()方法,目的是让消费者加入到消费者群组,并获取到分区,然后调用seek()方法定位到分区偏移量
consumer.poll(Duration.ofMillis(0));
for (TopicPartition partition : consumer.assignment()) {
//consumer.seek(topic,getOffsetFromDB(partition)); //获取每个主题分区的维值
}
while (true) {
ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(100));
for(ConsumerRecord<String,String> record : records) {
//processRecord(record);//处理逻辑
//storeRecord(record);//保存处理记录
//storeOffsetInDB(record.topic(),record.partition(),record.offset());//保存偏移量
}
//commitDBTransaction();//提交事务
}
}
消费者优雅退出
Kafka提供了consumer.weakup()方法通过抛出异常的方式来停止消费者,weakup()是kafka提供的唯一一个可以从其它线程里安全调用的方法。调用consumer.weakup()可以退出poll(),并且抛出WeakupException异常,我们不需要处理这种异常,因为它只是跳出循环的一种方式。还需注意,在退出线程之前,还需要调用consumer.close()方法,它会提交任何还没有提交的任务,并向群组协调器发送消息,告知离开,立即触发再均衡,而无需等待会话超时。
//main shuthook()
public void shuthook(Consumer consumer,Thread mainThread) {
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
consumer.wakeup();
try {
mainThread.join(); //主线程等待
}catch (InterruptedException e) {
e.printStackTrace();
}
}));
}
//
try{
while(true) {
ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(100));
for(ConsumerRecord<String,String> record : records) {
//处理逻辑
}
consumer.commitAsync();
}
}catch(WakeupException e) {
//无需处理
}finally {
consumer.close()
}
自定义反序列化器
生产者中我们说了如何定义序列化器,消费者中的反序列化器定义和序列化器定义一样,只不过是实现Deserializer接口,其它使用方式都和生产者一样。