1 Spark Streaming 透彻理解之一

本文内容基于Spark最新版1.6.1

  1. Spark 最初只有Spark Core,通过逐步的发展,现在已扩展出Spark SQL、Spark Streaming、Spark MLlib(machine learning)、GraphX(graph)、Spark R等。 而Spark Streaming本是Spark Core上的一个子框架,如果我们试着去精通这个子框架,不仅仅能写出非常复杂的应用程序,还能够很好的驾驭Spark,进而研究并达到精通Spark的地步,及其寻找到Spark问题的解决之道。

  2. 我们为什么从Spark Streaming切入研究Spark源码的定制,因为Spark SQL涉及到很多SQL语法解析和优化的细节,对于我们集中精力研究Spark有所干扰;Spark R还不是很成熟,支持功能有限;GraphX最近几个版本基本没有改进,里面有许多数学算法;MLlib也涉及到相当多的数学知识。

  3. Spark Streaming的优势是在于可以结合SparkSQL、图计算、机器学习,使其功能更加强大。同时在Spark中Spark Streaming也是最容易出现问题的,因为它是不断的运行,内部比较复杂。掌握好Spark Streaming,可以去窥视Spark的一切!

  4. Spark Streaming到底是什么? Spark Streaming是一个流式计算框架,运行在Spark Core之上。这是一个流处理的时代,一切数据如果不是以流式来处理或者跟流式的处理不相关的话,都将是次数据,我们必将处在一个流的数据处理时代。Spark Streaming很像是基于Spark Core之上的一个应用程序。不像其他子框架,比如机器学习是把数学算法直接应用在Spark的RDD之上,Spark Streaming更像一般的应用程序那样,感知流进来的数据并进行相应的处理。很像顺其自然的一种感知操作,利用自己独有的“神经元”来对数据进行各类操作。

  5. Spark Streaming的几大优点

  • 对源源不断流进来的数据,能够迅速响应并立即给出你所要是反馈信息
  • Spark非常强大的地方在于它的流式处理可以在线的利用机器学习、图计算、Spark SQL或者Spark R的成果,这得益于Spark多元化、一体化的基础架构设计。也就是说,在Spark技术堆栈中,Spark Streaming可以调用任何的API接口,不需要做任何的设置。这是Spark无可匹敌之处,也是Spark Streaming必将一统天下的根源。
  1. 如何清晰的看到数据的流入、被处理的过程? 使用一个小技巧,通过调节放大Batch Interval的方式,来降低批处理次数,以方便看清楚各个环节。
    我们从已写过的广告点击的在线黑名单过滤的Spark Streaming应用程序入手,看一下是具体的实验源码:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
 * 使用Scala开发集群运行的Spark 在线黑名单过滤程序
 *
 * 背景描述:在广告点击计费系统中,我们在线过滤掉黑名单的点击,
进而保护广告商的利益,只进行有效的广告点击计费
 *  或者在防刷评分(或者流量)系统,过滤掉无效的投票或者评分或者流量;
 * 实现技术:使用transform Api直接基于RDD编程,进行join操作
 *
 */
object OnlineBlackListFilter {
  def main(args: Array[String]){
    /**
     * 第1步:创建Spark的配置对象SparkConf,设置Spark程序的运行时的配置信息,
     * 例如说通过setMaster来设置程序要链接的Spark集群的Master的URL,如果设置
     * 为local,则代表Spark程序在本地运行,特别适合于机器配置条件非常差(例如
     * 只有1G的内存)的初学者       *
     */
    val conf = new SparkConf() //创建SparkConf对象
    conf.setAppName("OnlineBlackListFilter") //设置应用程序的名称,在程序运行的监控界面可以看到名称
    conf.setMaster("spark://Master:7077") //此时,程序在Spark集群
    val ssc = new StreamingContext(conf, Seconds(30))
    /**
     * 黑名单数据准备,实际上黑名单一般都是动态的,例如在Redis或者数据库中,黑名单的生成往往有复杂的业务
     * 逻辑,具体情况算法不同,但是在Spark Streaming进行处理的时候每次都能工访问完整的信息
     */
    val blackList = Array(("hadoop", true),("mahout", true))
    val blackListRDD = ssc.sparkContext.parallelize(blackList, 8)
    val adsClickStream = ssc.socketTextStream("Master", 9999)
    /**
     * 此处模拟的广告点击的每条数据的格式为:time、name
     * 此处map操作的结果是name、(time,name)的格式
     */
    val adsClickStreamFormatted = adsClickStream.map { ads => (ads.split(" ")(1), ads) }
    adsClickStreamFormatted.transform(userClickRDD => {
      //通过leftOuterJoin操作既保留了左侧用户广告点击内容的RDD的所有内容,又获得了相应点击内容是否在黑名单中
      val joinedBlackListRDD = userClickRDD.leftOuterJoin(blackListRDD)
      /**
       * 进行filter过滤的时候,其输入元素是一个Tuple:(name,((time,name), boolean))
       * 其中第一个元素是黑名单的名称,第二元素的第二个元素是进行leftOuterJoin的时候是否存在在值
       * 如果存在的话,表面当前广告点击是黑名单,需要过滤掉,否则的话则是有效点击内容;
       */
      val validClicked = joinedBlackListRDD.filter(joinedItem => {
        if(joinedItem._2._2.getOrElse(false))
        {
          false
        } else {
          true
        }
      })
      validClicked.map(validClick => {validClick._2._1})
    }).print
    /**
     * 计算后的有效数据一般都会写入Kafka中,下游的计费系统会从kafka中pull到有效数据进行计费
     */
    ssc.start()
    ssc.awaitTermination()
  }
}

