Apache Spark入门 - Java架构和应用程序

https://medium.com/@atul94/getting-started-with-apache-spark-ad9d59e71f6f
Apache Spark被解释为“用于大规模数据处理的快速通用引擎”。然而,这甚至没有开始包含它成为大数据空间中如此突出的参与者的原因。Apache Spark是一个分布式计算平台,大数据公司的采用率一直在以惊人的速度增长。

Spark Architecture

spark的架构如下:


Spark生态系统

Spark是一个分布式处理引擎,但它没有自己的资源分布式存储和集群管理器。它运行在开箱即用的集群资源管理器和分布式存储之上。

Spark核心有两个部分:

  • 核心API:非结构化API(RDD),结构化API(DataFrame,数据集)。可用于Scala,Python,Java和R.
  • 计算引擎:内存管理,任务调度,故障恢复,与Cluster Manager交互。

注意:我们将在本文末尾看到Java中的Core API实现。

在核心API之外,Spark提供:

  • Spark SQL:通过类似查询的SQL与结构化数据交互。
  • 流:消费和处理连续的数据流。
  • MLlib:机器学习库。但是,我不建议在这里培训深度学习模型。
  • GraphX:典型的图处理算法。

以上四种都直接依赖于用于分布式计算的spark核心API。

Spark的优点

  • Spark为批处理,结构化数据处理,流媒体等提供了统一的平台。
  • 与Hadoop的map-reduce相比,spark代码更易于编写和使用。
  • Spark最重要的特性,它抽象了并行编程方面。Spark核心抽象了分布式存储,计算和并行编程的复杂性。

Apache Spark的主要用例之一是大规模数据处理。我们创建程序并在spark集群上执行它们。

计划在集群上的执行

主要有两种方法在spark集群上执行程序:

  1. 互动客户,如spark-shellpy-spark,笔记本等。
  2. 提交一份工作。

大多数开发过程都发生在交互式客户端上,但是当我们必须将我们的应用程序投入生产时,我们使用提交作业方法。

新闻Feed - 流媒体工作。每日YouTube分析 - 批量作业

对于长时间运行的流作业或定期批处理作业,我们打包应用程序并将其提交给Spark集群以供执行。

1.png

Spark是一种分布式处理引擎,遵循主从架构。在spark术语中,master是* Driver(驱动程序)*,slave是执行者(executors)

1.png

Driver负责:

  1. 分析
  2. 分布。
  3. 监测。
  4. 调度。
  5. 在Spark过程的生命周期内保持所有必要的信息。
    执行程序(executors)仅负责执行驱动程序(Driver)分配给它们的部分代码,并将状态报告给驱动程序。
    每个Spark过程都有一个单独的Driver和独有的执行程序(executors)。

执行方式

客户端模式:驱动程序是您提交应用程序的本地VM。默认情况下,spark以客户端模式提交所有应用程序。由于驱动程序是整个Spark过程中的主节点,因此在生产设置中,建议不要这样做。对于调试,使用客户端模式更有意义。
群集模式:驱动程序是群集中的执行程序之一。在spark-submit中,您可以按如下方式传递参数:

--deploy-mode cluster

群集资源管理器

1.png

Yarn和Mesos是常用的集群管理器。
Kubernetes是一个通用的容器协调器。

注意:Kubernetes上的Spark不是生产就绪的。

Yarn是最受欢迎的Spark资源管理器,让我们看看它的内在工作:
在客户端模式应用程序中,驱动程序(Driver)是我们的本地VM,用于启动spark应用程序:
步骤1:一旦Driver启动Spark会话请求就转到Yarn以创建Yarn应用程序。
第2步: Yarn Resource Manager创建一个Application Master。对于客户端模式,AM充当执行程序(executor)启动器。
步骤3: AM将联系Yarn Resource经理以请求进一步的容器。
步骤4:资源管理器将分配新容器,AM将在每个容器中启动执行程序(executor)。之后,执行程序(executor)直接与驱动程序(Driver)通信。


1.png

注意:在群集模式下,驱动程序(Driver)在AM中启动。

执行程序和内存调整

硬件 - 6个节点,每个节点16个内核,64 GB RAM
让我们从核心数量开始。核心数表示执行程序可以运行的并发任务。研究表明,任何具有超过5个并发任务的应用程序都会导致糟糕的表现。因此,我建议坚持5。

