Spark概述

前言

内容:

Spark Core -- 离线处理
Spark SQL -- 离线处理、交互
Spark Streaming -- 实时
Spark Graphx -- 图处理
Spark原理

MapReduce、Spark、Flink(实时)=> 3代计算引擎,昨天、今天、未来

MapReduce、Spark:类MR引擎,底层原理非常相似,数据分区、MapTask、ReduceTask、Shuffle

Spark Core

Spark概述

什么是Spark

官网

Spark是一个快速、通用的计算引擎,特点:

  • 速度快

    与MR相比,Spark基于内存的运算要快100倍,基于硬盘的运算也要快10倍以上,Spark实现了高效的DAG执行引擎,可以通过基于内存来高效处理数据

  • 使用简单

    支持多种语言,支持超过80中高级算法,还支持交互式的Python和Scala的shell,可以方便的在这些shell中使用Spark集群来验证解决问题

  • 通用

    SparK提供了统一的解决方案,Spark可以用于批处理、交互式查询(Spark SQL)实时流处理(Spark Streaming)、机器学习(Spark MLib)和图计算(Graphx)。

  • 兼容好

    可以非常方便的与其他开源产品进行融合,可以使用YARN、Mesos作为他的资源管理和调度,可以处理所以Hadoop支持的数据等。Spark也可以不依赖第三方资源管理和调度器,它实现了Standalone作为内置的资源管理和调度框架

Spark与Hadoop

从狭义的角度上看:Hadoop是分布式框架。有存储、计算、资源调度三个部分组成

Spark是一个分布式计算引擎,由Scala语言编写的计算框架,基于内存的快速、通用、可扩展的大数据分析引擎

从广义上来看,Spark是Hadoop生态不可或缺的一部分

MapReduce的不足:

  • 表达能力有限
  • 磁盘IO开销大
  • 延迟高
    • 任务之间衔接由IO开销
    • 前一个任务未完成,后一个任务无法开始,难以胜任复杂的、多阶段的计算任务


      Spark与Hadoop1.png
Spark与Hadoop2.png

Spark在借鉴的MR的优点的同时,很好的解决了MR面临的问题

MR Spark
数据存储结构:磁盘HDFS文件系统的Split 使用内存构建弹性分布式数据集RDD对数据进行运算和cache
编程范式:Map+Reduce仅提供两个操作,表达力欠缺 提供了丰富的操作,是数据处理逻辑的代码非常简短
计算中间结果需要落到磁盘,IO及序列化、反序列化代价大 计算中间结果在内存中,维护存取速度比磁盘高几个数量级
Task以进程的方式维护,需要数秒时间才能启动 Task以线程的方式维护,对于小数据集读取能够达到亚秒级的延迟

备注:Spark计算模式也属于MR,Spark是对MR框架的优化

在实际应用中,大数据应用有以下三种类型

  • 批量处理(离线处理):通常时间跨度在数十分钟到数小时之间
  • 交互式处理:通常时间跨度在数十秒到十分钟之间
  • 流处理(实时处理):通常时间跨度在数百毫秒到数秒之间

当同时存在以上三种情况的时候,Hadoop框架需要同时部署多种不同的软件:MR/Hive或Impala/Storm。这样做难免会带来一些问题:

  • 不同场景之间输入输出数据无法做到无缝共享,通常需要进行数据转换
  • 不同软件需要不同的开发和维护团队,带来较高的使用成本
  • 比较难以对同一个集群中的各个系统进行统一的资源协调和分配

Spark所提供的生态系统足以解决上述三种场景,即同时支持批处理、交互式查询和流数据处理

  • Spark的设计遵循“一个软件栈满足不同的应用场景”的理念(all in one),逐渐形成了一套完整的生态系统。
  • 即能够在提供内存计算框架,也可以支持SQL即席查询、实时流式计算、机器学习和图计算
  • Spark可以部署在资源管理器YARN之上,提供一站式的大数据解决方案。

