『听风居士』1.Spark Streaming另类实验与 Spark Streaming本质解析

1 Spark Streaming 透彻理解之一 - 简书
http://www.jianshu.com/p/8de6ec8513ca

1.Spark Streaming另类实验与 Spark Streaming本质解析 - 听风居士 - 博客园
http://www.cnblogs.com/zhouyf/p/5471477.html

1 Spark源码定制选择从Spark Streaming入手
我们从第一课就选择Spark子框架中的SparkStreaming。
那么,我们为什么要选择从SparkStreaming入手开始我们的Spark源码版本定制之路?
有下面几个方面的理由:
1)Spark大背景
Spark 最开始没有我们今天看到的Spark Streaming、GraphX、Machine Learning、Spark SQL和Spark R等相关子框架内容,最开始就只有很原始的Spark Core。我们要做Spark源码定制,做自己的发行版本,以SparkStreaming为切入点,Spark Streaming本身是 Spark Core上的一个子框架,所以我们透过一个子框架的彻底研究,肯定可以精通Spark力量的源泉和所有问题的解决之道;
2)为什么不选Spark SQL?
我们知道,Spark有很多子框架,现在除了基于Spark Core编程之外,用得最多的就是SparkSQL。Spark SQL由于涉及了太多的SQL语法细节的解析或者说优化,其实这些解析或优化,对于我们集 中精力去研究Spark而言,它是一件重要的事情,但其实不是最重要的一件事情。由于它有太多的SQL语法解析,这个不是一个合适的子框架来让我们研究。
3)为什么不选Spark R?
Spark R现在很不成熟,而且支持功能有限,这个也从我们的候选列表中删除掉。
4)为什么不选Spark GraphX(图计算)?
如果大家关注了Spark的演进或发展的话,Spark最近发布的几个版本,Spark图计算基本没有改进。如果按照这个趋势的话,Spark官方机构似乎 在透露一个信号,图计算已经发展到尽头了。所以说,我们如果要研究的话,肯定不会去做一个看上去发展到尽头的东西。另外,至于图计算而言,它有很多数学级 别的算法,而我们是要把Spark做到极致,这样的话,数学这件事情很重要,但对我们来说却不是最重要的。
5)为什么不选Spark MLlib(机器学习)?
Spark机器学习在封装了Vector(向量)和Metrics基础之上,加上Spark的RDD,构建了它的众多的库。这个也由于涉及到了太多的数学的知识,所以我们选机器学习其实也不是一个太好的选择。
综上所述,我们筛选之下,Spark Streaming是我们唯一的选择。
我 们回顾过去,2015年是Spark最火的一年,最火的国家主要是美国。其实,2015年也是流式处理最火的一年。从从业人员的待遇上看,不论2015年 还是2016年,在搞大数据开发的公司中,以Spark岗位招聘的待遇一定是最高的。2016上半年,据StackOverflow开展的一项调查结果显 示,在大数据领域,Spark从业人员的待遇是最高的。在调查中,50%以上的人认为,Spark中最吸引人的是Spark Streaming。总之,大家考虑用Spark,主要是因为Spark Streaming。
Spark Streaming到底有什么魔力?
1)它是流式计算
这是一个流处理的时代,一切数据如果不是流式的处理或者跟流式的处理不相关的话,都是无效的数据。这句话会不断地被社会的发展所证实。
2)流式处理才是真正的我们对大数据的初步印象
一方面,数据流进来,立即给我们一个反馈,这不是批处理或者数据挖掘能做到的。另一方面,Spark非常强大的地方在于它的流式处理可以在线的利用机器学习、图计算、Spark SQL或者Spark R的成果,这得益于Spark多元化、一体化的基础架构设计。也就是说,在Spark技术堆栈中,Spark Streaming可以调用任何的API接口,不需要做任何的设置。这是Spark无可匹敌之处,也是Spark Streaming必将一统天下的根源。这个时代的流处理单打独斗已经不行了,Spark Streaming必然会跟多个Spark子框架联合起来,称霸大数据领域。
3)流式处理“魅力和复杂”的双重体
如果你精通SparkStreaming,你就知道Spark Streaming以及它背后的兄弟框架,展示了Spark和大数据的无穷魅力。不过,在Spark的所有程序中,肯定是基于SparkStreaming的应用程序最容易出问题。为什么?因为数据不断流进来,它要动态控制数据的流入,作业的切分还有数据的处理。这些都会带来极大的复杂性。
4)与其他Spark子框架的巨大区别
如果你仔细观察,你会发现,Spark Streaming很像是基于Spark Core之上的一个应用程序。不像其他子框架,比如机器学习是把数学算法直接应用在Spark的RDD之上,Spark Streaming更像一般的应用程序那样,感知流进来的数据并进行相应的处理。
所以如果要做Spark的定制开发,Spark Streaming则提供了最好的参考,掌握了Spark Streaming也就容易开发任意其他的程序。当然想掌握SparkStreaming,但不去精通Spark Core的话,那是不可能的。Spark Core加Spark Streaming更是双剑合璧,威力无穷。我们选择SparkStreaming来入手,等于是找到了关键点。如果对照风水学的说法,对于Spark,我们算是已经幸运地找到了龙脉。如果要寻龙点穴,那么Spark Streaming就是龙穴之所在。找到了穴位,我们就能一日千里。

