Kafka 源码解析之 Consumer 两种订阅模式

[TOC]
本篇文章讲述的主要内容如下:

  1. consumer 的两种订阅模式, subscribe()和assign() 模式,一种是 topic 粒度(使用 group 管理),一种是 topic-partition 粒度(用户自己去管理);
  2. consumer 的两种 commit 实现,commitAsync()和commitSync(),即同步 commit 和异步 commit;
  3. consumer 提供的两种不同 partition.assignment.strategy,这是关于一个 group 订阅一些 topic 后,group 内各个 consumer 实例的 partition 分配策略。

0.9.X 之前 Kafka Consumer 是支持两个不同的订阅模型 —— high level 和 simple level,这两种模型的最大区别是:第一个其 offset 管理是由 Kafka 来做,包括 rebalance 操作,第二个则是由使用者自己去做,自己去管理相关的 offset,以及自己去进行 rebalance。

在新版的 consumer 中对 high level 和 simple level 的接口实现了统一,简化了相应的相应的编程模型。

订阅模式

在新版的 Consumer 中,high level 模型现在叫做订阅模式,KafkaConsumer 提供了三种 API,如下:

// 订阅指定的 topic 列表,并且会自动进行动态 partition 订阅
// 当发生以下情况时,会进行 rebalance: 1.订阅的 topic 列表改变; 2.topic 被创建或删除; 3.consumer 线程 die; 4. 加一个新的 consumer 线程
// 当发生 rebalance 时,会唤醒 ConsumerRebalanceListener 线程
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener){}
// 同上,但是这里没有设置 listener
public void subscribe(Collection<String> topics) {}
//note: 订阅那些满足一定规则(pattern)的 topic
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener){}

以上三种 API 都是按照 topic 级别去订阅,可以动态地获取其分配的 topic-partition,这是使用 Group 动态管理,它不能与手动 partition 管理一起使用。当监控到发生下面的事件时,Group 将会触发 rebalance 操作:

  1. 订阅的 topic 列表变化;
  2. topic 被创建或删除;
  3. consumer group 的某个 consumer 实例挂掉;
  4. 一个新的 consumer 实例通过 join 方法加入到一个 group 中。

在这种模式下,当 KafkaConsumer 调用 pollOnce 方法时,第一步会首先加入到一个 group 中,并获取其分配的 topic-partition 列表

这里介绍一下当调用 subscribe() 方法之后,Consumer 所做的事情,分两种情况介绍,一种按 topic 列表订阅,一种是按 pattern 模式订阅:

  1. topic 列表订阅

    1. 更新 SubscriptionState 中记录的 subscription(记录的是订阅的 topic 列表),将 SubscriptionType 类型设置为 AUTO_TOPICS;
    2. 更新 metadata 中的 topic 列表(topics 变量),并请求更新 metadata;
  2. pattern 模式订阅

  3. 更新 SubscriptionState 中记录的 subscribedPattern,设置为 pattern,将 SubscriptionType 类型设置为 AUTO_PATTERN;

  4. 设置 Metadata 的 needMetadataForAllTopics 为 true,即在请求 metadata 时,需要更新所有 topic 的 metadata 信息,设置后再请求更新 metadata;

  5. 调用 coordinator.updatePatternSubscription() 方法,遍历所有 topic 的 metadata,找到所有满足 pattern 的 topic 列表,更新到 SubscriptionState 的 subscriptions 和 Metadata 的 topics 中;

  6. 通过在 ConsumerCoordinator 中调用 addMetadataListener() 方法在 Metadata 中添加 listener 当每次 metadata update 时就调用第三步的方法更新,但是只有当本地缓存的 topic 列表与现在要订阅的 topic 列表不同时,才会触发 rebalance 操作。

其他部分,两者基本一样,只是 pattern 模型在每次更新 topic-metadata 时,获取全局的 topic 列表,如果发现有新加入的符合条件的 topic,就立马去订阅,其他的地方,包括 Group 管理、topic-partition 的分配都是一样的。

分配模式

