SparkStreaming提供的窗口计算功能,允许在数据的滑动窗口上应用转换操作。本节就来介绍SparkStreaming的窗口操作的知识。
1.DS滑动窗口的工作方式
滑动窗口和原始数据流的关系如下图所示:
image
如图所示,每当窗口滑过originalDStream时,落在窗口内的源RDD被组合并被执行操作以产生windowedDStream的RDD。在上述例子中,操作应用于最近3个时间片上的数据,窗口每次向前滑动2个时间片。由此可知,窗口操作需要指定两个参数:
- 窗口长度(windowLength):窗口操作的时间长度(如上例中为:3*时间片)
- 滑动间隔(slideInterval):相邻两个窗口操作的时间间隔,即窗口每次向前滑动的时间距离(如上例中为:2*时间片)
注意两点:
- 这里的时间片就是批处理的时间片;
- 窗口长度和滑动间隔必须是批处理时间片的整数倍
举个例子:在NetworkWordCount程序中,如果需要每隔10秒钟,对最近30秒钟的数据进行统计,可以使用reduceByKeyAndWindow窗口算子替换reduceByKey来实现:
......
val words = lines.flatMap(.split(" "))
val wordPair = words.map(x=>(x,1))
//val wordCountResult = wordPair.reduceByKey(+_)
val wordCountResult = wordPair.reduceByKeyAndWindow((a:Int,b:Int)=>(a+b),Seconds(30),Seconds(10))
......
2.常用的窗口操作
- window(windowLength, slideInterval):基于源DStream产生的窗口化的批数据计算一个新的DStream
- countByWindow(windowLength, slideInterval):返回流中元素的一个滑动窗口数
- reduceByWindow(func,windowLength, slideInterval):返回一个单元素流。利用函数func聚集滑动时间间隔的流的元素创建这个单元素流。函数必须是相关联的以使计算能够正确的并行计算。
- reduceByKeyAndWindow(func,windowLength, slideInterval,[numTasks]):应用到一个(K,V)对组成的DStream上,返回一个由(K,V)对组成的新的DStream。每一个Key的值均由给定的reduce函数聚集起来。注意,在默认情况下,这个算子利用了Spark默认的并发任务数去分组,也可以用numTasks参数设置任务数。
- reduceByKeyAndWindow(func,invFunc,windowLength, slideInterval,[numTasks]):比上面reduceByKeyAndWindow更高效的版本。可以使用前一个窗口的reduce的结果,来递增计算每个窗口的reduce值。这是通过对进入滑动窗口的新数据进行reduce操作,以及“逆减(inverse reduce)”离开窗口的旧数据来完成的。举例:当窗口滑动时,对键对应的值进行“一加一减”操作。但是,它仅适用于“可逆减函数(invertible reduce functions)”,即具有相应“反减”功能的减函数(作为参数invFunc)。可选参数numTasks可用于指定任务数。注意:使用此操作必须启用检查点。
- countByKeyAndWindow(windowLength,slideInterval,[numTasks]):应用到一个(K,V)对组成的DStream上,返回一个由(K,V)对组成的新的DStream。每个Key的值都是它们在滑动窗口中出现的频率。