《Spark: The Definitive Guide 》第9章:数据源 中文学习笔记

Spark 可以从6大核心数据源中获取数据,和其他不同的社区数据源获取数据,该章节重点介绍6大核心数据源的获取和社区数据源该如何配置.
6大核心数据源:

  • CSV
  • JSON
  • Parquet
  • ORC
  • JDBC/ODBC connections
  • Plain-text files

一些示例社区数据源:

  • Cassandra
  • HBase
  • MongoDB
  • AWS Redshift
  • XML
  • And many, many others

9.1 数据源的API结构

读取数据源的API结构

DataFrameReader.format(...).option("key", "value").schema(...).load()

读取时,所有的format,option,schema,read mode都是可选的,但是需要给定一个读取数据的路径,这是一个示例:

spark.read.format("csv")
  .option("mode", "FAILFAST")
  .option("inferSchema", "true")
  .option("path", "path/to/file(s)")
  .schema(someSchema)
  .load()

从外部读取数据肯定会遇到错误格式的数据,当这种情况发生时Spark根据你选择的不同模式会采取不同的处理方法:

Read Mode 描述
permissive(默认) 遇到错误时将所有的字段设为null,并将错误的记录放在一个叫 _corrupt_record的string字段中。
dropMalformed 去掉所有的错误字段。
failFast 遇到错误字段立即提示失败。

写出数据的API结构

DataFrameWriter.format(...).option(...).partitionBy(...).bucketBy(...).sortBy(...).save()

PartitionBy, bucketBy, sortBy 只用在基于file的数据源,你可以用它们控制指定文件的布局。
示例:

dataframe.write.format("csv")
  .option("mode", "OVERWRITE")
  .option("dateFormat", "yyyy-MM-dd")
  .option("path", "path/to/file(s)")
  .save()
Save modes 描述
append 将输出文件附加在已经存在的文件中。
overwrite 覆盖已有文件。
errorIfExists(默认) 如果指定位置有同名文件则抛出异常。
ignore 如果文件存在,则不作任何操作。。

9.2 CSV 文件

读取CSV文件

示例:

spark.read.format("csv")
  .option("header", "true")
  .option("mode", "FAILFAST")
  .option("inferSchema", "true")
  .load("some/path/to/file.csv")

option:包含schema(头文件),模式是FAILFAST ,推断Schema类型

我们可以自己确定schema以及schema格式来保证数据是我们期望的:

import org.apache.spark.sql.types.{StructField, StructType, StringType, LongType}
val myManualSchema = new StructType(Array(
  new StructField("DEST_COUNTRY_NAME", StringType, true),
  new StructField("ORIGIN_COUNTRY_NAME", StringType, true),
  new StructField("count", LongType, false)
))
spark.read.format("csv")
  .option("header", "true")
  .option("mode", "FAILFAST")
  .schema(myManualSchema)
  .load("/data/flight-data/csv/2010-summary.csv")
  .show(5)

写入CSV文件

写入csv是读取csv的子集,示例如下:

val csvFile = spark.read.format("csv")
  .option("header", "true").option("mode", "FAILFAST").schema(myManualSchema)
  .load("/data/flight-data/csv/2010-summary.csv")

9.3 JSON 文件

读取JSON文件

spark.read.format("json").option("mode", "FAILFAST").schema(myManualSchema)
  .load("/data/flight-data/json/2010-summary.json").show(5)

写入JSON文件

csvFile.write.format("json").mode("overwrite").save("/tmp/my-json-file.json")

9.4 Parquet 文件

Parquet格式与Spark配合极好,实际上就是默认的格式。

读取Parquet文件

spark.read.format("parquet").load("/data/flight-data/parquet/2010-summary.parquet").show(5)

写入Parquet文件

csvFile.write.format("parquet").mode("overwrite").save("/tmp/my-parquet-file.parquet")

9.5 ORC 文件

ORC格式与Hadoop配合的很好。

读取ORC文件

spark.read.format("orc").load("/data/flight-data/orc/2010-summary.orc").show(5)

写入ORC文件

csvFile.write.format("orc").mode("overwrite").save("/tmp/my-json-file.orc")

9.6 SQL 数据库

从数据库读取数据

使用spark-shell时需要加载jdbc驱动(以postgresql驱动为例):
./bin/spark-shell
--driver-class-path postgresql-9.4.1207.jar
--jars postgresql-9.4.1207.jar

SQLite示例:

val driver =  "org.sqlite.JDBC"
val path = "/data/flight-data/jdbc/my-sqlite.db"
val url = s"jdbc:sqlite:/${path}"
val tablename = "flight_info"

val dbDataFrame = spark.read.format("jdbc").option("url", url)
  .option("dbtable", tablename).option("driver",  driver).load()

PostgreSQL sample:

val pgDF = spark.read
  .format("jdbc")
  .option("driver", "org.postgresql.Driver")
  .option("url", "jdbc:postgresql://database_server")
  .option("dbtable", "schema.tablename")
  .option("user", "username").option("password","my-secret-password").load()

Query Pushdown

Spark也能够指定一个查询语句而不是表名,但需要将查询语句括起来然后给一个别名:

val pushdownQuery = """(SELECT DISTINCT(DEST_COUNTRY_NAME) FROM flight_info)
  AS flight_info"""
val dbDataFrame = spark.read.format("jdbc")
  .option("url", url).option("dbtable", pushdownQuery).option("driver",  driver)
  .load()

Spark也能和文件一样并行读取数据库,只需要手动指定数量:

val dbDataFrame = spark.read.format("jdbc")
  .option("url", url).option("dbtable", tablename).option("driver", driver)
  .option("numPartitions", 10).load()

写入数据库

写入数据库也一样简单,下面用之前从CSV建好的数据用overwrite全表覆盖模式写入:

val newPath = "jdbc:sqlite://tmp/my-sqlite.db"
csvFile.write.mode("overwrite").jdbc(newPath, tablename, props)

可以通过下面语句查看是否写入:

spark.read.jdbc(newPath, tablename, props).count()

9.7 Text 文件

读取文本文件非常简单:只需将类型指定为textFile:

读取Text

spark.read.textFile("/data/flight-data/csv/2010-summary.csv")
.selectExpr("split(value, ',') as rows").show()

写入Text

写入Text时要确保只用一String列:

csvFile.select("DEST_COUNTRY_NAME").write.text("/tmp/simple-text-file.txt")

9.8 高级 I/O 概念

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容