Overview
- 每一个spark程序都是有一个驱动程序组成,并且通过main函数运行。
- spark有两个重要的抽象:
- RDD,分布式弹性数据集,他是一个跨越多个节点的分布式集合。
- 另一个抽象是共享变量。spark支持两种类型的共享变量:一个是广播(broadcast variables)他可以缓存一个值在集群的各个节点。另一个是累加器(accumulators)他只能执行累加的操作,比如可以做计数器和求和。
Initializing Spark
- 运行spark程序,需要在程序中创建一个JavaSparkContext对象,但是在创建JavaSparkContext对象的时候需要指定SparkConf对象,这个对象是配置一些重要的参数。
SparkConf conf = new SparkConf.setAppName(appName).setMaster(url);
JavaSparkContext sc = new JavaSparkContext(conf);
- appName就是这个任务的名字,url代表spark集群的Url地址。这个地址不要写死,因为这样可以通过命令行来传入需要执行的spark集群,方便调试与线上集群运行。
Resilient Distributed Datasets (RDDs)
- Spark的核心就是围绕着RDD,它是一个自动容错的分布式数据集合。他有两种方式创建,第一种就是在驱动程序中对一个集合进行并行化。第二种是来源于一个外部的存储系统。比如:共享系统、HDFS、HBase或者任何提供任何Hadoop 输入格式的数据源。
- 第一种:Parallelized Collections 创建这个集合需要调用那个JavaSparkContext的parallelize方法来初始化一个已经存在的集合。
List<Integer> data = Arrays.asList(1,2,3,4,5); JavaRDD<Iteger> distData = sc.parallelize(data);
- 这就创建了一个并行的集合,在这个集合上可以执行 distData.reduce((a, b) -> a + b)
在并行数组中一个很重要的参数是partitions,它来描述数组被切割的数据集数量。Spark会在每一个partitions上运行任务,这个partitions会被spark自动设置,一般都是集群中每个CPU上运行2-4partitions,但是也可以自己设置,可以通过parallelize (e.g. sc.parallelize(data, 10)),在有些地方把partitions成为 slices。
External Datasets
- JavaRDD distFile = sc.textFile("data.txt");
textFile也可以设置partitions参数,一般都是一个block一个partitions,但是也可以自己设置,自己设置必须要不能少于block的数量。
针对Hadoop的其他输入格式,你能用这个JavaSparkContext.hadoopRDD方法,你需要设置JobConf和输入格式的类。也可以使用JavaSparkContext.newAPIHadoopRDD针对输入格式是基于“new”的MapReduceAPI
RDD Operations
RDD的操作可以分成两类:
1. transformations
2. actions
map就是一个transformation操作,他会针对数据集的每一个元素执行一个函数。然后返回一个新的RDD来替换掉原先的RDD。
reduce就是一个action操作,他能够聚合所有的元素通过执行一个函数,然后返回给驱动程序一个最终的值。
所有的transformations都是lazy操作,也就是说并不会马上计算结果,要等到执行一个action操作并且返回一个值给驱动程序的时候才会执行transformations操作。
可以调用RDD的 persist (or cache) 方法来持久化RDD,这样的话在下次使用RDD的时候就会非常快。同时也支持持久化到磁盘,或者复制到多个节点。
直接使用 rdd.foreach(println) 在local模式下是可行的,但是在cluster模式下是不行的,必须要执行collect()方法,将所有的数据拉取到本地,然后执行foreach()操作。如果是数据量比较小的话可以使用take方法,rdd.take(100).foreach(println)