Flink实时处理之DataStream

Flink的API概览

<v:shapetype id="_x0000_t75" stroked="f" filled="f" path="m@4@5l@4@11@9@11@9@5xe" o:preferrelative="t" o:spt="75" coordsize="21600,21600"><v:stroke joinstyle="miter"><v:formulas></v:formulas><v:path o:connecttype="rect" gradientshapeok="t" o:extrusionok="f"></v:path></v:stroke></v:shapetype><v:shape id="内容占位符_x0020_1" style="width:415.2pt;height:222pt;visibility:visible;mso-wrap-style:square" type="#_x0000_t75" o:spid="_x0000_i1027"><v:imagedata o:title="" src="file:///C:\Users\chenning\AppData\Local\Temp\msohtmlclip1\01\clip_image001.png"></v:imagedata></v:shape>

1、dataStream的数据源

1、socket数据源

从socket当中接收数据,并统计最近5秒钟每个单词出现的次数

第一步:node01开发socket服务

node01执行以下命令开启socket服务

nc -lk 9000

第二步:开发代码实现

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} import org.apache.flink.streaming.api.windowing.time.Time object FlinkSource1 { def main(args: Array[String]): Unit = { //获取程序入口类val streamExecution: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment val socketText: DataStream[String] = streamExecution.socketTextStream("node01",9000) //注意:必须要添加这一行隐式转行,否则下面的flatmap方法执行会报错import org.apache.flink.api.scala._ val result: DataStream[(String, Int)] = socketText.flatMap(x => x.split(" "))
.map(x => (x, 1))
.keyBy(0)
.timeWindow(Time.seconds(5), Time.seconds(5)) //统计最近5秒钟的数据.sum(1) //打印结果数据result.print().setParallelism(1) //执行程序streamExecution.execute()
}
}

2、文件数据源

读取hdfs路径下面所有的文件数据进行处理

第一步:添加maven依赖

<repositories> <repository> <id>cloudera</id> <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url> </repository> </repositories>

<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.6.0-mr1-cdh5.14.2</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.6.0-cdh5.14.2</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>2.6.0-cdh5.14.2</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-core</artifactId> <version>2.6.0-cdh5.14.2</version> </dependency>

第二步:代码实现

object FlinkSource2 { def main(args: Array[String]): Unit = { val executionEnvironment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment import org.apache.flink.api.scala._ //从文本读取数据val hdfStream: DataStream[String] = executionEnvironment.readTextFile("hdfs://node01:8020/flink_input/") val result: DataStream[(String, Int)] = hdfStream.flatMap(x => x.split(" ")).map(x =>(x,1)).keyBy(0).sum(1)

result.print().setParallelism(1)

executionEnvironment.execute("hdfsSource")
}
}

3、从一个已经存在的集合当中获取数据

代码实现

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} object FlinkSource3 { def main(args: Array[String]): Unit = { val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment import org.apache.flink.api.scala._ val value: DataStream[String] = environment.fromElements[String]("hello world","spark flink") val result2: DataStream[(String, Int)] = value.flatMap(x => x.split(" ")).map(x =>(x,1)).keyBy(0).sum(1)
result2.print().setParallelism(1)
environment.execute()
}
}

4、自定义数据源

如果flink自带的一些数据源还不够的工作使用的话,我们还可以自定义数据源

flink提供了大量的已经实现好的source方法,你也可以自定义source

通过实现sourceFunction接口来自定义source,

或者你也可以通过实现ParallelSourceFunction 接口 or 继承RichParallelSourceFunction 来自定义source。

1、通过ParallelSourceFunction 来实现自定义数据源

如果需要实现一个多并行度的数据源,那么我们可以通过实现ParallelSourceFunction 接口或者继承RichParallelSourceFunction 来自定义有并行度的source。

第一步:使用scala代码实现ParallelSourceFunction接口

import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, SourceFunction} class MyParalleSource extends ParallelSourceFunction[String] { var isRunning:Boolean = **true

override def** run(sourceContext: SourceFunction.SourceContext[String]): Unit = { while (true){
sourceContext.collect("hello world")
}
} override def cancel(): Unit = { isRunning = false }
}

第二步:使用自定义数据源

object FlinkSource5 { def main(args: Array[String]): Unit = { val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment import org.apache.flink.api.scala._ val sourceStream: DataStream[String] = environment.addSource(new MyParalleSource) val result: DataStream[(String, Int)] = sourceStream.flatMap(x => x.split(" ")).map(x => (x, 1))
.keyBy(0)
.sum(1)
result.print().setParallelism(2)
environment.execute("paralleSource")
}
}

2、dataStream的算子介绍

官网算子介绍:

https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/index.html

flink当中对于实时处理,有很多的算子,我们可以来看看常用的算子主要有哪些,dataStream当中的算子主要分为三大类,

Transformations****:转换的算子,都是懒执行的,只有真正碰到sink****的算子才会真正加载执行

partition****:对数据进行重新分区等操作

