Spark 组成: lower-level APIS , Structured APIs 以及一系列附加的函数库
Running Production Applications
spark使得开发和创建针对大数据的应用程序变得非常简单。 通过spark-submit一个命令行构建工具,Spark还使您可以轻松地将交互式探索转换为生产应用程序
spark-submit does one thing:他使得你发送你的应用程序代码到集群并且在提交到的地方执行。一旦提交,应用将会运行直到退出(完成任务),或者遇到错误,你可以在任何Spark支持的集群管理器上使用它,包括 Standalone 、 Mesos 、 Yarn
spark-submit提供了一些controls,使用这些controls你可以指定应用所需的资源,他如何运行,以及命令行参数。
你可以使用任何Spark支持的语言编写应用程序并提交他们执行。最简单的方式就是在你本地的机器上运行应用,我们以Spark自带的Scala应用程序实例进行说明,在Spark的安装目录下执行以下程序:
./bin/spark-submit\
--class org.apache.spark.examples.SparkPi\
--master local\
./examples/jars/sparkexamples_2.11-2.2.0.jar 10
该应用用于估计一个pi值,上面的命令意思是指定使用我们本地机器运行该程序,想要运行的Jar文件以及一些命令行参数。
Python版本的如下:
/bin/spark-submit \
--master local \
./examples/src/main/python/pi.py 10
通过改变上面spark-submit中的master参数的值,我们也可以将同样的应用程序提交到Spark的Standlone 集群管理器,Mesos或者Yarn上执行。
Datasets: Type-Safe Structured APIs
第一个介绍的是 type-safe 的Sparks API 叫做 DataSets 。适用于Java和SCala编写的静态类型的代码。他无法在Python和R等动态类型的语言中使用。
正如前面所讲的DataFrame,一个分布式的Row类型的数据集可以存储多种表格类型的数据。 Dataset API允许用户为数据框架中的记录分配Java/Scala类,并将其作为类型化对象的集合进行操作,类似于Java数组列表或Scala Seq,DataSet的API是类型安全的,这意味着你不能将数据集中的对象视为除指定类之外的另一个类, 这使得数据集对于编写大型应用程序特别有吸引力,因为大型项目中许多软件工程师必须通过定义良好的接口与之交互。
Dataset类是参数化的,其中包含的对象类型为: Dataset<T> in Java ,以及 Dataset[T] in Scala .例如, Dataset[Person] 意味着该DataSet中包含的为Person类。在Spark 2.0中,支持所有Java中遵循Java bean模式的类,以及Scala中的case类, 这些类型是受限制的,因为Spark需要能够自动分析类型T并为表格数据创建适当的schema。
DataSet的一个好处是,你可以仅在需要或者想要使用的时候才使用他,例如在下面的例子中,我们定义了自己的数据类型并且通过任意map和filter函数操作他。之后执行我们的action。Spark 可以自动的将它转化成DataFrame,并且我们可以进一步通过使用几百个Spark包含的函数操作它。这就使得当必要时回退到低级的API执行类型安全的代码,当需要更快分析时就以高级的SQL执行。这里有一个简单的例子展示了我们如何同时使用type-safe 函数以及DataFrame-like SQL 快速的表达我们的业务逻辑、
// in Scala
case class Flight(DEST_COUNTRY_NAME:String,
ORIGIN_COUNTRY_NAME: String,
count: Big Int)
val flightsDF = spark.read
.parquet("/data/flight-data/parquet/2010-summary.parquet/")
val flights = flightsDF.as[Flight]
最后一个优势是当你调用DataSet上的Collect 或者take时,他将在DataSet中收集适当类型的Object,而不是DataFrame Rows, 这样就可以很容易地保证类型并安全,并安全的地以分布式和本地方式执行操作,而无需更改代码:
// in Scala
flights
.filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
.map(flight_row => flight_row)
.take(5)
flights
.take(5)
.filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
.map(fr => Flight(fr.DEST_COUNTRY_NAME, fr.ORIGIN_COUNTRY_NAME, fr.count + 5))
第11章再详细介绍。
Structured Streaming
Structured Streaming 是一个关于流式程序的high-level API ,在spark2.2中已经是production-ready。使用Structured Streaming,你可以像操作批处理使用Spark的structured API那样执行流式程序,这样可以减少延迟并且支持增量程序。关于Structured Streaming最大的好处就是,你可以快速的从流式系统中提取价值,而不需要修改代码,同时他使得概念转化变得容易,因为可以将批处理作业编写成一种输入作业方式,并且将它转化成流式处理,这个方式的所有工作都是增量的处理数据。
我们看一个实例,这实例中,我们将看到一个零售数据集,他有一个特定的日期和时间可以供我们使用,我们将使用一个按天划分的数据集,每一个文件表示一天的数据。 我们用这种格式来模拟以在一致性规则的方式下不同的过程产生的数据。 这是零售数据,想象一下,这些数据是由零售商店生产的,并被发送到我们的结构化流是系统中,
数据的样本如下:
InvoiceNo,StockCode,Description,Quantity,InvoiceDate,Unit Price,Customer ID,Country
536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,2010-12-01 08:26:00,2.55,17...
536365,71053,WHITE METAL LANTERN,6,2010-12-01 08:26:00,3.39,17850.0,United Kin...
536365,84406B,CREAM CUPID HEARTS COAT HANGER,8,2010-12-01 08:26:00,2.75,17850...
为了解决这个问题,我们首先将数据作为static DataSet进行分析,然后创建一个DataFrame,我们也将为这个static DataSet创建一个schema ( 有一些使用schema推断的方法,我们将会第五部分涉及到)
// in Scala
val staticDataFrame = spark.read.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("/data/retail-data/by-day/*.csv")
static DataFrame.createOrReplaceTempView("retail_data")
val staticSchema = staticDataFrame.schema
# in Python
staticDataFrame = spark.read.format("csv")\
.option("header", "true")\
.option("infer Schema", "true")\
.load("/data/retail-data/by-day/*.csv")
static DataFrame.createOrReplaceTempView("retail_data")
static Schema = static Data Frame.schema
因为我们使用的是时间序列的数据(time–series data),值得一提的是我们如何分组和聚合我们的数据,在这个例子中, 在本例中,我们将查看给定客户(通过CustomerId标识)进行大型购买时的销售时间,例如,让我们添加一个总花费的列,看看那一天客户花费最多。
窗口函数将包含聚合中每天的数据。他就是一个简单的在我们时间序列上的窗口。这是一个很有用的工具,因为我们可以指定我们的查询在更多的人上面,并且Spark会把他们分组聚集在一起:
// in Scala
import org.apache.spark.sql.functions.{window, column, desc, col}
staticDataFrame
.selectExpr(
"CustomerId",
"(UnitPrice * Quantity) as total_cost",
"InvoiceDate")
.groupBy(
col("CustomerId"), window(col("InvoiceDate"), "1 day"))
.sum("total_cost")
.show(5)
// in Python
import org.apache.spark.sql.functions.{window, column, desc, col}
static DataFrame\
.selectExpr(
"CustomerId",
"(UnitPrice * Quantity) as total_cost",
"InvoiceDate")\
.groupBy(
col("CustomerId"), window(col("InvoiceDate"), "1 day"))\
.sum("total_cost")\
.show(5)
你也可以使用SQL执行,正如我们之前提到的那样,
Here’s a sample of the output that you’ll see:
Customer Id | window | sum(total_cost) |
---|---|---|
17450.0 | [2011-09-20 00:00... | 71601.44 |
... | ... | ... |
null | [2011-12-08 00:00... | 31975.590000000007 |
null 值代表一个事实,即在一个事务中没有customerId,这是一个static DataFrame version,如果你熟悉这个语法这并没有什么可意外的,你可能很像在本地local mode上执行他,因为这是一个很好的练习,去自己指定shuffle之后需要创建的partition的个数,默认情况下值为200,但是因为有很多executors在这台机器上,吧这个值减小到5是有必要的,正如第2章中做的那样,因此如果你不记得他为什么重要,看先前一章。
spark.conf.set("spark.sql.shuffle.partitions", "5")
让我们看看一个streaming code,你可能注意到他与之前的代码基本没什么变化,最大的变化在于我们使用readStram而不是read,此外你可能已经注意到,maxFilesPerTrigger 参数,可以轻松的指定我们一次将要读取的文件数。这是为了我们更好的展示“streaming ”,在生产环境中可以别忽略。
val streamingDataFrame = spark.readStream
.schema(staticSchema)
.option("maxFilesPerTrigger", 1)
.format("csv")
.option("header", "true")
.load("/data/retail-data/by-day/*.csv")
//In python
streamingDataFrame = spark.readStream\
.schema(staticSchema)\
.option("maxFilesPerTrigger", 1)\
.format("csv")\
.option("header", "true")\
.load("/data/retail-data/by-day/*.csv")
Now we can see whether our Data Frame is streaming:
streamingDataFrame.isStreaming // returns true
让我们实现上面DataFrame相同的逻辑操作,我们将执行一个求和的过程:
//In Scala
val purchaseByCustomerPerHour = streamingDataFrame
.selectExpr(
"CustomerID",
"(UnitPrce * Quantity) As total_cost",
"InvoiceDate")
.groupBy(
$"CustomerId",window($"InvoiceDate","1 day"))
.sum("total_cost")
//In Python
purchaseByCustomerPerHour = streamingDataFrame\
.selectExpr(
"CustomerID",
"(UnitPrce * Quantity) As total_cost",
"InvoiceDate")\
.groupBy(
$"CustomerId",window($"InvoiceDate","1 day"))\
.sum("total_cost")
这依旧是一个lazy操作,因此我们需要调用Stream的action去启动并执行这个dataflow
Streaming action与我们常规的static action 有一点不一样,因为我们打算在某个地方填充数据,而不仅仅是像调用count那样(这在stream上没有任何意义),我们将使用的action将输出一个内存表,且我们会在每个trigger之后更新他,这种情况下,每一个tigger基于一个单个文件(我们可以在read参数中设置)。Spark将操作内存表中的数据,这样我们将始终保存在之前聚合中的最高值:
//In Scala
purchaseBycustomerPerHour.writeStream
.format("memory") //memory = store in-memory table
.queryName("customer_purchases") //the name of the in-memory table
.outputMode("complete") //complete = all the counts should be in the table
.start()
//In Python
purchaseBycustomerPerHour.writeStream\
.format("memory")\
.queryName("customer_purchases")\
.outputMode("complete")\
.start()
当我们开启这个steam,如果我们将输出到一个production sink我们可以对他进行查询,来调试我们的结果,
// in Scala
spark.sql("""
SELECT *
FROM customer_purchases
ORDER BY `sum(total_cost)` DESC
""")
.show(5)
# in Python
spark.sql("""
SELECT *
FROM customer_purchases
ORDER BY `sum(total_cost)` DESC
""")\
.show(5)
你可能注意到,随着我们读取更多的数据,表的组成也会发生变化!对于每一个文件,结果可能会也可能不会根据数据变化,当然因为我们grouping customers,,我们希望看到随着时间的推移最大客户的购买量会增加(每段时间都是如此),你可以使用另一个选项将结果输出到控制台:
purchaseByCustomerPerHour.writeStream
.format("console")
.queryName("customer_purchases_2")
.outputMode("complete")
.start()
你不应该在生产中使用这两种流方法,但是他们确实展示了Structured Streaming’s power.注意这个时间窗口针对的是事件时间域,而不是Spark处理数据的时间域。这是Structure Streeaming已经解决的缺点,我们将在第五部分深入讨论Structure Streeaming。
Machine Learning and Advanced Analytics
Spark 另一个受欢迎的方面是使用已经建立的被称为MLLib的机器学习的库处理大规模机器学习的能力,MLLib允许预处理,训练模型并且对数据进行大规模预测,你甚至可以使用MLLIb训练好的模型在Structure Streaming中进行预测,Spark提供复杂的机器学习API处理大量的机器学习任务,从分类到回归,以及集群的深度学习。为了演示,我们使用标准的被称为K-means的基本聚类算法对数据聚类。
WHAT IS K-MEANS?
k-means是一个聚类算法,其中的k是在数据中随机分配的。 然后,最接近该点的点被“分配”给一个类,并计算出指定点的中心。这个中心点被叫做centroid(即质点),之后我们标记距离质点近的点,并认为这些点与该质点是同一类的,将质点移到这个类群的新中心。我们将重复这个过程执行有限的迭代次数,或者直到收敛。
Spark包含大量的预先处理的开箱即用的方法,为了展示这些方法,我们将以raw数据开始,构建transformation将数据转化成正确的格式,在这一点上,我们可以训练我们的模型之后用于预测;
staticDataFrame.printSchema()
|
root
|-- InvoiceNo: string (nullable = true)
|-- StockCode: string (nullable = true)
|-- Description: string (nullable = true)
|-- Quantity: integer (nullable = true)
|-- InvoiceDate: timestamp (nullable = true)
|-- UnitPrice: double (nullable = true)
|-- CustomerID: double (nullable = true)
|-- Country: string (nullable = true)
在MLLib中的机器学习算法要求数据必须为数值类型。我们当前的数据有大量不同的数据类型,包含timestamps,Integer,String。因此我们需要转化这些数据为数值表示。在这个实例中,我们将使用几个DataFrame的transformations来操作我们的数据,
// in Scala
import org.apache.spark.sql.functions.date_format
val preppedDataFrame = staticDataFrame
.na.fill(0)
.withColumn("day_of_week", date_format($"InvoiceDate", "EEEE"))
.coalesce(5)
我们还需要将数据分成训练集和测试集。在这个例子中,我们将根据发生购买的日期手动执行此操作,当然我们也可以使用MLLib的transformation API划分训练集和测试集。
// in Scala
val trainDataFrame = preppedDataFrame
.where("Invoice Date < '2011-07-01'")
val testDataFrame = preppedDataFrame
.where("Invoice Date >= '2011-07-01'")
因为是时间序列数据,我们需要按照数据中任意的数据对数据进行切分。 虽然这可能不是我们的训练和测试的最佳分割,但是从这个示例的意图和目的来看,它将很好地工作。我们将看到数据集大致分成两部分:
trainDataFrame.count()
testDataFrame.count()
注意这些transformations是DataFrame transformations,我们在第二章曾介绍过。Spark的MLlib还提供了许多transformations,我们可以用这些transformations实现常规的操作,一个简单的例子就是StringIndexer:
// in Scala
import org.apache.spark.ml.feature.StringIndexer
val indexer = new StringIndexer()
.set InputCol("day_of_week")
.set OutputCol("day_of_week_index")
# in Python
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer()\
.set InputCol("day_of_week")\
.set OutputCol("day_of_week_index")
这将把我们的day_of_week转换成相应的数值,例如,Spark可能将Saturday转化为6,将Monday转化为1。然而使用这个数值类型的schema,我们可以含蓄的说Saturday比Monday大(纯数值大小)。这显然是不正确的,为了解决这个问题,我们需要OneHotEncoder去编码列中的每一个值。这些Boolean的标志标识day of week是一周中的某一天。
// in Scala
import org.apache.spark.ml.feature.OneHotEncoder
val encoder = newOneHotEncoder()
.setInputCol("day_of_week_index")
.setOutputCol("day_of_week_encoded")
# in Python
from pyspark.ml.feature import OneHotEncoder
encoder = OneHotEncoder()\
.setInputCol("day_of_week_index")\
.setOutputCol("day_of_week_encoded")
每一个都将产生一组列,我们将“assemble”成一个向量。Spark中的所有机器学习算法都以Vector类型作为输入,且这个 Vector必须是一组数值:
// in Scala
import org.apache.spark.ml.feature.VectorAssembler
val vectorAssembler = new VectorAssembler()
.setInputCols(Array("UnitPrice", "Quantity", "day_of_week_encoded"))
.setOutputCol("features")
# in Python
from pyspark.ml.feature import VectorAssembler
vectorAssembler = VectorAssembler()\
.setInputCols(["UnitPrice", "Quantity", "day_of_week_encoded"])\
.setOutputCol("features")
此时我们有三个关键特征:the price, the quantity, and the day of week。下一步, 我们将把它设置成一个管道,这样以后我们需要转换的任何数据都可以经过相同的过程:
// in Scala
import org.apache.spark.ml.Pipeline
val transformationPipeline = new Pipeline()
.setStages(Array(indexer, encoder, vectorAssembler))
# in Python
from pyspark.ml import Pipeline
transformationPipeline = Pipeline()\
.setStages([indexer, encoder, vectorAssembler])
为 training做准备是一个分两步走的过程。 首先,我们需要将transformers匹配到这个dataset, 们将在第6部分深入讨论这个问题, 但基本上,我们的StringIndexer需要知道有多少惟一值要被索引。 这些值存在之后,编码就很容易了,但是Spark必须查看要索引的列中的所有不同值,以便以后存储这些值:
// in Scala
val fittedPipeline = transformationPipeline.fit(trainDataFrame)
# in Python
fittedPipeline = transformationPipeline.fit(trainDataFrame)
在我们拟合训练数据之后, 我们准备好使用这个fitted pipeline,并使用它以一相同的和重复的方式transform 我们的所有数据:
// in Scala
val transformedTraining = fittedPipeline.transform(trainDataFrame)
# in Python
transformedTraining = fittedPipeline.transform(trainDataFrame)
在这一点上,值得一提的是,我们本可以在我们的pipeline中包含我们的模型培训,但是为了演示缓存数据的用例,我们并没有选择这样做。取而代之的是,我们将对模型执行一些超参数调优,因为我们不想重复完全相同的transformation。 具体来说,我们将使用缓存,这是一种优化,我们将在第4部分中详细讨论。 这将把中间transformed dataset的副本放入内存,允许我们以比再次运行整个pipeline低得多的成本反复访问它。 使用缓存与不使用缓存的效果是显著的:
transformedTraining.cache()
我们现在有一个训练集, 是时候训练模型了。 首先,我们将导入我们想要使用的相关模型并实例化它:
// in Scala
import org.apache.spark.ml.clustering.KMeans
val kmeans = new KMeans()
.setK(20)
.setSeed(1L)
# in Python
from pyspark.ml.clustering import KMeans
kmeans = KMeans()\
.setK(20)\
.setSeed(1L)
在Spark中,训练一个机器学习模型是一个两步操作。 首先,我们初始化一个未经训练的模型,之后我们训练他。 在MLlib的Data Frame API中,每种算法都有两种类型。他们遵循以下命名模式,Algorithm用于未经训练的版本,AlgorithmModel是训练后的版本。在我们的示例中,KMeans和KMeansModel两个。
MLlib的DataFrame API中的估计量共享大致相同的接口, 正如我们之前看到的预处理transformers,比如 StringIndexer.。 这应该不足为奇,因为它使训练整个pipeline(包括模型)变得简单。 但我们在这个例子中并没有这样做,主要是出于一些目的,即我们想一步一步地做更多的事情:
// in Scala
val kmModel = kmeans.fit(transformed Training)
# in Python
kmModel = kmeans.fit(transformedTraining)
当我们训练完模型之后, 我们可以根据我们训练集上的一些成功的优点来计算成本。 这个数据集的最终成本实际上相当高。 这个数据集的最终成本实际上是相当高的,这可能是由于我们没有正确地预处理和缩放我们的输入数据,我们在第25章深入讨论:
// in Scala
val transformedTest = fittedPipeline.transform(testDataFrame)
kmModel.computeCost(transformed Training)
# in Python
transformedTest = fittedPipeline.transform(testDataFrame)
kmModel.computeCost(transformedTest)
当然,我们可以继续改进这个模型,分层进行更多的预处理,并执行超参数调优,以确保得到一个好的模型。我们把这个讨论留到第六部分
Lower-Level APIs
Spark包含一些低级的基本单元允许任意的Java和Python对象通过RDDS进行操作。事实上,Spark的所有东西都是构建RDDs上的。我们将在第四章讨论,DataFrame也是构建在RDD之上的, 并且DataFrame会被编译成这些较低级别的工具,以方便和非常高效地分发执行。 有些事情您可以使用RDDs,尤其是在读取或操作原始数据时,但是在大多数情况下,您应该坚持使用结构化api。RDD相较于DataFrame是低级API,因为它们向最终用户显示物理执行特征(如分区)。
您可以使用RDDs来对存储在驱动程序机器内存中的原始数据进行并行化。 例如,让我们并行化一些简单的数字,然后创建一个DataFrame,之后我们可以将它转化成其他的DataFrame:
// in Scala
spark.sparkContext.parallelize(Seq(1, 2, 3)).toDF()
# in Python
from pyspark.sql import Row
spark.sparkContext.parallelize([Row(1), Row(2), Row(3)]).toDF()
RDDs在Scala和Python中都可用,然而他们并不完全等价。 这与DataFrame API(执行特征相同)不同,这是由于一些底层实现细节造成的。 我们将在第4部分中介绍较低层的api,包括RDDs。 作为最终用户,您不应该为了执行许多任务而过多地使用RDDs,除非您仍然保留较旧的Spark代码。 在现在新版的 Spark中基本上没有实例可以让您使用RDDs,而是推荐使用结构化api来处理一些非常原始的数据。