一.Spark streaming介绍
1.1 Spark streaming简介
Spark Streaming是Spark API的核心扩展,支持实时数据流的可扩展、高吞吐量和容错流处理。数据可以从Kafka、Kinesis或TCP套接字等多种来源中获取,并且可以使用复杂的算法进行处理,这些算法用高级函数表示,如map、reduce、join和window。最后,处理过的数据可以推送到文件系统、数据库和实时仪表板。事实上,您可以在数据流上应用Spark的机器学习和图形处理算法。
在内部,它的工作方式如下。Spark Streaming接收实时输入的数据流,并对数据进行分批处理,由Spark引擎进行处理,生成最终的批量结果流。
Spark Streaming提供了一种高级抽象,称为离散流或DStream,它表示连续的数据流。Dstream可以通过来自Kafka和Kinesis等源的输入数据流创建,也可以通过在其他Dstream上应用高级操作来创建。在内部,DStream表示为rdd序列。
1.2 Spark 与storm区别
Storm
- 流式计算框架
- 以record为单位处理数据
- 也支持micro-batch方式(Trident)
Spark
- 批处理计算框架
- 以RDD为单位处理数据
- 也支持micro-batch流式处理数据(Spark Streaming)
两者异同
- 吞吐量: Spark Streaming 优于Storm
- 延迟: Spark Streaming差于Storm
1.3 一个简单的例子
在我们深入了解如何编写自己的Spark Streaming程序之前,让我们快速了解一下简单的Spark Streaming程序是什么样的。
首先,我们导入StreamingContext,它是所有流功能的主要入口点。我们创建一个具有两个执行线程的本地StreamingContext,批处理间隔为1秒。
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# Create a local StreamingContext with two working thread and batch interval of 1 second
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)
为了初始化一个Spark Streaming程序,必须创建一个StreamingContext对象,它是所有Spark Streaming功能的主要入口点。
appName参数是应用程序在集群UI上显示的名称。master是Spark、Mesos或YARN集群的URL,或者是一个特殊的“local[*]”字符串,在本地模式下运行。实际上,当在集群上运行时,您不希望在程序中硬编码master,而是使用spark-submit启动应用程序并在那里接收它。但是,对于本地测试和单元测试,可以通过“local[*]”来运行Spark Streaming in-process(检测本地系统中的核数)。
在定义了上下文之后,必须执行以下操作:
- 通过创建输入DStreams来定义输入源。
- 通过对DStreams应用转换和输出操作来定义流计算。
- 开始接收数据并使用streamingContext.start()处理它。
- 使用streamingContext.awaitTermination()等待处理停止(手动或由于任何错误)。
- 可以使用streamingContext.stop()手动停止处理。
注意:
- 一旦启动了Context,就不能再设置或向其添加新的流计算。
- 一旦停止了Context,就不能重新启动它。
- 同一时间,JVM中只能有一个StreamingContext是活动的。
- StreamingContext上的stop()也会停止SparkContext。要只停止StreamingContext,请将stop()的可选参数stopSparkContext设置为false。
- 一个SparkContext可以被重用来创建多个StreamingContext,只要在创建下一个StreamingContext之前停止前一个StreamingContext(不停止SparkContext)。
二.Spark Streaming的组件介绍
Spark Streaming的核心组件有2个:
- Streaming Context
- Dstream(离散流)
2.1 Streaming Context
Streaming Context是Spark Streaming程序的起点,生成Streaming Context之前需要生成SparkContext,SparkContext可以理解为申请Spark集群的计算资源,Streaming Context可以理解为申请Spark Streaming的计算资源
2.2 Dstream(离散流)
Dstream是Spark Streaming的数据抽象,同DataFrame,其实底层依旧是RDD。
Discretized Stream或DStream是Spark Streaming提供的基本抽象。它表示一个连续的数据流,要么是从源接收的输入数据流,要么是通过转换输入流生成的处理数据流。在内部,DStream由一系列连续的rdd表示,这是Spark对不可变的分布式数据集的抽象。DStream中的每个RDD都包含一定时间间隔的数据,如下图所示:
在DStream上应用的任何操作都转换为在底层rdd上的操作。
这些底层RDD转换是由Spark引擎计算的。DStream操作隐藏了大部分细节,并为开发人员提供了更高级的API。
DStream存在如下概念:
- Receiver
- 数据源: 基本源、高级源
- 可靠性
- Dstream的操作
- 缓存
- Checkpoint
2.1 Receiver
每个输入DStream(文件流除外)都与一个Receiver (Scala doc, Java doc)对象相关联,接收来自源的数据并将其存储在Spark的内存中进行处理。
2.2 数据源
Spark Streaming提供了两类内置流源:
- 基本源:在StreamingContext API中直接可用的源。例如文件系统和套接字连接。
- 高级资源:像Kafka, Kinesis等资源可以通过额外的实用程序类获得。这些需要根据链接部分中讨论的额外依赖项进行链接。
注意,如果希望在流应用程序中并行接收多个数据流,可以创建多个输入Dstream。这将创建多个接收器,这些接收器将同时接收多个数据流。但是请注意,Spark worker/executor是一个长期运行的任务,因此它占用分配给Spark Streaming应用程序的一个核心。因此,Spark Streaming应用程序需要分配足够的内核(或者线程,如果在本地运行的话)来处理接收到的数据,以及运行接收端,记住这一点很重要。
记住
在本地运行Spark Streaming程序时,不要使用“local”或“local[1]”作为主URL。这两种情况都意味着只有一个线程用于本地运行任务。如果你使用一个基于接收器的输入DStream(例如,socket, Kafka等),那么单线程将被用来运行Receiver ,不留下任何线程来处理接收的数据。因此,当本地运行时,总是使用“local[n]”作为主URL,其中要运行n个>数量的Receiver 。
将逻辑扩展到集群上,分配给Spark Streaming应用的内核数必须大于接收端数。否则系统将接收到数据,但无法进行处理。
2.3 可靠性
根据数据源的可靠性,可以有两种数据源。源(如Kafka)允许传输的数据被确认。如果从这些可靠来源接收数据的系统正确地确认了接收的数据,就可以确保不会由于任何类型的故障而丢失数据。这就产生了两种接收者:
1). 可靠的接收端—当数据被接收到并存储在Spark中并进行复制时,一个可靠的接收端会正确地向一个可靠的源发送确认。
2), 不可靠的接收者——不可靠的接收者不向源发送确认。这可以用于不支持确认的来源,甚至当一个人不想或需要进入确认的复杂性时,用于可靠的来源。
对于不可靠的接收者,Spark streaming有自己的可靠机制,来保证数据的可靠性。
2.4 Dstream的操作
与rdd类似,转换允许修改来自输入DStream的数据。DStreams支持许多普通Spark RDD上可用的转换。下面是一些常见的.
Transformations on DStreams
Output Operations on DStreams:
2.5 缓存
与rdd类似,DStreams也允许开发人员在内存中持久化流数据。也就是说,在DStream上使用persist()方法将自动在内存中持久化该DStream的每个RDD。如果DStream中的数据将被计算多次(例如,对同一数据的多次操作),这是有用的。对于基于窗口的操作,如reduceByWindow和reduceByKeyAndWindow,以及基于状态的操作,如updateStateByKey,这是隐式true。因此,由基于窗口的操作生成的DStreams会自动持久化到内存中,而不需要开发人员调用persist()。
对于通过网络接收数据的输入流(例如,Kafka, socket等),默认的持久性级别被设置为将数据复制到两个节点以实现容错。
注意,与rdd不同,DStreams的默认持久性级别将数据序列化保存在内存中。
2.6 Checkpoint
流应用程序必须全天候运行,因此必须对与应用程序逻辑无关的故障(例如,系统故障、JVM崩溃等)具有弹性。为了使这成为可能,Spark Streaming需要对容错存储系统进行足够的信息检查点,以便从故障中恢复。有两种类型的数据是检查点的:
元数据检查点——将定义流计算的信息保存到像HDFS这样的容错存储中。这用于从运行流应用程序驱动程序的节点的故障中恢复(稍后将详细讨论)。元数据包括:
1.1) 配置—用于创建流应用程序的配置。
1.2) DStream操作-定义流应用程序的DStream操作集。
1.3) 未完成批-作业已排队但尚未完成的批。数据检查点——将生成的rdd保存到可靠的存储中。在一些跨多个批组合数据的有状态转换中,这是必要的。在这种转换中,生成的rdd依赖于以前批次的rdd,这导致依赖链的长度随着时间不断增加。为了避免恢复时间的无限增长(与依赖链成正比),有状态转换的中间rdd会定期被检查到可靠的存储(例如HDFS),以切断依赖链。
总之,元数据检查点主要用于从驱动程序失败中恢复,而数据或RDD检查点即使是用于基本功能(如果使用有状态转换)也是必要的。
三.一个简单的测试用例
3.1 linux服务器安装nc服务
yum -y install netcat.x86_64 -- centos7 正确
yum -y install nc.x86_64 -- centos7 错误
nc -lk 9999
[root@hp2 yum.repos.d]# nc -help
usage: nc [-46cDdFhklNnrStUuvz] [-C certfile] [-e name] [-H hash] [-I length]
[-i interval] [-K keyfile] [-M ttl] [-m minttl] [-O length]
[-o staplefile] [-P proxy_username] [-p source_port] [-R CAfile]
[-s sourceaddr] [-T keyword] [-V rtable] [-W recvlimit] [-w timeout]
[-X proxy_protocol] [-x proxy_address[:port]] [-Z peercertfile]
[destination] [port]
Command Summary:
-4 Use IPv4
-6 Use IPv6
-C certfile Public key file
-c Use TLS
-D Enable the debug socket option
-d Detach from stdin
-e name Required name in peer certificate
-F Pass socket fd
-H hash Hash string of peer certificate
-h This help text
-I length TCP receive buffer length
-i interval Delay interval for lines sent, ports scanned
-K keyfile Private key file
-k Keep inbound sockets open for multiple connects
-l Listen mode, for inbound connects
-M ttl Outgoing TTL / Hop Limit
-m minttl Minimum incoming TTL / Hop Limit
-N Shutdown the network socket after EOF on stdin
-n Suppress name/port resolutions
-O length TCP send buffer length
-o staplefile Staple file
-P proxyuser Username for proxy authentication
-p port Specify local port for remote connects
-R CAfile CA bundle
-r Randomize remote ports
-S Enable the TCP MD5 signature option
-s sourceaddr Local source address
-T keyword TOS value or TLS options
-t Answer TELNET negotiation
-U Use UNIX domain socket
-u UDP mode
-V rtable Specify alternate routing table
-v Verbose
-W recvlimit Terminate after receiving a number of packets
-w timeout Timeout for connects and final net reads
-X proto Proxy protocol: "4", "5" (SOCKS) or "connect"
-x addr[:port] Specify proxy address and port
-Z Peer certificate file
-z Zero-I/O mode [used for scanning]
Port numbers can be individual or ranges: lo-hi [inclusive]
[root@hp2 yum.repos.d]#
[root@hp2 yum.repos.d]#
[root@hp2 yum.repos.d]#
[root@hp2 yum.repos.d]# nc -lk 9999
3.2 pyspark代码
代码:
#!/usr/bin/env python
# encoding: utf-8
"""
@author: 'Administrator'
@contact:
@time:
"""
#!/usr/bin/python
# encoding: utf-8
#
# Streaming Word Count Example
# Original Source: https://spark.apache.org/docs/1.6.0/streaming-programming-guide.html
#
# To run this example:
# yum install nc.x86_64
# Terminal 1: nc -lk 9999
# Terminal 2: ./bin/spark-submit streaming_word_count.py
# Note, type words into Terminal 1
#
# Import the necessary classes and create a local SparkContext and Streaming Contexts
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
if __name__=='__main__':
# Create Spark Context with two working threads (note, `local[2]`)
sc = SparkContext("local[2]", "NetworkWordCount")
# Create local StreamingContextwith batch interval of 1 second
ssc = StreamingContext(sc, 1)
# Create DStream that will connect to the stream of input lines from connection to localhost:9999
# lines is DStream representing the data stream extracted via the ssc.socketTextStream.
lines = ssc.socketTextStream("localhost", 9999)
# Split lines into words
words = lines.flatMap(lambda line: line.split(" "))
# Count each word in each batch
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)
# Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.pprint()
# Start the computation
ssc.start()
# Wait for the computation to terminate
ssc.awaitTermination()
测试记录:
滚动太快,只能从日志中找到记录
参考:
1.http://spark.apache.org/docs/latest/streaming-programming-guide.html