Spark为什么比MR快

  1. Spark积极使用内存。MR框架中的一个Job只能拥有一个MapTask任务和一个ReduceTask任务,如果业务复杂,一个Job就表达不出来,需要多个Job组合起来,然后前一个Job需要写到HDFS中,才能交给后面的Job,在MR中,这样的运算涉及到多次写入和读取操作,Spark框架则可以把多个MapTask和ReduceTask组合在一起连续执行,中间的计算结果不需要落地

    复杂的MR任务:mr + mr + mr+mr ....

    复杂的Spark任务: mr -> mr -> mr ....

  2. 多进程模型(MR)vs多线程模型(Spark),MR框架中的MapTask和ReduceTask是进程级别的,都是JVM进程,每次启动都需要重新申请资源消耗了不必要的时间,而SparkTask是基于线程模型,通过复用线程池的线程来减少启动、关闭Task所需要的系统开销

系统架构

Spark 运行架构包括:

  • Cluster Manager:是集群资源的管理者。Spark支持3种集群模式(Standalone、Yarn、Mesos)
  • Worker Node:工作节点,管理本地资源
  • Driver Program:运行应用的main()方法并且创建了SparkContext。由ClusterManager分配资源,SparkContext发送Task到Executor上执行
  • Executor:在工作节点上运行,执行Driver发送的Task,并向Dirver汇报计算结果


    系统架构.png

Spark集群部署模式

Spark支持3种不是模式:Standalone、Yarn、Mesos

Standalone模式
  • 独立模式,自带完整的服务,可单独部署到一个集群中,无需依赖任何其他资源管理系统,从一定程度上说,该模式是其他两种的基础
  • Cluster Manager:Master
  • Worker Node:Worker
  • 仅支持粗粒度的资源分配方式
Spark on Yarn 模式
  • Yarn拥有强大的社区支持,且逐步已经成为大数据集群资源管理系统的标准
  • 在国内生产环境运用最广泛的部署模式
  • Spark on Yarn的支持两种模式
    • Yarn-Cluster:适用于生产环境
    • Yarn-Client:适用于交互、调试,希望立即看到App的输出
  • Cluster Manager:ResourceManager
  • Worker Node:NodeManager
  • 仅支持粗粒度的资源分配方式
Spark in Mesos模式
  • 官方推荐的模式。
  • 运行在Mesos比运行在Yarn上更灵活、更自然
  • Cluster Manager:Mesos Master
  • Worker Node:Mesos Slave
  • 支持粗粒度、细粒度的资源分配方式
总结

粗粒度模式(Coarse-grained Mode):每个应用程序的运行环境由一个Driver和多个Executor组成,每个Executor占用若干资源,内部可运行多个Task。程序正式运行之前,需要将所有的资源申请好,且运行的工程中一直占有,即使不使用,也是等到任务结束在回收资源

细粒度模式(Fine-grained Mode):鉴于粗粒度模式会造成大量资源浪费,Spark On Mesos还提供了另外一种调度模式:细粒度模式,这种模式类似于现在的云计算,核心思想就是按需分配。

三种集群部署模式如何选择:

  • 生产环境选择Yarn,国内使用最广的模式
  • Spark的初学者,Standalone,简单
  • 开发测试环境,可选择Standalone
  • 数据量不会太大、应用不是太复杂,建议可以从Standalone模式开始
  • mesos不会设计

相关术语

官方文档

  • Application

    用户提交的Spark应用程序,由集群中的一个Driver和许多executor组成

  • Application Jar

    一个包含Spark应用程序的Jar,Jar不应该包含Spark或Hadoop的jar,这些jar应该在运行中添加

  • Driver Program

    运行应用程序的main(),并创建SparkContext(Spark应用程序)

  • Cluster Manager

    管理集群资源的服务,如Standalone,Mesos,Yarn

  • Deploy Mode

    区分Driver进程在何处运行,在Cluster模式下,在集群内部运行Driver。在Client模式下,Driver在集群外部运行。

  • Worker node

    运行应用程序的工作节点

  • Executor

    运行应用程序Task和保存数据,每个应用程序都有自己的Executors,并且各个executor相互独立

  • Task

    Executors应用程序的最小运行单元

  • Job

    在用户程序中,每次调用Action函数都会产生一个新的Job,也就是说,每个Action会生成一个Job

  • Stage

    一个Job会被分解成多个Stage,每个Stage是一系列的Task集合

