Flink DataStream 算子 Map、FlatMap、Filter、KeyBy、Reduce、Fold、Aggregate

[TOC]
数据转换将数据流从一种形式转换为另一种形式,也就是说输入可以是一个或多个数据流,输出也可以是零,或一个或多个数据流。Flink1.7对transform另起一个新的名字“Operators ”--Operators transform 。程序可以将多个transform组合成复杂的数据流拓扑。

1.Map

Map [DataStream->DataStream]

Map: 一对一转换,即一条转换成另一条。

输入一个元素并生成一个元素。 一个map函数,它将输入流的值加倍:
dataStream.map { x => x * 2 }

package com.bigdata.flink.dataStreamMapOperator;

import com.bigdata.flink.beans.UserAction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

/**
 * Summary:
 *      Map: 一对一转换
 */
public class DataStreamMapOperator {
    public static void main(String[] args) throws Exception{

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 输入: 用户行为。某个用户在某个时刻点击或浏览了某个商品,以及商品的价格。
        DataStreamSource<UserAction> source = env.fromCollection(Arrays.asList(
                new UserAction("userID1", 1293984000, "click", "productID1", 10),
                new UserAction("userID2", 1293984001, "browse", "productID2", 8),
                new UserAction("userID1", 1293984002, "click", "productID1", 10)
        ));

        // 转换: 商品的价格乘以8
        SingleOutputStreamOperator<UserAction> result = source.map(new MapFunction<UserAction, UserAction>() {
            @Override
            public UserAction map(UserAction value) throws Exception {

                int newPrice = value.getProductPrice() * 8;
                return new UserAction(value.getUserID(), value.getEventTime(), value.getEventType(), value.getProductID(), newPrice);
            }
        });

        // 输出: 输出到控制台
        // UserAction(userID=userID1, eventTime=1293984002, eventType=click, productID=productID1, productPrice=80)
        // UserAction(userID=userID1, eventTime=1293984000, eventType=click, productID=productID1, productPrice=80)
        // UserAction(userID=userID2, eventTime=1293984001, eventType=browse, productID=productID2, productPrice=64)
        result.print();

        env.execute();
    }
}

2.FlatMap

转换:DataStream → DataStream

FlatMap [DataStream->DataStream]
FlatMap: 一行变零到多行。如下,将一个句子(一行)分割成多个单词(多行)。

输入一个元素并生成零个,一个或多个元素。 将句子分割为单词的flatmap函数:

dataStream.flatMap { str => str.split(" ") }

package com.bigdata.flink.dataStreamFlatMapOperator;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * Summary:
 *      FlatMap: 一行变任意行(0~多行)
 */
public class DataStreamFlatMapOperator {
  public static void main(String[] args) throws Exception{
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

      // 输入: 英文电影台词
      DataStreamSource<String> source = env
              .fromElements(
                      "You jump I jump",
                      "Life was like a box of chocolates"
              );

      // 转换: 将包含chocolates的句子转换为每行一个单词
      SingleOutputStreamOperator<String> result = source.flatMap(new FlatMapFunction<String, String>() {
          @Override
          public void flatMap(String value, Collector<String> out) throws Exception {
              if(value.contains("chocolates")){
                  String[] words = value.split(" ");
                  for (String word : words) {
                      out.collect(word);
                  }
              }
          }
      });

      // 输出: 输出到控制台
      // Life
      // was
      // like
      // a
      // box
      // of
      // chocolates
      result.print();

      env.execute();
  }
}

Filter [DataStream->DataStream]

DataStream → DataStream
计算每个元素的布尔函数,并保留函数返回true的元素。 过滤掉零值的过滤器,通俗来讲就是过滤掉等于0的元素,转换成新的数据流

dataStream.filter { _ != 0 }

4.KeyBy

转换:DataStream → KeyedStream
KeyBy [DataStream->KeyedStream]
KeyBy: 按指定的Key对数据重分区。将同一Key的数据放到同一个分区。

逻辑分区流分为不同的分区。 具有相同key的所有记录都分配给同一分区。 在内部,keyBy()是使用hash分区实现的。 指定key有不同的方法。此Transformations返回KeyedStream,

注意:

  • 分区结果和KeyBy下游算子的并行度强相关。如下游算子只有一个并行度,不管怎么分,都会分到一起。
  • 对于POJO类型,KeyBy可以通过keyBy(fieldName)指定字段进行分区。
  • 对于Tuple类型,KeyBy可以通过keyBy(fieldPosition)指定字段进行分区。
  • 对于一般类型,如上, KeyBy可以通过keyBy(new KeySelector {...})指定字段进行分区。

Reduce [KeyedStream->DataStream]

Reduce: 基于ReduceFunction进行滚动聚合,并向下游算子输出每次滚动聚合后的结果。
注意: Reduce会输出每一次滚动聚合的结果。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,686评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,668评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,160评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,736评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,847评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,043评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,129评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,872评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,318评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,645评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,777评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,470评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,126评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,861评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,095评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,589评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,687评论 2 351

推荐阅读更多精彩内容