Spark学习笔记十:SparkStreaming

一、基础知识

1.SparkStreaming简介

SparkStreaming是流式处理框架,是Spark API的扩展,支持可扩展、高吞吐量、容错的实时数据流处理,实时数据的来源可以是:Kafka, Flume, Twitter, ZeroMQ或者TCP sockets,并且可以使用高级功能的复杂算子来处理流数据。例如:map,reduce,join,window 。最终,处理后的数据可以存放在文件系统,数据库等,方便实时展现。

2.SparkStreaming与Storm的区别
  1. Storm是纯实时的流式处理框架,SparkStreaming是准实时的处理框架(微批处理),可以通过控制间隔的时间做到实时处理。因为微批处理,SparkStreaming的吞吐量比Storm要高。
  2. Storm 的事务机制要比SparkStreaming的要完善。
  3. Storm支持动态资源调度。(spark1.2开始和之后也支持)
  4. SparkStreaming擅长复杂的业务处理,Storm不擅长复杂的业务处理,擅长简单的汇总型计算。
3.SparkStreaming接收数据原理
SparkStreaming接收数据原理.jpg

二、SparkStreaming代码

代码示例:WordCountFromSocket.scala

代码注意事项:

  • 启动socket server 服务器:nc –lk 9999
  • receiver模式下接受数据,local的模拟线程必须大于等于2,一个线程用来receiver用来接受数据,另一个线程用来执行job。
  • Durations时间设置就是我们能接收的延迟度。这个需要根据集群的资源情况以及任务的执行情况来调节。
  • 创建JavaStreamingContext有两种方式(SparkConf,SparkContext)
  • 所有的代码逻辑完成后要有一个output operation类算子。
  • JavaStreamingContext.start() Streaming框架启动后不能再次添加业务逻辑。
  • JavaStreamingContext.stop() 无参的stop方法将SparkContext一同关闭,stop(false),不会关闭SparkContext。
  • JavaStreamingContext.stop()停止之后不能再调用start。

三、SparkStreaming算子操作

1.foreachRDD

output operation类算子,必须对抽取出来的RDD执行action类算子,代码才能执行

2.transform

transformation类算子,可以通过transform算子,拿到Dstream中的RDD,对RDD使用RDD的算子操作,但是最后要返回RDD,返回的RDD又被封装到Dstream中。
【代码示例:TransformBlackList.scala】

3.updateStateByKey

transformation类算子
(1)为SparkStreaming中每一个Key维护一份state状态,state类型可以是任意类型的,可以是一个自定义的对象,更新函数也可以是自定义的。
(2)通过更新函数对该key的状态不断更新,对于每个新的batch而言,SparkStreaming会在使用updateStateByKey的时候为已经存在的key进行state的状态更新。
使用到updateStateByKey要开启checkpoint机制和功能。
(3)多久会将内存中的数据写入到磁盘一份?
如果batchInterval设置的时间小于10秒,那么10秒写入磁盘一份。如果batchInterval设置的时间大于10秒,那么就会batchInterval时间间隔写入磁盘一份。
【代码示例:UpdateStateByKey.scala】

4.saveAsTextFiles

output operation类算子,将结果保存为文件
【代码示例:CopyFileToDirectory.scala,SaveAsTextFile.scala】

四、窗口操作

业务场景:每隔5s查看过去10s的数据
代码示例:WindowOperator.scala

窗口操作.jpg

假设每隔5s 1个batch,上图中窗口长度为15s,窗口滑动间隔10s。
注意:窗口长度和滑动间隔必须是batchInterval的整数倍,如果不是整数倍会检测报错。

为避免任务堆积,需要优化window窗口操作:每次拿到总和减去出去的batch,加上新进的batch。

优化后的window窗口操作示意图.png

上图的batchInterval = 1s (每隔1s生成一个batch),窗口长度5s,滑动间隔1s。

优化后的window操作要保存状态所以要设置checkpoint路径,没有优化的window操作可以不设置checkpoint路径。

五、Driver HA(Standalone或者Mesos)

因为SparkStreaming是7*24小时运行,Driver只是一个简单的进程,有可能挂掉,所以实现Driver的HA就有必要(如果使用的Client模式就无法实现Driver HA ,这里针对的是cluster模式)。Yarn平台cluster模式提交任务,AM(AplicationMaster)相当于Driver,如果挂掉会自动启动AM。这里所说的DriverHA针对的是Spark standalone和Mesos资源调度的情况下。实现Driver的高可用有两个步骤:
第一:提交任务层面,在提交任务的时候加上选项 --supervise,当Driver挂掉的时候会自动重启Driver。
第二:代码层面,使用JavaStreamingContext.getOrCreate(checkpoint路径,JavaStreamingContextFactory)

Driver中元数据包括:
1.创建应用程序的配置信息。
2.DStream的操作逻辑。
3.job中没有完成的批次数据,也就是job的执行进度。

注意:getOrCreate适用于代码逻辑不变,spark会记录下offset,可恢复数据。代码逻辑变了只能清空checkpoint目录才可执行新逻辑,解决此问题只能手动管理offset
【代码示例:SparkStreamingDriverHA.scala】

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容