CDH Spark Streaming

翻译: https://www.cloudera.com/documentation/enterprise/latest/topics/spark_streaming.html
版本: 5.14.2

Spark Streaming是核心Spark的扩展,可实现数据流的可扩展,高吞吐量,容错处理。Spark Streaming接收输入数据流并将数据分成批次,批次被称为DStreams。DStream可以从Kafka,Flume和Kinesis等来源创建,也可以通过对其他DStream进行操作来创建。每个输入DStream都与一个 Receiver 关联,它接收来自源的数据并将其存储在执行器内存中。

有关Spark Streaming的详细信息,请参阅Spark Streaming Programming Guide

继续阅读:

Spark Streaming和动态分配

从CDH 5.5开始,默认开启动态分配,这意味着执行程序在闲置时被移除。但是,动态分配在Spark Streaming中无效。在Spark Streaming中,每批都有数据进来,执行者只要有数据就可以运行。如果执行程序空闲超时小于批处理持续时间,则执行程序会不断被添加和删除。但是,如果执行程序空闲超时大于批处理持续时间,则不会删除执行程序。因此,Cloudera建议您通过设置禁用动态分配 spark.dynamicAllocation.enabled 。

Spark Streaming示例

本示例使用Kafka将单词流传送到Python字数统计程序。

  1. 安装Kafka并创建一个Kafka服务。
  2. 创建一个Kafka topic wordcounttopic 并传入您的ZooKeeper服务器:
$ kafka-topics --create --zookeeper zookeeper_server:2181 --topic wordcounttopic \
--partitions 1 --replication-factor 1
  1. 根据Spark Streaming示例kafka_wordcount.py创建一个Kafka字数Python程序。该版本将输入流划分为10秒的批次并对每批中的单词进行计数:
from __future__ import print_function

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: kafka_wordcount.py <zk> <topic>", file=sys.stderr)
        exit(-1)

    sc = SparkContext(appName="PythonStreamingKafkaWordCount")
    ssc = StreamingContext(sc, 10)

    zkQuorum, topic = sys.argv[1:]
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
    lines = kvs.map(lambda x: x[1])
    counts = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()
  1. 使用提交应用程序 spark-submit , 禁用动态分配并传入ZooKeeper服务器和主题 wordcounttopic.。要在本地运行,您必须至少指定两个工作线程:一个接收和一个处理数据。
$ spark-submit --master local[2] --conf "spark.dynamicAllocation.enabled=false" \
--jars SPARK_HOME/lib/spark-examples.jar kafka_wordcount.py \
zookeeper_server:2181 wordcounttopic

在CDH部署中,使用包安装时,SPARK_HOME 默认为/usr/lib/spark ; parcel安装时默认为 /opt/cloudera/parcels/CDH/lib/spark ; Cloudera Manager 部署时,可用脚本在 /usr/bin 中。

或者,您可以按如下方式在YARN上运行:

$ spark-submit --master yarn --deploy-mode client --conf "spark.dynamicAllocation.enabled=false" \
--jars SPARK_HOME/lib/spark-examples.jar kafka_wordcount.py \
zookeeper_server:2181 wordcounttopic
  1. 在另一个窗口中,创建一个Kafka生产者,发布给 wordcounttopic:
$ kafka-console-producer --broker-list kafka_broker:9092 --topic wordcounttopic
  1. 在生产者窗口中,键入以下内容:
hello
hello
hello
hello
hello
hello
gb
gb
gb
gb
gb
gb

根据您键入的速度,在Spark Streaming应用程序窗口中,您将看到如下输出:

-------------------------------------------
Time: 2016-01-06 14:18:00
-------------------------------------------
(u'hello', 6)
(u'gb', 2)

-------------------------------------------
Time: 2016-01-06 14:18:10
-------------------------------------------
(u'gb', 4)

在Spark Streaming中启用容错处理

如果Spark Streaming应用程序的驱动程序主机失败,则可能会丢失已收到但尚未处理的数据。为确保没有数据丢失,可以使用Spark Streaming recovery 。在发生故障时,Spark将收到数据写入HDFS,并使用此数据恢复状态。

要启用Spark Streaming恢复:

  1. 在 SparkConf对象中,设置 spark.streaming.receiver.writeAheadLog.enable 参数true。
  2. 使用这个 SparkConf 创建一个StreamingContext 实例,并指定一个检查点目录。
  3. 在StreamingContext中 使用 getOrCreate 方法 , 要么创建新的上下文,要么从检查点目录的旧上下文中恢复:
from __future__ import print_function

import sys

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

checkpoint = "hdfs://ns1/user/systest/checkpoint"

# Function to create and setup a new StreamingContext
def functionToCreateContext():

  sparkConf = SparkConf()
  sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
  sc = SparkContext(appName="PythonStreamingKafkaWordCount",conf=sparkConf)
  ssc = StreamingContext(sc, 10)

  zkQuorum, topic = sys.argv[1:]
  kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
  lines = kvs.map(lambda x: x[1])
  counts = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)
  counts.pprint()

  ssc.checkpoint(checkpoint)   # set checkpoint directory
  return ssc

if __name__ == "__main__":
  if len(sys.argv) != 3:
    print("Usage: kafka_wordcount.py <zk> <topic>", file=sys.stderr)
    exit(-1)

  ssc = StreamingContext.getOrCreate(checkpoint, lambda: functionToCreateContext())
  ssc.start()
  ssc.awaitTermination()

有关更多信息,请参阅检查点

为了防止接收器发生故障时的数据丢失,接收器必须能够根据需要重放来自原始数据源的数据。

  • spark.streaming.receiver.writeAheadLog.enable参数为true时,kafka接收器自动重播。
  • 无接收器的Direct Kafka DStream 不需要spark.streaming.receiver.writeAheadLog.enable 参数并且可以在没有数据丢失的情况下运行,即使没有流式恢复。
  • 包装在Spark中的Flume接收器在接收器故障时自动重放数据。

有关更多信息,请参阅Spark Streaming + Kafka集成指南Spark Streaming + Flume集成指南

为长时间运行的Spark Streaming作业配置身份验证

如果您使用的是认证的Spark通信,则必须为长时间运行的Spark Streaming作业执行其他配置步骤。请参阅在YARN上为长时间运行的应用程序配置Spark

云中Spark流的最佳实践

将Spark Streaming 与云服务一起用作基础存储层时,请使用集群上的临时HDFS来存储检查点,而不是Amazon S3或Microsoft ADLS等云存储。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,657评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,662评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,143评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,732评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,837评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,036评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,126评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,868评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,315评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,641评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,773评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,470评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,126评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,859评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,095评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,584评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,676评论 2 351

推荐阅读更多精彩内容