Spark安装配置

Spark安装

官网地址

文档地址

下载地址

下载Spark安装包

安装步骤

1、下载后解压缩

cd /opt/lagou/software/
tar zxvf spark-2.4.5-bin-without-hadoop-scala-2.12.tgz
mv spark-2.4.5-bin-without-hadoop-scala-2.12/ ../servers/spark- 2.4.5/

2、设置环境变量,并使其生效

vi /etc/profile
export SPARK_HOME=/opt/lagou/servers/spark-2.4.5
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
source /etc/profile

3、修改配置

文件位置:$SPARK_HOME/conf,赋值需要的文件

cp slaves.template slaves
cp spark-defaults.conf.template spark-defaults.conf
cp spark-env.sh.template spark-env.sh
cp log4j.properties.template log4j.properties

修改slaves

linux121
linux122
linux123

vim spark-defaults.conf

#Spark的master节点的地址和端口号
spark.master                     spark://linux121:7077
#是否做日志的聚合
spark.eventLog.enabled           true
#聚合日志在HDFS的目录,如果放在HDFS需要启动HDFS,并提前创建好目录spark-eventlog
spark.eventLog.dir               hdfs://linux121:9000/spark-eventlog
# 序列化方式
spark.serializer                 org.apache.spark.serializer.KryoSerializer
# Driver内存,默认是1G
spark.driver.memory              512m

修改spark-env.sh

export JAVA_HOME=/opt/lagou/servers/jdk1.8.0_231
export HADOOP_HOME=/opt/lagou/servers/hadoop-2.9.2
export HADOOP_CONF_DIR=/opt/lagou/servers/hadoop-2.9.2/etc/hadoop
#这里使用的是 spark-2.4.5-bin-without-hadoop,所以要将 Hadoop 相关 jars 的位置告诉Spark
export SPARK_DIST_CLASSPATH=$(/opt/lagou/servers/hadoop-2.9.2/bin/hadoop classpath)
export SPARK_MASTER_HOST=linux121
export SPARK_MASTER_PORT=7077

4、将Spark分发到其他两个机器,并修改环境变量,让环境变量生效

rsync-script spark-2.4.5

5、启动集群

cd $SPARK_HOME/sbin
./start-all.sh

分别在linux121、linux122、linux123上执行 jps,可以发现:

linux121:Master、Worker

linux122:Worker

linux123:Worker

此时 Spark 运行在 Standalone 模式下。

在浏览器中输入:http://linux121:8080/

Spark搭建.png

备注:在$HADOOP_HOME/sbin$SPARK_HOME/sbin 下都有 start-all.sh 和 stop-all.sh 文件

whereis start-all.sh.png

此时直接执行start-all或stop-all的时候,回直接先执行hadoop的start-all,有冲突

解决方案:

  • 删除一组start-all/stop-all命令,让另一个生效
  • 对其中一组重命名
  • 将其中一个的sbin目录不放到path中

6、集群测试

# 计算派:3.14……
run-example SparkPi 10 
# 启动shell脚本
spark-shell

#读取HDFS 文件,并进行单词统计
val lines = sc.textFile("/azkaban-wc/wc.txt")
lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_).collect().foreach(println)

Spark不是一定要依赖HDFS的,只有用到了HDFS,才需要HDFS

Apache Spark支持多种部署模式。最简单的就是单机本地模式(Spark所有进程都运 行在一台机器的JVM中)、伪分布式模式(在一台机器中模拟集群运行,相关的进程 在同一台机器上)。分布式模式包括:Standalone、Yarn、Mesos。

