Flink:DataStream类型转换及常用算子说明

1、DataStream算子转换概览

DataStreamFormations

2、DataStream转换算子

(1)Map [DataStream->DataStream]

说明:

  • 一对一转换,即一条转换成另一条
  • 调 用 用 户 定 义 的 MapFunction 对 DataStream[T] 数 据 进 行 处 理 , 形 成 新 的 Data-Stream[T],其中数据格式可能会发生变化,常用作对数据集内数据的清洗和转换。

样例:

DataStream<String> stream = env.addSource(new SimpleStringGenerator()).setParallelism(1);
    stream.map(new MapFunction<String, String>() {
    @Override
    public String map(String s) throws Exception {
        Thread.sleep(1000);
        String  tempStr="tmp_"+s;  // 单条数据计算
        return tempStr;
    }
}).print();
(2)FlatMap [DataStream->DataStream]

说明:

  • 一行变零到多行
  • 该算子主要应用处理输入一个元素产生一个或者多个元素的计算场景,比较常见的是在 经典例子 WordCount 中,将每一行的文本数据切割,生成单词序列如在图所示,对于输入 DataStream[String]通过 FlatMap 函数进行处理,字符串数字按逗号切割,然后形成新的整 数数据集

样例:

DataStream<String> textTemp=stream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String s, Collector<String> collector) throws Exception {
        // 样例数据:"1;50001;0;202001;\"53180093<40100017:53180093:40006212;"
        List<String> tempList=Arrays.asList(s.split(";"));
        for(String str:tempList){
            collector.collect(str);
        }
    }
})
(3)Filter [DataStream->DataStream]

说明:

  • 该算子将按照条件对输入数据集进行筛选操作,将符合条件的数据集输出,将不符合条 件的数据过滤掉。

样例:

DataStream<String> filterStream=stream.filter(new FilterFunction<String>() {
    @Override
    public boolean filter(String s) throws Exception {
        return s.isEmpty();
    }
});
(4)KeyBy [DataStream->KeyedStream]

说明:

  • 该算子根据指定的 Key 将输入的 DataStream[T]数据格式转换为 KeyedStream[T],也就 是在数据集中执行 Partition 操作,将相同的 Key 值的数据放置在相同的分区中
  • 分区结果和KeyBy下游算子的并行度强相关。如下游算子只有一个并行度,不管怎么分,都会分到一起。

样例:

DataStreamSource<UserInfo> source = env.fromCollection(Arrays.asList(
    new UserInfo("张三", "21", "click"),
    new UserInfo("李四", "23", "browse"),
    new UserInfo("张三", "31", "click")
    ));
        
KeyedStream<UserInfo, String> result = source.keyBy(new KeySelector<UserInfo, String>() {
    @Override
    public String getKey(UserInfo user) throws Exception {
        return user.getName();
    }
});
(5)Reduce [KeyedStream->DataStream]

说明:

  • 该算子和 MapReduce 中 Reduce 原理基本一致,主要目的是将输入的 KeyedStream 通过 传 入 的 用 户 自 定 义 的 ReduceFunction 滚 动 地 进 行 数 据 聚 合 处 理 , 其 中 定 义 的 ReduceFunciton 必须满足运算结合律和交换律。

样例:

KeyedStream<UserInfo, String> result = source.keyBy(new KeySelector<UserInfo, String>() {
    @Override
    public String getKey(UserInfo user) throws Exception {
        return user.getName();
    }
});

DataStream<UserInfo> text=result.reduce(new ReduceFunction<UserInfo>() {
    @Override
    public UserInfo reduce(UserInfo userInfo, UserInfo t1) throws Exception {
        return userInfo;
    }
});
(6)Aggregations[KeyedStream->DataStream]

