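Kafka Producer
1. Partitioning strategy
1.1 Why partition
(1) Partitions make it easy to scale a topic across the cluster and raise the load the cluster can carry.
(2) Partitions increase concurrency, because reads and writes happen per partition.
1.2 Partitioning rules from the producer's perspective
Data sent by the producer must be wrapped in a ProducerRecord object. Its constructors are: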
ProducerRecord(@NotNull String topic, Integer partition, Long timestamp, String key, String value, @Nullable Iterable<Header> headers)
ProducerRecord(@NotNull String topic, Integer partition, Long timestamp, String key, String value)
ProducerRecord(@NotNull String topic, Integer partition, String key, String value, @Nullable Iterable<Header> headers)
ProducerRecord(@NotNull String topic, Integer partition, String key, String value)
ProducerRecord(@NotNull String topic, String key, String value)
ProducerRecord(@NotNull String topic, String value)
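(1) If a partition is specified, that value is used directly as the partition.
(2) If no partition is specified but a key is present, the hash of the key modulo the number of partitions in the topic gives the partition.
(3) If neither a partition nor a key is present, a random integer is generated on the first call and incremented on every later call; that value modulo the number of available partitions in the topic gives the partition. This is the familiar round-robin behavior. It matches the DefaultPartitioner source listed in section 4; Kafka clients from version 2.4 onward use a sticky strategy for keyless records instead.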
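The three rules can be sketched in plain Java without the Kafka client on the classpath. This is only an illustration under stated assumptions: the class name PartitionRuleSketch is hypothetical, String.hashCode() stands in for the murmur2 hash that Kafka applies to the serialized key, and the counter is seeded by the caller rather than randomly so that the result is reproducible.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the three partition-selection rules; not the Kafka implementation.
final class PartitionRuleSketch {
    // Counter used for records that carry neither a partition nor a key.
    private final AtomicInteger counter;

    PartitionRuleSketch(int seed) {
        this.counter = new AtomicInteger(seed);
    }

    // Clears the sign bit so that the modulo result is never negative.
    static int toPositive(int n) {
        return n & 0x7fffffff;
    }

    int choose(Integer partition, String key, int numPartitions) {
        if (partition != null) {
            return partition;                                          // rule 1: explicit partition
        }
        if (key != null) {
            // Assumption: hashCode() replaces murmur2 over the serialized key bytes.
            return toPositive(key.hashCode()) % numPartitions;         // rule 2: hash of the key
        }
        return toPositive(counter.getAndIncrement()) % numPartitions;  // rule 3: round-robin
    }
}
```

With three partitions and a seed of 0, keyless records cycle through partitions 0, 1, 2, 0, while records that share a key always land on the same partition. The sketch ignores the distinction between all partitions and available partitions that rule (3) mentions.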
2. Producer data reliability guarantees
2.1 ISR (In-sync Replica)
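The leader maintains a dynamic ISR: the set of followers that are in sync with the leader.
If the leader fails, a new leader is elected from the ISR.
Which followers belong to the ISR is decided by replica.lag.time.max.ms: a follower that has not caught up with the leader within this time is removed from the ISR.
Older versions also used replica.lag.max.messages (the maximum number of messages a follower may lag behind) to select followers; it was removed in later versions. The reason is that producers send messages in batches: if a batch contains more messages than replica.lag.max.messages, followers are repeatedly moved out of and back into the ISR, which also forces frequent ZooKeeper updates.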
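The time-based membership rule can be pictured with a small sketch. It assumes the leader keeps, for each follower, the timestamp at which that follower last caught up; the class IsrSketch and its parameter names are hypothetical and do not come from the broker code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a time-based ISR check; not the broker implementation.
final class IsrSketch {
    // Returns the ids of followers whose last catch-up time lies within maxLagMs of nowMs.
    // maxLagMs plays the role of replica.lag.time.max.ms.
    static List<Integer> inSync(Map<Integer, Long> lastCaughtUpMs, long nowMs, long maxLagMs) {
        List<Integer> isr = new ArrayList<>();
        for (Map.Entry<Integer, Long> entry : lastCaughtUpMs.entrySet()) {
            if (nowMs - entry.getValue() <= maxLagMs) {
                isr.add(entry.getKey());
            }
        }
        return isr;
    }
}
```

Because the check looks only at elapsed time, a single large batch cannot push a healthy follower out of the ISR, which is the problem the message-count setting had.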
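When the producer sends a message, each partition of the topic that receives data must send an ack back to the producer. If the producer receives the ack, it proceeds with the next round of sending; otherwise it resends the data.
After the partition leader receives the data, the followers in the ISR must replicate it. Once replication completes, the followers notify the leader, and the leader then sends the ack to the producer.
Kafka offers three reliability levels:
- acks = 0: the producer does not wait for any ack from the broker. The send counts as done before the broker has written the message to disk, so data may be lost if the broker fails;
- acks = 1: the producer waits only for the leader's ack. If the leader fails before the followers have finished replicating, data may be lost;
- acks = -1 (all): the producer waits until the partition leader and all followers in the ISR have stored the message. If the leader fails after the followers have replicated the message but before the broker sends the ack, the producer resends and the message is duplicated.
The acks setting determines whether data can be lost.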
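The difference between the three levels comes down to what must have happened before the producer treats a record as acknowledged. The sketch below encodes that condition as a pure function; the class AckSketch and its boolean parameters are hypothetical names used only for illustration.

```java
// Hypothetical sketch: when does the producer consider a record acknowledged?
final class AckSketch {
    static boolean acknowledged(String acks, boolean leaderWrote, boolean allIsrFollowersReplicated) {
        switch (acks) {
            case "0":
                return true;                                      // no broker response is awaited
            case "1":
                return leaderWrote;                               // the leader alone is enough
            case "-1":
            case "all":
                return leaderWrote && allIsrFollowersReplicated;  // leader plus every ISR follower
            default:
                throw new IllegalArgumentException("unknown acks value: " + acks);
        }
    }
}
```

The loss windows are the cases where the function returns true although a replica is still missing the record: acknowledged("0", false, false) and acknowledged("1", true, false).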
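2.2 Failure handling details
HW (High Watermark): marks a specific message offset; consumers can only fetch messages before this offset.
LEO (Log End Offset): the offset at which the next message will be written to the log.
Follower failure: a failed follower is temporarily removed from the ISR. After it recovers, the follower reads the last HW it recorded on local disk, truncates the part of its log above that HW, and starts replicating from the leader beginning at the HW. Once the follower's LEO is greater than or equal to the partition's HW, it can rejoin the ISR.
Leader failure: a new leader is elected from the ISR. The remaining followers truncate the part of their logs above the HW and then replicate from the new leader.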
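A toy model may help with the relationship between HW, LEO, and truncation. It assumes that the HW equals the smallest LEO among the in-sync replicas and that a log is a simple list whose index is the offset; WatermarkSketch and its method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// Hypothetical toy model of HW and log truncation; not the broker implementation.
final class WatermarkSketch {
    // Assumption: the HW is the smallest LEO among the in-sync replicas, leader included.
    static long highWatermark(Collection<Long> isrLogEndOffsets) {
        return Collections.min(isrLogEndOffsets);
    }

    // Recovery step: keep offsets [0, hw) and drop everything at or above the HW.
    static <T> List<T> truncateToHighWatermark(List<T> log, long hw) {
        int keep = (int) Math.max(0, Math.min(hw, log.size()));
        return new ArrayList<>(log.subList(0, keep));
    }
}
```

For replicas with LEOs 10, 8, and 9 the HW is 8, so a recovering replica that holds ten entries keeps the first eight and fetches the rest from the leader.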
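3. Exactly-once semantics
At most once: acks = 0
At least once: acks = -1
Setting enable.idempotence = true on top of at-least-once delivery lets the broker discard duplicate retries.
Idempotence removes duplicates only within a single producer session and a single partition.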
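The session and partition limits follow from how duplicates are detected: the broker remembers, per producer id and partition, the sequence number of the last record it accepted. The sketch below is a simplified model of that bookkeeping; IdempotenceSketch is a hypothetical name, and the real broker additionally rejects out-of-order sequence numbers, which is omitted here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of broker-side duplicate detection.
final class IdempotenceSketch {
    // Last accepted sequence number, keyed by "producerId-partition".
    private final Map<String, Integer> lastSequence = new HashMap<>();

    // Returns true if the record is appended, false if it is dropped as a duplicate.
    boolean append(long producerId, int partition, int sequence) {
        String key = producerId + "-" + partition;
        Integer last = lastSequence.get(key);
        if (last != null && sequence <= last) {
            return false; // a retry of a record that was already written
        }
        lastSequence.put(key, sequence);
        return true;
    }
}
```

A restarted producer receives a new producer id, so its resent records are no longer recognized as duplicates, and each partition is tracked separately. That is why the guarantee covers one session and one partition only.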
Implementing a producer with the Java API
1. Basic producer
package com.examples.kafka_producer;

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class MyProducer {
    public static void main(String[] args) {
        // 1. Create the producer configuration
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("acks", "all");
        properties.put("retries", 3);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", StringSerializer.class);
        properties.put("value.serializer", StringSerializer.class);

        // 2. Create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);

        // 3. Send ten records without a key or an explicit partition
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<String, String>("test", i + " from Java MyProducer"));
        }

        // 4. Flush pending records and release resources
        producer.close();
    }
}
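2. Producer with a callback
package com.examples.kafka_producer;

import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class CallbackProducer {
    public static void main(String[] args) {
        // 1. Create the producer configuration
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("acks", "all");
        properties.put("retries", 3);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", StringSerializer.class);
        properties.put("value.serializer", StringSerializer.class);

        // 2. Create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);

        // 3. Send ten records; the callback runs once each send completes
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<String, String>("test", i + " from Java CallbackProducer"),
                new Callback() {
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception == null) {
                            // Success: print the partition and offset the record was written to
                            System.out.println(metadata.partition() + "--" + metadata.offset());
                        } else {
                            exception.printStackTrace();
                        }
                    }
                });
        }

        // 4. Flush pending records and release resources
        producer.close();
    }
}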
3. Custom producer partitioner
package com.examples.partitioner;

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class MyPartitioner implements Partitioner {
    @Override
    public void configure(Map<String, ?> configs) {
        // No configuration needed for this example
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        int cnt = cluster.partitionCountForTopic(topic);
        if (key == null) {
            // Example choice: records without a key all go to partition 0
            return 0;
        }
        // floorMod keeps the result in [0, cnt) even when hashCode() is negative
        return Math.floorMod(key.toString().hashCode(), cnt);
    }

    @Override
    public void close() {
        // Nothing to release
    }
}
package com.examples.kafka_producer;

import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class PartitionProducer {
    public static void main(String[] args) {
        // 1. Create the producer configuration, registering the custom partitioner
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.serializer", StringSerializer.class);
        properties.put("value.serializer", StringSerializer.class);
        properties.put("partitioner.class", "com.examples.partitioner.MyPartitioner");

        // 2. Create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);

        // 3. Send ten keyed records and print where each one was written
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<String, String>("test", "key" + i, i + " from Java PartitionProducer"),
                new Callback() {
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception == null) {
                            System.out.println(metadata.partition() + "--" + metadata.offset());
                        } else {
                            exception.printStackTrace();
                        }
                    }
                });
        }

        // 4. Flush pending records and release resources
        producer.close();
    }
}
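4. Default producer partitioner
The listing below is the DefaultPartitioner source from a Kafka client release earlier than 2.4; it implements the rules described in section 1.2.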
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.producer.internals;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;
/**
* The default partitioning strategy:
* <ul>
* <li>If a partition is specified in the record, use it
* <li>If no partition is specified but a key is present choose a partition based on a hash of the key
* <li>If no partition or key is present choose a partition in a round-robin fashion
*/
public class DefaultPartitioner implements Partitioner {

    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

    public void configure(Map<String, ?> configs) {}

    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes serialized key to partition on (or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            int nextValue = nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            // hash the keyBytes to choose a partition
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    private int nextValue(String topic) {
        AtomicInteger counter = topicCounterMap.get(topic);
        if (null == counter) {
            counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
            AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
            if (currentCounter != null) {
                counter = currentCounter;
            }
        }
        return counter.getAndIncrement();
    }

    public void close() {}
}