Apache Spark支持多种部署模式:

  • 本地模式。最简单的运行模式,Spark所有进程都运行在一台机器的 JVM 中
  • 伪分布式模式。在一台机器中模拟集群运行,相关的进程在同一台机器上(用的 非常少)
  • 分布式模式。包括:Standalone、Yarn、Mesos
    • Standalone。使用Spark自带的资源调度框架
    • Yarn。使用 Yarn 资源调度框架
    • Mesos。使用 Mesos 资源调度框架

本地模式

部署在单机,主要用于测试或实验,最简单的运行模式,所有的进程都运行在一个JVM中。本地模式用单机的多个线程来模拟Spark分布式计算,通常用来验证开发出来的应用程序逻辑上又饿密友问题,这种模式非常简单,只需要包Spark安装包解压,修改一些常用配置即可,不用启动Master、Worker守护进程,也不用启动HDFS,除非用到。

  • Spark-shell --master local:在本地启动一个线程来运行作业
  • spark-shell --master local[N]:启动N个线程
  • spark-shell --master local[*]:使用系统所有的核
  • spark-shell --master local[N,M]:N:代表启动的核数,M代表容许作业失败的次数

前面几种模式没有指定M,默认是为1

测试:

1、启动 Spark 本地运行模式,此时JPS后只有一个SparkSubmit进程

spark-shell --master local

2、 测试

#读取本地文件,懒加载,如果文件不存在,此时不会报错,等到真正使用的时候才会报错
val lines = sc.textFile("file:///root/mess.txt")
# 计算行数
lines.count

备注:如果不需要使用HDFS,可以将 spark-defaults.conf相关的日志聚合部分注释

伪分布式模式

伪分布式模式:在一台机器中模拟集群运行,相关的进程在同一台机器上;

备注:不用启动集群资源管理服务;

  • local-cluster[N,cores,memory]
    • N模拟集群的 Slave(或worker)节点个数
    • cores模拟集群中各个Slave节点上的内核数
    • memory模拟集群的各个Slave节点上的内存大小

备注:参数之间没有空格,memory不能加单位

1、启动 Spark 伪分布式模式

spark-shell --master local-cluster[4,2,1024]

2、使用 jps 检查,发现1个 SparkSubmit 进程和4个 CoarseGrainedExecutorBackend 进程SparkSubmit依然充当全能角色,又是Client进程,又是Driver程序,还有资源管理 的作用。4个CoarseGrainedExecutorBackend,用来并发执行程序的进程。

3、执行简单的测试程序

park-submit --master local-cluster[4,2,1024] --class org.apache.spark.examples.SparkPi $SPARK_HOME/examples/jars/spark-examples_2.11-2.4.5.jar 10

备注:

  • local-cluster[4,2,1024],参数不要给太大,资源不够
  • 这种模式少用,有Bug。SPARK-32236

集群模式-Standalone模式

参考文档

  • 分布式部署才能提现分布式计算的价值
  • 与单机运行模式不同,这里必须先启动Spark的Master和Worker守护进程,关闭Yarn
  • 不用启动Hadoop服务,除非需要
Standalone配置
  • sbin/start-master.sh / sbin/stop-master.sh 启动关闭master节点
  • sbin/start-slaves.sh / sbin/stop-slave.sh 启动关闭当前服务器的worker节点
  • sbin/start-slave.sh / sbin/stop-slaves.sh 启动关闭所有服务器的worker节点
  • sbin/start-all.sh / sbin/stop-all.sh 启动关闭所有的master和worker节点

备注:./sbin/start-slave.sh [options];启动节点上的worker进程,调试中较为常用

在 spark-env.sh 中定义:


Standalone配置.png

SPARK_WORKER_CORES:Total number of cores to allow Spark applications to use on the machine (default: all available cores).

SPARK_WORKER_MEMORY:Total amount of memory to allow Spark applications to use on the machine, e.g. 1000m , 2g (default: total memory minus 1 GiB); note that each application's individual memory is configured using its spark.executor.memory property.

