Reading the kafka-streams source code

Overview

The source tree is laid out as follows:


  • errors: custom exceptions and the handlers that deal with them
  • kstream: defines the core kafka-streams abstractions, KStream and KTable, together with the operations on them
  • processor: defines how topologies are executed, how state is stored, and the rest of the concrete runtime machinery
  • state: the concrete state-store implementations, which enable windowed operations and anything else that needs to keep state
  • everything else: the kafka-streams entry class, initialization, topology operations, configuration items, and so on.
    This article focuses on the internal runtime mechanics of kafka-streams.

kafka-streams state transitions

Let's start with the runtime state machine of kafka-streams:

public enum State {
    CREATED(1, 2, 3, 5),
    REBALANCING(1, 2, 3, 4, 5),
    RUNNING(1, 3, 4, 5),
    PENDING_SHUTDOWN(4),
    NOT_RUNNING,
    ERROR;

    private final Set<Integer> validTransitions = new HashSet<>();

    State(final Integer... validTransitions) {
        this.validTransitions.addAll(Arrays.asList(validTransitions));
    }

    public boolean isRunning() {
        return equals(RUNNING) || equals(REBALANCING);
    }

    public boolean isCreatedOrRunning() {
        return isRunning() || equals(CREATED);
    }

    public boolean isValidTransition(final State newState) {
        return validTransitions.contains(newState.ordinal());
    }
}

kafka-streams defines six states along with the legal transitions between them, as shown in the figure below:


[figure: kafka-streams state transition diagram]

When a kafka-streams application starts, it is initialized in the CREATED state. It then rebalances its consumers according to which topics the application consumes and how their partitions are distributed, entering the REBALANCING state. Once that preparation completes, the application moves to RUNNING: the consumers start consuming and messages flow through the topology. When the application is killed, it enters PENDING_SHUTDOWN and waits for its components to close (the consumers, for example), finally reaching NOT_RUNNING. An application that fails to start properly or runs into problems can also end up in NOT_RUNNING, while an exception during execution moves it to ERROR.
The KafkaStreams entry class defines the following fields:

    private final Object stateLock = new Object();
    private volatile State state = State.CREATED;
    private KafkaStreams.StateListener stateListener = null;

stateLock is the lock used to synchronize state changes on the instance itself; state is initialized to CREATED and is updated on every transition; stateListener runs extra handling whenever the state changes.
The inner class StreamStateListener implements StreamThread.StateListener; it handles the state changes of each stream thread and rolls them up into the application-wide state.
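
A minimal sketch of hooking into these transitions from user code via the public setStateListener API (the topic name and configuration values here are hypothetical):

    import java.util.Properties;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;

    public class StateListenerExample {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "state-listener-demo"); // hypothetical
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // hypothetical

            final StreamsBuilder builder = new StreamsBuilder();
            builder.stream("input-topic").foreach((key, value) -> { });            // hypothetical topic

            final KafkaStreams streams = new KafkaStreams(builder.build(), props);
            // invoked on every transition of the state machine described above
            streams.setStateListener(new KafkaStreams.StateListener() {
                @Override
                public void onChange(final KafkaStreams.State newState, final KafkaStreams.State oldState) {
                    System.out.println("KafkaStreams state: " + oldState + " -> " + newState);
                }
            });
            streams.start(); // CREATED -> REBALANCING -> RUNNING
        }
    }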

KafkaStreams initialization

Straight to the source code:

      private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
                         final StreamsConfig config,
                         final KafkaClientSupplier clientSupplier) {
        // obtain the system time source
        final Time time = Time.SYSTEM;

        /* initialize the processId; it is unique within the JVM and shared by all stream threads.
           It is only used internally and should not be exposed to users */
        processId = UUID.randomUUID();

        // store the configuration
        this.config = config;

        // the application id; required. Multiple kafka-streams instances can share one applicationId to form a distributed application
        final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);

        // set the application id on the internal topology builder (the topology builder is covered in the processor section)
        internalTopologyBuilder.setApplicationId(applicationId);

        // set the client id
        String clientId = config.getString(StreamsConfig.CLIENT_ID_CONFIG);
        if (clientId.length() <= 0)
            clientId = applicationId + "-" + processId;

        // log prefix
        this.logPrefix = String.format("stream-client [%s]", clientId);

        // JMX metrics reporting
        final List<MetricsReporter> reporters = config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG,
            MetricsReporter.class);
        reporters.add(new JmxReporter(JMX_PREFIX));

        final MetricConfig metricConfig = new MetricConfig().samples(config.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG))
            .recordLevel(Sensor.RecordingLevel.forName(config.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG)))
            .timeWindow(config.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
                TimeUnit.MILLISECONDS);

        metrics = new Metrics(metricConfig, reporters, time);

        // initialize the stream thread array and their states
        threads = new StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)];
        final Map<Long, StreamThread.State> threadState = new HashMap<>(threads.length);
        GlobalStreamThread.State globalThreadState = null;

        // state store providers
        final ArrayList<StateStoreProvider> storeProviders = new ArrayList<>();
        streamsMetadataState = new StreamsMetadataState(internalTopologyBuilder, parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)));

        // build the global topology with the internal builder
        final ProcessorTopology globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology();

        if (config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) < 0) {
            log.warn("{} Negative cache size passed in. Reverting to cache size of 0 bytes.", logPrefix);
        }

        // per-thread cache size: the configured total divided by the number of threads (plus the global thread if present), floored at 0
        final long cacheSizeBytes = Math.max(0, config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) /
            (config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG) + (globalTaskTopology == null ? 0 : 1)));

        // the state directory, which manages on-disk state for the tasks owned by each thread
        stateDirectory = new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG), time);
        if (globalTaskTopology != null) {
            // initialize the global thread and its state
            final String globalThreadId = clientId + "-GlobalStreamThread";
            globalStreamThread = new GlobalStreamThread(globalTaskTopology,
                                                        config,
                                                        clientSupplier.getRestoreConsumer(config.getRestoreConsumerConfigs(clientId + "-global")),
                                                        stateDirectory,
                                                        metrics,
                                                        time,
                                                        globalThreadId);
            globalThreadState = globalStreamThread.state();
        }

        // initialize each stream thread
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new StreamThread(internalTopologyBuilder,
                                          config,
                                          clientSupplier,
                                          applicationId,
                                          clientId,
                                          processId,
                                          metrics,
                                          time,
                                          streamsMetadataState,
                                          cacheSizeBytes,
                                          stateDirectory);
            // record the thread's state
            threadState.put(threads[i].getId(), threads[i].state());
            // state store provider for this thread
            storeProviders.add(new StreamThreadStateStoreProvider(threads[i]));
        }
        // the stream state listener
        final StreamStateListener streamStateListener = new StreamStateListener(threadState, globalThreadState);
        if (globalTaskTopology != null) {
            globalStreamThread.setStateListener(streamStateListener);
        }
        for (StreamThread thread : threads) {
            thread.setStateListener(streamStateListener);
        }
        // the global state store provider
        final GlobalStateStoreProvider globalStateStoreProvider = new GlobalStateStoreProvider(internalTopologyBuilder.globalStateStores());
        // wrapper over all state store providers
        queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
        final String cleanupThreadName = clientId + "-CleanupThread";
        // cleaner for the state directory: periodically purges state left over from removed tasks
        stateDirCleaner = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
            @Override
            public Thread newThread(final Runnable r) {
                final Thread thread = new Thread(r, cleanupThreadName);
                thread.setDaemon(true);
                return thread;
            }
        });
    }

Starting kafka-streams

    public synchronized void start() throws IllegalStateException, StreamsException {
        log.debug("{} Starting Kafka Stream process.", logPrefix);
        // ensure start() runs only once: if the current state is CREATED, move it to RUNNING, otherwise throw
        validateStartOnce();
        // check broker version compatibility: the consumer and producer versions must be compatible with the broker
        checkBrokerVersionCompatibility();
        
        // start the global thread
        if (globalStreamThread != null) {
            globalStreamThread.start();
        }

        // start the stream threads
        for (final StreamThread thread : threads) {
            thread.start();
        }

        // start the periodic state-directory cleaner
        final Long cleanupDelay = config.getLong(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG);
        stateDirCleaner.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                synchronized (stateLock) {
                    if (state == State.RUNNING) {
                        stateDirectory.cleanRemovedTasks(cleanupDelay);
                    }
                }
            }
        }, cleanupDelay, cleanupDelay, TimeUnit.MILLISECONDS);

        log.info("{} Started Kafka Stream process", logPrefix);
    }

Stopping kafka-streams

    public synchronized boolean close(final long timeout, final TimeUnit timeUnit) {
        log.debug("{} Stopping Kafka Stream process.", logPrefix);

        // ensure close() only runs once
        if (!checkFirstTimeClosing()) {
            return true;
        }

        // shut down the state cleaner
        stateDirCleaner.shutdownNow();
        // close the stream threads from a separate thread to avoid deadlocks, since join() is called below
        // spawn a new thread that shuts down all task threads
        final Thread shutdown = new Thread(new Runnable() {
            @Override
            public void run() {
                // signal the threads to stop and wait
                for (final StreamThread thread : threads) {
                    // avoid deadlocks by stopping any further state reports
                    // from the thread since we're shutting down
                    thread.setStateListener(null);
                    thread.close();
                }
                closeGlobalStreamThread();
                for (final StreamThread thread : threads) {
                    try {
                        if (!thread.stillRunning()) {
                            thread.join();
                        }
                    } catch (final InterruptedException ex) {
                        Thread.interrupted();
                    }
                }

                metrics.close();
                log.info("{} Stopped Kafka Streams process.", logPrefix);
            }
        }, "kafka-streams-close-thread");
        shutdown.setDaemon(true);
        shutdown.start();
        try {
            shutdown.join(TimeUnit.MILLISECONDS.convert(timeout, timeUnit));
        } catch (final InterruptedException e) {
            Thread.interrupted();
        }
        setState(State.NOT_RUNNING);
        return !shutdown.isAlive();
    }
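
To tie start() and close() together, here is a minimal, hypothetical sketch of the full lifecycle: build a topology, start the application, and close it cleanly on JVM shutdown (topic names and configuration values are made up):

    import java.util.Properties;
    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;

    public class LifecycleExample {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "lifecycle-demo");    // hypothetical
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical
            props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);               // two stream threads

            final StreamsBuilder builder = new StreamsBuilder();
            builder.stream("input-topic").to("output-topic"); // hypothetical topics

            final KafkaStreams streams = new KafkaStreams(builder.build(), props);
            streams.start(); // CREATED -> REBALANCING -> RUNNING

            // close(timeout, unit) returns false if shutdown does not finish in time
            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
                @Override
                public void run() {
                    streams.close(10, TimeUnit.SECONDS); // -> PENDING_SHUTDOWN -> NOT_RUNNING
                }
            }));
        }
    }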

That covers how the main KafkaStreams entry point operates. The next sections walk through each component and how it works.

The kafka-streams topology

Entry class: StreamsBuilder

public class StreamsBuilder {

    /** the topology */
    private final Topology topology = new Topology();

    /** the internal topology builder */
    final InternalTopologyBuilder internalTopologyBuilder = topology.internalTopologyBuilder;
    /** the internal streams builder */
    private final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder);
    // create a stream; several overloads exist, omitted here
    public synchronized <K, V> KStream<K, V> stream(final String... topics);
    // create a table; several overloads exist, omitted here
    public synchronized <K, V> KTable<K, V> table(final String topic,
                                                  final String queryableStoreName) ;
    // merge several KStreams into one
    public synchronized <K, V> KStream<K, V> merge(final KStream<K, V>... streams) {
        return internalStreamsBuilder.merge(streams);
    }
    // build: in fact just returns the topology that has already been assembled
    public synchronized Topology build() {
        return topology;
    }
}
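
A short, hedged sketch of the builder in use, following the overloads quoted above (topic and store names are invented; note that in some later releases merge() lives on KStream rather than on StreamsBuilder):

    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KTable;

    public class BuilderExample {
        public static Topology buildTopology() {
            final StreamsBuilder builder = new StreamsBuilder();

            // two independent streams, merged into one
            final KStream<String, String> left = builder.stream("left-topic");   // hypothetical topic
            final KStream<String, String> right = builder.stream("right-topic"); // hypothetical topic
            final KStream<String, String> merged = builder.merge(left, right);

            // a changelog-backed table, queryable under the given store name
            final KTable<String, String> table = builder.table("table-topic", "table-store");

            // build() simply hands back the Topology the internal builders assembled
            return builder.build();
        }
    }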

Topology

The overall workflow of kafka streams is a directed acyclic graph, comparable to Storm's Topology or Spark's DAG, defining how data is handled in each processing unit. The nodes of a Topology are instances of ProcessorNode (see the ProcessorNode source); each node holds its processor (the processing logic), its child nodes, and its context. Nodes come in three types: SourceNode, SinkNode, and ProcessorNode (note: SourceNode and SinkNode both extend ProcessorNode, so they can be treated as specialized node types); a sketch wiring the three together follows the list below.

  • SourceNode: the starting node of the graph; it produces data for its children to process, and it may have more than one child.
  • ProcessorNode: a data-processing node covering transformations, stores, and so on; the processed data is passed on to its children, of which there can be more than one. Adding the same ProcessorNode to the child sets of several nodes effectively gives it multiple parents. ProcessorNodes can be chained.
  • SinkNode: a terminal node of the graph; it cannot have children. It is the endpoint of the data flow and typically writes the data out to a topic.
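
The same three node types can be wired up directly through the low-level Topology API, as sketched below (topic names are invented; CountingProcessor stands in for any implementation of the Processor interface shown later in this article):

    import org.apache.kafka.streams.Topology;

    public class TopologyExample {
        public static Topology build() {
            final Topology topology = new Topology();
            // SourceNode: reads from the topic and feeds its children
            topology.addSource("Source", "input-topic");                        // hypothetical topic
            // ProcessorNode: user logic, registered as a child of the source
            topology.addProcessor("Process", CountingProcessor::new, "Source"); // see the Processor section below
            // SinkNode: terminal node that writes the results back out to a topic
            topology.addSink("Sink", "output-topic", "Process");                // hypothetical topic
            return topology;
        }
    }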

InternalTopologyBuilder

At the lowest level, every Topology is assembled by the class InternalTopologyBuilder. Let's look at its implementation in detail.

public class InternalTopologyBuilder {

    private static final Logger log = LoggerFactory.getLogger(InternalTopologyBuilder.class);

    // default (empty) topic pattern
    private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = Pattern.compile("");

    // initial (empty) predecessor list
    private static final String[] NO_PREDECESSORS = {};

    // node factories; only one factory exists per node name
    private final LinkedHashMap<String, NodeFactory> nodeFactories = new LinkedHashMap<>();

    // state store factories; only one factory exists per store name
    private final Map<String, StateStoreFactory> stateFactories = new HashMap<>();

    // global state stores
    private final Map<String, StateStore> globalStateStores = new LinkedHashMap<>();

    // all topics subscribed to by source nodes; internal topics appear here without the applicationId prefix
    private final Set<String> sourceTopicNames = new HashSet<>();

    // all internal topics created by the topology builder; they are only used by source and sink nodes
    private final Set<String> internalTopicNames = new HashSet<>();

    // groups of co-partitioned sources
    private final List<Set<String>> copartitionSourceGroups = new ArrayList<>();

    // the list of topics each source node subscribes to
    private final HashMap<String, List<String>> nodeToSourceTopics = new HashMap<>();

    // the topic pattern each source node subscribes with
    private final HashMap<String, Pattern> nodeToSourcePatterns = new LinkedHashMap<>();

    // the topic each sink node writes to
    private final HashMap<String, String> nodeToSinkTopic = new HashMap<>();

    // the pattern each topic was matched by, to ensure the topic is picked up by a source
    private final HashMap<String, Pattern> topicToPatterns = new HashMap<>();

    // key: state store name; value: all topics subscribed to by the source nodes connected to that store
    private final Map<String, Set<String>> stateStoreNameToSourceTopics = new HashMap<>();

    // key: state store name; value: all topic patterns subscribed to by the source nodes connected to that store
    private final Map<String, Set<Pattern>> stateStoreNameToSourceRegex = new HashMap<>();

    // the changelog topic name recorded for each state store
    private final Map<String, String> storeToChangelogTopic = new HashMap<>();

    // all global topics
    private final Set<String> globalTopics = new HashSet<>();

    // topics whose offset reset policy is earliest
    private final Set<String> earliestResetTopics = new HashSet<>();

    // topics whose offset reset policy is latest
    private final Set<String> latestResetTopics = new HashSet<>();

    // topic patterns whose offset reset policy is earliest
    private final Set<Pattern> earliestResetPatterns = new HashSet<>();

    // topic patterns whose offset reset policy is latest
    private final Set<Pattern> latestResetPatterns = new HashSet<>();

    // node grouper, a union-find (quick-union) structure used to group nodes
    private final QuickUnion<String> nodeGrouper = new QuickUnion<>();

    // subscription updates, used to capture pattern-matched topics during partition assignment
    private SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates();

    private String applicationId = null;

    private Pattern topicPattern = null;

    // the node groups
    private Map<Integer, Set<String>> nodeGroups = null;

    // state store factory class, used to produce state stores
    private static class StateStoreFactory {
        public final Set<String> users;

        public final StateStoreSupplier supplier;

        StateStoreFactory(final StateStoreSupplier supplier) {
            this.supplier = supplier;
            users = new HashSet<>();
        }
    }

    // abstract node factory
    private static abstract class NodeFactory {
        final String name;
        final String[] predecessors;

        NodeFactory(final String name,
                    final String[] predecessors) {
            this.name = name;
            this.predecessors = predecessors;
        }

        // build the internal processor node
        public abstract ProcessorNode build();

        // produce the abstract node used to describe the topology
        abstract AbstractNode describe();
    }

    // processor node factory
    private static class ProcessorNodeFactory extends NodeFactory {
        ……

        // attach a state store
        public void addStateStore(final String stateStoreName) {
            stateStoreNames.add(stateStoreName);
        }

        @Override
        public ProcessorNode build() {
            return new ProcessorNode<>(name, supplier.get(), stateStoreNames);
        }

        @Override
        Processor describe() {
            return new Processor(name, new HashSet<>(stateStoreNames));
        }
    }

    // source node factory
    private class SourceNodeFactory extends NodeFactory {
        ……

        // get the topics that match the subscription
        List<String> getTopics(final Collection<String> subscribedTopics) {
            ……
            return matchedTopics;
        }

        @Override
        public ProcessorNode build() {
            ……
        }

        // check whether a topic matches the pattern
        private boolean isMatch(final String topic) {
            return pattern.matcher(topic).matches();
        }

        @Override
        Source describe() {
            ……
        }
    }

    // sink node factory
    private class SinkNodeFactory<K, V> extends NodeFactory {
        ……

        @Override
        public ProcessorNode build() {
            ……
        }

        @Override
        Sink describe() {
            return new Sink(name, topic);
        }
    }

    // add a source node; sources are grouped by offsetReset, and timestampExtractor extracts the timestamp from each message
    public final void addSource(final Topology.AutoOffsetReset offsetReset,
                                final String name,
                                final TimestampExtractor timestampExtractor,
                                final Deserializer keyDeserializer,
                                final Deserializer valDeserializer,
                                final String... topics) {
        // validate the topics argument
        if (topics.length == 0) {
            throw new TopologyException("You must provide at least one topic");
        }
        // validate the name argument
        Objects.requireNonNull(name, "name must not be null");
        if (nodeFactories.containsKey(name)) {
            throw new TopologyException("Processor " + name + " is already added.");
        }

        // register the topics, grouped by offsetReset
        for (final String topic : topics) {
            Objects.requireNonNull(topic, "topic names cannot be null");
            validateTopicNotAlreadyRegistered(topic);
            maybeAddToResetList(earliestResetTopics, latestResetTopics, offsetReset, topic);
            sourceTopicNames.add(topic);
        }

        nodeFactories.put(name, new SourceNodeFactory(name, topics, null, timestampExtractor, keyDeserializer, valDeserializer));
        nodeToSourceTopics.put(name, Arrays.asList(topics));
        nodeGrouper.add(name);
    }

    // add a sink node
    public final <K, V> void addSink(final String name,
                                     final String topic,
                                     final Serializer<K> keySerializer,
                                     final Serializer<V> valSerializer,
                                     final StreamPartitioner<? super K, ? super V> partitioner,
                                     final String... predecessorNames) {
        // validate the arguments
        Objects.requireNonNull(name, "name must not be null");
        Objects.requireNonNull(topic, "topic must not be null");
        if (nodeFactories.containsKey(name)) {
            throw new TopologyException("Processor " + name + " is already added.");
        }

        // validate and link the predecessors
        for (final String predecessor : predecessorNames) {
            if (predecessor.equals(name)) {
                throw new TopologyException("Processor " + name + " cannot be a predecessor of itself.");
            }
            if (!nodeFactories.containsKey(predecessor)) {
                throw new TopologyException("Predecessor processor " + predecessor + " is not added yet.");
            }
            if (nodeToSinkTopic.containsKey(predecessor)) {
                throw new TopologyException("Sink " + predecessor + " cannot be used a parent.");
            }
        }

        nodeFactories.put(name, new SinkNodeFactory<>(name, predecessorNames, topic, keySerializer, valSerializer, partitioner));
        nodeToSinkTopic.put(name, topic);
        nodeGrouper.add(name);
        nodeGrouper.unite(name, predecessorNames);
    }

    // add a processor node
    public final void addProcessor(final String name,
                                   final ProcessorSupplier supplier,
                                   final String... predecessorNames) {
        // validate the arguments
        Objects.requireNonNull(name, "name must not be null");
        Objects.requireNonNull(supplier, "supplier must not be null");
        if (nodeFactories.containsKey(name)) {
            throw new TopologyException("Processor " + name + " is already added.");
        }

        // validate and link the predecessors
        for (final String predecessor : predecessorNames) {
            if (predecessor.equals(name)) {
                throw new TopologyException("Processor " + name + " cannot be a predecessor of itself.");
            }
            if (!nodeFactories.containsKey(predecessor)) {
                throw new TopologyException("Predecessor processor " + predecessor + " is not added yet.");
            }
        }

        nodeFactories.put(name, new ProcessorNodeFactory(name, predecessorNames, supplier));
        nodeGrouper.add(name);
        nodeGrouper.unite(name, predecessorNames);
    }

    // add a state store
    public final void addStateStore(final StateStoreSupplier supplier,
                                    final String... processorNames) {
        Objects.requireNonNull(supplier, "supplier can't be null");
        if (stateFactories.containsKey(supplier.name())) {
            throw new TopologyException("StateStore " + supplier.name() + " is already added.");
        }

        stateFactories.put(supplier.name(), new StateStoreFactory(supplier));

        if (processorNames != null) {
            for (final String processorName : processorNames) {
                connectProcessorAndStateStore(processorName, supplier.name());
            }
        }
    }

    // add a global state store
    public final void addGlobalStore(final StateStoreSupplier<KeyValueStore> storeSupplier,
                                     final String sourceName,
                                     final TimestampExtractor timestampExtractor,
                                     final Deserializer keyDeserializer,
                                     final Deserializer valueDeserializer,
                                     final String topic,
                                     final String processorName,
                                     final ProcessorSupplier stateUpdateSupplier) {
        ……
    }

    // connect a processor with state stores, i.e. register the stores on the processor's node factory
    public final void connectProcessorAndStateStores(final String processorName,
                                                     final String... stateStoreNames) {
        Objects.requireNonNull(processorName, "processorName can't be null");
        Objects.requireNonNull(stateStoreNames, "stateStoreNames can't be null");
        if (stateStoreNames.length == 0) {
            throw new TopologyException("Must provide at least one state store name.");
        }
        for (final String stateStoreName : stateStoreNames) {
            connectProcessorAndStateStore(processorName, stateStoreName);
        }
    }

    /* Group the nodes.
       nodeGrouper already holds every node. Sources only ever act as roots inside nodeGrouper,
       while sink and processor nodes are united with their predecessors as they are added.
       The net result is a partition of the graph into several unconnected streams.
     */
    private Map<Integer, Set<String>> makeNodeGroups() {
        final HashMap<Integer, Set<String>> nodeGroups = new LinkedHashMap<>();
        final HashMap<String, Set<String>> rootToNodeGroup = new HashMap<>();

        int nodeGroupId = 0;

        // collect all source nodes
        final HashSet<String> allSourceNodes = new HashSet<>(nodeToSourceTopics.keySet());
        allSourceNodes.addAll(nodeToSourcePatterns.keySet());

        // iterate the source nodes in sorted order; sources sharing the same root in nodeGrouper land in the same group
        for (final String nodeName : Utils.sorted(allSourceNodes)) {
            final String root = nodeGrouper.root(nodeName);
            Set<String> nodeGroup = rootToNodeGroup.get(root);
            if (nodeGroup == null) {
                nodeGroup = new HashSet<>();
                rootToNodeGroup.put(root, nodeGroup);
                nodeGroups.put(nodeGroupId++, nodeGroup);
            }
            nodeGroup.add(nodeName);
        }

        // group the non-source nodes the same way, keyed by their root, in sorted order
        for (final String nodeName : Utils.sorted(nodeFactories.keySet())) {
            if (!nodeToSourceTopics.containsKey(nodeName)) {
                final String root = nodeGrouper.root(nodeName);
                Set<String> nodeGroup = rootToNodeGroup.get(root);
                if (nodeGroup == null) {
                    nodeGroup = new HashSet<>();
                    rootToNodeGroup.put(root, nodeGroup);
                    nodeGroups.put(nodeGroupId++, nodeGroup);
                }
                nodeGroup.add(nodeName);
            }
        }

        return nodeGroups;
    }

    // build the topology for one topic group
    public synchronized ProcessorTopology build(final Integer topicGroupId) {
        final Set<String> nodeGroup;
        ……
        return build(nodeGroup);
    }

    /**
     * Builds the topology for any global state stores
     * @return ProcessorTopology
     */
    public synchronized ProcessorTopology buildGlobalStateTopology() {
        final Set<String> globalGroups = globalNodeGroups();
        if (globalGroups.isEmpty()) {
            return null;
        }
        return build(globalGroups);
    }

    private Set<String> globalNodeGroups() {
        ……
    }

    // build the topology for a node group; a null nodeGroup builds every node
    private ProcessorTopology build(final Set<String> nodeGroup) {
        final List<ProcessorNode> processorNodes = new ArrayList<>(nodeFactories.size());
        final Map<String, ProcessorNode> processorMap = new HashMap<>();
        final Map<String, SourceNode> topicSourceMap = new HashMap<>();
        final Map<String, SinkNode> topicSinkMap = new HashMap<>();
        final Map<String, StateStore> stateStoreMap = new LinkedHashMap<>();

        // build the processor graph in topological order; nodeFactories is already topologically sorted
        for (final NodeFactory factory : nodeFactories.values()) {
            if (nodeGroup == null || nodeGroup.contains(factory.name)) {
                // create the processor node
                final ProcessorNode node = factory.build();
                processorNodes.add(node);
                processorMap.put(node.name(), node);

                if (factory instanceof ProcessorNodeFactory) {
                    // ProcessorNodeFactory
                    // look up each predecessor of this node and add the node to the predecessor's children
                    for (final String predecessor : ((ProcessorNodeFactory) factory).predecessors) {
                        final ProcessorNode<?, ?> predecessorNode = processorMap.get(predecessor);
                        predecessorNode.addChild(node);
                    }
                    // wire up the node's state stores
                    for (final String stateStoreName : ((ProcessorNodeFactory) factory).stateStoreNames) {
                        // only handle a store the first time it appears, so each state store is added exactly once
                        if (!stateStoreMap.containsKey(stateStoreName)) {
                            final StateStore stateStore;

                            if (stateFactories.containsKey(stateStoreName)) {
                                final StateStoreSupplier supplier = stateFactories.get(stateStoreName).supplier;
                                stateStore = supplier.get();

                                // remember the changelog topic if this state store is change-logging enabled
                                if (supplier.loggingEnabled() && !storeToChangelogTopic.containsKey(stateStoreName)) {
                                    final String changelogTopic = ProcessorStateManager.storeChangelogTopic(applicationId, stateStoreName);
                                    storeToChangelogTopic.put(stateStoreName, changelogTopic);
                                }
                            } else {
                                stateStore = globalStateStores.get(stateStoreName);
                            }

                            stateStoreMap.put(stateStoreName, stateStore);
                        }
                    }
                } else if (factory instanceof SourceNodeFactory) {
                    // SourceNodeFactory
                    final SourceNodeFactory sourceNodeFactory = (SourceNodeFactory) factory;
                    // collect all subscribed topics
                    final List<String> topics = (sourceNodeFactory.pattern != null) ?
                            sourceNodeFactory.getTopics(subscriptionUpdates.getUpdates()) :
                            sourceNodeFactory.topics;
                    // map each topic to this source node
                    for (final String topic : topics) {
                        if (internalTopicNames.contains(topic)) {
                            // prefix the internal topic name with the application id
                            topicSourceMap.put(decorateTopic(topic), (SourceNode) node);
                        } else {
                            topicSourceMap.put(topic, (SourceNode) node);
                        }
                    }
                } else if (factory instanceof SinkNodeFactory) {
                    // SinkNodeFactory
                    final SinkNodeFactory sinkNodeFactory = (SinkNodeFactory) factory;

                    // look up the node's predecessors and add the node to their children
                    for (final String predecessor : sinkNodeFactory.predecessors) {
                        processorMap.get(predecessor).addChild(node);
                        if (internalTopicNames.contains(sinkNodeFactory.topic)) {
                            // prefix the internal topic name with the application id
                            topicSinkMap.put(decorateTopic(sinkNodeFactory.topic), (SinkNode) node);
                        } else {
                            topicSinkMap.put(sinkNodeFactory.topic, (SinkNode) node);
                        }
                    }
                } else {
                    throw new TopologyException("Unknown definition class: " + factory.getClass().getName());
                }
            }
        }

        // processorNodes: the nodes; topicSourceMap: topic -> SourceNode; topicSinkMap: topic -> SinkNode; then the state stores, their changelog topics, and the global state stores
        return new ProcessorTopology(processorNodes, topicSourceMap, topicSinkMap, new ArrayList<>(stateStoreMap.values()), storeToChangelogTopic, new ArrayList<>(globalStateStores.values()));
    }

    /**
     * Get any global {@link StateStore}s that are part of the
     * topology
     * @return map containing all global {@link StateStore}s
     */
    public Map<String, StateStore> globalStateStores() {
        return Collections.unmodifiableMap(globalStateStores);
    }

    /**
     * Returns the map of topic groups keyed by the group id.
     * A topic group is a group of topics in the same task.
     *
     * @return groups of topic names
     */
    public synchronized Map<Integer, TopicsInfo> topicGroups() {
        ……
    }

    // abstract topology node: holds a name, predecessors, and successors
    public abstract static class AbstractNode implements TopologyDescription.Node {
        ……
    }

    // Source node, extends the abstract node
    public final static class Source extends AbstractNode implements TopologyDescription.Source {
        ……
    }

    // Processor node
    public final static class Processor extends AbstractNode implements TopologyDescription.Processor {
        ……
    }

    // Sink node
    public final static class Sink extends AbstractNode implements TopologyDescription.Sink {
        ……
    }

    // a subtopology of the overall topology, containing a set of nodes
    public final static class Subtopology implements org.apache.kafka.streams.TopologyDescription.Subtopology {
        ……
    }

    // topic info: sinkTopics, sourceTopics, stateChangelogTopics, repartitionSourceTopics
    public static class TopicsInfo {
        ……
    }

    // topology description; the inner classes above all serve this class, the meta-representation of the whole topology
    public final static class TopologyDescription implements org.apache.kafka.streams.TopologyDescription {
        ……
    }

}
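
The nodeGrouper above is a union-find ("quick union") structure, and makeNodeGroups relies on it to split the graph into independent subtopologies. The following is a minimal sketch of the idea, not the actual QuickUnion class from the Kafka code base:

    import java.util.HashMap;
    import java.util.Map;

    // Minimal quick-union: every element points at a parent; the root identifies its group.
    class SimpleQuickUnion<T> {
        private final Map<T, T> parents = new HashMap<>();

        void add(final T id) {
            parents.put(id, id); // a new element starts out as its own root
        }

        T root(T id) {
            while (!parents.get(id).equals(id)) {
                id = parents.get(id);
            }
            return id;
        }

        // unite a node with its predecessors, as addSink/addProcessor do via nodeGrouper.unite
        @SafeVarargs
        final void unite(final T id, final T... others) {
            final T newRoot = root(id);
            for (final T other : others) {
                parents.put(root(other), newRoot); // attach the other group's root under newRoot
            }
        }
    }

Sources are only ever added, so they remain roots; every sink and processor is united with its predecessors, so two nodes share a root exactly when they are connected, and each distinct root yields one node group, i.e. one subtopology.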

ProcessorNode

Every Topology is built out of ProcessorNodes, which subdivide into SourceNode, SinkNode, and plain processors (ordinary ProcessorNodes).

  • SourceNode: has no upstream node; it hands messages to the topology's context so the next layer can process them.
  • SinkNode: has no downstream node; it sinks everything received from the layers above.
  • Processor, i.e. an ordinary ProcessorNode. This is the message-processing unit: it takes data from the previous layer, processes it, and passes it on to its children. It has two important operations, process and punctuate. process implements the handling and transformation of messages. punctuate is more special: the node's context invokes it on a schedule driven, depending on configuration, by wall-clock time or by the timestamps carried in the messages. With wall-clock time, punctuate fires after every configured interval of elapsed time; with stream time, it fires whenever the message timestamps advance by the configured interval. It is used for special periodic state work, such as flushing to disk.

Processor

The previous two sections covered kafka-streams-internal structures, which are not easy to extend. kafka-streams therefore exposes the Processor interface for implementing custom nodes inside the topology. Processor has the following methods:

public interface Processor<K, V> {

    /**
     * Initialize this processor with the given context. The framework guarantees that, when the
     * topology is initialized, this method is called exactly once for every processor it contains
     */
    void init(ProcessorContext context);

    /**
     * 处理给定的k-v对
     */
    void process(K key, V value);

    /**
     * Invoked on a schedule; deprecated in newer versions and no longer recommended
     */
    @Deprecated
    void punctuate(long timestamp);

    /**
     * 关闭
     */
    void close();
}
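
As a hedged illustration of the contract, here is a toy Processor that counts records per key and forwards the running count downstream (the class name and in-memory map are inventions for this sketch; a production processor would keep its counts in a StateStore):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.streams.processor.Processor;
    import org.apache.kafka.streams.processor.ProcessorContext;

    public class CountingProcessor implements Processor<String, String> {
        private ProcessorContext context;
        private final Map<String, Long> counts = new HashMap<>(); // in-memory only for this sketch

        @Override
        public void init(final ProcessorContext context) {
            // called exactly once when the topology is initialized
            this.context = context;
        }

        @Override
        public void process(final String key, final String value) {
            final long count = counts.merge(key, 1L, Long::sum);
            context.forward(key, count); // pass the running count on to the child nodes
        }

        @Override
        @SuppressWarnings("deprecation")
        public void punctuate(final long timestamp) {
            // deprecated; periodic work should be registered via context.schedule() instead
        }

        @Override
        public void close() {
            counts.clear();
        }
    }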

StateStore

StateStore is a storage-engine interface that manages a processor's internal state. Windowed computations and aggregations need a StateStore to hold their state.

public interface StateStore {

    /**
     * The name of this store
     */
    String name();

    /**
     * Initialize this state store
     */
    void init(ProcessorContext context, StateStore root);

    /**
     * Flush any cached data
     */
    void flush();

    /**
     * Close this store
     */
    void close();

    /**
     * 返回这个存储器是否持久化
     */
    boolean persistent();

    /**
     * 能否进行读写操作
     */
    boolean isOpen();
}

On top of StateStore, the kafka-streams code base layers base interfaces such as KeyValueStore, WindowStore, SessionStore, and WrappedStateStore, with a number of store implementations in the internals package.
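
To make the contract concrete, here is a toy, non-persistent implementation of the interface above (the class itself is invented; register() and StateRestoreCallback come from ProcessorContext, shown in the next section, and a real store would pass a restore callback so it can be rebuilt from its changelog):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.processor.StateStore;

    public class InMemoryMapStore implements StateStore {
        private final String name;
        private final Map<String, String> data = new HashMap<>();
        private boolean open = false;

        public InMemoryMapStore(final String name) {
            this.name = name;
        }

        @Override
        public String name() {
            return name;
        }

        @Override
        public void init(final ProcessorContext context, final StateStore root) {
            // register with the context; logging disabled, and the restore
            // callback is omitted in this toy (a real store supplies one)
            context.register(root, false, null);
            open = true;
        }

        @Override
        public void flush() {
            // nothing buffered: all writes go straight to the in-memory map
        }

        @Override
        public void close() {
            data.clear();
            open = false;
        }

        @Override
        public boolean persistent() {
            return false; // purely in-memory
        }

        @Override
        public boolean isOpen() {
            return open;
        }

        // toy accessors, not part of the StateStore interface
        public void put(final String key, final String value) { data.put(key, value); }
        public String get(final String key) { return data.get(key); }
    }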

ProcessorContext

The runtime environment of a Processor is encapsulated by the ProcessorContext interface:

public interface ProcessorContext {

    /**
     * The application id; multiple instances share the same id to scale out
     */
    String applicationId();

    /**
     * The task id
     */
    TaskId taskId();

    /**
     * The key serde
     */
    Serde<?> keySerde();

    /**
     * The value serde
     */
    Serde<?> valueSerde();

    /**
     * The state directory, used to persist state
     */
    File stateDir();

    /**
     * Metrics
     */
    StreamsMetrics metrics();

    /**
     * Register a state store
     */
    void register(StateStore store, boolean loggingEnabled, StateRestoreCallback stateRestoreCallback);

    /**
     * Look up a state store by name
     */
    StateStore getStateStore(String name);

    /**
     * Scheduler for a processor's periodic work. It is typically called while the processor is being
     * initialized or while it is processing records, and fires the callback on the given period.
     */
    Cancellable schedule(long interval, PunctuationType type, Punctuator callback);

    /**
     * An earlier version of the method above; deprecated
     */
    @Deprecated
    void schedule(long interval);

    /**
     * Forward a key/value pair from this node to all downstream nodes
     */
    <K, V> void forward(K key, V value);

    /**
     * Forward a key/value pair from this node to one specific downstream node (by child index)
     */
    <K, V> void forward(K key, V value, int childIndex);

    /**
     * Forward a key/value pair from this node to one specific downstream node (by child name)
     */
    <K, V> void forward(K key, V value, String childName);

    /**
     * Commit all pending operations
     */
    void commit();
}
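
A hedged sketch of a processor using this context: it registers a wall-clock punctuation in init() and commits periodically (the ten-second interval is arbitrary):

    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.streams.processor.Cancellable;
    import org.apache.kafka.streams.processor.Processor;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.processor.PunctuationType;
    import org.apache.kafka.streams.processor.Punctuator;

    public class ScheduledProcessor implements Processor<String, String> {
        private Cancellable punctuation;

        @Override
        public void init(final ProcessorContext context) {
            // fire every 10 seconds of wall-clock time; STREAM_TIME would follow record timestamps instead
            punctuation = context.schedule(TimeUnit.SECONDS.toMillis(10),
                                           PunctuationType.WALL_CLOCK_TIME,
                                           new Punctuator() {
                @Override
                public void punctuate(final long timestamp) {
                    context.commit(); // e.g. periodically commit progress
                }
            });
        }

        @Override
        public void process(final String key, final String value) { /* ... */ }

        @Override
        @SuppressWarnings("deprecation")
        public void punctuate(final long timestamp) { }

        @Override
        public void close() {
            punctuation.cancel(); // stop the periodic callback
        }
    }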

Task execution

StreamThread

StreamThread is the thread that executes tasks. A kafka-streams application can run several StreamThreads, configured via the NUM_STREAM_THREADS_CONFIG parameter. A StreamThread moves through the states CREATED, RUNNING, PARTITIONS_REVOKED, ASSIGNING_PARTITIONS, PENDING_SHUTDOWN, and DEAD. Each StreamThread may own several actually-executing StreamTasks: a StreamTask is the independent unit that processes the messages fetched from the sources, and the StreamThread schedules its StreamTasks. Since every StreamThread is a separate thread containing several StreamTasks, this is the foundation of concurrency in kafka-streams. The parallelism model builds on Kafka's partitioning and rebalance mechanisms, so the placement of StreamTasks can be adjusted dynamically at runtime; a minimal configuration sketch follows.
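
For example, a minimal, hypothetical configuration bumping the thread count (standard StreamsConfig keys assumed):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    final Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "thread-demo");       // hypothetical
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical
    // four StreamThreads in this instance; StreamTasks are spread across
    // them (and across instances sharing the applicationId) by the rebalance
    props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);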

How StreamThread executes

  1. StreamThread initialization: metadata and member variables are set up
  2. The loop that processes the data; the code tells the story:
    private void runLoop() {
        long recordsProcessedBeforeCommit = UNLIMITED_RECORDS;
        // subscribe the consumer; each thread owns exactly one consumer, so with a single partition only one thread actually consumes data
        // rebalanceListener is the callback invoked when the consumer group rebalances
        consumer.subscribe(sourceTopicPattern, rebalanceListener);

        while (stillRunning()) {
            timerStartedMs = time.milliseconds();

            // poll records with the Kafka consumer
            final ConsumerRecords<byte[], byte[]> records = pollRequests();
            if (records != null && !records.isEmpty() && !activeTasks.isEmpty()) {
                streamsMetrics.pollTimeSensor.record(computeLatency(), timerStartedMs);
                // distribute the records to the activeTasks by partition
                addRecordsToTasks(records);
                // run the tasks to process the records
                final long totalProcessed = processAndPunctuateStreamTime(activeTasks, recordsProcessedBeforeCommit);
                if (totalProcessed > 0) {
                    final long processLatency = computeLatency();
                    streamsMetrics.processTimeSensor.record(processLatency / (double) totalProcessed,
                        timerStartedMs);
                    recordsProcessedBeforeCommit = adjustRecordsProcessedBeforeCommit(recordsProcessedBeforeCommit, totalProcessed,
                        processLatency, commitTimeMs);
                }
            }

            // every so often, perform periodic work by invoking the tasks' punctuate functions
            maybePunctuateSystemTime();
            maybeCommit(timerStartedMs);
            maybeUpdateStandbyTasks(timerStartedMs);
        }
        log.info("{} Shutting down at user request", logPrefix);
    }

Before the loop starts, the thread's consumer subscribes to the topics matching sourceTopicPattern. Once the loop is running, the consumer polls messages from those topics, groups them by partition, and hands each partition's messages to an independent StreamTask. After the tasks finish, the thread performs commits and related bookkeeping.

StreamTask

StreamTask is responsible for processing the messages. Its key operations are:

  • resume: initialize the topology. When exactly_once semantics are enabled, the producer also begins a transaction:
    public void resume() {
        log.debug("{} Resuming", logPrefix);
        if (eosEnabled) {
            producer.beginTransaction();
            transactionInFlight = true;
        }
        initTopology();
    }
  • process: handle a single record:
    public boolean process() {
        // fetch the next record
        final StampedRecord record = partitionGroup.nextRecord(recordInfo);

        // if there is no record to process, return immediately
        if (record == null) {
            return false;
        }

        try {
            // the record enters the topology through its SourceNode
            final ProcessorNode currNode = recordInfo.node();
            final TopicPartition partition = recordInfo.partition();

            log.trace("{} Start processing one record [{}]", logPrefix, record);

            // update the processor context
            updateProcessorContext(record, currNode);
            // let the current topology node process the record
            /*
            * a processor node's process is effectively recursive: when the current node
            * finishes, it passes the record straight on to its children, until the record
            * finally reaches the sink node and processing completes
            */
            currNode.process(record.key(), record.value());

            log.trace("{} Completed processing one record [{}]", logPrefix, record);

            // update consumedOffsets after each processed record
            consumedOffsets.put(partition, record.offset());
            commitOffsetNeeded = true;

            // after processing this record, if its partition queue's buffered size has been
            // decreased to the threshold, we can then resume the consumption on this partition
            if (recordInfo.queue().size() == maxBufferedSize) {
                consumer.resume(singleton(partition));
            }
        } catch (final KafkaException e) {
            throw new StreamsException(format("Exception caught in process. taskId=%s, processor=%s, topic=%s, partition=%d, offset=%d",
                id(),
                processorContext.currentNode().name(),
                record.topic(),
                record.partition(),
                record.offset()
            ), e);
        } finally {
            // reset the context state once processing finishes
            processorContext.setCurrentNode(null);
        }

        return true;
    }
  • punctuate: processors can have scheduled operations, which are dispatched through this method:
    public void punctuate(final ProcessorNode node, final long timestamp, final PunctuationType type, final Punctuator punctuator) {
        if (processorContext.currentNode() != null) {
            throw new IllegalStateException(String.format("%s Current node is not null", logPrefix));
        }

        // update the context, setting its current node to the node argument; this is why the
        // context's current node must be checked beforehand, to avoid conflicts
        updateProcessorContext(new StampedRecord(DUMMY_RECORD, timestamp), node);

        if (log.isTraceEnabled()) {
            log.trace("{} Punctuating processor {} with timestamp {} and punctuation type {}", logPrefix, node.name(), timestamp, type);
        }

        try {
            // run punctuate; internally each node's punctuate executes independently, so the punctuates of different nodes do not block one another
            node.punctuate(timestamp, punctuator);
        } catch (final KafkaException e) {
            throw new StreamsException(String.format("%s Exception caught while punctuating processor '%s'", logPrefix,  node.name()), e);
        } finally {
            processorContext.setCurrentNode(null);
        }
    }
  • commit: each task commits once it finishes; only after the commit does the next task run, so tasks do not interfere with one another. Task-level commits plus the independent execution of the topology provide the flow control.

A single StreamTask contains all the Processors of one subtopology, and all of its processing logic runs on one thread, which avoids unnecessary network communication overhead and improves efficiency.
