kafka学习(2) 生产者:开发步骤,同步异步方式,关键参数

生产者就是负责向kafka发送消息的应用程序

客户端开发

一个正常的生成逻辑需要具备一下几个步骤

(1) 配置生产者客户端参数和创建生产者实例
(2) 构建待发送的消息
(3) 发送消息
(4) 关闭生产者实例

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class KafkaProducerAnalysis {
    public static final String brokerList = "192.168.61.97:9092";
    public static final String topic = "test_gp";

    // 配置客户端参数
    public static Properties initConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokerList);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("client.id", "producer.client.id.demo");
        return props;
    }

    public static void main(String[] args) {
        Properties props = initConfig();
        // 实例化生产者对象
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // 构建待发送的消息
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, "hello kafka2");

        try {
            producer.send(record);
        } catch (Exception e) {
            e.printStackTrace();
        }
        // 关闭生产者实例
        producer.close();
    }
}

构建的消息对象ProducerRecord不只是消息,而是一个包含多个属性的对象,与业务相关的消息是其中的value属性,本例中hello kafka2就是value,主要属性包括

  • topic: 消息要发往的kafka主题
  • partition:消息要发往的kafka分区号
  • key:消息的键,可以用来计算分区号,进而知道消息发往那个kafka分区,消息根据主题topic进行一级归类,再根据key进行二级归类同一个topic同一个key的消息会发送到kafka的同一个主题的同一个分区下,有key的消息同时还支持日志压缩功能
  • value:消息体
  • timestamp:消息的时间戳

发送多条消息

for (int i = 0; i <= 10; i++) {
            try {
                Future<RecordMetadata> future = producer.send(record);
                RecordMetadata metadata = future.get();
                System.out.println(metadata.topic() + "-" + metadata.partition() + "-" + metadata.offset());
                Thread.sleep(2000);

            } catch (ExecutionException | InterruptedException e) {
                e.printStackTrace();
            }
        }

必要的参数

在创建生产者前需要配置响应的参数, 比如需要链接的kafka集群地址,有三个参数必填的
(1) bootstrap.servers: 指定生产者客户端链接kafka集群所需要的broker地址清单,格式为 host1:port1,host2:port2, 可以设置一个或多个地址,以逗号隔开, 不需要设置所有的broker地址, 生产者会从给定的单个broker里查找到其他broker信息, 不过建议至少设置2个broker, 其中一个宕机的时候生产者仍然可以连接到kafka集群上
(2) key.serializervalue.serializer: broker端接受到的数据必须以字节数组(byte[])的形式存在, 在发往broker之前需要将key, value做相应的序列化操作来转化为字节数组, key.serializer必须被设置,即使消息中没有指定key。KafkaProducer<String, String>中的泛型两个String对应的就是消息中key和value
其他参数
client.id: 设定kafka生产者对应的客户端id, 如果不设置会自动生成一个非空字符串
kafka的参数众多,可以直接调用kafka客户端中的ProducerConfig类来编写预防输入错误

public static Properties initConfig() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer.client.id.demo");
        return props;
    }

可以使用类的getName方法代替全限定名

public static Properties initConfig() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        System.out.println(StringSerializer.class.getName());
        return props;
    }

KafkaProducer对象是线程安全的,多个线程可以共享单个KafkaProducer实例

消息的发送

  • 在创建完生产者实例之后下一步构建消息对象ProducerRecord, 其中topicvalue是必填参数, 如果其他参数不填就是null
    指定key的作用是为消息选择存储分区,key可以为null,当指定key且不为空的时候,kafka是根据key的hash值与分区数取模来决定数据存储到那个分区,那么当key为null的时候,kafka也会自动计算一个随机分区
  • 针对不同的消息,需要构建不同的ProducerRecord对象,实际开发中需要频繁创建ProducerRecord
  • 如果发送消息的topic不存在,send方法会自动创建这个topic,创建的topic默认是1个副本1个分区
  • 构造完成后发送消息有三种模式, 发后即忘, 同步(sync), 异步(async)
  • producer.send(record) 就是 发后即忘的方式, 只管发送不管消息是否正确到达, 在大多数情况下没有问题, 这是一种性能最高可靠性差的方法
  • 发送并忘记的方式本质上也是一种异步的方式,只是它不会获取消息发送的返回结果,send方法本身是异步的。
  • send()方法返回是一个Future<RecordMetadata>类型, Future表示一个任务的生命周期, 并且提供了相应的方法判断任务是否已经完成或者取消
  • 同步方式,对send(record)链式调用get()方法阻塞等待kafka响应 producer.send(record).get()