注意:上述数字来自执行程序的性能,而不是系统具有的核心数。因此,32核系统也将保持不变。
1核心操作系统和Hadoop守护进程需要1 GB RAM。因此我们留下了63 GB Ram和15 Core。
对于15个核心,每个节点可以有3个executors。这总共给了我们18个executors。AM Container需要1个executors。因此我们可以得到17位executors。
到内存了,每个executors得到63/3 = 21 GB。但是,在计算完整内存请求时需要考虑很小的开销。
Formula for that over head = max(384, .07 * spark.executor.memory)
Calculating that overhead = .07 * 21 = 1.47

因此内存降至约19 GB。
因此系统变成:

--num-executors 17 --executor- memory 19G --executor-cores 5

注意:如果我们需要更少的内存,我们可以减少内核数量以增加执行程序(executor)的数量。

Spark Core

现在我们来看一下Spark提供的一些核心API。Spark需要一个数据结构来保存数据。我们有三个备选方案RDD,DataFrame和Dataset。从Spark 2.0开始,建议仅使用Dataset和DataFrame。这两个内部编译到RDD本身。
这三个是弹性,分布式,分区和不可变的数据集合。


1.png

任务: Spark中最小的工作单元,由执行程序(executor)执行。
数据集提供两种类型的操作:
转换:从现有数据集创建新数据集。它很懒惰,数据仍然是分布式的。
操作: Action将数据返回给驱动程序(driver),本质上是非分布式的。对数据集的操作会触发作业。
随机和排序:重新分区数据集以对其执行操作。它是spark的抽象,我们不需要为它编写代码。这项活动需要一个新阶段。

通用行为和转换

1)lit,geq,leq,gt,lt

lit:创建一个文字值的列。可用于与其他列进行比较。
geq(大于等于),leq(小于等于),gt(大于),lt(小于):用于与其他列值进行比较。例如:

// [https://gist.githubusercontent.com/atul94/b1c473a3e9ad748776119707c5fec741/raw/b24eb6f00438c244a6919d19e3749f90a0cdb14e/spark_ds_opt_1.java](https://gist.githubusercontent.com/atul94/b1c473a3e9ad748776119707c5fec741/raw/b24eb6f00438c244a6919d19e3749f90a0cdb14e/spark_ds_opt_1.java)

// Filter dataset with GENERIC_COL value >= 0
Dataset<Row> newRowDataset = rowDataset.filter(col(GENERIC_COL).geq(0));
// Filter dataset with GENERIC_COL value != "qwerty"
Dataset<Row> newRowDataset = rowDataset.filter(col(GENERIC_COL).notEqual(lit("QWERTY")));

2)join(加入)

Spark让我们以各种方式加入数据集。将尝试用示例示例进行解释

// [https://gist.githubusercontent.com/atul94/5451ee7a40c9f5199eb9f4905d754318/raw/d8b2450e23e9a87e5793e9b87852c6a0b3dd09d1/spark_ds_opt_2.java](https://gist.githubusercontent.com/atul94/5451ee7a40c9f5199eb9f4905d754318/raw/d8b2450e23e9a87e5793e9b87852c6a0b3dd09d1/spark_ds_opt_2.java)

        new StructField("id", DataTypes.IntegerType, true, Metadata.empty()),
        new StructField("value", DataTypes.StringType, true, Metadata.empty()),
    };
StructType structType = new StructType(structFields);
List<Row> rowListA = new ArrayList<>();List<Row> rowListB = new ArrayList<>();

rowListA.add(RowFactory.create(1, "A1"));rowListA.add(RowFactory.create(2, "A2"));
rowListA.add(RowFactory.create(3, "A3"));rowListA.add(RowFactory.create(4, "A4"));

rowListB.add(RowFactory.create(3, "A3"));rowListB.add(RowFactory.create(4, "A4"));
rowListB.add(RowFactory.create(4, "A4_1"));rowListB.add(RowFactory.create(5, "A5"));
rowListB.add(RowFactory.create(6, "A6"));

// Create 2 sample dataset
dsA = sparkSession.createDataFrame(rowListA, structType);
dsB = sparkSession.createDataFrame(rowListB, structType);

