[kafka系列]之producer端消息发送

本小节我们来讨论Kafka生产者是如何发送消息到Kafka的, Kafka项目有一个生产者客户端,我们可以通过这个客户端的API来发送消息。生产者客户端是用Java写的,但Kafka写消息的协议是支持多语言的,其它语言的api可见这个wiki


概要

通过本文,你可以了解到以下内容:

  • kafka producer端的整体结构,相关参数配置,以及性能优化;
  • 分区器,拦截器的扩展;
  • 消息序列化扩展;
  • 分区器,拦截器,序列化的执行顺序;

开始

很多做业务的同学都知道,在我们系统中发送一条消息给kafka 集群,我们只需要简单的调一下已经封装好的接口,下面是来于我实际项目中的接口方法:

kafkaProducer.produce(String topic,Object msg)

每次要发消息,我就是这么简单的调用一下就能确保消息能被consumer端正常的消费,但是kafka producer做了哪些工作我却浑然不知,今天我就跟大家说到底说道这个里面到底有哪些不为人知的操作;

  • 引出第一个问题,kafka消息的发送是一个什么样的过程? 这个过程中做了哪些操作?

借助于kafka官网上的API,首先给大家来一张producer端的消息流转图:

消息发送.png

接下来,结合一段代码,给大家简单说下流程:

    Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("compresstion.type","snappy");
        props.put("partitioner.class", "org.apache.kafka.clients.producer.internals.DefaultPartitioner");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < MAZ_RETRY_SIZE; i++) {
            producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
        }
producer.close();

流程如下:

  • 首先需要指定kafka producer端的配置;
  1. zk的地址和端口;
  2. producer端ack应答机制,本demo中ack设置为all,表示生产者会等待所有副本成功写入该消息,这种方式是最安全的,能够保证消息不丢失,但是延迟也是最大的;
  3. retries设置标示消息发送失败,生产者可以自动重试,但是此刻设置为0标示不重试;这个参数需要结合retry.backoff.ms(重试等待间隔)来使用,建议总的重试时间比集群重新选举群首的时间长,这样可以避免生产者过早结束重试导致失败;
  4. batch.size参数标示生产者为每个分区维护了一个未发送记录的缓冲区,这个缓冲区的大小由batch.size配置指定,配置的很大可能会导致更多的批处理,也需要更多的内存(但是对于每个活动分区,我们通常都有一个这样的缓冲区),默认是16384Bytes;
  5. linger.ms 指定生产者在发送批量消息前等待的时间,当设置此参数后,即便没有达到批量消息的指定大小,到达时间后生产者也会发送批量消息到broker.默认情况下,生产者的发送消息线程只要空闲了就会发送消息,即便只有一条消息.设置这个参数后,发送线程会等待一定的时间,这样可以批量发送消息增加吞吐量,但同时也会增加延迟;
  6. buffer.memory控制生产者可用于缓冲的内存总量;消息的发送速度超过了它们可以传输到服务器的速度,那么这个缓冲空间将被耗尽.
    当缓冲区空间耗尽时,额外的发送调用将阻塞.阻止时间的阈值由max.block.ms确定,在此之后它将引发TimeoutException.这个缓存是针对每个producerThread,不应设置高以免影响内存;

生产者如果每发送一条消息都直接通过网络发送到服务端,势必会造成过多 的网络请求。如果我们能够将多条消息按照分区进行分组,并采用批量的方式一次发送一个消息集,并且对消息集进行压缩,就可以减少网络传输的带宽,进一步提高数据的传输效率。

  1. key.serializervalue.serializer指定了如何将key和value序列化成二进制码流的方式,也就是上图中的序列化方式;
  2. compresstion.type:默认情况下消息是不压缩的,这个参数可以指定使用消息压缩,参数可以取值为snappy、gzip或者lz4;
  • 接下来,我们需要创建一个ProducerRecord,这个对象需要包含消息的topic和值value,可以选择性指定一个键值key或者分区partition
  • 发送消息时,生产者会根据配置的key.serializervalue.serializer对键值和值序列化成字节数组,然后发送到分配器partitioner
  • 如果我们指定了分区,那么分配器返回该分区即可;否则,分配器将会基于键值来选择一个分区并返回。
  • 选择完分区后,生产者知道了消息所属的主题和分区,它将这条记录添加到相同主题和分区的批量消息中,另一个线程负责发送这些批量消息到对应的Kafka broker
  • broker接收到消息后,如果成功写入则返回一个包含消息的主题、分区及位移的RecordMetadata对象,否则返回异常.
  • 生产者接收到结果后,对于异常可能会进行重试,根据参数reties的配置决定.

