Spark Streaming学习

以下内容主要基于Spark2.1.0版本的Spark Streaming内容学习得到。


还是先把Maven的依赖加入进去:

https://mvnrepository.com/artifact/org.apache.spark/spark-streaming_2.10

Overview

Spark Streaming是core Spark API的一个扩展,对实时数据流进行scalable, high-throughput, fault-tolerant流处理。数据可以从多个来源得到,比如:Kafka,Flume,Kinesis或者TCP socket,并提供高级别的函数诸如map,reduce,join和window这样复合的算法。最终处理后的数据可以通过文件系统、数据库和实时dashboards输出。事实上还支持Spark的机器学习图形处理算法在数据流上。

接收的数据流来源以及输出的数据结果

内部是按照如下方式处理的。Spark Streaming接收实时输入数据并将数据分成多个batches,然后Spark engine会产生最终的结果batches流。

按照batch处理

Spark Streaming提供一个高级别的抽象概念:discretized streamDStream),来代表一个连续的数据流。DStream既可以从Kafka、Flume和Kinesis输入数据流中创建,也可以从其他DStream上通过高级别的操作产生。在内部DStream是由一连串的RDDs来表示。


A Quick Example

在进一步学习Spark Streaming细节之前,可以先通过一个简单的例子看下大致的处理过程。下面这个例子就是从在TCP socket上监听数据服务器的文本数据并统计word个数。

下面逐步分解Example中的各段语句的作用。

创建一个StringContext,这也是所有Streaming功能的主入口,并设置batch间隔为1s


使用这个context创建一个DStream来表示来自TCP源的数据流。其中arg[0]为输入的入参hostname,在本地运行时为localhost;arg[1]为输入的第二个入参为端口号,比如9999


使用flatMap可以创建另一个DStream出来,目的是把源DStream的每条记录生成新DStream多条记录。其中FlatMapFunction的对象用于实现Transformation
将FlatMapFunction以lambda表达式的方式实现


然后将words这个DStream通过mapToPair和reduceByKey转换为对单词的统计的DStream变量
同样改成lambda表达式的方式实现


前面的代码只是创建了计算过程,但是只有start之后才开始执行。

执行过程需要建立两个终端,其中一个运行Netcat作为数据服务器,另一个正常使用spark-submit来提交Maven package出来的jar。

这是个示意,具体运行自己的application的时候不需要用example。

基础概念

Linking

在Maven中加入Spark-Streaming的依赖就不再赘述了。不过如果在代码中使用Kafka、Flume或者Kinesis则需要额外加入他们的artifact到pom.xml的依赖中。最新的版本在Maven repository中查找。

Kinesis的库在spark中没有找到

初始化StreamingContext

SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);

JavaStreamingContext ssc = new JavaStreamingContext(conf,newDuration(1000));

这个初始化类似上面example中的代码,不过在集群使用中更倾向于在spark-submit的命令行中输入具体的master:Spark、Mesos或者YARN等。也就是master = args[0]之类。需要主要的是JavaStreamingContext在内部会创建一个JavaSparkContext,可以通过ssc.sparkContext访问。

batch的时长间隔是一个必须的配置入参。具体的细节可以在下面的调优章节中看到。

还可以通过已经创建的JavaSparkContext获得:

JavaSparkContext sc=...//existing JavaSparkContext

JavaStreamingContext ssc= new JavaStreamingContext(sc,Durations.seconds(1));

在创建好StreamingContext之后接下来要做的内容与example的相仿:

1、定义一个输入源——DStream

2、定义对DStream的计算:通过transformation输出DStream

3、启动接收数据并处理:streamingContext.start()

4、等待处理过程的停止信号:streamingContext.awaitTermination()

5、代码中控制停止:streamingContext.stop()

注意事项:

1、一旦Streaming context被启动,那么就不能修改或者添加任何新的DStream的transformation。

2、一旦Streaming context被停止,将不能被重启。

3、同一时刻在一个JVM上只能有一个StreamingContext在运行。

4、StreamingContext.stop()同时也停止了SparkContext。如果不想把SparkContext停止,需要在stop方法中设置可选参数ssc.stop(false);

5、SparkContext可以多次创建StreamingContext,只要在创建新的之前老的StreamingContext已经被停止,而且没有停止SparkContext。

Discretized Streams (DStreams)

DStream是一连串的RDD,其中每个RDD都包含一个时间间隔中的数据。DStream既是输入的数据流,也是transformation后的处理过的数据流。下图是DStream的表示图示:

其实对DStream的transformation操作就是对具体RDD的操作,比如前面那个example中的情况:

Input DStreams and Receivers

每一个输入的DStream(除了file stream)都与一个Receiver对象相关联。Receiver是从源接收数据并存入内存中。如果想在Application中并行接收多个Stream,那么就需要创建多个input DStream,这也将同时创建多个Receiver来接收多个数据stream。不过需要注意的一点是需要有足够的core来处理这些数据。

注意事项:

1、当在本地运行Spark Streaming程序,不要使用"local"/"local[1]"作为master参数。因为这表示只有本地的一个线程在运行这个任务,这时这个线程会运行input DStream所基于的Receiver,就没有额外的线程资源来进行数据的处理了。所以需要设置master参数为local[n],其中n > Receiver的个数。具体的如何设置master的参数参考Spark Properties

