(1) A Brief Introduction to Spark Streaming

Internally, it works as follows. Spark Streaming receives live input data streams and divides the data into batches, which are then processed by the Spark engine to generate the final stream of results in batches.

Spark Streaming is not a true record-at-a-time stream processing framework; it is a mini-batch framework.
Storm and Flink are true record-at-a-time stream processing frameworks.
Spark Streaming programming model: DStream (discretized stream), which represents a continuous stream of data; internally, a DStream is represented as a sequence of RDDs.
Spark Core programming model: RDD
Spark SQL programming model: DataFrame / Dataset
Spark Streaming entry point: StreamingContext
Spark Core entry point: SparkContext
Spark SQL entry point: SparkSession
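
For reference, here is a minimal sketch of how each entry point is typically created inside an application (not from the original post; the object name EntryPointsDemo and the master/app-name values are illustrative):

import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}

object EntryPointsDemo {
  def main(args: Array[String]): Unit = {
    // Spark SQL entry point: SparkSession (creates a SparkContext underneath)
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("EntryPointsDemo")
      .getOrCreate()

    // Spark Core entry point: the SparkContext owned by the session
    val sc = spark.sparkContext

    // Spark Streaming entry point: StreamingContext, reusing the existing SparkContext
    val ssc = new StreamingContext(sc, Seconds(5))
  }
}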

[hadoop@hadoop000 bin]$ ./spark-shell --master local[2]

scala> import org.apache.spark._
import org.apache.spark._

scala> import org.apache.spark.streaming._
import org.apache.spark.streaming._

scala> val ssc = new StreamingContext(sc.getConf, Seconds(1))
org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, set spark.driver.allowMultipleContexts = true. The currently running SparkContext was created at:
org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:924)
org.apache.spark.repl.Main$.createSparkSession(Main.scala:103)
<init>(<console>:15)
<init>(<console>:43)
<init>(<console>:45)
.<init>(<console>:49)
.<clinit>(<console>)
.$print$lzycompute(<console>:7)
.$print(<console>:6)
$print(<console>)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:497)
scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:786)
scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1047)
scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:638)
scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:637)
scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31)
scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:19)
  at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$2.apply(SparkContext.scala:2456)
  at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$2.apply(SparkContext.scala:2452)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.SparkContext$.assertNoOtherContextIsRunning(SparkContext.scala:2452)
  at org.apache.spark.SparkContext$.markPartiallyConstructed(SparkContext.scala:2541)
  at org.apache.spark.SparkContext.<init>(SparkContext.scala:84)
  at org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:838)
  at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:85)
  ... 53 elided

Spark error: Only one SparkContext may be running in this JVM.
The reason can be seen from the StreamingContext source code below:

/**
   * Create a StreamingContext by providing the configuration necessary for a new SparkContext.
   * @param conf a org.apache.spark.SparkConf object specifying Spark parameters
   * @param batchDuration the time interval at which streaming data will be divided into batches
   */
  def this(conf: SparkConf, batchDuration: Duration) = {
    this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
  }
------------------------------------------------------------------------------------
private[streaming] def createNewSparkContext(conf: SparkConf): SparkContext = {
    new SparkContext(conf)
  }

  private[streaming] def createNewSparkContext(
      master: String,
      appName: String,
      sparkHome: String,
      jars: Seq[String],
      environment: Map[String, String]
    ): SparkContext = {
    val conf = SparkContext.updatedConf(
      new SparkConf(), master, appName, sparkHome, jars, environment)
    new SparkContext(conf)
  }

Passing in sc.getConf means passing a SparkConf, which invokes the constructor above. That constructor goes through createNewSparkContext(conf: SparkConf), which calls new SparkContext(conf) and therefore tries to create a second SparkContext in the same JVM, hence the error.
The constructor to use instead is the one below:

/**
   * Create a StreamingContext using an existing SparkContext.
   * @param sparkContext existing SparkContext
   * @param batchDuration the time interval at which streaming data will be divided into batches
   */
  def this(sparkContext: SparkContext, batchDuration: Duration) = {
    this(sparkContext, null, batchDuration)
  }

In other words, pass the existing SparkContext in directly; with that, starting it again works fine:

[hadoop@hadoop000 bin]$ ./spark-shell --master local[2]

scala> import org.apache.spark._
import org.apache.spark._

scala> import org.apache.spark.streaming._
import org.apache.spark.streaming._

scala> val ssc = new StreamingContext(sc, Seconds(5))
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@7f64bd7

scala> val lines = ssc.socketTextStream("localhost",9999)
lines: org.apache.spark.streaming.dstream.ReceiverInputDStream[String] = org.apache.spark.streaming.dstream.SocketInputDStream@546c30c2

scala> val words = lines.flatMap(_.split(" "))
words: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.FlatMappedDStream@1e75af65

scala> val pairs = words.map(x=>(x,1))
pairs: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.MappedDStream@1aebe759

scala> val wordcounts = pairs.reduceByKey(_+_)
wordcounts: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@1bd6bfb0

scala> wordcounts.print()

scala> ssc.start()

-------------------------------------------                                     
Time: 1537941150000 ms
-------------------------------------------

-------------------------------------------                                     
Time: 1537941155000 ms
-------------------------------------------

-------------------------------------------                                     
Time: 1537941160000 ms
-------------------------------------------

-------------------------------------------                                     
Time: 1537941165000 ms
-------------------------------------------

[Stage 0:>                                                          (0 + 1) / 1]
[hadoop@hadoop000 ~]$ nc -lk 9999
huluwa huluwa yi gen teng shang qi ge wa
18/09/26 13:53:25 WARN storage.RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
18/09/26 13:53:25 WARN storage.BlockManager: Block input-0-1537941205200 replicated to only 0 peer(s) instead of 1 peers
-------------------------------------------                                     
Time: 1537941210000 ms
-------------------------------------------
(gen,1)
(teng,1)
(huluwa,2)
(wa,1)
(yi,1)
(ge,1)
(qi,1)
(shang,1)

-------------------------------------------                                     
Time: 1537941215000 ms
-------------------------------------------

The test code run from IDEA is as follows:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreamingApp {
  def main(args: Array[String]): Unit = {
    // Two local threads: one for the socket receiver, one for processing.
    val conf = new SparkConf().setMaster("local[2]").setAppName("SparkStreamingApp")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Word count over a 5-second batch interval.
    val lines = ssc.socketTextStream("hadoop000", 9999)
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(x => (x, 1))
    val wordcounts = pairs.reduceByKey(_ + _)
    wordcounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

If local[2] in the code above is changed to local[1], the run log looks like this:

......
18/09/27 16:37:52 WARN StreamingContext: spark.master should be set as local[n], n > 1 in local mode if you have receivers to get data, otherwise Spark jobs will not get resources to process the received data.
....
18/09/27 16:38:30 INFO JobScheduler: Added jobs for time 1538037510000 ms
18/09/27 16:38:35 INFO JobScheduler: Added jobs for time 1538037515000 ms
18/09/27 16:38:40 INFO JobScheduler: Added jobs for time 1538037520000 ms
18/09/27 16:38:45 INFO JobScheduler: Added jobs for time 1538037525000 ms
18/09/27 16:38:50 INFO JobScheduler: Added jobs for time 1538037530000 ms
18/09/27 16:38:55 INFO JobScheduler: Added jobs for time 1538037535000 ms
18/09/27 16:39:00 INFO JobScheduler: Added jobs for time 1538037540000 ms
18/09/27 16:39:05 INFO JobScheduler: Added jobs for time 1538037545000 ms
18/09/27 16:39:10 INFO JobScheduler: Added jobs for time 1538037550000 ms
18/09/27 16:39:15 INFO JobScheduler: Added jobs for time 1538037555000 ms
18/09/27 16:39:20 INFO JobScheduler: Added jobs for time 1538037560000 ms
18/09/27 16:39:25 INFO JobScheduler: Added jobs for time 1538037565000 ms
.....

The official documentation explains this as follows:
1)When running a Spark Streaming program locally, do not use “local” or “local[1]” as the master URL. Either of these means that only one thread will be used for running tasks locally. If you are using an input DStream based on a receiver (e.g. sockets, Kafka, Flume, etc.), then the single thread will be used to run the receiver, leaving no thread for processing the received data. Hence, when running locally, always use “local[n]” as the master URL, where n > number of receivers to run (see Spark Properties for information on how to set the master).

2)Extending the logic to running on a cluster, the number of cores allocated to the Spark Streaming application must be more than the number of receivers. Otherwise the system will receive data, but not be able to process it.

A receiver occupies one core. With only one core available, the application can receive data but cannot process it, because no core is left for the processing jobs.
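
As an illustration, here is a hedged sketch (not from the original post; the hostnames, ports, and object name are placeholders): with two socket receivers the master must provide at least three threads, e.g. local[3], so that one thread remains for processing.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object MultiReceiverApp {
  def main(args: Array[String]): Unit = {
    // Two socket receivers occupy two cores, so local[3] leaves one core for processing.
    val conf = new SparkConf().setMaster("local[3]").setAppName("MultiReceiverApp")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Each socketTextStream starts its own receiver.
    val stream1 = ssc.socketTextStream("hadoop000", 9999)
    val stream2 = ssc.socketTextStream("hadoop000", 9998)

    // Union the two receiver streams and count words across both.
    val words = stream1.union(stream2).flatMap(_.split(" "))
    words.map((_, 1)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  }
}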

Note: File streams do not require running a receiver, so there is no need to allocate any cores for receiving file data.
Reading files from a filesystem (a file stream) needs no receiver: the data already sits on HDFS, so if something fails the files can simply be read again; no dedicated receiver has to ingest the data.
The source code shows why:

/**
   * Creates an input stream from TCP source hostname:port. Data is received using
   * a TCP socket and the receive bytes is interpreted as UTF8 encoded `\n` delimited
   * lines.
   * @param hostname      Hostname to connect to for receiving data
   * @param port          Port to connect to for receiving data
   * @param storageLevel  Storage level to use for storing the received objects
   *                      (default: StorageLevel.MEMORY_AND_DISK_SER_2)
   * @see [[socketStream]]
   */
  def socketTextStream(
      hostname: String,
      port: Int,
      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
    ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
    socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
  }
------------------------------------------------------------------------------------------------------------------
 /**
   * Gets the receiver object that will be sent to the worker nodes
   * to receive data. This method needs to defined by any specific implementation
   * of a ReceiverInputDStream.
   */
  def getReceiver(): Receiver[T]

socketTextStream returns a ReceiverInputDStream, an abstract class that declares a getReceiver method, so a receiver is always involved.

/**
   * Create an input stream that monitors a Hadoop-compatible filesystem
   * for new files and reads them as text files (using key as LongWritable, value
   * as Text and input format as TextInputFormat). Files must be written to the
   * monitored directory by "moving" them from another location within the same
   * file system. File names starting with . are ignored.
   * @param directory HDFS directory to monitor for new file
   */
  def textFileStream(directory: String): DStream[String] = withNamedScope("text file stream") {
    fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString)
  }

textFileStream, by contrast, returns a plain DStream directly, with no receiver involved.
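
A minimal sketch of this (not from the original post; the HDFS URL and object name are placeholders): because textFileStream has no receiver, it can run even with local[1] and simply picks up new files moved into the monitored directory each batch.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FileStreamApp {
  def main(args: Array[String]): Unit = {
    // No receiver is involved, so a single local thread is enough.
    val conf = new SparkConf().setMaster("local[1]").setAppName("FileStreamApp")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Monitors the directory for new files; returns a plain DStream[String], not a ReceiverInputDStream.
    val lines = ssc.textFileStream("hdfs://hadoop000:8020/streaming/input")

    lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  }
}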

Points to note (the sketch after this list illustrates the last two items):

  • Once a context has been started, no new streaming computations can be set up or added to it.
  • Once a context has been stopped, it cannot be restarted.
  • Only one StreamingContext can be active in a JVM at the same time.
  • stop() on StreamingContext also stops the SparkContext. To stop only the StreamingContext, set the optional parameter of stop() called stopSparkContext to false.
  • A SparkContext can be re-used to create multiple StreamingContexts, as long as the previous StreamingContext is stopped (without stopping the SparkContext) before the next StreamingContext is created.
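
A rough sketch of the stop/reuse behavior, assuming the pre-built sc in spark-shell (this is an illustration, not code from the original post):

import org.apache.spark.streaming.{Seconds, StreamingContext}

// Stop the first StreamingContext without stopping the SparkContext,
// then reuse the same SparkContext for a second StreamingContext.
val ssc1 = new StreamingContext(sc, Seconds(5))
// ... set up streams here, ssc1.start(), let it run for a while ...

// Stop only the StreamingContext; the SparkContext stays alive.
ssc1.stop(stopSparkContext = false)

// The previous StreamingContext is stopped, so a new one can now be created.
val ssc2 = new StreamingContext(sc, Seconds(10))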

Applying an operation to a DStream amounts to applying the same operation to every RDD that the DStream produces internally.
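
A small sketch of that idea (not from the original post; the object name and the DStream parameter `lines` are illustrative): map on a DStream and transform applied per batch RDD produce equivalent streams, and foreachRDD exposes each batch's RDD directly.

import org.apache.spark.streaming.dstream.DStream

object DStreamVsRDD {
  // `lines` stands for any existing DStream[String], e.g. from socketTextStream.
  def demo(lines: DStream[String]): Unit = {
    // Operating on the DStream directly ...
    val upper1: DStream[String] = lines.map(_.toUpperCase)

    // ... is equivalent to operating on each batch's RDD via transform.
    val upper2: DStream[String] = lines.transform(rdd => rdd.map(_.toUpperCase))

    // foreachRDD exposes the RDD of every batch, so plain RDD code can be used.
    lines.foreachRDD { rdd =>
      println(s"records in this batch: ${rdd.count()}")
    }
  }
}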