可以在spark-eh.sh中修改改参数

export SPARK_WORKER_CORES=10
export SPARK_WORKER_MEMORY=20g
运行模式

最大的区别:Driver运行在哪里。

  • client模式:(缺省,默认的),Driver提交任务给Client,此时可以在Client模式下,可以看见返回结果,适合交互和调试
  • Cluster模式:Driver运行在Spark集群中,看不见程序的返回结果(可以在Driver运行所在的节点的work目录下看到),适合生产环境


    运行模式.png

测试1(Client模式)

spark-submit --class org.apache.spark.examples.SparkPi \
/opt/lagou/servers/spark-2.4.5/examples/jars/spark-examples_2.12-2.4.5.jar 5000

再次使用jps检查集群中的进程:

  • Master进程作为cluster manager,管理集群资源
  • Worker管理节点资源
  • SparkSubmit最为Client段,不再Spark集群中,运行Driver程序,SparkApplication执行完成,进程终止
  • CoarseGrainedExecutorBackend运行在Worker上,用来并发执行应用程序

测试2(Cluster模式)

spark-submit --class org.apache.spark.examples.SparkPi \
--deploy-mode cluster \
/opt/lagou/servers/spark-2.4.5/examples/jars/spark-examples_2.12-2.4.5.jar 5000
  • SparkSubmit进程会在应用程序提交给集群之后就退出
  • Master会在集群中选择一个Worker进程生成一个子进程DriverWrapper来启动Driver程序
  • Worker节点上会启动CoarseGrainedExecutorBackend
  • DriverWrapper进程会被占用Worker进程的一个core(缺省分配1C1G)
  • 应用程序结果,会在执行Driver程序的节点的stdout输出,而不是打印在屏幕上

在启动 DriverWrapper 的节点上,进入 $SPARK_HOME/work/,可以看见类似 driver-20200810233021-0000 的目录,这个就是 driver 运行时的日志文件,进入 该目录,会发现:

  • jar 文件,这就是移动的计算

  • stderr 运行日志

  • stdout 输出结果

History Server

修改spark-defaults.conf

spark.eventLog.compress     true

修改 spark-env.sh

export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=18080 -Dspark.history.retainedApplications=50 -Dspark.history.fs.logDirectory=hdfs://linux121:9000/spark-eventlog"

spark.history.retainedApplications。设置缓存Cache中保存的应用程序历史记录的 个数(默认50),如果超过这个值,旧的将被删除;缓存文件数不表示实际显示的文件总数,只是表示不再缓存中的文件可能需要从硬盘中读取,速度有差别

前提条件:启动hdfs(日志需要写入到HDFS)

启动historyserver,使用 jps 检查,可以看见 HistoryServer 进程。如果看见该进程,请检查对应的日志。

$SPARK_HOME/sbin/start-history-server.sh

web端地址:http://hhb:18080/

高可用配置

Spark Standalone集群是 Master-Slaves架构的集群模式,和大部分的Master- Slaves结构集群一样,存着Master单点故障的问题。如何解决这个问题,Spark提供 了两种方案:

1、基于ZK的Standby Master,适用于生产模式。将 Spark 集群连接到 Zookeeper,利用 Zookeeper 提供的选举和状态保存的功能,一个 Master 处于 Active 状态,其他 Master 处于Standby状态;保证在ZK中的元数据主要是集群的信息,包括:Worker,Driver和Application以及 Executors的信息;如果Active的Master挂掉了,通过选举产生新的 Active 的 Master,然后执行状态恢 复,整个恢复过程可能需要1~2分钟;

2、基于文件系统的单点恢复(Single-Node Rcovery with Local File System), 主要用于开发或者测试环境。将 Spark Application 和 Worker 的注册信息保存在文 件中,一旦Master发生故障,就可以重新启动Master进程,将系统恢复到之前的状态

