Flink-1.10 源码笔记 JobGraph生成

Flink源码 - 生成JobGraph

什么是jobGraph

如果没了解StreamGraph单击我

JobGraph:StreamGraph经过优化后生成了 JobGraph,提交给 JobManager 的数据结构。主要的优化为,将多个符合条件的节点 chain 在一起作为一个节点,这样可以减少数据在节点之间流动所需要的序列化/反序列化/传输消耗。

在jobgraph中有一个组成“元素”:JobVertex:jobVertex用于产生intermediateDataset,并通过jobEdge串联不同的jobVertex同时也是将operator chain的"关键点"。 jobVertex是从job层面对task进行抽象。

在获取到StreamGraph后,继续分析,如何通过StreamGraph来生成JobGraph,StreamGraph和JobGraph都是在Client端生成的,我们可以通过debug断点观察Grpah的生成过程

JobGraph的入口在StreamingJobGraphGenerator.createJobGraph()方法中
    //创建JobGraph
    public static JobGraph createJobGraph(StreamGraph streamGraph) {
        return createJobGraph(streamGraph, null);
    }

    public static JobGraph createJobGraph(StreamGraph streamGraph, @Nullable JobID jobID) {

        //StreamingJobGraphGenerator 传入StreamGraph,以及jobId,初始化StreamingJobGraphGenerator
        //并创建JobGraph对象
        //createJobGraph  具体JobGraph生成的逻辑

        return new StreamingJobGraphGenerator(streamGraph, jobID).createJobGraph();
    }

进入到StreamingJobGraphGenerator方法中,这个方法初始化了一些需要的参数,并通过,JobID和JobName创建了一个JobGraph,

private StreamingJobGraphGenerator(StreamGraph streamGraph, @Nullable JobID jobID) {

        this.streamGraph = streamGraph;
        this.defaultStreamGraphHasher = new StreamGraphHasherV2();
        this.legacyStreamGraphHashers = Arrays.asList(new StreamGraphUserHashHasher());

        this.jobVertices = new HashMap<>();
        // 已经构建的JobVertex的id集合
        this.builtVertices = new HashSet<>();
        // 保存chain信息,部署时用来构建 OperatorChain,startNodeId -> (currentNodeId -> StreamConfig)
        this.chainedConfigs = new HashMap<>();
        // 所有节点的配置信息,id -> StreamConfig
        this.vertexConfigs = new HashMap<>();
        // 保存每个节点的名字,id -> chainedName
        this.chainedNames = new HashMap<>();
        this.chainedMinResources = new HashMap<>();
        this.chainedPreferredResources = new HashMap<>();
        this.chainedInputOutputFormats = new HashMap<>();
        // 物理边集合(排除了chain内部的边), 按创建顺序排序
        this.physicalEdgesInOrder = new ArrayList<>();

        jobGraph = new JobGraph(jobID, streamGraph.getJobName());
    }

在调用createJobGraph这个方法时候开始构建JobGraph图,进入到createJobGraph方法中