try {
            producer.send(record).get();
        } catch (ExecutionException | InterruptedException e) {
            e.printStackTrace();
        }

也可以将RecordMetadata对象中的信息显示出来,RecordMetadata中包含topicpartitionoffsettimestamp

try {
            Future<RecordMetadata> future = producer.send(record);
            RecordMetadata metadata = future.get();
            System.out.println(metadata.topic() + "-" + metadata.partition() + "-" + metadata.offset());
        } catch (ExecutionException | InterruptedException e) {
            e.printStackTrace();
        }
  • 同步方式要么发送成功,要么发送异常,如果发生异常需要捕获处理,而不会像发后即忘的方式直接导致消息丢失,同步方式性能低,需要阻塞等待一条发送完成之后才能发送下一条
  • 异步方式一般采用在send方法中指定一个Callback回调函数,kafka在返回响应时调用该函数来实现异步的发送确认,Callback中onCompletion的两个参数是互斥的,如果发送成功recordMetadata不为null,e为null,发送异常时e不为null,recordMetadata为null。
  • 在所有消息发送完成之后调用producer.close()回收资源,close方法会阻塞等待所有消息发送完毕
  • acks参数,这个参数用来指定分区中必须至少有多少个副本收到这条消息之后,生产者才会认为这条消息是写入成功的。有三种取值“1”,“0”,“-1”,默认是“1”,只要分区的leader副本成功写入生产者就会收到服务端的成功响应,这是在可靠性和吞吐量之间的折中方案。“0”不需要任何服务器响应直接认为发送成功,这种吞吐量最大,“-1”:需要等待ISR(与leader副本保持一定程度同步的副本)中所有的副本都成功,才会收到服务端的响应成功,这样可靠性最高,如果ISR中只有leader副本实际上退化为acks="1"的模式。acks使用如下方式配置
properties.put("acks", "0")
properties.put(ProducerConfig.ACKS_CONFIG, "0")

序列化

生产者需要使用序列化器把对象转化为字节数组才能通过网络发送给Kafka, 而消费者需要使用反序列化器把从kafka收到的字节数组转化成相应的对象, 可以有多种序列化器如StringSerializer, IntegerSerializer等, 生产者使用的序列化器和消费者使用的保持对应

数据位置

发送给kafka的数据存储在kafka日志存储位置对应主题名文件夹下,查看日志存储位置配置

vim kafka.properties
log.dirs=/home/kafka/data

在日志目录下找到clear_data-0clear_data-1clear_data-2,各自对应一个分区,消息的物理存储就是就是这个机器节点的三个文件夹,其中一个目录的结构

[root@cloudera02 pira_clear_save_data-0]# tree
.
├── 00000000000000011472.index
├── 00000000000000011472.log
├── 00000000000000011472.snapshot
├── 00000000000000011472.timeindex
└── leader-epoch-checkpoint

主要包含:

  • log记录数据日志文件
  • index记录偏移量索引文件
  • timeindex时间戳索引文件
  • 其他文件
    kafka会默认保存消息在磁盘7天, kafka.properties中log.retention.hours=168,当LogSegment大小超过设定值,会新生成log文件和配套的index,timeindex文件。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,332评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,508评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,812评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,607评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,728评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,919评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,071评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,802评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,256评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,576评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,712评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,389评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,032评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,798评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,026评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,473评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,606评论 2 350