Kafka 生产者

Kafka的版本为2.0.0、jdk 1.8.0_231、scala 2.11.8。

生产者客户端开发

必要的参数配置

  • bootstrap.servers:指定生产者客户端连接Kafka集群所需的broker地址,多个使用地址使用逗号分割。
  • key.serializer和value.serializer:broker端接收的消息必须以字节数组的形式存在。

在编写代码的过程中,参数的名称,可以使用ProducerConfig中的变量(在java代码中),如果是scala代码,需要自己编写参数。

KafkaProducer是线程安全的,可以在多个线程中共享单个KafkaProducer实例,也可以将KafkaProducer实例进行池化来供其他线程调用。

消息的发送

Kafka生产者发送消息的模式为:发后即忘和异步(Kafka0.8.2之后,全部都是使用异步调用)。

发后即忘

只管发送消息,不管消息是否正确到达。会出现消息丢失,这种发送方式,性能最高,可靠性最差。

异步调用

Kafka发送消息都是异步,都会发送一个Future<RecordMetadata>对象,如果想要使用同步的方式,可以在后面调用get方法,
get方法会阻塞Kafka的响应,直到消息发送成功,或者发送异常。RecordMetaData中含有一些元数据:主题、分区号、分区中的偏移量、hash值。

try{
    Future<RecordMetadata> future = producer.send(record);
    RecordMetadata metadata = future.get();
    System.out.println(metadata.topic()+"-"+metadata.partition()+"-"+metadata.offset());
}catch (ExecutionException | InterruptedException e){ //jdk版本要合适,本人使用的jdk1.8,否则会报错。
    e.printStackTrace();
}

Kafka中一般发生两种类型的异常:可重试异常和不可重试异常。常见的可重试异常:NetworkException(网络故障)、LeaderNotAvailableException(分区leader副本不可用)、
UnknownTopicOrPartitionException、NotEnoughReplicasExceotion、NotCoordinatorException等。不可重试异常:RecordTooLargeException等。为了保证
消息可以顺利达到,可以设置重试次数,参数为retries。

在调用send方法来发送消息时,可以指定或者不指定调用函数,如果想用阻塞请求,可以调用get方法。

ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("topic-demo", "key".getBytes(), "value".getBytes());
producer.send(record,
    new Callback() {
    public void onCompletion(RecordMetadata metadata, Exception e) {
    if(e != null)
        e.printStackTrace();
    System.out.println("The offset of the record we just sent is: " + metadata.offset());
    }
});

同一分区发送的数据,每条记录的callback发送都是按照顺序执行的。
KafkaProducer的close方法会阻塞等待之前所有发送的请求完成之后再关闭KafkaProducer。
KafkaProducer使用total.memory.bytes来控制Producer缓存数据的最大字节数(要保证内存充足)。
异步调用,需要设置block.on.buffer.full=false。

序列化

  • 1.默认序列化实现
    KafkaProducer默认的编码格式为UTF-8。在编写自动代码之前,需要添加相应的依赖:
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.10</version>
    <scope>provided</scope>
</dependency>

下面是自定义的序列化代码:

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Map;

public class CompanySerializer implements Serializer<Company> {
    private String encoding = "UTF8";
    public void configure(Map<String, ?> configs, boolean isKey) {
        String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
        Object encodingValue = configs.get(propertyName);
        if (encodingValue == null)
            encodingValue = configs.get("serializer.encoding");
        if (encodingValue != null && encodingValue instanceof String)
            encoding = (String) encodingValue;
    }

    public byte[] serialize(String topic, Company data) {
        try {
            if (null == data){
                return null;
            }
            byte[] name,address;
            if (null != data.getName()){
                name = data.getName().getBytes(encoding);
            }else {
                name = new byte[0];
            }
            if (null != data.getAddress()){
                address = data.getAddress().getBytes(encoding);
            }else {
                address = new byte[0];
            }
            ByteBuffer buffer = ByteBuffer.allocate(4+4+name.length+address.length);
            buffer.putInt(name.length);
            buffer.put(name);
            buffer.putInt(address.length);
            buffer.put(address);
            return buffer.array();
        }catch (UnsupportedEncodingException e){
            throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);
        }
        return new byte[0];
    }
    public void close() {

    }
}

反序列化:

package com.edu.kafka;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Map;

public class CompanyDeserializer implements Deserializer<Company> {

    private String encoding = "UTF8";

    public void configure(Map<String, ?> configs, boolean isKey) {
        String propertyName = isKey ? "key.deserializer.encoding" : "value.deserializer.encoding";
        Object encodingValue = configs.get(propertyName);
        if (encodingValue == null)
            encodingValue = configs.get("deserializer.encoding");
        if (encodingValue != null && encodingValue instanceof String)
            encoding = (String) encodingValue;
    }

