通过案例对spark streaming透彻理解三板斧之三

    通过案例对spark streaming透彻理解三板斧之三:解密Spark Streaming运行机制和框架

    首先我们运行以下的程序,然后通过这个程序的运行过程进一步加深理解Spark Streaming流处理的Job的执行的过程,代码如下:

object OnlineForeachRDD2DB {

def main(args: Array[String]){

/*

* 第1步:创建Spark的配置对象SparkConf,设置Spark程序的运行时的配置信息,

* 例如说通过setMaster来设置程序要链接的Spark集群的Master的URL,如果设置

* 为local,则代表Spark程序在本地运行,特别适合于机器配置条件非常差(例如

* 只有1G的内存)的初学者       *

*/

val conf = new SparkConf() //创建SparkConf对象

conf.setAppName("OnlineForeachRDD") //设置应用程序的名称,在程序运行的监控界面可以看到名称

conf.setMaster("spark://Master:7077") //此时,程序在Spark集群

conf.setMaster("local[6]")

//设置batchDuration时间间隔来控制Job生成的频率并且创建Spark Streaming执行的入口

val ssc = new StreamingContext(conf, Seconds(5))

val lines = ssc.socketTextStream("Master", 9999)

val words = lines.flatMap(_.split(" "))

val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

wordCounts.foreachRDD { rdd =>

rdd.foreachPartition { partitionOfRecords => {

// ConnectionPool is a static, lazily initialized pool of connections

val connection = ConnectionPool.getConnection()

partitionOfRecords.foreach(record => {

val sql = "insert into streaming_itemcount(item,count) values('" + record._1 + "'," + record._2 + ")"

val stmt = connection.createStatement();

stmt.executeUpdate(sql);

})

ConnectionPool.returnConnection(connection)  // return to the pool for future reuse

}

}

}

/**

*  在StreamingContext调用start方法的内部其实是会启动JobScheduler的Start方法,进行消息循环,

*在JobScheduler的start内部会构造JobGenerator和ReceiverTacker,并且调用JobGenerator和

*ReceiverTacker的start方法:

*  1,JobGenerator启动后会不断的根据batchDuration生成一个个的Job

*  2,ReceiverTracker启动后首先在Spark Cluster中启动Receiver(其实是在Executor中先启动

*ReceiverSupervisor),在Receiver收到数据后会通过ReceiverSupervisor存储到Executor并且把

*数据的Metadata信息发送给Driver中的ReceiverTracker,在ReceiverTracker内部会通过

*ReceivedBlockTracker来管理接受到的元数据信息每个BatchInterval会产生一个具体的Job,

*其实这里的Job不是Spark Core中所指的Job,它只是基于DStreamGraph而生成的RDD的DAG

*而已,从Java角度讲,相当于Runnable接口实例,此时要想运行Job需要提交给JobScheduler,

*在JobScheduler中通过线程池的方式找到一个单独的线程来提交Job到集群运行(其实是在线程中

*基于RDD的Action触发真正的作业的运行),

*  为什么使用线程池呢?

*  1,作业不断生成,所以为了提升效率,我们需要线程池;这和在Executor中通过线程池执行Task

*有异曲同工之妙;

*  2,有可能设置了Job的FAIR公平调度的方式,这个时候也需要多线程的支持;

*/

ssc.start()

ssc.awaitTermination()

}

}

Spark Streaming容错机制

  (1)driver级别容错

(2)Executor级别容错

        1.接收数据的安全性

        2.执行的安全性

备注:

资料来源于:DT_大数据梦工厂(Spark发行版本定制)

更多私密内容,请关注微信公众号:DT_Spark

如果您对大数据Spark感兴趣,可以免费听由王家林老师每天晚上20:00开设的Spark永久免费公开课,地址YY房间号:68917580

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,029评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,238评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,576评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,214评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,324评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,392评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,416评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,196评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,631评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,919评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,090评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,767评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,410评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,090评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,328评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,952评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,979评论 2 351

推荐阅读更多精彩内容