pyspark系列7-Spark streaming介绍

一.Spark streaming介绍

1.1 Spark streaming简介

Spark Streaming是Spark API的核心扩展,支持实时数据流的可扩展、高吞吐量和容错流处理。数据可以从Kafka、Kinesis或TCP套接字等多种来源中获取,并且可以使用复杂的算法进行处理,这些算法用高级函数表示,如map、reduce、join和window。最后,处理过的数据可以推送到文件系统、数据库和实时仪表板。事实上,您可以在数据流上应用Spark的机器学习和图形处理算法。


image.png

在内部,它的工作方式如下。Spark Streaming接收实时输入的数据流,并对数据进行分批处理,由Spark引擎进行处理,生成最终的批量结果流。


image.png

Spark Streaming提供了一种高级抽象,称为离散流或DStream,它表示连续的数据流。Dstream可以通过来自Kafka和Kinesis等源的输入数据流创建,也可以通过在其他Dstream上应用高级操作来创建。在内部,DStream表示为rdd序列。

1.2 Spark 与storm区别

Storm

  1. 流式计算框架
  2. 以record为单位处理数据
  3. 也支持micro-batch方式(Trident)

Spark

  1. 批处理计算框架
  2. 以RDD为单位处理数据
  3. 也支持micro-batch流式处理数据(Spark Streaming)

两者异同

  1. 吞吐量: Spark Streaming 优于Storm
  2. 延迟: Spark Streaming差于Storm

1.3 一个简单的例子

在我们深入了解如何编写自己的Spark Streaming程序之前,让我们快速了解一下简单的Spark Streaming程序是什么样的。

首先,我们导入StreamingContext,它是所有流功能的主要入口点。我们创建一个具有两个执行线程的本地StreamingContext,批处理间隔为1秒。

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Create a local StreamingContext with two working thread and batch interval of 1 second
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)

为了初始化一个Spark Streaming程序,必须创建一个StreamingContext对象,它是所有Spark Streaming功能的主要入口点。

appName参数是应用程序在集群UI上显示的名称。master是Spark、Mesos或YARN集群的URL,或者是一个特殊的“local[*]”字符串,在本地模式下运行。实际上,当在集群上运行时,您不希望在程序中硬编码master,而是使用spark-submit启动应用程序并在那里接收它。但是,对于本地测试和单元测试,可以通过“local[*]”来运行Spark Streaming in-process(检测本地系统中的核数)。

在定义了上下文之后,必须执行以下操作:

  1. 通过创建输入DStreams来定义输入源。
  2. 通过对DStreams应用转换和输出操作来定义流计算。
  3. 开始接收数据并使用streamingContext.start()处理它。
  4. 使用streamingContext.awaitTermination()等待处理停止(手动或由于任何错误)。
  5. 可以使用streamingContext.stop()手动停止处理。

注意:

  1. 一旦启动了Context,就不能再设置或向其添加新的流计算。
  2. 一旦停止了Context,就不能重新启动它。
  3. 同一时间,JVM中只能有一个StreamingContext是活动的。
  4. StreamingContext上的stop()也会停止SparkContext。要只停止StreamingContext,请将stop()的可选参数stopSparkContext设置为false。
  5. 一个SparkContext可以被重用来创建多个StreamingContext,只要在创建下一个StreamingContext之前停止前一个StreamingContext(不停止SparkContext)。

二.Spark Streaming的组件介绍

Spark Streaming的核心组件有2个:

  1. Streaming Context
  2. Dstream(离散流)

2.1 Streaming Context

Streaming Context是Spark Streaming程序的起点,生成Streaming Context之前需要生成SparkContext,SparkContext可以理解为申请Spark集群的计算资源,Streaming Context可以理解为申请Spark Streaming的计算资源

2.2 Dstream(离散流)

Dstream是Spark Streaming的数据抽象,同DataFrame,其实底层依旧是RDD。

Discretized Stream或DStream是Spark Streaming提供的基本抽象。它表示一个连续的数据流,要么是从源接收的输入数据流,要么是通过转换输入流生成的处理数据流。在内部,DStream由一系列连续的rdd表示,这是Spark对不可变的分布式数据集的抽象。DStream中的每个RDD都包含一定时间间隔的数据,如下图所示:


image.png

在DStream上应用的任何操作都转换为在底层rdd上的操作。


image.png

这些底层RDD转换是由Spark引擎计算的。DStream操作隐藏了大部分细节,并为开发人员提供了更高级的API。

DStream存在如下概念:

  1. Receiver
  2. 数据源: 基本源、高级源
  3. 可靠性
  4. Dstream的操作
  5. 缓存
  6. Checkpoint