2、同样在集群中运行Streaming任务也需要设置core的个数大于Receiver的个数。不然系统就只会接收数据而不能处理数据了。

Basic Sources

在前面的例子中,我们看到通过使用ssc.socketTextStream(...)来创建一个DStream接收从TCP socket来的数据。其实除了socket,StreamingContext API还可以将文件作为数据源来创建DStream。

File Stream

为了能从任何文件系统中读取数据使用HDFS API(HDFS、S3、NFS等),DStream的创建方式为:

streamingContext.fileStream<KeyClass, ValueClass, InputFormatClass>(dataDirectory);

Spark Streaming将会监视目录dataDirectory并且处理任何新在这个目录中创建的文件(不支持子目录)。需要注意的是:

1、文件必须是统一的格式

2、文件必须是mv到这个文件目录下的(move或者rename)

3、一旦mv过去文件不能修改,所以如果一个文件被连续的修改,那么新加的内容将不会读取出来。

对于简单的text文件,有一种简单的创建DStream的方法:streamingContext.textFileStream(dataDirectory). file stream不需要运行一个Receiver,所以也不需要分配core资源。

Streams based on Custom Receivers

DStream还可以创建处理用户自定义的Receiver接收到的数据流。详见Custom Receiver Guide

Queue of RDDs as a Stream

为了用测试数据测试Spark Streaming,还可以基于一个队列的RDD创建DStream,通过方法streamingContext.queueStream(queueOfRDDs)。其中每一个RDD被认为是DStream的一个batch数据。

Advanced Sources

Kafka:Spark Streaming 2.1.0 is compatible with Kafka broker versions 0.8.2.1 or higher. See the Kafka Integration Guide for more details.

Flume:Spark Streaming 2.1.0 is compatible with Flume 1.6.0. See the Flume Integration Guide for more details.

Kinesis:Spark Streaming 2.1.0 is compatible with Kinesis Client Library 1.2.1. See the Kinesis Integration Guide for more details.

Custom Sources

Input DStreams同样可以创建为用户自定义的数据源。所有需要做的事情就是实现一个自定义的Receiver用以从用户的源中接收数据并推给Spark。详见Custom Receiver Guide

Receiver的可靠性

基于可靠性有两种数据源一种是可靠的,另一种是不可靠的。可靠的是指诸如Kafka或者Flume这种有ACK反馈的,不可靠的则没有。

可靠的Receiver:会在接收到数据后给源发送ACK

不可靠的Receiver:不会给源发送ACK

Transformations on DStreams

类似于RDD,DStream也可以通过transformation对输入的DStream进行处理。下面列出了常用的一些:

DStream的Transformation

上面一些transformation将会重点介绍:

UpdateStateByKey Operation

流计算往往是7*24小时不间断的,所以需要中间保存一些状态。

注意:使用updateStateByKey必须配置checkpoint目录,否则会报错

Transform Operation

transform主要是对一些RDD支持的Transformation而DStream中没有支持的做扩展。也类似与对DStream的每个RDD进行操作。

这个代码是利用transform对join操作的实现

Window Operations

如下图所示,Spark Streaming提供了一个窗口计算,允许在滑动窗口内的数据进行Transformation。

窗长是3个时间间隔,按照2个时间间隔进行滑动

任何一个window operation都需要下面两个参数:

window length - 窗长

sliding interval - 窗滑动的时间间隔

还是以上面那个数单词的case举例,假如现在有个需求是每隔10s就统计下最近30s时间内的单词数目。那么代码将写为:

窗长和窗滑动的时间间隔都必须是batch时长的整数倍

还有其他一些使用窗的Transformation:

和windowLength以及slideInterval相关的Tranformation

Join Operations

最后需要重点说下在Spark Streaming中做多个不同类型的join是how easily

Stream-stream joins

stream之间可以非常简单的join在一起

两个window stream相join

Stream-dataset joins

下面展示下一个window stream如何和一个Dataset进行join

用到了transform

DStream的输出

output操作类似与RDD的action操作,只有output才会真正执行之前的Transformation操作。

Design Patterns for using foreachRDD

主要讲到实际运行时类的序列化和反序列化的设计,以及错误的使用会导致的一些问题。暂时不细说了。

DataFrame and SQL Operations

还可以在流数据中轻松使用DataFrame和SQL。首先必须要使用StreamingContext的SparkContext来创建一个SparkSession。下面就是一个将之前的word count的例子改成使用DataFrame和SQL的方式。每个RDD都被转成DataFrame,注册成一个临时的表并使用SQL。

下面是具体的代码示例:

代码第一段


以StreamingContext的sparkConf来创建SparkSession


代码第二段

其中

JavaRecord的定义

MLlib Operations

还可以轻松使用MLlib的机器学习算法。首先存在流机器学习算法(比如:Streaming Linear Regression或者Streaming KMeans等),这些算法可以同时学习流数据产生模型并把模型应用到流数据当中。除了这些算法,还有更多的算法可以从offline的历史数据中学习得到模型,然后将这些模型应用到流数据中。具体的可以看指导文档:MLlib

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,014评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,796评论 3 386
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,484评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,830评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,946评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,114评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,182评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,927评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,369评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,678评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,832评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,533评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,166评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,885评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,128评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,659评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,738评论 2 351

推荐阅读更多精彩内容