    public Company deserialize(String topic, byte[] data) {
        if (null == data){
            return null;
        }
        if (data.length < 8){
            throw new SerializationException("Size of data received by deserializer is shorter than expected!");
        }
        ByteBuffer buffer = ByteBuffer.wrap(data);
        int nameLen,addressLen;
        String name,address;
        nameLen = buffer.getInt();
        byte[] nameBytes = new byte[nameLen];
        buffer.get(nameBytes);
        addressLen = buffer.getInt();
        byte[] addressBytes = new byte[addressLen];
        buffer.get(addressBytes);
        try {
            name = new String(nameBytes,encoding);
            address = new String(addressBytes,encoding);
        }catch (UnsupportedEncodingException e){
            throw new SerializationException("Error occur when deserializing!");
        }
        return new Company(name,address);
    }
    public void close() {

    }
}
  • 2.使用Protostuff实现序列化

添加相应依赖:

<dependency>
    <groupId>io.protostuff</groupId>
    <artifactId>protostuff-core</artifactId>
    <version>1.6.2</version>
</dependency>
<dependency>
    <groupId>io.protostuff</groupId>
    <artifactId>protostuff-runtime</artifactId>
    <version>1.6.2</version>
</dependency>

序列化:

package com.edu.kafka;

import io.protostuff.LinkedBuffer;
import io.protostuff.ProtobufIOUtil;
import io.protostuff.Schema;
import io.protostuff.runtime.RuntimeSchema;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Map;

public class CompanySerializer implements Serializer<Company> {
    private String encoding = "UTF8";

    public void configure(Map<String, ?> configs, boolean isKey) {
        String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
        Object encodingValue = configs.get(propertyName);
        if (encodingValue == null)
            encodingValue = configs.get("serializer.encoding");
        if (encodingValue != null && encodingValue instanceof String)
            encoding = (String) encodingValue;
    }

    public byte[] serialize(String topic, Company data) {
        if (null == data){
            return null;
        }
        Schema schema = RuntimeSchema.getSchema(data.getClass());
        LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
        byte[] protostuff = null;
        try {
            protostuff = ProtobufIOUtil.toByteArray(data,schema,buffer);
        }catch (Exception e){
            throw new IllegalStateException(e.getMessage(),e);
        }finally {
            buffer.clear();
        }
        return protostuff;
    }

    public void close() {

    }
}

反序列化:

package com.edu.kafka;

import io.protostuff.ProtostuffIOUtil;
import io.protostuff.Schema;
import io.protostuff.runtime.RuntimeSchema;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Map;

public class CompanyDeserializer implements Deserializer<Company> {

    private String encoding = "UTF8";

    public void configure(Map<String, ?> configs, boolean isKey) {
        String propertyName = isKey ? "key.deserializer.encoding" : "value.deserializer.encoding";
        Object encodingValue = configs.get(propertyName);
        if (encodingValue == null)
            encodingValue = configs.get("deserializer.encoding");
        if (encodingValue != null && encodingValue instanceof String)
            encoding = (String) encodingValue;
    }

    public Company deserialize(String topic, byte[] data) {
        if (null == data){
            return null;
        }
        Schema schema = RuntimeSchema.getSchema(Company.class);
        Company company = new Company();
        ProtostuffIOUtil.mergeFrom(data,company,schema);
        return company;
    }

    public void close() {

    }
}

使用序列化:

package com.edu.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class KafkaProducerAnalysis {

    public static final String topic = "topic-demo";

    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CompanySerializer.class.getName());
        KafkaProducer<String, Company> producer = new KafkaProducer<String, Company>(prop);
        Company company = Company.builder().name("xiaoyao").address("Beijing").build();
        ProducerRecord<String, Company> record = new ProducerRecord<String, Company>("topic-demo", company);
        System.out.println("===start===");
        try {
            producer.send(record);
        } catch (Exception e) {
            System.out.println("===exception===");
            e.printStackTrace();
        }

        producer.close();
        System.out.println("===finish===");
    }
}

区分器

消息经过序列化之后就需要确定他们发往的分区。如果ProducerRecord指定分区,就会发往指定的分区,
如果没有指定,就需要依赖分区器,根据key字段的哈希值选择一个分区,如果分区和key都没有指定,使用轮训的方式。
在Kafka0.8.2.2中,有一个默认的区分器:Partitioner。
在Kafka2.0.0中有一个默认分区器:DefaultPartitioner,并且提供了Partitioner接口,用户可以自定义实现分区器。

自定义分区器:

package com.edu.kafka;


import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

public class CustomPartitioner implements Partitioner {

    private final AtomicInteger counter = new AtomicInteger(0);

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topic);
        int numPartitions = partitionInfos.size();
        if (null == keyBytes){
            return counter.getAndIncrement() % numPartitions;
        }else {
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    public void close() {

    }

    public void configure(Map<String, ?> map) {

    }
}