在这里可以分为6 步

  1. 广度遍历StreamGraph,为每个SteramNode生成hash值,这个算法能够保证如果拓补图没有发生改变,每次生成的hash值都是一样的,同时会生成用户指定hash值(即map.setUidHash("hash")的值),
    这里是根据StreamGraph的配置生成的,给每个StreamNode生成一个长度为16的字节数组的hash值,这个散列值用于生成JobGraph的JobVertex的id,在checkpoint恢复的时候会根据jobVertexId为依据,所以在重启的任务的时候,对于对于相同的任务,其各JobVertexID能够保持不变,而StreamGraph中各个StreamNode的ID,就是其包含的StreamTransformation的ID,而StreamTransformation的ID是在对数据流中的数据进行转换的过程中,通过一个静态的累加器生成的,比如有多个数据源时,每个数据源添加的顺序不一致,则有可能导致相同数据处理逻辑的任务,就会对应于不同的ID,所以为了得到确定的ID,在进行JobVertexID的产生时,需要以一种确定的方式来确定其值,要么是通过用户为每个ID直接指定对应的一个散列值,要么参考StreamGraph中的一些特征,为每个JobVertex产生一个确定的ID。
    如果用户指定了hash值,则会基于用户的指定的值生成一个长度为16的字节数组,如果用户没有指定,则会由flink生成一个长度为16的字节数组
  2. 构建任务链,属于Flink对我们的任务进行调优,在这里会遍历所有的source节点开始构建chain,当条件满足的时候会将两个operator放到一个线程并行执行,这也可以减少网络的传输,并且在chain中的operator之间传输数据也不需要进行序列化和反序列化,可以提交任务执行效率
  3. 将每个JobVertex的入边集合也序列化到该JobVertex的StreamConfig中(出边在构建chain的时候已经写入conf中).
    4.为每个JobVertex指定 slot sharing group,以及CoLocationGroup
    5.设置托管内存分数(权重比)
    6.配置检查点,保存点恢复模式以及添加用户分布式缓存的文件
private JobGraph createJobGraph() {

        // 创建之前进行预检查
        preValidate();

        // make sure that all vertices start immediately
        //设置调度模式,    --  todo 默认是所有job都启动
        jobGraph.setScheduleMode(streamGraph.getScheduleMode());

        // Generate deterministic hashes for the nodes in order to identify them across
        // submission iff they didn't change.
        /**
         * todo defaultStreamGraphHasher 使用的是StreamGraphHasherV2类,
         *      legacyStreamGraphHashers使用的是StreamGraphUserHashHasher类
         *      两个类都实现了StreamGraphHasher接口,在使用userHash的时候,是调用
         *      我们在算子后面调用.setUidHash("hash")传入的值生成的
         */
        // todo 广度优先遍历遍历StreamGraph,并且为每个SteamNode生成散列值, 这里的散列值产生算法,可以保证如
        //   果提交的拓扑没有改变,则每次生成的散列值都是一样的。一个StreamNode的ID对应一个散列值。
        Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);

        // Generate legacy version hashes for backwards compatibility
        // 为了保持兼容性创建的hash, 这里是user 传入的setUidHash
        List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
        for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
            legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
        }

        //主要从source构建任务链
        setChaining(hashes, legacyHashes);

        // 将每个JobVertex的入边集合也序列化到该JobVertex的StreamConfig中
        // (出边集合已经在setChaining的时候写入了)
        setPhysicalEdges();

        //设置slot共享和coLocation。同一个coLocationGroup的task需要在同一个slot中运行
        // CoLocationGroup =设置标识共存组的键。具有相同共定位键的操作符将由调度器将它们对应的子任务放到相同的槽中
        setSlotSharingAndCoLocation();

        // 设置托管内存分数(权重比) 
        setManagedMemoryFraction(
            Collections.unmodifiableMap(jobVertices),
            Collections.unmodifiableMap(vertexConfigs),
            Collections.unmodifiableMap(chainedConfigs),
            id -> streamGraph.getStreamNode(id).getMinResources(),
            id -> streamGraph.getStreamNode(id).getManagedMemoryWeight());


        // 配置检查点
        configureCheckpointing();

        // 设置保存点恢复设置
        jobGraph.setSavepointRestoreSettings(streamGraph.getSavepointRestoreSettings());

        // 添加用户分布式缓存
        JobGraphUtils.addUserArtifactEntries(streamGraph.getUserArtifacts(), jobGraph);

        // set the ExecutionConfig last when it has been finalized
        try {
            //设置 job execution 配置
            jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
        }
        catch (IOException e) {
            throw new IllegalConfigurationException("Could not serialize the ExecutionConfig." +
                    "This indicates that non-serializable types (like custom serializers) were registered");
        }

        return jobGraph;
    }

主要分析一下setChaining方法中的逻辑