kafka发送端文件存储原理

我们普遍认为一旦涉及到磁盘的访问,数据的读写就会变得很慢,其实不然,操作系统已经针对磁盘的访问速率做了很大的优化;比如,预读会提前将一个比较大的磁盘读入内存,后写会把很多小的逻辑写操作合并起来组合成一个大的物理写操作;并且,操作系统还会将主内存剩余的所有空间都用作磁盘缓存,所有的磁盘读写都会经过统一的磁盘缓存,综上所述,如果针对磁盘的顺序读写,某些情况它可能比随机的内存访问都要快。

文件写入的逻辑无外乎一下这两种,但kafka选择了第一种,也就是a图的逻辑:

image.png

b图是首先在内存中保存尽可能多的数据,并在需要时将这些数据刷新进磁盘;
a图是所有数据立即写入磁盘,但不进行刷新数据的调用,数据首先会被传输到磁盘缓存,操作系统随后会将数据定期自动刷新到磁盘。

发送端优化

新的API中,生产者要发送消息,并不是直接发送给服务器,而是在客户端先把消息放入一个缓冲队列中,然后由一个消息发送线程从队列中拉取消息,以批盐的方式发送消息给服务端。 Kafka的记录收集器RecordAccumulator 负责缓存生产者客户端产生的消息,发送线程( Sender)负责读取记录收集器的批量消息, 通过网络发送给服务端。

开篇我们便列出了kafka 发送端的流程图,消息发送之初,首先会为消息指定一个分区(发送消息时未指定分区的情况下),对于没有键的消息,通过计数器自增轮询的方式依次将消息分配到不同的分区上;对于有键的消息,对键计算散列值,然后和主题的分区数进行取模得到分区编号,具体的客户端代码实现:

public int partition(ProducerRecord<byte[], byte[]> record, Cluster cluster) {
       //获取集群中所有的分区
        List<PartitionInfo> partitions = cluster.partitionsForTopic(record.topic());
        int numPartitions = partitions.size();
        //如果指定分区
        if (record.partition() != null) {
            // they have given us a partition, use it
            if (record.partition() < 0 || record.partition() >= numPartitions)
                throw new IllegalArgumentException("Invalid partition given with record: " + record.partition()
                                                   + " is not in the range [0..."
                                                   + numPartitions
                                                   + "].");
            return record.partition();
      //  如果没有key,则负载均衡的分布
        } else if (record.key() == null) {
            int nextValue = counter.getAndIncrement();
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(record.topic());
            if (availablePartitions.size() > 0) {
                int part = Utils.abs(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                return Utils.abs(nextValue) % numPartitions;
            }
        } else {
            // 如果有key,则对key 进行hash取模运算
            return Utils.abs(Utils.murmur2(record.key())) % numPartitions;
        }
    }

}

在客户端就为消息选择分区的目的是什么? 只有为消息选择分区,我们才能知道应该发送到哪个节点,如果随便找一个服务端节点,再由那个节点去决定如何将消息转发给其他正确的节点来保存。这种方式增加了服务端的负担,多了不必要的数据传输。

序列化

在上述代码中,我们看到了kafka producer在发送消息的时候会将key和value进行序列化,上面的程序中使用的是Kafka客户端自带的org.apache.kafka.common.serialization.StringSerializer,除了用于String类型的序列化器之外还有:ByteArrayByteBufferBytesDoubleIntegerLong这几种类型,这几个序列化类都实现了org.apache.kafka.common.serialization.Serializer接口接下来,此接口有三种方法:

  • public void configure(Map<String, ?> configs, boolean isKey):用来配置当前类。
  • public byte[] serialize(String topic, T data):用来执行序列化。
  • public void close():用来关闭当前序列化器。一般情况下这个方法都是个空方法,如果实现了此方法,必须确保此方法的幂等性,因为这个方法很可能会被KafkaProducer调用多次。

