StreamGraph生成

名词解释

StreamGraph

一个代码用户编码结构的拓扑结构(不是很准确,因为很多用户编码的算子没有生成对应的StreamNode,like: shuffle, split等),所以【一个还未经过优化处理的逻辑计划】这样描述会更加准确。由Client端生成。(问题:StreamGraph → JobGraph这一步的转换,优化了什么?)

相关API

DataStream

A DataStream represents a stream of elements of the same type. A DataStream can be transformed into another DataStream by applying a transformation.

顾名思义,data组成的stream;DataStream中持有Transformation<T> transformation,经过该transformation,得到此DataStream(注意不是通过该transformation生成一个新的DataStream),意味着DataStream中持有的Transformation是该DataStream的来源;

Transformation

A Transformation represents the operation that creates a DataStream

(讲述起来比较复杂)可以简单理解成,代码中每个引用的算子,都是一个transformation;比如flatMap,split,print都有对应的Transformation实现类,具体实现如下:


对Transformation的具体实现类都在org.apache.flink.streaming.api.transformations中

Operator

Transformation中持有StreamOperator的实现,StreamOperator封装具体的Function来对DataStream中的record进程处理;以StreamMap实现类为例,StreamMap继承自StreamOperator接口;

以一个例子(也是贯穿全文的例子)归纳以上三个类:

public class FlinkBatchDemo {

    private static final Logger logger = LoggerFactory.getLogger(FlinkBatchDemo.class);

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> text = env.fromElements(WordCountData.WORDS);

        DataStream<Tuple2<String, Integer>> counts =

                // split up the lines in pairs (2-tuples) containing: (word,1)

                text.flatMap(new Tokenizer())

                        // group by the tuple field "0" and sum up tuple field "1"

                        .keyBy(0).sum(1);

        // emit result

        counts.print();

//        System.out.println(env.getStreamGraph().getStreamingPlanAsJSON());

        // execute program

        env.execute("Streaming WordCount");

    }


    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {


        @Override

        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {

            // normalize and split the line

            String[] tokens = value.toLowerCase().split("\\W+");


            // emit the pairs

            for (String token : tokens) {

                if (token.length() > 0) {

                    out.collect(new Tuple2<>(token, 1));

                }

            }

        }

    }

}

示例中text.flatMap(new Tokenizer())这一行,对inputStream调用flatMap算子,相当于对inputStream这个DataStream进行transform,transform的具体实现类为:OneInputTransformation(生成的该OneInputTransformation的input为inputStream中的Transformation),而OneInputTransformation持有StreamFlatMap,具体的处理逻辑在StreamFlatMap中 ,通过调用userFunction的具体实现来实现,如下图所示:

(该图有误,需要重画)

StreamGraph

StreamGraph可视化

通过env.getStreamGraph().getStreamingPlanAsJSON()方法可以的到StreamGraph的Json:

{

  "nodes" : [ {

    "id" : 1,

    "type" : "Source: Collection Source",

    "pact" : "Data Source",

    "contents" : "Source: Collection Source",

    "parallelism" : 1

  }, {

    "id" : 2,

    "type" : "Flat Map",

    "pact" : "Operator",

    "contents" : "Flat Map",

    "parallelism" : 1,

    "predecessors" : [ {

      "id" : 1,

      "ship_strategy" : "FORWARD",

      "side" : "second"

    } ]

  }, {

    "id" : 4,

    "type" : "Keyed Aggregation",

    "pact" : "Operator",

    "contents" : "Keyed Aggregation",

    "parallelism" : 1,

    "predecessors" : [ {

      "id" : 2,

      "ship_strategy" : "HASH",

      "side" : "second"

    } ]

  }, {

    "id" : 5,

    "type" : "Sink: Print to Std. Out",

    "pact" : "Data Sink",

    "contents" : "Sink: Print to Std. Out",

    "parallelism" : 1,

    "predecessors" : [ {

      "id" : 4,

      "ship_strategy" : "FORWARD",

      "side" : "second"

    } ]

  } ]

}

通过flink提供的可视化界面:https://flink.apache.org/visualizer/得到如下所示图形:



从图形中可以看到,缺少ID=3的节点(StreamNode),对应算子keyBy(0)未生成对应的StreamNode;因此可以看出,不是所有的算子都会产生对应的StreamNode;

源码阅读

从StreamExecutionEnvironment类入手来看源码是如何生成StreamGraph的,带着问题去看:

什么样的算子会生成StreamNode,什么样的算子会被忽略,被忽略的算子中的function是怎样处理的?

图中箭头中的FORWARD、HASH代表什么?

追踪代码,StreamGraph是在env.execute()之后触发生成的

-> execute(getStreamGraph(jobName));

  -> return getStreamGraph(jobName, true);

    -> StreamGraph streamGraph = getStreamGraphGenerator().setJobName(jobName).generate();

      -> return new StreamGraphGenerator(transformations, config, checkpointCfg)

生成StreamGraphGenerator需要transformations参数,追踪transformations赋值的过程:

-> env.fromElements

 -> return addSource(function, "Collection Source", typeInfo).setParallelism(1);

  -> return new DataStreamSource<>(this, resolvedTypeInfo, sourceOperator, isParallel, sourceName);

   -> DataStream.flatMap

    -> return transform("Flat Map", outputType, new StreamFlatMap<>(clean(flatMapper)));

     -> getExecutionEnvironment().addOperator(resultTransform); // resultTransform是flatmap对应的Transformation,其inputTransformation也就是上游        transformation对应LegacySourceTransformation

      -> this.transformations.add(transformation);

发现将flatmap对应的transformation也就是OneInputTransformation加入到变量transformations中;

继续追踪demo中的算子,最终确定transformations中存在Flat Map → OneInputTransformation, Keyed Aggregation → OneInputTransformation(上游transformation是keyBy算子对应的PartitionTransformation), Print to Std. Out → SinkTransformation(上游transformation是OneInputTransformation);

共有三个Transformation;

StreamGraphGenerator是通过遍历Transformation中的每一个成员,最终形成一个DAG图:

for (Transformation<?> transformation: transformations) {

   transform(transformation);

}



...

不同的Transform类型,对应不同的方法;

需要注意的是(问题1):

对于Keyed Aggregation这个OneInputTransformation,上游的PartitionTransformation不会生成对应的StreamNode,而是生成一个VirtualPartitionNode:

private <T> Collection<Integer> transformPartition(PartitionTransformation<T> partition) {

   Transformation<T> input = partition.getInput();

   List resultIds = new ArrayList<>();


   Collection<Integer> transformedIds = transform(input);

   for (Integer transformedId: transformedIds) {

      int virtualId = Transformation.getNewNodeId();

      streamGraph.addVirtualPartitionNode(

            transformedId, virtualId, partition.getPartitioner(), partition.getShuffleMode());

      resultIds.add(virtualId);

   }


   return resultIds;

}



// 对于VirtualPartitionNode,在addEdge中,如果上游是该Node,会继续向上遍历直到获取到非虚拟节点

} else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {

   int virtualId = upStreamVertexID;

   upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;

   if (partitioner == null) {

      partitioner = virtualPartitionNodes.get(virtualId).f1;

   }

   shuffleMode = virtualPartitionNodes.get(virtualId).f2;

   addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag, shuffleMode);

}

所以对于keyby算子(对应PartitionTransformation)不会生成对应的StreamNode与StreamEdge,生成的StreamGraph会越过这个算子直接将两个非虚拟的Node进行连接;具体执行的function在生成StreamGraph的代码中没有体现。

对于问题2:

生成两个Node之间的Edge时,会对上下游的partition数目进行判断,上下游数目一致时,选择ForwardPartitioner:

if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {

   partitioner = new ForwardPartitioner<Object>();

} else if (partitioner == null) {

   partitioner = new RebalancePartitioner<Object>();

}



public class ForwardPartitioner extends StreamPartitioner<T> {

   private static final long serialVersionUID = 1L;


   @Override

   public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {

      return 0;

   }


   public StreamPartitioner<T> copy() {

      return this;

   }


   @Override

   public String toString() {

      return "FORWARD";

   }

}

而KeyedStream中,用到的是KeyGroupStreamPartitioner

```public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector, TypeInformation<KEY> keyType) {

   this(

      dataStream,

      new PartitionTransformation<>(

         dataStream.getTransformation(),

         // 初始化KeyedStream时,声明KeyGroupStreamPartitioner

         new KeyGroupStreamPartitioner<>(keySelector, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM)),

      keySelector,

      keyType);

}



public class KeyGroupStreamPartitioner extends StreamPartitioner implements ConfigurableStreamPartitioner {

   private static final long serialVersionUID = 1L;


   private final KeySelector<T, K> keySelector;


   private int maxParallelism;


   public KeyGroupStreamPartitioner(KeySelector keySelector, int maxParallelism) {

      Preconditions.checkArgument(maxParallelism > 0, "Number of key-groups must be > 0!");

      this.keySelector = Preconditions.checkNotNull(keySelector);

      this.maxParallelism = maxParallelism;

   }


   public int getMaxParallelism() {

      return maxParallelism;

   }


   @Override

   public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {

      K key;

      try {

         key = keySelector.getKey(record.getInstance().getValue());

      } catch (Exception e) {

         throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e);

      }

      return KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfChannels);

   }


   @Override

   public StreamPartitioner<T> copy() {

      return this;

   }


   @Override

   public String toString() {

      return "HASH";

   }


   @Override

   public void configure(int maxParallelism) {

      KeyGroupRangeAssignment.checkParallelismPreconditions(maxParallelism);

      this.maxParallelism = maxParallelism;

   }

}```swift

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,907评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,987评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,298评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,586评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,633评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,488评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,275评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,176评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,619评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,819评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,932评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,655评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,265评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,871评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,994评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,095评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,884评论 2 354

推荐阅读更多精彩内容