setChaining这里调用了createChain方法,传入了source节点和节点在chain的索引位置,以及一个OperatorChainInfo对象,这个对用

    private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes) {
        // 从每个source节点构建 任务链, 如果有多个source遍历每个source调用createChain方法
        for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
            createChain(
                    sourceNodeId,   //source 节点id
                    0,   // chain的索引位置 初始为0
                // 一个私有类 递归调用期间帮助维护操作符链的信息
                //  sourceNodeId : chain的起始node id 为source
                //  hashes, legacyHashes : 是前面产生的StreamNode的hash值
                    new OperatorChainInfo(sourceNodeId, hashes, legacyHashes, streamGraph));
        }
    }

createChain方法

1.如果startNodeId已经创建chain,那么直接返回一个空list
2.如果没有构建,则开始构建chian

2.1 首先会循环当前StreamNode的所有StreamEdge(StreamEdge记录了下游节点id,根据StreamGraph可以获 取指定id的StreamNode),通过isChainable方法去判断两个节点是否能够chain在一起,如果能chain在一 起,将edge存入chainableOutputs集合,否则放入nonChainableOutputs集合
2.2 循环将chainableOutputs集合,如果有值,递归的调用createChain方法,当循环将chainableOutputs集 合循环完,会循环nonChainableOutputs集合递归调用createChain方法,开始创建新的chain,这时候是构 建以下游StreamNode为顶点开始构建chain
2.3 如果当前节点是chain的起始节点,会创建一个JobVertex,然后循环chain的出边(outEdge)调用connect 方法,将它指向的出边(outEdge)通过JobEdge进行连接,并将配置信息通过StreamConfig提供的 setTransitiveChainedTaskConfigs方法将配置信息保存到JobVertex的configuration中
如果是chain的中间节点,那么会将配置信息写入到StreamConfig中,并写入到chainedConfigs中,其数据 结构为 Map<Integer, Map<Integer, StreamConfig>> key为 chain的顶点,value为每个中间节点 配置信息

