StreamGraph Generation

Terminology

StreamGraph

A topological structure reflecting the user's code (not entirely accurate, since many user-coded operators, like shuffle and split, do not get a corresponding StreamNode). A more precise description is: a logical plan that has not yet been optimized. It is generated on the Client side. (Question: what does the StreamGraph → JobGraph conversion optimize?)

Related APIs

DataStream

A DataStream represents a stream of elements of the same type. A DataStream can be transformed into another DataStream by applying a transformation.

As the name suggests, a stream made of data. A DataStream holds a Transformation<T> transformation: applying that transformation is what produced this DataStream (note: it is not that this transformation generates a new DataStream from this one). In other words, the Transformation held by a DataStream is that DataStream's origin.
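The relationship can be sketched in a few lines. This is a simplified illustration, not Flink's real API: the class bodies and the apply method are hypothetical stand-ins showing only how each DataStream records the Transformation that produced it.

```java
// Hypothetical simplified sketch: a DataStream holds the Transformation
// that produced it; applying an operator creates a new Transformation
// whose input is this stream's transformation.
class Transformation {
    final String name;
    final Transformation input; // upstream origin; null for a source

    Transformation(String name, Transformation input) {
        this.name = name;
        this.input = input;
    }
}

class DataStream {
    final Transformation transformation; // the origin of this stream

    DataStream(Transformation transformation) {
        this.transformation = transformation;
    }

    // returns a NEW DataStream; the current stream's transformation
    // becomes the input of the new Transformation
    DataStream apply(String operatorName) {
        return new DataStream(new Transformation(operatorName, this.transformation));
    }
}
```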

Transformation

A Transformation represents the operation that creates a DataStream

(This is somewhat involved to explain.) It can be roughly understood as: every operator invoked in the code is a Transformation; for example, flatMap, split, and print each have a corresponding Transformation implementation class.


The concrete Transformation implementation classes all live in org.apache.flink.streaming.api.transformations.

Operator

A Transformation holds a StreamOperator implementation; the StreamOperator wraps the concrete Function that processes the records in the DataStream. Take StreamMap as an example: StreamMap implements the StreamOperator interface.
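The delegation pattern can be sketched as follows. These are hypothetical simplified interfaces mirroring the shape of Flink's StreamMap, not the real runtime classes:

```java
// Hypothetical sketch: the operator holds the user function
// and delegates each record to it.
interface MapFunction<IN, OUT> {
    OUT map(IN value);
}

class StreamMap<IN, OUT> {
    private final MapFunction<IN, OUT> userFunction;

    StreamMap(MapFunction<IN, OUT> userFunction) {
        this.userFunction = userFunction;
    }

    // each incoming record is processed by calling the user function
    OUT processElement(IN record) {
        return userFunction.map(record);
    }
}
```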

An example (used throughout this article) ties these three classes together:

public class FlinkBatchDemo {

    private static final Logger logger = LoggerFactory.getLogger(FlinkBatchDemo.class);

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> text = env.fromElements(WordCountData.WORDS);

        DataStream<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                text.flatMap(new Tokenizer())
                        // group by the tuple field "0" and sum up tuple field "1"
                        .keyBy(0).sum(1);

        // emit result
        counts.print();
//        System.out.println(env.getStreamGraph().getStreamingPlanAsJSON());

        // execute program
        env.execute("Streaming WordCount");
    }

    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");

            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}

In the example, the line text.flatMap(new Tokenizer()) applies the flatMap operator to the input stream, i.e. it transforms that DataStream. The concrete implementation class is OneInputTransformation (the generated OneInputTransformation's input is the Transformation held by the input stream), and the OneInputTransformation holds a StreamFlatMap operator. The actual processing logic lives in StreamFlatMap, which delegates to the concrete userFunction implementation.
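The flatMap delegation can be sketched like this, mirroring the Tokenizer from the running example. The interfaces below are simplified stand-ins, not Flink's real runtime classes:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simplified stand-ins for Flink's Collector / FlatMapFunction /
// StreamFlatMap, showing only the delegation to the user function.
interface Collector<T> {
    void collect(T record);
}

interface FlatMapFunction<IN, OUT> {
    void flatMap(IN value, Collector<OUT> out);
}

class StreamFlatMap<IN, OUT> {
    private final FlatMapFunction<IN, OUT> userFunction;

    StreamFlatMap(FlatMapFunction<IN, OUT> userFunction) {
        this.userFunction = userFunction;
    }

    // the operator forwards each incoming record to the user function,
    // together with the collector that emits downstream records
    void processElement(IN record, Collector<OUT> out) {
        userFunction.flatMap(record, out);
    }
}
```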


StreamGraph

Visualizing the StreamGraph

Calling env.getStreamGraph().getStreamingPlanAsJSON() returns the StreamGraph as JSON:

{
  "nodes" : [ {
    "id" : 1,
    "type" : "Source: Collection Source",
    "pact" : "Data Source",
    "contents" : "Source: Collection Source",
    "parallelism" : 1
  }, {
    "id" : 2,
    "type" : "Flat Map",
    "pact" : "Operator",
    "contents" : "Flat Map",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 1,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 4,
    "type" : "Keyed Aggregation",
    "pact" : "Operator",
    "contents" : "Keyed Aggregation",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 2,
      "ship_strategy" : "HASH",
      "side" : "second"
    } ]
  }, {
    "id" : 5,
    "type" : "Sink: Print to Std. Out",
    "pact" : "Data Sink",
    "contents" : "Sink: Print to Std. Out",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 4,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  } ]
}

Feeding this JSON into Flink's plan visualizer at https://flink.apache.org/visualizer/ produces the following diagram:



The diagram shows that the node (StreamNode) with id = 3 is missing: the keyBy(0) operator did not generate a corresponding StreamNode. So not every operator produces a StreamNode.

Reading the Source Code

Starting from the StreamExecutionEnvironment class, let us see how the source code generates the StreamGraph, with two questions in mind:

Question 1: which operators generate a StreamNode, which are skipped, and how are the functions inside the skipped operators handled?

Question 2: what do FORWARD and HASH on the arrows in the diagram mean?

Tracing the code, StreamGraph generation is triggered from env.execute():

-> execute(getStreamGraph(jobName));
  -> return getStreamGraph(jobName, true);
    -> StreamGraph streamGraph = getStreamGraphGenerator().setJobName(jobName).generate();
      -> return new StreamGraphGenerator(transformations, config, checkpointCfg)

Constructing the StreamGraphGenerator requires the transformations parameter; tracing how transformations gets populated:

-> env.fromElements
 -> return addSource(function, "Collection Source", typeInfo).setParallelism(1);
  -> return new DataStreamSource<>(this, resolvedTypeInfo, sourceOperator, isParallel, sourceName);
   -> DataStream.flatMap
    -> return transform("Flat Map", outputType, new StreamFlatMap<>(clean(flatMapper)));
     -> getExecutionEnvironment().addOperator(resultTransform); // resultTransform is the Transformation for flatMap; its input transformation is the upstream LegacySourceTransformation
      -> this.transformations.add(transformation);

So the Transformation for flatMap, a OneInputTransformation, is added to the transformations list.

Tracing the remaining operators in the demo, transformations ends up containing: Flat Map → OneInputTransformation; Keyed Aggregation → OneInputTransformation (its upstream transformation is the PartitionTransformation produced by the keyBy operator); Print to Std. Out → SinkTransformation (its upstream is the Keyed Aggregation OneInputTransformation).

Three Transformations in total.

StreamGraphGenerator iterates over every member of transformations to build the DAG:

for (Transformation<?> transformation: transformations) {
   transform(transformation);
}



...
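The traversal above can be sketched as follows. This is a hypothetical simplified model (the class names TransformNode and GraphGenerator are invented for illustration): transform() recurses into a transformation's inputs first, and a cache ensures each Transformation maps to exactly one node even when it is reachable from several downstream transformations.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simplified model of the generator's traversal.
class TransformNode {
    final String name;
    final List<TransformNode> inputs;

    TransformNode(String name, TransformNode... inputs) {
        this.name = name;
        this.inputs = Arrays.asList(inputs);
    }
}

class GraphGenerator {
    // cache: a transformation visited twice still maps to a single node
    final Map<TransformNode, String> alreadyTransformed = new HashMap<>();
    final List<String> edges = new ArrayList<>(); // "upstream->downstream"

    String transform(TransformNode t) {
        if (alreadyTransformed.containsKey(t)) {
            return alreadyTransformed.get(t);
        }
        for (TransformNode input : t.inputs) {
            // recurse into inputs first so upstream nodes exist before edges
            String upstream = transform(input);
            edges.add(upstream + "->" + t.name);
        }
        alreadyTransformed.put(t, t.name);
        return t.name;
    }
}
```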

Different Transformation types are dispatched to different transform methods.

Note (regarding question 1):

For the Keyed Aggregation OneInputTransformation, the upstream PartitionTransformation does not generate a StreamNode; instead, a virtual partition node is registered:

private <T> Collection<Integer> transformPartition(PartitionTransformation<T> partition) {
   Transformation<T> input = partition.getInput();
   List<Integer> resultIds = new ArrayList<>();

   Collection<Integer> transformedIds = transform(input);
   for (Integer transformedId: transformedIds) {
      int virtualId = Transformation.getNewNodeId();
      streamGraph.addVirtualPartitionNode(
            transformedId, virtualId, partition.getPartitioner(), partition.getShuffleMode());
      resultIds.add(virtualId);
   }

   return resultIds;
}



// In addEdge, if the upstream vertex is a virtual partition node, the walk
// continues upstream until a non-virtual node is reached
} else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
   int virtualId = upStreamVertexID;
   upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
   if (partitioner == null) {
      partitioner = virtualPartitionNodes.get(virtualId).f1;
   }
   shuffleMode = virtualPartitionNodes.get(virtualId).f2;
   addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag, shuffleMode);
}

So the keyBy operator (its PartitionTransformation) generates neither a StreamNode nor a StreamEdge; the resulting StreamGraph skips over it and connects the two non-virtual nodes directly. The function it carries does not show up as a node in the StreamGraph-generation code; it travels inside the partitioner stored on the edge.
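The edge-resolution walk can be sketched like this. The classes below are hypothetical stand-ins for Flink's internal bookkeeping (the real code stores Tuple3 entries of upstream id, partitioner, and shuffle mode):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of how addEdge resolves virtual partition nodes.
class VirtualNodeEntry {
    final int upstreamId;     // the real node the virtual node points back to
    final String partitioner; // partitioner recorded by transformPartition

    VirtualNodeEntry(int upstreamId, String partitioner) {
        this.upstreamId = upstreamId;
        this.partitioner = partitioner;
    }
}

class EdgeResolver {
    final Map<Integer, VirtualNodeEntry> virtualPartitionNodes = new HashMap<>();

    // walk upstream until a non-virtual node is reached, picking up the
    // partitioner stored on the virtual node along the way
    String resolveEdge(int upstreamId, int downstreamId, String partitioner) {
        while (virtualPartitionNodes.containsKey(upstreamId)) {
            VirtualNodeEntry entry = virtualPartitionNodes.get(upstreamId);
            if (partitioner == null) {
                partitioner = entry.partitioner;
            }
            upstreamId = entry.upstreamId;
        }
        return upstreamId + "->" + downstreamId + " [" + partitioner + "]";
    }
}
```

This mirrors the JSON plan above: virtual node 3 (keyBy) is resolved away, and the edge connects node 2 directly to node 4 with the HASH strategy.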

On question 2:

When creating the edge between two nodes, the parallelism of the upstream and downstream nodes is compared; if they match (and no partitioner was set), ForwardPartitioner is chosen:

if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
   partitioner = new ForwardPartitioner<Object>();
} else if (partitioner == null) {
   partitioner = new RebalancePartitioner<Object>();
}



public class ForwardPartitioner<T> extends StreamPartitioner<T> {
   private static final long serialVersionUID = 1L;

   @Override
   public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
      return 0;
   }

   @Override
   public StreamPartitioner<T> copy() {
      return this;
   }

   @Override
   public String toString() {
      return "FORWARD";
   }
}
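When the parallelisms differ, RebalancePartitioner is chosen instead; its behavior can be sketched as a round-robin over the output channels. This is a minimal illustrative sketch, not the real class (the real implementation starts from a randomly chosen channel rather than channel 0):

```java
// Hypothetical sketch of RebalancePartitioner's round-robin channel choice.
class RebalanceSketch {
    private final int numberOfChannels;
    private int nextChannelToSendTo = -1;

    RebalanceSketch(int numberOfChannels) {
        this.numberOfChannels = numberOfChannels;
    }

    // each record goes to the next channel, wrapping around,
    // which spreads records evenly regardless of their content
    int selectChannel() {
        nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;
        return nextChannelToSendTo;
    }
}
```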

A KeyedStream, on the other hand, uses KeyGroupStreamPartitioner:

public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector, TypeInformation<KEY> keyType) {
   this(
      dataStream,
      new PartitionTransformation<>(
         dataStream.getTransformation(),
         // when the KeyedStream is constructed, a KeyGroupStreamPartitioner is declared
         new KeyGroupStreamPartitioner<>(keySelector, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM)),
      keySelector,
      keyType);
}


public class KeyGroupStreamPartitioner<T, K> extends StreamPartitioner<T> implements ConfigurableStreamPartitioner {
   private static final long serialVersionUID = 1L;

   private final KeySelector<T, K> keySelector;

   private int maxParallelism;

   public KeyGroupStreamPartitioner(KeySelector<T, K> keySelector, int maxParallelism) {
      Preconditions.checkArgument(maxParallelism > 0, "Number of key-groups must be > 0!");
      this.keySelector = Preconditions.checkNotNull(keySelector);
      this.maxParallelism = maxParallelism;
   }

   public int getMaxParallelism() {
      return maxParallelism;
   }

   @Override
   public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
      K key;
      try {
         key = keySelector.getKey(record.getInstance().getValue());
      } catch (Exception e) {
         throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e);
      }
      return KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfChannels);
   }

   @Override
   public StreamPartitioner<T> copy() {
      return this;
   }

   @Override
   public String toString() {
      return "HASH";
   }

   @Override
   public void configure(int maxParallelism) {
      KeyGroupRangeAssignment.checkParallelismPreconditions(maxParallelism);
      this.maxParallelism = maxParallelism;
   }
}
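The KeyGroupRangeAssignment.assignKeyToParallelOperator call used in selectChannel is a two-step mapping: key → key group → operator index. The sketch below illustrates the shape of that mapping; it is a simplified stand-in (the real implementation murmur-hashes key.hashCode(), whereas plain hashCode is used here purely for illustration):

```java
// Hypothetical sketch of the key-group assignment used by the HASH strategy.
class KeyGroupSketch {
    // step 1: key -> key group (real code: murmurHash(key.hashCode()) % maxParallelism)
    static int assignToKeyGroup(Object key, int maxParallelism) {
        return Math.abs(key.hashCode() % maxParallelism);
    }

    // step 2: key group -> operator index; key groups form contiguous
    // ranges, one range per parallel operator instance
    static int computeOperatorIndex(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
        return computeOperatorIndex(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
    }
}
```

Because state is tracked per key group rather than per key, rescaling a keyed operator only requires reassigning whole key-group ranges to operator instances.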
