203、Spark 2.0之Structured Streaming:创建流式的dataset和dataframe

创建流式的dataset和dataframe

流式dataframe可以通过DataStreamReader接口来创建,DataStreamReader对象是通过SparkSession的readStream()方法返回的。与创建静态dataframe的read()方法类似,我们可以指定数据源的一些配置信息,比如data format、schema、option等。spark 2.0中初步提供了一些内置的source支持。

  1. file source
    以数据流的方式读取一个目录中的文件。支持text、csv、json、parquet等文件类型。文件必须是被移动到目录中的,比如用mv命令。
  2. socket source
    从socket连接中读取文本内容。driver是负责监听请求的server socket。socket source只能被用来进行测试。

代码

val socketDF = spark
    .readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", 9999)
    .load()

socketDF.isStreaming    
socketDF.printSchema 

val userSchema = new StructType().add("name", "string").add("age", "integer")
val csvDF = spark
    .readStream
    .option("sep", ";")
    .schema(userSchema)      
    .csv("/path/to/directory")    

上面的例子都是产生untyped类型的dataframe,这就意味着在编译时是无法检查其schema的,只有在计算被提交并运行时才会进行检查。一些操作,比如map、flatMap等,需要在编译时就知道具体的类型。为了使用一些typed类型的操作,我们可以将dataframe转换为typed类型的dataset,比如df.as[String]。

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容