Flink实战(四) - DataSet API编程

1 你将学到

◆ DataSet API开发概述

◆ 计数器

◆ DataSource

◆ 分布式缓存

◆ Transformation

◆ Sink

2 Data Set API 简介

Flink中的DataSet程序是实现数据集转换(例如,过滤,映射,连接,分组)的常规程序.

最初从某些Source源创建数据集(例如,通过读取文件或从本地集合创建)

结果通过sink返回,接收器可以例如将数据写入(分布式)文件或标准输出(例如命令行终端)

image

Flink程序可以在各种环境中运行,单机运行或嵌入其他程序中

执行可以在本地JVM中执行,也可以在集群机器上执行.

  • 有关Flink API基本概念的介绍,请参阅本系列的上一篇

Flink实战(三) - 编程模型及核心概念

为了创建自己的Flink DataSet程序,鼓励从Flink程序的解剖开始,逐步添加自己的转换!

3 测试环境

image
image

4 Data Sources简介

数据源创建初始数据集,例如来自文件或Java集合。创建数据集的一般机制是在InputFormat后面抽象的

Flink附带了几种内置格式,可以从通用文件格式创建数据集。其中许多都在ExecutionEnvironment上有快捷方法。

4.1 基于文件

  • readTextFile(path)/ TextInputFormat
    按行读取文件并将它们作为字符串返回
  • readTextFileWithValue(path)/ TextValueInputFormat
    按行读取文件并将它们作为StringValues返回。 StringValues是可变字符串
  • readCsvFile(path)/ CsvInputFormat
    解析逗号(或其他字符)分隔字段的文件。返回元组,案例类对象或POJO的DataSet。支持基本的java类型及其Value对应的字段类型
  • readFileOfPrimitives(path,delimiter)/ PrimitiveInputFormat
    使用给定的分隔符解析新行(或其他char序列)分隔的原始数据类型(如String或Integer)的文件
  • readSequenceFile(Key,Value,path)/ SequenceFileInputFormat
    创建JobConf并从类型为SequenceFileInputFormat,Key class和Value类的指定路径中读取文件,并将它们作为Tuple2 <Key,Value>返回。

4.2 基于集合

  • fromCollection(Iterable) - 从Iterable创建数据集。 Iterable返回的所有元素必须属于同一类型
  • fromCollection(Iterator) - 从迭代器创建数据集。该类指定迭代器返回的元素的数据类型
  • fromElements(elements:_ *) - 根据给定的对象序列创建数据集。所有对象必须属于同一类型
  • fromParallelCollection(SplittableIterator) - 并行地从迭代器创建数据集。该类指定迭代器返回的元素的数据类型
  • generateSequence(from,to) - 并行生成给定时间间隔内的数字序列。

4.3 通用

  • readFile(inputFormat,path)/ FileInputFormat
    接受文件输入格式
  • createInput(inputFormat)/ InputFormat

    接受通用输入格式5 从集合创建DataSet5.1 Scala实现
    image

5.2 Java实现

image

6 从文件/文件夹创建DataSet

6.1 Scala实现

文件

image
image

文件夹

image
image

Java实现

image
image

7 从csv文件创建Dataset

image

7.1 Scala实现

  • 注意忽略第一行


    image
image
  • includedFields参数使用
    image
  • 定义一个POJO
    image

    image
    8 从递归文件夹的内容创建DataSet
    image
    8.1 Scala实现
    image

9从压缩文件中创建DataSet

Flink目前支持输入文件的透明解压缩,如果它们标有适当的文件扩展名。 特别是,这意味着不需要进一步配置输入格式,并且任何FileInputFormat都支持压缩,包括自定义输入格式。

压缩文件可能无法并行读取,从而影响作业可伸缩性。

下表列出了当前支持的压缩方法

image

9.1 Scala实现

image

10 Transformation

10.1 map

Map转换在DataSet的每个元素上应用用户定义的map函数。 它实现了一对一的映射,也就是说,函数必须返回一个元素。

以下代码将Integer对的DataSet转换为Integers的DataSet:

Scala实现

image
image

Java实现

image

10.2 filter

Scala实现

image

Java实现

image

10.3 mapPartition

MapPartition在单个函数调用中转换并行分区。 map-partition函数将分区作为Iterable获取,并且可以生成任意数量的结果值。 每个分区中的元素数量取决于并行度和先前的操作。