<html>
Aggregations 是 KeyedDataStream 接口提供的聚合算子,根据指定的字段进行聚合操 作,滚动地产生一系列数据聚合结果。其实是将 Reduce 算子中的函数进行了封装,封装的 聚合操作有 等,这样就不需要用户自己定义 Reduce 函数。
</html>

(7)Union[DataStream ->DataStream]

说明:

  • Union 算子主要是将两个或者多个输入的数据集合并成一个数据集,需要保证两个数据 集的格式一致,输出的数据集的格式和输入的数据集格式保持一致。
    ** 样例:**
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<UserInfo> source = env.fromCollection(Arrays.asList(
   new UserInfo("张三", "21", "click"),
    new UserInfo("李四", "23", "browse"),
    new UserInfo("张三", "31", "click")
));

DataStreamSource<UserInfo> sourceTwo = env.fromCollection(Arrays.asList(
    new UserInfo("Ton", "21", "click"),
    new UserInfo("java", "23", "browse"),
    new UserInfo("flink", "31", "click")
));
        
DataStream unionStream=source.union(sourceTwo);
unionStream.print();
(8)Connect,CoMap,CoFlatMap[DataStream ->ConnectedStream->DataStream]

说明:

  • Connect 算子主要是为了合并两种或者多种不同数据类型的数据集,合并后会保留原来 数据集的数据类型。
    样例:
DataStreamSource<UserInfo> source = env.fromCollection(Arrays.asList(
    new UserInfo("张三", "21", "click"),
    new UserInfo("李四", "23", "browse"),
    new UserInfo("张三", "31", "click")
));

DataStreamSource<String> sourceTwo = env.fromCollection(Arrays.asList("1,2,3,4,5,6".split(",")));
// connect 算子进行两个流的拼接合并
ConnectedStreams<UserInfo, String> connectStream=source.connect(sourceTwo);

DataStream<String> text=connectStream.map(new CoMapFunction<UserInfo, String, String>() {
    @Override
    public String map1(UserInfo userInfo) throws Exception {
        return userInfo.toString();
    }

    @Override
    public String map2(String s) throws Exception {
        return s;
    }
});
text.print();

输出结果:
10> 2
2> 6
11> 3
1> 5
12> 4
9> 1
12> UserInfo{name='张三', age='31', desc='click'}
11> UserInfo{name='李四', age='23', desc='browse'}
10> UserInfo{name='张三', age='21', desc='click'}

流转换流程:

connectedStream转换流程

(9)Split 和 select [DataStream->SplitStream->DataStream]

说明:

  • Split 算子是将一个 DataStream 数据集按照条件进行拆分,形成两个数据集的过程, 也是 union 算子的逆向实现。每个接入的数据都会被路由到一个或者多个输出数据集中。
  • select 算子是选择指定标识的流。

样例:

DataStreamSource<UserInfo> source = env.fromCollection(Arrays.asList(
    new UserInfo("张三", "21", "click"),
    new UserInfo("李四", "23", "browse"),
    new UserInfo("张三", "31", "click")
));

SplitStream<UserInfo> splitStream = source.split(new OutputSelector<UserInfo>() {
    @Override
    public Iterable<String> select(UserInfo userInfo) {
        List<String> list = new ArrayList<>();
        if (userInfo.getName().equals("张三")) {
            list.add("success");
        } else {
            list.add("error");
        }
        return list;
    }
});
DataStream successStream = splitStream.select("success");
successStream.print("success-");
DataStream errorStream = splitStream.select("error");
errorStream.print("error-");

输出结果:
success-:3> UserInfo{name='张三', age='31', desc='click'}
success-:2> UserInfo{name='张三', age='21', desc='click'}
error-:9> UserInfo{name='李四', age='23', desc='browse'}

流转换流程:

split()&select()

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,607评论 6 507
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,239评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,960评论 0 355
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,750评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,764评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,604评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,347评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,253评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,702评论 1 315
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,893评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,015评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,734评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,352评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,934评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,052评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,216评论 3 371
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,969评论 2 355