Spark之Spark Streaming原理

一、Spark Streaming概述

Spark Streaming类似于Apache Storm,用于流式数据的处理,具有高吞吐量和容错能力强等特点。Spark Streaming支持的数据输入源很多,例如:Kafka、Flume、Twitter和简单的TCP套接字等等,而结果也能保存在很多地方,比如HDFS、数据库等。Spark Streaming使用离散化流作为抽象表示,叫做DStream。DStream是随着时间推移而收到的数据的序列。在内部,每个时间区间收到的数据都作为RDD存在,而DStream是由这些RDD所组成的序列。创建出来的DStream支持两种操作,一种是转化操作(transformation),会生成一个新的DStream,另一种是输出操作(output operation),可以把数据写入外部系统中。DStream提供了许多与RDD所支持的操作相类似的操作支持,还增加了与时间相关的新操作,比如滑动窗口。

二、Spark Streaming和Storm的区别

区别

三、架构与抽象

Spark Streaming使用“微批次”的架构,把流式计算当作一系列连续的小规模批处理来对待。Spark Streaming从各种输入源中读取数据,并把数据分组为小的批次。新的批次按均匀的时间间隔创建出来。在每个时间区间开始的时候,一个新的批次就创建出来,在该区间内收到的数据都会被添加到这个批次中。在时间区间结束时,批次停止增长。时间区间的大小是由批次间隔这个参数决定的。批次间隔一般设置在500毫秒到几秒之间,由应用开发者配置。每个输入批次都形成一个RDD,以spark作业的方式处理并生成其他的RDD。处理的结果可以以批处理的方式传给外部系统。

Spark Streaming的编程抽象是离散化流,也就是DStream。它是一个RDD序列,每个RDD代表数据流中一个时间片内的数据。
Spark Streaming为每个输入源启动对应的接收器。接收器以任务的形式运行在应用的执行器进程中,从输入源收集数据并保存为RDD。它们收集到输入数据后会把数据复制到另一个执行器进程来保障容错性。数据保存在执行器进程的内存中,和缓存RDD的方式一样。驱动器程序中的StreamingContext会周期性地运行Spark作业来处理这些数据,把数据与之前时间区间中的RDD进行整合。

四、Spark Streaming解析

1、初始化StreamingContext

import org.apache.spark._
import org.apache.spark.streaming._
val conf=new SparkConf().setAppName(appName).setMaster(master)
val ssc=new StreamingContext(conf,Second(1))

初始化完Context之后:
1、定义消息输入源来创建DStreams.
2、定义DStreams的转化操作和输出操作。
3、通过streamingContext.start()来启动消息采集和处理。
4、等待程序终止,可以通过streamingContext.awaitTermination()来设置
5、通过streamingContext.stop()来手动终止处理程序。
注意:
StreamingContext一旦启动,对DStreams的操作就不能修改了。在同一时间一个JVM中只有一个StreamingContext可以启动,stop()方法将同时停止SparkContext,可以传入参数stopSparkContext用于停止StreamingContext。

2、什么是DStreams

Discretized Stream是Spark Streaming的基本抽象,代表持续性的数据流和经过各种Spark原语操作后的结果数据流。在内部实现上,DStream是一系列连续的RDD来表示。每个RDD含有一段时间间隔内的数据。对数据的操作也是按照RDD为单位来进行的。

3、DStreams输入

Spark Streaming原生支持一些不同的数据源。每个接收器都以Spark执行程序中一个长期运行的任务的形式运行,因此会占据分配给应用的CPU核心。此外,我们还需要有可用的CPU核心来处理数据。这意味这如果要运行多个接收器,就必须至少有和接收器数目相同的核心数,还要加上用来完成计算所需的核心数。例如,如果我们想要在流计算应用中运行10个接收器,那么至少需要为应用分配11个CPU核心。所以如果在本地模式运行,不要使用local或者local[1]。

3.1基本数据源

3.1文件数据源

文件数据流:能够读取所有HDFS API兼容的文件系统文件,通过fileStream方法进行读取

streamingContext.fileStream[KeyClass,ValueClass,InputFormatClass](dataDirectory)

Spark Streaming将会监控dataDirectory目录并不断处理移动进来的文件,记住目前不支持嵌套目录。
1、文件需要有相同的数据格式
2、文件进入dataDirectory的方式需要通过移动或者重命名来实现
3、一旦文件移动进目录,则不能修改,即便修改了也不会读取新数据。
如果文件比较简单,则可以使用streamingContext.textFileStream(dataDirectory)方法来读取文件。文件流不需要接收器,不需要单独分配CPU核。

