kafka生产者及问题记录

1.生产者API

1.1 定义

自定义数据流向kafka集群中的TOPIC发送数据的应用程序就是kafka生产者。

注意:在0.10.0的官方文档中指出,目前版本支持java版本的生产者API,旧版的scala实现的生产者API正在逐步淘汰。

1.2 自定义生产者的实现

1.2.1 maven依赖

API依赖:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.0.0</version>
</dependency>

1.2.2 生产者使用说明

参考文档:https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html

  1. KafkaProducer构造器
构造器 参数 参数说明
KafkaProducer(Map<String,Object> config) Map类型 生产者配置参数构建的Map
KafkaProducer(Map<String,Object> config,Serializer<K> keySerializer,Serializer<V> valueSerializer) Map和序列化类(或继承实现了Serializer的类) 生产者配置参数构建的Map和K-V数据的序列化方法
KafkaProducer(Properties prop) Properties 生产者配置参数构建的Properties
KafkaProducer(Properties prop,Serializer<K> keySerializer,Serializer<V> valueSerializer) Properties和序列化类(或继承实现了Serializer的类) 生产者配置参数构建的Properties和K-V数据的序列化方法

说明:KafkaProducer(Properties prop)为常用方法。

  1. 生产者中实现的方法
方法名 参数 返回值 说明
send ProducerRecord<K,V> record Future<RecordMetadata> record是要发送的消息,需要单独构建。该方法执行可以获取返回值查看kafka元数据(offset等)。
send ProducerRecord<K,V> record, Callback callback Future<RecordMetadata> callback可以指定一个方法,当kafka接收消息时执行该方法(故障也会执行回调)。
flush void void 确保缓冲的数据写入kafka,手动将数据写入磁盘(一般情况下会缓存满了会自动写入磁盘)
partitionsFor String topic List<PartitionInfo> 获取指定TOPIC的分区数据
metrics void Map<MetricName,? extend Metric> 获取当前生产者维护的所有metric信息
close void void 等待所有发送完成后关闭生产者
close long timeout, TimeUnit timeUnit void 指定时间内未发送完成就记录失败并关闭生产者
  1. 生产者使用示例

创建生产者方法:

/**
     * 获取kafka producer
     * @return
     */
    public static KafkaProducer<String, String> getProducer() {

        Properties props = new Properties();
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "GSSAPI");
        props.put("sasl.kerberos.service.name", "kafka");

        props.put("sasl.jaas.config", KafkaConstants.JAAS_CONFIG);
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.BOOTSTRAP_SERVERS);

        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
        props.put("unclean.leader.election.enable", "false");

        props.put(ProducerConfig.ACKS_CONFIG, "all");// acks
        props.put(ProducerConfig.RETRIES_CONFIG, 0);// retries
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);// batch.size
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);// linger.ms
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);// buffer.memory

        // 配置key的序列化类
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // 配置value的序列化类
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        logger.info("Get Kafka sender" + producer.toString());
        return producer;

    }

发送数据:

        String  topicName = "topicName";
        String sendData = "Hello Kafka";
        KafkaProducer<String, String> producer = KafkaUtils.getProducer();
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(topicName,source, sendData);
        try {
            producer.send(record);
        }catch (Exception e) {
            logger.error("异常信息:",e);
        }

CallBack回调示例:

 ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value);
 producer.send(myRecord,
               new Callback() {
                   public void onCompletion(RecordMetadata metadata, Exception e) {
                       if(e != null)
                           e.printStackTrace();
                       System.out.println("The offset of the record we just sent is: " + metadata.offset());
                   }
               });//遇到故障打印offset

1.3 生产者配置说明

参考位置:https://www.bookstack.cn/read/apache-kafka-documentation-cn/030_configuration-01_configuration_cn.md

配置项 示例 说明
bootstrap.servers host1:port1,host2:port2,... 必选,填写kafka集群的节点IP及port列表,不需要全部列出,以防有节点宕机,建议多写几个
key.serializer org.apache.kafka.common.serialization.StringSerializer 指定实现了Serializer接口的类,key的序列化方法
value.serializer org.apache.kafka.common.serialization.StringSerializer 指定实现了Serializer接口的类,value的序列化方法
acks 1 acks=0,kafka收到数据后立即写入socket缓冲区,不做任何确认就认为接收成功;acks=1,kafka leader收到数据后写入log,不等其他flower同步成功就任务数据接收成功,如果当时出现单点故障可能导致数据丢失;acks=all,收到数据后,leader会等待所有flower都同步完成才会确认接收成功。
buffer.memory 33554432 生产者发送消息的缓存空间(byte数long类型),如果发送速度大于服务器可接受的速度会抛出异常。
compression.type none 指定数据的压缩类型none,gzip,snappy,lz4。默认为none,
security.protocol SASL_PLAINTEXT 用于代理通讯的协议,PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL
sasl.mechanism GSSAPI 用于客户端连接的SASL机制。默认就是GSSAPI。
sasl.kerberos.service.name kafka Kafka 运行时使用的 Kerberos 主体名称。 这可以在 Kafka 的 JAAS 配置或 Kafka 的配置中定义。
partitioner.class class org.apache.kafka.clients.producer.internals.DefaultPartitioner 实现了Partitioner接口的分区器

详情见参考说明或kafka官方开发手册

1.4 ProducerRecord说明

kafka在发送消息时,需要将消息构建为record对象进行发送。ProducerRecord对象构建时需要TOPIC名称、分区号、timestamp、key、value一共5个参数,其中TOPIC和value是必选项。详情请见API文档:https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html

1.5 问题记录

1.5.1 空指针异常

在构建了producer向kafka发送消息时,只指定topic、value两个参数,出现了空指针异常的情况。且报错信息极少几乎无法定位。

image.png

学到的调试方法:在使用try-catch捕捉异常时,往往不会打印全部报错信息,如果输出到控制台请使用e.printStackTrace;如果要输出到日志,则使用logger.error("异常信息:",e);

经过阅读公司的kafka工具类源码发现,在获取kafka生产者对象时,属性配置中指定了自定义的分区器,分区器需要根据传入的key值进行hash分区;所以调用send方法时必须指定key。

分区器代码示例:

public class HashPartitioner implements Partitioner{

    public void configure(Map<String, ?> arg0) {
    }

    public void close() {
    }

    public int partition(String arg0, Object arg1, byte[] arg2, Object arg3,
            byte[] arg4, Cluster arg5) {
        int partitionNum = arg5.partitionCountForTopic(arg0);
        int keyHashCode = arg1.hashCode();
        return Math.abs(keyHashCode) % partitionNum;
    }

}

指定自定义分区的配置:

props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.partitioner.HashPartitioner");
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容