Spark 可以从6大核心数据源中获取数据,和其他不同的社区数据源获取数据,该章节重点介绍6大核心数据源的获取和社区数据源该如何配置.
6大核心数据源:
- CSV
- JSON
- Parquet
- ORC
- JDBC/ODBC connections
- Plain-text files
一些示例社区数据源:
- Cassandra
- HBase
- MongoDB
- AWS Redshift
- XML
- And many, many others
9.1 数据源的API结构
读取数据源的API结构
DataFrameReader.format(...).option("key", "value").schema(...).load()
读取时,所有的format,option,schema,read mode都是可选的,但是需要给定一个读取数据的路径,这是一个示例:
spark.read.format("csv")
.option("mode", "FAILFAST")
.option("inferSchema", "true")
.option("path", "path/to/file(s)")
.schema(someSchema)
.load()
从外部读取数据肯定会遇到错误格式的数据,当这种情况发生时Spark根据你选择的不同模式会采取不同的处理方法:
| Read Mode | 描述 |
|---|---|
| permissive(默认) | 遇到错误时将所有的字段设为null,并将错误的记录放在一个叫 _corrupt_record的string字段中。 |
| dropMalformed | 去掉所有的错误字段。 |
| failFast | 遇到错误字段立即提示失败。 |
写出数据的API结构
DataFrameWriter.format(...).option(...).partitionBy(...).bucketBy(...).sortBy(...).save()
PartitionBy, bucketBy, sortBy 只用在基于file的数据源,你可以用它们控制指定文件的布局。
示例:
dataframe.write.format("csv")
.option("mode", "OVERWRITE")
.option("dateFormat", "yyyy-MM-dd")
.option("path", "path/to/file(s)")
.save()
| Save modes | 描述 |
|---|---|
| append | 将输出文件附加在已经存在的文件中。 |
| overwrite | 覆盖已有文件。 |
| errorIfExists(默认) | 如果指定位置有同名文件则抛出异常。 |
| ignore | 如果文件存在,则不作任何操作。。 |
9.2 CSV 文件
读取CSV文件
示例:
spark.read.format("csv")
.option("header", "true")
.option("mode", "FAILFAST")
.option("inferSchema", "true")
.load("some/path/to/file.csv")
option:包含schema(头文件),模式是FAILFAST ,推断Schema类型
我们可以自己确定schema以及schema格式来保证数据是我们期望的:
import org.apache.spark.sql.types.{StructField, StructType, StringType, LongType}
val myManualSchema = new StructType(Array(
new StructField("DEST_COUNTRY_NAME", StringType, true),
new StructField("ORIGIN_COUNTRY_NAME", StringType, true),
new StructField("count", LongType, false)
))
spark.read.format("csv")
.option("header", "true")
.option("mode", "FAILFAST")
.schema(myManualSchema)
.load("/data/flight-data/csv/2010-summary.csv")
.show(5)
写入CSV文件
写入csv是读取csv的子集,示例如下:
val csvFile = spark.read.format("csv")
.option("header", "true").option("mode", "FAILFAST").schema(myManualSchema)
.load("/data/flight-data/csv/2010-summary.csv")
9.3 JSON 文件
读取JSON文件
spark.read.format("json").option("mode", "FAILFAST").schema(myManualSchema)
.load("/data/flight-data/json/2010-summary.json").show(5)
写入JSON文件
csvFile.write.format("json").mode("overwrite").save("/tmp/my-json-file.json")
9.4 Parquet 文件
Parquet格式与Spark配合极好,实际上就是默认的格式。
读取Parquet文件
spark.read.format("parquet").load("/data/flight-data/parquet/2010-summary.parquet").show(5)
写入Parquet文件
csvFile.write.format("parquet").mode("overwrite").save("/tmp/my-parquet-file.parquet")
9.5 ORC 文件
ORC格式与Hadoop配合的很好。
读取ORC文件
spark.read.format("orc").load("/data/flight-data/orc/2010-summary.orc").show(5)
写入ORC文件
csvFile.write.format("orc").mode("overwrite").save("/tmp/my-json-file.orc")
9.6 SQL 数据库
从数据库读取数据
使用spark-shell时需要加载jdbc驱动(以postgresql驱动为例):
./bin/spark-shell
--driver-class-path postgresql-9.4.1207.jar
--jars postgresql-9.4.1207.jar
SQLite示例:
val driver = "org.sqlite.JDBC"
val path = "/data/flight-data/jdbc/my-sqlite.db"
val url = s"jdbc:sqlite:/${path}"
val tablename = "flight_info"
val dbDataFrame = spark.read.format("jdbc").option("url", url)
.option("dbtable", tablename).option("driver", driver).load()
PostgreSQL sample:
val pgDF = spark.read
.format("jdbc")
.option("driver", "org.postgresql.Driver")
.option("url", "jdbc:postgresql://database_server")
.option("dbtable", "schema.tablename")
.option("user", "username").option("password","my-secret-password").load()
Query Pushdown
Spark也能够指定一个查询语句而不是表名,但需要将查询语句括起来然后给一个别名:
val pushdownQuery = """(SELECT DISTINCT(DEST_COUNTRY_NAME) FROM flight_info)
AS flight_info"""
val dbDataFrame = spark.read.format("jdbc")
.option("url", url).option("dbtable", pushdownQuery).option("driver", driver)
.load()
Spark也能和文件一样并行读取数据库,只需要手动指定数量:
val dbDataFrame = spark.read.format("jdbc")
.option("url", url).option("dbtable", tablename).option("driver", driver)
.option("numPartitions", 10).load()
写入数据库
写入数据库也一样简单,下面用之前从CSV建好的数据用overwrite全表覆盖模式写入:
val newPath = "jdbc:sqlite://tmp/my-sqlite.db"
csvFile.write.mode("overwrite").jdbc(newPath, tablename, props)
可以通过下面语句查看是否写入:
spark.read.jdbc(newPath, tablename, props).count()
9.7 Text 文件
读取文本文件非常简单:只需将类型指定为textFile:
读取Text
spark.read.textFile("/data/flight-data/csv/2010-summary.csv")
.selectExpr("split(value, ',') as rows").show()
写入Text
写入Text时要确保只用一String列:
csvFile.select("DEST_COUNTRY_NAME").write.text("/tmp/simple-text-file.txt")