四种优化 Apache Flink 应用程序的方法

使用 Flink Tuples

当你使用类似于 groupBy, join, 或者 keyBy 算子时,Flink 提供了多种用于在你的数据集上选择 key 的方法。你可以使用 key 选择函数,如下:

// Join movies and ratings datasets

movies.join(ratings)

        // Use movie id as a key in both cases

        .where(new KeySelector() {

            @Override

            public String getKey(Movie m) throws Exception {

                return m.getId();

            }

        })

        .equalTo(new KeySelector() {

            @Override

            public String getKey(Rating r) throws Exception {

                return r.getMovieId();

            }

        })

你甚至可以指定 POJO 类型中一个 field 的名字:

movies.join(ratings)

    // Use same fields as in the previous example

    .where("id")

    .equalTo("movieId")

但是如果你现在使用的是 Flink 元组类型(tuple types)的数据,你可以简单地指定将要作为 key 的字段在元组中的位置:

DataSet> movies = ...

DataSet> ratings = ...

movies.join(ratings)

    // Specify fields positions in tuples

    .where(0)

    .equalTo(1)

这种方法在 Flink 中将会获得最佳的性能,但是可读性方面呢?这是不是意味着你的代码看起来像下面那样:

DataSet> result = movies.join(ratings)

    .where(0)

    .equalTo(0)

    .with(new JoinFunction, Tuple2,

                      Tuple3>() {

        // What is happening here?

        @Override

        public Tuple3 join(Tuple2 first,

                      Tuple2 second) throws Exception {

            // Some tuples are joined with some other tuples and some fields are returned???

            return new Tuple3<>(first.f0, first.f1, second.f1);

        }

    });

在这种情况下,提高可读性的常见方法是创建一个继承自 TupleX 的类,并且实现其中的 getters 和 setters。下面是 Flink Gelly 类库中 Edge 类的实现,其中有三个 fileds,所以它直接继承了 Tuple3 类:

public class Edge extends Tuple3 {

    public Edge(K source, K target, V value) {

        this.f0 = source;

        this.f1 = target;

        this.f2 = value;

    }


    // Getters and setters for readability

    public void setSource(K source) {

        this.f0 = source;

    }

    public K getSource() {

        return this.f0;

    }


    // Also has getters and setters for other fields

    ...

}

重用 Flink 对象

另外一种可以提升 Flink 应用程序性能的方法是在用户自定义函数返回数据时使用可变对象(mutable objects),请看看下面的例子:

