生产者(2019-02-15)

                                                  Kafka生产者

架构图: 

Kafka生产者组件图

必选属性:

    bootstrap.servers: broker的地址清单(host:port)

    key.serializer: 键的序列化器(ByteArraySerializer[这个只做很少的事情], StringSerializer, IntegerSerializer, 自定义序列化器)

    value.serializer: 值的序列化器(同上)

创建Kafka生产者:

    1. 新建一个Properties对象;

    2. 因为我们打算把键和值定义成字符串类型, 所以使用内置的StringSerializer;

    3. 在这里我们创建了一个新的生产者对象, 并为键和值设置了恰当的类型, 然后把Properties对象传给它。

    private Properties kafkaProps = new Properties();

    kafkaProps.put("bootstrap.servers", "broker1:9092, broker2:9092");

    kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    KafkaProducer<String, String> producer = new kafkaProducer<String, String>(kafkaProps);

发送消息:

    1.同步发送消息

        ProducerRecord<String, String> record = new ProducerRecord<>("topic", "key", "value");

        try {

            producer.send(record).get();

        } catch (Exception e) {

            e.printStackTrace();

        }

    2.异步发送

        private class DemoProducerCallback implements Callback {

            @Override

            public void onCompletion(RecordMetadata recordMetadata, Exception e) {

                if (e != null) {

                    e.printStackTrace();

                }

            }

        }

        ProducerRecord<String, String> record = new ProducerRecord<>("topic", "key", "value");

        producer.send(record, new DemoProducerCallback());

可配置参数:

    1.acks: 有多少个分区副本收到消息生产者才会认为消息写入是成功的;

    2.buffer.memory: 设置生产者内存缓冲区的大小;

    3.compression.type: 指定消息发送时使用哪一种压缩算法进行压缩(snappy, gzip, lz4);

    4.retries: 生产者可以重发消息的次数;

    5.batch.size: 同一批次发送到同一分区使用的内存大小;

    6.linger.ms: 同批次等待时间;

    7.client.id: 任意字符串, 识别消息的来源;

    8.max.in.flight.requests.per.connection: 生产者在收到服务器的响应之前可以发送多少个消息;

    9.timeout.ms, request.timeout.ms 和 metadata.fetch.timeout.ms: 

        timeout.ms: 等待同步副本返回消息确认的时间;

        request.timeout.ms: 生产者在发送数据时等待服务器返回响应的时间;

        metadata.fetch.timeout.ms: 生产者在获取元数据时等待服务器返回响应的时间;

    10.max.block.ms: 获取元数据时的阻塞时间;

    11.max.request.size: 生产者发送请求的大小;

    12.receive.buffer.bytes 和 send.buffer.bytes: TCP socket 接收和发送数据宝的缓冲区大小;

序列化器:

    主要实现 org.apache.kafka.common.serialization.Serializer 的 byte[] serialize(String topic, Customer data) 方法

分区器:

    主要实现 org.apache.kafka.clients.producer.Partitioner 的 int partition(String topic, Object key,byte[] keyBytes, Object value,byte[] valueBytes, Cluster cluster) 方法

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

  • 大致可以通过上述情况进行排除 1.kafka服务器问题 查看日志是否有报错,网络访问问题等。 2. kafka p...
    生活的探路者阅读 12,238评论 0 10
  • 学习kafka有一段时间了。关于它里面的知识还是需要总结一下,一来是能让自己对kafka能有一个比较成型的理解,二...
    绍圣阅读 4,826评论 0 3
  • Kafka的基本概念 BrokerKafka集群中包含多个服务器,其中每个服务器称为一个broker。有一点需要注...
    frmark阅读 2,958评论 0 0
  • 方法比努力重要 实践比读书重要 前提是--先读书 蔡康永说过这样一段话:15岁觉得游泳难,放弃游泳,到18岁遇到一...
    三月1006阅读 3,308评论 1 2
  • 偶然看见窗外几棵树因为寒冷掉光了所有的叶,只能下光秃秃的树干在风中摇曳。想想南方此时过于寒冷的外界,只是让失掉假日...
    何以觥筹错阅读 2,444评论 0 0

友情链接更多精彩内容