class pyspark.streaming.StreamingContext(sparkContext, batchDuration=None, jssc=None)
基础:对象
Spark流媒体功能的主要入口。StreamingContext表示到Spark集群的连接,可用于创建各种输入源的数据流。它可以来自现有的SparkContext。在创建和转换数据流之后,可以分别使用context.start()和context.stop()启动和停止流计算。允许当前线程通过stop()或异常等待上下文的终止。
addStreamingListener(streamingListener)
添加一个[[org.apache.spark.streaming.scheduler.StreamingListener]]对象,用于接收与流相关的系统事件。
awaitTermination(timeout=None)
等待执行停止。
参数:timeout–等待时间(秒)
awaitTerminationOrTimeout(timeout)
等待执行停止。如果停止,则返回true;或者在执行期间抛出报告的错误;如果在从方法返回之前等待时间已过,则返回false。
参数:timeout–等待时间(秒)
binaryRecordsStream(directory, recordLength)
创建一个输入流,用于监视与Hadoop兼容的文件系统中的新文件,并将其作为具有固定长度记录的平面二进制文件读取。必须将文件从同一文件系统中的另一个位置“移动”到受监视的目录中。文件名开头。被忽略。
参数:目录-从中加载数据的目录
recordLength—每条记录的长度(字节)
checkpoint(directory)
将上下文设置为定期检查主容错的数据流操作。图形将在每个批处理间隔被检查。
参数:directory–与HDFS兼容的目录,其中checkpiont数据可以可靠地存储
classmethod getActive()
返回当前活动的StreamingContext(即,如果存在已启动但未停止的context)或者返回无。
classmethod getActiveOrCreate(checkpointPath, setupFunc)
返回激活的StreamingContext(即当前已启动但未停止),或者从检查点数据重新创建StreamingContext,或者使用提供的setupFunc函数创建新的StreamingContext。如果检查点路径为None或不包含有效的检查点数据,则将调用setupFunc以创建新上下文和setup dstream。
参数:checkpointPath–早期流式处理程序中使用的检查点目录。如果目的是在没有活动上下文时始终创建新上下文,则可以为“无”。
setupFunc–创建新的JavaStreamingContext和setup dstream的函数
返回活动的StreamingContext(即当前已启动但未停止),或者从检查点数据重新创建StreamingContext,或者使用提供的setupFunc函数创建新的StreamingContext。如果checkpoint路径为None或不包含有效的checkpoint数据,则将调用setupFunc以创建新上下文和建立 dstream。
参数:checkpointPath–早期流式处理程序中使用的Checkpoint目录。如果目的是在没有活动上下文时始终创建新上下文,则可以为“无”。
classmethod getOrCreate(checkpointPath, setupFunc):
从checkpoint数据重新创建StreamingContext或创建新的StreamingContext。如果提供的checkpoint路径中存在checkpoint数据,则将从checkpoint数据重新创建StreamingContext。如果数据不存在,则将使用提供的setupFunc创建新上下文。
参数:checkpointPath–早期流式处理程序中使用的checkpoint目录
setupFunc–创建新上下文和设置数据流的函数
queueStream(rdds, oneAtATime=True, default=None)
从RDD或列表队列创建输入流。在每个批处理中,它将处理队列返回的一个或所有RDD。
注意:创建流后对队列的更改将无法识别。
参数:rdds–rdds队列
oneAtATime-每次选择一个rdd或选择所有rdd一次。
default–如果没有更多的rdds,则为默认rdd
remember(duration)
在此上下文中设置每个数据流以记住它在上一个给定持续时间内生成的RDD。数据流只在有限的时间内记住RDD,并释放它们以进行垃圾收集。此方法允许开发人员指定如何长时间记住RDDs(如果开发人员希望在DStream计算之外查询旧数据)。
参数:duration–每个数据流应记住rdds的最小持续时间(秒)
socketTextStream(hostname, port, storageLevel=StorageLevel(True, True, False, False, 2))
从TCP源hostname:port创建输入。使用TCP socket接收数据,并将接收字节解释为UTF8编码的分隔行。
参数:hostname–要连接以接收数据的主机名
端口-用于接收数据的连接端口
Storage level—用于存储接收对象的存储级别
sparkContext
返回与此StreamingContext关联的SparkContext。
start()
开始执行流。
stop(stopSparkContext=True, stopGraceFully=False)
停止流的执行,并选择确保已处理所有接收到的数据。
参数:stopSparkContext–是否停止关联的SparkContext
stopGracefully–通过等待所有接收数据的处理完成而优雅地停止
textFileStream(directory)
创建一个输入流,用于监视与Hadoop兼容的文件系统中的新文件,并将其作为文本文件读取。必须将文件从同一文件系统中的另一个位置“移动”到受监视的目录中。文件名开头。被忽略。
transform(dstreams, transformFunc)
创建一个新的数据流,在其中通过对数据流的RDD应用函数生成每个RDD。转换函数参数中javardds的顺序将与列表中相应数据流的顺序相同。
union(*dstreams)
从相同类型和相同滑动持续时间的多个数据流创建统一的数据流。