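To improve execution efficiency, Flink merges some operators into an operator chain. The whole operator chain becomes a single JobVertex, and each of its parallel instances is scheduled into a slot and runs as one task (one thread). Records handed from one chained operator to the next therefore avoid the cost of a data exchange between tasks: no network transfer and no serialization/deserialization.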
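To make that saving concrete, here is a toy sketch. It is not Flink code: the class `ChainHandOffDemo` and its methods are hypothetical. It contrasts two ways of passing records through a `map -> filter` pipeline:

- **Chained hand-off:** the upstream operator calls the downstream one directly.
- **Non-chained hand-off:** every record is written to bytes and read back, standing in for the serialized buffer exchange between two separate tasks.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

// Hypothetical toy model, not Flink's runtime classes.
final class ChainHandOffDemo {
    // Counts records that had to be serialized between the two operators.
    static int serializedRecords = 0;

    // Chained: the upstream map (x -> 2x) calls the downstream operator directly.
    static LongConsumer chained(LongConsumer downstream) {
        return value -> downstream.accept(value * 2);
    }

    // Not chained: each record is written to bytes and read back, standing in
    // for the buffer exchange between two separate tasks.
    static LongConsumer nonChained(LongConsumer downstream) {
        return value -> {
            try {
                ByteArrayOutputStream buffer = new ByteArrayOutputStream();
                DataOutputStream out = new DataOutputStream(buffer);
                out.writeLong(value * 2);
                out.flush();
                serializedRecords++;
                DataInputStream in = new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()));
                downstream.accept(in.readLong());
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        };
    }

    // Runs source(1..5) -> map(x -> 2x) -> filter(x > 4) and collects the output.
    static List<Long> run(boolean chain) {
        serializedRecords = 0;
        List<Long> result = new ArrayList<>();
        LongConsumer filter = x -> { if (x > 4) result.add(x); };
        LongConsumer map = chain ? chained(filter) : nonChained(filter);
        for (long i = 1; i <= 5; i++) {
            map.accept(i);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(run(true) + " serialized=" + serializedRecords);
        System.out.println(run(false) + " serialized=" + serializedRecords);
    }
}
```

Both paths produce the same records. Only the non-chained path pays a serialize/deserialize round trip per record.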
How does Flink decide whether two operators can form an operator chain? The answer is in the source file StreamingJobGraphGenerator.java:
public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
    StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
    StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
    StreamOperatorFactory<?> headOperator = upStreamVertex.getOperatorFactory();
    StreamOperatorFactory<?> outOperator = downStreamVertex.getOperatorFactory();
    return downStreamVertex.getInEdges().size() == 1
        && outOperator != null
        && headOperator != null
        && upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
        && outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS
        && (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||
            headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
        && (edge.getPartitioner() instanceof ForwardPartitioner)
        && edge.getShuffleMode() != ShuffleMode.BATCH
        && upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
        && streamGraph.isChainingEnabled();
}
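To experiment with this rule outside Flink, the conjunction above can be mirrored in a small self-contained model. This is a sketch under stated assumptions:

- `ChainRuleDemo`, `Node`, and the three enums are hypothetical stand-ins, not Flink classes.
- Only the fields the rule reads are modelled.
- The null checks on the operator factories are left out.

```java
import java.util.Objects;

// Hypothetical stand-ins for Flink's StreamNode / StreamEdge, reduced to what isChainable() reads.
final class ChainRuleDemo {
    enum Strategy { ALWAYS, HEAD, NEVER }
    enum Partitioner { FORWARD, REBALANCE, HASH, BROADCAST }
    enum Exchange { PIPELINED, BATCH, UNDEFINED }

    static final class Node {
        final Strategy strategy;
        final String slotSharingGroup;
        final int parallelism;
        final int inEdges;

        Node(Strategy strategy, String slotSharingGroup, int parallelism, int inEdges) {
            this.strategy = strategy;
            this.slotSharingGroup = slotSharingGroup;
            this.parallelism = parallelism;
            this.inEdges = inEdges;
        }
    }

    // Same conjunction as isChainable(), minus the operator-factory null checks.
    static boolean isChainable(Node up, Node down, Partitioner partitioner,
                               Exchange exchange, boolean chainingEnabled) {
        return down.inEdges == 1
                && Objects.equals(up.slotSharingGroup, down.slotSharingGroup)
                && down.strategy == Strategy.ALWAYS
                && (up.strategy == Strategy.HEAD || up.strategy == Strategy.ALWAYS)
                && partitioner == Partitioner.FORWARD
                && exchange != Exchange.BATCH
                && up.parallelism == down.parallelism
                && chainingEnabled;
    }

    public static void main(String[] args) {
        Node source = new Node(Strategy.HEAD, "default", 4, 0);
        Node map = new Node(Strategy.ALWAYS, "default", 4, 1);
        Node sink = new Node(Strategy.ALWAYS, "default", 2, 1);
        // source -> map: same group, same parallelism, forward edge
        System.out.println(isChainable(source, map, Partitioner.FORWARD, Exchange.UNDEFINED, true));
        // map -> sink: parallelism 4 vs 2, so the edge is not forward either
        System.out.println(isChainable(map, sink, Partitioner.REBALANCE, Exchange.UNDEFINED, true));
    }
}
```

Every condition is a hard requirement: flipping any single input (exchange set to BATCH, chaining disabled, an upstream NEVER) is enough to make the result `false`.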
In plain terms, two adjacent operators can be chained only if all of the following conditions hold:
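- downStreamVertex.getInEdges().size() == 1
  The downstream operator must have exactly one input edge. An operator that consumes more than one stream, such as the operator after connect, union or join, has more than one input edge and therefore cannot be chained to its upstream operator.
- upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
  The upstream and downstream operators must belong to the same SlotSharingGroup. Each operator can set its group with .slotSharingGroup(). If no group is set, an operator takes over the group of its inputs (falling back to "default"), so a job that never calls .slotSharingGroup() keeps every operator in "default". To chain two operators, either leave the group unset or set both to the same value.
- outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS && (headOperator.getChainingStrategy() == ChainingStrategy.HEAD || headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
  The ChainingStrategy settings must be compatible:
  - The downstream operator's ChainingStrategy must be ALWAYS.
  - The upstream operator's ChainingStrategy must be ALWAYS or HEAD.
  - Calling disableChaining() on an operator sets its strategy to NEVER, so it cannot be chained with any upstream or downstream operator.
  - Calling startNewChain() sets it to HEAD. The operator can then no longer be chained to its upstream operator, but it can still be chained with its downstream operators.
  - Source operators default to HEAD; most other built-in operators default to ALWAYS.
- edge.getPartitioner() instanceof ForwardPartitioner
  The edge between the two operators must use ForwardPartitioner. Flink supports several partitioning schemes: shuffle, rebalance, rescale, broadcast, key groups (keyBy), forward, and others. Suppose no other partitioning is requested explicitly between the two operators (keyBy, shuffle, rebalance, rescale, broadcast):
  - If their parallelism is the same, ForwardPartitioner is used by default.
  - If the parallelism differs, Flink uses rebalance instead, which also rules out chaining.
- edge.getShuffleMode() != ShuffleMode.BATCH
  The data exchange on the edge must not be BATCH. If no shuffle mode is specified, the edge is not BATCH and a streaming job exchanges the data in pipelined fashion, so this condition normally holds. The possible modes are defined in the ShuffleMode enum: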
  /**
   * The shuffle mode defines the data exchange mode between operators.
   */
  @PublicEvolving
  public enum ShuffleMode {
      /**
       * Producer and consumer are online at the same time.
       * Produced data is received by consumer immediately.
       */
      PIPELINED,
      /**
       * The producer first produces its entire result and finishes.
       * After that, the consumer is started and may consume the data.
       */
      BATCH,
      /**
       * The shuffle mode is undefined. It leaves it up to the framework to decide the shuffle mode.
       * The framework will pick one of {@link ShuffleMode#BATCH} or {@link ShuffleMode#PIPELINED} in
       * the end.
       */
      UNDEFINED
  }
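- upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
  The upstream and downstream operators must have the same parallelism.
- streamGraph.isChainingEnabled()
  Chaining must not be switched off for the whole job, i.e. the job has not called StreamExecutionEnvironment#disableOperatorChaining().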
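Putting the conditions together, here is a sketch of how a linear pipeline might split into chains. It applies the rule above edge by edge, together with two defaults described in the list:

- A missing partitioner becomes forward when the parallelism matches, and rebalance otherwise.
- A missing slot sharing group is inherited from the input.

Everything in the code is hypothetical and simplified:

- `LinearChainDemo`, `Op`, `buildChains`, and `samplePipeline` are illustration names, not Flink APIs.
- The strings "hash", "forward", and "rebalance" stand in for partitioners.
- The pipeline is linear, so every downstream operator has exactly one input edge.
- Every exchange is assumed to be pipelined.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of chain building for a linear pipeline; not Flink's StreamingJobGraphGenerator.
final class LinearChainDemo {
    enum Strategy { ALWAYS, HEAD, NEVER }

    static final class Op {
        final String name;
        final Strategy strategy;           // HEAD ~ startNewChain(), NEVER ~ disableChaining()
        final int parallelism;
        final String explicitPartitioner;  // e.g. "hash" for keyBy; null if none was requested
        final String slotSharingGroup;     // null if .slotSharingGroup() was not called

        Op(String name, Strategy strategy, int parallelism,
           String explicitPartitioner, String slotSharingGroup) {
            this.name = name;
            this.strategy = strategy;
            this.parallelism = parallelism;
            this.explicitPartitioner = explicitPartitioner;
            this.slotSharingGroup = slotSharingGroup;
        }
    }

    // Groups a linear pipeline into chains; each inner list is one chain.
    static List<List<String>> buildChains(List<Op> pipeline, boolean chainingEnabled) {
        List<List<String>> chains = new ArrayList<>();
        Op prev = null;
        String prevGroup = null;
        for (Op op : pipeline) {
            // Unset group: inherit the input's group, or "default" for the first operator.
            String group = op.slotSharingGroup != null ? op.slotSharingGroup
                    : (prevGroup != null ? prevGroup : "default");
            boolean chainable = false;
            if (prev != null) {
                // No explicit partitioner: forward if parallelism matches, otherwise rebalance.
                String partitioner = op.explicitPartitioner != null ? op.explicitPartitioner
                        : (prev.parallelism == op.parallelism ? "forward" : "rebalance");
                chainable = group.equals(prevGroup)
                        && op.strategy == Strategy.ALWAYS
                        && prev.strategy != Strategy.NEVER
                        && partitioner.equals("forward")
                        && prev.parallelism == op.parallelism
                        && chainingEnabled;
            }
            if (chainable) {
                chains.get(chains.size() - 1).add(op.name);
            } else {
                List<String> chain = new ArrayList<>();
                chain.add(op.name);
                chains.add(chain);
            }
            prev = op;
            prevGroup = group;
        }
        return chains;
    }

    // source -> map -> filter.startNewChain() -> flatMap -> keyBy + reduce -> sink (parallelism 1)
    static List<Op> samplePipeline() {
        return Arrays.asList(
                new Op("source", Strategy.HEAD, 2, null, null),
                new Op("map", Strategy.ALWAYS, 2, null, null),
                new Op("filter", Strategy.HEAD, 2, null, null),
                new Op("flatMap", Strategy.ALWAYS, 2, null, null),
                new Op("reduce", Strategy.ALWAYS, 2, "hash", null),
                new Op("sink", Strategy.ALWAYS, 1, null, null));
    }

    public static void main(String[] args) {
        System.out.println(buildChains(samplePipeline(), true));
        System.out.println(buildChains(samplePipeline(), false));
    }
}
```

In the sample pipeline the chains break at three points:

- At filter, because startNewChain() makes it a HEAD.
- At reduce, because keyBy introduces a non-forward partitioner.
- At sink, because its parallelism differs.

With chaining disabled for the whole job, every operator ends up in its own chain.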