1. 函数类(Function Classes)
Flink暴露了所有udf函数的接口(实现方式为接口或者抽象类)。例如MapFunction, FilterFunction, ProcessFunction等等。
下面例子实现了FilterFunction接口:
输入文件数据:
package com.udf
import com.transform.SensorReading
import org.apache.flink.api.common.functions.FilterFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
object FilterTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val streamFromFile = env.readTextFile("data/sensor.txt");
val ds=streamFromFile.map(x=>{
val dataArray = x.split(",")
SensorReading(dataArray(0),dataArray(1).toLong,dataArray(2).toDouble)
})
ds.filter(new Myfilter()).print()
env.execute()
}
}
class Myfilter()extends FilterFunction[SensorReading]{
override def filter(value: SensorReading): Boolean = {
value.id.startsWith("sensor_1")
}
}
运行代码,输出如下:
"D:\develop\jdk1.8 64Bit\tools\bin\java.exe" "-javaagent:D:\develop\IntelliJ IDEA Community Edition 2019.3.1\lib\idea_rt.jar=57252:D:\develop\IntelliJ IDEA Community Edition 2019.3.1\bin" -Dfile.encoding=UTF-8 -classpath "D:\develop\jdk1.8 64Bit\tools\jre\lib\charsets.jar;D:\develop\jdk1.8 64Bit\tools\jre\lib\deploy.jar;D:\develop\jdk1.8 64Bit\tools\jre\lib\ext\access-bridge-64.jar;D:\develop\jdk1.8 64Bit\tools\jre\lib\ext\cldrdata.jar;D:\develop\jdk1.8 64Bit\tools\jre\lib\ext\dnsns.jar;D:\develop\jdk1.8 64Bit\tools\jre\lib\ext\jaccess.jar;D:\develop\jdk1.8 64Bit\tools\jre\lib\ext\jfxrt.jar;D:\develop\jdk1.8 64Bit\tools\jre\lib\ext\localedata.jar;D:\develop\jdk1.8 64Bit\tools\jre\lib\ext\nashorn.jar;D:\develop\jdk1.8 64Bit\tools\jre\lib\ext\sunec.jar;D:\develop\jdk1.8 64Bit\tools\jre\lib\ext\sunjce_provider.jar;D:\develop\jdk1.8 64Bit\tools\jre\lib\ext\sunmscapi.jar;D:\develop\jdk1.8 64Bit\tools\jre\lib\ext\sunpkcs11.jar;D:\develop\jdk1.8 64Bit\tools\jre\lib\ext\zipfs.jar;D:\develop\jdk1.8 64Bit\tools\jre\lib\javaws.jar;D:\develop\jdk1.8 64Bit\tools\jre\lib\jce.jar;D:\develop\jdk1.8 64Bit\tools\jre\lib\jfr.jar;D:\develop\jdk1.8 64Bit\tools\jre\lib\jfxswt.jar;D:\develop\jdk1.8 64Bit\tools\jre\lib\jsse.jar;D:\develop\jdk1.8 64Bit\tools\jre\lib\management-agent.jar;D:\develop\jdk1.8 64Bit\tools\jre\lib\plugin.jar;D:\develop\jdk1.8 64Bit\tools\jre\lib\resources.jar;D:\develop\jdk1.8 64Bit\tools\jre\lib\rt.jar;D:\Code\flink-study\target\classes;C:\Users\Administrator\.m2\repository\org\apache\flink\flink-scala_2.11\1.7.2\flink-scala_2.11-1.7.2.jar;C:\Users\Administrator\.m2\repository\org\apache\flink\flink-core\1.7.2\flink-core-1.7.2.jar;C:\Users\Administrator\.m2\repository\org\apache\flink\flink-annotations\1.7.2\flink-annotations-1.7.2.jar;C:\Users\Administrator\.m2\repository\org\apache\flink\flink-metrics-core\1.7.2\flink-metrics-core-1.7.2.jar;C:\Users\Administrator\.m2\repository\org\apache\commons\commons-lang3\3.3.2\commons-lang3-3.3.2.jar;C:\Users\Administrator\.m2\repository\com\esotericsoftware\kryo\kryo\2.24.0\kryo-2.24.0.jar;C:\Users\Administrator\.m2\repository\com\esotericsoftware\minlog\minlog\1.2\minlog-1.2.jar;C:\Users\Administrator\.m2\repository\org\objenesis\objenesis\2.1\objenesis-2.1.jar;C:\Users\Administrator\.m2\repository\commons-collections\commons-collections\3.2.2\commons-collections-3.2.2.jar;C:\Users\Administrator\.m2\repository\org\apache\commons\commons-compress\1.18\commons-compress-1.18.jar;C:\Users\Administrator\.m2\repository\org\apache\flink\flink-java\1.7.2\flink-java-1.7.2.jar;C:\Users\Administrator\.m2\repository\org\apache\commons\commons-math3\3.5\commons-math3-3.5.jar;C:\Users\Administrator\.m2\repository\org\apache\flink\flink-shaded-asm\5.0.4-5.0\flink-shaded-asm-5.0.4-5.0.jar;C:\Users\Administrator\.m2\repository\org\apache\flink\flink-shaded-asm-6\6.2.1-5.0\flink-shaded-asm-6-6.2.1-5.0.jar;C:\Users\Administrator\.m2\repository\org\scala-lang\scala-reflect\2.11.12\scala-reflect-2.11.12.jar;C:\Users\Administrator\.m2\repository\org\scala-lang\scala-library\2.11.12\scala-library-2.11.12.jar;C:\Users\Administrator\.m2\repository\org\scala-lang\scala-compiler\2.11.12\scala-compiler-2.11.12.jar;C:\Users\Administrator\.m2\repository\org\scala-lang\modules\scala-xml_2.11\1.0.5\scala-xml_2.11-1.0.5.jar;C:\Users\Administrator\.m2\repository\org\scala-lang\modules\scala-parser-combinators_2.11\1.0.4\scala-parser-combinators_2.11-1.0.4.jar;C:\Users\Administrator\.m2\repository\org\slf4j\slf4j-api\1.7.15\slf4j-api-1.7.15.jar;C:\Users\Administrator\.m2\repository\com\google\code\findbugs\jsr305\1.3.9\jsr305-1.3.9.jar;C:\Users\Administrator\.m2\repository\org\apache\flink\force-shading\1.7.2\force-shading-1.7.2.jar;C:\Users\Administrator\.m2\repository\org\apache\flink\flink-streaming-scala_2.11\1.7.2\flink-streaming-scala_2.11-1.7.2.jar;C:\Users\Administrator\.m2\repository\org\apache\flink\flink-streaming-java_2.11\1.7.2\flink-streaming-java_2.11-1.7.2.jar;C:\Users\Administrator\.m2\repository\org\apache\flink\flink-runtime_2.11\1.7.2\flink-runtime_2.11-1.7.2.jar;C:\Users\Administrator\.m2\repository\org\apache\flink\flink-queryable-state-client-java_2.11\1.7.2\flink-queryable-state-client-java_2.11-1.7.2.jar;C:\Users\Administrator\.m2\repository\org\apache\flink\flink-hadoop-fs\1.7.2\flink-hadoop-fs-1.7.2.jar;C:\Users\Administrator\.m2\repository\commons-io\commons-io\2.4\commons-io-2.4.jar;C:\Users\Administrator\.m2\repository\org\apache\flink\flink-shaded-netty\4.1.24.Final-5.0\flink-shaded-netty-4.1.24.Final-5.0.jar;C:\Users\Administrator\.m2\repository\org\apache\flink\flink-shaded-jackson\2.7.9-5.0\flink-shaded-jackson-2.7.9-5.0.jar;C:\Users\Administrator\.m2\repository\commons-cli\commons-cli\1.3.1\commons-cli-1.3.1.jar;C:\Users\Administrator\.m2\repository\org\javassist\javassist\3.19.0-GA\javassist-3.19.0-GA.jar;C:\Users\Administrator\.m2\repository\com\typesafe\akka\akka-actor_2.11\2.4.20\akka-actor_2.11-2.4.20.jar;C:\Users\Administrator\.m2\repository\com\typesafe\config\1.3.0\config-1.3.0.jar;C:\Users\Administrator\.m2\repository\org\scala-lang\modules\scala-java8-compat_2.11\0.7.0\scala-java8-compat_2.11-0.7.0.jar;C:\Users\Administrator\.m2\repository\com\typesafe\akka\akka-stream_2.11\2.4.20\akka-stream_2.11-2.4.20.jar;C:\Users\Administrator\.m2\repository\org\reactivestreams\reactive-streams\1.0.0\reactive-streams-1.0.0.jar;C:\Users\Administrator\.m2\repository\com\typesafe\ssl-config-core_2.11\0.2.1\ssl-config-core_2.11-0.2.1.jar;C:\Users\Administrator\.m2\repository\com\typesafe\akka\akka-protobuf_2.11\2.4.20\akka-protobuf_2.11-2.4.20.jar;C:\Users\Administrator\.m2\repository\com\typesafe\akka\akka-slf4j_2.11\2.4.20\akka-slf4j_2.11-2.4.20.jar;C:\Users\Administrator\.m2\repository\org\clapper\grizzled-slf4j_2.11\1.3.2\grizzled-slf4j_2.11-1.3.2.jar;C:\Users\Administrator\.m2\repository\com\github\scopt\scopt_2.11\3.5.0\scopt_2.11-3.5.0.jar;C:\Users\Administrator\.m2\repository\com\twitter\chill_2.11\0.7.6\chill_2.11-0.7.6.jar;C:\Users\Administrator\.m2\repository\com\twitter\chill-java\0.7.6\chill-java-0.7.6.jar;C:\Users\Administrator\.m2\repository\org\apache\flink\flink-clients_2.11\1.7.2\flink-clients_2.11-1.7.2.jar;C:\Users\Administrator\.m2\repository\org\apache\flink\flink-optimizer_2.11\1.7.2\flink-optimizer_2.11-1.7.2.jar;C:\Users\Administrator\.m2\repository\org\apache\flink\flink-shaded-guava\18.0-5.0\flink-shaded-guava-18.0-5.0.jar;C:\Users\Administrator\.m2\repository\org\apache\flink\flink-connector-kafka-0.11_2.11\1.7.2\flink-connector-kafka-0.11_2.11-1.7.2.jar;C:\Users\Administrator\.m2\repository\org\apache\flink\flink-connector-kafka-0.10_2.11\1.7.2\flink-connector-kafka-0.10_2.11-1.7.2.jar;C:\Users\Administrator\.m2\repository\org\apache\flink\flink-connector-kafka-0.9_2.11\1.7.2\flink-connector-kafka-0.9_2.11-1.7.2.jar;C:\Users\Administrator\.m2\repository\org\apache\flink\flink-connector-kafka-base_2.11\1.7.2\flink-connector-kafka-base_2.11-1.7.2.jar;C:\Users\Administrator\.m2\repository\org\apache\kafka\kafka-clients\0.11.0.2\kafka-clients-0.11.0.2.jar;C:\Users\Administrator\.m2\repository\net\jpountz\lz4\lz4\1.3.0\lz4-1.3.0.jar;C:\Users\Administrator\.m2\repository\org\xerial\snappy\snappy-java\1.1.2.6\snappy-java-1.1.2.6.jar" com.udf.FilterTest
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
SensorReading(sensor_10,1588473885745,23.136816002475335)
SensorReading(sensor_1,1588473886247,2.0306394984733496)
Process finished with exit code 0
2. 匿名函数(Lambda Functions)
val tweets: DataStream[String] = ...
val flinkTweets = tweets.filter(_.contains("flink"))
3. 富函数(Rich Functions)
“富函数”是DataStream API提供的一个函数类的接口,所有Flink函数类都有其Rich版本。它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。
- RichMapFunction
- RichFlatMapFunction
- RichFilterFunction
- …
Rich Function有一个生命周期的概念。典型的生命周期方法有:
- open()方法是rich function的初始化方法,当一个算子例如map或者filter被调用之前open()会被调用。
- close()方法是生命周期中的最后一个调用的方法,做一些清理工作。
- getRuntimeContext()方法提供了函数的RuntimeContext的一些信息,例如函数执行的并行度,任务的名字,以及state状态
举例,FlatMap的富函数实现:
class MyFlatMap extends RichFlatMapFunction[Int, (Int, Int)] {
var subTaskIndex = 0
override def open(configuration: Configuration): Unit = {
subTaskIndex = getRuntimeContext.getIndexOfThisSubtask
// 以下可以做一些初始化工作,例如建立一个和HDFS的连接
}
override def flatMap(in: Int, out: Collector[(Int, Int)]): Unit = {
if (in % 2 == subTaskIndex) {
out.collect((subTaskIndex, in))
}
}
override def close(): Unit = {
// 以下做一些清理工作,例如断开和HDFS的连接。
}
}