2 Spark Streaming另类在线实验

我们在研究Spark Streaming的过程中,会有困惑的事情:如何清晰的看到数据的流入、被处理的过程?
使用一个小技巧,通过调节放大Batch Interval的方式,来降低批处理次数,以方便看清楚各个环节。
我们从已写过的广告点击的在线黑名单过滤的Spark Streaming应用程序入手。

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}object OnlineBlackListFilter { def main(args: Array[String]) { /** * 第1步:创建Spark的配置对象SparkConf,设置Spark程序的运行时的配置信息。 * 例如说通过setMaster来设置程序要链接的Spark集群的Master的URL,如果设置 * 为local,则代表Spark程序在本地运行,特别适合于机器配置条件非常差(例如 * 只有1G的内存)的初学者 /val conf = new SparkConf() //创建SparkConf对象 conf.setAppName("OnlineBlackListFilter") //设置应用程序的名称,在程序运行的监控界面可以看到名称 conf.setMaster("spark://Master:7077") //此时,程序在Spark集群val ssc = new StreamingContext(conf,Seconds(300)) /* * 黑名单数据准备,实际上黑名单一般都是动态的,例如在Redis或者数据库中,黑名单的生成往往有复杂的业务 * 逻辑,具体情况算法不同,但是在Spark Streaming进行处理的时候每次都能工访问完整的信息 /val blackList = Array(("hadoop",true),("mahout",true)) val blackListRDD = ssc.sparkContext.parallelize(blackList,8) //监听主机Master上的9999端口,接收数据val adsClickStream = ssc.socketTextStream("Master" ,9999) /* * 此处模拟的广告点击的每条数据的格式为:time、name * 此处map操作的结果是name、(time,name)的格式 /val adsClientStreamFormated = adsClickStream.map(ads=>(ads.split(" ")(1),ads)) adsClientStreamFormated.transform(userClickRDD => { //通过leftOuterJoin操作既保留了左侧用户广告点击内容的RDD的所有内容,又获得了相应点击内容是否在黑名单中val joinedBlackListRDD = userClickRDD.leftOuterJoin(blackListRDD) /* * 进行filter过滤的时候,其输入元素是一个Tuple:(name,((time,name), boolean)) * 其中第一个元素是黑名单的名称,第二元素的第二个元素是进行leftOuterJoin的时候是否存在在值 * 如果存在的话,表面当前广告点击是黑名单,需要过滤掉,否则的话则是有效点击内容; /val validClicked = joinedBlackListRDD.filter(joinedItem=>{ if(joinedItem._2._2.getOrElse(false)){ false }else{ true } }) validClicked.map(validClick => {validClick._2._1}) }).print() /* * 计算后的有效数据一般都会写入Kafka中,下游的计费系统会从kafka中pull到有效数据进行计费 */ ssc.start() ssc.awaitTermination() }}

把程序的Batch Interval设置从30秒改成300秒:
val ssc = new StreamingContext(conf, Seconds(300))
重新生成一下jar包 。

Spark集群有5台机器:Master、Worker1、Worker2、Worker3、Worker4。
启动HDFS集群:start-dfs.sh启动Spark集群:start-all.sh启动Spark的History Server:start-history-server.sh

打开数据发送的端口:nc -lk 9999。
用spark-submit运行前面生成的jar包。
/usr/local/spark/spark-1.6.0-bin-hadoop2.6/bin/spark-submit --class com.dt.spark.sparkstreaming.OnlineBlackListFilter --master spark://Master:7077 /root/Documents/SparkApps/OnlineBlackListFilter.jar
在数据发送端口输入若干数据,比如:

1375864674543 Tom
1375864674553 Spy
1375864674571 Andy
1375864688436 Cheater
1375864784240 Kelvin
1375864853892 Steven
1375864979347 John

打开浏览器,看History Server的日志信息:


点击最新的应用,看我们目前运行的应用程序中有些什么Job:



总共竟然有5个Job。这完全不是我们此前做Spark SQL之类的应用程序时看到的样子。

我们接下来看一看这些Job的内容,主要揭示一些现象,不会做完全深入的剖析,只是为了先让大家进行一些思考。