业界用的多的序列化框架无外乎如Avro、JSON、Thrift、ProtoBuf或者Protostuff等工具,这里我就不扩展开了,读者如果感兴趣可以搜索相关的资料,下面就以一个简单的例子来介绍下如何自定义序列化方式.

假设我们有一个自定义的Company类:

@Data
public class Company {
    private String name;
    private String address;
}

接下来我们Company的name和address属性进行序列化,实现下Serializer接口:

public class CompanySerializer implements Serializer<Customer> {
    public void configure(Map<String, ?> configs, boolean isKey) {}
    public byte[] serialize(String topic, Company data) {
        if (data == null) {
            return null;
        }
        byte[] name, address;
        try {
            if (data.getName() != null) {
                name = data.getName().getBytes("UTF-8");
            } else {
                name = new byte[0];
            }
            if (data.getAddress() != null) {
                address = data.getAddress().getBytes("UTF-8");
            } else {
                address = new byte[0];
            }
            ByteBuffer buffer = ByteBuffer.allocate(4+4+name.length + address.length);
            buffer.putInt(name.length);
            buffer.put(name);
            buffer.putInt(address.length);
            buffer.put(address);
            return buffer.array();
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        return new byte[0];
    }
    public void close() {}
}

使用自定义的序列化类的方式也简单,在前面的代码中替换下properties中的序列化类即可:

props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "com.xxx.kafka. CompanySerializer");

分区器

在上文的demo中,我们创建消息的时候,必须要提供主题和消息的内容,而消息的key是可选的(也就是我们平时工作中,发送消息时只需要指定topic和message),当不指定key时默认为null.消息的key有两个重要的作用:

  • 提供描述消息的额外信息;
  • 用来决定消息写入到哪个分区,所有具有相同key的消息会分配到同一个分区中.

如果key为null,那么生产者会使用默认的分配器,该分配器使用轮询round-robin)算法来将消息均衡到所有分区.

