本小节我们来讨论Kafka生产者是如何发送消息到Kafka的, Kafka项目有一个生产者客户端,我们可以通过这个客户端的API来发送消息。生产者客户端是用Java写的,但Kafka写消息的协议是支持多语言的,其它语言的api可见这个wiki
概要
通过本文,你可以了解到以下内容:
- kafka producer端的整体结构,相关参数配置,以及性能优化;
- 分区器,拦截器的扩展;
- 消息序列化扩展;
- 分区器,拦截器,序列化的执行顺序;
开始
很多做业务的同学都知道,在我们系统中发送一条消息给kafka 集群,我们只需要简单的调一下已经封装好的接口,下面是来于我实际项目中的接口方法:
kafkaProducer.produce(String topic,Object msg)
每次要发消息,我就是这么简单的调用一下就能确保消息能被consumer端正常的消费,但是kafka producer做了哪些工作我却浑然不知,今天我就跟大家说到底说道这个里面到底有哪些不为人知的操作;
- 引出第一个问题,kafka消息的发送是一个什么样的过程? 这个过程中做了哪些操作?
借助于kafka官网上的API,首先给大家来一张producer端的消息流转图:
接下来,结合一段代码,给大家简单说下流程:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("compresstion.type","snappy");
props.put("partitioner.class", "org.apache.kafka.clients.producer.internals.DefaultPartitioner");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < MAZ_RETRY_SIZE; i++) {
producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
}
producer.close();
流程如下:
- 首先需要指定
kafka producer
端的配置;
- zk的地址和端口;
- producer端
ack
应答机制,本demo中ack
设置为all
,表示生产者会等待所有副本成功写入该消息,这种方式是最安全的,能够保证消息不丢失,但是延迟也是最大的; -
retries
设置标示消息发送失败,生产者可以自动重试,但是此刻设置为0标示不重试;这个参数需要结合retry.backoff.ms
(重试等待间隔)来使用,建议总的重试时间比集群重新选举群首的时间长,这样可以避免生产者过早结束重试导致失败; -
batch.size
参数标示生产者为每个分区维护了一个未发送记录的缓冲区,这个缓冲区的大小由batch.size配置指定,配置的很大可能会导致更多的批处理,也需要更多的内存(但是对于每个活动分区,我们通常都有一个这样的缓冲区),默认是16384Bytes; -
linger.ms
指定生产者在发送批量消息前等待的时间,当设置此参数后,即便没有达到批量消息的指定大小,到达时间后生产者也会发送批量消息到broker.默认情况下,生产者的发送消息线程只要空闲了就会发送消息,即便只有一条消息.设置这个参数后,发送线程会等待一定的时间,这样可以批量发送消息增加吞吐量,但同时也会增加延迟; -
buffer.memory
控制生产者可用于缓冲的内存总量;消息的发送速度超过了它们可以传输到服务器的速度,那么这个缓冲空间将被耗尽.
当缓冲区空间耗尽时,额外的发送调用将阻塞.阻止时间的阈值由max.block.ms
确定,在此之后它将引发TimeoutException
.这个缓存是针对每个producerThread,不应设置高以免影响内存;
生产者如果每发送一条消息都直接通过网络发送到服务端,势必会造成过多 的网络请求。如果我们能够将多条消息按照分区进行分组,并采用批量的方式一次发送一个消息集,并且对消息集进行压缩,就可以减少网络传输的带宽,进一步提高数据的传输效率。
-
key.serializer
和value.serializer
指定了如何将key和value序列化成二进制码流的方式,也就是上图中的序列化方式; -
compresstion.type
:默认情况下消息是不压缩的,这个参数可以指定使用消息压缩,参数可以取值为snappy、gzip或者lz4;
- 接下来,我们需要创建一个
ProducerRecord
,这个对象需要包含消息的topic
和值value
,可以选择性指定一个键值key
或者分区partition
。 - 发送消息时,生产者会根据配置的
key.serializer
和value.serializer
对键值和值序列化成字节数组,然后发送到分配器partitioner
。 - 如果我们指定了分区,那么分配器返回该分区即可;否则,分配器将会基于键值来选择一个分区并返回。
- 选择完分区后,生产者知道了消息所属的主题和分区,它将这条记录添加到相同主题和分区的批量消息中,另一个线程负责发送这些批量消息到对应的
Kafka broker
。 - 当
broker
接收到消息后,如果成功写入则返回一个包含消息的主题、分区及位移的RecordMetadata
对象,否则返回异常. - 生产者接收到结果后,对于异常可能会进行重试,根据参数
reties
的配置决定.
kafka发送端文件存储原理
我们普遍认为一旦涉及到磁盘的访问,数据的读写就会变得很慢,其实不然,操作系统已经针对磁盘的访问速率做了很大的优化;比如,预读会提前将一个比较大的磁盘读入内存,后写会把很多小的逻辑写操作合并起来组合成一个大的物理写操作;并且,操作系统还会将主内存剩余的所有空间都用作磁盘缓存,所有的磁盘读写都会经过统一的磁盘缓存,综上所述,如果针对磁盘的顺序读写,某些情况它可能比随机的内存访问都要快。
文件写入的逻辑无外乎一下这两种,但kafka选择了第一种,也就是a图的逻辑:
b图是首先在内存中保存尽可能多的数据,并在需要时将这些数据刷新进磁盘;
a图是所有数据立即写入磁盘,但不进行刷新数据的调用,数据首先会被传输到磁盘缓存,操作系统随后会将数据定期自动刷新到磁盘。
发送端优化
新的API中,生产者要发送消息,并不是直接发送给服务器,而是在客户端先把消息放入一个缓冲队列中,然后由一个消息发送线程从队列中拉取消息,以批盐的方式发送消息给服务端。 Kafka的记录收集器RecordAccumulator 负责缓存生产者客户端产生的消息,发送线程( Sender)负责读取记录收集器的批量消息, 通过网络发送给服务端。
开篇我们便列出了kafka 发送端的流程图,消息发送之初,首先会为消息指定一个分区(发送消息时未指定分区的情况下),对于没有键的消息,通过计数器自增轮询的方式依次将消息分配到不同的分区上;对于有键的消息,对键计算散列值,然后和主题的分区数进行取模得到分区编号,具体的客户端代码实现:
public int partition(ProducerRecord<byte[], byte[]> record, Cluster cluster) {
//获取集群中所有的分区
List<PartitionInfo> partitions = cluster.partitionsForTopic(record.topic());
int numPartitions = partitions.size();
//如果指定分区
if (record.partition() != null) {
// they have given us a partition, use it
if (record.partition() < 0 || record.partition() >= numPartitions)
throw new IllegalArgumentException("Invalid partition given with record: " + record.partition()
+ " is not in the range [0..."
+ numPartitions
+ "].");
return record.partition();
// 如果没有key,则负载均衡的分布
} else if (record.key() == null) {
int nextValue = counter.getAndIncrement();
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(record.topic());
if (availablePartitions.size() > 0) {
int part = Utils.abs(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.abs(nextValue) % numPartitions;
}
} else {
// 如果有key,则对key 进行hash取模运算
return Utils.abs(Utils.murmur2(record.key())) % numPartitions;
}
}
}
在客户端就为消息选择分区的目的是什么? 只有为消息选择分区,我们才能知道应该发送到哪个节点,如果随便找一个服务端节点,再由那个节点去决定如何将消息转发给其他正确的节点来保存。这种方式增加了服务端的负担,多了不必要的数据传输。
序列化
在上述代码中,我们看到了kafka producer在发送消息的时候会将key和value进行序列化,上面的程序中使用的是Kafka客户端自带的org.apache.kafka.common.serialization.StringSerializer
,除了用于String类型的序列化器之外还有:ByteArray
、ByteBuffer
、Bytes
、Double
、Integer
、Long
这几种类型,这几个序列化类都实现了org.apache.kafka.common.serialization.Serializer
接口接下来,此接口有三种方法:
-
public void configure(Map<String, ?> configs, boolean isKey)
:用来配置当前类。 -
public byte[] serialize(String topic, T data)
:用来执行序列化。 -
public void close()
:用来关闭当前序列化器。一般情况下这个方法都是个空方法,如果实现了此方法,必须确保此方法的幂等性,因为这个方法很可能会被KafkaProducer调用多次。
业界用的多的序列化框架无外乎如Avro、JSON、Thrift、ProtoBuf或者Protostuff等工具,这里我就不扩展开了,读者如果感兴趣可以搜索相关的资料,下面就以一个简单的例子来介绍下如何自定义序列化方式.
假设我们有一个自定义的Company类:
@Data
public class Company {
private String name;
private String address;
}
接下来我们Company的name和address属性进行序列化,实现下Serializer接口:
public class CompanySerializer implements Serializer<Customer> {
public void configure(Map<String, ?> configs, boolean isKey) {}
public byte[] serialize(String topic, Company data) {
if (data == null) {
return null;
}
byte[] name, address;
try {
if (data.getName() != null) {
name = data.getName().getBytes("UTF-8");
} else {
name = new byte[0];
}
if (data.getAddress() != null) {
address = data.getAddress().getBytes("UTF-8");
} else {
address = new byte[0];
}
ByteBuffer buffer = ByteBuffer.allocate(4+4+name.length + address.length);
buffer.putInt(name.length);
buffer.put(name);
buffer.putInt(address.length);
buffer.put(address);
return buffer.array();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return new byte[0];
}
public void close() {}
}
使用自定义的序列化类的方式也简单,在前面的代码中替换下properties中的序列化类即可:
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "com.xxx.kafka. CompanySerializer");
分区器
在上文的demo中,我们创建消息的时候,必须要提供主题和消息的内容,而消息的key是可选的(也就是我们平时工作中,发送消息时只需要指定topic和message),当不指定key时默认为null.消息的key有两个重要的作用:
- 提供描述消息的额外信息;
- 用来决定消息写入到哪个分区,所有具有相同key的消息会分配到同一个分区中.
如果key为null,那么生产者会使用默认的分配器,该分配器使用轮询round-robin)算法来将消息均衡到所有分区.
如果key不为null且使用的是默认的分配器,那么生产者会对key进行哈希并根据结果将消息分配到特定的分区.注意的是,在计算消息与分区的映射关系时,使用的是全部的分区数而不仅仅是可用的分区数.这也意味着,如果某个分区不可用(虽然使用复制方案的话这极少发生),而消息刚好被分配到该分区,那么将会写入失败.另外,如果需要增加额外的分区,那么消息与分区的映射关系将会发生改变,因此尽量避免这种情况,具体的信息可以查看DefaultPartitioner
中的代码实现:
/**
* Compute the partition for the given record.
*/
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
//获取指定topic的partitions
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
//key=null
if (keyBytes == null) {
int nextValue = nextValue(topic);
//可用分区
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
//消息随机分布到topic可用的partition中
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// 无分区可利用, 给定一个不可用的分区
return Utils.toPositive(nextValue) % numPartitions;
}
//如果 key 不为 null,并且使用了默认的分区器,kafka 会使用自己的 hash 算法对 key 取 hash 值
} else {//通过hash获取partition
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
现在来看下如何自定义一个分配器,下面将key为Test的消息单独放在一个分区,与其他的消息进行分区隔离:
public class TestPartitioner implements Partitioner {
public void configure(Map<String, ?> configs) {}
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if ((keyBytes == null) || (!(key instanceOf String)))
throw new InvalidRecordException("We expect all messages to have customer name as key")
if (((String) key).equals("Test"))
return numPartitions; // Banana will always go to last partition
// Other records will get hashed to the rest of the partitions
return (Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1))
}
public void close() {}
}
使用自定义的分区器
使用很简单,在配置文件中或者properties文件中指定分区器的类即可;
props.put("partitioner.class", "com.xxx.kafka.TestPartitioner");
拦截器
Producer拦截器是个相当新的功能.对于producer而言,interceptor使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等.同时,producer允许用户指定多个interceptor按序作用于同一条消息从而形成一个拦截链,Intercetpor的实现接口是org.apache.kafka.clients.producer.ProducerInterceptor,其定义的方法包括:
-
onSend(ProducerRecord)
:该方法封装进KafkaProducer.send方法中,即它运行在用户主线程中的。Producer确保在消息被序列化以计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算 -
onAcknowledgement(RecordMetadata, Exception e)
:该方法会在消息被应答之前或消息发送失败时调用,并且通常都是在producer回调逻辑触发之前。onAcknowledgement运行在producer的IO线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢producer的消息发送效率 -
close
:关闭interceptor,主要用于执行一些资源清理工作,一般不作实现;
interceptor可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全.另外倘若指定了多个interceptor,则producer将按照指定顺序调用它们,并仅仅是捕获每个interceptor可能抛出的异常记录到错误日志中而非在向上传递.这在使用过程中要特别留意.
下面我们简单演示一个双interceptor组成的拦截链,第一个interceptor会在消息发送前将时间戳信息加到消息value的最前部;第二个interceptor会在消息发送后更新成功发送消息数或失败发送消息数.
第一个,在send方法中,我们会创建一个新的message,把时间戳写入消息体的最前部.
public class TimeStampPrependerInterceptor implements ProducerInterceptor<String, String> {
@Override
public void configure(Map<String, ?> configs) {
}
@Override
public ProducerRecord onSend(ProducerRecord msg) {
return new ProducerRecord(
msg(), msg(), record.timestamp(), msg(), System.currentTimeMillis() + "," + msg().toString());
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}
@Override
public void close() {
}
}
定义第二个interceptor:CounterInterceptor
,该interceptor会在消息发送后更新"发送成功消息数"和"发送失败消息数"两个计数器,并在producer关闭时打印这两个计数器;
public class CounterInterceptor implements ProducerInterceptor<String, String> {
private int errorCounter = 0;
private int successCounter = 0;
@Override
public void configure(Map<String, ?> configs) {
}
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
if (exception == null) {
successCounter++;
} else {
errorCounter++;
}
}
@Override
public void close() {
// 保存结果
System.out.println("Successful sent: " + successCounter);
System.out.println("Failed sent: " + errorCounter);
}
}
定义好interceptor之后,我们需要在producer中这样指定即可,代码如下:
List<String> interceptors = new ArrayList<>();
interceptors.add("com.xxx.kafka.TimeStampPrependerInterceptor"); // interceptor 1
interceptors.add("com.xxx.kafka.CounterInterceptor"); // interceptor 2
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
// 一定要关闭producer,这样才会调用interceptors中的close方法
producer.close();
写了这么多,基本上将一个简单消息从kafka producer发送时可以做的事情弄清了,但是我还是有一个疑问,分区器,拦截器,序列化他们之间有顺序?
这个疑问留给大家自己去解决!!!