Job 0:此Job不体现我们的业务逻辑代码。这个Job是出于对后面计算的负载均衡的考虑。

Job 0包含有Stage 0、Stage 1。随便看一个Stage,比如Stage 1。看看其中的Aggregated Metrics by Executor部分:
发现此Stage在所有Executor上都存在。
Job 1:运行时间比较长,耗时1.5分钟。
点击Stage 2的链接,进去看看Aggregated Metrics By Executor部分:
可以知道,Stage 2只在Worker4上的一个Executor执行,而且执行了1.5分钟。是否会觉得奇怪:从业务处理的角度看,我们发送的那么一点数据,没有必要去启动一个运行1.5分钟的任务吧。那这个任务是做什么呢?
从DAG Visualization部分,就知道此Job实际就是启动了一个接收数据的Receiver:
原来Receiver是通过一个Job来启动的。那肯定有一个Action来触发它。看看Tasks部分:
只有一个Worker运行此Job。是用于接收数据。Locality Level是PROCESS_LOCAL,原来是内存节点。所以,默认情况下,数据接收不会使用磁盘,而是直接使用内存中的数据。
看来,Spark Streaming应用程序启动后,自己会启动一些Job。默认启动了一个Job来接收数据,为后续处理做准备。
重要启示:一个Spark应用程序中可以启动很多Job,而这些不同的Job之间可以相互配合。这一认识为我们写复杂Spark程序奠定了良好的基础。

Job 2:看Details可以发现有我们程序的主要业务逻辑,体现在Stag 3、Stag4、Stag 5中。

我们看Stag3、Stage4的详情,可以知道这2个Stage都是用4个Executor执行的。所有数据处理是在4台机器上进行的。
Stag 5只在Worker4上。这是因为这个Stage有Shuffle操作。
Job3:有Stage 6、Stage 7、Stage 8。其中Stage 6、Stage 7被跳过。
看看Stage 8的Aggregated Metrics by Executor部分。可以看到,数据处理是在4台机器上进行的:

Job4:****也体现了我们应用程序中的业务逻辑 。有Stage 9、Stage 10、Stage 11。其中Stage 9、Stage 10被跳过。
看看Stage 11的详情。可以看到,数据处理是在Worker2之外的其它3台机器上进行的:
综合以上的现象可以知道,Spark Streaming的一个应用中,运行了这么多Job,远不是我们从网络博客或者书籍上看的那么简单。我们有必要通过这些现象,反过来回溯去寻根问源。不过这次暂不做深入分析。
我们的神奇之旅才刚刚开始。
3 瞬间理解Spark Streaming本质
我们先看一张图:


以上的连续4个图,分别对应以下4个段落的描述:
Spark Streaming接收Kafka、Flume、HDFS和Kinesis等各种来源的实时输入数据,进行处理后,处理结果保存在HDFS、Databases等各种地方。
Spark Streaming接收这些实时输入数据流,会将它们按批次划分,然后交给Spark引擎处理,生成按照批次划分的结果流。
Spark Streaming提供了表示连续数据流的、高度抽象的被称为离散流的DStream。DStream本质上表示RDD的序列。任何对DStream的操作都会转变为对底层RDD的操作。
Spark Streaming使用数据源产生的数据流创建DStream,也可以在已有的DStream上使用一些操作来创建新的DStream。

在我们前面的实验中,每300秒会接收一批数据,基于这批数据会生成RDD,进而触发Job,执行处理。

DStream是一个没有边界的集合,没有大小的限制。
DStream代表了时空的概念。随着时间的推移,里面不断产生RDD。

锁定到时间片后,就是空间的操作,也就是对本时间片的对应批次的数据的处理。

下面用实例来讲解数据处理过程。
从Spark Streaming程序转换为Spark执行的作业的过程中,使用了DStreamGraph。
Spark Streaming程序中一般会有若干个对DStream的操作。DStreamGraph就是由这些操作的依赖关系构成。

从程序到DStreamGraph的转换,如以下图例所示:
本例中,从每个foreach开始,都会进行回溯。从后往前回溯这些操作之间的依赖关系,也就形成了DStreamGraph。执行从DStream到RDD的转换,也就形成了RDD Graph,如下图所示:
空间维度确定之后,随着时间不断推进,会不断实例化RDD Graph,然后触发Job去执行处理。现在再去读官方的Spark Streaming的文档,就好理解多了。

看来我们的学习,将从Spark Streaming的现象开始,深入到Spark Core和Spark Streaming的本质。
备注:本博客内容来源于Spark发行版本定制课程

分类: Spark Streaming 进阶

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,332评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,508评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,812评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,607评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,728评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,919评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,071评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,802评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,256评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,576评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,712评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,389评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,032评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,798评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,026评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,473评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,606评论 2 350

推荐阅读更多精彩内容