image.png

2.1 Receiver

每个输入DStream(文件流除外)都与一个Receiver (Scala doc, Java doc)对象相关联,接收来自源的数据并将其存储在Spark的内存中进行处理。

2.2 数据源

Spark Streaming提供了两类内置流源:

  1. 基本源:在StreamingContext API中直接可用的源。例如文件系统和套接字连接。
  2. 高级资源:像Kafka, Kinesis等资源可以通过额外的实用程序类获得。这些需要根据链接部分中讨论的额外依赖项进行链接。

注意,如果希望在流应用程序中并行接收多个数据流,可以创建多个输入Dstream。这将创建多个接收器,这些接收器将同时接收多个数据流。但是请注意,Spark worker/executor是一个长期运行的任务,因此它占用分配给Spark Streaming应用程序的一个核心。因此,Spark Streaming应用程序需要分配足够的内核(或者线程,如果在本地运行的话)来处理接收到的数据,以及运行接收端,记住这一点很重要。

记住
在本地运行Spark Streaming程序时,不要使用“local”或“local[1]”作为主URL。这两种情况都意味着只有一个线程用于本地运行任务。如果你使用一个基于接收器的输入DStream(例如,socket, Kafka等),那么单线程将被用来运行Receiver ,不留下任何线程来处理接收的数据。因此,当本地运行时,总是使用“local[n]”作为主URL,其中要运行n个>数量的Receiver 。

将逻辑扩展到集群上,分配给Spark Streaming应用的内核数必须大于接收端数。否则系统将接收到数据,但无法进行处理。

2.3 可靠性

根据数据源的可靠性,可以有两种数据源。源(如Kafka)允许传输的数据被确认。如果从这些可靠来源接收数据的系统正确地确认了接收的数据,就可以确保不会由于任何类型的故障而丢失数据。这就产生了两种接收者:
1). 可靠的接收端—当数据被接收到并存储在Spark中并进行复制时,一个可靠的接收端会正确地向一个可靠的源发送确认。
2), 不可靠的接收者——不可靠的接收者不向源发送确认。这可以用于不支持确认的来源,甚至当一个人不想或需要进入确认的复杂性时,用于可靠的来源。

对于不可靠的接收者,Spark streaming有自己的可靠机制,来保证数据的可靠性。

2.4 Dstream的操作

与rdd类似,转换允许修改来自输入DStream的数据。DStreams支持许多普通Spark RDD上可用的转换。下面是一些常见的.

Transformations on DStreams

image.png

Output Operations on DStreams:

image.png

2.5 缓存

与rdd类似,DStreams也允许开发人员在内存中持久化流数据。也就是说,在DStream上使用persist()方法将自动在内存中持久化该DStream的每个RDD。如果DStream中的数据将被计算多次(例如,对同一数据的多次操作),这是有用的。对于基于窗口的操作,如reduceByWindow和reduceByKeyAndWindow,以及基于状态的操作,如updateStateByKey,这是隐式true。因此,由基于窗口的操作生成的DStreams会自动持久化到内存中,而不需要开发人员调用persist()。

对于通过网络接收数据的输入流(例如,Kafka, socket等),默认的持久性级别被设置为将数据复制到两个节点以实现容错。

注意,与rdd不同,DStreams的默认持久性级别将数据序列化保存在内存中。

2.6 Checkpoint

流应用程序必须全天候运行,因此必须对与应用程序逻辑无关的故障(例如,系统故障、JVM崩溃等)具有弹性。为了使这成为可能,Spark Streaming需要对容错存储系统进行足够的信息检查点,以便从故障中恢复。有两种类型的数据是检查点的:

  1. 元数据检查点——将定义流计算的信息保存到像HDFS这样的容错存储中。这用于从运行流应用程序驱动程序的节点的故障中恢复(稍后将详细讨论)。元数据包括:
    1.1) 配置—用于创建流应用程序的配置。
    1.2) DStream操作-定义流应用程序的DStream操作集。
    1.3) 未完成批-作业已排队但尚未完成的批。

  2. 数据检查点——将生成的rdd保存到可靠的存储中。在一些跨多个批组合数据的有状态转换中,这是必要的。在这种转换中,生成的rdd依赖于以前批次的rdd,这导致依赖链的长度随着时间不断增加。为了避免恢复时间的无限增长(与依赖链成正比),有状态转换的中间rdd会定期被检查到可靠的存储(例如HDFS),以切断依赖链。

总之,元数据检查点主要用于从驱动程序失败中恢复,而数据或RDD检查点即使是用于基本功能(如果使用有状态转换)也是必要的。