private List<StreamEdge> createChain(Integer currentNodeId, int chainIndex, OperatorChainInfo chainInfo) {
        Integer startNodeId = chainInfo.getStartNodeId();
        // builtVertices存放了已经被构建了的StreamNode ID,避免重复操作
        if (!builtVertices.contains(startNodeId)) {

            // 过渡用的出边集合, 用来生成最终的 JobEdge,
            // 注意:存在某些StreamNode会连接到一起,比如source->map->flatMap,如果这几个StreamNode连接到一起,
            // 则transitiveOutEdges是不包括 chain 内部的边,既不包含source->map的StreamEdge的
            List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();

            // 可以与当前节点链接的StreamEdge
            List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
            // 不可以与当前节点链接的StreamEdge
            List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();

            // 获取当前处理node
            StreamNode currentNode = streamGraph.getStreamNode(currentNodeId);
            // 将当前节点的出边分成 chainable 和 nonChainable 两类
            // 分类可以被chain的edge和不可被chain的edge,使用isChainable的方法判断
            // 如果能chain 放入chainableOutputs集合,否则放入nonChainableOutputs集合
            for (StreamEdge outEdge : currentNode.getOutEdges()) {
                if (isChainable(outEdge, streamGraph)) {
                    chainableOutputs.add(outEdge);
                } else {
                    nonChainableOutputs.add(outEdge);
                }
            }

            // todo 递归调用
            // 对于每个可连接的StreamEdge,递归调用其目标StreamNode,startNodeId保持不变,但是chainIndex会加1
            for (StreamEdge chainable : chainableOutputs) {
                // 如果是可被chain的StreamEdge,递归调用createChain方法
                // 注意currentNode是chainable.getTargetId(),也就是(downStreamNode)下游节点id
                // 递归直到currentNode的out edge为不可chain的edge,会执行下一段for循环,不可chain的边被加入transitiveOutEdges,最终返回到递归最外层
                // 这样以来,transitiveOutEdges收集齐了整个chain所有的出边
                transitiveOutEdges.addAll(
                        createChain(chainable.getTargetId(), chainIndex + 1, chainInfo));
            }

            // 对于每个不可连接的StreamEdge,则将对应的StreamEdge就是当前链的一个输出StreamEdge,所以会添加到
            // transitiveOutEdges这个集合中 然后递归调用其Target node,注意,startNodeID变成了nonChainable这个
            // StreamEdge的输出节点id,chainIndex也赋值为0,说明重新开始一条链的建立
            for (StreamEdge nonChainable : nonChainableOutputs) {
                // 将不可被chain的StreamEdge,添加到transitiveOutEdges集合中
                transitiveOutEdges.add(nonChainable);
                // 调用createChain,构建新的chain
                // todo  这里传入StreamNodeId 都是下游StreamNodeId, 并创建新的OperatorChainInfo对象;
                createChain(nonChainable.getTargetId(), 0, chainInfo.newChain(nonChainable.getTargetId()));
            }

            //生成当前节点的显示名,如:"Keyed Aggregation -> Sink: Unnamed"
            chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs));
            // 设置chain的最小资源
            chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
            // 设置chain的首选资源
            chainedPreferredResources.put(currentNodeId, createChainedPreferredResources(currentNodeId, chainableOutputs));

            OperatorID currentOperatorId = chainInfo.addNodeToChain(currentNodeId, chainedNames.get(currentNodeId));


            if (currentNode.getInputFormat() != null) {
                getOrCreateFormatContainer(startNodeId).addInputFormat(currentOperatorId, currentNode.getInputFormat());
            }
            if (currentNode.getOutputFormat() != null) {
                getOrCreateFormatContainer(startNodeId).addOutputFormat(currentOperatorId, currentNode.getOutputFormat());
            }


            // 如果当前节点是起始节点,则直接创建 JobVertex 并返回 StreamConfig, 否则先创建一个空的 StreamConfig
            // createJobVertex 函数就是根据 StreamNode 创建对应的 JobVertex, 并返回了空的 StreamConfig
            // 如果currentNodeId和startNodeId相等,说明需要创建一个新的chain,会生成一个JobVertex
            StreamConfig config = currentNodeId.equals(startNodeId)
                    ? createJobVertex(startNodeId, chainInfo)
                    : new StreamConfig(new Configuration());

            // 设置 JobVertex 的 StreamConfig, 基本上是序列化 StreamNode 中的配置到 StreamConfig 中.
            // 其中包括 序列化器, StreamOperator, Checkpoint 等相关配置
            // 设置的Vertex属性到config中
            setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs);

            // 如果相等 说明处理到了当前节点,也就是说chain的子节点处理完了 需要创建Edge将两个JobVertex进行连接
            // startNodeId是 chain的开始节点,  currentNodeId 是当前节点,  
            // todo 如果开始节点是1 当前节点是4, 那么他的逻辑顺序则是  1 => 4  ,  1 => 3  ,  1 => 2 ,  1=>1
            //      当 当前节点等于开始节点的时候, 说明chain中间的节点处理完了, 需要处理 chain的顶点, 即chain的开始节点
            if (currentNodeId.equals(startNodeId)) {

                //如果是chain的起始节点。(不是chain的中间节点,会被标记成 chain start)
                // 意味着一个新chain的开始
                config.setChainStart();
                config.setChainIndex(0);
                config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
                //我们也会把物理出边写入配置, 部署时会用到
                config.setOutEdgesInOrder(transitiveOutEdges);
                config.setOutEdges(streamGraph.getStreamNode(currentNodeId).getOutEdges());

                //  将当前节点(headOfChain)与所有出边相连
                // 对于每一个chain,把它和指向下一个chain的出边连接起来
                for (StreamEdge edge : transitiveOutEdges) {
                    //通过StreamEdge构建出JobEdge,创建 IntermediateDataSet ,用来将JobVertex和JobEdge相连
                    connect(startNodeId, edge);
                }
                // 将chain中所有子节点的StreamConfig写入到 headOfChain 节点的 CHAINED_TASK_CONFIG 配置中
                config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));

            } else {
                //如果是 chain 中的子节点
                chainedConfigs.computeIfAbsent(startNodeId, k -> new HashMap<Integer, StreamConfig>());

                config.setChainIndex(chainIndex);
                // 获取到被chain的节点
                StreamNode node = streamGraph.getStreamNode(currentNodeId);
                config.setOperatorName(node.getOperatorName());
                // 将当前节点的StreamConfig添加到该chain的config集合中
                // 关联chain内节点的配置信息到chain的起始节点上
                chainedConfigs.get(startNodeId).put(currentNodeId, config);
            }

            config.setOperatorID(currentOperatorId);
            //如果节点的输出StreamEdge已经为空,则说明是链的结尾
            if (chainableOutputs.isEmpty()) {
                config.setChainEnd();
            }
            // 返回连往chain外部的出边集合
            return transitiveOutEdges;

        } else {
            //startNodeId 如果已经构建过,则直接返回
            return new ArrayList<>();
        }
    }