3.2 自定义数据源

通过继承Receiver,并实现onStart、onStop方法来自定义数据源采集。可以通过streamingContext.receiverStream(<instance of custom receiver>)来使用自定义的数据采集源。

4、DStream转换

DStream上的原语与RDD的类似,分为Transformations(转换)和Output Operations(输出)两种,此外转换操作中还有一些比较特殊的原语,比如:updateStateByKey()、transform()以及各种window相关的原语。
DStream的转化操作可以分为无状态(stateless)和有状态(stateful)两种:
(1)、在无状态转化操作中,每个批次的处理不依赖于之前批次的数据。常见的RDD转化操作,例如map()、filter()、reduceByKey()等,都是无状态转化操作;
(2)、有状态操作需要使用之前批次的数据或者是中间结果来计算当前批次的数据。有状态转化操作包括基于滑动窗口的转化操作和追踪状态变化的转化操作。

4.1 无状态转化操作

无状态转化操作就是把简单的RDD转化操作应用到每个批次上,也就是转化DStream中的每一个RDD。注意,针对键值对的DStream转化操作(比如reduceByKey())要添加import StreamingContext._才能在scala中使用。

#这里列举一下无状态转化操作的例子
1、def map[U: ClassTag](mapFunc: T => U): DStream[U] 将源DStream中的每个元素通过一个函数func从而得到新的DStreams。
2、def flatMap[U: ClassTag](flatMapFunc: T => TraversableOnce[U]): DStream[U] 和map类似,但是每个输入的项可以被映射为0或更多项。
3、def filter(filterFunc: T => Boolean): DStream[T] 选择源DStream中函数func判为true的记录作为新DStream
4、def repartition(numPartitions: Int): DStream[T]   通过创建更多或者更少的partition来改变此DStream的并行级别。
5、def union(that: DStream[T]): DStream[T]  将一个具有相同slideDuration新的DStream和当前DStream进行合并,返回新的DStream
6、def count(): DStream[Long]  统计源DStreams中每个RDD所含元素的个数得到单元素RDD的新DStreams。
7、def reduce(reduceFunc: (T, T) => T): DStream[T]  通过函数func(两个参数一个输出)来整合源DStreams中每个RDD元素得到单元素RDD的DStream。
8、def countByValue(numPartitions: Int = ssc.sc.defaultParallelism)(implicit ord: Ordering[T] = null): DStream[(T, Long)]   对于DStreams中元素类型为K调用此函数,得到包含(K,Long)对的新DStream,其中Long值表明相应的K在源DStream中每个RDD出现的次数。
9、def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)]  对(K,V)对的DStream调用此函数,返回同样(K,V)对的新DStream,但是新DStream中的对应V为使用reduce函数整合而来
10、def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))]  两DStream分别为(K,V)和(K,W)对,返回(K,(V,W))对的新DStream。
11、def cogroup[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Iterable[V], Iterable[W]))]  两DStream分别为(K,V)和(K,W)对,返回(K,(Seq[V],Seq[W])对新DStream
12、def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U]  将RDD到RDD映射的函数func作用于源DStream中每个RDD上得到新DStream。这个可用于在DStream的RDD上做任意操作。注意的是,在这个转换函数里面能够应用所有RDD的转换操作。

4.2 有状态转换操作

1、def updateStateByKey[S: ClassTag]( updateFunc: (Seq[V], Option[S]) => Option[S] ): DStream[(K, S)]
           (1)、 S是你需要保存的状态的类型。
           (2)、updateFunc 是定义了每一批次RDD如何来更新的状态值。 Seq[V] 是当前批次相同key的值的集合。 Option[S] 是框架自动提供的,上一次保存的状态的值。
           (3)、updateStateByKey会返回一个新的DStream,该DStream中保存了(Key,State)的序列。
