1.Producer代码实现
// 同步发送消息、// 异步发送消息
public class KafkaProducerDemo {
private static Properties prop;
private static KafkaProducer<String, String> producer;
private static final String TOPIC_NAME = "topic-07";
static {
// 具体的参数名可以从CommonClientConfigs、ProducerConfig获取
prop = new Properties();
prop.put("bootstrap.servers", "192.168.111.129:9092,192.168.111.129:9093,192.168.111.129:9094");
prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
prop.put("acks", "-1");// acks=0 表示不关系是否发送成功;acks=1表示leader写入成功;acks=-1或acks=all表示所有副本写入成功
prop.put("retries", 0);
prop.put("batch.size", 16384);// 消息发送批次大小,16KB,消息大小超过这个大小了,就发送
prop.put("linger.ms", 1);// 消息发送间隔时间,1ms,等待时间到了,不管消息大小是否满足,就发送
prop.put("buffer.memory", 33554432);
producer = new KafkaProducer<>(prop);
}
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
syncSendMessage(i+"");
}
//asyncSendMessage();
}
/**
* 异步发送消息
**/
private static void asyncSendMessage() {
ProducerRecord<String, String> record = new roducerRecord<>(TOPIC_NAME, "kafka third message");
try {
Future<RecordMetadata> result = producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, exception exception) {
if (exception != null) {
exception.printStackTrace();
}
}
});
System.out.println("分区:" + result.get().partition() + ", offset:" + result.get().offset());
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 同步发送消息
**/
private static void syncSendMessage(String content) {
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, content, "kafka second message");
try {
RecordMetadata result = producer.send(record).get();
System.out.println("分区:" + result.partition() + ", offset:" + result.offset());
} catch (Exception e) {
e.printStackTrace();
}
}
}
2. 分区器
- 客户端可以控制将消息发送到哪个分区
-
(org.apache.kafka.clients.producer.internals.DefaultPartitioner)
- 如果记录中指定了分区,则直接使用
- 如果未指定分区,但指定了key值,则根据key的hash值选择一个分区(相同的key所发送到的Partition是同一个,可用来保证消息的局部有序性)
- 如果未指定分区,也未指定key值,则以 '黏性分区' 策略(2.4版本以前使用轮询策略)选择一个分区
- 分区策略
-
(org.apache.kafka.clients.producer.RoundRobinPartitioner)
- 如果key值为null,并且使用了默认的分区器,Kafka会根据轮训(Random Robin)策略将消息均匀地分布到各个分区上。
-
(Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions)
- 如果键值不为null,并且使用了默认的分区器,Kafka会对键进行散列,然后根据散列值把消息映射到对应的分区上
-
(org.apache.kafka.clients.producer.UniformStickyPartitioner)
- 很多时候消息是没有指定Key的。而Kafka 2.4之前的策略是轮询策略,这种策略在使用中性能比较低。所以2.4中版本加入了黏性分区策略(Sticky Partitioning Strategy)。
- 黏性分区器(Sticky Partitioner)主要思路是选择单个分区发送所有无Key的消息。一旦这个分区的batch已满或处于“已完成”状态,黏性分区器会随机地选择另一个分区并会尽可能地坚持使用该分区——象黏住这个分区一样
-
- 默认分区器是使用次数最多的分区器。除了散列分区之外,用户可以根据需要对数据使用不一样的分区策略
- 实现org.apache.kafka.clients.producer.Partitioner接口,在配置中设置实现的类prop.put("partitioner.class", 实现类);