如果key不为null且使用的是默认的分配器,那么生产者会对key进行哈希并根据结果将消息分配到特定的分区.注意的是,在计算消息与分区的映射关系时,使用的是全部的分区数而不仅仅是可用的分区数.这也意味着,如果某个分区不可用(虽然使用复制方案的话这极少发生),而消息刚好被分配到该分区,那么将会写入失败.另外,如果需要增加额外的分区,那么消息与分区的映射关系将会发生改变,因此尽量避免这种情况,具体的信息可以查看DefaultPartitioner中的代码实现:

    /**
     * Compute the partition for the given record.
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        //获取指定topic的partitions
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        //key=null 
        if (keyBytes == null) {
            int nextValue = nextValue(topic);
            //可用分区
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                //消息随机分布到topic可用的partition中
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // 无分区可利用, 给定一个不可用的分区
                return Utils.toPositive(nextValue) % numPartitions;
            }
            //如果 key 不为 null,并且使用了默认的分区器,kafka 会使用自己的 hash 算法对 key 取 hash 值
        } else {//通过hash获取partition
            // hash the keyBytes to choose a partition
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

现在来看下如何自定义一个分配器,下面将key为Test的消息单独放在一个分区,与其他的消息进行分区隔离:

public class TestPartitioner implements Partitioner {
    public void configure(Map<String, ?> configs) {}
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    if ((keyBytes == null) || (!(key instanceOf String)))
        throw new InvalidRecordException("We expect all messages to have customer name as key")
    if (((String) key).equals("Test"))
        return numPartitions; // Banana will always go to last partition
   
     // Other records will get hashed to the rest of the partitions
    return (Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1))
    }
    
    public void close() {}
 
}

使用自定义的分区器

使用很简单,在配置文件中或者properties文件中指定分区器的类即可;

props.put("partitioner.class", "com.xxx.kafka.TestPartitioner");

拦截器

Producer拦截器是个相当新的功能.对于producer而言,interceptor使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等.同时,producer允许用户指定多个interceptor按序作用于同一条消息从而形成一个拦截链,Intercetpor的实现接口是org.apache.kafka.clients.producer.ProducerInterceptor,其定义的方法包括:

  • onSend(ProducerRecord):该方法封装进KafkaProducer.send方法中,即它运行在用户主线程中的。Producer确保在消息被序列化以计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算
  • onAcknowledgement(RecordMetadata, Exception e):该方法会在消息被应答之前或消息发送失败时调用,并且通常都是在producer回调逻辑触发之前。onAcknowledgement运行在producer的IO线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢producer的消息发送效率
  • close:关闭interceptor,主要用于执行一些资源清理工作,一般不作实现;

interceptor可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全.另外倘若指定了多个interceptor,则producer将按照指定顺序调用它们,并仅仅是捕获每个interceptor可能抛出的异常记录到错误日志中而非在向上传递.这在使用过程中要特别留意.

下面我们简单演示一个双interceptor组成的拦截链,第一个interceptor会在消息发送前将时间戳信息加到消息value的最前部;第二个interceptor会在消息发送后更新成功发送消息数或失败发送消息数.

第一个,在send方法中,我们会创建一个新的message,把时间戳写入消息体的最前部.

public class TimeStampPrependerInterceptor implements ProducerInterceptor<String, String> {
    @Override
    public void configure(Map<String, ?> configs) {
 
    }
 
    @Override
    public ProducerRecord onSend(ProducerRecord msg) {
        return new ProducerRecord(
                msg(), msg(), record.timestamp(), msg(), System.currentTimeMillis() + "," + msg().toString());
    }
 
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
 
    }
 
    @Override
    public void close() {
    }
}

定义第二个interceptor:CounterInterceptor,该interceptor会在消息发送后更新"发送成功消息数"和"发送失败消息数"两个计数器,并在producer关闭时打印这两个计数器;

public class CounterInterceptor implements ProducerInterceptor<String, String> {
 
    private int errorCounter = 0;
    private int successCounter = 0;
 
    @Override
    public void configure(Map<String, ?> configs) {
    }
 
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        return record;
    }
 
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            successCounter++;
        } else {
            errorCounter++;
        }
    }
 
    @Override
    public void close() {
        // 保存结果
        System.out.println("Successful sent: " + successCounter);
        System.out.println("Failed sent: " + errorCounter);
    }
}

定义好interceptor之后,我们需要在producer中这样指定即可,代码如下:

List<String> interceptors = new ArrayList<>();
interceptors.add("com.xxx.kafka.TimeStampPrependerInterceptor"); // interceptor 1
interceptors.add("com.xxx.kafka.CounterInterceptor"); // interceptor 2
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

// 一定要关闭producer,这样才会调用interceptors中的close方法
producer.close();

写了这么多,基本上将一个简单消息从kafka producer发送时可以做的事情弄清了,但是我还是有一个疑问,分区器,拦截器,序列化他们之间有顺序?
这个疑问留给大家自己去解决!!!

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 220,458评论 6 513
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 94,030评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,879评论 0 358
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,278评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,296评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 52,019评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,633评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,541评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,068评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,181评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,318评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,991评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,670评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,183评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,302评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,655评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,327评论 2 358

推荐阅读更多精彩内容

  • 一天晚上,在楼下奶茶店,偶然间看到这个乖巧的小可爱,就把它定格了下来。因为最近考研的事情,真的好烦人,总是觉得时间...
    突然就想你了阅读 174评论 0 0
  • 喜欢的女生终于发消息给我,我却果断拉黑 大学喜欢一个女生四年,被拒绝了4年 毕业后她去了外省,我留在本地,从此不再...
    迷茫懒惰君阅读 585评论 0 0
  • 想有一首歌 我们在歌里 冬天入夜猩红的天空 再来一场细碎的大雪 空静无人的广场 出现一个你 陪我看完这雪景 若真有...
    竹七君阅读 359评论 0 0
  • 一、10月整体分析 月计划60%完成,重要事项取得的效果比预期更好!优化了工作上的石墨协作工作集流程,建立了家庭石...
    扬头望月亮阅读 236评论 0 0