Scala实现

image

Java实现

image

10.4 first

Scala实现

10.5 Cross

image

11 Data Sinks

image
image

11.1 Java描述

Data Sinks使用DataSet并用于存储或返回它们

使用OutputFormat描述数据接收器操作

Flink带有各种内置输出格式,这些格式封装在DataSet上的操作后面:

  • writeAsText()/ TextOutputFormat
    将元素按行顺序写入字符串。通过调用每个元素的toString()方法获得字符串。
  • writeAsFormattedText()/ TextOutputFormat
    按字符串顺序写入元素。通过为每个元素调用用户定义的format()方法来获取字符串。
  • writeAsCsv(...)/ CsvOutputFormat
    将元组写为逗号分隔值文件。行和字段分隔符是可配置的。每个字段的值来自对象的toString()方法。
  • print()/ printToErr()/ print(String msg)/ printToErr(String msg)
    打印标准输出/标准错误流上每个元素的toString()值。可选地,可以提供前缀(msg),其前缀为输出。这有助于区分不同的打印调用。如果并行度大于1,则输出也将以生成输出的任务的标识符为前缀。
  • write()/ FileOutputFormat
    自定义文件输出的方法和基类。支持自定义对象到字节的转换。
  • output()/ OutputFormat
    最通用的输出方法,用于非基于文件的数据接收器(例如将结果存储在数据库中)。
    可以将DataSet输入到多个操作。程序可以编写或打印数据集,同时对它们执行其他转换。

例子

标准数据接收方法:

// text data
DataSet<String> textData = // [...]

// write DataSet to a file on the local file system
textData.writeAsText("file:///my/result/on/localFS");

// write DataSet to a file on a HDFS with a namenode running at nnHost:nnPort
textData.writeAsText("hdfs://nnHost:nnPort/my/result/on/localFS");

// write DataSet to a file and overwrite the file if it exists
textData.writeAsText("file:///my/result/on/localFS", WriteMode.OVERWRITE);

// tuples as lines with pipe as the separator "a|b|c"
DataSet<Tuple3<String, Integer, Double>> values = // [...]
values.writeAsCsv("file:///path/to/the/result/file", "\n", "|");

// this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines
values.writeAsText("file:///path/to/the/result/file");

// this writes values as strings using a user-defined TextFormatter object
values.writeAsFormattedText("file:///path/to/the/result/file",
    new TextFormatter<Tuple2<Integer, Integer>>() {
        public String format (Tuple2<Integer, Integer> value) {
            return value.f1 + " - " + value.f0;
        }
    });

使用自定义输出格式:

DataSet<Tuple3<String, Integer, Double>> myResult = [...]

// write Tuple DataSet to a relational database
myResult.output(
    // build and configure OutputFormat
    JDBCOutputFormat.buildJDBCOutputFormat()
                    .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
                    .setDBUrl("jdbc:derby:memory:persons")
                    .setQuery("insert into persons (name, age, height) values (?,?,?)")
                    .finish()
    );

本地排序输出

可以使用元组字段位置或字段表达式以指定顺序在指定字段上对数据接收器的输出进行本地排序。 这适用于每种输出格式。

以下示例显示如何使用此功能:

DataSet<Tuple3<Integer, String, Double>> tData = // [...]
DataSet<Tuple2<BookPojo, Double>> pData = // [...]
DataSet<String> sData = // [...]

// sort output on String field in ascending order
tData.sortPartition(1, Order.ASCENDING).print();

// sort output on Double field in descending and Integer field in ascending order
tData.sortPartition(2, Order.DESCENDING).sortPartition(0, Order.ASCENDING).print();

// sort output on the "author" field of nested BookPojo in descending order
pData.sortPartition("f0.author", Order.DESCENDING).writeAsText(...);

// sort output on the full tuple in ascending order
tData.sortPartition("*", Order.ASCENDING).writeAsCsv(...);

// sort atomic type (String) output in descending order
sData.sortPartition("*", Order.DESCENDING).writeAsText(...);

参考

DataSet Transformations

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,869评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,716评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,223评论 0 357
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,047评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,089评论 6 395
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,839评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,516评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,410评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,920评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,052评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,179评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,868评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,522评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,070评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,186评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,487评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,162评论 2 356

推荐阅读更多精彩内容