看一下createChain内调用的一些方法

isChainable

这个方法用于判断两个StreamEdge的源(即 源产生的operator<并不一定是 source operator>)是否可以chain一起进行执行,只有当所有的条件都满足的时候才返回true(即可以chain)

public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
        /** 获取StreamEdge的源和目标StreamNode */
        StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
        StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
        /**
         * 1、下游节点只有一个输入
         * 4、上下游节点在一个槽位共享组内
         * 5、下游节点的连接策略是 ALWAYS
         * 6、上游节点的连接策略是 HEAD 或者 ALWAYS
         * 7、sourceFactory不能是YieldingOperatorFactory
         * 8、edge 的分区函数是 ForwardPartitioner 的实例
         * 9、shuffle模式不能是batch模式
         * 10、上下游节点的并行度相等
         * 11、可以进行节点连接操作
         */
        return downStreamVertex.getInEdges().size() == 1
                && upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
                && areOperatorsChainable(upStreamVertex, downStreamVertex, streamGraph)
                && (edge.getPartitioner() instanceof ForwardPartitioner)
                && edge.getShuffleMode() != ShuffleMode.BATCH
                && upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
                && streamGraph.isChainingEnabled();
    }
createJobVertex

JobVertex只有在chain的顶点创建,chain的其余节点以配置信息的形式记录在JobVertex中
这个方法首先会获取StreamNode(chain的开始节点),并创建一个JobVertexId,是根据当前StreamNode的hash创建的, 获取到chain顶点的operator hash对 (用户与flink生成),然后根据JobVertexName和JobVertexId以及hash对生成JobVertex,生成jobVertex后设置一些参数,将jobVertex存入到jobGreaph中,到这里,一个JobVertex就生成完了