String[] typesOfJoins = {"inner", "outer", "full", "full_outer", "left",
        "left_outer", "right", "right_outer", "left_semi", "left_anti"};

//Print both the dataset         
System.out.println("Dataset A");
dsA.show();
System.out.println("Dataset B");
dsB.show();

//Print all possible types of join
for (int i = 0; i < typesOfJoins.length; i++) {
  System.out.println(typesOfJoins[i].toUpperCase());
  dsA.join(dsB, dsA.col("id").equalTo(dsB.col("id")), typesOfJoins[i]).drop(dsB.col("id")).show();
}

结果如下:

Dataset A                       Dataset B
+---+-----+                     +---+-----+
| id|value|                     | id|value|
+---+-----+                     +---+-----+
|  1|   A1|                     |  3|   A3|
|  2|   A2|                     |  4|   A4|
|  3|   A3|                     |  4| A4_1|
|  4|   A4|                     |  5|   A5|
+---+-----+                     |  6|   A6|
                            +---+-----+
----------------------------------------------------------------------------------------------------------------------------

INNER JOIN                  OUTER JOIN                  FULL JOIN
+---+-----+-----+               +---+-----+-----+               +---+-----+-----+
| id|value|value|               | id|value|value|               | id|value|value|
+---+-----+-----+               +---+-----+-----+               +---+-----+-----+
|  3|   A3|   A3|               |  1|   A1| null|               |  1|   A1| null|           
|  4|   A4| A4_1|               |  2|   A2| null|               |  2|   A2| null|
|  4|   A4|   A4|               |  3|   A3|   A3|               |  3|   A3|   A3|
+---+-----+-----+               |  4|   A4|   A4|               |  4|   A4| A4_1|
                            |  4|   A4| A4_1|               |  4|   A4|   A4|
                            |  5| null|   A5|               |  5| null|   A5|
                            |  6| null|   A6|               |  6| null|   A6|
                        +---+-----+-----+               +---+-----+-----+

FULL_OUTER JOIN                 LEFT JOIN                   LEFT_OUTER JOIN 
+---+-----+-----+               +---+-----+-----+               +---+-----+-----+
| id|value|value|               | id|value|value|               | id|value|value|
+---+-----+-----+               +---+-----+-----+               +---+-----+-----+
|  1|   A1| null|               |  1|   A1| null|               |  1|   A1| null|
|  2|   A2| null|               |  2|   A2| null|               |  2|   A2| null|
|  3|   A3|   A3|               |  3|   A3|   A3|               |  3|   A3|   A3|
|  4|   A4| A4_1|               |  4|   A4| A4_1|               |  4|   A4| A4_1|
|  4|   A4|   A4|               |  4|   A4|   A4|               |  4|   A4|   A4|
|  5| null|   A5|               +---+-----+-----+               +---+-----+-----+
|  6| null|   A6|
+---+-----+-----+

RIGHT JOIN                  RIGHT_OUTER JOIN                LEFT_SEMI JOIN
+---+-----+-----+               +---+-----+-----+               +---+-----+
| id|value|value|               | id|value|value|               | id|value|
+---+-----+-----+               +---+-----+-----+               +---+-----+
|  3|   A3|   A3|               |  3|   A3|   A3|               |  3|   A3|
|  4|   A4|   A4|               |  4|   A4| A4_1|               |  4|   A4|
|  4|   A4| A4_1|               |  4|   A4|   A4|               +---+-----+
|  5| null|   A5|               |  5| null|   A5|
|  6| null|   A6|               |  6| null|   A6|
+---+-----+-----+               +---+-----+-----+

LEFT_ANTI JOIN
+---+-----+
| id|value|
+---+-----+
|  1|   A1|
|  2|   A2|
+---+-----+

3)union(联合)

Spark联合函数允许我们在两个数据集之间建立联合。数据集应该具有相同的模式。

4)window(窗口)

Spark中的基本功能之一。它允许您基于一组称为Frame的行计算表的每个输入行的返回值。
Spark为翻滚窗口,希望窗口,滑动窗口和延迟窗口提供API。
我们将它用于排名,总和,普通旧窗口等。一些用例是:

// Window Function different use cases

