Producer负责向kafka发送消息。Producer客户端如下:
public class KafkaProducer{
public static final String brokerList = "localhost:9092";
public static final String topic = "topic-demo";
public static void main(String[] args) {
Properties properties = new Properties();
//key.serializer和value.serializer指定key和value序列化操作的序列化器。
properties.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
//bootstrap.servers: kafka集群的broker地址
properties.put("bootstrap.servers", brokerList);
KafkaProducer<String, String> producer =
new KafkaProducer<>(properties);
ProducerRecord<String, String> record =
new ProducerRecord<>(topic, "hello, Kafka!");
try {
producer.send(record);
} catch (Exception e) {
e.printStackTrace();
}
producer.close();
}
}
kafka producer使用方法
1.发送消息
发送消息有三种模式:
- 1.发后即忘(fire-and-forget):性能高,可靠性差
- 2.同步(sync):producer.send(record).get();
- 3.异步(Async):指定callback回调函数
public class KafkaProducerAnalysis {
public static final String brokerList = "localhost:9092";
public static final String topic = "topic-demo";
public static Properties initConfig() {
Properties props = new Properties();
//bootstrap.servers: kafka集群的broker地址
props.put("bootstrap.servers", brokerList);
//key.serializer和value.serializer指定key和value序列化操作的序列化器。
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("client.id", "producer.client.id.demo");
return props;
}
public static void main(String[] args) throws InterruptedException {
Properties props = initConfig();
//创建生产者实例
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
//构建消息
ProducerRecord<String, String> record = new ProducerRecord<>(topic, "hello, Kafka!");
//1.发后即忘(fire-and-forget)
try {
producer.send(record);
} catch (Exception e) {
e.printStackTrace();
}
//2.同步(sync)
try {
producer.send(record).get();
} catch (Exception e) {
e.printStackTrace();
}
//3.异步(Async)
try {
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
System.out.println(metadata.partition() + ":" + metadata.offset());
}
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
}
2.序列化
生产者需要使用序列化器(Serializer)把对象转换成字节数组才能通过网络发送至kafka,而在对侧,消费者需要使用反序列化器(Deserializer)把kafka中收到的字节数组转换成相应的对象。
3.分区器
分区器的作用是消息分配分区。消息经过序列化之后,需要确定它发往的分区,如果指定了分区partition字段,就不需要分区器的作用了,如果没有指定,就需要根据key字段计算partition值。
kafka提供默认的分区器是org.apache.kafka.clients.producer.internals.DefaultPartitioner
实现了接口org.apache.kafka.clients.producer.Partitioner
public interface Partitioner extends Configurable, Closeable {
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
public void close();
}
public class DefaultPartitioner implements Partitioner {
/**
* 计算分区
*/
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
private int nextValue(String topic) {
AtomicInteger counter = topicCounterMap.get(topic);
if (null == counter) {
counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
if (currentCounter != null) {
counter = currentCounter;
}
}
return counter.getAndIncrement();
}
public void close() {}
}
partition的计算方式:
- 如果key为null,则按照一种轮询的方式来计算分区分配
- 如果key不为null则使用称之为murmur的Hash算法(非加密型Hash函数,具备高运算性能及低碰撞率)来计算分区分配。
4.生产者拦截器
kafka在消息序列化和计算分区之前调用生产者拦截器onSend()方法对消息进行定制化操作。 主要实现org.apache.kafka.clients.producer.ProducerInterceptor接口。
public interface ProducerInterceptor<K, V> extends Configurable {
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
public void onAcknowledgement(RecordMetadata metadata, Exception exception);
public void close();
}
Producer客户端原理分析
整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和发送线程。在主线程中由 KafkaProducer 创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息收集器(RecordAccumulator,也称为消息累加器)中。发送线程负责从消息收集器中获取消息并将其发送到 Kafka 中。
主要用来缓存消息以便发送线程可以批量发送,进而减少网络传输的资源消耗以提升性能。消息收集器缓存的大小可以通过生产者客户端参数 buffer.memory 配置,默认值为 33554432B,即32MB。如果生产者发送消息的速度超过发送到服务器的速度,则会导致生产者空间不足,这个时候 KafkaProducer 的 send() 方法调用要么被阻塞,要么抛出异常,这个取决于参数 max.block.ms 的配置,此参数的默认值为60000,即60秒。
主线程中发送过来的消息都会被追加到消息收集器的某个双端队列(Deque)中,在其的内部为每个分区都维护了一个双端队列,队列中的内容就是ProducerBatch,即 Deque。消息写入缓存时,追加到双端队列的尾部;Sender 读取消息时,从双端队列的头部读取。注意 ProducerBatch 不是 ProducerRecord,ProducerBatch 中可以包含一至多个 ProducerRecord。
通俗地说,ProducerRecord 是生产者中创建的消息,而 ProducerBatch 是指一个消息批次,ProducerRecord 会被包含在 ProducerBatch 中,这样可以使字节的使用更加紧凑。与此同时,将较小的 ProducerRecord 拼凑成一个较大的 ProducerBatch,也可以减少网络请求的次数以提升整体的吞吐量。
如果生产者客户端需要向很多分区发送消息,则可以将 buffer.memory 参数适当调大以增加整体的吞吐量。
ProducerBatch 的大小和 batch.size 参数也有着密切的关系。当一条消息流入消息收集器时,会先寻找与消息分区所对应的双端队列(如果没有则新建),再从这个双端队列的尾部获取一个 ProducerBatch(如果没有则新建),查看 ProducerBatch 中是否还可以写入这个 ProducerRecord,如果可以则写入,如果不可以则需要创建一个新的 ProducerBatch。
在新建 ProducerBatch 时评估这条消息的大小是否超过 batch.size 参数的大小,如果不超过,那么就以 batch.size 参数的大小来创建 ProducerBatch,这样在使用完这段内存区域之后,可以通过 BufferPool 的管理来进行复用;如果超过,那么就以评估的大小来创建 ProducerBatch,这段内存区域不会被复用。
Sender 从 RecordAccumulator 中获取缓存的消息之后,会进一步将原本<分区, Deque< ProducerBatch>> 的保存形式转变成 <Node, List< ProducerBatch> 的形式,其中 Node 表示 Kafka 集群的 broker 节点。
对于网络连接来说,生产者客户端是与具体的 broker 节点建立的连接,也就是向具体的 broker 节点发送消息,而并不关心消息属于哪一个分区;而对于 KafkaProducer 的应用逻辑而言,我们只关注向哪个分区中发送哪些消息,所以在这里需要做一个应用逻辑层面到网络I/O层面的转换。
请求在从 Sender 线程发往 Kafka 之前还会保存到 InFlightRequests 中,保存对象的具体形式为 Map<NodeId, Deque>,它的主要作用是缓存了已经发出去但还没有收到响应的请求(NodeId 是一个 String 类型,表示节点的 id 编号)。
与此同时,InFlightRequests 还提供了许多管理类的方法,并且通过配置参数还可以限制每个连接(也就是客户端与 Node 之间的连接)最多缓存的请求数。这个配置参数为 max.in.flight.requests.per.connection,默认值为5,即每个连接最多只能缓存5个未响应的请求,超过该数值之后就不能再向这个连接发送更多的请求了,除非有缓存的请求收到了响应(Response)。通过比较 Deque 的 size 与这个参数的大小来判断对应的 Node 中是否已经堆积了很多未响应的消息,如果真是如此,那么说明这个 Node 节点负载较大或网络连接有问题,再继续向其发送请求会增大请求超时的可能。
生产者参数
1.必选属性有3个:
- bootstrap.servers:该属性指定broker的地址清单,地址的格式为host:port。清单里不需要包含所有的broker地址,生产者会从给定的broker里查询其他broker的信息。不过最少提供2个broker的信息,一旦其中一个宕机,生产者仍能连接到集群上。
- key.serializer:生产者接口允许使用参数化类型,可以把Java对象作为键和值传broker,但是broker希望收到的消息的键和值都是字节数组,所以,必须提供将对象序列化成字节数组的序列化器。key.serializer必须设置为实现org.apache.kafka.common.serialization.Serializer的接口类,默认为org.apache.kafka.common.serialization.StringSerializer,也可以实现自定义的序列化器。
- value.serializer:同上。
2.可选参数:
-
acks:指定了必须要有多少个分区副本收到消息,生产者才会认为写入消息是成功的,这个参数对消息丢失的可能性有重大影响。
- acks=0:生产者在写入消息之前不会等待任何来自服务器的响应,容易丢消息,但是吞吐量高。
- acks=1:默认值即为1 。生产者发送消息之后,只要分区的leader 副本成功写入消息,那么它就会收到来自服务端的成功响应。如果消息无法写入leader 副本,比如在leader 副本崩溃、重新选举新的leader 副本的过程中,那么生产者就会收到一个错误的响应,为了避免消息丢失,生产者可以选择重发消息。如果消息写入leader 副本并返回成功响应给生产者,且在被其他follower 副本拉取之前leader 副本崩溃,那么此时消息还是会丢失,因为新选举的leader 副本中并没有这条对应的消息。acks 设置为1 ,是消息可靠性和吞吐量之间的折中方案。
- acks=-1或all:生产者在消息发送之后,需要等待ISR中的所有副本都成功写入消息之后才能够收到来自服务端的成功响应。在其他配置环境相同的情况下,acks 设置为-1 (all)可以达到最强的可靠性。但这并不意味着消息就一定可靠,因为ISR 中可能只有leader 副本,这样就退化成了acks=1的情况。
buffer.memory:设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息。
max.block.ms:指定了在调用send()方法或者使用partitionsFor()方法获取元数据时生产者的阻塞时间。当生产者的发送缓冲区已满,或者没有可用的元数据时,这些方法就会阻塞。在阻塞时间达到max.block.ms时,生产者会抛出超时异常。
batch.size:当多个消息被发送同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。当批次内存被填满后,批次里的所有消息会被发送出去。
retries:指定生产者可以重发消息的次数。
receive.buffer.bytes和send.buffer.bytes:指定TCP socket接受和发送数据包的缓存区大小。如果它们被设置为-1,则使用操作系统的默认值。如果生产者或消费者处在不同的数据中心,那么可以适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽。
linger.ms:指定了生产者在发送批次前等待更多消息加入批次的时间。