Flink API Overview
1. DataStream data sources
1. Socket source
Receive data from a socket and count how many times each word appears during the last 5 seconds.
Step 1: start a socket service on node01
Run the following command on node01 to start the socket service:
nc -lk 9000
Step 2: implement the code
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time

object FlinkSource1 {
  def main(args: Array[String]): Unit = {
    // get the execution environment
    val streamExecution: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val socketText: DataStream[String] = streamExecution.socketTextStream("node01", 9000)
    // note: this implicit import is required, otherwise the flatMap call below will not compile
    import org.apache.flink.api.scala._
    val result: DataStream[(String, Int)] = socketText.flatMap(x => x.split(" "))
      .map(x => (x, 1))
      .keyBy(0)
      .timeWindow(Time.seconds(5), Time.seconds(5)) // count the data of the last 5 seconds
      .sum(1)
    // print the result
    result.print().setParallelism(1)
    // run the job
    streamExecution.execute()
  }
}
2. File source
Read and process all files under an HDFS path.
Step 1: add the Maven dependencies
<repositories>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
</repositories>

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.6.0-mr1-cdh5.14.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.6.0-cdh5.14.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>2.6.0-cdh5.14.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>2.6.0-cdh5.14.2</version>
</dependency>
Step 2: implement the code
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object FlinkSource2 {
  def main(args: Array[String]): Unit = {
    val executionEnvironment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    // read text files from HDFS
    val hdfStream: DataStream[String] = executionEnvironment.readTextFile("hdfs://node01:8020/flink_input/")
    val result: DataStream[(String, Int)] = hdfStream.flatMap(x => x.split(" ")).map(x => (x, 1)).keyBy(0).sum(1)
    result.print().setParallelism(1)
    executionEnvironment.execute("hdfsSource")
  }
}
3. Getting data from an existing collection
Code implementation
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object FlinkSource3 {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    val value: DataStream[String] = environment.fromElements[String]("hello world", "spark flink")
    val result2: DataStream[(String, Int)] = value.flatMap(x => x.split(" ")).map(x => (x, 1)).keyBy(0).sum(1)
    result2.print().setParallelism(1)
    environment.execute()
  }
}
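Besides fromElements, the Scala API also offers fromCollection for building a stream from an in-memory collection. The following is a minimal sketch under that assumption; the object name FlinkSource4 and the sample data are illustrative:
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object FlinkSource4 {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    // build a DataStream from an in-memory Scala collection (illustrative data)
    val words: DataStream[String] = environment.fromCollection(List("hello world", "hello flink"))
    val counts: DataStream[(String, Int)] = words.flatMap(_.split(" ")).map((_, 1)).keyBy(0).sum(1)
    counts.print().setParallelism(1)
    environment.execute("collectionSource")
  }
}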
4. Custom sources
If the data sources that ship with Flink do not cover your needs, you can define your own.
Flink provides many ready-made source implementations, and you can also write a custom source.
A custom source is defined by implementing the SourceFunction interface (a sketch of this plain, non-parallel variant is shown right below),
or, for a parallel source, by implementing the ParallelSourceFunction interface or extending RichParallelSourceFunction, as covered in the next subsection.
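For reference, a minimal sketch of the non-parallel variant that implements SourceFunction directly; the class name MySingleSource, the emitted value, and the sleep interval are illustrative:
import org.apache.flink.streaming.api.functions.source.SourceFunction

// a non-parallel source: Flink runs exactly one instance of it
class MySingleSource extends SourceFunction[String] {
  @volatile private var isRunning: Boolean = true

  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    while (isRunning) {
      ctx.collect("hello world") // emit one element (illustrative value)
      Thread.sleep(1000)         // throttle the output a little
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }
}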
1. Custom source via ParallelSourceFunction
If you need a source that runs with a parallelism greater than one, implement the ParallelSourceFunction interface or extend RichParallelSourceFunction.
Step 1: implement the ParallelSourceFunction interface in Scala
import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, SourceFunction}

class MyParalleSource extends ParallelSourceFunction[String] {
  var isRunning: Boolean = true

  override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
    while (isRunning) {
      sourceContext.collect("hello world")
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }
}
Step 2: use the custom source
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object FlinkSource5 {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    val sourceStream: DataStream[String] = environment.addSource(new MyParalleSource)
    val result: DataStream[(String, Int)] = sourceStream.flatMap(x => x.split(" ")).map(x => (x, 1))
      .keyBy(0)
      .sum(1)
    result.print().setParallelism(2)
    environment.execute("paralleSource")
  }
}
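RichParallelSourceFunction was mentioned above but not shown. The sketch below assumes we only want to emit an increasing counter; the class name MyRichParalleSource, the emitted values, and the sleep interval are illustrative. The rich variant adds open()/close() lifecycle hooks and access to the runtime context:
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}

class MyRichParalleSource extends RichParallelSourceFunction[Long] {
  @volatile private var isRunning: Boolean = true
  private var counter: Long = 0L

  // called once per parallel instance before run(); a good place to open connections
  override def open(parameters: Configuration): Unit = {
    counter = 0L
  }

  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    while (isRunning) {
      ctx.collect(counter) // emit the current counter value
      counter += 1
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }

  // called when the source shuts down; a good place to release resources
  override def close(): Unit = {}
}
It can be used just like MyParalleSource above, via environment.addSource(new MyRichParalleSource).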
2. DataStream operators
Official operator documentation:
https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/index.html
Flink provides many operators for real-time processing. The commonly used DataStream operators fall into three main categories:
Transformations: transformation operators. They are lazy; nothing runs until the job reaches a sink operator and is executed.
Partition: operators that redistribute (repartition) the data.
Sink: the destination the data is written to.
DataStream transformation operators
• map: takes one element and returns one element; typically used for cleaning and transformation
• flatMap: takes one element and returns zero, one, or more elements
• filter: evaluates a condition on each element; elements that satisfy it are kept
• keyBy: groups the stream by the given key; records with the same key go to the same partition (typical usage: see remarks)
• reduce: aggregates the data by combining the current element with the previously reduced value and returning a new value (a combined sketch of these basic operators follows this list)
• aggregations: sum(), min(), max(), etc.
• window: covered separately later
• union: merges multiple streams; the new stream contains all elements of all input streams, with the restriction that every merged stream must have the same type
• connect: similar to union, but only two streams can be connected; their types may differ, and a different processing function is applied to each stream
• CoMap, CoFlatMap: the map/flatMap counterparts used on ConnectedStreams
• split: splits one stream into multiple streams according to a rule
• select: used together with split to pick one of the split streams
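The basic operators above (map, flatMap, filter, keyBy, reduce) have no dedicated case study below, so here is a minimal sketch that chains them in a small word-count style job; the object name and sample data are illustrative:
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object FlinkBasicTransformations {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    val lines: DataStream[String] = environment.fromElements("hello world", "hello flink", "spark flink")
    val result: DataStream[(String, Int)] = lines
      .flatMap(_.split(" "))                  // one line in, many words out
      .filter(_.nonEmpty)                     // keep only non-empty tokens
      .map(word => (word, 1))                 // one element in, one element out
      .keyBy(0)                               // group by the word
      .reduce((a, b) => (a._1, a._2 + b._2))  // running count per key; sum(1) would do the same here
    result.print().setParallelism(1)
    environment.execute("basicTransformations")
  }
}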
Case 1: merging multiple DataStreams with union
Create two DataStreams, then merge them with union.
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object FlinkUnion {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    // the first DataStream
    val firstStream: DataStream[String] = environment.fromElements("hello world", "test scala")
    // the second DataStream
    val secondStream: DataStream[String] = environment.fromElements("second test", "spark flink")
    // merge the two streams
    val unionAll: DataStream[String] = firstStream.union(secondStream)
    // pass the elements through unchanged
    val unionResult: DataStream[String] = unionAll.map(x => {
      // println(x)
      x
    })
    // sink operator: print the result
    unionResult.print().setParallelism(1)
    // run the job
    environment.execute()
  }
}
Case 2: connecting DataStreams of different types with connect
import org.apache.flink.streaming.api.scala.{ConnectedStreams, DataStream, StreamExecutionEnvironment}

object FlinkConnect {
  def main(args: Array[String]): Unit = {
    // get the execution environment
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // import the implicit conversions
    import org.apache.flink.api.scala._
    // a DataStream of String
    val strStream: DataStream[String] = environment.fromElements("hello world", "abc test")
    // a DataStream of Int
    val intStream: DataStream[Int] = environment.fromElements(1, 2, 3, 4, 5)
    // connect the two streams
    val connectedStream: ConnectedStreams[String, Int] = strStream.connect(intStream)
    // process the connected streams with map, passing one function per stream
    val connectResult: DataStream[Any] = connectedStream.map(x => { x + "abc" }, y => { y * 2 })
    connectResult.print().setParallelism(1)
    environment.execute("connect stream")
  }
}
Case 3: splitting one DataStream into multiple DataStreams with split
import java.{lang, util}
import org.apache.flink.streaming.api.collector.selector.OutputSelector
import org.apache.flink.streaming.api.scala.{DataStream, SplitStream, StreamExecutionEnvironment}

object FlinkSplit {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    // the input DataStream
    val resultDataStream: DataStream[String] = environment.fromElements("hello world", "test spark", "spark flink")
    // split the stream with an OutputSelector
    val splitStream: SplitStream[String] = resultDataStream.split(new OutputSelector[String] {
      override def select(out: String): lang.Iterable[String] = {
        val strings = new util.ArrayList[String]()
        if (out.contains("hello")) {
          // if the element contains "hello", tag it with the name "hello"
          strings.add("hello")
        } else {
          strings.add("other")
        }
        strings
      }
    })
    // select one of the split streams
    val helloStream: DataStream[String] = splitStream.select("hello")
    // print all strings that contain "hello"
    helloStream.print().setParallelism(1)
    environment.execute()
  }
}
DataStream partition operators
Introduction to the various Flink operators: https://blog.csdn.net/lmalds/article/details/60575205
Partition operators let us repartition the data, for example to mitigate data skew.
• Random partitioning: partitions the data randomly
  • dataStream.shuffle()
• Rebalancing: rebalances and repartitions the data set to even out data skew
  • dataStream.rebalance()
• Rescaling: redistributes elements only to a subset of downstream operators. Because the exchange stays within a single node, no data is transferred across the network.
  • dataStream.rescale()
• Custom partitioning: user-defined partitioning
  • A custom partitioner must implement the Partitioner interface
  • dataStream.partitionCustom(partitioner, "someKey")
  • or dataStream.partitionCustom(partitioner, 0)
• Broadcasting: broadcast (broadcast variables), covered in detail later
Requirement: repartition the data after a filter.
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object FlinkPartition {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    val dataStream: DataStream[String] = environment.fromElements("hello world", "test spark", "abc hello", "hello flink")
    val resultStream: DataStream[(String, Int)] = dataStream.filter(x => x.contains("hello"))
      // .shuffle   // randomly redistributes the upstream data to the downstream partitions
      // .rescale
      .rebalance    // repartitions the data, which involves a shuffle
      .flatMap(x => x.split(" "))
      .map(x => (x, 1))
      .keyBy(0)
      .sum(1)
    resultStream.print().setParallelism(1)
    environment.execute()
  }
}
Hands-on case: a custom partitioning strategy
If none of the partitioning strategies above meet our needs, we can define a custom partitioning strategy.
Requirement: define a custom partitioning strategy that routes strings containing "hello" to one partition and all other strings to another partition.
Step 1: the custom partitioner class
import org.apache.flink.api.common.functions.Partitioner

class MyPartitioner extends Partitioner[String] {
  override def partition(word: String, num: Int): Int = {
    println("number of partitions: " + num)
    if (word.contains("hello")) {
      0
    } else {
      1
    }
  }
}
Step 2: apply the partitioner
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object FlinkCustomerPartition {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // set the parallelism (number of partitions); if not set, it defaults to the number of CPU cores
    environment.setParallelism(2)
    import org.apache.flink.api.scala._
    // build the DataStream
    val sourceStream: DataStream[String] = environment.fromElements("hello world", "spark flink", "hello world", "hive hadoop")
    val rePartition: DataStream[String] = sourceStream.partitionCustom(new MyPartitioner, x => x + "")
    rePartition.map(x => {
      println("key: " + x + ", thread: " + Thread.currentThread().getId)
      x
    })
    rePartition.print()
    environment.execute()
  }
}
DataStream sink operators
https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/
• writeAsText(): writes the elements line by line as strings, obtained by calling each element's toString() method (a short sketch follows this list)
• print() / printToErr(): prints each element's toString() value to standard output or standard error
• custom output via addSink (Kafka, Redis, ...)
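As referenced in the list, a minimal sketch of writing a stream to text files with writeAsText; the object name and the output path are illustrative:
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object FlinkWriteAsText {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    val words: DataStream[String] = environment.fromElements("hello world", "spark flink")
    // each element is written on its own line, using its toString() value
    words.writeAsText("file:///tmp/flink_output").setParallelism(1)
    environment.execute("writeAsTextSink")
  }
}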
A sink operator sends the data to a chosen destination, for example Kafka, Redis, or HBase. We have already printed data by calling print(); next we implement a custom sink that sends the data to Redis.
Step 1: add the Flink-Redis connector dependency
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>
Step 2: implement the code
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

object Stream2Redis {
  def main(args: Array[String]): Unit = {
    // get the execution environment
    val executionEnvironment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    // prepare the data
    val streamSource: DataStream[String] = executionEnvironment.fromElements("hello world", "key value")
    // wrap the data into key/value tuples
    val tupleValue: DataStream[(String, String)] = streamSource.map(x => (x.split(" ")(0), x.split(" ")(1)))
    val builder = new FlinkJedisPoolConfig.Builder
    builder.setHost("node03")
    builder.setPort(6379)
    builder.setTimeout(5000)
    builder.setMaxTotal(50)
    builder.setMaxIdle(10)
    builder.setMinIdle(5)
    val config: FlinkJedisPoolConfig = builder.build()
    // build the Redis sink
    val redisSink = new RedisSink[Tuple2[String, String]](config, new MyRedisMapper)
    // attach our custom sink
    tupleValue.addSink(redisSink)
    // run the job
    executionEnvironment.execute("redisSink")
  }
}

class MyRedisMapper extends RedisMapper[Tuple2[String, String]] {
  override def getCommandDescription: RedisCommandDescription = {
    new RedisCommandDescription(RedisCommand.SET)
  }

  override def getKeyFromData(data: (String, String)): String = {
    data._1
  }

  override def getValueFromData(data: (String, String)): String = {
    data._2
  }
}