Sink****:数据下沉目的地

<v:shape id="图片_x0020_14" style="width:415.2pt;height:235.2pt;visibility:visible;mso-wrap-style:square" type="#_x0000_t75" alt="说明: C:\Users\admin\Desktop\20190214114209629.png" o:spid="_x0000_i1026"><v:imagedata o:title="20190214114209629" src="file:///C:\Users\chenning\AppData\Local\Temp\msohtmlclip1\01\clip_image002.png"></v:imagedata></v:shape>

DataStream的Transformations算子

l map:输入一个元素,然后返回一个元素,中间可以做一些清洗转换等操作

l flatmap:输入一个元素,可以返回零个,一个或者多个元素

l filter:过滤函数,对传入的数据进行判断,符合条件的数据会被留下

l keyBy:根据指定的key进行分组,相同key的数据会进入同一个分区【典型用法见备注】

l reduce:对数据进行聚合操作,结合当前元素和上一次reduce返回的值进行聚合操作,然后返回一个新的值

l aggregations:sum(),min(),max()等

l window:在后面单独详解

l Union:合并多个流,新的流会包含所有流中的数据,但是union是一个限制,就是所有合并的流类型必须是一致的。

l Connect:和union类似,但是只能连接两个流,两个流的数据类型可以不同,会对两个流中的数据应用不同的处理方法。

l CoMap, CoFlatMap:在ConnectedStreams中需要使用这种函数,类似于map和flatmap

l Split:根据规则把一个数据流切分为多个流

l Select:和split配合使用,选择切分后的流

案例一:使用union算子来合并多个DataStream

获取两个dataStream,然后使用union将两个dataStream进行合并

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} object FlinkUnion { def main(args: Array[String]): Unit = { val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment import org.apache.flink.api.scala._ //获取第一个dataStream val firstStream: DataStream[String] = environment.fromElements("hello world","test scala") //获取第二个dataStream val secondStream: DataStream[String] = environment.fromElements("second test","spark flink") //将两个流进行合并起来val unionAll: DataStream[String] = firstStream.union(secondStream) //结果不做任何处理val unionResult: DataStream[String] = unionAll.map(x => { // println(x) x
}) //调用sink算子,打印输出结果unionResult.print().setParallelism(1) //开始运行environment.execute()
}
}

案例二:使用connect实现不同类型的DataStream进行连接

import org.apache.flink.streaming.api.scala.{ConnectedStreams, DataStream, StreamExecutionEnvironment} object FlinkConnect { def main(args: Array[String]): Unit = { //获取程序入口类val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment //导入隐式转换的包import org.apache.flink.api.scala._ //定义string类型的dataStream val strStream: DataStream[String] = environment.fromElements("hello world","abc test") //定义int类型的dataStream val intStream: DataStream[Int] = environment.fromElements(1,2,3,4,5) //两个流进行connect操作val connectedStream: ConnectedStreams[String, Int] = strStream.connect(intStream) //通过map对数据进行处理,传入两个函数val connectResult: DataStream[Any] = connectedStream.map(x =>{ x + "abc"},y =>{ y * 2 })
connectResult.print().setParallelism(1)
environment.execute("connect stream")
}
}

案例三:使用split将一个DataStream切成多个DataStream

import java.{lang, util} import org.apache.flink.streaming.api.collector.selector.OutputSelector import org.apache.flink.streaming.api.scala.{DataStream, SplitStream, StreamExecutionEnvironment} object FlinkSplit { def main(args: Array[String]): Unit = { val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment import org.apache.flink.api.scala._ //获取第一个dataStream val resultDataStream: DataStream[String] = environment.fromElements("hello world","test spark","spark flink") //通过split来对我们的流进行切分操作val splitStream: SplitStream[String] = resultDataStream.split(new OutputSelector[String] { override def select(out: String): lang.Iterable[String] = { val strings = new util.ArrayListString if (out.contains("hello")) { //如果包含hello,那么我们就给这个流起名字叫做hello strings.add("hello")
} else {
strings.add("other")
}
strings
}
}) //对我么的stream进行选择val helloStream: DataStream[String] = splitStream.select("hello") //打印包含hello的所有的字符串helloStream.print().setParallelism(1)
environment.execute()
}
}

DataStream的Partition算子

https://blog.csdn.net/lmalds/article/details/60575205 flink的各种算子介绍

partition算子允许我们对数据进行重新分区,或者解决数据倾斜等问题

l Random partitioning:随机分区

• dataStream.shuffle()

l Rebalancing:对数据集进行再平衡,重分区,消除数据倾斜

• dataStream.rebalance()

l Rescaling:Rescaling是通过执行oepration算子来实现的。由于这种方式仅发生在一个单一的节点,因此没有跨网络的数据传输。

• dataStream.rescale()