下面来看一下 Consumer 提供的分配模式,熟悉 0.8.X 版本的人,可能会把这种方法称为 simple consumer 的接口,当调用 assign() 方法手动分配 topic-partition 列表时,是不会使用 consumer 的 Group 管理机制,也即是当 consumer group member 变化或 topic 的 metadata 信息变化时是不会触发 rebalance 操作的。比如:当 topic 的 partition 增加时,这里是无法感知,需要用户进行相应的处理,Apache Flink 就是使用的这种方式,后续我会写篇文章介绍 Flink 是如何实现这种机制的。

//note: 手动向 consumer 分配一些 topic-partition 列表,并且这个接口不允许增加分配的 topic-partition 列表,将会覆盖之前分配的 topic-partition 列表,如果给定的 topic-partition 列表为空,它的作用将会与 unsubscribe() 方法一样。
//note: 这种手动 topic 分配是不会使用 consumer 的 group 管理,当 group 的 member 变化或 topic 的 metadata 变化也不会触发 rebalance 操作。
public void assign(Collection<TopicPartition> partitions) {}

这里来看一下 Kafka 提供的 Group 管理到底是什么?

我们再来看一下这个方法: ConsumerCoordinator.poll()

// note: 它确保了这个 group 的 coordinator 是已知的,并且这个 consumer 是已经加入到了 group 中,也用于 offset 周期性的 commit
public void poll(long now) {
    invokeCompletedOffsetCommitCallbacks();// note: 用于测试

    // note: Step1 通过 subscribe() 方法订阅 topic,并且 coordinator 未知,初始化 Consumer Coordinator
    if (subscriptions.partitionsAutoAssigned() && coordinatorUnknown()) {
        // note: 获取 GroupCoordinator 地址,并且建立连接
        ensureCoordinatorReady();
        now = time.milliseconds();
    }

    // note: Step2 判断是否需要重新加入 group,如果订阅的 partition 变化或则分配的 partition 变化时,需要 rejoin
    // note: 如果订阅模式不是 AUTO_TOPICS 或 AUTO_PATTERN,直接跳过
    if (needRejoin()) {
        // note: rejoin group 之前先刷新一下 metadata(对于 AUTO_PATTERN 而言)
        if (subscriptions.hasPatternSubscription())
            client.ensureFreshMetadata();

        // note: 确保 group 是 active; 加入 group; 分配订阅的 partition
        ensureActiveGroup();
        now = time.milliseconds();
    }

    // note: Step3 检查心跳线程运行是否正常,如果心跳线程失败,则抛出异常,反之更新 poll 调用的时间
    // note: 发送心跳请求是在 ensureCoordinatorReady() 中调用的
    pollHeartbeat(now);
    // note: Step4 自动 commit 时,当定时达到时,进行自动 commit
    maybeAutoCommitOffsetsAsync(now);
}

如果使用的是 assign 模式,也即是非 AUTO_TOPICS 或 AUTO_PATTERN 模式时,Consumer 实例在调用 poll 方法时,是不会向 GroupCoordinator 发送 join-group/sync-group/heartbeat 请求的,也就是说 GroupCoordinator 是拿不到这个 Consumer 实例的相关信息,也不会去维护这个 member 是否存活,这种情况下就需要用户自己管理自己的处理程序。但是在这种模式是可以进行 offset commit的。

commit offset 请求处理

当 Kafka Serve 端受到来自 client 端的 Offset Commit 请求时,其处理逻辑如下所示,是在 kafka.coordinator.GroupCoordinator 中实现的。

