【必会】SparkStreaming的窗口操作及实战

Window Operations(窗口操作)可以设置窗口大小和滑动窗口间隔来动态的获取当前Streaming的状态。基于窗口的操作会在一个比 StreamingContext 的 batchDuration(批次间隔)更长的时间范围内,通过整合多个批次的结果,计算出整个窗口的结果。

下面,通过一张图来描述SparkStreaming的窗口操作,如图所示。

基于窗口的操作需要两个参数,如下:

        窗口长度(windowDuration),控制每次计算最近的多少个批次的数据;

        滑动间隔(slideDuration),用来控制对新的 DStream 进行计算的间隔。

        两者都必须是 StreamingContext 中批次间隔(batchDuration)的整数倍。

接下来,勾叔带大家来使用窗口操作,即使用窗口操作进行实战。

每秒发送1个数字

```

import java.io.PrintWriter

import java.net.{ServerSocket,Socket}

objectSocketLikeNCWithWindow{

    defmain(args:Array[String]):Unit={

        valport=1521

        valss=newServerSocket(port)

        valsocket:Socket=ss.accept()

        println("connect to host : "+socket.getInetAddress)

        vari=0

        // 每秒发送1个数

        while(true) {

            i+=1

            valout=newPrintWriter(socket.getOutputStream)

            out.println(i)

            out.flush()

            Thread.sleep(1000)

        }

    }

}

案例一

观察窗口的数据;观察 batchDuration、windowDuration、slideDuration 三者之间的关系;使用窗口相关的操作,具体代码演示如下:

packagecn.lagou.streaming

importorg.apache.spark.SparkConf

importorg.apache.spark.streaming.dstream.{DStream,ReceiverInputDStream}

importorg.apache.spark.streaming.{Seconds,StreamingContext}

objectWindowDemo{

    defmain(args:Array[String]):Unit={

        valconf=newSparkConf().setMaster("local[*]")

        .setAppName(this.getClass.getCanonicalName)

        // 每 5s 生成一个RDD(mini-batch)

        valssc=newStreamingContext(conf,Seconds(5))

        ssc.sparkContext.setLogLevel("error")

        vallines:ReceiverInputDStream[String]=

        ssc.socketTextStream("localhost",1521)

        lines.foreachRDD{ (rdd,time)=>println(s"rdd = ${rdd.id}; time = $time")

            rdd.foreach(value=>println(value))

        }

        valres1:DStream[String]=lines.reduceByWindow(_+" "+_,Seconds(20),Seconds(10))

        res1.print()

        valres2:DStream[String]=lines.window(Seconds(20),Seconds(10))

        res2.print()

        // 求窗口元素的和

        valres3:DStream[Int]=lines.map(_.toInt).reduceByWindow(_+_,Seconds(20),Seconds(10))

        res3.print()

        // 求窗口元素的和

        valres4=res2.map(_.toInt).reduce(_+_)

        res4.print()

        ssc.start()

        ssc.awaitTermination()

    }

}

案例二

热点搜索词实时统计。每隔 10 秒,统计最近20秒的词出现的次数,具体代码演示如下:

packagecn.lagou.streaming

importorg.apache.spark.SparkConf

importorg.apache.spark.streaming.dstream.{DStream,ReceiverInputDStream}

importorg.apache.spark.streaming.{Seconds,StreamingContext}

objectHotWordStats{

    defmain(args:Array[String]):Unit={

        valconf:SparkConf=newSparkConf().setMaster("local[2]")

                            .setAppName(this.getClass.getCanonicalName)

        valssc=newStreamingContext(conf,Seconds(2))

        ssc.sparkContext.setLogLevel("ERROR")

        //设置检查点,检查点具有容错机制。生产环境中应设置到HDFS

        ssc.checkpoint("data/checkpoint/")

        vallines:ReceiverInputDStream[String]=ssc.socketTextStream("localhost",9999)

        valwords:DStream[String]=lines.flatMap(_.split("\\s+"))

        valpairs:DStream[(String,Int)]=words.map(x=>(x,1))

        // 通过reduceByKeyAndWindow算子, 每隔10秒统计最近20秒的词出现的次数

        // 后 3个参数:窗口时间长度、滑动窗口时间、分区

        valwordCounts1:DStream[(String,Int)]=pairs.reduceByKeyAndWindow(

                        (a:Int,b:Int)=>a+b,Seconds(20),Seconds(10),2)

        wordCounts1.print

        // 这里需要checkpoint的支持

        valwordCounts2:DStream[(String,Int)]=pairs.reduceByKeyAndWindow(_+_,_-_,

                                                Seconds(20),Seconds(10),2)

        wordCounts2.print

        ssc.start()

        ssc.awaitTermination()

    }

}

```

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,884评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,755评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,369评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,799评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,910评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,096评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,159评论 3 411
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,917评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,360评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,673评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,814评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,509评论 4 334
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,156评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,882评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,123评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,641评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,728评论 2 351

推荐阅读更多精彩内容