KafkaConsumer.java

public class KafkaConsumer<K, V> implements Consumer<K, V> {
    // generator for clientId values
    private static final AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
    // unique id of this consumer
    private final String clientId;
    // drives the communication between the consumer and the server-side Coordinator
    private final ConsumerCoordinator coordinator;
    private final Deserializer<K> keyDeserializer;
    private final Deserializer<V> valueDeserializer;
    // intercepts records before poll() returns them to the user, and intercepts commit responses from the broker
    private final ConsumerInterceptors<K, V> interceptors;
    // handles the network communication between the consumer and brokers
    private final ConsumerNetworkClient client;
    // tracks this consumer's consumption state
    private final SubscriptionState subscriptions;
    // metadata of the Kafka cluster
    private final Metadata metadata;
    // id of the thread currently using this KafkaConsumer
    private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
    // reentrancy count, used to detect concurrent access from multiple threads
    private final AtomicInteger refcount = new AtomicInteger(0);

    @Override
    public ConsumerRecords<K, V> poll(long timeout) {
        acquire(); // guard against concurrent access from multiple threads
        try {
            long start = time.milliseconds();
            long remaining = timeout;
            do {
                Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);
                // if there are records, return them and exit poll
                if (!records.isEmpty()) {
                    // for efficiency, send the next FetchRequest before handing the records
                    // to the application, so that the FetchRequest/FetchResponse travels the
                    // network in parallel with the application processing the records
                    fetcher.sendFetches();

                    // this round trip must not be interruptible: since the consumed position
                    // has already been updated, we must not allow wakeups or any other errors
                    // to be triggered prior to returning the fetched records (hence
                    // pollNoWakeup here, unlike the client.poll inside pollOnce)
                    client.pollNoWakeup();

                    if (this.interceptors == null)
                        return new ConsumerRecords<>(records);
                    else
                        return this.interceptors.onConsume(new ConsumerRecords<>(records));
                }

                long elapsed = time.milliseconds() - start;
                remaining = timeout - elapsed;
            } while (remaining > 0);

            // timed out without receiving any records, so return empty
            return ConsumerRecords.empty();
        } finally {
            release();
        }
    }

    private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
        coordinator.ensureCoordinatorReady();

        if (subscriptions.partitionsAutoAssigned())
            // in AUTO mode, the rebalance must complete first
            coordinator.ensurePartitionAssignment();

        // if any subscribed TopicPartition lacks a position, restore the corresponding
        // TopicPartitionState in SubscriptionState: fetch the committed offset from the
        // server if it is missing, then sync it into position
        if (!subscriptions.hasAllFetchPositions())
            updateFetchPositions(this.subscriptions.missingFetchPositions());

        long now = time.milliseconds();

        // run scheduled tasks (HeartbeatTask and AutoCommitTask): pull from the
        // delayedTasks queue every task that is due at or before the current time
        client.executeDelayedTasks(now);

        // try to get already-parsed records from the completedFetches cache
        Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
        if (!records.isEmpty())
            return records;

        fetcher.sendFetches();
        client.poll(timeout, now);
        return fetcher.fetchedRecords();
    }

    private void acquire() {
        ensureNotClosed();
        long threadId = Thread.currentThread().getId();
        if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
            // another thread holds the consumer, so fail fast
            throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
        refcount.incrementAndGet();
    }

    // release the single-thread guard
    private void release() {
        if (refcount.decrementAndGet() == 0)
            currentThread.set(NO_CURRENT_THREAD);
    }

    private void updateFetchPositions(Set<TopicPartition> partitions) {
        // refresh the committed offset in the local SubscriptionState.TopicPartitionState
        coordinator.refreshCommittedOffsetsIfNeeded();
        // if the committed offset is null, the position is reset by strategy
        fetcher.updateFetchPositions(partitions);
    }
}
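
To make the acquire()/release() single-thread contract concrete, here is a minimal usage sketch (the topic name "demo" and the property values are placeholders): one thread owns the poll loop, and any other thread may only call wakeup() to break it out of a blocking poll.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class PollLoop {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "demo-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("demo"));

        // wakeup() is the only method another thread may safely call;
        // here a shutdown hook uses it to interrupt a blocking poll()
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            public void run() {
                consumer.wakeup();
            }
        }));

        try {
            while (true) {
                // poll() calls acquire()/release() internally; calling it from a
                // second thread raises ConcurrentModificationException
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records)
                    System.out.printf("%s-%d@%d: %s%n", record.topic(), record.partition(), record.offset(), record.value());
            }
        } catch (WakeupException e) {
            // expected on shutdown
        } finally {
            consumer.close();
        }
    }
}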

public class ConsumerNetworkClient implements Closeable {
    // the underlying NetworkClient
    private final KafkaClient client;
    // set by threads other than the consumer thread to interrupt the consumer thread
    private final AtomicBoolean wakeup = new AtomicBoolean(false);
    // queue of scheduled tasks, mainly the heartbeat task;
    // backed by a PriorityQueue
    private final DelayedTaskQueue delayedTasks = new DelayedTaskQueue();
    // buffer of requests waiting to be sent
    private final Map<Node, List<ClientRequest>> unsent = new HashMap<>();
    // cluster metadata
    private final Metadata metadata;
    // how long a request may sit in unsent before it expires
    private final long unsentExpiryMs;
    // incremented when the consumer enters an uninterruptible method, decremented on exit;
    // decides whether waking up the blocked selector is allowed
    private int wakeupDisabledCount = 0;

    // wraps the outgoing request into a ClientRequest and stores it in unsent
    public RequestFuture<ClientResponse> send(Node node, ApiKeys api, AbstractRequest request) {
        long now = time.milliseconds();
        RequestFutureCompletionHandler future = new RequestFutureCompletionHandler();
        RequestHeader header = client.nextRequestHeader(api);
        RequestSend send = new RequestSend(node.idString(), header, request.toStruct());
        put(node, new ClientRequest(now, true, send, future));
        return future;
    }

    public void poll(RequestFuture<?> future) {
        while (!future.isDone()) // block synchronously until the future has a response
            poll(Long.MAX_VALUE);
    }

    // a single non-blocking poll during which wakeups are disabled
    public void pollNoWakeup() {
        disableWakeups();
        try {
            poll(0, time.milliseconds(), false);
        } finally {
            enableWakeups();
        }
    }

    private void poll(long timeout, long now, boolean executeDelayedTasks) {
        // walk the unsent buffer and hand sendable requests to the client
        trySend(now);

        // cap the timeout so scheduled tasks are not delayed
        timeout = Math.min(timeout, delayedTasks.nextTimeout(now));
        // do the actual network I/O; if the wakeup flag is true this throws,
        // interrupting KafkaConsumer.poll()
        clientPoll(timeout, now);
        now = time.milliseconds();

        // for broken connections, remove their requests from unsent and then
        // invoke those requests' callbacks
        checkDisconnects(now);

        // run scheduled tasks that are due
        if (executeDelayedTasks)
            delayedTasks.poll(now);

        // connections to some nodes may have been established meanwhile, so try once more
        trySend(now);

        // fail the expired requests in unsent: invoke their callbacks and remove them
        failExpiredRequests(now);
    }

    private boolean trySend(long now) {
        boolean requestsSent = false;
        for (Map.Entry<Node, List<ClientRequest>> requestEntry: unsent.entrySet()) {
            Node node = requestEntry.getKey();
            Iterator<ClientRequest> iterator = requestEntry.getValue().iterator();
            while (iterator.hasNext()) {
                ClientRequest request = iterator.next();
                // checks the connection state and the in-flight request count
                if (client.ready(node, now)) {
                    // copies the request into the KafkaChannel's send field
                    client.send(request, now);
                    iterator.remove();
                    requestsSent = true;
                }
            }
        }
        return requestsSent;
    }

    // blocks (with MAX timeout) until a metadata update has completed
    public void awaitMetadataUpdate() {
        int version = this.metadata.requestUpdate();
        do {
            poll(Long.MAX_VALUE);
        } while (this.metadata.version() == version);
    }

    // waits until all of the node's requests in unsent and InFlightRequests have completed
    public void awaitPendingRequests(Node node) {
        while (pendingRequestCount(node) > 0)
            poll(retryBackoffMs);
    }

    public static class RequestFutureCompletionHandler extends RequestFuture<ClientResponse> implements RequestCompletionHandler {

        // the fields and the compose() method below are actually defined in the
        // parent class RequestFuture<T>; they are inlined here for reference
        // whether the request has completed
        private boolean isDone = false;
        // the successful result; mutually exclusive with exception
        private T value;
        // the exception that caused the failure
        private RuntimeException exception;
        // listeners notified of completion via onSuccess and onFailure
        private List<RequestFutureListener<T>> listeners = new ArrayList<>();

        @Override
        public void onComplete(ClientResponse response) {
            if (response.wasDisconnected()) {
                ClientRequest request = response.request();
                RequestSend send = request.request();
                ApiKeys api = ApiKeys.forId(send.header().apiKey());
                int correlation = send.header().correlationId();
                raise(DisconnectException.INSTANCE);
            } else {
                complete(response);
            }
        }

        // adapts this future's generic type T into S
        public <S> RequestFuture<S> compose(final RequestFutureAdapter<T, S> adapter) {
            final RequestFuture<S> adapted = new RequestFuture<S>();
            addListener(new RequestFutureListener<T>() {
                @Override
                public void onSuccess(T value) {
                    adapter.onSuccess(value, adapted);
                }

                @Override
                public void onFailure(RuntimeException e) {
                    adapter.onFailure(e, adapted);
                }
            });
            return adapted;
        }
    }
}
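
The send/compose/poll trio above is the client's standard request pattern. Below is a sketch of how a caller chains them (the helper name lookupCoordinator is hypothetical, and the parsing mirrors handleGroupMetadataResponse shown later in this article), not verbatim Kafka source:

// hypothetical helper illustrating send() + compose() + poll(future)
RequestFuture<Node> lookupCoordinator(ConsumerNetworkClient client, Node broker,
                                      GroupCoordinatorRequest request) {
    return client.send(broker, ApiKeys.GROUP_COORDINATOR, request)
            .compose(new RequestFutureAdapter<ClientResponse, Node>() {
                @Override
                public void onSuccess(ClientResponse resp, RequestFuture<Node> future) {
                    GroupCoordinatorResponse response = new GroupCoordinatorResponse(resp.responseBody());
                    if (response.errorCode() == Errors.NONE.code())
                        future.complete(response.node()); // propagate success into the adapted future
                    else
                        future.raise(Errors.forCode(response.errorCode()));
                }
            });
}

// the caller then blocks on the adapted future, exactly as ensureCoordinatorReady does:
// RequestFuture<Node> future = lookupCoordinator(client, broker, request);
// client.poll(future);                      // loops poll(Long.MAX_VALUE) until isDone
// if (future.failed()) throw future.exception();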

public class SubscriptionState {

    private enum SubscriptionType {
        NONE,
        AUTO_TOPICS,  // subscribed by explicit topic names
        AUTO_PATTERN,  // subscribed by a regex over topic names
        USER_ASSIGNED // partitions assigned manually by the user
    };

    private SubscriptionType subscriptionType; // which subscription mode is in use
    private Pattern subscribedPattern; // the regex used in pattern mode
    private final Set<String> subscription; // all subscribed topic names
    private final Set<String> groupSubscription; // on the consumer group leader, every topic subscribed by the group; followers only hold their own topics
    private final Set<TopicPartition> userAssignment; // TopicPartitions manually assigned to this consumer, mutually exclusive with subscription
    // partitions currently assigned to this consumer;
    // records the consumption state of each TopicPartition, e.g. its offset
    private final Map<TopicPartition, TopicPartitionState> assignment;
    // whether partition assignment is needed; needsRejoin() is driven by this flag,
    // which is set to true when the consumer subscribes to a topic
    private boolean needsPartitionAssignment;
    // whether committed offsets need to be fetched; set to true after an async
    // offset commit or a partition rebalance
    private boolean needsFetchCommittedOffsets;
    private final OffsetResetStrategy defaultResetStrategy; // strategy for resetting offsets
    private ConsumerRebalanceListener listener; // listens to partition assignment changes

    private static class TopicPartitionState {
        private Long position; // the consume position, i.e. the offset of the next message to fetch
        private OffsetAndMetadata committed; // the most recently committed offset
        private boolean paused; // whether the partition is paused
        private OffsetResetStrategy resetStrategy; // strategy for resetting this partition's offset
    }

    // called when the consumer subscribes to topics
    public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
        setSubscriptionType(SubscriptionType.AUTO_TOPICS);
        // the default listener is a NoOpConsumerRebalanceListener
        this.listener = listener;
        // updates subscription and groupSubscription, and sets needsPartitionAssignment=true
        changeSubscription(topics);
    }
}
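
The subscriptionType field is what makes the public subscribe() and assign() APIs mutually exclusive; a minimal sketch of the resulting behavior (the topic and partition are placeholders, and the quoted message is approximate):

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

// AUTO_TOPICS mode: the group coordinator assigns partitions
consumer.subscribe(Collections.singletonList("demo"));

// switching to USER_ASSIGNED mode afterwards is rejected, because the
// subscription type can only be set once
try {
    consumer.assign(Collections.singletonList(new TopicPartition("demo", 0)));
} catch (IllegalStateException e) {
    // roughly: "Subscription to topics, partitions and pattern are mutually exclusive"
}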

public final class ConsumerCoordinator extends AbstractCoordinator {
    // the consumer's JoinGroupRequest carries the PartitionAssignors it supports;
    // the GroupCoordinator picks a strategy supported by every consumer and tells
    // the leader to use it for the partition assignment
    private final List<PartitionAssignor> assignors;
    private final Metadata metadata;
    private final SubscriptionState subscriptions;
    private final boolean autoCommitEnabled;
    private final AutoCommitTask autoCommitTask; // scheduled task that auto-commits offsets
    private final ConsumerInterceptors<?, ?> interceptors;
    private final boolean excludeInternalTopics; // whether internal topics are excluded
    // used to detect changes in the partition counts of subscribed topics
    private MetadataSnapshot metadataSnapshot;

    // constructor
    public ConsumerCoordinator(ConsumerNetworkClient client, String groupId, int sessionTimeoutMs, int heartbeatIntervalMs,
                               List<PartitionAssignor> assignors, Metadata metadata, SubscriptionState subscriptions,
                               Metrics metrics, String metricGrpPrefix, Time time, long retryBackoffMs,
                               OffsetCommitCallback defaultOffsetCommitCallback, boolean autoCommitEnabled,
                               long autoCommitIntervalMs, ConsumerInterceptors<?, ?> interceptors, boolean excludeInternalTopics) {
        super(client, groupId, sessionTimeoutMs, heartbeatIntervalMs, metrics, metricGrpPrefix, time, retryBackoffMs);
        this.metadata = metadata;

        this.metadata.requestUpdate();
        this.metadataSnapshot = new MetadataSnapshot(subscriptions, metadata.fetch());
        this.subscriptions = subscriptions;
        this.defaultOffsetCommitCallback = defaultOffsetCommitCallback;
        this.autoCommitEnabled = autoCommitEnabled;
        this.assignors = assignors;
        // register a metadata-update listener
        addMetadataListener();

        if (autoCommitEnabled) {
            this.autoCommitTask = new AutoCommitTask(autoCommitIntervalMs);
            this.autoCommitTask.reschedule();
        } else {
            this.autoCommitTask = null;
        }

        this.interceptors = interceptors;
        this.excludeInternalTopics = excludeInternalTopics;
    }

    // listener invoked on Metadata updates
    private void addMetadataListener() {
        this.metadata.addListener(new Metadata.Listener() {
            @Override
            public void onMetadataUpdate(Cluster cluster) {
                // pattern (regex) subscription mode
                if (subscriptions.hasPatternSubscription()) {

                    final List<String> topicsToSubscribe = new ArrayList<>();
                    for (String topic : cluster.topics())
                        if (filterTopic(topic)) // regex match
                            topicsToSubscribe.add(topic);
                    // update the subscription, groupSubscription and assignment collections
                    subscriptions.changeSubscription(topicsToSubscribe);
                    // update the metadata's topic set
                    metadata.setTopics(subscriptions.groupSubscription());
                } else if (!cluster.unauthorizedTopics().isEmpty()) {
                    throw new TopicAuthorizationException(new HashSet<>(cluster.unauthorizedTopics()));
                }

                // not manually assigned, i.e. AUTO_TOPICS or AUTO_PATTERN
                if (subscriptions.partitionsAutoAssigned()) {
                    MetadataSnapshot snapshot = new MetadataSnapshot(subscriptions, cluster);
                    // metadataSnapshot is essentially a map: topic -> partition count;
                    // inequality means the partitions changed, so a rebalance is needed
                    if (!snapshot.equals(metadataSnapshot)) {
                        metadataSnapshot = snapshot;
                        subscriptions.needReassignment();
                    }
                }

            }
        });
    }

    // entry point of JoinGroup, i.e. the rebalance
    public void ensurePartitionAssignment() {
        // only automatically assigned subscriptions need a rebalance
        if (subscriptions.partitionsAutoAssigned()) {
            if (subscriptions.hasPatternSubscription())
                // in pattern-subscription mode, also check whether the Metadata must be
                // refreshed, to avoid rebalancing on stale Metadata
                client.ensureFreshMetadata();

            ensureActiveGroup();
        }
    }

    @Override
    protected void onJoinPrepare(int generation, String memberId) {
        // if offset auto-commit is enabled, commit offsets synchronously
        maybeAutoCommitOffsetsSync();

        ConsumerRebalanceListener listener = subscriptions.listener();
        Set<TopicPartition> revoked = new HashSet<>(subscriptions.assignedPartitions());
        // invoke the partition-revocation callback
        listener.onPartitionsRevoked(revoked);

        assignmentSnapshot = null;
        // shrink groupSubscription back to this member's own subscription
        // and set needsPartitionAssignment=true
        subscriptions.needReassignment();
    }

    // after the JoinGroupResponse arrives, the consumer chosen as group leader runs the assignment strategy
    @Override
    protected Map<String, ByteBuffer> performAssignment(String leaderId,
                                                        String assignmentStrategy,
                                                        Map<String, ByteBuffer> allSubscriptions) {
        // the default is the range assignment strategy
        PartitionAssignor assignor = lookupAssignor(assignmentStrategy);

        Set<String> allSubscribedTopics = new HashSet<>();
        Map<String, Subscription> subscriptions = new HashMap<>();
        for (Map.Entry<String, ByteBuffer> subscriptionEntry : allSubscriptions.entrySet()) {
            Subscription subscription = ConsumerProtocol.deserializeSubscription(subscriptionEntry.getValue());
            subscriptions.put(subscriptionEntry.getKey(), subscription);
            allSubscribedTopics.addAll(subscription.topics());
        }

        // the leader must track the topics subscribed by the whole consumer group;
        // new topics may have joined, so the Metadata needs updating
        this.subscriptions.groupSubscribe(allSubscribedTopics);
        metadata.setTopics(this.subscriptions.groupSubscription());

        client.ensureFreshMetadata();
        assignmentSnapshot = metadataSnapshot;

        // by default this invokes RangeAssignor;
        // the result maps memberId -> that member's assignment
        Map<String, Assignment> assignment = assignor.assign(metadata.fetch(), subscriptions);
        Map<String, ByteBuffer> groupAssignment = new HashMap<>();
        for (Map.Entry<String, Assignment> assignmentEntry : assignment.entrySet()) {
            ByteBuffer buffer = ConsumerProtocol.serializeAssignment(assignmentEntry.getValue());
            groupAssignment.put(assignmentEntry.getKey(), buffer);
        }

        return groupAssignment;
    }

    // handles the SyncGroupResponse
    @Override
    protected void onJoinComplete(int generation, String memberId, String assignmentStrategy, ByteBuffer assignmentBuffer) {
        // the snapshot no longer matches the latest metadata, so partitions must be reassigned
        if (assignmentSnapshot != null && !assignmentSnapshot.equals(metadataSnapshot)) {
            subscriptions.needReassignment();
            return;
        }

        PartitionAssignor assignor = lookupAssignor(assignmentStrategy);
        Assignment assignment = ConsumerProtocol.deserializeAssignment(assignmentBuffer);

        // mark that the latest committed offsets must be fetched from the server
        subscriptions.needRefreshCommits();

        // update the partitions assigned to this consumer
        subscriptions.assignFromSubscribed(assignment.partitions());

        // restart the AutoCommitTask scheduled task
        if (autoCommitEnabled)
            autoCommitTask.reschedule();

        // invoke the callback after the rebalance
        ConsumerRebalanceListener listener = subscriptions.listener();
        Set<TopicPartition> assigned = new HashSet<>(subscriptions.assignedPartitions());
        listener.onPartitionsAssigned(assigned);
    }
}

public class RangeAssignor extends AbstractPartitionAssignor {
    @Override
    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                    Map<String, List<String>> subscriptions) {
        for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) {

            Collections.sort(consumersForTopic);

            // 每个consumer订阅partition数量
            int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
            // 除不尽余数的partition单独分配给consumer
            int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();

            List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
            for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
                int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
                int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
                assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length));
            }
        }
        return assignment;
    }
}
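
Plugging numbers into the start/length formulas makes the range strategy easy to verify. A worked example, assuming a topic t with 7 partitions and sorted consumers C0, C1, C2:

// numPartitionsPerConsumer    = 7 / 3 = 2
// consumersWithExtraPartition = 7 % 3 = 1   (only the first consumer gets one extra)
//
// i=0 (C0): start = 2*0 + min(0,1) = 0, length = 2 + 1 = 3  ->  t-0, t-1, t-2
// i=1 (C1): start = 2*1 + min(1,1) = 3, length = 2 + 0 = 2  ->  t-3, t-4
// i=2 (C2): start = 2*2 + min(2,1) = 5, length = 2 + 0 = 2  ->  t-5, t-6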

public abstract class AbstractPartitionAssignor implements PartitionAssignor {
    // carries out the partition assignment
    @Override
    public Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions) {
        Set<String> allSubscribedTopics = new HashSet<>();
        Map<String, List<String>> topicSubscriptions = new HashMap<>();
        // the parent class strips userData and ignores it by default;
        // a subclass that needs userData must implement PartitionAssignor.assign itself
        for (Map.Entry<String, Subscription> subscriptionEntry : subscriptions.entrySet()) {
            List<String> topics = subscriptionEntry.getValue().topics();
            allSubscribedTopics.addAll(topics);
            topicSubscriptions.put(subscriptionEntry.getKey(), topics);
        }

        // count the number of partitions of each topic
        Map<String, Integer> partitionsPerTopic = new HashMap<>();
        for (String topic : allSubscribedTopics) {
            Integer numPartitions = metadata.partitionCountForTopic(topic);
            if (numPartitions != null && numPartitions > 0)
                partitionsPerTopic.put(topic, numPartitions);
        }


        Map<String, List<TopicPartition>> rawAssignments = assign(partitionsPerTopic, topicSubscriptions);

        Map<String, Assignment> assignments = new HashMap<>();
        for (Map.Entry<String, List<TopicPartition>> assignmentEntry : rawAssignments.entrySet())
            assignments.put(assignmentEntry.getKey(), new Assignment(assignmentEntry.getValue()));
        return assignments;
    }
}

public interface PartitionAssignor {

    // the subscription info of one group member
    class Subscription {
        private final List<String> topics; // subscribed topics
        private final ByteBuffer userData;
    }

    class Assignment {
        private final List<TopicPartition> partitions; // the result of the partition assignment
        private final ByteBuffer userData;
    }
}
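
Since AbstractPartitionAssignor strips userData, a custom strategy that does not need it only has to implement name() and the simplified assign(). A deliberately trivial sketch (the class name AllToFirstAssignor is invented, and it ignores per-member subscriptions for brevity):

public class AllToFirstAssignor extends AbstractPartitionAssignor {
    @Override
    public String name() {
        return "all-to-first"; // the protocol name carried in the JoinGroupRequest
    }

    @Override
    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                    Map<String, List<String>> subscriptions) {
        // every member must appear in the result, even with an empty list
        Map<String, List<TopicPartition>> assignment = new HashMap<>();
        for (String memberId : subscriptions.keySet())
            assignment.put(memberId, new ArrayList<TopicPartition>());

        // hand every partition to the lexicographically first member
        String first = Collections.min(subscriptions.keySet());
        for (Map.Entry<String, Integer> e : partitionsPerTopic.entrySet())
            for (int p = 0; p < e.getValue(); p++)
                assignment.get(first).add(new TopicPartition(e.getKey(), p));
        return assignment;
    }
}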

public abstract class AbstractCoordinator implements Closeable {
    private final Heartbeat heartbeat; // helper for the heartbeat task
    private final HeartbeatTask heartbeatTask; // scheduled task that sends heartbeats and handles responses
    protected final String groupId; // consumer group id
    protected final ConsumerNetworkClient client; // network communication

    private boolean needsJoinPrepare = true; // whether preparation is needed before sending a JoinGroupRequest
    // one of the conditions for re-sending a JoinGroupRequest;
    // set to true when a response's error code indicates a rebalance is needed,
    // and back to false once a JoinGroupResponse has been received;
    // defaults to true
    private boolean rejoinNeeded = true;

    protected Node coordinator; // the node hosting the server-side GroupCoordinator
    protected String memberId; // unique consumer id assigned by the server-side GroupCoordinator
    protected int generation; // effectively a version number per rebalance, guarding against stale rebalance requests

    private class HeartbeatTask implements DelayedTask {

        // whether a heartbeat request is currently in flight, to prevent duplicates
        private boolean requestInFlight = false;

        // called from outside to trigger/reset the heartbeat task
        public void reset() {
            long now = time.milliseconds();
            heartbeat.resetSessionTimeout(now);
            client.unschedule(this);

            if (!requestInFlight)
                client.schedule(this, now);
        }

        @Override
        public void run(final long now) {
            // proceed only if: the previous heartbeat got a normal response (generation valid),
            // we are not waiting for a rebalance assignment result,
            // and the server-side GroupCoordinator is connected; otherwise bail out
            if (generation < 0 || needRejoin() || coordinatorUnknown()) {
                return;
            }

            if (heartbeat.sessionTimeoutExpired(now)) {
                // heartbeat session timed out: assume the server-side GroupCoordinator is down
                coordinatorDead();
                return;
            }

            if (!heartbeat.shouldHeartbeat(now)) {
                // the next heartbeat is not due yet: do not send (this run ends),
                // and schedule a fresh task at the next trigger time
                client.schedule(this, now + heartbeat.timeToNextHeartbeat(now));
            } else {
                heartbeat.sentHeartbeat(now);
                requestInFlight = true; // prevent duplicate sends

                // send the heartbeat request
                RequestFuture<Void> future = sendHeartbeatRequest();
                // register a callback for when the response arrives
                future.addListener(new RequestFutureListener<Void>() {
                    // on success, schedule the next heartbeat run
                    @Override
                    public void onSuccess(Void value) {
                        requestInFlight = false;
                        long now = time.milliseconds();
                        heartbeat.receiveHeartbeat(now);
                        long nextHeartbeatTime = now + heartbeat.timeToNextHeartbeat(now);
                        client.schedule(HeartbeatTask.this, nextHeartbeatTime);
                    }

                    @Override
                    public void onFailure(RuntimeException e) {
                        requestInFlight = false;
                        client.schedule(HeartbeatTask.this, time.milliseconds() + retryBackoffMs);
                    }
                });
            }
        }
    }

    // handles heartbeat responses
    private class HeartbeatCompletionHandler extends CoordinatorResponseHandler<HeartbeatResponse, Void> {

        @Override
        public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) {
            Errors error = Errors.forCode(heartbeatResponse.errorCode());
            if (error == Errors.NONE) {
                // success: propagate the success event
                future.complete(null);
            } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
                    || error == Errors.NOT_COORDINATOR_FOR_GROUP) {
                coordinatorDead();
                future.raise(error);
            } else if (error == Errors.REBALANCE_IN_PROGRESS) {
                // the coordinator has already started a rebalance,
                // so flag that a JoinGroupRequest must be sent
                AbstractCoordinator.this.rejoinNeeded = true;
                future.raise(Errors.REBALANCE_IN_PROGRESS);
            } else if (error == Errors.ILLEGAL_GENERATION) {
                AbstractCoordinator.this.rejoinNeeded = true;
                future.raise(Errors.ILLEGAL_GENERATION);
            } else if (error == Errors.UNKNOWN_MEMBER_ID) {
                memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
                AbstractCoordinator.this.rejoinNeeded = true;
                future.raise(Errors.UNKNOWN_MEMBER_ID);
            } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                future.raise(new GroupAuthorizationException(groupId));
            } else {
                future.raise(new KafkaException("Unexpected error in heartbeat response: " + error.message()));
            }
        }
    }

    protected void coordinatorDead() {
        if (this.coordinator != null) {
            // clear this coordinator's requests from the unsent buffer and invoke their callbacks with the exception
            client.failUnsentRequests(this.coordinator, GroupCoordinatorNotAvailableException.INSTANCE);
            // null means a new GroupCoordinator must be located
            this.coordinator = null;
        }
    }

    // locates the server-side GroupCoordinator;
    // the subsequent rebalance, offset fetch and offset commit all talk to it
    public void ensureCoordinatorReady() {
        while (coordinatorUnknown()) {
            RequestFuture<Void> future = sendGroupCoordinatorRequest();
            // block until the future has a response
            client.poll(future);

            if (future.failed()) {
                if (future.isRetriable())
                    client.awaitMetadataUpdate();
                else
                    throw future.exception();
            } else if (coordinator != null && client.connectionFailed(coordinator)) {
                coordinatorDead();
                // sleep to space out connection retries
                time.sleep(retryBackoffMs);
            }
        }
    }

    // handles the server's response to the GroupCoordinator lookup:
    // sets the coordinator field, connects to it, and starts the heartbeat task
    private void handleGroupMetadataResponse(ClientResponse resp, RequestFuture<Void> future) {

        if (!coordinatorUnknown()) {
            // the consumer has already found the GroupCoordinator, so ignore this response
            future.complete(null);
        } else {
            GroupCoordinatorResponse groupCoordinatorResponse = new GroupCoordinatorResponse(resp.responseBody());
            Errors error = Errors.forCode(groupCoordinatorResponse.errorCode());
            if (error == Errors.NONE) {
                this.coordinator = new Node(Integer.MAX_VALUE - groupCoordinatorResponse.node().id(),
                        groupCoordinatorResponse.node().host(),
                        groupCoordinatorResponse.node().port());

                client.tryConnect(coordinator);

                if (generation > 0)
                    heartbeatTask.reset();
                future.complete(null);
            } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                future.raise(new GroupAuthorizationException(groupId));
            } else {
                future.raise(error);
            }
        }
    }

    public void ensureActiveGroup() {
        if (!needRejoin())
            return;

        if (needsJoinPrepare) {
            onJoinPrepare(generation, memberId);
            needsJoinPrepare = false;
        }

        while (needRejoin()) {
            // make sure the server-side GroupCoordinator is located and connected
            ensureCoordinatorReady();

            // if requests to the GroupCoordinator are still outstanding, block until
            // they complete, i.e. until the unsent and InFlightRequests queues are empty
            if (client.pendingRequestCount(this.coordinator) > 0) {
                client.awaitPendingRequests(this.coordinator);
                continue;
            }

            RequestFuture<ByteBuffer> future = sendJoinGroupRequest();
            future.addListener(new RequestFutureListener<ByteBuffer>() {
                @Override
                public void onSuccess(ByteBuffer value) {
                    onJoinComplete(generation, memberId, protocol, value);
                    needsJoinPrepare = true;
                    heartbeatTask.reset();
                }

                @Override
                public void onFailure(RuntimeException e) {
                }
            });
            client.poll(future);

            if (future.failed()) {
                RuntimeException exception = future.exception();
                if (exception instanceof UnknownMemberIdException ||
                        exception instanceof RebalanceInProgressException ||
                        exception instanceof IllegalGenerationException)
                    continue;
                else if (!future.isRetriable())
                    throw exception;
                // sleep to space out retries
                time.sleep(retryBackoffMs);
            }
        }
    }

    // builds the JoinGroupRequest and queues it into unsent
    private RequestFuture<ByteBuffer> sendJoinGroupRequest() {
        JoinGroupRequest request = new JoinGroupRequest(groupId, this.sessionTimeoutMs,
                this.memberId, protocolType(), metadata());

        return client.send(coordinator, ApiKeys.JOIN_GROUP, request)
                .compose(new JoinGroupResponseHandler());
    }

    // handles the JoinGroupResponse
    private class JoinGroupResponseHandler extends CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> {
        @Override
        public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
            Errors error = Errors.forCode(joinResponse.errorCode());
            if (error == Errors.NONE) {
                // update local state
                AbstractCoordinator.this.memberId = joinResponse.memberId();
                AbstractCoordinator.this.generation = joinResponse.generationId();
                AbstractCoordinator.this.rejoinNeeded = false;
                AbstractCoordinator.this.protocol = joinResponse.groupProtocol();
                // check whether this member was chosen as the group leader
                if (joinResponse.isLeader()) {
                    onJoinLeader(joinResponse).chain(future);
                } else {
                    onJoinFollower().chain(future);
                }
            } else if (error == Errors.GROUP_LOAD_IN_PROGRESS) {
                // retriable: back off and try again
                future.raise(error);
            } else if (error == Errors.UNKNOWN_MEMBER_ID) {
                AbstractCoordinator.this.memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
                future.raise(Errors.UNKNOWN_MEMBER_ID);
            } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
                    || error == Errors.NOT_COORDINATOR_FOR_GROUP) {
                coordinatorDead();
                future.raise(error);
            } else if (error == Errors.INCONSISTENT_GROUP_PROTOCOL
                    || error == Errors.INVALID_SESSION_TIMEOUT
                    || error == Errors.INVALID_GROUP_ID) {
                log.error("Attempt to join group {} failed due to fatal error: {}", groupId, error.message());
                future.raise(error);
            } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                future.raise(new GroupAuthorizationException(groupId));
            } else {
                future.raise(new KafkaException("Unexpected error in join group response: " + error.message()));
            }
        }

        // leader-side logic
        private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) {
            // run the assignment
            Map<String, ByteBuffer> groupAssignment = performAssignment(joinResponse.leaderId(), joinResponse.groupProtocol(),
                    joinResponse.members());
            // send the result to the coordinator
            SyncGroupRequest request = new SyncGroupRequest(groupId, generation, memberId, groupAssignment);
            return sendSyncGroupRequest(request);
        }
    }
}
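
Putting the pieces together, the rebalance round trip implemented above runs as follows (a simplified trace, not the wire protocol verbatim):

// member (consumer)                                    GroupCoordinator (broker)
// ------------------                                   -------------------------
// ensureCoordinatorReady() --GroupCoordinatorRequest-> locate the coordinator node
// sendJoinGroupRequest()   --JoinGroupRequest--------> collect all members' subscriptions
//                          <-JoinGroupResponse-------- memberId, generation, leaderId
// leader only: performAssignment()                     (followers skip this step)
// onJoinLeader()           --SyncGroupRequest--------> store the leader's assignment
// all members              <-SyncGroupResponse-------- each member's own partitions
// onJoinComplete()                                     restart heartbeats, resume fetching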

public final class Heartbeat {
    private final long timeout; // session timeout
    private final long interval; // interval between two heartbeats, default 3000 ms

    private long lastHeartbeatSend; // when the last heartbeat request was sent
    private long lastHeartbeatReceive; // when the last heartbeat response was received
    private long lastSessionReset; // when the session was last reset

    // computes how long until the next heartbeat should be sent
    public long timeToNextHeartbeat(long now) {
        long timeSinceLastHeartbeat = now - Math.max(lastHeartbeatSend, lastSessionReset);

        if (timeSinceLastHeartbeat > interval)
            return 0;
        else
            return interval - timeSinceLastHeartbeat;
    }

    // checks whether the session has timed out
    public boolean sessionTimeoutExpired(long now) {
        return now - Math.max(lastSessionReset, lastHeartbeatReceive) > timeout;
    }
}
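
A quick numeric check of both methods (all times in ms; the values are chosen for illustration):

// assume interval = 3000, timeout = 30000,
// lastSessionReset = 0, lastHeartbeatSend = 10000, lastHeartbeatReceive = 10050

// timeToNextHeartbeat(11000): 11000 - max(10000, 0) = 1000  -> returns 3000 - 1000 = 2000
// timeToNextHeartbeat(14000): 14000 - 10000 = 4000 > 3000   -> returns 0 (heartbeat overdue)

// sessionTimeoutExpired(35000): 35000 - max(0, 10050) = 24950 <= 30000 -> false
// sessionTimeoutExpired(45000): 45000 - 10050 = 34950 > 30000          -> true, coordinator presumed dead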

// fetches messages from the brokers
public class Fetcher<K, V> {
    // network communication
    private final ConsumerNetworkClient client;
    // the broker does not answer a FetchRequest immediately: it waits until at least
    // minBytes of data have accumulated (or maxWaitMs elapses), improving the
    // effective network payload
    private final int minBytes;
    // the longest the broker may hold a FetchResponse; together with minBytes it
    // decides when the broker responds
    private final int maxWaitMs;
    // maximum bytes fetched per partition per request
    private final int fetchSize;
    // maximum number of records returned per poll
    private final int maxPollRecords;
    private final Metadata metadata; // Kafka cluster metadata
    private final SubscriptionState subscriptions; // consumption state of each TopicPartition
    // each FetchResponse is first wrapped as a CompletedFetch and queued here;
    // the response body is parsed later
    private final List<CompletedFetch> completedFetches;
    private final Deserializer<K> keyDeserializer;
    private final Deserializer<V> valueDeserializer;
    // holds the result of parsing a CompletedFetch: the raw ByteBuffer in a
    // CompletedFetch is sliced into records (offset + size) and deserialized into
    // typed ConsumerRecords, which are what KafkaConsumer ultimately returns
    private PartitionRecords<K, V> nextInLineRecords = null;

    private static class PartitionRecords<K, V> {
        private long fetchOffset; // offset of the first record in records
        private TopicPartition partition;
        private List<ConsumerRecord<K, V>> records; // the record set
    }

    // builds the FetchRequests
    private Map<Node, FetchRequest> createFetchRequests() {
        Cluster cluster = metadata.fetch();
        Map<Node, Map<TopicPartition, FetchRequest.PartitionData>> fetchable = new HashMap<>();
        // walk the partitions that are currently fetchable
        for (TopicPartition partition : fetchablePartitions()) {
            // node hosting the partition's leader replica
            Node node = cluster.leaderFor(partition);
            if (node == null) {
                metadata.requestUpdate();
            }
            // only fetch from this node if its unsent and InFlightRequests queues are
            // empty; otherwise skip it for this round
            else if (this.client.pendingRequestCount(node) == 0) {
                Map<TopicPartition, FetchRequest.PartitionData> fetch = fetchable.get(node);
                if (fetch == null) {
                    fetch = new HashMap<>();
                    fetchable.put(node, fetch);
                }

                // look up each partition's position in SubscriptionState
                // and wrap it into a PartitionData object
                long position = this.subscriptions.position(partition);
                fetch.put(partition, new FetchRequest.PartitionData(position, this.fetchSize));
            }
        }

        Map<Node, FetchRequest> requests = new HashMap<>();
        for (Map.Entry<Node, Map<TopicPartition, FetchRequest.PartitionData>> entry : fetchable.entrySet()) {
            // bundle all TopicPartitions headed for the same node into one FetchRequest
            Node node = entry.getKey();
            FetchRequest fetch = new FetchRequest(this.maxWaitMs, this.minBytes, entry.getValue());
            requests.put(node, fetch);
        }
        return requests;
    }

    private Set<TopicPartition> fetchablePartitions() {
        // start from the partitions this consumer may fetch
        Set<TopicPartition> fetchable = subscriptions.fetchablePartitions();
        // partitions already present in the two buffers below have been fetched and
        // not yet drained, so they must not be fetched again
        if (nextInLineRecords != null && !nextInLineRecords.isEmpty())
            fetchable.remove(nextInLineRecords.partition);
        for (CompletedFetch completedFetch : completedFetches)
            fetchable.remove(completedFetch.partition);
        return fetchable;
    }

    // sends the fetch requests
    public void sendFetches() {
        for (Map.Entry<Node, FetchRequest> fetchEntry: createFetchRequests().entrySet()) {
            final FetchRequest request = fetchEntry.getValue();
            client.send(fetchEntry.getKey(), ApiKeys.FETCH, request)
                    .addListener(new RequestFutureListener<ClientResponse>() {
                        @Override
                        public void onSuccess(ClientResponse resp) {
                            FetchResponse response = new FetchResponse(resp.responseBody());
                            // cache the received FetchResponse in the completedFetches queue
                            for (Map.Entry<TopicPartition, FetchResponse.PartitionData> entry : response.responseData().entrySet()) {
                                TopicPartition partition = entry.getKey();
                                long fetchOffset = request.fetchData().get(partition).offset;
                                FetchResponse.PartitionData fetchData = entry.getValue();
                                completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator));
                            }
                        }
                    });
        }
    }

    // drains the completedFetches queue
    public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
        if (this.subscriptions.partitionAssignmentNeeded()) {
            // a rebalance is pending, so return nothing
            return Collections.emptyMap();
        } else {
            Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<>();
            int recordsRemaining = maxPollRecords; // maximum number of records to return in one call
            Iterator<CompletedFetch> completedFetchesIterator = completedFetches.iterator();

            while (recordsRemaining > 0) {
                // first move a CompletedFetch into nextInLineRecords
                if (nextInLineRecords == null || nextInLineRecords.isEmpty()) {
                    if (!completedFetchesIterator.hasNext())
                        break;

                    CompletedFetch completion = completedFetchesIterator.next();
                    completedFetchesIterator.remove();
                    nextInLineRecords = parseFetchedData(completion);
                } else {
                    recordsRemaining -= append(drained, nextInLineRecords, recordsRemaining);
                }
            }

            return drained;
        }
    }
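
    // Worked example (illustrative numbers): with maxPollRecords = 500 and two cached
    // CompletedFetch entries of 300 records each, iteration 1 parses fetch #1 into
    // nextInLineRecords, iteration 2 drains its 300 records (recordsRemaining = 200),
    // iteration 3 parses fetch #2, and iteration 4 drains 200 of its records; the last
    // 100 stay in nextInLineRecords and are returned by the next poll() without refetching.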

    // parses a CompletedFetch
    private PartitionRecords<K, V> parseFetchedData(CompletedFetch completedFetch) {
        TopicPartition tp = completedFetch.partition;
        FetchResponse.PartitionData partition = completedFetch.partitionData;
        long fetchOffset = completedFetch.fetchedOffset;
        int bytes = 0;
        int recordsCount = 0;
        PartitionRecords<K, V> parsedRecords = null;

        if (!subscriptions.isFetchable(tp)) {
            // the partition is no longer fetchable (e.g. paused or reassigned), so drop the data
        } else if (partition.errorCode == Errors.NONE.code()) {
            Long position = subscriptions.position(tp);

            ByteBuffer buffer = partition.recordSet;
            MemoryRecords records = MemoryRecords.readableRecords(buffer);
            List<ConsumerRecord<K, V>> parsed = new ArrayList<>();
            boolean skippedRecords = false;
            for (LogEntry logEntry : records) {
                if (logEntry.offset() >= position) {
                    parsed.add(parseRecord(tp, logEntry));
                    bytes += logEntry.size();
                } else {
                    // skip records before the locally recorded offset
                    skippedRecords = true;
                }
            }

            recordsCount = parsed.size();

            if (!parsed.isEmpty()) {
                // wrap the parsed records into a PartitionRecords object
                parsedRecords = new PartitionRecords<>(fetchOffset, tp, parsed);
                // the last record feeds the fetch-lag metric (the metric call is elided here)
                ConsumerRecord<K, V> record = parsed.get(parsed.size() - 1);
            }
        }

        return parsedRecords;
    }

    private int append(Map<TopicPartition, List<ConsumerRecord<K, V>>> drained, PartitionRecords<K, V> partitionRecords, int maxRecords) {
        long position = subscriptions.position(partitionRecords.partition);
        if (partitionRecords.fetchOffset == position) {
            // take at most maxRecords records
            List<ConsumerRecord<K, V>> partRecords = partitionRecords.take(maxRecords);
            long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1; // offset just past the last record

            // append the records to the drained map
            List<ConsumerRecord<K, V>> records = drained.get(partitionRecords.partition);
            if (records == null) {
                records = partRecords;
                drained.put(partitionRecords.partition, records);
            } else {
                records.addAll(partRecords);
            }

            // advance the corresponding TopicPartition's position
            subscriptions.position(partitionRecords.partition, nextOffset);
            return partRecords.size();
        }

        // the fetch offset does not match the current consume position (e.g. after a seek), so discard these records
        return 0;
    }

    // restores/resets the position of the given TopicPartitions
    public void updateFetchPositions(Set<TopicPartition> partitions) {
        for (TopicPartition tp : partitions) {
            if (!subscriptions.isAssigned(tp) || subscriptions.isFetchable(tp))
                continue; // skip partitions that are unassigned or already have a valid position

            if (subscriptions.isOffsetResetNeeded(tp)) {
                // an explicit reset was requested, so reset the position using that strategy
                resetOffset(tp);
            } else if (subscriptions.committed(tp) == null) {
                // no committed offset exists for this consumer, so reset using the default strategy
                subscriptions.needOffsetReset(tp);
                resetOffset(tp);
            } else {
                // otherwise move position to the committed offset
                long committed = subscriptions.committed(tp).offset();
                subscriptions.seek(tp, committed);
            }
        }
    }

    private void resetOffset(TopicPartition partition) {
        OffsetResetStrategy strategy = subscriptions.resetStrategy(partition);
        final long timestamp;
        if (strategy == OffsetResetStrategy.EARLIEST)
            timestamp = ListOffsetRequest.EARLIEST_TIMESTAMP;
        else if (strategy == OffsetResetStrategy.LATEST)
            timestamp = ListOffsetRequest.LATEST_TIMESTAMP;
        else
            throw new NoOffsetForPartitionException(partition);

        // send an OffsetRequest (ListOffset) with this timestamp to the partition's leader node
        long offset = listOffset(partition, timestamp);

        if (subscriptions.isAssigned(partition))
            this.subscriptions.seek(partition, offset); // update the position
    }
}
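
The four Fetcher fields above map directly onto consumer configuration keys; a sketch of the corresponding properties (the values shown are illustrative, not recommendations):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "demo-group");
// minBytes: the broker waits for at least this much data before answering a fetch
props.put("fetch.min.bytes", "1");
// maxWaitMs: upper bound on how long the broker may delay the response
props.put("fetch.max.wait.ms", "500");
// fetchSize: per-partition fetch size in bytes
props.put("max.partition.fetch.bytes", "1048576");
// maxPollRecords: cap on the number of records returned by a single poll()
props.put("max.poll.records", "500");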