//Ranking
//Filter top SOME_INTEGER_VAL_COL
WindowSpec w1 = Window.orderBy(col(SOME_COUNT_COL).desc());
inputDataset = inputDataset.select(col(SOME_COL), rank().over(w1).as(RANK_COL)).filter(col(RANK_COL).leq(SOME_INTEGER_VAL));

//Get values over a moving Window
//Helpful for calculating Directed Graphs in data based on time, moving average etc.
mainDataset = mainDataset.groupBy(col(SOME_COL), window(col(DATE_TIME_COL), INTERVAL_STRING))
        .agg(collect_list(struct(col(SOME_COL), col(DATE_TIME_COL))).as(SOME_OTHER_COL));
// Above program creates a directed graph based on window

//Getting sum over some col
WindowSpec w2 = Window.groupBy(col(SOME_COL));
rowDataset = rowDataset.select(col(SOME_COL_A), sum(col(SOME_COL_B).over(w2)).as(SOME_COL_C));

其他功能(如lag,lead等)允许您执行其他操作,使您可以对数据集执行复杂的分析。
但是,如果仍需要对数据集执行更复杂的操作,则可以使用UDF。UDF的使用示例:

//UDF
//Some class to calculate similarity between two Wrapped Array with Schema
public class SimilarityCalculator implements AnalyticsUDF {

  private Double varACoff;
  private Double varBCoff;
  private Double varCCoff;
  private Double varDCoff;

  public SimilarityCalculator(Double varACoff, Double varBCoff, Double varCCoff, Double varDCoff) {
    this.varACoff = varACoff;
    this.varBCoff = varBCoff;
    this.varCCoff = varCCoff;
    this.varDCoff = varDCoff;
  }

  public Double doCal(WrappedArray<GenericRowWithSchema> varA,
      WrappedArray<GenericRowWithSchema> varB) {
    int lenA = varA.length();
    int lenB = varB.length();
    double ans = 0;
    double temp;
    Map<String, Double> mapA = new HashMap<>();
    Map<String, Double> mapB = new HashMap<>();
    for (int i = 0; i < lenA; i++) {
      mapA.put(varA.apply(i).getString(0), varA.apply(i).getDouble(1));
    }
    for (int i = 0; i < lenB; i++) {
      mapB.put(varB.apply(i).getString(0), varB.apply(i).getDouble(1));
    }
    for (int i = 0; i < lenA; i++) {
      if (mapB.containsKey(varA.apply(i).getString(0))) {
        temp = varA.apply(i).getDouble(1) - mapB.get(varA.apply(i).getString(0));
        ans += temp * temp;
      } else {
        ans += varA.apply(i).getDouble(1) * varA.apply(i).getDouble(1);
      }
    }
    for (int i = 0; i < lenB; i++) {
      if (!mapA.containsKey(varB.apply(i).getString(0))) {
        ans += varB.apply(i).getDouble(1) * varB.apply(i).getDouble(1);
      }
    }
    return ans;
  }

  public Double doCal(Double varA, Double varB, Double varC, Double varD) {
    return Math.sqrt(varA * varACoff + varB * varBCoff + varC * varCCoff + varD * varDCoff);
  }
  
  public String getName() {
    return "L2DistanceScore";
  }
  
  public String getSecondName() {
    return "MergeScore";
  }
}

///////////////////////////////////////////////////////////////////////////////////////////////////////////////

/// At the Spark Session Level
sparkSession.udf().register(similarityCalc.getName(),
        (WrappedArray<GenericRowWithSchema> colA, WrappedArray<GenericRowWithSchema> colB) -> similarityCalc
            .doCal(colA, colB), DataTypes.DoubleType);
sparkSession.udf().register(similarityCalc.getSecondName(),
        (Double a, Double b, Double c, Double d) -> similarityCalc.doCal(a, b, c, d),
        DataTypes.DoubleType);

注意:使用UDF应该是最后的手段,因为它们没有针对Spark进行优化; 他们可能需要更长的时间来执行死刑。建议在UDF上使用本机spark函数。

这只是Apache Spark的冰山一角。它的实用程序扩展到各种领域,不仅限于数据分析。观看这个空间了解更多。

参考

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,080评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,422评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,630评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,554评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,662评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,856评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,014评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,752评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,212评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,541评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,687评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,347评论 4 331
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,973评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,777评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,006评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,406评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,576评论 2 349

推荐阅读更多精彩内容