三 Kafka生产者分区策略,Java API

Kafka生产者

1. 分区策略
1.1 分区的原因

(1) 方便在集群中扩展,提高集群的负载能力
(2) 可以提高并发,以Partition为单位读写

1.2 生产者角度,分区的原则

我们需要将producer发送的数据封装成一个ProducerRecord对象。

ProducerRecord(@NotNull String topic, Integer partition, Long timestamp, String key, String value, @Nullable Iterable<Header> headers)

ProducerRecord(@NotNull String topic, Integer partition, Long timestamp, String key, String value)

ProducerRecord(@NotNull String topic, Integer partition, String key, String value, @Nullable Iterable<Header> headers)

ProducerRecord(@NotNull String topic, Integer partition, String key, String value)

ProducerRecord(@NotNull String topic, String key, String value)

ProducerRecord(@NotNull String topic, String value)

(1)指明partition的情况下,直接将指明的值作为partition值;
(2)没有指明partition值,但有key的情况下,将key的hash值与topic的partition数目进行取余操作,得到partition值;
(3)即没有partition值,又没有key的情况下,第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),将这个值与topic可用的partition总数取余得到partition值,也就是常说的round-robin

2. 生产者的数据可靠性保证

2.1 ISR (In-sync Replica)
Leader维护了一个动态的ISR,指的是和Leader保持同步的follower集合。
Leader发生故障后,就会从ISR中选举新的leader。

选哪些follower进ISR呢?replica.lag.time.max.ms
低版本用还同时用replica.lag.max.messages(最大的时间延迟)来选择follower,在高版本被移除了。因为生产者可以按batch发送消息。如果batch数量大于replica.lag.max.messages数量,会造成isr频繁地移入移出,还需要频繁操作zk。

producer发送消息,topic的每个partition收到数据后,都需要向producer发送ack,如果producer收到ack,就会进行下一轮的发送,否则重新发送数据。

其中partition的leader收到数据后,需要通知ISR中的follower来同步数据,同步完成之后,follower给leader发消息,leader再ack给producer。

Kafka提供了三种可靠性级别:

  • acks = 0: producer不需要等待broker的ack。broker一接收到还没写入磁盘就已经返回,当broker故障时有可能丢失数据;
  • acks = 1: 只等待leader的ack,如果在follower同步完成以前发生故障,则可能丢失数据;
  • acks = -1(all):等待partition的leader和follower(isr)全部存好消息再返回ack。但是如果在follower同步完成之后,broker发送ack之前,leader发生故障,会造成数据重复。

ack解决的是数据丢不丢的问题。

2.2 故障处理细节
HW (High Watermark):记录了一个特定的消息偏移量,消费者只能拉取到这个offset之前的消息。
LEO (Log End Offset):标识当前日志下一条待写入消息的位移。

follower故障:follower发生故障后会被临时踢出ISR,待该follower恢复后,follower会读取本地磁盘记录的上次的HW,并将log文件高于HW的部分截掉,从HW开始向leader进行同步。等该follower的LEO大于等于该partition的HW,就可以重新加入ISR了。
leader故障:leader故障后,从ISR中选出一个新的leader。其余follower将各自log文件高于HW的部分截掉,然后从新的leader同步数据。

3. Exactly Once 精准一次性

At most once:ack = 0
At least once:ack = -1

enable.idompotence
能解决单次会话,单个分区的问题

Java API实现生产者

1.普通生产者
package com.examples.kafka_producer;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class MyProducer {
    public static void main(String[] args) throws ClassNotFoundException {
        //1. 创建生产者的配置信息
        Properties properties = new Properties();
        
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("acks", "all");
        properties.put("retries", 3);
        
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", StringSerializer.class);
        properties.put("value.serializer", StringSerializer.class);
        
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
        
        for (int i = 0; i < 10; i++)
            producer.send(new ProducerRecord<String, String>("test",i + " from Java MyProducer"));
        
        producer.close();
    }
}
2. 带回调函数的生产者
package com.examples.kafka_producer;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class MyProducer {
    public static void main(String[] args) throws ClassNotFoundException {
        //1. 创建生产者的配置信息
        Properties properties = new Properties();
        
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("acks", "all");
        properties.put("retries", 3);
        
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", StringSerializer.class);
        properties.put("value.serializer", StringSerializer.class);
        
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
        
        for (int i = 0; i < 10; i++)
            producer.send(new ProducerRecord<String, String>("test",i + " from Java MyProducer"));
        
        producer.close();
    }
}
3. 自定义生产者分区策略
package com.examples.partitioner;

import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class MyPartitioner implements Partitioner{

    public void configure(Map<String, ?> configs) {
        // TODO Auto-generated method stub
    }

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        int cnt = cluster.partitionCountForTopic(topic);
        
        return key.toString().hashCode() % cnt;
    }

    public void close() {
        // TODO Auto-generated method stub
        
    }

}
package com.examples.kafka_producer;

import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class PartitionProducer {
    public static void main(String[] args) throws ClassNotFoundException {
        //1. 创建生产者的配置信息
        Properties properties = new Properties();
        
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.serializer", StringSerializer.class);
        properties.put("value.serializer", StringSerializer.class);
        
        properties.put("partitioner.class", "com.examples.partitioner.MyPartitioner");
        
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
        
        for (int i = 0; i < 10; i++)
            producer.send(new ProducerRecord<String, String>("test", "key" + i, i + " from Java PartitionProducer"),
                    new Callback() {

                        public void onCompletion(RecordMetadata metadata, Exception exception) {
                            if (exception == null) {
                                System.out.println(metadata.partition() + "--" + metadata.offset());
                            } else {
                                exception.printStackTrace();
                            }
                        }
                
            });
        
        producer.close();
    }
}
4. 默认生产者分区策略
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.clients.producer.internals;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

/**
 * The default partitioning strategy:
 * <ul>
 * <li>If a partition is specified in the record, use it
 * <li>If no partition is specified but a key is present choose a partition based on a hash of the key
 * <li>If no partition or key is present choose a partition in a round-robin fashion
 */
public class DefaultPartitioner implements Partitioner {

    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

    public void configure(Map<String, ?> configs) {}

    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes serialized key to partition on (or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            int nextValue = nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            // hash the keyBytes to choose a partition
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    private int nextValue(String topic) {
        AtomicInteger counter = topicCounterMap.get(topic);
        if (null == counter) {
            counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
            AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
            if (currentCounter != null) {
                counter = currentCounter;
            }
        }
        return counter.getAndIncrement();
    }

    public void close() {}

}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,616评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,020评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,078评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,040评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,154评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,265评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,298评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,072评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,491评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,795评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,970评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,654评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,272评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,985评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,223评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,815评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,852评论 2 351

推荐阅读更多精彩内容