Tasks and Operator Chains

目录

文档解读

文档路径
/Concepts/Distributed Runtime/Tasks and Operator Chains

For distributed execution, Flink chains operator subtasks together into tasks. Each task is executed by one thread. Chaining operators together into tasks is a useful optimization: it reduces the overhead of thread-to-thread handover and buffering, and increases overall throughput while decreasing latency.

上文提到的task映射到代码中就是org.apache.flink.runtime.taskmanager.Task,API说明:

The Task represents one execution of a parallel subtask on a TaskManager. A Task wraps a Flink operator (which may be a user function) and runs it, providing all services necessary for example to consume input data, produce its results (intermediate result partitions) and communicate with the JobManager.

The Flink operators (implemented as subclasses of AbstractInvokablehave only data readers, writers, and certain event callbacks. The task connects those to the network stack and actor messages, and tracks the state of the execution and handles exceptions.

Tasks have no knowledge about how they relate to other tasks, or whether they are the first attempt to execute the task, or a repeated attempt. All of that is only known to the JobManager. All the task knows are its own runnable code, the task's configuration, and the IDs of the intermediate results to consume and produce (if any).

Each Task is run by one dedicated thread.

其中主要涉及到几个概念:Operator ChainTaskSubtask。简言之,实际物理上运行的就是subtask线程,而这个线程执行的内容就是各个算子,如果一个subtask中执行了多个算子就形成了一个Operator Chain。对于执行相同算子的subtask可以认为是逻辑上的某一个task,也就是当并行度为1的时候,task就等同于subtask。并行度大于1的时候,一个task会物理上拆分成多个subtask线程进行计算。

下面的代码模拟了官方上图的示例

public class TaskDemo {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(2);

        env.addSource(new DataSource())
                .map(new MyMapFunction())
                .keyBy(0)
                .process(new MyKeyedProcessFunction())
                .addSink(new DataSink()).setParallelism(1).name("Custom Sink");

        env.execute();
    }

    private static class MyMapFunction implements MapFunction<Tuple2<Long, String>, Tuple2<Long, String>> {
        @Override
        public Tuple2<Long, String> map(Tuple2<Long, String> value) throws Exception {
            System.out.println(Thread.currentThread().getName() + " - key: " + value.f0);
            return value;
        }
    }

    private static class MyKeyedProcessFunction extends KeyedProcessFunction<Tuple, Tuple2<Long, String>, Tuple2<Long, String>> {
        @Override
        public void processElement(Tuple2<Long, String> value, Context ctx, Collector<Tuple2<Long, String>> out) throws Exception {
            System.out.println(Thread.currentThread().getName() + " - key: " + ctx.getCurrentKey());
            out.collect(value);
        }
    }

    private static class DataSink implements SinkFunction<Tuple2<Long, String>> {
        @Override
        public void invoke(Tuple2<Long, String> value, Context context) throws Exception {
            System.out.println("Result:" + value);
        }
    }

    private static class DataSource extends RichParallelSourceFunction<Tuple2<Long, String>> {
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<Tuple2<Long, String>> ctx) throws Exception {
            String[] products = new String[]{"a", "b", "c", "d", "e", "f", "g"};

            final long numElements = 10;
            int i = 0;
            while (running && i < numElements) {
                Thread.sleep(RandomUtils.nextLong(1, 10) * 1000L);

                Tuple2<Long, String> data = new Tuple2<>(RandomUtils.nextLong(0, 10),
                        products[RandomUtils.nextInt(0, products.length)]);
                ctx.collect(data);
                System.out.println("Sand data:" + data);
                i++;
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}

查看TaskExecutor#submitTask方法,里面会创建Task对象,每个task实际就是一个线程,在日志中检索Received task,有如下信息,然后会调用task.startTaskThread();启动这些线程。

Received task Source: Custom Source -> Map (1/2).
Received task Source: Custom Source -> Map (2/2).
Received task KeyedProcess (1/2).
Received task KeyedProcess (2/2).
Received task Sink: Custom Sink (1/1).

对于第一个 Source: Custom Source -> Map的task有两个subtask而这个task里面的operator chain就是Source: Custom SourceMap。每个task中包括的operator chain可以通过StreamTask的operatorChain变量查看。注意,一个task中的chain是倒序存在下面的数组里面,本例中的第一个task存储的operator顺序就是StreamMap和StreamSource。

/**
 * Stores all operators on this chain in reverse order.
 */
private final StreamOperator<?>[] allOperators;

运行程序会随机生成一些数据,会有类似如下结果:

Legacy Source Thread - Source: Custom Source -> Map (1/2) - key: 6
KeyedProcess (1/2) - key: (6)
Legacy Source Thread - Source: Custom Source -> Map (2/2) - key: 6
KeyedProcess (1/2) - key: (6)

这个结果表示,随机生成的2条key为6数据,分别分配到了第一个task的subtask1和subtask2中计算,通过keyby之后,同一个key的数据会分到同一个task中,即分配到了第二个task的subtask1中。

扩展阅读

算子间数据传输

每个算子都会通过AbstractStreamOperator中的output.collect方法发送数据

  • 如果下游算子在同一个operator chains中,那么最终调用的是OperatorChain#ChainingOutput#collect方法,里面会调用到下一个算子的执行方法operator.processElement(castRecord);

  • 如果下游算子不在同一个operator chains中,即不再同一个task中,那么最终调用的是RecordWriterOutput#collect方法将序列化之后的结果发送到下游算子。而下游另一个task中的算子是通过StreamTaskNetworkInput#pollNextNullable方法反序列化上一个算子发送的数据,返回的结果会通过StreamOneInputProcessor#processElement方法传给算子的processElement方法。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,547评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,399评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,428评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,599评论 1 274
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,612评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,577评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,941评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,603评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,852评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,605评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,693评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,375评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,955评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,936评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,172评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,970评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,414评论 2 342