stream

    .apply(new WindowFunction, String, TimeWindow>() {

        @Override

        public void apply(String userName, TimeWindow timeWindow,

                Iterable iterable,

                Collector> collector) throws Exception {

            long changesCount = ...

            // A new Tuple instance is created on every execution

            collector.collect(new Tuple2<>(userName, changesCount));

        }

    }

正如你所看到的,在我们每次调用 apply 函数的时候,我们都会创建一个 Tuple2 类型的实例,这将会给垃圾回收造成很大的压力。解决这个问题的一种方法就是反复使用相同的实例:

stream

    .apply(new WindowFunction, String, TimeWindow>() {

        // Create an instance that we will reuse on every call

        private Tuple2 result = new Tuple<>();


        @Override

        public void apply(String userName, TimeWindow timeWindow,

                          Iterable iterable,

                          Collector> collector) throws Exception {

            long changesCount = ...


            // Set fields on an existing object instead of creating a new one

            result.f0 = userName;

            // Auto-boxing!! A new Long value may be created

            result.f1 = changesCount;


            // Reuse the same Tuple2 object

            collector.collect(result);

        }

    }

上面的代码性能会好些。虽然我们在每次调用的时候只创建了一个 Tuple2 实例,但是我们还间接地创建了 Long 类型的实例。为了解决这个问题, Flink 内部提供了一系列 value classes,比如:IntValue, LongValue, StringValue, FloatValue 等。这些类的重点是为内置类型提供了可变版本,所以我们可以在用户自定义函数中重用这些类型,下面就是如何使用的例子:

stream

    .apply(new WindowFunction, String, TimeWindow>() {

        // Create a mutable count instance

        private LongValue count = new IntValue();

        // Assign mutable count to the tuple

        private Tuple2 result = new Tuple<>("", count);


        @Override

        // Notice that now we have a different return type

        public void apply(String userName, TimeWindow timeWindow,

                  Iterable iterable,

                  Collector> collector) throws Exception {

            long changesCount = ...


            // Set fields on an existing object instead of creating a new one

            result.f0 = userName;

            // Update mutable count value

            count.setValue(changesCount);


            // Reuse the same tuple and the same LongValue instance

            collector.collect(result);

        }

    }

上面这些使用习惯在 Flink 类库中被普遍使用,比如 Flink Gelly。

使用函数注解

另一种优化 Flink 应用程序的方法是提供一些关于用户自定义函数如何对输入数据进行处理的信息。由于 Flink 无法解析和理解你的代码,所以你提供一些关键的信息将会帮助 Flink 创建一个更加高效的执行计划。我们可以使用三种注解:

@ForwardedFields – 指定输入数据中哪些字段保持不变并且在输出值中使用(specifies what fields in an input value were left unchanged and are used in an output value.)。

@NotForwardedFields – 指定在输出中相同位置未保留的字段(specifies fields which were not preserved in the same positions in the output.)。

@ReadFields – 指定哪些字段在计算结果的时候用到。你只能指定那些在计算中使用的字段,而不是仅仅将数据拷贝到输出中的字段。(specifies what fields were used to compute a result value. You should only specify fields that were used in computations and not merely copied to the output.)

我们来看看如何使用 ForwardedFields 注释:

// Specify that the first element is copied without any changes

@ForwardedFields("0")

class MyFunction implements MapFunction, Tuple2> {

    @Override

    public Tuple2 map(Tuple2 value) {

      // Copy first field without change

        return new Tuple2<>(value.f0, value.f1 + 123);

    }

}

上面的注释意味着输入元组的第一个元素将不会改变,而且在返回元组中同样处在第一个位置。

如果你没有改变一个元素,只不过简单地将它移到不同的位置上,你同样可以使用 ForwardedFields 注释来实现。下面例子中,我们简单地将输入元组的位置互相交换,并且直接返回:

// 1st element goes into the 2nd position, and 2nd element goes into the 1st position

@ForwardedFields("0->1; 1->0")

class SwapArguments implements MapFunction, Tuple2> {

    @Override

    public Tuple2 map(Tuple2 value) {

      // Swap elements in a tuple

        return new Tuple2<>(value.f1, value.f0);

    }

}

上面例子中提到的注释只能应用到只有一个输入参数的函数中,比如 map 或者 flatMap。如果你有两个输入参数的函数,你可以分别使用 ForwardedFieldsFirst 和 ForwardedFieldsSecond 注释来为第一和第二个参数指定一些信息。

下面我们使用 ForwardedFieldsFirst 和 ForwardedFieldsSecond 注释来为实现 JoinFunction 接口的类指定相关的信息:

// Two fields from the input tuple are copied to the first and second positions of the output tuple

@ForwardedFieldsFirst("0; 1")

// The third field from the input tuple is copied to the third position of the output tuple

@ForwardedFieldsSecond("2")

class MyJoin implements JoinFunction, Tuple2, Tuple3>() {

    @Override

    public Tuple3 join(Tuple2 first, Tuple2 second) throws Exception {

        return new Tuple3<>(first.f0, first.f1, second.f1);

    }

})

Flink 同样提供了 NotForwardedFieldsFirst, NotForwardedFieldsSecond, ReadFieldsFirst, 和 ReadFirldsSecond 注释来实现相同的功能。

选择 Join 类型

如果你为 Flink 提供了一些信息,可以使你的 Join 操作更快,在讨论这个是如何工作之前,让我们先了解 Fliink 是如何运行 Join 操作的。

当 Flink 处理批量数据时,集群中的每台机器只存储了部分的数据。为了执行 Join 操作, Apache Flink 需要找到两个数据集所有 key 相同的数据。为了做到这一点,Flink 首先必须将两个数据集拥有相同 key 的数据放在同一台机器上。这里有两种实现策略:

Repartition-Repartition strategy:在这种场景下,Join 的两个数据集分别对它们的 key 使用相同的分区函数进行分区,并经过网络发送数据。这就意味着如果数据集非常大,这将花费相当一部分时间将数据分发出去。

Broadcast-Forward strategy:这种场景下,大的数据集R不做处理,另一个比较小的数据集S将全部复制到集群中所有拥有R的一部分数据的机器上。

如果你使用一个比较小的数据集和一个比较大的数据集进行 join 操作,你可以使用 Broadcast-Forward 策略,这个很容易实现:

ds1.join(ds2, JoinHint.BROADCAST_HASH_FIRST)

这种写法表示第一个数据集要比第二个数据集小的多。

Flink 支持的其他 join 提示有以下几种:

BROADCAST_HASH_SECOND – 表示第二个数据集比较小

REPARTITION_HASH_FIRST – 表示第一个数据集比较小

REPARTITION_HASH_SECOND – 表示第二个数据集有点小

REPARTITION_SORT_MERGE – 表示重新分区两个数据集并使用排序和合并策略(sorting and merging strategy)

OPTIMIZER_CHOOSES – Flink 优化器将决定如何连接数据集

本文翻译自:Four ways to optimize your Flink applications

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,236评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,867评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,715评论 0 340
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,899评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,895评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,733评论 1 283
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,085评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,722评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,025评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,696评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,816评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,447评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,057评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,009评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,254评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,204评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,561评论 2 343

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,598评论 18 139
  • pyspark.sql模块 模块上下文 Spark SQL和DataFrames的重要类: pyspark.sql...
    mpro阅读 9,446评论 0 13
  • rljs by sennchi Timeline of History Part One The Cognitiv...
    sennchi阅读 7,289评论 0 10
  • 佛曰:受身无间,永远不死,寿长乃无间地狱中之大劫。在佛经中清楚说明了,五逆者死后将,被打落无间地狱。一杀父,二杀母...
    范淼阅读 276评论 0 1
  • 之前一直在犹豫,徘徊,找不到努力学习的动力。 其实努力就是一种生活方式。要有限的努力,不要过度努力。要时常更新自己...
    漫整掘意阅读 112评论 0 1