在Kafka 0.8.2.2版本中,也可以使用上面的代码,需要将implements Partitioner去掉,然后修改下面的key移动到相应分区的代码。

生产者拦截器

拦截器是在Kafka0.10.0.0之后才引入的功能。
KafkaProducer在将消息序列化和计算分区之前会调用生产者拦截器的onSend()方法来对消息进行相应的定制化操作。
KafkaProducer会在消息被应答之前或者消息发送失败时调用生产者拦截器的onAcknowledgement()方法,优先于
用户设定的Callback之前执行。
close()方法主要用于在关闭拦截器时执行一些资源的清理工作。
自定义的拦截器:

package com.edu.kafka;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

public class CustomProducerInterceptor implements ProducerInterceptor<String,String> {
    private volatile long sendSuccess = 0;
    private volatile long sendFailure = 0;
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        String modifiedValue = "prefix1-"+record.value();
        return new ProducerRecord<String, String>(record.topic(),record.partition(),
                record.timestamp(),record.key(),modifiedValue,record.headers());
    }

    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (null == exception){
            sendFailure++;
        }else{
            sendFailure++;
        }
    }

    public void close() {
        double successRatio = (double)sendSuccess/(sendSuccess + sendFailure);
        System.out.println("[INFO] send ratio="+String.format("%f",successRatio*100)+"%");
    }

    public void configure(Map<String, ?> configs) {

    }
}

KafkaProducer可以指定多个拦截器,形成拦截链,按照指定的顺序执行。

原理分析

整体架构

20190620170845181.png

执行流程:
1.在启动main方法之后,KafkaProducer发送的数据到ProducerInterceptor拦截器。
2.ProducerInterceptor拦截器对数据进行过滤,然后将数据发送到序列化。
3.将过滤之后的数据发送到序列化,然后发送到不同的分区。
4.分区中数据,会发送到RecordAccumulator(消息累加器),RecordAccumulator主要用来缓存消息以便Sender线程可以批量发送,进而减少网络的资源消耗以提升性能,默认大小为 buffer.memory=32MB,max.block.ms = 60000(超时时间)。
RecordAccumulator维护多个双端队列,数据会堆积到一个ProducerBatch(多个ProducerRecord),然后批量发送。消息在网络上都是以字节的形式传输,RecordAccumulator的内部有一个BufferPool,用来实现ByteBuffer的复用,以实现缓存的高效利用。
5.Sender从RecordAccumulator中获取缓存数据,转换为<Node,List<ProducerBatch>>,Node表示broker节点。
6.将<Node,List<ProducerBatch>>转换为<NodeRequest>的形式。
7.将信息缓存起来,保存到InFlightRequests中,InFlightRequest保存对象的具体形式为Map<NodeId,Deque<Request>>,它的主要作用是缓存已经发出去但还没有收到响应的请求。
8.将信息提交给selector准备发送。
9.selector将信息发送到KafkaCluster,KafkaCluster信息接收到信息之后,返回数据。
10.selector将KafkaCluster发送过来的数据,传递给InFlightRequests。
11.InFlightRequests接收到信息之后,会将信息发挥给主进程。

元数据的更新

InFlightRequests中可以获取LeastLoadedNode,即所有Node中负载最小。元数据操作是在客户端内部进行的。更新元数据时,
会挑选出LeastLoadedNode,然后这个Node发送MetadataRequest请求来获取具体的元数据信息。这里的数据同步是通过
synchronized和final来保证。

生产者参数

参数 默认值 描述
acks 1 指定分区中必须要有多少个副本收到这条信息
acks = 1,如果失败,会重发,只要有一个接收,就是成功;acks = 0,不需要等待服务端的响应;acks = -1,需要将所有的副本都返回成功才可以。
max.request.sizes 1MB 发送消息的最大值
retries 0 重试次数
retries.backoff.ms 100 重试之间时间间隔
compress.type none 可以对消息进行压缩
connections.max.idle.ms 540000ms 闲置多长时间之后关闭连接
receive.buffer.bytes 32KB 接收消息缓存区大小
send.buffer.bytes 128KB 发送消息缓存区大小
request.timeout.ms 3000ms 等待请求响应的最大时间

其他的参数请参考官网

参考文献

kafka消息发送模式
深入理解Kafka:核心设计与实践原理
Kafka Documentation

后续

本文是《深入理解Kafka:核心设计与实践原理》的读书笔记。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,589评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,615评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,933评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,976评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,999评论 6 393
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,775评论 1 307
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,474评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,359评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,854评论 1 317
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,007评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,146评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,826评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,484评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,029评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,153评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,420评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,107评论 2 356