基于ZK的Standby Master.png

配置步骤

1、启动ZK

2、修改 spark-env.sh 文件,并分发到集群中

# 注释以下两行!!!
# export SPARK_MASTER_HOST=linux120
# export SPARK_MASTER_PORT=7077

export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=linux121,linux122,linux123 -Dspark.deploy.zookeeper.dir=/spark"

备注:

  • spark.deploy.recoveryMode:可选值 Zookeeper、FileSystem、None
  • deploy.zookeeper.url:Zookeeper的URL,主机名:端口号(缺省2181)
  • deploy.zookeeper.dir:保存集群元数据信息的地址,在ZooKeeper中保存该信 息

3、启动Spark集群(Linux120)

$SPARK_HOME/sbin/start-all.sh

浏览器输入:http://hhb:8080/,刚开始 Master 的状态是STANDBY,稍等一会 变为:RECOVERING,最终是:ALIVE

4、在 linux121上启动master

$SPARK_HOME/sbin/start-master.sh

进入浏览器输入:http://linux122:8080/,此时 Master 的状态为:STANDBY

5、杀到linux120上 Master 进程,再观察 linux121 上 Master 状态,由 STANDBY => RECOVERING => ALIVE

小结:

  • 配置每个worker的core、memory
  • 运行模式:cluster、client,client为缺省模式,有返回结果,适合调试;cluster与此相反
  • History server
  • 高可用(ZK,Local File,在ZK中记录集群状态)

集群模式-Yarn模式

参考官方文档

需要启动的服务:hdfs服务、yarn服务,需要关闭的Standalone对应的服务(即集群中Master、Worker进程)一山不容二虎

在Yarn模式中,Spark应用程序有两种运行模式

  • yarn-client。Driver程序运行在客户端,适用于交互、调试,希望立即看到app 的输出

  • yarn-cluster。Driver程序运行在由RM启动的 AppMaster中,适用于生产环境

二者的主要区别:Driver在哪里

1、关闭 Standalon 模式下对应的服务;开启 hdfs、yarn、historyserver 服务

2、修改 yarn-site.xml 配置。在 $HADOOP_HOME/etc/hadoop/yarn-site.xml 中增加,分发到集群,重启 yarn服务

<property>
    <name>yarn.nodemanager.pmem-check-enabled</name>
    <value>false</value>
</property>
<property>
    <name>yarn.nodemanager.vmem-check-enabled</name>
    <value>false</value>
</property>

备注:

  • yarn.nodemanager.pmem-check-enabled。是否启动一个线程检查每个任务 正使用的物理内存量,如果任务超出分配值,则直接将其杀掉,默认是true
  • yarn.nodemanager.vmem-check-enabled。是否启动一个线程检查每个任务正 使用的虚拟内存量,如果任务超出分配值,则直接将其杀掉,默认是true

3、修改配置,分发到集群

# spark-default.conf(以下是优化)
# 与 hadoop historyserver集成 
spark.yarn.historyServer.address linux120:18080
# 添加(以下是优化)
spark.yarn.jars hdfs:///spark-yarn/jars/*.jar

上传jar包

# 将 $SPARK_HOME/jars 下的jar包上传到hdfs 
hdfs dfs -mkdir -p /spark-yarn/jars/ 
cd $SPARK_HOME/jars
hdfs dfs -put * /spark-yarn/jars/

4、测试

# client
spark-submit --master yarn \
--deploy-mode client \
--class org.apache.spark.examples.SparkPi \
/opt/lagou/servers/spark-2.4.5/examples/jars/spark-examples_2.12-2.4.5.jar 2000

在提取App节点上可以看见:SparkSubmit、CoarseGrainedExecutorBackend

在集群的其他节点上可以看见:CoarseGrainedExecutorBackend

在提取App节点上可以看见:程序计算的结果(即可以看见计算返回的结果)

