kafka producer

一、参数

1. 必要参数

  • boostrap.servers,boker的地址,部分即可
  • key.serializer和value.serializer,key和value的序列化器

2. 其他

二、发送消息

1. 调用方式

  • 发后即忘
  • 同步
    producer.send(record).get()
  • 异步
    send with callback

2. 异常

  • 可重试异常
    NetworkException、LeadernotAvailableException等,会自动重试retries参数的次数,然后会抛出异常
  • 不可重试异常
    比如RecordTooLargeException,直接抛异常

三、发送流程

架构

图来自https://blog.csdn.net/shufangreal/article/details/110657052

核心组件

  • 拦截器
  • 序列化器
  • 分区器
  • 消息累加器
    这里分区1、分区2就是topic的具体partition,ProducerBatch就是多个ProducerRecord。整个消息累加器的大小由buffer.mermory控制,默认32MB,满了之后阻塞max.block.ms(默认60000),然后抛出异常
  • InFlightRequests
    这里的node1、node2对应的是partition转换成ip+端口的形式,Request是kafka的发送协议结构。这里存着已发送但没有收到相应的请求,每个连接最多缓存max.in.flight.requests.per.connection(默认5)个。保证有序性时这个参数要设为1

四、元数据更新

producer需要通过broker获取kafka集群的元数据,当没有所需元数据或超过metradata.max.age.ms(默认300000)就拉取一次。这里涉及leastLoadedNode的概念,也就是InFlightRequests中等待Requests最少的Node,producer会向这个节点发送拉取请求,因为认为它负载最小。

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容