集群中需要先执行nc,启动 9999端口

nc -lk 9999

将代码打包上传到集群运行

  1. 我们运行完程序,看到过滤结果以后,停止程序,打开HistoryServer http://master:18080/

  2. 点击App ID进去,打开,会看到如下图所示的4个Job,从实际执行的Job是1个Job,但是图中显示有4个Job,从这里可以看出Spark Streaming运行的时候自己会启动一些Job。



    先看看job id 为0 的详细信息


  3. 很明显是我们定义的blackListRDD数据的生成。对应的代码为

val blackList = Array((“Hadoop”, true), (“Mathou”, true)) 
//把Array变成RDD 
val blackListRDD = ssc.sparkContext.parallelize(blackList) 

并且它做了reduceBykey的操作(代码中并没有此步操作,SparkStreaming框架自行生成的)。
这里有两个Stage,Stage 0和Stage 1

Job 1的详细信息



一个makeRDD,这个RDD是receiver不断的接收数据流中的数据,在时间间隔达到batchInterval后,将所有数据变成一个RDD。并且它的耗时也是最长的59s

  1. 此处可以看出,receiver也是一个独立的job。由此我们可以得出一个结论:我们在应用程序中,可以启动多个job,并且不用的job之间可以相互配合,这就为我们编写复杂的应用程序打下了基础。
    我们点击上面的start at OnlineBlackListFilter.scala:64查看详细信息
  2. 根据上图的信息,只有一个Executor在接收数据,最最重要的是红色框中的数据本地性为PROCESS_LOCAL,由此可以知道receiver接收到数据后会保存到内存中,只要内存充足是不会写到磁盘中的
    即便在创建receiver时,指定的存储默认策略为
MEMORY_AND_DISK_SER_2 
def socketTextStream( 
hostname: String, 
port: Int, 
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 
): ReceiverInputDStream[String] = withNamedScope(“socket text stream”) { 
socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel) 
}
  1. job 2的详细信息




    Job 2 将前两个job生成的RDD进行leftOuterJoin操作。
    从Stage Id的编号就可以看出,它是依赖于上两个Job的。
    Receiver接收数据时是在spark-master节点上,但是Job 2在处理数据时,数据已经到了spark-worker1上了(因为我的环境只有两个worker,数据并没有分散到所有worker节点,worker节点如果多一点,情况可能不一样,每个节点都会处理数据)
    点击上面的Stage Id 3查看详细信息:



    Executor上运行,并且有5个Task 。
    Job 3的详细信息

  2. 总结:我们可以看出,一个batchInterval并不是仅仅触发一个Job。
    根据上面的描述,我们更细致的了解了DStream和RDD的关系了。DStream就是一个个batchInterval时间内的RDD组成的。只不过DStream带上了时间维度,是一个无边界的集合。



    以上的连续4个图,分别对应以下4个段落的描述:
    Spark Streaming接收Kafka、Flume、HDFS和Kinesis等各种来源的实时输入数据,进行处理后,处理结果保存在HDFS、Databases等各种地方。
    Spark Streaming接收这些实时输入数据流,会将它们按批次划分,然后交给Spark引擎处理,生成按照批次划分的结果流。
    Spark Streaming提供了表示连续数据流的、高度抽象的被称为离散流的DStream。DStream本质上表示RDD的序列。任何对DStream的操作都会转变为对底层RDD的操作。
    Spark Streaming使用数据源产生的数据流创建DStream,也可以在已有的DStream上使用一些操作来创建新的DStream。
    在我们前面的实验中,每300秒会接收一批数据,基于这批数据会生成RDD,进而触发Job,执行处理。
    DStream是一个没有边界的集合,没有大小的限制。
    DStream代表了时空的概念。随着时间的推移,里面不断产生RDD。
    锁定到时间片后,就是空间的操作,也就是对本时间片的对应批次的数据的处理。
    下面用实例来讲解数据处理过程。
    从Spark Streaming程序转换为Spark执行的作业的过程中,使用了DStreamGraph。
    Spark Streaming程序中一般会有若干个对DStream的操作。DStreamGraph就是由这些操作的依赖关系构成。

对DStream的操作会构建成DStream Graph



从每个foreach开始,都会进行回溯。从后往前回溯这些操作之间的依赖关系,也就形成了DStreamGraph。
在每到batchInterval时间间隔后,Job被触发,DStream Graph将会被转换成RDD Graph



空间维度确定之后,随着时间不断推进,会不断实例化RDD Graph,然后触发Job去执行处理。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,558评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,002评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,036评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,024评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,144评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,255评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,295评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,068评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,478评论 1 305
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,789评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,965评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,649评论 4 336
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,267评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,982评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,223评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,800评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,847评论 2 351

推荐阅读更多精彩内容