一、Spark Streaming概述
Spark Streaming类似于Apache Storm,用于流式数据的处理,具有高吞吐量和容错能力强等特点。Spark Streaming支持的数据输入源很多,例如:Kafka、Flume、Twitter和简单的TCP套接字等等,而结果也能保存在很多地方,比如HDFS、数据库等。Spark Streaming使用离散化流作为抽象表示,叫做DStream。DStream是随着时间推移而收到的数据的序列。在内部,每个时间区间收到的数据都作为RDD存在,而DStream是由这些RDD所组成的序列。创建出来的DStream支持两种操作,一种是转化操作(transformation),会生成一个新的DStream,另一种是输出操作(output operation),可以把数据写入外部系统中。DStream提供了许多与RDD所支持的操作相类似的操作支持,还增加了与时间相关的新操作,比如滑动窗口。
二、Spark Streaming和Storm的区别
三、架构与抽象
Spark Streaming使用“微批次”的架构,把流式计算当作一系列连续的小规模批处理来对待。Spark Streaming从各种输入源中读取数据,并把数据分组为小的批次。新的批次按均匀的时间间隔创建出来。在每个时间区间开始的时候,一个新的批次就创建出来,在该区间内收到的数据都会被添加到这个批次中。在时间区间结束时,批次停止增长。时间区间的大小是由批次间隔这个参数决定的。批次间隔一般设置在500毫秒到几秒之间,由应用开发者配置。每个输入批次都形成一个RDD,以spark作业的方式处理并生成其他的RDD。处理的结果可以以批处理的方式传给外部系统。四、Spark Streaming解析
1、初始化StreamingContext
import org.apache.spark._
import org.apache.spark.streaming._
val conf=new SparkConf().setAppName(appName).setMaster(master)
val ssc=new StreamingContext(conf,Second(1))
初始化完Context之后:
1、定义消息输入源来创建DStreams.
2、定义DStreams的转化操作和输出操作。
3、通过streamingContext.start()来启动消息采集和处理。
4、等待程序终止,可以通过streamingContext.awaitTermination()来设置
5、通过streamingContext.stop()来手动终止处理程序。
注意:
StreamingContext一旦启动,对DStreams的操作就不能修改了。在同一时间一个JVM中只有一个StreamingContext可以启动,stop()方法将同时停止SparkContext,可以传入参数stopSparkContext用于停止StreamingContext。
2、什么是DStreams
Discretized Stream是Spark Streaming的基本抽象,代表持续性的数据流和经过各种Spark原语操作后的结果数据流。在内部实现上,DStream是一系列连续的RDD来表示。每个RDD含有一段时间间隔内的数据。对数据的操作也是按照RDD为单位来进行的。
3、DStreams输入
Spark Streaming原生支持一些不同的数据源。每个接收器都以Spark执行程序中一个长期运行的任务的形式运行,因此会占据分配给应用的CPU核心。此外,我们还需要有可用的CPU核心来处理数据。这意味这如果要运行多个接收器,就必须至少有和接收器数目相同的核心数,还要加上用来完成计算所需的核心数。例如,如果我们想要在流计算应用中运行10个接收器,那么至少需要为应用分配11个CPU核心。所以如果在本地模式运行,不要使用local或者local[1]。
3.1基本数据源
3.1文件数据源
文件数据流:能够读取所有HDFS API兼容的文件系统文件,通过fileStream方法进行读取
streamingContext.fileStream[KeyClass,ValueClass,InputFormatClass](dataDirectory)
Spark Streaming将会监控dataDirectory目录并不断处理移动进来的文件,记住目前不支持嵌套目录。
1、文件需要有相同的数据格式
2、文件进入dataDirectory的方式需要通过移动或者重命名来实现
3、一旦文件移动进目录,则不能修改,即便修改了也不会读取新数据。
如果文件比较简单,则可以使用streamingContext.textFileStream(dataDirectory)方法来读取文件。文件流不需要接收器,不需要单独分配CPU核。
3.2 自定义数据源
通过继承Receiver,并实现onStart、onStop方法来自定义数据源采集。可以通过streamingContext.receiverStream(<instance of custom receiver>)来使用自定义的数据采集源。
4、DStream转换
DStream上的原语与RDD的类似,分为Transformations(转换)和Output Operations(输出)两种,此外转换操作中还有一些比较特殊的原语,比如:updateStateByKey()、transform()以及各种window相关的原语。
DStream的转化操作可以分为无状态(stateless)和有状态(stateful)两种:
(1)、在无状态转化操作中,每个批次的处理不依赖于之前批次的数据。常见的RDD转化操作,例如map()、filter()、reduceByKey()等,都是无状态转化操作;
(2)、有状态操作需要使用之前批次的数据或者是中间结果来计算当前批次的数据。有状态转化操作包括基于滑动窗口的转化操作和追踪状态变化的转化操作。
4.1 无状态转化操作
无状态转化操作就是把简单的RDD转化操作应用到每个批次上,也就是转化DStream中的每一个RDD。注意,针对键值对的DStream转化操作(比如reduceByKey())要添加import StreamingContext._才能在scala中使用。
#这里列举一下无状态转化操作的例子
1、def map[U: ClassTag](mapFunc: T => U): DStream[U] 将源DStream中的每个元素通过一个函数func从而得到新的DStreams。
2、def flatMap[U: ClassTag](flatMapFunc: T => TraversableOnce[U]): DStream[U] 和map类似,但是每个输入的项可以被映射为0或更多项。
3、def filter(filterFunc: T => Boolean): DStream[T] 选择源DStream中函数func判为true的记录作为新DStream
4、def repartition(numPartitions: Int): DStream[T] 通过创建更多或者更少的partition来改变此DStream的并行级别。
5、def union(that: DStream[T]): DStream[T] 将一个具有相同slideDuration新的DStream和当前DStream进行合并,返回新的DStream
6、def count(): DStream[Long] 统计源DStreams中每个RDD所含元素的个数得到单元素RDD的新DStreams。
7、def reduce(reduceFunc: (T, T) => T): DStream[T] 通过函数func(两个参数一个输出)来整合源DStreams中每个RDD元素得到单元素RDD的DStream。
8、def countByValue(numPartitions: Int = ssc.sc.defaultParallelism)(implicit ord: Ordering[T] = null): DStream[(T, Long)] 对于DStreams中元素类型为K调用此函数,得到包含(K,Long)对的新DStream,其中Long值表明相应的K在源DStream中每个RDD出现的次数。
9、def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)] 对(K,V)对的DStream调用此函数,返回同样(K,V)对的新DStream,但是新DStream中的对应V为使用reduce函数整合而来
10、def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))] 两DStream分别为(K,V)和(K,W)对,返回(K,(V,W))对的新DStream。
11、def cogroup[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Iterable[V], Iterable[W]))] 两DStream分别为(K,V)和(K,W)对,返回(K,(Seq[V],Seq[W])对新DStream
12、def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U] 将RDD到RDD映射的函数func作用于源DStream中每个RDD上得到新DStream。这个可用于在DStream的RDD上做任意操作。注意的是,在这个转换函数里面能够应用所有RDD的转换操作。
4.2 有状态转换操作
1、def updateStateByKey[S: ClassTag]( updateFunc: (Seq[V], Option[S]) => Option[S] ): DStream[(K, S)]
(1)、 S是你需要保存的状态的类型。
(2)、updateFunc 是定义了每一批次RDD如何来更新的状态值。 Seq[V] 是当前批次相同key的值的集合。 Option[S] 是框架自动提供的,上一次保存的状态的值。
(3)、updateStateByKey会返回一个新的DStream,该DStream中保存了(Key,State)的序列。
2、window 函数
(1)、def window(windowDuration: Duration, slideDuration: Duration): DStream[T] 基于对源DStream窗化的批次进行计算返回一个新的DStream,windowDuration是窗口大小,slideDuration滑动步长。
(2)、def countByWindow( windowDuration: Duration, slideDuration: Duration): DStream[Long] 注意,返回的是window中记录的条数。
(3)、def reduceByWindow( reduceFunc: (T, T) => T, windowDuration: Duration, slideDuration: Duration): DStream[T] 通过使用自定义函数整合滑动区间流元素来创建一个新的单元素流。
(4)、 def reduceByKeyAndWindow(reduceFunc: (V, V) => V,windowDuration: Duration, slideDuration: Duration): DStream[(K, V)] 通过给定的窗口大小以滑动步长来应用reduceFunc函数,返回DStream[(K, V)], K就是DStream中相应的K,V是window应用了reduce之后产生的最终值。
(5)、def reduceByKeyAndWindow(reduceFunc: (V, V) => V,invReduceFunc: (V, V) => V,windowDuration: Duration,slideDuration: Duration = self.slideDuration,numPartitions: Int =ssc.sc.defaultParallelism,filterFunc: ((K, V)) => Boolean = null): DStream[(K, V)]
五、Spark Streaming应用案例
这里我们使用spark streaming来编写一个实时统计单词的案例:
2、在pom.xml文件中添加一下依赖:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.sparkstreaming</groupId>
<artifactId>sparkstreamingdemo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<scala.version>2.11.8</scala.version>
<spark.version>2.4.1</spark.version>
</properties>
<dependencies>
<!--添加Scala依赖-->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.8</version>
<!--如果有provided存在,那么打包的时候该依赖不会打到jar包中-->
<scope>provided</scope>
</dependency>
<!--添加spark-core依赖-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.0</version>
<scope>provided</scope>
</dependency>
<!--添加spark-streaming依赖-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.4.0</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!--添加编译支持,都编译成java1.8版本-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<!--添加Scala编译的支持-->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<!--添加打jar包的支持工具-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<executions>
<!--在你应用maven package这个阶段的时候,该插件会启动-->
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
<configuration>
<archive>
<!--在你jar包中指定启动类-->
<manifest>
<mainClass>com.SparkStreaming.WordCount</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</build>
</project>
3、编写Scala文件
接下来编写scala代码,每一行都有注释,可以仔细看一下:
package com.SparkStreaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object WordCount extends App{
//需要新建一个sparkConf变量,来提供spark的配置
val sparkConf=new SparkConf().setAppName("StreamWordCount").setMaster("local[2]")
//新建一个StreamingContext入口
val ssc=new StreamingContext(sparkConf,Seconds(2))
//从master机器上的9999端口不断的获取输入的文本数据
val lines=ssc.socketTextStream("master",9999)
//将每行文本通过空格分割多个单词
val words=lines.flatMap(_.split(" "))
//将每一个单词装换成一个元组
val pairs=words.map((_,1))
//根据单词来统计相同单词的数量
val result=pairs.reduceByKey(_+_)
//打印结果
result.print()
//启动你的流式处理程序
ssc.start()
//等待你的停止信号
ssc.awaitTermination()
}
4、写完这个项目代码之后,打包上传到服务器5、最后我们就是测试一下,因为是实时计算所以我们这里需要有一个输入的地方,在代码我们已经给出了一个监听端口号,所以我们另打开一个终端输入以下命令:
nc -lk 9999
//或者(注意这里的l不是1是小写的L)
nc -l -p 9999
接下来我们就是开启我们的程序,这里要注意:开启的顺序不能错,先开启监听端口号,然后再启动程序:
./spark-submit --class com.SparkStreaming.WordCount /root/Pro-jar/sparkstreamingdemo-1.0-SNAPSHOT-jar-with-dependencies.jar
案例到这里就结束了。
这里主要是浅显的讲解了一下Spark Streaming,后期还会扩展。