WordCount 是用来统计一个文件中相同单词出现次数的程序, 是一个可以用来描述Spark运行的经典问题:通过将单词拆分映射(map)和对映射进行统计(reduce)实现任务
public final class JavaWordCount
{
//定义一个空格的字符模式
private static final Pattern SPACE = Pattern.compile(" ");
public static void main(String[] args) throws Exception
{
// 0. 判断是否输入参数
if (args.length < 1)
{
System.err.println("Usage: JavaWordCount <file>");
System.exit(1);
}
// 1. 创建一个 Spark 的上下文; 在创建上下文的过程中,程序会向集群申请资源以及构建相应的运行环境。
SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");
JavaSparkContext jSparkContext = new JavaSparkContext(sparkConf);
// 2. 内存中的集合或者外部文件系统作为输入源
JavaRDD<String> lines = ctx.textFile(args[0], 1);
// 3. flatMap 函数类似于 map 函数,但是每一个输入元素,会被映射为 0 到多个输出元素,因此,func 函数的返回值是一个 Seq,而不是单一元素
// 本例中将文本按空格拆分成一个List
JavaRDD<String> words = lines.flatMap( new FlatMapFunction<String, String>()
{
@Override
public Iterable<String> call(String s)
{
return Arrays.asList(SPACE.split(s));
}
});
// 4. MapToPair 函数类类似于map函数, 对于每一个输入元素, 映射为有特殊Key-Value的函数;
// 本例中将每一个单词对应计数1
JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>()
{
@Override
public Tuple2<String, Integer> call(String s)
{
return new Tuple2<String, Integer>(s, 1);
}
});
// 5. reduceByKey函数在一个(K,V) 对的数据集上使用,返回一个(K,V)对的数据集,key 相同的值,都被使用指定的 reduce 函数聚合到一起
// 本例中将相同单词的计数合并(i1+i2),达到合计总数的作用
// reduceByKey是一个Action, 将transformation懒操作(Map)中的操作一并进行实现
JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>()
{
@Override
public Integer call(Integer i1, Integer i2)
{
return i1 + i2;
}
});
// 6. collect() 以数组的形式返回数据集的所有元素
List<Tuple2<String, Integer>> output = counts.collect();
for (Tuple2<?,?> tuple : output)
{
System.out.println(tuple._1() + ": " + tuple._2());
}
// 7. JVM一次只能启动一个JavaSparkContext, 所以运行完成后需要关闭当前SparkContext
jSparkContext.stop();
}
}
参考资料:
http://spark.apache.org/docs/latest/api/java/index.html
http://www.ibm.com/developerworks/cn/opensource/os-cn-spark-deploy1/index.html