定义
Spark是一个高效,通用的大数据处理引擎。
背景
- 2009年,Spark诞生于伯克利大学AMPLab,最初属于伯克利大学的研究性项目。
- 2010年,正式开源。
- 2013年,成为了Apache基金项目,同年,基于spark的开源商业公司Databricks成立。
- 2014年,成为Apache基金的顶级项目。
MapReduce & Spark
七个MapReduce作业意味着需要七次读取和写入HDFS,而它们的输入输出数据存在关联,七个作业输入输出数据关系如下图。
基于MapReduce实现此算法存在以下问题:
- 为了实现一个业务逻辑需要使用七个MapReduce作业,七个作业间的数据交换通过HDFS完成,增加了网络和磁盘的开销。
- 七个作业都需要分别调度到集群中运行,增加了Gaia集群的资源调度开销。
- MR2和MR3重复读取相同的数据,造成冗余的HDFS读写开销。
这些问题导致作业运行时间大大增长,作业成本增加。相比与MapReduce编程模型,Spark提供了更加灵活的DAG(Directed Acyclic Graph) 编程模型, 不仅包含传统的map、reduce接口, 还增加了filter、flatMap、union等操作接口,使得编写Spark程序更加灵活方便。使用Spark编程接口实现上述的业务逻辑如下图所示。
相对于MapReduce,Spark在以下方面优化了作业的执行时间和资源使用。
- DAG编程模型。 通过Spark的DAG编程模型可以把七个MapReduce简化为一个Spark作业。Spark会把该作业自动切分为八个Stage,每个Stage包含多个可并行执行的Tasks。Stage之间的数据通过Shuffle传递。最终只需要读取和写入HDFS一次。减少了六次HDFS的读写,读写HDFS减少了70%。
- Spark作业启动后会申请所需的Executor资源,所有Stage的Tasks以线程的方式运行,共用Executors,相对于MapReduce方式,Spark申请资源的次数减少了近90%。
- Spark引入了RDD(Resilient Distributed Dataset)模型,中间数据都以RDD的形式存储,而RDD分布存储于slave节点的内存中,这就减少了计算过程中读写磁盘的次数。RDD还提供了Cache机制,例如对上图的rdd3进行Cache后,rdd4和rdd7都可以访问rdd3的数据。相对于MapReduce减少MR2和MR3重复读取相同数据的问题。
附(spark统计字符串代码)
WordCount.java
public class WordCount {
// 比较器,其中的Tuple2是模仿的scala写法,
// 诸如此类的还有Tuple3,Tuple4,Tuple22
public static class TupleComparator implements Comparator<Tuple2<String, Integer>>, Serializable {
@Override
public int compare(Tuple2<String, Integer> o1, Tuple2<String, Integer> o2) {
return o2._2.compareTo(o1._2);
}
}
public static void main(String[] args) throws InterruptedException {
// 使用local模式,不需要启动spark集群
SparkConf sparkConf = new SparkConf().setAppName("wordCount ").setMaster("local[2]");
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
JavaRDD<String> file = ctx.textFile("分析的文件路径", 6);
file.persist(StorageLevel.MEMORY_ONLY());
file.cache();
Comparator<Tuple2<String, Integer>> orderCompare = new TupleComparator();
List<Tuple2<String, Integer>> wordToCounts = file
.flatMap(line -> Arrays.asList(line.split("")).iterator())
.mapToPair(word -> new Tuple2<String, Integer>(word, 1))//把分割的内容作为key,1作为初始值
.reduceByKey((s1, s2) -> s1 + s2)// 将相同的key进行reduce,并将value相加
.takeOrdered(50, orderCompare);
wordToCounts.forEach(line -> System.out.println(line._1() + ":" + line._2()));
}
}
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>liao</groupId>
<artifactId>wordcount</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>wordcount</name>
<!-- FIXME change it to the project's website -->
<url>http://www.example.com</url>
<properties>
<scala.version>2.11.8</scala.version>
<spark.version>2.1.0</spark.version>
</properties>
<dependencies>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.2.4</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
<version>3.1.0</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!--<dependency>-->
<!--<groupId>org.apache.hadoop</groupId>-->
<!--<artifactId>hadoop-client</artifactId>-->
<!--<version>2.6.5</version>-->
<!--<scope>compile</scope>-->
<!--</dependency>-->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>2.1.1</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.ansj</groupId>
<artifactId>ansj_seg</artifactId>
<version>5.1.6</version>
</dependency>
</dependencies>
<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.4.1</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
<executions>
<execution>
<phase>compile</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>