<v:shape id="图片_x0020_34" style="width:279pt;height:196.2pt;visibility:visible;mso-wrap-style:square" type="#_x0000_t75" alt="说明: C:\Users\admin\Desktop\1642492-20190329155510739-1792670965.png" o:spid="_x0000_i1025"><v:imagedata o:title="1642492-20190329155510739-1792670965" src="file:///C:\Users\chenning\AppData\Local\Temp\msohtmlclip1\01\clip_image003.png"></v:imagedata></v:shape>

l Custom partitioning:自定义分区

• 自定义分区需要实现Partitioner接口

• dataStream.partitionCustom(partitioner, "someKey")

• 或者dataStream.partitionCustom(partitioner, 0);

l Broadcasting:广播变量,后面详细讲解

需求:对我们filter过后的数据进行重新分区

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} object FlinkPartition { def main(args: Array[String]): Unit = { val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment import org.apache.flink.api.scala._ val dataStream: DataStream[String] = environment.fromElements("hello world","test spark","abc hello","hello flink") val resultStream: DataStream[(String, Int)] = dataStream.filter(x => x.contains("hello")) // .shuffle //随机的重新分发数据,上游的数据,随机的发送到下游的分区里面去 // .rescale .rebalance //对数据重新进行分区,涉及到shuffle的过程.flatMap(x => x.split(" "))
.map(x => (x, 1))
.keyBy(0)
.sum(1)

resultStream.print().setParallelism(1)
environment.execute()
}
}

案例实战:自定义分区策略

如果以上的几种分区方式还没法满足我们的需求,我们还可以自定义分区策略来实现数据的分区

需求:自定义分区策略,实现不同分区的数据发送到不同分区里面去进行处理,将包含hello的字符串发送到一个分区里面去,其他的发送到另外一个分区里面去

第一步:自定义分区类

import org.apache.flink.api.common.functions.Partitioner class MyPartitioner extends Partitioner[String]{ override def partition(word: String, num: Int): Int = {
println("****分区个数为****" + num) if(word.contains("hello")){ 0 }else{ 1 }
}
}

第二步:代码实现进行分区

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} object FlinkCustomerPartition { def main(args: Array[String]): Unit = { val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment //设置我们的分区数,如果不设置,默认使用CPU核数作为分区个数

environment.setParallelism(2) import org.apache.flink.api.scala._ //获取dataStream val sourceStream: DataStream[String] = environment.fromElements("hello world","spark flink","hello world","hive hadoop") val rePartition: DataStream[String] = sourceStream.partitionCustom(new MyPartitioner,x => x +"")
rePartition.map(x =>{
println("****数据的****key****为****" + x + "****线程为****" + Thread.currentThread().getId)
x
})
rePartition.print()
environment.execute()

}
}

DataStream的sink算子

https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/

l writeAsText():将元素以字符串形式逐行写入,这些字符串通过调用每个元素的toString()方法来获取

l print() / printToErr():打印每个元素的toString()方法的值到标准输出或者标准错误输出流中

l 自定义输出addSink【kafka、redis】

我们可以通过sink算子,将我们的数据发送到指定的地方去,例如kafka或者redis或者hbase等等,前面我们已经使用过将数据打印出来调用print()方法,接下来我们来实现自定义sink将我们的数据发送到redis里面去

第一步:导入flink整合redis的jar包

** <dependency> <groupId>org.apache.bahir</groupId> <artifactId>flink-connector-redis_2.11</artifactId> <version>1.0</version> </dependency>

第二步:代码开发

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} import org.apache.flink.streaming.connectors.redis.RedisSink import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper} object Stream2Redis { def main(args: Array[String]): Unit = { //获取程序入口类val executionEnvironment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment import org.apache.flink.api.scala._ //组织数据val streamSource: DataStream[String] = executionEnvironment.fromElements("hello world","key value") //将数据包装成为key,value对形式的tuple val tupleValue: DataStream[(String, String)] = streamSource.map(x =>(x.split(" ")(0),x.split(" ")(1))) val builder = new FlinkJedisPoolConfig.Builder

builder.setHost("node03")
builder.setPort(6379)

builder.setTimeout(5000)
builder.setMaxTotal(50)
builder.setMaxIdle(10)
builder.setMinIdle(5) val config: FlinkJedisPoolConfig = builder.build() //获取redis sink val redisSink = new RedisSink[Tuple2[String,String]](config,new MyRedisMapper) //使用我们自定义的sink tupleValue.addSink(redisSink) //执行程序executionEnvironment.execute("redisSink")
}
} class MyRedisMapper extends RedisMapper[Tuple2[String,String]]{ override def getCommandDescription: RedisCommandDescription = { new RedisCommandDescription(RedisCommand.SET)

} override def getKeyFromData(data: (String, String)): String = {
data._1

} override def getValueFromData(data: (String, String)): String = {
data._2

}
}

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,185评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,445评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,684评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,564评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,681评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,874评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,025评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,761评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,217评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,545评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,694评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,351评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,988评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,778评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,007评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,427评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,580评论 2 349

推荐阅读更多精彩内容