1. Word count implemented in Scala, running in local mode:
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
object ScalaWordCount {
  def main(args: Array[String]): Unit = {
    // Configuration: set the application name and run in local mode
    val conf = new SparkConf().setAppName("Scala word count").setMaster("local")
    // Create the SparkContext
    val sc = new SparkContext(conf)
    val result = sc.textFile("D:\\zou.txt") // read the input file
      .flatMap(_.split(" "))                // split each line into words
      .map((_, 1))                          // pair each word with a count of 1
      .reduceByKey(_ + _)                   // sum the counts for each word
    // Print the result to the console
    result.foreach(println)
  }
}
Output of the run:
(goodbey,1)
(bey,2)
(you,2)
(when,1)
(say,1)
(i,2)
(to,1)
(,1)
(see,1)
(it's,1)
(again,1)
(so,1)
(time,1)
(much,1)
(miss,1)
Note: in local mode, the file being read here is a local Windows file.
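The counts come back in whatever order the partitions produce them. If a sorted listing is wanted, the pairs can be ordered by count before printing; a minimal sketch of that optional extra step, reusing the result RDD from the program above:

// Optional: sort the (word, count) pairs by count, highest first, then print
result.sortBy(_._2, ascending = false).foreach(println)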
2. Word count implemented in Scala, running on the cluster:
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
object ScalaWordCount {
  def main(args: Array[String]): Unit = {
    // Submitted to the cluster; performs word count on data stored in HDFS
    // When running on the cluster, do not set the master here; it is supplied by spark-submit
    // Only the application name is set
    val conf = new SparkConf().setAppName("Scala word count")
    // Create the SparkContext
    val sc = new SparkContext(conf)
    sc.textFile(args(0))        // read the input file; args(0) is its HDFS path
      .flatMap(_.split(" "))    // split each line into words
      .map((_, 1))              // pair each word with a count of 1
      .reduceByKey(_ + _)       // sum the counts for each word
      .saveAsTextFile(args(1))  // save the result; args(1) is the HDFS output path
    sc.stop()
  }
}
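Because the program takes both paths from args, a small guard at the start of main gives a clearer error than an ArrayIndexOutOfBoundsException when they are missing; a minimal sketch of such a check (not part of the original program):

// Optional: validate the command-line arguments before doing any work
if (args.length < 2) {
  System.err.println("Usage: ScalaWordCount <input path> <output path>")
  sys.exit(1)
}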
Once the program is written, package it into a jar, upload it to a directory on the Linux server, and then run the job with spark-submit from the spark-2.1.0-bin-hadoop2.7/bin directory:
[root@bigdata111 bin]# ./spark-submit --master spark://bigdata111:7077 --class com.hengan.WordCount.ScalaWordCount /opt/jars/Dome1.jar hdfs://bigdata111:9000/word.txt hdfs://bigdata111:9000/result
Notes:
The job is executed with spark-submit.
Cluster master URL: spark://bigdata111:7077
Fully qualified main class: com.hengan.WordCount.ScalaWordCount
Location of the jar: /opt/jars/Dome1.jar
Input file to read: hdfs://bigdata111:9000/word.txt
Output path for the result: hdfs://bigdata111:9000/result
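spark-submit also accepts resource options when the defaults are not enough; a variant of the same command with explicit executor memory and cores (the values below are only illustrative):
./spark-submit --master spark://bigdata111:7077 \
  --class com.hengan.WordCount.ScalaWordCount \
  --executor-memory 1g --total-executor-cores 2 \
  /opt/jars/Dome1.jar hdfs://bigdata111:9000/word.txt hdfs://bigdata111:9000/result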
Result:
(shuai,1)
(are,1)
(b,1)
(best,1)
(zouzou,1)
(word,1)
(hello,1)
(world,1)
(you,1)
(a,1)
(the,1)
3. Word count implemented in Java:
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
public class JavaWordCount {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount").setMaster("local");
        // Create the JavaSparkContext
        JavaSparkContext ctx = new JavaSparkContext(sparkConf);
        // Read the input file from HDFS
        JavaRDD<String> lines = ctx.textFile("hdfs://192.168.109.131:8020/tmp_files/test_WordCount.txt");
        // Split each line on spaces
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String input) throws Exception {
                // e.g. "I Love Beijing" -> ["I", "Love", "Beijing"]
                return Arrays.asList(input.split(" ")).iterator();
            }
        });
        // Pair each word with a count of 1: (word, 1)
        JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        });
        // Reduce by key to sum the counts for each word
        JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer arg0, Integer arg1) throws Exception {
                return arg0 + arg1;
            }
        });
        // Collect and print the results
        List<Tuple2<String, Integer>> output = counts.collect();
        for (Tuple2<String, Integer> tuple2 : output) {
            System.out.println(tuple2._1 + " : " + tuple2._2);
        }
        ctx.stop();
    }
}