三.一个简单的测试用例

3.1 linux服务器安装nc服务

yum -y install netcat.x86_64    -- centos7 正确
yum -y install nc.x86_64          -- centos7 错误

nc -lk 9999
[root@hp2 yum.repos.d]# nc -help
usage: nc [-46cDdFhklNnrStUuvz] [-C certfile] [-e name] [-H hash] [-I length]
          [-i interval] [-K keyfile] [-M ttl] [-m minttl] [-O length]
          [-o staplefile] [-P proxy_username] [-p source_port] [-R CAfile]
          [-s sourceaddr] [-T keyword] [-V rtable] [-W recvlimit] [-w timeout]
          [-X proxy_protocol] [-x proxy_address[:port]] [-Z peercertfile]
          [destination] [port]
        Command Summary:
                -4              Use IPv4
                -6              Use IPv6
                -C certfile     Public key file
                -c              Use TLS
                -D              Enable the debug socket option
                -d              Detach from stdin
                -e name         Required name in peer certificate
                -F              Pass socket fd
                -H hash         Hash string of peer certificate
                -h              This help text
                -I length       TCP receive buffer length
                -i interval     Delay interval for lines sent, ports scanned
                -K keyfile      Private key file
                -k              Keep inbound sockets open for multiple connects
                -l              Listen mode, for inbound connects
                -M ttl          Outgoing TTL / Hop Limit
                -m minttl       Minimum incoming TTL / Hop Limit
                -N              Shutdown the network socket after EOF on stdin
                -n              Suppress name/port resolutions
                -O length       TCP send buffer length
                -o staplefile   Staple file
                -P proxyuser    Username for proxy authentication
                -p port         Specify local port for remote connects
                -R CAfile       CA bundle
                -r              Randomize remote ports
                -S              Enable the TCP MD5 signature option
                -s sourceaddr   Local source address
                -T keyword      TOS value or TLS options
                -t              Answer TELNET negotiation
                -U              Use UNIX domain socket
                -u              UDP mode
                -V rtable       Specify alternate routing table
                -v              Verbose
                -W recvlimit    Terminate after receiving a number of packets
                -w timeout      Timeout for connects and final net reads
                -X proto        Proxy protocol: "4", "5" (SOCKS) or "connect"
                -x addr[:port]  Specify proxy address and port
                -Z              Peer certificate file
                -z              Zero-I/O mode [used for scanning]
        Port numbers can be individual or ranges: lo-hi [inclusive]
[root@hp2 yum.repos.d]# 
[root@hp2 yum.repos.d]# 
[root@hp2 yum.repos.d]# 
[root@hp2 yum.repos.d]# nc -lk 9999

3.2 pyspark代码

代码:

#!/usr/bin/env python
# encoding: utf-8


"""
@author:  'Administrator'
@contact:
@time:
"""

#!/usr/bin/python
# encoding: utf-8

#
# Streaming Word Count Example
#    Original Source: https://spark.apache.org/docs/1.6.0/streaming-programming-guide.html
#
# To run this example:
# yum install nc.x86_64
#   Terminal 1:  nc -lk 9999
#   Terminal 2:  ./bin/spark-submit streaming_word_count.py
#   Note, type words into Terminal 1
#

# Import the necessary classes and create a local SparkContext and Streaming Contexts
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__=='__main__':
    # Create Spark Context with two working threads (note, `local[2]`)
    sc = SparkContext("local[2]", "NetworkWordCount")

    # Create local StreamingContextwith batch interval of 1 second
    ssc = StreamingContext(sc, 1)

    # Create DStream that will connect to the stream of input lines from connection to localhost:9999
    # lines is DStream representing the data stream extracted via the ssc.socketTextStream.

    lines = ssc.socketTextStream("localhost", 9999)

    # Split lines into words
    words = lines.flatMap(lambda line: line.split(" "))

    # Count each word in each batch
    pairs = words.map(lambda word: (word, 1))
    wordCounts = pairs.reduceByKey(lambda x, y: x + y)

    # Print the first ten elements of each RDD generated in this DStream to the console
    wordCounts.pprint()

    # Start the computation
    ssc.start()

    # Wait for the computation to terminate
    ssc.awaitTermination()

测试记录:

image.png

滚动太快,只能从日志中找到记录


image.png

参考:

1.http://spark.apache.org/docs/latest/streaming-programming-guide.html

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,384评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,845评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,148评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,640评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,731评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,712评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,703评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,473评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,915评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,227评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,384评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,063评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,706评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,302评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,531评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,321评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,248评论 2 352

推荐阅读更多精彩内容