https://medium.com/@atul94/getting-started-with-apache-spark-ad9d59e71f6f
Apache Spark被解释为“用于大规模数据处理的快速通用引擎”。然而,这甚至没有开始包含它成为大数据空间中如此突出的参与者的原因。Apache Spark是一个分布式计算平台,大数据公司的采用率一直在以惊人的速度增长。
Spark Architecture
spark的架构如下:
Spark是一个分布式处理引擎,但它没有自己的资源分布式存储和集群管理器。它运行在开箱即用的集群资源管理器和分布式存储之上。
Spark核心有两个部分:
- 核心API:非结构化API(RDD),结构化API(DataFrame,数据集)。可用于Scala,Python,Java和R.
- 计算引擎:内存管理,任务调度,故障恢复,与Cluster Manager交互。
注意:我们将在本文末尾看到Java中的Core API实现。
在核心API之外,Spark提供:
- Spark SQL:通过类似查询的SQL与结构化数据交互。
- 流:消费和处理连续的数据流。
- MLlib:机器学习库。但是,我不建议在这里培训深度学习模型。
- GraphX:典型的图处理算法。
以上四种都直接依赖于用于分布式计算的spark核心API。
Spark的优点
- Spark为批处理,结构化数据处理,流媒体等提供了统一的平台。
- 与Hadoop的map-reduce相比,spark代码更易于编写和使用。
- Spark最重要的特性,它抽象了并行编程方面。Spark核心抽象了分布式存储,计算和并行编程的复杂性。
Apache Spark的主要用例之一是大规模数据处理。我们创建程序并在spark集群上执行它们。
计划在集群上的执行
主要有两种方法在spark集群上执行程序:
- 互动客户,如spark-shell,py-spark,笔记本等。
- 提交一份工作。
大多数开发过程都发生在交互式客户端上,但是当我们必须将我们的应用程序投入生产时,我们使用提交作业方法。
对于长时间运行的流作业或定期批处理作业,我们打包应用程序并将其提交给Spark集群以供执行。
Spark是一种分布式处理引擎,遵循主从架构。在spark术语中,master是* Driver(驱动程序)*,slave是执行者(executors)。
Driver负责:
- 分析
- 分布。
- 监测。
- 调度。
- 在Spark过程的生命周期内保持所有必要的信息。
执行程序(executors)仅负责执行驱动程序(Driver)分配给它们的部分代码,并将状态报告给驱动程序。
每个Spark过程都有一个单独的Driver和独有的执行程序(executors)。
执行方式
客户端模式:驱动程序是您提交应用程序的本地VM。默认情况下,spark以客户端模式提交所有应用程序。由于驱动程序是整个Spark过程中的主节点,因此在生产设置中,建议不要这样做。对于调试,使用客户端模式更有意义。
群集模式:驱动程序是群集中的执行程序之一。在spark-submit中,您可以按如下方式传递参数:
--deploy-mode cluster
群集资源管理器
Yarn和Mesos是常用的集群管理器。
Kubernetes是一个通用的容器协调器。
注意:Kubernetes上的Spark不是生产就绪的。
Yarn是最受欢迎的Spark资源管理器,让我们看看它的内在工作:
在客户端模式应用程序中,驱动程序(Driver)是我们的本地VM,用于启动spark应用程序:
步骤1:一旦Driver启动Spark会话请求就转到Yarn以创建Yarn应用程序。
第2步: Yarn Resource Manager创建一个Application Master。对于客户端模式,AM充当执行程序(executor)启动器。
步骤3: AM将联系Yarn Resource经理以请求进一步的容器。
步骤4:资源管理器将分配新容器,AM将在每个容器中启动执行程序(executor)。之后,执行程序(executor)直接与驱动程序(Driver)通信。
注意:在群集模式下,驱动程序(Driver)在AM中启动。
执行程序和内存调整
硬件 - 6个节点,每个节点16个内核,64 GB RAM
让我们从核心数量开始。核心数表示执行程序可以运行的并发任务。研究表明,任何具有超过5个并发任务的应用程序都会导致糟糕的表现。因此,我建议坚持5。
注意:上述数字来自执行程序的性能,而不是系统具有的核心数。因此,32核系统也将保持不变。
1核心操作系统和Hadoop守护进程需要1 GB RAM。因此我们留下了63 GB Ram和15 Core。
对于15个核心,每个节点可以有3个executors。这总共给了我们18个executors。AM Container需要1个executors。因此我们可以得到17位executors。
到内存了,每个executors得到63/3 = 21 GB。但是,在计算完整内存请求时需要考虑很小的开销。
Formula for that over head = max(384, .07 * spark.executor.memory)
Calculating that overhead = .07 * 21 = 1.47
因此内存降至约19 GB。
因此系统变成:
--num-executors 17 --executor- memory 19G --executor-cores 5
注意:如果我们需要更少的内存,我们可以减少内核数量以增加执行程序(executor)的数量。
Spark Core
现在我们来看一下Spark提供的一些核心API。Spark需要一个数据结构来保存数据。我们有三个备选方案RDD,DataFrame和Dataset。从Spark 2.0开始,建议仅使用Dataset和DataFrame。这两个内部编译到RDD本身。
这三个是弹性,分布式,分区和不可变的数据集合。
任务: Spark中最小的工作单元,由执行程序(executor)执行。
数据集提供两种类型的操作:
转换:从现有数据集创建新数据集。它很懒惰,数据仍然是分布式的。
操作: Action将数据返回给驱动程序(driver),本质上是非分布式的。对数据集的操作会触发作业。
随机和排序:重新分区数据集以对其执行操作。它是spark的抽象,我们不需要为它编写代码。这项活动需要一个新阶段。
通用行为和转换
1)lit,geq,leq,gt,lt
lit:创建一个文字值的列。可用于与其他列进行比较。
geq(大于等于),leq(小于等于),gt(大于),lt(小于):用于与其他列值进行比较。例如:
// [https://gist.githubusercontent.com/atul94/b1c473a3e9ad748776119707c5fec741/raw/b24eb6f00438c244a6919d19e3749f90a0cdb14e/spark_ds_opt_1.java](https://gist.githubusercontent.com/atul94/b1c473a3e9ad748776119707c5fec741/raw/b24eb6f00438c244a6919d19e3749f90a0cdb14e/spark_ds_opt_1.java)
// Filter dataset with GENERIC_COL value >= 0
Dataset<Row> newRowDataset = rowDataset.filter(col(GENERIC_COL).geq(0));
// Filter dataset with GENERIC_COL value != "qwerty"
Dataset<Row> newRowDataset = rowDataset.filter(col(GENERIC_COL).notEqual(lit("QWERTY")));
2)join(加入)
Spark让我们以各种方式加入数据集。将尝试用示例示例进行解释
// [https://gist.githubusercontent.com/atul94/5451ee7a40c9f5199eb9f4905d754318/raw/d8b2450e23e9a87e5793e9b87852c6a0b3dd09d1/spark_ds_opt_2.java](https://gist.githubusercontent.com/atul94/5451ee7a40c9f5199eb9f4905d754318/raw/d8b2450e23e9a87e5793e9b87852c6a0b3dd09d1/spark_ds_opt_2.java)
new StructField("id", DataTypes.IntegerType, true, Metadata.empty()),
new StructField("value", DataTypes.StringType, true, Metadata.empty()),
};
StructType structType = new StructType(structFields);
List<Row> rowListA = new ArrayList<>();List<Row> rowListB = new ArrayList<>();
rowListA.add(RowFactory.create(1, "A1"));rowListA.add(RowFactory.create(2, "A2"));
rowListA.add(RowFactory.create(3, "A3"));rowListA.add(RowFactory.create(4, "A4"));
rowListB.add(RowFactory.create(3, "A3"));rowListB.add(RowFactory.create(4, "A4"));
rowListB.add(RowFactory.create(4, "A4_1"));rowListB.add(RowFactory.create(5, "A5"));
rowListB.add(RowFactory.create(6, "A6"));
// Create 2 sample dataset
dsA = sparkSession.createDataFrame(rowListA, structType);
dsB = sparkSession.createDataFrame(rowListB, structType);
String[] typesOfJoins = {"inner", "outer", "full", "full_outer", "left",
"left_outer", "right", "right_outer", "left_semi", "left_anti"};
//Print both the dataset
System.out.println("Dataset A");
dsA.show();
System.out.println("Dataset B");
dsB.show();
//Print all possible types of join
for (int i = 0; i < typesOfJoins.length; i++) {
System.out.println(typesOfJoins[i].toUpperCase());
dsA.join(dsB, dsA.col("id").equalTo(dsB.col("id")), typesOfJoins[i]).drop(dsB.col("id")).show();
}
结果如下:
Dataset A Dataset B
+---+-----+ +---+-----+
| id|value| | id|value|
+---+-----+ +---+-----+
| 1| A1| | 3| A3|
| 2| A2| | 4| A4|
| 3| A3| | 4| A4_1|
| 4| A4| | 5| A5|
+---+-----+ | 6| A6|
+---+-----+
----------------------------------------------------------------------------------------------------------------------------
INNER JOIN OUTER JOIN FULL JOIN
+---+-----+-----+ +---+-----+-----+ +---+-----+-----+
| id|value|value| | id|value|value| | id|value|value|
+---+-----+-----+ +---+-----+-----+ +---+-----+-----+
| 3| A3| A3| | 1| A1| null| | 1| A1| null|
| 4| A4| A4_1| | 2| A2| null| | 2| A2| null|
| 4| A4| A4| | 3| A3| A3| | 3| A3| A3|
+---+-----+-----+ | 4| A4| A4| | 4| A4| A4_1|
| 4| A4| A4_1| | 4| A4| A4|
| 5| null| A5| | 5| null| A5|
| 6| null| A6| | 6| null| A6|
+---+-----+-----+ +---+-----+-----+
FULL_OUTER JOIN LEFT JOIN LEFT_OUTER JOIN
+---+-----+-----+ +---+-----+-----+ +---+-----+-----+
| id|value|value| | id|value|value| | id|value|value|
+---+-----+-----+ +---+-----+-----+ +---+-----+-----+
| 1| A1| null| | 1| A1| null| | 1| A1| null|
| 2| A2| null| | 2| A2| null| | 2| A2| null|
| 3| A3| A3| | 3| A3| A3| | 3| A3| A3|
| 4| A4| A4_1| | 4| A4| A4_1| | 4| A4| A4_1|
| 4| A4| A4| | 4| A4| A4| | 4| A4| A4|
| 5| null| A5| +---+-----+-----+ +---+-----+-----+
| 6| null| A6|
+---+-----+-----+
RIGHT JOIN RIGHT_OUTER JOIN LEFT_SEMI JOIN
+---+-----+-----+ +---+-----+-----+ +---+-----+
| id|value|value| | id|value|value| | id|value|
+---+-----+-----+ +---+-----+-----+ +---+-----+
| 3| A3| A3| | 3| A3| A3| | 3| A3|
| 4| A4| A4| | 4| A4| A4_1| | 4| A4|
| 4| A4| A4_1| | 4| A4| A4| +---+-----+
| 5| null| A5| | 5| null| A5|
| 6| null| A6| | 6| null| A6|
+---+-----+-----+ +---+-----+-----+
LEFT_ANTI JOIN
+---+-----+
| id|value|
+---+-----+
| 1| A1|
| 2| A2|
+---+-----+
3)union(联合)
Spark联合函数允许我们在两个数据集之间建立联合。数据集应该具有相同的模式。
4)window(窗口)
Spark中的基本功能之一。它允许您基于一组称为Frame的行计算表的每个输入行的返回值。
Spark为翻滚窗口,希望窗口,滑动窗口和延迟窗口提供API。
我们将它用于排名,总和,普通旧窗口等。一些用例是:
// Window Function different use cases
//Ranking
//Filter top SOME_INTEGER_VAL_COL
WindowSpec w1 = Window.orderBy(col(SOME_COUNT_COL).desc());
inputDataset = inputDataset.select(col(SOME_COL), rank().over(w1).as(RANK_COL)).filter(col(RANK_COL).leq(SOME_INTEGER_VAL));
//Get values over a moving Window
//Helpful for calculating Directed Graphs in data based on time, moving average etc.
mainDataset = mainDataset.groupBy(col(SOME_COL), window(col(DATE_TIME_COL), INTERVAL_STRING))
.agg(collect_list(struct(col(SOME_COL), col(DATE_TIME_COL))).as(SOME_OTHER_COL));
// Above program creates a directed graph based on window
//Getting sum over some col
WindowSpec w2 = Window.groupBy(col(SOME_COL));
rowDataset = rowDataset.select(col(SOME_COL_A), sum(col(SOME_COL_B).over(w2)).as(SOME_COL_C));
其他功能(如lag,lead等)允许您执行其他操作,使您可以对数据集执行复杂的分析。
但是,如果仍需要对数据集执行更复杂的操作,则可以使用UDF。UDF的使用示例:
//UDF
//Some class to calculate similarity between two Wrapped Array with Schema
public class SimilarityCalculator implements AnalyticsUDF {
private Double varACoff;
private Double varBCoff;
private Double varCCoff;
private Double varDCoff;
public SimilarityCalculator(Double varACoff, Double varBCoff, Double varCCoff, Double varDCoff) {
this.varACoff = varACoff;
this.varBCoff = varBCoff;
this.varCCoff = varCCoff;
this.varDCoff = varDCoff;
}
public Double doCal(WrappedArray<GenericRowWithSchema> varA,
WrappedArray<GenericRowWithSchema> varB) {
int lenA = varA.length();
int lenB = varB.length();
double ans = 0;
double temp;
Map<String, Double> mapA = new HashMap<>();
Map<String, Double> mapB = new HashMap<>();
for (int i = 0; i < lenA; i++) {
mapA.put(varA.apply(i).getString(0), varA.apply(i).getDouble(1));
}
for (int i = 0; i < lenB; i++) {
mapB.put(varB.apply(i).getString(0), varB.apply(i).getDouble(1));
}
for (int i = 0; i < lenA; i++) {
if (mapB.containsKey(varA.apply(i).getString(0))) {
temp = varA.apply(i).getDouble(1) - mapB.get(varA.apply(i).getString(0));
ans += temp * temp;
} else {
ans += varA.apply(i).getDouble(1) * varA.apply(i).getDouble(1);
}
}
for (int i = 0; i < lenB; i++) {
if (!mapA.containsKey(varB.apply(i).getString(0))) {
ans += varB.apply(i).getDouble(1) * varB.apply(i).getDouble(1);
}
}
return ans;
}
public Double doCal(Double varA, Double varB, Double varC, Double varD) {
return Math.sqrt(varA * varACoff + varB * varBCoff + varC * varCCoff + varD * varDCoff);
}
public String getName() {
return "L2DistanceScore";
}
public String getSecondName() {
return "MergeScore";
}
}
///////////////////////////////////////////////////////////////////////////////////////////////////////////////
/// At the Spark Session Level
sparkSession.udf().register(similarityCalc.getName(),
(WrappedArray<GenericRowWithSchema> colA, WrappedArray<GenericRowWithSchema> colB) -> similarityCalc
.doCal(colA, colB), DataTypes.DoubleType);
sparkSession.udf().register(similarityCalc.getSecondName(),
(Double a, Double b, Double c, Double d) -> similarityCalc.doCal(a, b, c, d),
DataTypes.DoubleType);
注意:使用UDF应该是最后的手段,因为它们没有针对Spark进行优化; 他们可能需要更长的时间来执行死刑。建议在UDF上使用本机spark函数。
这只是Apache Spark的冰山一角。它的实用程序扩展到各种领域,不仅限于数据分析。观看这个空间了解更多。
参考
- https://www.youtube.com/watch?v=AYZCpxYVxH4&list=PLkz1SCf5iB4dXiPdFD4hXwheRGRwhmd6K
- https://stackoverflow.com/questions/37871194/how-to-tune-spark-executor-number-cores-and-executor-memory
- https://medium.com/@Farox2q/udfs-vs-map-vs-custom-spark-native-functions-91ab2c154b44
- https://stackoverflow.com/questions/45990633/what-are-the-various-join-types-in-spark