Spark Streaming
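Stream processing built on top of Spark
Stream: source ==> compute ==> store
Batch (offline) processing is a special case of stream processing
"letting you write streaming jobs the same way you write batch jobs"
Out of the box (OOTB): ready to use with no extra setup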
Programming model
  Streaming: DStream (represents a continuous stream of data)
  Core: RDD
  SQL: DataFrame/Dataset (DF/DS)
Entry point
  Streaming: StreamingContext
  Core: SparkContext
  SQL: SparkSession (SQLContext/HiveContext before Spark 2.0)
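import org.apache.spark._
import org.apache.spark.streaming._

// sc is an existing SparkContext (for example, the one spark-shell provides); batch interval is 1 second
val ssc = new StreamingContext(sc, Seconds(1))
// Receiver-based source: reads text lines from a TCP socket
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))      // split each line into words
val pairs = words.map(word => (word, 1))     // pair each word with a count of 1
val wordCounts = pairs.reduceByKey(_ + _)    // sum the counts per word within each batch
wordCounts.print()                           // print the first ten elements of each batch
ssc.start()                                  // start receiving and processing
ssc.awaitTermination()                       // block until the computation stops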
Streaming job running receiver 0 *******
18/09/07 22:42:41 WARN StreamingContext: spark.master should be set as local[n], n > 1 in local mode if you have receivers to get data, otherwise Spark jobs will not get resources to process the received data.
socket: this source uses a receiver, and the receiver occupies one core, so local mode needs local[n] with n > 1
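The rule behind the warning is a thread budget: each receiver takes one thread, and at least one more must remain for processing. A minimal sketch of that check in plain Scala, with no Spark dependency. LocalMasterCheck and its methods are hypothetical names used only for illustration, not part of Spark, and only the local and local[n] master forms are handled.

```scala
object LocalMasterCheck {
  // Matches master URLs of the form local[n], where n is a number
  private val LocalN = """local\[(\d+)\]""".r

  /** Threads left for processing after each receiver takes one.
    * Returns None when the master is not local or local[n]. */
  def processingThreads(master: String, receivers: Int): Option[Int] = master match {
    case LocalN(n) => Some(n.toInt - receivers)
    case "local"   => Some(1 - receivers)   // plain "local" runs with a single thread
    case _         => None                  // other forms are out of scope for this sketch
  }

  /** True when at least one thread remains to process the received data. */
  def canProcess(master: String, receivers: Int): Boolean =
    processingThreads(master, receivers).exists(_ > 0)
}
```

With one socket receiver, LocalMasterCheck.canProcess("local[2]", 1) is true, while "local[1]" and "local" leave nothing for processing, which is the situation the warning describes.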
An operation applied to a DStream is in effect applied to every RDD underlying that DStream
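One way to picture this is to model a stream as a sequence of batches and apply the same function to each batch. The sketch below uses plain Scala collections rather than the Spark API. DStreamModel, Batch, transformEach and wordCount are hypothetical names, and each Seq stands in for one RDD.

```scala
object DStreamModel {
  // One batch stands in for one RDD; a stream is the sequence of batches over time
  type Batch[A] = Seq[A]

  /** Applies the same transformation to every batch in the stream. */
  def transformEach[A, B](stream: Seq[Batch[A]])(op: Batch[A] => Batch[B]): Seq[Batch[B]] =
    stream.map(op)

  /** Word count over a single batch, mirroring flatMap / map / reduceByKey. */
  def wordCount(lines: Batch[String]): Batch[(String, Int)] =
    lines.flatMap(_.split(" "))
      .map(word => (word, 1))
      .groupBy(_._1)
      .map { case (word, ones) => (word, ones.map(_._2).sum) }
      .toSeq
      .sortBy(_._1)   // sorted only to make the result deterministic
}
```

In this model each batch is counted independently, so a word seen in two batches produces two separate counts; carrying a total across batches needs a stateful operation.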
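import org.apache.spark._
import org.apache.spark.streaming._

// sc is an existing SparkContext; batch interval is 10 seconds
val ssc = new StreamingContext(sc, Seconds(10))
// textFileStream watches a directory for newly created files; it is not receiver-based
val lines = ssc.textFileStream("/streaming/input/")
val words = lines.flatMap(_.split("\t"))     // fields are tab-separated
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)    // counts per word within each batch
wordCounts.print()
ssc.start()
ssc.awaitTermination()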
Using updateStateByKey without setting a checkpoint directory fails with:
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint().
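import org.apache.spark._
import org.apache.spark.streaming._

// Merge the values seen for a key in this batch with the key's previous state
def updateFunction(currentValues: Seq[Int], preValues: Option[Int]): Option[Int] = {
  val curr = currentValues.sum        // total for the key in the current batch
  val pre = preValues.getOrElse(0)    // previous running total, 0 if the key is new
  Some(curr + pre)                    // new running total
}

val ssc = new StreamingContext(sc, Seconds(10))
// updateStateByKey requires a checkpoint directory
ssc.checkpoint("/streaming/checkpoint/")
val lines = ssc.socketTextStream("hadoop000", 8888)
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val result = pairs.updateStateByKey(updateFunction)   // running count per word across batches
result.print()
ssc.start()
ssc.awaitTermination()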
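To see how the state accumulates from batch to batch, the update step can be simulated with plain Scala collections. This is a sketch of the semantics only, not Spark's implementation. StateModel, step and run are hypothetical names, and updateFunction is copied from the snippet above.

```scala
object StateModel {
  // Same update function as in the streaming job
  def updateFunction(currentValues: Seq[Int], preValues: Option[Int]): Option[Int] = {
    val curr = currentValues.sum
    val pre = preValues.getOrElse(0)
    Some(curr + pre)
  }

  /** Folds one batch of (word, 1) pairs into the running state.
    * The function is called for every key that has old state or new values;
    * a key whose update returns None would be dropped. */
  def step(state: Map[String, Int], batch: Seq[(String, Int)]): Map[String, Int] = {
    val grouped = batch.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
    val keys = state.keySet ++ grouped.keySet
    keys.flatMap { k =>
      updateFunction(grouped.getOrElse(k, Seq.empty[Int]), state.get(k)).map(k -> _)
    }.toMap
  }

  /** Runs all batches in order, starting from an empty state. */
  def run(batches: Seq[Seq[String]]): Map[String, Int] =
    batches.foldLeft(Map.empty[String, Int]) { (state, lines) =>
      step(state, lines.flatMap(_.split(" ")).map(word => (word, 1)))
    }
}
```

For example, StateModel.run(Seq(Seq("juren say"), Seq("juren say"))) returns a map with juren -> 2 and say -> 2: the second batch adds to the totals left by the first instead of starting from zero.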
./spark-submit \
--master local[2] \
--name StreamingStateApp \
--class com.ruozedata.spark.streaming.day01.StreamingStateApp \
/home/hadoop/lib/g3-spark-1.0.jar
Output of result.print() (running counts across batches):
(love,3)
(juren,2)
(you,3)
(ruoze,2)
(say,2)
(i,3)
(zidong,4)
Sample input lines sent to the socket:
juren say
juren say