前面几节我们介绍过了Spark的环境搭建,有了环境我们就可以小试牛刀——动手开发自己的Spark程序了。本节先来介绍如何使用Java语言开发一个Spark的WordCount程序。
1.下载Spark安装目录下的所有Jar包
使用WinSCP工具将$SPARK_HOME/jars目录下的所有Jar包下载到本地目录如E:/sparklib中。
2.使用Eclipse搭建Spark程序开发环境
- 打开Eclipse工具,新建Java工程:JavaSparkWordCount
- 在JavaSparkWordCount工程下新建文件夹:lib
- 将刚才下载好的所有Jar包,复制粘贴到lib文件夹
- 选中lib文件夹下的所有Jar包,右键–>Build Path–>Add to Build Path
3.编写WordCount程序
- 在JavaSparkWordCount工程下的src文件夹新建一个Package:demo
- 在demo包下新建Java Class:JavaSparkWordCount,并生成main方法,Finish
- 编写JavaSparkWordCount.java代码如下:
package demo;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
public class JavaSparkWordCount {
public static void main(String[] args) {
/*
* 创建SparkContext对象需要的配置参数
* setAppName:设置应用程序名字,会显示在网页上
* 这里只设置了一个属性,也可以设置其他属性
*/
SparkConf conf = new SparkConf().setAppName("JavaWordCount");
//创建Java版的SparkContext对象:JavaSparkContext,传入配置参数
JavaSparkContext context = new JavaSparkContext(conf);
//根据传入的路径参数args[0]读入要处理的数据,每一行作为一个元素
JavaRDD<String> lines = context.textFile(args[0]);
//将所有行展平成一行,并在展平的过程中按照空格对每一行进行分词
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String line) throws Exception {
//按照空格对每一行进行分词
return Arrays.asList(line.split(" ")).iterator();
}
});
//Map操作:每个单词记一次数,即将word变为(word, 1)形式
JavaPairRDD<String, Integer> wordOne = words.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String word) throws Exception {
return new Tuple2<String, Integer>(word, 1);
}
});
//Reduce操作:使用reduceByKey函数将相同的key的value相加
JavaPairRDD<String, Integer> count = wordOne.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) throws Exception {
return i1+i2;
}
});
//以上算子都是Transformation,不会触发计算;使用Action算子:collect开始计算
List<Tuple2<String, Integer>> result = count.collect();
//将结果输出到屏幕上
for(Tuple2<String, Integer> t:result){
System.out.println(t._1 + "\t" + t._2);
}
//停止context对象
context.stop();
}
}
4.将编写好的JavaSparkWordCount程序打成Jar包
- 右键单击JavaSparkWordCount工程,选择Export
- 选中Java文件夹下的JAR file,Next
- 选择导出目的地:桌面,文件名:JavaSparkWordCount.jar,保存
- Next,Next
- Main Class:demo.JavaSparkWordCount
- Finish,OK
5.将打包好的Jar包上传到Spark集群
使用WinSCP工具将JavaSparkWordCount.jar上传至Spark集群:
[root@spark111 ~]# ls /root/JavaSparkWordCount.jar
/root/JavaSparkWordCount.jar
6.启动Spark集群
6.1启动HDFS
[root@spark111 ~]# cd /root/training/hadoop-2.7.3/sbin
[root@spark111 sbin]# ./start-dfs.sh
Starting namenodes on [spark111]
spark111: starting namenode, logging to /root/training/hadoop-2.7.3/logs/hadoop-root-namenode-spark111.out
localhost: starting datanode, logging to /root/training/hadoop-2.7.3/logs/hadoop-root-datanode-spark111.out
Starting secondary namenodes [0.0.0.0]
0.0.0.0: starting secondarynamenode, logging to /root/training/hadoop-2.7.3/logs/hadoop-root-secondarynamenode-spark111.out
[root@spark111 sbin]# jps
2369 NameNode
2660 SecondaryNameNode
2472 DataNode
2783 Jps
6.2启动Spark
[root@spark111 ~]# start-all.sh
starting org.apache.spark.deploy.master.Master, logging to /root/training/spark-2.1.0-bin-hadoop2.7/logs/spark-root-org.apache.spark.deploy.master.Master-1-spark111.out
spark111: starting org.apache.spark.deploy.worker.Worker, logging to /root/training/spark-2.1.0-bin-hadoop2.7/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-spark111.out
[root@spark111 ~]# jps
2369 NameNode
2660 SecondaryNameNode
2838 Master
2472 DataNode
2968 Jps
2907 Worker
7.运行JavaSparkWordCount程序
[root@spark111 ~]# cd /root/training/hadoop-2.7.3/bin/
[root@spark111 bin]# ./hdfs dfs -cat /input/data.txt
I love Beijing
I love China
Beijing is the capital of China
[root@spark111 ~]# spark-submit --master spark://spark111:7077
--class demo.JavaSparkWordCount /root/JavaSparkWordCount.jar
hdfs://spark111:9000/input/data.txt
8.查看结果
程序执行期间,在网页上可以看到我们指定的应用程序名字:JavaWordCount
程序执行结束后,可以在屏幕上看到输出的结果:
这里演示的是从HDFS上读入文件,将结果输出到屏幕上。你也可以根据需要改变输入输出的方式。
以上详细介绍了如何使用Java语言来开发一个Spark任务:WordCount程序,以此打开通往Spark编程的大门!本节的介绍就到这里,祝你玩的愉快!