文档解读
文档路径
/Concepts/Distributed Runtime/Tasks and Operator Chains
For distributed execution, Flink chains operator subtasks together into tasks. Each task is executed by one thread. Chaining operators together into tasks is a useful optimization: it reduces the overhead of thread-to-thread handover and buffering, and increases overall throughput while decreasing latency.
上文提到的task映射到代码中就是org.apache.flink.runtime.taskmanager.Task
,API说明:
The Task represents one execution of a parallel subtask on a TaskManager. A Task wraps a Flink operator (which may be a user function) and runs it, providing all services necessary for example to consume input data, produce its results (intermediate result partitions) and communicate with the JobManager.
The Flink operators (implemented as subclasses of
AbstractInvokable
have only data readers, writers, and certain event callbacks. The task connects those to the network stack and actor messages, and tracks the state of the execution and handles exceptions.Tasks have no knowledge about how they relate to other tasks, or whether they are the first attempt to execute the task, or a repeated attempt. All of that is only known to the JobManager. All the task knows are its own runnable code, the task's configuration, and the IDs of the intermediate results to consume and produce (if any).
Each Task is run by one dedicated thread.
其中主要涉及到几个概念:Operator Chain,Task,Subtask。简言之,实际物理上运行的就是subtask线程,而这个线程执行的内容就是各个算子,如果一个subtask中执行了多个算子就形成了一个Operator Chain。对于执行相同算子的subtask可以认为是逻辑上的某一个task,也就是当并行度为1的时候,task就等同于subtask。并行度大于1的时候,一个task会物理上拆分成多个subtask线程进行计算。
下面的代码模拟了官方上图的示例
public class TaskDemo {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setParallelism(2);
env.addSource(new DataSource())
.map(new MyMapFunction())
.keyBy(0)
.process(new MyKeyedProcessFunction())
.addSink(new DataSink()).setParallelism(1).name("Custom Sink");
env.execute();
}
private static class MyMapFunction implements MapFunction<Tuple2<Long, String>, Tuple2<Long, String>> {
@Override
public Tuple2<Long, String> map(Tuple2<Long, String> value) throws Exception {
System.out.println(Thread.currentThread().getName() + " - key: " + value.f0);
return value;
}
}
private static class MyKeyedProcessFunction extends KeyedProcessFunction<Tuple, Tuple2<Long, String>, Tuple2<Long, String>> {
@Override
public void processElement(Tuple2<Long, String> value, Context ctx, Collector<Tuple2<Long, String>> out) throws Exception {
System.out.println(Thread.currentThread().getName() + " - key: " + ctx.getCurrentKey());
out.collect(value);
}
}
private static class DataSink implements SinkFunction<Tuple2<Long, String>> {
@Override
public void invoke(Tuple2<Long, String> value, Context context) throws Exception {
System.out.println("Result:" + value);
}
}
private static class DataSource extends RichParallelSourceFunction<Tuple2<Long, String>> {
private volatile boolean running = true;
@Override
public void run(SourceContext<Tuple2<Long, String>> ctx) throws Exception {
String[] products = new String[]{"a", "b", "c", "d", "e", "f", "g"};
final long numElements = 10;
int i = 0;
while (running && i < numElements) {
Thread.sleep(RandomUtils.nextLong(1, 10) * 1000L);
Tuple2<Long, String> data = new Tuple2<>(RandomUtils.nextLong(0, 10),
products[RandomUtils.nextInt(0, products.length)]);
ctx.collect(data);
System.out.println("Sand data:" + data);
i++;
}
}
@Override
public void cancel() {
running = false;
}
}
}
查看TaskExecutor#submitTask
方法,里面会创建Task对象,每个task实际就是一个线程,在日志中检索Received task,有如下信息,然后会调用task.startTaskThread();
启动这些线程。
Received task Source: Custom Source -> Map (1/2).
Received task Source: Custom Source -> Map (2/2).
Received task KeyedProcess (1/2).
Received task KeyedProcess (2/2).
Received task Sink: Custom Sink (1/1).
对于第一个 Source: Custom Source -> Map
的task有两个subtask而这个task里面的operator chain就是Source: Custom Source
和Map
。每个task中包括的operator chain可以通过StreamTask的operatorChain变量查看。注意,一个task中的chain是倒序存在下面的数组里面,本例中的第一个task存储的operator顺序就是StreamMap和StreamSource。
/**
* Stores all operators on this chain in reverse order.
*/
private final StreamOperator<?>[] allOperators;
运行程序会随机生成一些数据,会有类似如下结果:
Legacy Source Thread - Source: Custom Source -> Map (1/2) - key: 6
KeyedProcess (1/2) - key: (6)
Legacy Source Thread - Source: Custom Source -> Map (2/2) - key: 6
KeyedProcess (1/2) - key: (6)
这个结果表示,随机生成的2条key为6数据,分别分配到了第一个task的subtask1和subtask2中计算,通过keyby之后,同一个key的数据会分到同一个task中,即分配到了第二个task的subtask1中。
扩展阅读
算子间数据传输
每个算子都会通过AbstractStreamOperator中的output.collect
方法发送数据
如果下游算子在同一个operator chains中,那么最终调用的是
OperatorChain#ChainingOutput#collect
方法,里面会调用到下一个算子的执行方法operator.processElement(castRecord);
如果下游算子不在同一个operator chains中,即不再同一个task中,那么最终调用的是
RecordWriterOutput#collect
方法将序列化之后的结果发送到下游算子。而下游另一个task中的算子是通过StreamTaskNetworkInput#pollNextNullable
方法反序列化上一个算子发送的数据,返回的结果会通过StreamOneInputProcessor#processElement
方法传给算子的processElement方法。