Spark Core 实现（RDD 算子）:
/**
 * Word count implemented with Spark Core RDD operators.
 *
 * Reads the text input at path `in`, splits each line into words, counts the
 * occurrences of every word with `reduceByKey`, and prints the resulting
 * `(word, count)` pairs on the driver.
 */
object WordCount {
  def main(args: Array[String]): Unit = {
    // Run locally with as many worker threads as there are logical cores.
    val config = new SparkConf().setMaster("local[*]").setAppName("WC")
    val sc = new SparkContext(config)
    try {
      val lines: RDD[String] = sc.textFile("in")
      // Split on runs of whitespace and drop empty tokens, so that repeated or
      // leading spaces (and blank lines) are not counted as the word "".
      val words: RDD[String] = lines.flatMap(line => line.split("\\s+")).filter(_.nonEmpty)
      val wordOne: RDD[(String, Int)] = words.map((_, 1))
      // collect() brings every (word, count) pair to the driver; acceptable for small demo inputs.
      val result: Array[(String, Int)] = wordOne.reduceByKey(_ + _).collect()
      result.foreach(println)
    } finally {
      // Shut the SparkContext down even when the job fails.
      sc.stop()
    }
  }
}
Spark SQL 实现:
/**
 * Word count implemented with Spark SQL.
 *
 * Reads `in/word.txt`, splits each line into words, registers the words as the
 * temporary view `word`, and counts them with a SQL query. The query returns the
 * words ordered by descending count, and the result is shown on the console.
 */
object WordCount {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName("wordCount")
      .master("local[*]")
      .getOrCreate()
    try {
      // One element per line of the file; the SQL below refers to it as column `value`.
      val ds: Dataset[String] = spark.read.textFile("in/word.txt")
      // Brings the implicit Encoder[String] that Dataset.flatMap needs into scope.
      import spark.implicits._
      // Split on runs of whitespace and drop empty tokens so "" is never counted as a word.
      val ds1: Dataset[String] = ds.flatMap(line => line.split("\\s+").filter(_.nonEmpty))
      // createOrReplace: does not fail if a view named `word` already exists in this session.
      ds1.createOrReplaceTempView("word")
      val df: DataFrame = spark.sql("select value,count(*) count from word group by value order by count desc")
      // show() prints the first 20 rows by default.
      df.show()
    } finally {
      // Stop the session even when reading or querying fails.
      spark.stop()
    }
  }
}
Java 实现: MapReduce
mapper
/**
 * Word-count mapper: for each input line, emits (word, 1) for every
 * whitespace-separated word in that line.
 *
 * The input key is ignored. With the default TextInputFormat it is presumably
 * the line's byte offset; the input value is the line's text.
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    // Output key/value objects are reused across calls instead of allocating one
    // per word, as in Hadoop's own WordCount example.
    Text k = new Text();
    IntWritable v = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // 1. Convert the line to a String.
        String line = value.toString();
        // 2. Split on runs of whitespace. A leading separator or a blank line still
        //    yields an empty token, so those are skipped instead of emitted as "".
        String[] words = line.split("\\s+");
        // 3. Emit (word, 1) for every real word.
        for (String word : words) {
            if (word.isEmpty()) {
                continue;
            }
            k.set(word);
            context.write(k, v);
        }
    }
}
reducer
/**
 * Word-count reducer: receives one word together with every count emitted for
 * it and writes the word with the sum of those counts.
 *
 * Example: ("hello", [1, 1]) is written as ("hello", 2).
 */
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    /** Returns the sum of all values in {@code counts}. */
    private static int sumCounts(Iterable<IntWritable> counts) {
        int total = 0;
        for (IntWritable partial : counts) {
            total += partial.get();
        }
        return total;
    }

    // All values that share the same key arrive together in one call.
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        IntWritable totalCount = new IntWritable(sumCounts(values));
        context.write(key, totalCount);
    }
}
driver
/**
 * Configures and submits the word-count MapReduce job.
 *
 * Usage: WordCountDriver &lt;input path&gt; &lt;output path&gt;
 * Exit status: 0 if the job succeeds, 1 if it fails, 2 if the arguments are missing.
 */
public class WordCountDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Fail with a usage message instead of an ArrayIndexOutOfBoundsException.
        if (args.length < 2) {
            System.err.println("Usage: WordCountDriver <input path> <output path>");
            System.exit(2);
        }
        // 1. Create the job from a default configuration.
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        // 2. Locate the jar to ship by the class it contains.
        job.setJarByClass(WordCountDriver.class);
        // 3. Wire up mapper and reducer. The reducer only sums counts, which is
        //    associative and commutative, so it can also run as a combiner to
        //    pre-aggregate map output before the shuffle.
        job.setMapperClass(WordCountMapper.class);
        job.setCombinerClass(WordCountReducer.class);
        job.setReducerClass(WordCountReducer.class);
        // 4. Map output key/value types.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 5. Final (reducer) output key/value types.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 6. Input and output paths. FileOutputFormat rejects an output directory that already exists.
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 7. Submit, wait for completion (printing progress), and map the result to an exit code.
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}