private StreamConfig createJobVertex(
            Integer streamNodeId,
            OperatorChainInfo chainInfo) {

        JobVertex jobVertex;
        // 获取开始节点
        StreamNode streamNode = streamGraph.getStreamNode(streamNodeId);

        byte[] hash = chainInfo.getHash(streamNodeId);

        if (hash == null) {
            throw new IllegalStateException("Cannot find node hash. " +
                    "Did you generate them before calling this method?");
        }
        // 通过hash创建JobVertexId
        JobVertexID jobVertexId = new JobVertexID(hash);

        // 获取任务链 operator的哈希值
        List<Tuple2<byte[], byte[]>> chainedOperators = chainInfo.getChainedOperatorHashes(streamNodeId);
        List<OperatorIDPair> operatorIDPairs = new ArrayList<>();
        if (chainedOperators != null) {
            for (Tuple2<byte[], byte[]> chainedOperator : chainedOperators) {
                OperatorID userDefinedOperatorID = chainedOperator.f1 == null ? null : new OperatorID(chainedOperator.f1);
                operatorIDPairs.add(OperatorIDPair.of(new OperatorID(chainedOperator.f0), userDefinedOperatorID));
            }
        }
        // 如果chain是 输人输出格式, 创建InputOutputFormatVertex对象,   -- >  这个只是尝试性的
        if (chainedInputOutputFormats.containsKey(streamNodeId)) {
            jobVertex = new InputOutputFormatVertex(
                    chainedNames.get(streamNodeId),
                    jobVertexId,
                    operatorIDPairs);

            chainedInputOutputFormats
                .get(streamNodeId)
                .write(new TaskConfig(jobVertex.getConfiguration()));
        } else { // 否则创建 JobVertex 对象
            jobVertex = new JobVertex(
                    chainedNames.get(streamNodeId),
                    jobVertexId,
                    operatorIDPairs);
        }
        for (OperatorCoordinator.Provider coordinatorProvider : chainInfo.getCoordinatorProviders()) {
            try {
                jobVertex.addOperatorCoordinator(new SerializedValue<>(coordinatorProvider));
            } catch (IOException e) {
                throw new FlinkRuntimeException(String.format(
                        "Coordinator Provider for node %s is not serializable.", chainedNames.get(streamNodeId)));
            }
        }

        jobVertex.setResources(chainedMinResources.get(streamNodeId), chainedPreferredResources.get(streamNodeId));

        jobVertex.setInvokableClass(streamNode.getJobVertexClass());

        int parallelism = streamNode.getParallelism();

        if (parallelism > 0) {
            jobVertex.setParallelism(parallelism);
        } else {
            parallelism = jobVertex.getParallelism();
        }

        jobVertex.setMaxParallelism(streamNode.getMaxParallelism());

        if (LOG.isDebugEnabled()) {
            LOG.debug("Parallelism set: {} for {}", parallelism, streamNodeId);
        }

        // TODO: inherit InputDependencyConstraint from the head operator
        //  设置输入状态的情况下安排任务方式。  , 默认 如果有可使用的输入,则调度该任务
        jobVertex.setInputDependencyConstraint(streamGraph.getExecutionConfig().getDefaultInputDependencyConstraint());

        //将生成好的jobVertex存入jobVertices列表
        jobVertices.put(streamNodeId, jobVertex);
        //已经构建好的jobVertex Id列表
        builtVertices.add(streamNodeId);
        // 将jovVertex添加到JobGraph图中
        jobGraph.addVertex(jobVertex);

        return new StreamConfig(jobVertex.getConfiguration());
    }

connect

