所有的Flink程序都是由三部分组成的:Source、Transformation、Sink。做各种各样的转换操作,Source是负责读取数据源,Transformation利用各种算子进行处理加工,Sink负责输出。
1.数据流
这三部分数据流,在运行的过程中,它又是如何运行在slot上的呢?
- 在运行时,Flink上运行的程序被映射成“逻辑数据流”(dataflows),它包含了这三部分
- 每一个DataFlow以一个或多个sources开始以一个或多个sinks结束。DataFlow类似于任意的有向无环图(DAG)。
- 在大部分情况下,程序中的转换运算(transformation)跟DataFlow中的算子(operator)是一一对应的关系。
有了这样一个DataFlow,如何去处理呢?这就涉及到我们最后生成的执行图。
2.执行图
在Flink里,从DataFlow到执行图的过程,可以把它分成四层:
- StreamGraph:代码生成的最初的图
- JobGraph:StreamGraph经过优化后生成JobGraph,客户端在提交给JobManager的数据结构。主要优化:将多个符合条件的小节点连在一起作为一个大的节点。
- ExecutionGraph:JobManager根据JobGraph生成ExecutionGraph,ExecutionGraph是JobGraph的并行化版本,是调度层最核心的数据结构。
- 物理执行图:JobManager根据ExecutionGraph对Job进行调度后,在各个TaskManager上部署后形成的图,并不是一个具体的数据结构。
执行图生成过程如下:
3.并行度
一个特定算子的子任务(subtask)的个数称之为其并行度(paralielism)。一般情况下,一个stream的并行度,可以认为就是其所有算子中最大的并行度。
如下图,总共并行度是2,slot个数最少要2个来执行,
不同的并行度的数据还有交互:
那么并行度和slot到底有什么关系?
如上图,左边是一个JobGraph,这个作业包含的算子有5个,下面的角标代表其并行度。我们看到这几个算子里面最大的并行度是4,所以右边的图上给了四个slot,右图也根据每个算子的并行度给出了slot和算子间的具体分配,可以看到,任务被调度到slot上也是比较均匀的,如果把太多任务调度到一个slot上,那么slot运行效率会低下。
并行度和数据传输的关系
1.一个程序中,不同的算子可能具有不同的并行度
2.算子之间传输数据的形式可以是One-to-one模式也可以是redistributing的模式,具体是哪一种形式,取决于算子的种类,下面说明这两种形式。
One-to-one:Stream维护着分区以及元素的顺序(比如source和map之间)。这意味着map算子的子任务看到的元素的个数以及顺序跟source算子的子任务生产的元素的个数,顺序相同。map/filter/flatMap等算子都是one-to-one的对应关系。
Redistributing:stream的分区会发生改变。每一个算子的子任务依据所选择的transforation发送数据到不同的目标任务。例如,keyBy基于hashCode重分区、而redistribute过程就类似与spark的shuffle过程。
任务链
作业在调度过程中,会将不同的任务合并在一起,到底什么样的任务能合并一起做优化呢?下面来看一下优化技术:Operator Chains
Flink采用了一种称之为任务链的优化技术,可以在特定条件下减少本地通信的开销。为了满足任务链的要求,必须将两个或多个算子设为相同的并行度,并通过本地转发的方式进行连接。那什么条件可以满足合并呢?
- 相同并行度
- one-to-one
下图是一个任务链的例子:
任务调度控制
- 如果不想让Flink自动合并,可以调用env.disableOperatorChaining()方法。
- 如果想让某一个算子不合并,那也可以给某个算子调用disableChainging()方法
- 如果想从某个算子开始,从这个算子后面可以合并,前面一个不能合并,前面一个之前的也可以照常合并,那么可以调用startNewChain()方法。