# cluster
spark-submit --master yarn \
--deploy-mode cluster \
--class org.apache.spark.examples.SparkPi \
/opt/lagou/servers/spark-2.4.5/examples/jars/spark-examples_2.12-2.4.5.jar 2000

在提取App节点上可以看见:SparkSubmit

在集群的其他节点上可以见:CoarseGrainedExecutorBackend、ApplicationMaster(Driver运行在此)

在提取App节点上看不见最终的结果

整合HistoryServer服务

前提:Hadoop的 HDFS、Yarn、HistoryServer 正常;Spark historyserver服务正常;

Hadoop:JobHistoryServer

Spark:HistoryServer

spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode client \
/opt/lagou/servers/spark-2.4.5/examples/jars/spark-examples_2.12-2.4.5.jar 20
整合HistoryServer服务1.png
整合HistoryServer服务2.png

1、修改 spark-defaults.conf,并分发到集群

spark.yarn.historyServer.address linux121:18080
spark.history.ui.port 18080

2、重启/启动 spark 历史服务

stop-history-server.sh
start-history-server.sh

3、查看日志

打开日志http://linux123:8088/cluster

备注:

1、在课程学习的过程中,大多数情况使用Standalone模式 或者 local模式
2、建议不使用HA;更多关注的Spark开发

开发环境搭建IDEA

前提:安装scala插件;能读写HDFS文件

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>com.hhb.bigdata</artifactId>
        <groupId>com.hhb.bigdata</groupId>
        <version>1.0</version>
        <relativePath>../bigdata/pom.xml</relativePath>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>spark</artifactId>
    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <scala.version>2.12.10</scala.version>
        <spark.version>2.4.5</spark.version>
        <hadoop.version>2.9.2</hadoop.version>
        <encoding>UTF-8</encoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- hadoop -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.9.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.9.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.9.2</version>
        </dependency>
        <!-- hive 文件-->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>2.3.7</version>
        </dependency>

    </dependencies>
    <build>
        <pluginManagement>
            <plugins>
                <!-- 编译scala的插件 -->
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.2</version>
                </plugin>
                <!-- 编译java的插件 -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.5.1</version>
                </plugin>
            </plugins>
        </pluginManagement>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- 打jar插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

代码:

package com.hhb.spark.core

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @description:
 * @author: huanghongbo
 * @date: 2020-10-13 22:21
 **/
object WordCount {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local").setAppName("WordCount")
    val sc = new SparkContext(conf)
    //    val lines: RDD[String] = sc.textFile("hdfs://linux121:9000/azkaban-wc/wc.txt")
    //    val lines: RDD[String] = sc.textFile("/wcinput/wc.txt")
    val lines: RDD[String] = sc.textFile("data/wc.dat")
    lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).collect().foreach(println)
    sc.stop()
  }
}

hdfs-site.xml

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <property>
        <name>dfs.client.use.datanode.hostname</name>
        <value>true</value>
    </property>
</configuration>
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,319评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,801评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,567评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,156评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,019评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,090评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,500评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,192评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,474评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,566评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,338评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,212评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,572评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,890评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,169评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,478评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,661评论 2 335

推荐阅读更多精彩内容

  • Spark内置模块 (1)Spark是一种基于内存的快速、通用、可扩展的大数据分析引擎。(2)SparkSQL、S...
    香山上的麻雀阅读 382评论 0 0
  • Apache Spark是用于分布式计算的通用框架,为批处理和交互式处理提供高性能的服务。 它公开了Java,Py...
    miaoiao阅读 260评论 0 0
  • Spark 概述Apache Spark 是一个快速的, 多用途的集群计算系统。 它在 Java, Scala, ...
    it_zzy阅读 552评论 0 3
  • 前面的文章中我们分析了Spark的部署模式和提交程序的流程,也分析了Master中schedule方法在感知到资源...
    sun4lower阅读 1,409评论 2 9
  • spark-submit的时候如何引入外部jar包 在通过spark-submit提交任务时,可以通过添加配置参数...
    博弈史密斯阅读 2,695评论 1 14