这个方法主要用于 将两个JobVertex连接在一起,首先会获取到ShuffleMode(数据交互模式)生成resultPartitionType,然后通过不同的partition创建不同的JobEdge,如果是ForwardPartitioner或RescalePartitioner使用POINTWISE模式,否则使用ALL_TO_ALL(即 一个subTask对应下游subTask是一对一还是一对多),最后connectNewDataSetAsInput方法,创建一个JobEdge,将两个JobVertex连接在一起

    private void connect(Integer headOfChain, StreamEdge edge) {

        physicalEdgesInOrder.add(edge);

        Integer downStreamVertexID = edge.getTargetId();

        JobVertex headVertex = jobVertices.get(headOfChain);
        JobVertex downStreamVertex = jobVertices.get(downStreamVertexID);

        StreamConfig downStreamConfig = new StreamConfig(downStreamVertex.getConfiguration());

        // 每次循环 input数量+1
        downStreamConfig.setNumberOfInputs(downStreamConfig.getNumberOfInputs() + 1);

        //获取 edge的分区模式 --
        StreamPartitioner<?> partitioner = edge.getPartitioner();

        //结果分区的类型
        ResultPartitionType resultPartitionType;
        // 最终返回  PIPELINED_BOUNDED 或者  BLOCKING
        // 如果没有定义,会进行判断全局数据交换模式, 最终确定分区模式
        switch (edge.getShuffleMode()) {
            case PIPELINED:
                resultPartitionType = ResultPartitionType.PIPELINED_BOUNDED;
                break;
            case BATCH:
                resultPartitionType = ResultPartitionType.BLOCKING;
                break;
            case UNDEFINED:
                resultPartitionType = determineResultPartitionType(partitioner);
                break;
            default:
                throw new UnsupportedOperationException("Data exchange mode " +
                    edge.getShuffleMode() + " is not supported yet.");
        }

        JobEdge jobEdge;
        // 根据不同的分区模式创建JobEdge
        // 如果分区模式为ForwardPartitioner或RescalePartitioner;
        // 则使用 POINTWISE分配模式,  否则使用ALL_TO_ALL
        if (isPointwisePartitioner(partitioner)) {
            jobEdge = downStreamVertex.connectNewDataSetAsInput(
                headVertex,
                // 每个生产子任务都连接到消费任务的一个或多个子任务
                DistributionPattern.POINTWISE,
                resultPartitionType);
        } else {
            jobEdge = downStreamVertex.connectNewDataSetAsInput(
                    headVertex,
                    // 每个生产子任务都与消费任务的子任务相连接
                    DistributionPattern.ALL_TO_ALL,
                    resultPartitionType);
        }
        // set strategy name so that web interface can show it.
        // 设置分发策略模式,用于 web显示
        jobEdge.setShipStrategyName(partitioner.toString());

        if (LOG.isDebugEnabled()) {
            LOG.debug("CONNECTED: {} - {} -> {}", partitioner.getClass().getSimpleName(),
                    headOfChain, downStreamVertexID);
        }
    }
downStreamVertex.connectNewDataSetAsInput方法

首先由input的JobVertex(即上游),创建一个IntermediateDataSet(简称 dataSet),并将自己添加到dataSet的producer中,,然后当前JobVertex(即下游)构建一个JobEdge,并将edge添加到inputs的列表中,并将dataSet的consumer进行赋值为JobEdge, 这是JobEdge已经构建完毕了

JobEdge中的dataSet中有两个参数(producer,consumer),分别对应生产者(上游JobVertex)和消费者(下游JobVertex),通过这种形式将两个JobVertex进行连接

    public JobEdge connectNewDataSetAsInput(
            JobVertex input,
            DistributionPattern distPattern,
            ResultPartitionType partitionType) {

        // 由上游节点 创建一个IntermediateDataSet
        // todo 中间数据集是操作符 - sour操作或任何中间操作 - 产生的数据集
        //      中间数据集可能被其他操作符读取、具体化或丢弃。
        IntermediateDataSet dataSet = input.createAndAddResultDataSet(partitionType);

        // 由此可以看出, 他的输入就是上游节点的一个输出
        // JobEdge的输入就是当前节点
        // 构建一个jobEdge
        JobEdge edge = new JobEdge(dataSet, this, distPattern);
        // jobEdge 连接edge
        this.inputs.add(edge);
        dataSet.addConsumer(edge);
        return edge;
    }

构建JobGraph最主要的逻辑就是构建chain,源码中剩下方法就是设置了一些JobGraph的参数,比如设置slot共享组,设置托管内存分数,配置检查点等内容,最终会将JobGraph构建完成并返回

下面是一段简单的代码,我们看一下他的生成JobGraph的过程

简单的代码

在StreamGraph向JobGraph转换的时候,会将两个或多个算子chain一起进行执行,这是Flink对我们的程序进行的一个优化,

StreamGraph向JobGraph转换

现在我看一下 具体任务执行后的样子

任务运行时
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,816评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,729评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,300评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,780评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,890评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,084评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,151评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,912评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,355评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,666评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,809评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,504评论 4 334
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,150评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,882评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,121评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,628评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,724评论 2 351

推荐阅读更多精彩内容