第5章 数据读取与保存
本章对于工程师和数据科学家都较为实用。工程师会了解到更多的输出格式,有利于找到非常适合用于下游处理程序的格式。数据科学家则可能更关心数据的现有的组织形式。
5.1 动机
我们已经学了很多在 Spark 中对已分发的数据执行的操作。到目前为止,所展示的示例都是从本地集合或者普通文件中进行数据读取和保存的。但有时候,数据量可能大到无法放在一台机器中,这时就需要探索别的数据读取和保存的方法了。
Spark 支持很多种输入输出源。一部分原因是 Spark 本身是基于 Hadoop 生态圈而构建,特别是 Spark 可以通过 Hadoop MapReduce 所使用的 InputFormat 和OutputFormat 接口访问数据,而大部分常见的文件格式与存储系统(例如 S3、HDFS、Cassandra、HBase 等)都支持这种接口。
不过,基于这些原始接口构建出的高层 API 会更常用。幸运的是,Spark 及其生态系统提供了很多可选方案。本章会介绍以下三类常见的数据源。
文件格式与文件系统
对于存储在本地文件系统或分布式文件系统(比如 NFS、HDFS、Amazon S3 等)中的数据,Spark 可以访问很多种不同的文件格式,包括文本文件、JSON、SequenceFile,以及 protocol buffer。Spark SQL 中的结构化数据源
第 9 章会介绍 Spark SQL 模块,它针对包括 JSON 和 Apache Hive 在内的结构化数据源,为我们提供了一套更加简洁高效的 API。数据库与键值存储
本章还会概述 Spark 自带的库和一些第三方库,它们可以用来连接 Cassandra、HBase、Elasticsearch 以及 JDBC 源。
5.2 文件格式
Spark 对很多种文件格式的读取和保存方式都很简单。从诸如文本文件的非结构化的文件,到诸如 JSON 格式的半结构化的文件,再到诸如 SequenceFile 这样的结构化的文件,Spark 都可以支持。Spark 会根据文件扩展名选择对应的处理方式。这一过程是封装好的,对用户透明。
除了 Spark 中直接支持的输出机制,还可以对键数据(或成对数据)使用 Hadoop 的新旧文件 API。由于 Hadoop 接口要求使用键值对数据,所以也只能这样用,即使有些格式事实上忽略了键。对于那些会忽视键的格式,通常使用假的键(比如 null)。
5.2.1 文本文件
在 Spark 中读写文本文件很容易。当我们将一个文本文件读取为 RDD 时,输入的每一行都会成为 RDD 的一个元素。也可以将多个完整的文本文件一次性读取为一个 pair RDD,其中键是文件名,值是文件内容。
读取文本文件
只需要使用文件路径作为参数调用 SparkContext 中的 textFile() 函数,就可以读取一个文本文件。如果要控制分区数的话,可以指定 minPartitions。
# 例5-1:在 Python 中读取一个文本文件
input = sc.textFile("file:///home/holden/repos/spark/README.md")
# 例5-2:在 Scala 中读取一个文本文件
val input = sc.textFile("file:///home/holden/repos/spark/README.md")
# 例5-3:在 Java 中读取一个文本文件
JavaRDD<String> input = sc.textFile("file:///home/holden/repos/spark/README.md")
Spark 支持读取给定目录中的所有文件,以及在输入路径中使用通配字符(如part-*.txt)。大规模数据集通常存放在多个文件中,因此这一特性很有用,尤其是在同一目录中存在一些别的文件(比如成功标记文件)的时候。
如果多个输入文件以一个包含数据所有部分的目录的形式出现,可以用两种方式来处理。
如果文件足够小,那么可以使用 textFile 函数,传递目录作为参数,这样它会把各部分都读取到RDD中。有时候有必要知道数据的各部分分别来自哪个文件(比如将键放在文件名中的时间数据),有时候则希望同时处理整个文件。
wholeTextFiles() 方法,该方法会返回一个 pair RDD,其中键是输入文件的文件名,在每个文件表示一个特定时间段内的数据时非常有用。如果有表示不同阶段销售数据的文件,则可以很容易地求出每个阶段的平均值。
// 例5-4:在 Scala 中求每个文件的平均值
val input = sc.wholeTextFiles("file://home/holden/salesFiles")
val result = input.mapValues{y =>
val nums = y.split(" ").map(x => x.toDouble)
nums.sum / nums.size.toDouble
}
保存文本文件
saveAsTextFile() 方法接收一个路径,并将 RDD 中的内容都输入到路径对应的文件中。Spark 将传入的路径作为目录对待,会在那个目录下输出多个文件。这样,Spark 就可以从多个节点上并行输出了。在这个方法中,我们不能控制数据的哪一部分输出到哪个文件中,不过有些输出格式支持控制。
# 例5-5:在Python 中将数据保存为文本文件
result.saveAsTextFile(outputFile)
5.2.2 JSON
JSON 是一种使用较广的半结构化数据格式。读取 JSON 数据的最简单的方式是将数据作为文本文件读取,然后使用 JSON 解析器来对 RDD 中的值进行映射操作。类似地,也可以使用我们喜欢的 JSON 序列化库来将数据转为字符串,然后将其写出去。
读取 JSON
将数据作为文本文件读取,然后对 JSON 数据进行解析,这样的方法可以在所有支持的编程语言中使用。这种方法假设文件中的每一行都是一条 JSON 记录。如果你有跨行的 JSON 数据,你就只能读入整个文件,然后对每个文件进行解析。如果在你使用的语言中构建一个 JSON 解析器的开销较大,你可以使用 mapPartitions() 来重用解析器。
处理格式不正确的记录有可能会引起很严重的问题,尤其对于像 JSON 这样的半结构化数据来说。对于小数据集来说,可以接受在遇到错误的输入时停止程序(程序失败),但是对于大规模数据集来说,格式错误是家常便饭。如果选择跳过格式不正确的数据,你应该尝试使用累加器来跟踪错误的个数。
# 例5-6:在Python 中读取非结构化的 JSON
import json
data = input.map(lambda x: json.loads(x))
// 在 Scala 和Java 中,通常将记录读入到一个代表结构信息的类中,可能还需要略过一些无效的记录
// 将记录读取为 Person 类
// 例5-7:在Scala 中读取 JSON
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.DeserializationFeature
...
case class Person(name: String, lovesPandas: Boolean) // 必须是顶级类
...
// 将其解析为特定的case class。使用flatMap,通过在遇到问题时返回空列表(None)
// 来处理错误,而在没有问题时返回包含一个元素的列表(Some(_))
val result = input.flatMap(record => {
try {
Some(mapper.readValue(record, classOf[Person]))
} catch {
case e: Exception => None
}
})
// 例5-8:在 Java 中读取 JSON
class ParseJson implements FlatMapFunction<Iterator<String>, Person> {
public Iterable<Person> call(Iterator<String> lines) throws Exception {
ArrayList<Person> people = new ArrayList<Person>();
ObjectMapper mapper = new ObjectMapper();
while (lines.hasNext()) {
String line = lines.next();
try {
people.add(mapper.readValue(line, Person.class));
} catch (Exception e) {
// 跳过失败的数据
}
}
return people;
}
}
JavaRDD<String> input = sc.textFile("file.json");
JavaRDD<Person> result = input.mapPartitions(new ParseJson());
保存JSON
写出 JSON 文件比读取它要简单得多,因为不需要考虑格式错误的数据,并且也知道要写出的数据的类型。可以使用之前将字符串 RDD 转为解析好的JSON 数据的库,将由结构化数据组成的 RDD 转为字符串 RDD,然后使用 Spark 的文本文件 API 写出去。这样一来,就可以通过已有的操作文本数据的机制和 JSON 库,使用 Spark 轻易地读取和保存 JSON 数据了。
# 假设要选出喜爱熊猫的人,就可以从第一步中获取输入数据,然后筛选出喜爱熊猫的人
# 例5-9:在 Python 保存为 JSON
(data.filter(lambda x: x["lovesPandas"]).map(lambda x: json.dumps(x))
.saveAsTextFile(outputFile))
// 例5-10:在 Scala 中保存为 JSON
result.filter(p => P.lovesPandas).map(mapper.writeValueAsString(_))
.saveAsTextFile(outputFile)
// 例5-11:在Java 中保存为JSON
class WriteJson implements FlatMapFunction<Iterator<Person>, String> {
public Iterable<String> call(Iterator<Person> people) throws Exception {
ArrayList<String> text = new ArrayList<String>();
ObjectMapper mapper = new ObjectMapper();
while (people.hasNext()) {
Person person = people.next();
text.add(mapper.writeValueAsString(person));
}
return text;
}
}
JavaRDD<Person> result = input.mapPartitions(new ParseJson()).filter(
new LikesPandas()
);
JavaRDD<String> formatted = result.mapPartitions(new WriteJson());
formatted.saveAsTextFile(outfile);
5.2.3 逗号分隔值与制表符分隔值
逗号分隔值(CSV)文件每行都有固定数目的字段,字段间用逗号隔开(在制表符分隔值文件,即 TSV 文件中用制表符隔开)。记录通常是一行一条,不过也不总是这样,有时也可以跨行。CSV 文件和 TSV 文件有时支持的标准并不一致,主要是在处理换行符、转义字符、非 ASCII 字符、非整数值等方面。CSV 原生并不支持嵌套字段,所以需要手动组合和分解特定的字段。
与 JSON 中的字段不一样的是,这里的每条记录都没有相关联的字段名,只能得到对应的序号。常规做法是使用第一行中每列的值作为字段名。
读取CSV
读取 CSV/TSV 数据和读取 JSON 数据相似,都需要先把文件当作普通文本文件来读取数据,再对数据进行处理。由于格式标准的缺失,同一个库的不同版本有时也会用不同的方式处理输入数据。
# 如果恰好你的 CSV 的所有数据字段均没有包含换行符,你也可以使用 textFile() 读取并解析数据
# 例5-12:在 Python 中使用 textFile() 读取 CSV
import csv
import StringIO
...
def loadRecord(line):
input = StringIO.StringIO(line) # 解析一行 CSV记录
reader = csv.DictReader(input, fieldnames=["name", "favouriteAnimal"])
return reader.next()
input = sc.textFile(inputFile).map(loadRecord)
// 例5-13:在 Scala 中使用 textFile() 读取 CSV
import Java.io.StringReader
import au.com.bytecode.opencsv.CSVReader
...
val input = sc.textFile(inputFile)
val result = input.map{ line =>
val reader = new CSVReader(new StringReader(line));
reader.readNext();
}
// 例5-14:在 Java 中使用 textFile() 读取 CSV
import au.com.bytecode.opencsv.CSVReader;
import Java.io.StringReader;
...
public static class ParseLine implements Function<String, String[]> {
public String[] call(String line) throws Exception {
CSVReader reader = new CSVReader(new StringReader(line));
return reader.readNext();
}
}
JavaRDD<String> csvFile1 = sc.textFile(inputFile);
JavaPairRDD<String[]> csvData = csvFile1.map(new ParseLine());
# 如果在字段中嵌有换行符,就需要完整读入每个文件,然后解析各段
# 如果只有一小部分输入文件,需要使用wholeFile() 方法,可能还需要对输入数据进行重新分区使得 Spark 能够更高效地并行化执行后续操作。
# 例5-15:在 Python 中完整读取 CSV
def loadRecords(fileNameContents):
input = StringIO.StringIO(fileNameContents[1]) # 读取给定文件中的所有记录
reader = csv.DictReader(input, fieldnames=["name", "favoriteAnimal"])
return reader
fullFileData = sc.wholeTextFiles(inputFile).flatMap(loadRecords)
// 例5-16:在 Scala 中完整读取 CSV
case class Person(name: String, favoriteAnimal: String)
val input = sc.wholeTextFiles(inputFile)
val result = input.flatMap{ case (_, txt) =>
val reader = new CSVReader(new StringReader(txt));
reader.readAll().map(x => Person(x(0), x(1)))
}
// 例5-17:在 Java 中完整读取 CSV
public static class ParseLine
implements FlatMapFunction<Tuple2<String, String>, String[]> {
public Iterable<String[]> call(Tuple2<String, String> file) throws Exception {
CSVReader reader = new CSVReader(new StringReader(file._2()));
return reader.readAll();
}
}
JavaPairRDD<String, String> csvData = sc.wholeTextFiles(inputFile);
JavaRDD<String[]> keyedRDD = csvData.flatMap(new ParseLine());
保存CSV
和 JSON 数据一样,写出 CSV/TSV 数据相当简单,同样可以通过重用输出编码器来加速。由于在 CSV 中我们不会在每条记录中输出字段名,因此为了使输出保持一致,需要创建一种映射关系。一种简单做法是写一个函数,用于将各字段转为指定顺序的数组。在 Python 中,如果输出字典,CSV 输出器会根据创建输出器时给定的fieldnames 的顺序帮我们完成这一行为。我们所使用的 CSV 库要输出到文件或者输出器,所以可以使用 StringWriter 或 StringIO来将结果放到RDD 中.
# 只能在我们知道所要输出的所有字段时使用
# 如果一些字段名是在运行时由用户输入决定的,最简单的方法是遍历所有的数据,提取不同的键,然后分别输出
# 例5-18:在 Python 中写 CSV
def writeRecords(records):
output = StringIO.StringIO() #写出一些CSV记录
writer = csv.DictWriter(output, fieldnames=["name", "favoriteAnimal"])
for record in records:
writer.writerow(record)
return [output.getvalue()]
pandaLovers.mapPartitions(writeRecords).saveAsTextFile(outputFile)
// 例5-19:在 Scala 中写 CSV
pandaLovers.map(person => List(person.name, person.favoriteAnimal).toArray)
.mapPartitions{people =>
val stringWriter = new StringWriter();
val csvWriter = new CSVWriter(stringWriter);
csvWriter.writeAll(people.toList)
Iterator(stringWriter.toString)
}.saveAsTextFile(outFile)
5.2.4 SequenceFile
5.2.5 对象文件
5.2.6 Hadoop输入输出格式
5.2.7 文件压缩
在大数据工作中,我们经常需要对数据进行压缩以节省存储空间和网络传输开销。对于大多数 Hadoop 输出格式来说,我们可以指定一种压缩编解码器来压缩数据。我们已经提过,Spark 原生的输入方式(textFile 和sequenceFile)可以自动处理一些类型的压缩。在读取压缩后的数据时,一些压缩编解码器可以推测压缩类型。
这些压缩选项只适用于支持压缩的 Hadoop 格式,也就是那些写出到文件系统的格式。写入数据库的Hadoop 格式一般没有实现压缩支持。如果数据库中有压缩过的记录,那应该是数据库自己配置的。
选择一个输出压缩编解码器可能会对这些数据以后的用户产生巨大影响。对于像 Spark 这样的分布式系统,我们通常会尝试从多个不同机器上一起读入数据。要实现这种情况,每个工作节点都必须能够找到一条新记录的开端。有些压缩格式会使这变得不可能,而必须要单个节点来读入所有数据,这就很容易产生性能瓶颈。可以很容易地从多个节点上并行读取的格式被称为“可分割”的格式。
尽管 Spark 的 textFile() 方法可以处理压缩过的输入,但即使输入数据被以可分割读取的方式压缩,Spark 也不会打开splittable。因此,如果你要读取单个压缩过的输入,最好不要考虑使用 Spark 的封装,而是使用 newAPIHadoopFile 或者 hadoopFile,并指定正确的压缩编解码器。
有些输入格式(例如SequenceFile)允许我们只压缩键值对数据中的值,这在查询时很有用。其他一些输入格式也有自己的压缩控制:比如,Twitter 的 Elephant Bird 包中的许多格式都可以使用 LZO 算法压缩的数据。
5.3 文件系统
Spark 支持读写很多种文件系统,可以使用任何我们想要的文件格式。
5.3.1 本地/“常规”文件系统
Spark 支持从本地文件系统中读取文件,不过它要求文件在集群中所有节点的相同路径下都可以找到。一些像 NFS、AFS 以及 MapR 的 NFS layer 这样的网络文件系统会把文件以常规文件系统的形式暴露给用户。如果你的数据已经在这些系统中,那么你只需要指定输入为一个 file:// 路径;只要这个文件系统挂载在每个节点的同一个路径下,Spark 就会自动处理。
// 例5-29:在 Scala 中从本地文件系统读取一个压缩的文本文件
val rdd = sc.textFile("file:///home/holden/happypandas.gz")
如果文件还没有放在集群中的所有节点上,你可以在驱动器程序中从本地读取该文件而无需使用整个集群,然后再调用parallelize 将内容分发给工作节点。不过这种方式可能会比较慢,所以推荐的方法是将文件先放到像 HDFS、NFS、S3 等共享文件系统上。
5.3.2 Amazon S3
5.3.3 HDFS
Hadoop 分布式文件系统(HDFS)是一种广泛使用的文件系统,Spark 能够很好地使用它。HDFS 被设计为可以在廉价的硬件上工作,有弹性地应对节点失败,同时提供高吞吐量。
Spark 和 HDFS 可以部署在同一批机器上,这样 Spark 可以利用数据分布来尽量避免一些网络开销。
在Spark 中使用 HDFS 只需要将输入输出路径指定为 hdfs://master:port/path 就够了。
5.4 Spark SQL 中的结构化数据
在各种情况下,我们把一条 SQL 查询给 Spark SQL,让它对一个数据源执行查询(选出一些字段或者对字段使用一些函数),然后得到由 Row 对象组成的 RDD,每个 Row 对象表示一条记录。
在 Java 和 Scala 中,Row 对象的访问是基于下标的。每个 Row 都有一个 get() 方法,会返回一个一般类型让我们可以进行类型转换。在Python 中,可以使用 row[column_number] 以及 row.column_name 来访问元素。
5.4.1 Apache Hive
Apache Hive 是 Hadoop 上的一种常见的结构化数据源。Hive 可以在 HDFS 内或者在其他存储系统上存储多种格式的表。这些格式从普通文本到列式存储格式,应有尽有。Spark SQL 可以读取 Hive 支持的任何表。
# 以由行组成的 RDD 的形式拿到返回数据
# 例5-30:用 Python 创建 HiveContext 并查询数据
from pyspark.sql import HiveContext
hiveCtx = HiveContext(sc)
rows = hiveCtx.sql("SELECT name, age FROM users")
firstRow = rows.first()
print firstRow.name
// 例5-31:用 Scala 创建 HiveContext 并查询数据
import org.apache.spark.sql.hive.HiveContext
val hiveCtx = new org.apache.spark.sql.hive.HiveContext(sc)
val rows = hiveCtx.sql("SELECT name, age FROM users")
val firstRow = rows.first()
println(firstRow.getString(0)) // 字段0是name字段
// 例5-32:用 Java 创建 HiveContext 并查询数据
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SchemaRDD;
HiveContext hiveCtx = new HiveContext(sc);
SchemaRDD rows = hiveCtx.sql("SELECT name, age FROM users");
Row firstRow = rows.first();
System.out.println(firstRow.getString(0)); // 字段0是name字段
5.4.2 JSON
如果你有记录间结构一致的 JSON 数据,Spark SQL 也可以自动推断出它们的结构信息,并将这些数据读取为记录,这样就可以使得提取字段的操作变得很简单。
从整个文件中获取由 Row 对象组成的 RDD。除了使用整个 Row 对象,你也可以将 RDD注册为一张表,然后从中选出特定的字段。
# 例5-33:JSON 中的示例推文,每行一条记录
{"user": {"name": "Holden", "location": "San Francisco"}, "text": "Nice day out today"}
{"user": {"name": "Matei", "location": "Berkeley"}, "text": "Even nicer here :)"}
# 可以读取这些数据,只从中选取 username(用户名)和 text(文本)字段
# 例5-34:在 Python 中使用 Spark SQL 读取 JSON 数据
tweets = hiveCtx.jsonFile("tweets.json")
tweets.registerTempTable("tweets")
results = hiveCtx.sql("SELECT user.name, text FROM tweets")
// 例5-35:在 Scala 中使用 Spark SQL 读取 JSON 数据
val tweets = hiveCtx.jsonFile("tweets.json")
tweets.registerTempTable("tweets")
val results = hiveCtx.sql("SELECT user.name, text FROM tweets")
// 例5-36:在 Java 中使用 Spark SQL 读取 JSON 数据
SchemaRDD tweets = hiveCtx.jsonFile(jsonFile);
tweets.registerTempTable("tweets");
SchemaRDD results = hiveCtx.sql("SELECT user.name, text FROM tweets");
5.5 数据库
通过数据库提供的 Hadoop 连接器或者自定义的 Spark 连接器,Spark 可以访问一些常用的数据库系统。本节来展示四种常见的连接器。
5.5.1 Java数据库连接
5.5.2 Cassandra
5.5.3 HBase
5.5.4 Elasticsearch
5.6 总结
在本章结束之际,你应该已经能够将数据读取到 Spark 中,并掌握了读取和保存大规模数据集的方法,将计算结果以你所希望的方式存储起来。我们调查了数据可以使用的一些不同格式,一些压缩选项以及它们对应的数据处理的方式。
第6章 Spark编程进阶
6.1 简介
本章介绍 Spark 编程的各种进阶特性,会介绍两种类型的共享变量:累加器(accumulator)与广播变量(broadcast variable)。累加器用来对信息进行聚合,而广播变量用来高效分发较大的对象。在已有的 RDD 转化操作的基础上,我们为类似查询数据库这样需要很大配置代价的任务引入了批操作。为了扩展可用的工具范围,本章会介绍 Spark 与外部程序交互的方式,比如如何与用 R 语言编写的脚本进行交互。
# 例6-1:一条 JSON 格式的呼叫日志示例,其中某些字段已省略
{"address":"address here", "band":"40m","callsign":"KK6JLK","city":"SUNNYVALE",
"contactlat":"37.384733","contactlong":"-122.032164",
"county":"Santa Clara","dxcc":"291","fullname":"MATTHEW McPherrin",
"id":57779,"mode":"FM","mylat":"37.751952821","mylong":"-122.4208688735",...}
6.2 累加器
通常在向 Spark 传递函数时,比如使用 map() 函数或者用 filter() 传条件时,可以使用驱动器程序中定义的变量,但是集群中运行的每个任务都会得到这些变量的一份新的副本,更新这些副本的值也不会影响驱动器中的对应变量。Spark 的两个共享变量,累加器与广播变量,分别为结果聚合与广播这两种常见的通信模式突破了这一限制。
第一种共享变量,即累加器,提供了将工作节点中的值聚合到驱动器程序中的简单语法。累加器的一个常见用途是在调试时对作业执行过程中的事件进行计数。例如,假设我们在从文件中读取呼号列表对应的日志,同时也想知道输入文件中有多少空行(也许不希望在有效输入中看到很多这样的行)。
# 使用累加器,而没有分别使用 filter() 和 reduce()
# 例6-2:在 Python 中累加空行
file = sc.textFile(inputFile)
blankLines = sc.accumulator(0) # 创建 Accumulator[Int] 并初始化为 0
def extractCallSigns(line):
global blankLines # 访问全局变量
if (line == ""):
blankLines += 1
return line.split(" ")
callSigns = file.flatMap(extractCallSigns)
callSigns.saveAsTextFile(outputDir + "/callsigns") # flagmap 是惰性的,执行完saveAsTextFile才能看到正确的计数
print "Blank lines: %d" % blankLines.value
// 例6-3:在 Scala 中累加空行
val sc = new SparkContext(...)
val file = sc.textFile("file.txt")
val blankLines = sc.accumulator(0) // 创建Accumulator[Int]并初始化为0
val callSigns = file.flatMap(line => {
if (line == "") {
blankLines += 1 // 累加器加1
}
line.split(" ")
})
callSigns.saveAsTextFile("output.txt")
println("Blank lines: " + blankLines.value)
// 例6-4:在 Java 中累加空行
JavaRDD<String> rdd = sc.textFile(args[1]);
final Accumulator<Integer> blankLines = sc.accumulator(0);
JavaRDD<String> callSigns = rdd.flatMap(
new FlatMapFunction<String, String>() {
public Iterable<String> call(String line) {
if (line.equals("")) {
blankLines.add(1);
}
return Arrays.asList(line.split(" "));
}
});
callSigns.saveAsTextFile("output.txt")
System.out.println("Blank lines: "+ blankLines.value());
总结起来,累加器的用法如下所示。
• 通过在驱动器中调用 SparkContext.accumulator(initialValue) 方法,创建出存有初始值的累加器。返回值为 org.apache.spark.Accumulator[T] 对象,其中 T 是初始值 initialValue 的类型。
• Spark 闭包里的执行器代码可以使用累加器的+= 方法(在 Java 中是 add)增加累加器的值。
• 驱动器程序可以调用累加器的 value 属性(在 Java 中使用 value() 或 setValue())来访
问累加器的值。
例6-5:在Python 使用累加器进行错误计数
# 分别创建累加器来记录有效和无效的呼号
validSignCount = sc.accumulator(0)
invalidSignCount = sc.accumulator(0)
def validateSign(sign):
global validSignCount, invalidSignCount
if re.match(r"\A\d?[a-zA-Z]{1,2}\d{1,4}[a-zA-Z]{1,3}\Z", sign):
validSignCount += 1
return True
else:
invalidSignCount += 1
return False
# 对与每个呼号的联系次数进行计数
validSigns = callSigns.filter(validateSign)
contactCount = validSigns.map(lambda sign: (sign, 1)).reduceByKey(lambda (x, y): x
+ y)
# 强制求值计算计数
contactCount.count()
if invalidSignCount.value < 0.1 * validSignCount.value:
contactCount.saveAsTextFile(outputDir + "/contactCount")
else:
print "Too many errors: %d in %d" % (invalidSignCount.value, validSignCount.
value)
对于要在行动操作中使用的累加器,Spark只会把每个任务对各累加器的修改应用一次。因此,如果想要一个无论在失败还是重复计算时都绝对可靠的累加器,我们必须把它放在 foreach() 这样的行动操作中。
对于在RDD 转化操作中使用的累加器,就不能保证有这种情况了。转化操作中累加器可能会发生不止一次更新。因此转化操作中的累加器最好只在调试时使用。
6.3 广播变量
Spark 的第二种共享变量类型是广播变量,它可以让程序高效地向所有工作节点发送一个较大的只读值,以供一个或多个 Spark 操作使用。比如,如果你的应用需要向所有节点发送一个较大的只读查询表,甚至是机器学习算法中的一个很大的特征向量,广播变量用起来都很顺手。
前面提过,Spark 会自动把闭包中所有引用到的变量发送到工作节点上。虽然这很方便,但也很低效。原因有二:首先,默认的任务发射机制是专门为小任务进行优化的;其次,事实上你可能会在多个并行操作中使用同一个变量,但是 Spark 会为每个操作分别发送。
例6-6这个程序可以运行,但是如果表更大(比如表中不是呼号而是IP 地址),signPrefixes 很容易就会达到数 MB 大小,从主节点为每个任务发送一个这样的数组就会代价巨大。而且,如果之后还要再次使用 signPrefixes 这个对象(可能还要在 file2.txt 上运行同样的代码),则还需要向每个节点再发送一遍。
可以把 signPrefixes 变为广播变量来解决这一问题,。广播变量其实就是类型为spark.broadcast.Broadcast[T] 的一个对象,其中存放着类型为T 的值。可以在任务中通过对 Broadcast 对象调用 value 来获取该对象的值。这个值只会被发送到各节点一次,使用的是一种高效的类似 BitTorrent 的通信机制。
# 查询 RDD contactCounts 中的呼号的对应位置。将呼号前缀读取为国家代码来进行查询
# 例6-6:在 Python 中查询国家
signPrefixes = loadCallSignTable()
def processSignCount(sign_count, signPrefixes):
country = lookupCountry(sign_count[0], signPrefixes)
count = sign_count[1]
return (country, count)
countryContactCounts = (contactCounts
.map(processSignCount)
.reduceByKey((lambda x, y: x + y)))
# 例6-7:在 Python 中使用广播变量查询国家
signPrefixes = sc.broadcast(loadCallSignTable())
def processSignCount(sign_count, signPrefixes):
country = lookupCountry(sign_count[0], signPrefixes.value)
count = sign_count[1]
return (country, count)
countryContactCounts = (contactCounts
.map(processSignCount)
.reduceByKey((lambda x, y: x + y)))
countryContactCounts.saveAsTextFile(outputDir + "/countries.txt")
使用广播变量的过程很简单:
(1)通过对一个类型 T 的对象调用 SparkContext.broadcast 创建出一个 Broadcast[T] 对象。
任何可序列化的类型都可以这么实现。
(2)通过 value 属性访问该对象的值(在 Java 中为 value() 方法)。
(3)变量只会被发到各个节点一次,应作为只读值处理(修改这个值不会影响到别的节点)。
6.4 基于分区进行操作
基于分区对数据进行操作可以让我们避免为每个数据元素进行重复的配置工作。诸如打开数据库连接或创建随机数生成器等操作,都是我们应当尽量避免为每个元素都配置一次的工作。Spark 提供基于分区的 map 和 foreach,让你的部分代码只对 RDD 的每个分区运行一次,这样可以帮助降低这些操作的代价。通过使用基于分区的操作,可以在每个分区内共享一个数据库连接池,来避免建立太多连接,同时还可以重用 JSON 解析器。
# 使用 mapPartitions 函数获得输入 RDD 的每个分区中的元素迭代器,而需要返回的是执行结果的序列的迭代器
# 例6-10:在 Python 中使用共享连接池
# 查询呼号
def processCallSigns(signs):
http = urllib3.PoolManager() # 创建一个连接池
urls = map(lambda x: "http://73s.com/qsos/%s.json" % x, signs) # 与每条呼号记录相关联的URL
requests = map(lambda x: (x, http.request('GET', x)), urls) # 创建请求(非阻塞)
result = map(lambda x: (x[0], json.loads(x[1].data)), requests) # 获取结果
return filter(lambda x: x[1] is not None, result) # 删除空的结果并返回
# 获取呼号
def fetchCallSigns(input):
return input.mapPartitions(lambda callSigns : processCallSigns(callSigns))
contactsContactList = fetchCallSigns(validSigns)
除了避免重复的配置工作,也可以使用 mapPartitions() 避免创建对象的开销。有时需要创建一个对象来将不同类型的数据聚合起来。回忆一下第3 章中,当计算平均值时,一种方法是将数值RDD 转为二元组RDD,以在归约过程中追踪所处理的元素个数。现在,可以为每个分区只创建一次二元组,而不用为每个元素都执行这个操作。
# 例6-13:在Python 中不使用 mapPartitions() 求平均值
def combineCtrs(c1, c2):
return (c1[0] + c2[0], c1[1] + c2[1])
def basicAvg(nums): # 计算平均值
nums.map(lambda num: (num, 1)).reduce(combineCtrs)
# 例6-14:在Python 中使用mapPartitions() 求平均值
def partitionCtr(nums): # 计算分区的sumCounter
sumCount = [0, 0]
for num in nums:
sumCount[0] += num
sumCount[1] += 1
return [sumCount]
def fastAvg(nums): # 计算平均值
sumCount = nums.mapPartitions(partitionCtr).reduce(combineCtrs)
return sumCount[0] / float(sumCount[1])
6.5 与外部程序间的管道
有三种可用的语言供你选择,这可能已经满足了你用来编写 Spark 应用的几乎所有需求。但是,如果 Scala、Java 以及 Python 都不能实现你需要的功能,那么 Spark 也为这种情况提供了一种通用机制,可以将数据通过管道传给用其他语言编写的程序,比如 R 语言脚本。
6.6 数值 RDD 的操作
Spark 对包含数值数据的 RDD 提供了一些描述性的统计操作。这是我们会在第 11 章介绍的更复杂的统计方法和机器学习方法的一个补充。
Spark 的数值操作是通过流式算法实现的,允许以每次一个元素的方式构建出模型。这些统计数据都会在调用 stats() 时通过一次遍历数据计算出来,并以 StatsCounter 对象返回。
# 例6-19:用 Python 移除异常值
# 要把 String 类型 RDD 转为数字数据,这样才能使用统计函数并移除异常值
distanceNumerics = distances.map(lambda string: float(string))
stats = distanceNumerics.stats()
stddev = std.stdev()
mean = stats.mean()
reasonableDistances = distanceNumerics.filter(
lambda x: math.fabs(x - mean) < 3 * stddev
)
print reasonableDistances.collect()
第7章 在集群上运行 Spark
第8章 Spark 调优与调试
第9章 Spark SQL
本章介绍 Spark 用来操作结构化和半结构化数据的接口——Spark SQL。结构化数据是指任何有结构信息的数据。所谓结构信息,就是每条记录共用的已知的字段集合。当数据符合这样的条件时,Spark SQL 就会使得针对这些数据的读取和查询变得更加简单高效。具体来说,Spark SQL 提供了以下三大功能:
(1)Spark SQL 可以从各种结构化数据源(例如 JSON、Hive、Parquet 等)中读取数据。
(2)Spark SQL 不仅支持在 Spark 程序内使用 SQL 语句进行数据查询,也支持从类似商业智能软件 Tableau 这样的外部工具中通过标准数据库连接器(JDBC/ODBC)连接 Spark SQL 进行查询。
(3) 当在 Spark 程序内使用 Spark SQL 时,Spark SQL 支持 SQL 与常规的 Python/Java/ Scala代码高度整合,包括连接 RDD 与 SQL 表、公开的自定义 SQL 函数接口等。这样一来,许多工作都更容易实现了。
为了实现这些功能,Spark SQL 提供了一种特殊的 RDD,叫作 SchemaRDD。SchemaRDD是存放 Row 对象的 RDD,每个 Row 对象代表一行记录。SchemaRDD 还包含记录的结构信息(即数据字段)。SchemaRDD 看起来和普通的 RDD 很像,但是在内部,SchemaRDD 可以利用结构信息更加高效地存储数据。此外,SchemaRDD 还支持 RDD 上所没有的一些新操作,比如运行 SQL 查询。SchemaRDD 可以从外部数据源创建,也可以从查询结果或普通 RDD 中创建。
9.1 连接Spark SQL
9.2 在应用中使用Spark SQL
Spark SQL 最强大之处就是可以在 Spark 应用内使用。这种方式让我们可以轻松读取数据并使用 SQL 查询,同时还能把这一过程和普通的 Python/Java/Scala 程序代码结合在一起。
要以这种方式使用 Spark SQL,需要基于已有的 SparkContext 创建出一个 HiveContext(如果使用的是去除了 Hive 支持的 Spark 版本,则创建出 SQLContext)。这个上下文环境提供了对 Spark SQL 的数据进行查询和交互的额外函数。使用 HiveContext 可以创建出表示结构化数据的 SchemaRDD,并且使用 SQL 或是类似 map() 的普通 RDD 操作来操作这些 SchemaRDD。
9.2.1 初始化Spark SQL
# 例9-5:Python 中 SQL 的 import 声明
# 导入Spark SQL
from pyspark.sql import HiveContext, Row
# 当不能引入hive依赖时
from pyspark.sql import SQLContext, Row
# 例9-8:在 Python 中创建 SQL 上下文环境
hiveCtx = HiveContext(sc)
9.2.2 基本查询示例
# 例9-11:在 Python 中读取并查询推文
input = hiveCtx.jsonFile(inputFile)
# 注册输入的 SchemaRDD 为临时表
# 临时表是当前使用的 HiveContext 或 SQLContext 中的临时变量,在你的应用退出时这些临时表就不再存在了
input.registerTempTable("tweets")
# 调用 sql() 方法,依据 retweetCount(转发计数)选出推文
topTweets = hiveCtx.sql("""SELECT text, retweetCount FROM
tweets ORDER BY retweetCount LIMIT 10""")
9.2.3 SchemaRDD
读取数据和执行查询都会返回 SchemaRDD。SchemaRDD 和传统数据库中的表的概念类似。从内部机理来看,SchemaRDD 是一个由 Row 对象组成的 RDD,附带包含每列数据类型的结构信息。Row 对象只是对基本数据类型(如整型和字符串型等)的数组的封装。
SchemaRDD 仍然是 RDD,所以你可以对其应用已有的 RDD 转化操作,比如 map() 和 filter()。然而,SchemaRDD 也提供了一些额外的功能支持。最重要的是,你可以把任意 SchemaRDD 注册为临时表,这样就可以对它进行查询了,如例 9-11。
使用Row对象
Row 对象表示 SchemaRDD 中的记录,其本质就是一个定长的字段数组。
在 Python 中,由于没有显式的类型系统,Row 对象变得稍有不同。我们使用 row[i] 来访问第 i 个元素。除此之外,Python 中的 Row 还支持以 row.column_name 的形式使用名字来访问其中的字段。
# 例9-14:在 Python 中访问 topTweet 这个 SchemaRDD 中的 text 列
topTweetText = topTweets.map(lambda row: row.text)
9.2.4 缓存
Spark SQL 的缓存机制与 Spark 中的稍有不同。由于我们知道每个列的类型信息,所以 Spark 可以更加高效地存储数据。为了确保使用更节约内存的表示方式进行缓存而不是储存整个对象,应当使用专门的 hiveCtx.cacheTable("tableName") 方法。当缓存数据表时,Spark SQL 使用一种列式存储格式在内存中表示数据。这些缓存下来的表只会在驱动器程序的生命周期里保留在内存中,所以如果驱动器进程退出,就需要重新缓存数据。和缓存 RDD 时的动机一样,如果想在同样的数据上多次运行任务或查询时,就应把这些数据表缓存起来。
也可以使用 HiveQL/SQL 语句来缓存表。只需要运行 CACHE TABLEtableName 或 UNCACHE TABLEtableName 来缓存表或者删除已有的缓存即可。这种使用方式在 JDBC 服务器的命令行客户端中很常用。
9.3 读取和存储数据
Spark SQL 支持很多种结构化数据源,可以让你跳过复杂的读取过程,轻松从各种数据源中读取到 Row 对象。这些数据源包括 Hive 表、JSON 和 Parquet 文件。此外,当你使用 SQL 查询这些数据源中的数据并且只用到了一部分字段时,Spark SQL 可以智能地只扫描这些用到的字段,而不是像SparkContext.hadoopFile 中那样简单粗暴地扫描全部数据。除这些数据源之外, 你也可以在程序中通过指定结构信息, 将常规的 RDD 转化为 SchemaRDD。这使得在 Python 或者 Java 对象上运行SQL 查询更加简单。当需要计算许多数值时,SQL 查询往往更加简洁(比如要同时求出平均年龄、最大年龄、不重复的用户 ID 数目等)。不仅如此,你还可以自如地将这些 RDD 和来自其他 Spark SQL 数据源的 SchemaRDD 进行连接操作。在本节中,我们会讲解外部数据源以及这种使用 RDD 的方式。
9.3.1 Apache Hive
# 例9-15:使用 Python 从 Hive 读取
from pyspark.sql import HiveContext
hiveCtx = HiveContext(sc)
rows = hiveCtx.sql("SELECT key, value FROM mytable")
keys = rows.map(lambda row: row[0])
9.3.2 Parquet
9.3.3 JSON
如果你有一个 JSON 文件,其中的记录遵循同样的结构信息,那么 Spark SQL 就可以通过扫描文件推测出结构信息,并且让你可以使用名字访问对应字段(如例 9-21 所示)。如果你在一个包含大量 JSON 文件的目录中进行尝试,你就会发现 Spark SQL 的结构信息推断可以让你非常高效地操作数据,而无需编写专门的代码来读取不同结构的文件。
# 例9-21:输入记录
{"name": "Holden"}
{"name": "Sparky The Bear", "lovesPandas":true,"knows": {"friends":["holden"]}}
# 例9-22:在 Python 中使用 Spark SQL 读取 JSON 数据
# 如果想获得从数据中推断出来的结构信息,可以在生成的 SchemaRDD 上调用 printSchema 方法
input = hiveCtx.jsonFile(inputFile)
# 例 9-27:用 SQL 查询嵌套数据以及数组元素
# 如果使用 Python,或已经把数据注册为了一张 SQL 表,可以通过.来访问各个嵌套层级的嵌套元素(比如toplevel.nextlevel)
select hashtagEntities[0].text from tweets LIMIT 1;
9.3.4 基于 RDD
除了读取数据,也可以基于 RDD 创建 SchemaRDD。在 Scala 中,带有 case class 的 RDD可以隐式转换成 SchemaRDD。
9.4 JDBC/ODBC服务器
Spark SQL 也提供 JDBC 连接支持,这对于让商业智能(BI)工具连接到 Spark 集群上以及在多用户间共享一个集群的场景都非常有用。JDBC 服务器作为一个独立的 Spark 驱动器程序运行,可以在多用户之间共享。任意一个客户端都可以在内存中缓存数据表,对表进行查询。集群的资源以及缓存数据都在所有用户之间共享。
9.4.1 使用 Beeline
在 Beeline 客户端中,你可以使用标准的 HiveQL 命令来创建、列举以及查询数据表。
9.4.2 长生命周期的表与查询
使用 Spark SQL 的 JDBC 服务器的优点之一就是我们可以在多个不同程序之间共享缓存下来的数据表。JDBC Thrift 服务器是一个单驱动器程序,这就使得共享成为了可能。如前一节中所述,你只需要注册该数据表并对其运行 CACHE 命令,就可以利用缓存了。
9.5 用户自定义函数
用户自定义函数,也叫 UDF,可以让我们使用 Python/Java/Scala 注册自定义函数,并在 SQL中调用。这种方法很常用,通常用来给机构内的 SQL 用户们提供高级功能支持,这样这些用户就可以直接调用注册的函数而无需自己去通过编程来实现了。在 Spark SQL 中,编写 UDF 尤为简单。Spark SQL 不仅有自己的 UDF 接口,也支持已有的 Apache Hive UDF。
9.5.1 Spark SQL UDF
我们可以使用 Spark 支持的编程语言编写好函数,然后通过 Spark SQL 内建的方法传递进来,非常便捷地注册我们自己的 UDF。在 Scala 和 Python 中,可以利用语言原生的函数和 lambda 语法的支持,而在 Java 中,则需要扩展对应的 UDF 类。UDF 能够支持各种数据类型,返回类型也可以与调用时的参数类型完全不一样。
# 例9-36:Python 版本耳朵字符串长度 UDF
# 写一个求字符串长度的 UDF
hiveCtx.registerFunction("strLenPython", lambda x: len(x), IntegerType())
lengthSchemaRDD = hiveCtx.sql("SELECT strLenPython('text') FROM tweets LIMIT 10")
9.5.2 Hive UDF
Spark SQL 也支持已有的 Hive UDF。标准的 Hive UDF 已经自动包含在了 Spark SQL 中。如
果需要支持自定义的 Hive UDF,我们要确保该 UDF 所在的 JAR 包已经包含在了应用中。
要使用 Hive UDF,应该使用 HiveContext,而不能使用常规的 SQLContext。要注册一个 Hive UDF,只需调用 hiveCtx.sql("CREATE TEMPORARY FUNCTION name AS class.function")。
9.6 Spark SQL 性能
Spark SQL 提供的高级查询语言及附加的类型信息可以使 Spark SQL 数据查询更加高效。
Spark SQL 不仅是给熟悉 SQL 的用户使用的。Spark SQL 使有条件的聚合操作变得非常容易,比如对多个列进行求值。利用 Spark SQL 则不再需要像第 6 章中讨论的那样创建一些特殊的对象来进行这种操作。
# 例9-40:Spark SQL 多列求和
SELECT SUM(user.favouritesCount), SUM(retweetCount), user.id FROM tweets
GROUP BY user.id
Spark SQL 可以利用其对类型的了解来高效地表示数据。当缓存数据时,Spark SQL 使用内存式的列式存储。这不仅仅节约了缓存的空间,而且尽可能地减少了后续查询中针对某几个字段查询时的数据读取。
谓词下推可以让 Spark SQL 将查询中的一些部分工作“下移”到查询引擎上。如果我们只需在 Spark 中读取某些特定的记录,标准的方法是读入整个数据集,然后在上面执行筛选条件。然而,在 Spark SQL 中,如果底层的数据存储支持只读取键值在一个范围内的记录,或是其他某些限制条件,Spark SQL 就可以把查询语句中的筛选限制条件推到数据存储层,从而大大减少需要读取的数据。
9.7 总结
现在,我们学完了 Spark 利用 Spark SQL 进行结构化和半结构化数据处理的方式。除了本章探索过的查询语句,第 3 章到第 6 章中讲到的操作 RDD 的方法同样适用于 Spark SQL 中的 SchemaRDD。很多时候,我们会把 SQL 与其他的编程语言结合起来使用,以充分利用 SQL 的简洁性和编程语言擅长表达复杂逻辑的优点。而在使用 Spark SQL 时,Spark 执行引擎也能根据数据的结构信息对查询进行优化,让我们从中获益。
第10章 Spark Streaming
许多应用需要即时处理收到的数据,例如用来实时追踪页面访问统计的应用、训练机器学习模型的应用,还有自动检测异常的应用。Spark Streaming 是 Spark 为这些应用而设计的模型。它允许用户使用一套和批处理非常接近的 API 来编写流式计算应用,这样就可以大量重用批处理应用的技术甚至代码。
和 Spark 基于 RDD 的概念很相似,Spark Streaming 使用离散化流(discretized stream)作为抽象表示,叫作 DStream。DStream 是随时间推移而收到的数据的序列。在内部,每个时间区间收到的数据都作为 RDD 存在,而 DStream 是由这些 RDD 所组成的序列(因此得名“离散化”)。DStream 可以从各种输入源创建,比如 Flume、Kafka 或者 HDFS。创建出来的 DStream 支持两种操作,一种是转化操作(transformation),会生成一个新的 DStream,另一种是输出操作(output operation),可以把数据写入外部系统中。DStream提供了许多与 RDD 所支持的操作相类似的操作支持,还增加了与时间相关的新操作,比如滑动窗口。
和批处理程序不同,Spark Streaming 应用需要进行额外配置来保证 24/7 不间断工作。本章会讨论检查点(checkpointing)机制,也就是把数据存储到可靠文件系统(比如 HDFS)上的机制,这也是Spark Streaming 用来实现不间断工作的主要方式。此外,还会讲到在遇到失败时如何重启应用,以及如何把应用设置为自动重启模式。
最后,就 Spark 1.1 来说,Spark Streaming 只可以在 Java 和 Scala 中使用。试验性的Python支持在 Spark 1.2 中引入,不过只支持文本数据。本章就只用 Java 和 Scala 来展示所有的 API,不过类似的概念对 Python 也是适用的。
第11章 基于MLlib的机器学习
MLlib 是 Spark 中提供机器学习函数的库。它是专为在集群上并行运行的情况而设计的。MLlib 中包含许多机器学习算法,可以在 Spark 支持的所有编程语言中使用。本章会展示如何在你的程序中调用MLlib,并且给出一些常用的使用技巧。
11.1 概述
MLlib 的设计理念非常简单:把数据以 RDD 的形式表示,然后在分布式数据集上调用各种算法。MLlib 引入了一些数据类型(比如点和向量),不过归根结底,MLlib 就是 RDD上一系列可供调用的函数的集合。比如,如果要用 MLlib 来完成文本分类的任务(例如识别垃圾邮件),你只需要按如下步骤操作。
(1)首先用字符串 RDD 来表示你的消息。
(2)运行 MLlib 中的一个特征提取(feature extraction)算法来把文本数据转换为数值特征(适合机器学习算法处理);该操作会返回一个向量 RDD。
(3)对向量 RDD 调用分类算法(比如逻辑回归);这步会返回一个模型对象,可以使用该对象对新的数据点进行分类。
(4)使用 MLlib 的评估函数在测试数据集上评估模型。
11.3 机器学习基础
在开始讲 MLlib 中的函数之前,先来简单回顾一下机器学习的相关概念。
机器学习算法尝试根据训练数据(training data)使得表示算法行为的数学目标最大化,并以此来进行预测或作出决定。机器学习问题分为几种,包括分类、回归、聚类,每种都有不一样的目标。拿分类(classification)作为一个简单的例子:分类是基于已经被标记的其他数据点(比如一些已经分别被标记为垃圾邮件或非垃圾邮件的邮件)作为例子来识别一个数据点属于几个类别中的哪一种(比如判断一封邮件是不是垃圾邮件)。
所有的学习算法都需要定义每个数据点的特征(feature)集,也就是传给学习函数的值。举个例子,对于一封邮件来说,一些特征可能包括其来源服务器、提到 free 这个单词的次数、字体颜色等。在很多情况下,正确地定义特征才是机器学习中最有挑战性的部分。例如,在产品推荐的任务中,仅仅加上一个额外的特征(例如我们意识到推荐给用户的书籍可能也取决于用户看过的电影),就有可能极大地改进结果。
大多数算法都只是专为数值特征(具体来说,就是一个代表各个特征值的数字向量)定义的,因此提取特征并转化为特征向量是机器学习过程中很重要的一步。例如,在文本分类中(比如垃圾邮件和非垃圾邮件的例子),有好几个提取文本特征的方法,比如对各个单词出现的频率进行计数。
当数据已经成为特征向量的形式后,大多数机器学习算法都会根据这些向量优化一个定义好的数学函数。例如,某个分类算法可能会在特征向量的空间中定义出一个平面,使得这个平面能“最好”地分隔垃圾邮件和非垃圾邮件。这里需要为“最好”给出定义(比如大多数数据点都被这个平面正确分类)。算法会在运行结束时返回一个代表学习决定的模型(比如这个选中的平面),而这个模型就可以用来对新的点进行预测(例如根据新邮件的特征向量在平面的哪一边来决定它是不是垃圾邮件)。
最后,大多数机器学习算法都有多个会影响结果的参数,所以现实中的机器学习流水线会训练出多个不同版本的模型,然后分别对其进行评估(evaluate)。要这么做的话,通常需要把输入数据分为“训练集”和“测试集”,并且只使用前者进行训练,这样就可以用后者来检验模型是否过度拟合(overfit)了训练数据。MLlib 提供了几个算法来进行模型评估。
示例:垃圾邮件分类
# 例11-1:Python 版垃圾邮件分类器
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.feature import HashingTF
from pyspark.mllib.classification import LogisticRegressionWithSGD
# spam.txt:垃圾邮件,normal.txt:非垃圾邮件,每行一个
# 根据词频把每个文件中的文本转化为特征向量,然后训练出一个可以把两类消息分开的逻辑回归模型
spam = sc.textFile("spam.txt")
normal = sc.textFile("normal.txt")
# 创建一个 HashingTF 实例来把邮件文本映射为包含 10000 个特征的向量,从文本数据构建词频(term frequency)特征向量
tf = HashingTF(numFeatures = 10000)
# 各邮件都被切分为单词,每个单词被映射为一个特征
spamFeatures = spam.map(lambda email: tf.transform(email.split(" ")))
normalFeatures = normal.map(lambda email: tf.transform(email.split(" ")))
# 创建 LabeledPoint 数据集分别存放阳性(垃圾邮件)和阴性(正常邮件)的例子
positiveExamples = spamFeatures.map(lambda features: LabeledPoint(1, features))
negativeExamples = normalFeatures.map(lambda features: LabeledPoint(0, features))
trainingData = positiveExamples.union(negativeExamples)
trainingData.cache() # 因为逻辑回归是迭代算法,所以缓存训练数据 RDD
# 使用随机梯度下降法(Stochastic Gradient Descent,简称SGD)实现逻辑回归
model = LogisticRegressionWithSGD.train(trainingData)
# 以阳性(垃圾邮件)和阴性(正常邮件)的例子分别进行测试。首先使用一样的 HashingTF 特征来得到特征向量,然后对该向量应用得到的模型
posTest = tf.transform("O M G GET cheap stuff by sending money to ...".split(" "))
negTest = tf.transform("Hi Dad, I started studying Spark the other ...".split(" "))
print "Prediction for positive test example: %g" % model.predict(posTest)
print "Prediction for negative test example: %g" % model.predict(negTest)
11.4 MLlib 数据类型
(1)Vector
一个数学向量。MLlib 既支持稠密向量也支持稀疏向量,前者表示向量的每一位都存储下来,后者则只存储非零位以节约空间。
(2)LabeledPoint
在诸如分类和回归这样的监督式学习(supervised learning)算法中,LabeledPoint 用来表示带标签的数据点。它包含一个特征向量与一个标签(由一个浮点数表示)。
(3)Rating
用户对一个产品的评分,用于产品推荐。
(4)各种 Model 类
每个 Model 都是训练算法的结果,一般有一个 predict() 方法可以用来对新的数据点或数据点组成的RDD 应用该模型进行预测。
大多数算法直接操作由 Vector、LabeledPoint 或 Rating 对象组成的 RDD。你可以用任意方式创建出这些对象,不过一般来说你需要通过对外部数据进行转化操作来构建出 RDD:例如,通过读取一个文本文件或者运行一条 Spark SQL 命令。接下来,使用 map() 将你的数据对象转为 MLlib 的数据类型。
操作向量
向量有两种:稠密向量与稀疏向量。稠密向量把所有维度的值存放在一个浮点数数组中。例如,一个 100 维的向量会存储 100 个双精度浮点数。相比之下,稀疏向量只把各维度中的非零值存储下来。当最多只有 10% 的元素为非零元素时,我们通常更倾向于使用稀疏向量(不仅是出于对内存使用的考虑,也是出于对速度的考虑)。许多特征提取技术都会生成非常稀疏的向量,所以这种方式常常是一种很关键的优化手段。
创建向量的方式在各种语言中有一些细微的差别。在 Python 中,你在 MLlib 中任意地方传递的 NumPy 数组都表示一个稠密向量,你也可以使用 mllib.linalg.Vectors 类创建其他类型的向量。而在 Java 和 Scala 中,都需要使用 mllib.linalg.Vectors 类。
# 例11-4:用 Python 创建向量
from numpy import array
from pyspark.mllib.linalg import Vectors
# 创建稠密向量<1.0, 2.0, 3.0>
denseVec1 = array([1.0, 2.0, 3.0]) # NumPy 数组可以直接传给 MLlib
denseVec2 = Vectors.dense([1.0, 2.0, 3.0]) # 或者使用 Vectors 类来创建
# 创建稀疏向量<1.0, 0.0, 2.0, 0.0>;该方法只接收向量的维度(4)以及非零位的位置和对应的值
# 这些数据可以用一个 dictionary 来传递,或使用两个分别代表位置和值的 list
sparseVec1 = Vectors.sparse(4, {0: 1.0, 2: 2.0})
sparseVec2 = Vectors.sparse(4, [0, 2], [1.0, 2.0])
11.5 算法
11.5.1 特征提取
mllib.feature 包中包含一些用来进行常见特征转化的类。这些类中有从文本(或其他表示)创建特征向量的算法,也有对特征向量进行正规化和伸缩变换的方法。
TF-IDF
词频—逆文档频率(简称 TF-IDF)是一种用来从文本文档(例如网页)中生成特征向量的简单方法。它为文档中的每个词计算两个统计值:一个是词频(TF),也就是每个词在文档中出现的次数,另一个是逆文档频率(IDF),用来衡量一个词在整个文档语料库中出现的(逆)频繁程度。这些值的积,也就是TF × IDF,展示了一个词与特定文档的相关程度(比如这个词在某文档中很常见,但在整个语料库中却很少见)。
MLlib 有两个算法可以用来计算 TF-IDF:HashingTF 和 IDF, 都在 mllib.feature 包内。
(1)HashingTF 从一个文档中计算出给定大小的词频向量。为了将词与向量顺序对应起来,它使用了哈希法(hasing trick)。在类似英语这样的语言中,有几十万个单词,因此将每个单词映射到向量中的一个独立的维度上需要付出很大代价。而 HashingTF 使用每个单词对所需向量的长度 S 取模得出的哈希值,把所有单词映射到一个0 到S-1 之间的数字上。由此我们可以保证生成一个 S 维的向量。在实践中,即使有多个单词被映射到同一个哈希值上,算法依然适用。MLlib 开发者推荐将S 设置在 218 到 220 之间。
(2)当你构建好词频向量之后,你就可以使用 IDF 来计算逆文档频率,然后将它们与词频相乘来计算TF-IDF。
# 例11-7:在 Python 中使用 HashingTF
# HashingTF 可以一次只运行于一个文档中,也可以运行于整个 RDD 中。它要求每个“文档”都使用对象的可迭代序列来表示:例如 Python 中的 list 或 Java 中的 Collection
from pyspark.mllib.feature import HashingTF
sentence = "hello hello world"
words = sentence.split() # 将句子切分为一串单词
tf = HashingTF(10000) # 创建\一个向量,其尺寸 S = 10,000
tf.transform(words)
SparseVector(10000, {3065: 1.0, 6861: 2.0})
rdd = sc.wholeTextFiles("data").map(lambda (name, text): text.split())
tfVectors = tf.transform(rdd) # 对整个 RDD 进行转化操作
# 例11-8:在 Python 中使用 TF-IDF
from pyspark.mllib.feature import HashingTF, IDF
# 将若干文本文件读取为TF向量
rdd = sc.wholeTextFiles("data").map(lambda (name, text): text.split())
tf = HashingTF()
tfVectors = tf.transform(rdd).cache()
# 首先对 IDF 对象调用 fit() 方法来获取一个 IDFModel,它代表语料库中的逆文档频率
# 接下来,对模型调用 transform() 来把 TF 向量转为 IDF 向量
idf = IDF()
idfModel = idf.fit(tfVectors)
tfIdfVectors = idfModel.transform(tfVectors)
缩放
大多数机器学习算法都要考虑特征向量中各元素的幅值,并且在特征缩放调整为平等对待时表现得最好(例如所有的特征平均值为0,标准差为1)。
# 例11-9:在 Python 中缩放向量
from pyspark.mllib.feature import StandardScaler
vectors = [Vectors.dense([-2.0, 5.0, 1.0]), Vectors.dense([2.0, 0.0, 1.0])]
dataset = sc.parallelize(vectors) # 构建特征向量
scaler = StandardScaler(withMean=True, withStd=True) # 进行缩放
model = scaler.fit(dataset) # 为每一列计算平均值和标准差
result = model.transform(dataset) # 缩放一个数据集
# 结果:{[-0.7071, 0.7071, 0.0], [0.7071, -0.7071, 0.0]}
正规化
在一些情况下,在准备输入数据时,把向量正规化为长度1 也是有用的。使用 Normalizer 类可以实现,只要使用 Normalizer.transform(rdd) 就可以了。
Word2Vec
Word2Vec 是一个基于神经网络的文本特征化算法,可以用来将数据传给许多下游算法。Spark 在 mllib.feature.Word2Vec 类中引入了该算法的一个实现。
要训练 Word2Vec,你需要传给它一个用 String 类(每个单词用一个)的 Iterable 表示的语料库。和前面的“TF-IDF”一节所讲的很像,Word2Vec 也推荐对单词进行正规化处理(例如全部转为小写、去除标点和数字)。当你通过 Word2Vec.fit(rdd) 训练好模型之后,你会得到一个 Word2VecModel,可以用来将每个单词通过 transform() 转为一个向量。注意,Word2Vec 算法中模型的大小等于你的词库中的单词数乘以向量的大小(向量大小默认为 100)。你可能希望筛选掉不在标准字典中的单词来控制模型大小。一般来说,比较合适的词库大小约为 100000 个词。
11.5.2 统计
不论是在即时的探索中,还是在机器学习的数据理解中,基本的统计都是数据分析的重要部分。MLlib 通过 mllib.stat.Statistics 类中的方法提供了几种广泛使用的统计函数,这些函数可以直接在 RDD 上使用。
11.5.3 分类与回归
分类与回归是监督式学习的两种主要形式。监督式学习指算法尝试使用有标签的训练数据(也就是已知结果的数据点)根据对象的特征预测结果。分类和回归的区别在于预测的变量的类型:在分类中,预测出的变量是离散的(也就是一个在有限集中的值,叫作类别);比如,分类可能是将邮件分为垃圾邮件和非垃圾邮件,也有可能是文本所使用的语言。在回归中,预测出的变量是连续的(例如根据年龄和体重预测一个人的身高)。
线性回归
线性回归是回归中最常用的方法之一,是指用特征的线性组合来预测输出值。
# 例11-10:Python 中的线性回归
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.regression import LinearRegressionWithSGD
points = # (创建 LabeledPoint 组成的 RDD)
model = LinearRegressionWithSGD.train(points, iterations=200, intercept=True)
print "weights: %s, intercept: %s" % (model.weights, model.intercept)
逻辑回归
逻辑回归是一种二元分类方法,用来寻找一个分隔阴性和阳性示例的线性分割平面。
支持向量机
支持向量机(简称 SVM)算法是另一种使用线性分割平面的二元分类算法,同样只预期 0 或者 1 的标签。
朴素贝叶斯
朴素贝叶斯(Naive Bayes)算法是一种多元分类算法,它使用基于特征的线性函数计算将一个点分到各类中的得分。这种算法通常用于使用 TF-IDF 特征的文本分类,以及其他一些应用。
决策树与随机森林
决策树是一个灵活的模型,可以用来进行分类,也可以用来进行回归。决策树以节点树的形式表示,每个节点基于数据的特征作出一个二元决定(比如,这个人的年龄是否大于 20?),而树的每个叶节点则包含一种预测结果(例如,这个人是不是会买一个产品?)。决策树的吸引力在于模型本身容易检查,而且决策树既支持分类的特征,也支持连续的特征。
11.5.4 聚类
聚类算法是一种无监督学习任务,用于将对象分到具有高度相似性的聚类中。前面提到的监督式任务中的数据都是带标签的,而聚类可以用于无标签的数据。该算法主要用于数据探索(查看一个新数据集是什么样子)以及异常检测(识别与任意聚类都相距较远的点)。
11.5.5 协同过滤与推荐
协同过滤是一种根据用户对各种产品的交互与评分来推荐新产品的推荐系统技术。协同过滤吸引人的地方就在于它只需要输入一系列用户/ 产品的交互记录:无论是“显式”的交互(例如在购物网站上进行评分)还是“隐式”的(例如用户访问了一个产品的页面但是没有对产品评分)交互皆可。仅仅根据这些交互,协同过滤算法就能够知道哪些产品之间比较相似(因为相同的用户与它们发生了交互)以及哪些用户之间比较相似,然后就可以作出新的推荐。
尽管 MLlib 的 API 使用了“用户”和“产品”的概念,但你也可以将协同过滤用于其他应用场景中,比如在社交网络中推荐用户,为文章推荐要添加的标签,为电台推荐歌曲等。
交替最小二乘
MLlib 中包含交替最小二乘(简称 ALS)的一个实现,这是一个协同过滤的常用算法,可以很好地扩展到集群上。
11.5.6 降维
主成分分析
给定一个高维空间中的点的数据集,我们经常需要减少点的维度,来使用更简单的工具对其进行分析。例如,我们可能想要在二维平面上画出这些点,或者只是想减少特征的数量使得模型训练更加高效。
机器学习社区中使用的主要的降维技术是主成分分析(简称PCA)。在这种技术中,我们会把特征映射到低维空间,让数据在低维空间表示的方差最大化,从而忽略一些无用的维度。要计算出这种映射,我们要构建出正规化的相关矩阵,并使用这个矩阵的奇异向量和奇异值。与最大的一部分奇异值相对应的奇异向量可以用来重建原始数据的主要成分。
奇异值分解
MLlib 也提供了低层的奇异值分解(简称SVD)原语。对于大型矩阵,通常不需要进行完全分解,只需要分解出靠前的奇异值和与之对应的奇异向量即可。这样可以节省存储空间、降噪,并有利于恢复低秩矩阵。
11.5.7 模型评估
无论机器学习任务使用的是何种算法,模型评估都是端到端机器学习流水线的一个重要环节。许多机器学习任务可以使用不同的模型来应对,而且即使使用的是同一个算法,参数设置也可以带来不同的结果。不仅如此,我们还要考虑模型对训练数据过度拟合的风险,因此你最好通过在另一个数据集上测试模型来对模型进行评估,而不是使用训练数据集。
11.6 一些提示与性能考量
11.6.1 准备特征
尽管机器学习演讲中经常着重强调所使用的算法,但切记在实践中,每个算法的好坏只取决于你所使用的特征!许多从事大规模数据机器学习的人员都认为特征准备是大规模机器学习中最重要的一步。添加信息更丰富的特征(例如与其他数据集连接以引入更多信息)与将现有特征转为合适的向量表示(例如缩放向量)都能极大地帮助改进结果。
11.6.2 配置算法
在正规化选项可用时,MLlib 中的大多数算法都会在正则化打开时表现得更好(在预测准确度方面)。此外,大多数基于 SGD 的算法需要大约 100 轮迭代来获得较好的结果。MLlib 尝试提供合适的默认值,但是你应该尝试增加迭代次数,来看看是否能够提高精确度。例如,使用 ALS 算法时,rank 的默认值10 相对较低,所以你应该尝试提高这个值。确保在评估这些参数变化时将测试数据排除在训练集之外。
11.6.3 缓存RDD以重复使用
MLlib 中的大多数算法都是迭代的,对数据进行反复操作。因此,在把输入数据集传给 MLlib 前使用cache() 将它缓存起来是很重要的。即使数据在内存中放不下,你也应该尝试 persist。
11.6.4 识别稀疏程度
当你的特征向量包含很多零时,用稀疏格式存储这些向量会为大规模数据集节省巨大的时间和空间。在空间方面,当至多三分之二的位为非零值时,MLlib 的稀疏表示比它的稠密表示要小。在数据处理代价方面,当至多 10% 的位为非零值时,稀疏向量所要花费的代价也会更小。(这是因为使用稀疏表示需要对向量中的每个元素执行的指令比使用稠密向量表示时要多。)但是如果使用稀疏表示能够让你缓存使用稠密表示时无法缓存的数据,即使数据本身比较稠密,你也应当选择稀疏表示。
11.6.5 并行度
对于大多数算法而言,你的输入 RDD 的分区数至少应该和集群的 CPU 核心数相当,这样才能达到完全的并行。回想一下,默认情况下 Spark 会为文件的每个“块”创建一个分区,而块一般为 64 MB。你可以通过向 SparkContext.textFile() 这样的函数传递分区数的最小值来改变默认行为:例如 sc.textFile("data.txt", 10)。另一种方法是对 RDD 调用 repartition(numPartitions) 来将 RDD 分区成 numPartitions 个分区。你始终可以通过 Spark 的网页用户界面看到每个 RDD 的分区数。同时,注意不要使用太多分区,因为这会增加通信开销。
11.7 流水线 API
流水线就是一系列转化数据集的算法(要么是特征转化,要么是模型拟合)。流水线的每个步骤都可能有参数(例如逻辑回归中的迭代次数)。流水线 API 通过使用所选的评估矩阵评估各个集合,使用网格搜索自动找到最佳的参数集。
11.8 总结
本章概述了 Spark 的机器学习算法库。如你所见,MLlib 与 Spark 的其他 API 紧密联系。它可以让你操作 RDD,而得到的结果也可以在其他 Spark 函数中使用。MLlib 也是 Spark开发最为活跃的组件之一,它还在不断发展中。