SparkStreaming重点解析

这篇文章主要针对SparkStreaming的三个特性进行说明。
分别是Dstream Window checkpoint

sparkStreaming是一个实时数据流框架。数据可以从多种源输入,比如Kafka,Flume,TCP socket等,可以对输入进来的数据进行一系列处理,最终输出。
在内部处理时,会将流分解为多个批batch。最终交由spark处理。

图片.png

Dstream
官方有这样的解释:Dstream(Discretized Stream)意为离散化流,是Spark Streaming提供的基本抽象,他代表了一个连续的数据流,无论是从数据源接受的数据流还是处理过程中产生的数据流。Dstream的内部属性中一串连续的RDD。每个RDD在一定间隔时间后产生。
对于每个Dstream的操作,最终都会变为对RDD的操作,所以Streaming操作最后会交给Spark引擎(engine)进行计算。


图片.png

我们可以通过源码进行验证。

首先我们以一个程序为例

JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(5));
        ssc.checkpoint(".");

        // Initial state RDD input to mapWithState
        @SuppressWarnings("unchecked")
        List<Tuple2<String, Integer>> tuples =
                Arrays.asList(new Tuple2<>("hello", 1), new Tuple2<>("world", 1));
        JavaPairRDD<String, Integer> initialRDD = ssc.sparkContext().parallelizePairs(tuples);

        JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
                "localhost",9999, StorageLevels.MEMORY_AND_DISK_SER_2);

        JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());  

        JavaPairDStream<String, Integer> wordsDstream = newWords.mapToPair(s -> new Tuple2<>(s, 1));

该程序第二步生成一个JavaReceiverInputDStream对象,由于是通过socketTextStream获得,所以获得了的是JavaReceiverInputDStream对象。

private[streaming] class SocketInputDStream[T](
_ssc : org.apache.spark.streaming.StreamingContext, 
host : scala.Predef.String,
 port : scala.Int,
 bytesToObjects : scala.Function1[java.io.InputStream, scala.Iterator[T]], storageLevel : org.apache.spark.storage.StorageLevel)
(implicit evidence$1 : scala.reflect.ClassTag[T]) 
extends org.apache.spark.streaming.dstream.ReceiverInputDStream[T] {...}

这里是从socket端获得流的Dstream它继承的是ReceiverInputDStream。
而这个ReceiverInputDStream是一个重要的类

abstract class ReceiverInputDStream[T](
_ssc :org.apache.spark.streaming.StreamingContext)
(implicit evidence$1 : scala.reflect.ClassTag[T])
 extends org.apache.spark.streaming.dstream.InputDStream[T] {
  override def compute(validTime : org.apache.spark.streaming.Time) : scala.Option[org.apache.spark.rdd.RDD[T]] = { /* compiled code */ }
....
}

它重写了compute方法,这个方法是用来生成RDD的。

然后看它继承的父类InputDStream

abstract class InputDStream[T](
_ssc : org.apache.spark.streaming.StreamingContext)
(implicit evidence$1 : scala.reflect.ClassTag[T]) 
extends org.apache.spark.streaming.dstream.DStream[T] {
override def dependencies : scala.List[org.apache.spark.streaming.dstream.DStream[_]] = { /* compiled code */ }
  override def slideDuration : org.apache.spark.streaming.Duration = { /* compiled code */ }
}

最后终于来到了Dstream了

abstract class DStream[T](
@scala.transient private[streaming] var ssc : org.apache.spark.streaming.StreamingContext)
(implicit evidence$1 : scala.reflect.ClassTag[T])
extends scala.AnyRef with scala.Serializable with org.apache.spark.internal.Logging {
private[streaming] var generatedRDDs : scala.collection.mutable.HashMap[org.apache.spark.streaming.Time, org.apache.spark.rdd.RDD[T]] = { /* compiled code */ }
 private[streaming] var zeroTime : org.apache.spark.streaming.Time = { /* compiled code */ }
 private[streaming] var rememberDuration : org.apache.spark.streaming.Duration = { /* compiled code */ }
 private[streaming] var storageLevel : org.apache.spark.storage.StorageLevel = { /* compiled code */ }
 private[streaming] val mustCheckpoint : scala.Boolean = { /* compiled code */ }
 private[streaming] var checkpointDuration : org.apache.spark.streaming.Duration = { /* compiled code */ }
 private[streaming] val checkpointData : org.apache.spark.streaming.dstream.DStreamCheckpointData[T] = { /* compiled code */ }
 private[streaming] var graph : org.apache.spark.streaming.DStreamGraph = { /* compiled code */ }
}

这里我们注重关注它的属性:
generatedRDDs 用来放RDD的一个Hash表,key为time
其他的自行了解,有开始时间,checkPoint点,时间间隔,图等等。

下面我们来看看Dstream怎么生成RDD的。
由例子往下走

private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
    // If RDD was already generated, then retrieve it from HashMap,
    // or else compute the RDD
    generatedRDDs.get(time).orElse {
      // Compute the RDD if time is valid (e.g. correct time in a sliding window)
      // of RDD generation, else generate nothing.
      if (isTimeValid(time)) {

        val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
          // Disable checks for existing output directories in jobs launched by the streaming
          // scheduler, since we may need to write output to an existing directory during checkpoint
          // recovery; see SPARK-4835 for more details. We need to have this call here because
          // compute() might cause Spark jobs to be launched.
          SparkHadoopWriterUtils.disableOutputSpecValidation.withValue(true) {
            compute(time)
          }
        }
override def compute(validTime: Time): Option[RDD[U]] = {
    parent.getOrCompute(validTime).map(_.map[U](mapFunc))
  }

Dstream实现了getOrCompute方法,该方法会根据该RDD是否在Hash表中进行判断,没有则创建有就提取。
map,reduce等操作都重写了compute方法但是都是调用父类的方法。

关于定时任务,job调度和生成详见博客
https://www.cnblogs.com/sparkbigdata/p/5513339.html
https://blog.csdn.net/xiaojun220/article/details/51441242

Window
window的思想和TCP中window十分类似,用一个窗口化操作,对数据偏移进行控制。每次想操作多少秒的数据,多久进行窗口移动都是可设定值。sparkStreaming也提供了相关的API可以针对不同操作进行使用,官网很详细,必须再说明
http://spark.apachecn.org/docs/cn/2.2.0/streaming-programming-guide.html#input-dstreams-%E5%92%8C-receivers%E6%8E%A5%E6%94%B6%E5%99%A8

checkpoint
通过整理几篇博文进行理解。
checkpoint意为检查点。类似于快照的意思。如果在spark计算中,DAG十分巨大,出错后因为丢失了中间数据而需要重新计算是很费事的。这时候,将文件写入缓存或者是持久化写入硬盘是很必要的。但是也不能保证硬盘不会出问题。所以我们需要检查故有了检查点,他的作用是将重要的中间数据写入一个高可用的地方。
3

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,997评论 6 502
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,603评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,359评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,309评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,346评论 6 390
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,258评论 1 300
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,122评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,970评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,403评论 1 313
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,596评论 3 334
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,769评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,464评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,075评论 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,705评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,848评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,831评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,678评论 2 354

推荐阅读更多精彩内容