// kafka.coordinator.GroupCoordinator
//note: GroupCoordinator 处理 Offset Commit 请求
def handleCommitOffsets(groupId: String,
                        memberId: String,
                        generationId: Int,
                        offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
                        responseCallback: immutable.Map[TopicPartition, Short] => Unit) {
  if (!isActive.get) {
    responseCallback(offsetMetadata.mapValues(_ => Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code))
  } else if (!isCoordinatorForGroup(groupId)) {
    responseCallback(offsetMetadata.mapValues(_ => Errors.NOT_COORDINATOR_FOR_GROUP.code))
  } else if (isCoordinatorLoadingInProgress(groupId)) {
    responseCallback(offsetMetadata.mapValues(_ => Errors.GROUP_LOAD_IN_PROGRESS.code))
  } else {
    groupManager.getGroup(groupId) match {
      case None =>
        if (generationId < 0) {
          // the group is not relying on Kafka for group management, so allow the commit
          //note: 不使用 group-coordinator 管理的情况
          //note: 如果 groupID不存在,就新建一个 GroupMetadata, 其group 状态为 Empty,否则就返回已有的 groupid
          //note: 如果 simple 的 groupId 与一个 active 的 group 重复了,这里就有可能被覆盖掉了
          val group = groupManager.addGroup(new GroupMetadata(groupId))
          doCommitOffsets(group, memberId, generationId, offsetMetadata, responseCallback)
        } else {
          // or this is a request coming from an older generation. either way, reject the commit
          //note: 过期的 offset-commit
          responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION.code))
        }

      case Some(group) =>
        doCommitOffsets(group, memberId, generationId, offsetMetadata, responseCallback)
    }
  }
}

//note: 真正的处理逻辑
private def doCommitOffsets(group: GroupMetadata,
                    memberId: String,
                    generationId: Int,
                    offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
                    responseCallback: immutable.Map[TopicPartition, Short] => Unit) {
  var delayedOffsetStore: Option[DelayedStore] = None

  group synchronized {
    if (group.is(Dead)) {
      responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID.code))
    } else if (generationId < 0 && group.is(Empty)) {//note: 来自 assign 的情况
      // the group is only using Kafka to store offsets
      delayedOffsetStore = groupManager.prepareStoreOffsets(group, memberId, generationId,
        offsetMetadata, responseCallback)
    } else if (group.is(AwaitingSync)) {
      responseCallback(offsetMetadata.mapValues(_ => Errors.REBALANCE_IN_PROGRESS.code))
    } else if (!group.has(memberId)) {//note: 有可能 simple 与 high level 的冲突了,这里就直接拒绝相应的请求
      responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID.code))
    } else if (generationId != group.generationId) {
      responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION.code))
    } else {
      val member = group.get(memberId)
      completeAndScheduleNextHeartbeatExpiration(group, member)//note: 更新下次需要的心跳时间
      delayedOffsetStore = groupManager.prepareStoreOffsets(group, memberId, generationId,
        offsetMetadata, responseCallback)
    }
  }

  // store the offsets without holding the group lock
  delayedOffsetStore.foreach(groupManager.store)
}

处理过程如下:

  1. 如果这个 group 还不存在(groupManager没有这个 group 信息),并且 generation 为 -1(一般情况下应该都是这样),就新建一个 GroupMetadata, 其 Group 状态为 Empty;
  2. 现在 group 已经存在,就调用 doCommitOffsets() 提交 offset;
  3. 如果是来自 assign 模式的请求,并且其对应的 group 的状态为 Empty(generationId < 0 && group.is(Empty)),那么就记录这个 offset;
  4. 如果是来自 assign 模式的请求,但这个 group 的状态不为 Empty(!group.has(memberId)),也就是说,这个 group 已经处在活跃状态,assign 模式下的 group 是不会处于的活跃状态的,可以认为是 assign 模式使用的 group.id 与 subscribe 模式下使用的 group 相同,这种情况下就会拒绝 assign 模式下的这个 offset commit 请求。

根据上面的讲述,这里做一下小结,如下图所示:

image.png
模式 不同之处 相同之处
subscribe() 使用 Kafka Group 管理,自动进行 rebalance 操作 可以在 Kafka 保存 offset
assign() 用户自己进行相关的处理 也可以进行 offset commit,但是尽量保证 group.id 唯一性,如果使用一个与上面模式一样的 group,offset commit 请求将会被拒绝
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,366评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,521评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,689评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,925评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,942评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,727评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,447评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,349评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,820评论 1 317
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,990评论 3 337
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,127评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,812评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,471评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,017评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,142评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,388评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,066评论 2 355