KafkaConsumer 流程解析

前言

对于中间件的使用都必须按照他们自己的规范和流程来使用, KafkaConsumer也对消息的负载消费定义了流程,新版本的流程跟老版本的流程有一些改变,这里只对新版本的流程来介绍,同时也会介绍这样去设计组件与流程的好处

KafkaConsumer 对于多线程访问是不安全的,通过使用acquire()release()方法来操作AtomicLong currentThread字段(保存当前访问线程ID), 有多个线程同时访问抛出ConcurrentModificationException, 来防止对个线程同时访问。

核心组件

  • ConsumerCoordinator: 消费者的协调者, 管理消费者的协调过程
    • 维持coordinator节点信息(也就是对consumer进行assignment的节点)
    • 维持当前consumerGroup的信息, 当前consumer已进入consumerGroup
  • Fetcher: 数据请求类
  • ConsumerNetworkClient: 消费者的网络客户端,负责网络传输的流程
  • SubscriptionState: 订阅状态类
  • Metadata: 集群的元数据管理类,使用租约机制

工作流程

kafka是以拉模式去消费数据,可由用户自由控制消费速度,对用户的消费位置可以选择自动异步commit,或者由用户主动同步commit, 实例代码如下:

KafkaConsumer consumer = ...
consumer.subscribe(Arrays.asList("topic"));
while (!closed.get()) {
    ConsumerRecords records = consumer.poll(long timeout);
    // Handle new records 用户处理消息
    // consumer.commitSync 可由用户自主提交消费位置
}

时序图:


消费流程
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 135,399评论 19 139
  • Android 自定义View的各种姿势1 Activity的显示之ViewRootImpl详解 Activity...
    passiontim阅读 175,376评论 25 709
  • 未来藏在当下,我们能把握的只有当下,叔本华说:你只能做你想做的,不能要你想要的。在当下做你想做的,尚且可以把握,但...
    余岁记阅读 2,299评论 0 1
  • 如何全局安装一个 node 应用? npm install -g package.json 有什么作用? npm上...
    zy懒人漫游阅读 15,809评论 0 3
  • 君在金陵城 我在姑苏畔 念念思君如丹桂 花开满城等君来 曾经与君诺 万水千山如平地 见君只在朝露间 今日与君明 一...
    做作自己阅读 1,131评论 0 0