2、window 函数
           (1)、def window(windowDuration: Duration, slideDuration: Duration): DStream[T] 基于对源DStream窗化的批次进行计算返回一个新的DStream,windowDuration是窗口大小,slideDuration滑动步长。
           (2)、def countByWindow( windowDuration: Duration, slideDuration: Duration): DStream[Long]  注意,返回的是window中记录的条数。
           (3)、def reduceByWindow( reduceFunc: (T, T) => T, windowDuration: Duration, slideDuration: Duration): DStream[T]   通过使用自定义函数整合滑动区间流元素来创建一个新的单元素流。
           (4)、 def reduceByKeyAndWindow(reduceFunc: (V, V) => V,windowDuration: Duration, slideDuration: Duration): DStream[(K, V)]   通过给定的窗口大小以滑动步长来应用reduceFunc函数,返回DStream[(K, V)], K就是DStream中相应的K,V是window应用了reduce之后产生的最终值。
           (5)、def reduceByKeyAndWindow(reduceFunc: (V, V) => V,invReduceFunc: (V, V) => V,windowDuration: Duration,slideDuration: Duration = self.slideDuration,numPartitions: Int =ssc.sc.defaultParallelism,filterFunc: ((K, V)) => Boolean = null): DStream[(K, V)]

五、Spark Streaming应用案例

这里我们使用spark streaming来编写一个实时统计单词的案例:

1、这里我使用的是idea创建的一个maven工程
接下来选择工程的目录,然后finish即可。

2、在pom.xml文件中添加一下依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.sparkstreaming</groupId>
    <artifactId>sparkstreamingdemo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <scala.version>2.11.8</scala.version>
        <spark.version>2.4.1</spark.version>
    </properties>
    <dependencies>
        <!--添加Scala依赖-->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.8</version>
            <!--如果有provided存在,那么打包的时候该依赖不会打到jar包中-->
            <scope>provided</scope>
        </dependency>
        <!--添加spark-core依赖-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.4.0</version>
            <scope>provided</scope>
        </dependency>
        <!--添加spark-streaming依赖-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.4.0</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!--添加编译支持,都编译成java1.8版本-->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <!--添加Scala编译的支持-->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!--添加打jar包的支持工具-->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <!--在你应用maven package这个阶段的时候,该插件会启动-->
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <archive>
                        <!--在你jar包中指定启动类-->
                        <manifest>
                            <mainClass>com.SparkStreaming.WordCount</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

3、编写Scala文件

在编写scala之前我们先要改一下文件名(方便)
注意红框标出的内容与pom文件中指定的启动类必须一致

接下来编写scala代码,每一行都有注释,可以仔细看一下:

package com.SparkStreaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WordCount extends App{
  //需要新建一个sparkConf变量,来提供spark的配置
  val sparkConf=new SparkConf().setAppName("StreamWordCount").setMaster("local[2]")

  //新建一个StreamingContext入口
  val ssc=new StreamingContext(sparkConf,Seconds(2))

  //从master机器上的9999端口不断的获取输入的文本数据
  val lines=ssc.socketTextStream("master",9999)

  //将每行文本通过空格分割多个单词
  val words=lines.flatMap(_.split(" "))

  //将每一个单词装换成一个元组
  val pairs=words.map((_,1))

  //根据单词来统计相同单词的数量
  val result=pairs.reduceByKey(_+_)

  //打印结果
  result.print()

  //启动你的流式处理程序
  ssc.start()

  //等待你的停止信号
  ssc.awaitTermination()

}

4、写完这个项目代码之后,打包上传到服务器

打包结束后项目会产生一个target文件,里面就有打成的jar包(选择后缀带有denpendencies)
接下来就是上传到服务器,这里我就不演示了,使用xftp相对简单。我把它上传到自己建的一个文件夹下

5、最后我们就是测试一下,因为是实时计算所以我们这里需要有一个输入的地方,在代码我们已经给出了一个监听端口号,所以我们另打开一个终端输入以下命令:
nc -lk 9999
//或者(注意这里的l不是1是小写的L)
nc -l -p 9999

开启程序之后我们要在这个窗口输入要统计的单词。
接下来我们就是开启我们的程序,这里要注意:开启的顺序不能错,先开启监听端口号,然后再启动程序

./spark-submit --class com.SparkStreaming.WordCount  /root/Pro-jar/sparkstreamingdemo-1.0-SNAPSHOT-jar-with-dependencies.jar

程序启动后的效果如下:

我们设置的是两秒一次刷新,所以每个2秒会出现一次time
最后我们在输入窗口输入测试数据:

案例到这里就结束了。
这里主要是浅显的讲解了一下Spark Streaming,后期还会扩展。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,634评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,951评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,427评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,770评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,835评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,799评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,768评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,544评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,979评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,271评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,427评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,121评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,756评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,375评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,579评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,410评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,315评论 2 352

推荐阅读更多精彩内容