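This chapter solves the Top N problem. It covers the following approaches:
- Unique keys (in the input set {(K,V)}, every K is distinct), using MapReduce/Hadoop
- Unique keys (every K is distinct), using Spark
- Non-unique keys (K may repeat in {(K,V)}), using Spark, including Spark's built-in ordering actions top() and takeOrdered()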
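The core Top N function

The most common way to implement Top N in Java is a `SortedMap<K,V>` backed by a `TreeMap<K,V>`. Add the elements of L to the map topN one by one, and whenever topN.size() > N, remove the first (smallest) key to keep the Top N, or the last (largest) key to keep the Bottom N.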
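```java
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
import scala.Tuple2;

// The key step of Top N: a SortedMap holding at most N entries, keyed by count
static <T> SortedMap<Integer, T> topN(List<Tuple2<T, Integer>> L, int N) {
    if ((L == null) || (L.isEmpty())) {
        return null;
    }
    SortedMap<Integer, T> topN = new TreeMap<Integer, T>();
    for (Tuple2<T, Integer> element : L) {
        // key = count (element._2), value = item (element._1)
        topN.put(element._2, element._1);
        // once there are more than N entries, drop the smallest count
        if (topN.size() > N) {
            topN.remove(topN.firstKey());
        }
    }
    return topN;
}
```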
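To try the TreeMap technique without Spark, here is a minimal sketch under one assumption: `Map.Entry<String, Integer>` stands in for `scala.Tuple2<String, Integer>`. The class name `TreeMapTopN` and the helper `entry()` are hypothetical. The second call in `main()` shows a limitation shared by every TreeMap-based version in this chapter: because the count is the map key, two items with the same count collide and only the last one survives.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

class TreeMapTopN {
    // Same technique as topN() above; Map.Entry replaces scala.Tuple2
    static SortedMap<Integer, String> topN(List<Map.Entry<String, Integer>> items, int n) {
        SortedMap<Integer, String> top = new TreeMap<>();
        for (Map.Entry<String, Integer> e : items) {
            top.put(e.getValue(), e.getKey()); // key = count, value = item
            if (top.size() > n) {
                top.remove(top.firstKey());    // evict the smallest count
            }
        }
        return top;
    }

    static Map.Entry<String, Integer> entry(String item, int count) {
        return new SimpleEntry<>(item, count);
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> items = Arrays.asList(
                entry("cat1", 12), entry("cat2", 7), entry("cat3", 30), entry("cat4", 25));
        System.out.println(topN(items, 2)); // {25=cat4, 30=cat3}

        // Caveat: equal counts share one TreeMap key, so "a" is overwritten by "b"
        System.out.println(topN(Arrays.asList(entry("a", 5), entry("b", 5)), 2)); // {5=b}
    }
}
```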
MapReduce solution for unique keys

Class | Description |
---|---|
TopNDriver | driver that submits the job |
TopNMapper | defines map() |
TopNReducer | defines reduce() |
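- Override setup() and cleanup(). Hadoop calls each of them exactly once per mapper task: setup() runs before the first map() call and reads the value of N; cleanup() runs after the last map() call and emits that mapper's local Top N to the reducer.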
```java
// read N from the job configuration
@Override
protected void setup(Context context) throws IOException,
        InterruptedException {
    this.N = context.getConfiguration().getInt("N", 10); // default is top 10
}

// emit the local Top N; every record uses the same key, NullWritable.get(),
// so all of them are grouped into a single reduce() call
@Override
protected void cleanup(Context context) throws IOException,
        InterruptedException {
    for (String str : top.values()) {
        context.write(NullWritable.get(), new Text(str));
    }
}
```
- The map() function computes the Top N of its input split

```java
@Override
public void map(Text key, IntWritable value, Context context)
        throws IOException, InterruptedException {
    String keyAsString = key.toString();
    int frequency = value.get();
    String compositeValue = keyAsString + "," + frequency;
    top.put(frequency, compositeValue);
    // keep only top N
    if (top.size() > N) {
        top.remove(top.firstKey());
    }
}
```
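The mapper's lifecycle can be imitated in plain Java to see what each method contributes. This is a sketch, not Hadoop code: `LocalTopNMapper`, its `run()` method, and the `emitted` list (standing in for `context.write()`) are hypothetical. `run()` follows the order in which Hadoop's `Mapper.run()` invokes the three methods: setup() once, map() once per record, cleanup() once.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Plain-Java imitation of one mapper task (no Hadoop classes involved)
class LocalTopNMapper {
    private int n;
    private final SortedMap<Integer, String> top = new TreeMap<>();
    private final List<String> emitted = new ArrayList<>(); // stands in for context.write()

    void setup(int configuredN) {
        this.n = configuredN; // the real mapper reads this from the job Configuration
    }

    void map(String key, int frequency) {
        top.put(frequency, key + "," + frequency); // composite value "key,frequency"
        if (top.size() > n) {
            top.remove(top.firstKey());
        }
    }

    void cleanup() {
        emitted.addAll(top.values()); // the local winners, in ascending order of frequency
    }

    List<String> run(int configuredN, Map<String, Integer> split) {
        setup(configuredN);
        for (Map.Entry<String, Integer> record : split.entrySet()) {
            map(record.getKey(), record.getValue());
        }
        cleanup();
        return emitted;
    }

    public static void main(String[] args) {
        Map<String, Integer> split = new LinkedHashMap<>();
        split.put("url1", 40);
        split.put("url2", 3);
        split.put("url3", 18);
        split.put("url4", 27);
        System.out.println(new LocalTopNMapper().run(2, split)); // [url4,27, url1,40]
    }
}
```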
- The reduce() function computes the final Top N from all local Top N lists

```java
// reducer fields: the same SortedMap<Integer, String> technique as in the mapper
private int N = 10; // default
private SortedMap<Integer, String> top = new TreeMap<Integer, String>();

@Override
public void reduce(NullWritable key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
    for (Text value : values) {
        String valueAsString = value.toString().trim();
        String[] tokens = valueAsString.split(",");
        String url = tokens[0];
        int frequency = Integer.parseInt(tokens[1]);
        top.put(frequency, url);
        // keep only top N
        if (top.size() > N) {
            top.remove(top.firstKey());
        }
    }
    // emit the final Top N, from the largest frequency to the smallest
    List<Integer> keys = new ArrayList<Integer>(top.keySet());
    for (int i = keys.size() - 1; i >= 0; i--) {
        context.write(new IntWritable(keys.get(i)), new Text(top.get(keys.get(i))));
    }
}

// the reducer also reads N in setup() before any reduce() call
@Override
protected void setup(Context context)
        throws IOException, InterruptedException {
    this.N = context.getConfiguration().getInt("N", 10); // default is top 10
}
```
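The reducer's merge step can be checked the same way. The sketch below is hypothetical plain Java: `GlobalTopNReducer.reduce()` returns strings instead of writing to a `Context`. It parses the `url,frequency` values produced by the mappers, keeps the N largest frequencies, and lists them from largest to smallest.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

class GlobalTopNReducer {
    static List<String> reduce(List<String> values, int n) {
        SortedMap<Integer, String> top = new TreeMap<>();
        for (String value : values) {
            String[] tokens = value.trim().split(","); // "url,frequency"
            top.put(Integer.parseInt(tokens[1]), tokens[0]);
            if (top.size() > n) {
                top.remove(top.firstKey());
            }
        }
        // walk the keys backwards, like the index loop in reduce() above
        List<Integer> keys = new ArrayList<>(top.keySet());
        List<String> out = new ArrayList<>();
        for (int i = keys.size() - 1; i >= 0; i--) {
            out.add(keys.get(i) + "\t" + top.get(keys.get(i)));
        }
        return out;
    }

    public static void main(String[] args) {
        // local Top 2 lists from two hypothetical mappers, concatenated
        List<String> fromMappers = Arrays.asList("url4,27", "url1,40", "url7,35", "url9,12");
        reduce(fromMappers, 2).forEach(System.out::println);
        // 40	url1
        // 35	url7
    }
}
```

Declaring `top` as a `TreeMap` and iterating `top.descendingMap()` would give the same descending order without the index loop.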
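- The driver class TopNDriver.java

```java
// core of TopNDriver.run(); HadoopUtil is a helper from the book's code base, not a Hadoop class
Job job = Job.getInstance(getConf()); // replaces the deprecated new Job(conf)
HadoopUtil.addJarsToDistributedCache(job, "/lib/");
int N = Integer.parseInt(args[0]); // top N
job.getConfiguration().setInt("N", N);
job.setJobName("TopNDriver");
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setMapperClass(TopNMapper.class);
job.setReducerClass(TopNReducer.class);
// use exactly one reducer, so every local Top N reaches the same reduce task
job.setNumReduceTasks(1);
// map()'s output (K,V)
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(Text.class);
// reduce()'s output (K,V)
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(Text.class);
// assumed argument layout: args[1] = input path, args[2] = output path
FileInputFormat.setInputPaths(job, new Path(args[1]));
FileOutputFormat.setOutputPath(job, new Path(args[2]));
return job.waitForCompletion(true) ? 0 : 1;
```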
- Finding the Top 10 and the Bottom 10

```java
// Top 10: evict the smallest key
if (top10Cats.size() > 10) {
    top10Cats.remove(top10Cats.firstKey());
}
// Bottom 10: evict the largest key
if (bottom10Cats.size() > 10) {
    bottom10Cats.remove(bottom10Cats.lastKey());
}
```
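Below is a small sketch showing that the two snippets differ only in which end of the map they evict. It uses N = 3 instead of 10 to keep the output short; the class `TopAndBottom` and the generated names `cat0`, `cat1`, ... are hypothetical.

```java
import java.util.SortedMap;
import java.util.TreeMap;

class TopAndBottom {
    // top == true keeps the N largest counts, top == false keeps the N smallest
    static SortedMap<Integer, String> keep(int[] counts, int n, boolean top) {
        SortedMap<Integer, String> map = new TreeMap<>();
        for (int i = 0; i < counts.length; i++) {
            map.put(counts[i], "cat" + i); // hypothetical item names cat0, cat1, ...
            if (map.size() > n) {
                // Top N drops the smallest key, Bottom N drops the largest key
                map.remove(top ? map.firstKey() : map.lastKey());
            }
        }
        return map;
    }

    public static void main(String[] args) {
        int[] counts = {50, 8, 33, 91, 2, 17};
        System.out.println(keep(counts, 3, true));  // {33=cat2, 50=cat0, 91=cat3}
        System.out.println(keep(counts, 3, false)); // {2=cat4, 8=cat1, 17=cat5}
    }
}
```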
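Spark solution for unique keys

Spark function classes used by the Java API

Spark Java class | Function type |
---|---|
`Function<T,R>` | `T => R` |
`DoubleFunction<T>` | `T => Double` |
`PairFunction<T,K,V>` | `T => Tuple2<K,V>` |
`FlatMapFunction<T,R>` | `T => Iterator<R>` (Spark 1.x: `Iterable<R>`) |
`DoubleFlatMapFunction<T>` | `T => Iterator<Double>` (Spark 1.x: `Iterable<Double>`) |
`PairFlatMapFunction<T,K,V>` | `T => Iterator<Tuple2<K,V>>` (Spark 1.x: `Iterable<Tuple2<K,V>>`) |
`Function2<T1,T2,R>` | `T1, T2 => R` |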
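Using setup() and cleanup() in Spark

```java
// Template: mapPartitions() calls the function once per partition, so code before
// the loop plays the role of setup() and code after it the role of cleanup()
JavaRDD<SortedMap<K1, K2>> partitions = pairs.mapPartitions(
    new FlatMapFunction<Iterator<Tuple2<K, V>>, SortedMap<K1, K2>>() {
        @Override
        public Iterator<SortedMap<K1, K2>> call(Iterator<Tuple2<K, V>> iter) {
            setup();                 // e.g. create the local SortedMap, read N
            while (iter.hasNext()) {
                Tuple2<K, V> record = iter.next();
                // map() logic for one record
            }
            cleanup();               // e.g. finalize the local result
            return <the-result>;     // an Iterator<SortedMap<K1, K2>>
        }
    });
```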
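To see the template at work without a cluster, the sketch below imitates `mapPartitions()` followed by `collect()` over an in-memory list of partitions. Everything here is an illustrative assumption: `mapPartitionsAndCollect()` is a hypothetical stand-in for the Spark calls, and `Map.Entry` replaces `Tuple2`. Each partition yields one local `SortedMap`, and the "driver" merges them.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.function.Function;

class MapPartitionsSketch {
    // hypothetical stand-in for rdd.mapPartitions(f).collect()
    static <T, R> List<R> mapPartitionsAndCollect(List<List<T>> partitions,
                                                  Function<Iterator<T>, Iterator<R>> f) {
        List<R> collected = new ArrayList<>();
        for (List<T> partition : partitions) {
            f.apply(partition.iterator()).forEachRemaining(collected::add);
        }
        return collected;
    }

    static Iterator<SortedMap<Integer, String>> localTopN(Iterator<Map.Entry<String, Integer>> iter, int n) {
        SortedMap<Integer, String> top = new TreeMap<>();        // "setup()"
        while (iter.hasNext()) {                                  // "map()" per record
            Map.Entry<String, Integer> t = iter.next();
            top.put(t.getValue(), t.getKey());
            if (top.size() > n) top.remove(top.firstKey());
        }
        return Collections.singletonList(top).iterator();        // "cleanup()": one map per partition
    }

    static SortedMap<Integer, String> mergeOnDriver(List<SortedMap<Integer, String>> locals, int n) {
        SortedMap<Integer, String> finalTop = new TreeMap<>();
        for (SortedMap<Integer, String> local : locals) {
            for (Map.Entry<Integer, String> e : local.entrySet()) {
                finalTop.put(e.getKey(), e.getValue());
                if (finalTop.size() > n) finalTop.remove(finalTop.firstKey());
            }
        }
        return finalTop;
    }

    static Map.Entry<String, Integer> kv(String k, int v) { return new SimpleEntry<>(k, v); }

    public static void main(String[] args) {
        List<List<Map.Entry<String, Integer>>> partitions = Arrays.asList(
                Arrays.asList(kv("cat1", 5), kv("cat2", 41), kv("cat3", 17)),
                Arrays.asList(kv("cat4", 29), kv("cat5", 3)));
        List<SortedMap<Integer, String>> locals =
                mapPartitionsAndCollect(partitions, it -> localTopN(it, 2));
        System.out.println(locals);                   // [{17=cat3, 41=cat2}, {3=cat5, 29=cat4}]
        System.out.println(mergeOnDriver(locals, 2)); // {29=cat4, 41=cat2}
    }
}
```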
- Top N with Spark (unique keys)

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

public class Top10 {
    public static void main(String[] args) throws Exception {
        // handle input arguments
        if (args.length < 1) {
            System.err.println("Usage: Top10 <input-file>");
            System.exit(1);
        }
        String inputPath = args[0];
        System.out.println("args[0]: <input-path>=" + inputPath);
        // connect to the Spark master; SparkUtil is a helper from the book's code base, not a
        // Spark class (new JavaSparkContext(new SparkConf().setAppName("Top10")) is equivalent)
        JavaSparkContext ctx = SparkUtil.createJavaSparkContext();
        // read the input from HDFS and create the first RDD
        // each line: <string-key><,><integer-value>
        JavaRDD<String> lines = ctx.textFile(inputPath, 1);
        // create a JavaPairRDD<String,Integer> from the JavaRDD<String>
        // Spark Java class: PairFunction<T, K, V>
        // function type:    T => Tuple2<K, V>
        // every element of a JavaPairRDD<String,Integer> is a Tuple2<String,Integer>
        JavaPairRDD<String, Integer> pairs = lines.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) {
                String[] tokens = s.split(","); // cat7,234
                return new Tuple2<String, Integer>(tokens[0], Integer.parseInt(tokens[1]));
            }
        });
        // debugging only: collect() copies the whole RDD to the driver
        List<Tuple2<String, Integer>> debug1 = pairs.collect();
        for (Tuple2<String, Integer> t2 : debug1) {
            System.out.println("key=" + t2._1 + "\t value= " + t2._2);
        }
        // build a local Top 10 for each input partition
        JavaRDD<SortedMap<Integer, String>> partitions = pairs.mapPartitions(
            new FlatMapFunction<Iterator<Tuple2<String, Integer>>, SortedMap<Integer, String>>() {
                @Override
                public Iterator<SortedMap<Integer, String>> call(Iterator<Tuple2<String, Integer>> iter) {
                    SortedMap<Integer, String> top10 = new TreeMap<Integer, String>();
                    while (iter.hasNext()) {
                        Tuple2<String, Integer> tuple = iter.next();
                        top10.put(tuple._2, tuple._1);
                        // keep only top N
                        if (top10.size() > 10) {
                            top10.remove(top10.firstKey());
                        }
                    }
                    // wrap the single local map so that each partition emits exactly one element
                    return Collections.singletonList(top10).iterator();
                }
            });
        SortedMap<Integer, String> finaltop10 = new TreeMap<Integer, String>();
        // collect() brings every local Top 10 to the driver
        List<SortedMap<Integer, String>> alltop10 = partitions.collect();
        // merge the local lists into the final Top 10
        for (SortedMap<Integer, String> localtop10 : alltop10) {
            // entry key = weight/count, entry value = catname/URL
            for (Map.Entry<Integer, String> entry : localtop10.entrySet()) {
                finaltop10.put(entry.getKey(), entry.getValue());
                // keep only top 10
                if (finaltop10.size() > 10) {
                    finaltop10.remove(finaltop10.firstKey());
                }
            }
        }
        // print the final Top 10 (ascending by count)
        for (Map.Entry<Integer, String> entry : finaltop10.entrySet()) {
            System.out.println(entry.getKey() + "--" + entry.getValue());
        }
        ctx.close();
        System.exit(0);
    }
}
```
Specifying the Top N parameter globally

- Define the broadcast variable on the driver: `final Broadcast<Integer> broadcastTopN = context.broadcast(topN);`
- Read N inside a task: `final int topN = broadcastTopN.value();`
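Spark solution for non-unique keys

Algorithm
- Make the keys unique: map the input to a `JavaPairRDD<K,V>` and aggregate duplicate keys with reduceByKey()
- Partition all unique (K,V) pairs into M partitions
- Find the Top N of each partition (the local Top N)
- Find the final Top N across all local Top N lists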
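The four steps can be traced in a single JVM. This sketch is hypothetical and uses no Spark: a `HashMap` with `merge()` plays the role of reduceByKey(), and assigning keys by `Math.floorMod(hashCode, M)` is an assumed partitioning rule in the spirit of hash partitioning. Only aggregated counts are ranked, so in the sample A = 2 + 5 = 7 beats B = 2 + 4 = 6.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

class NonUniqueTopN {
    static SortedMap<Integer, String> topN(List<String> lines, int m, int n) {
        // Step 1: make keys unique by summing the counts per key (like reduceByKey)
        Map<String, Integer> unique = new HashMap<>();
        for (String line : lines) {
            String[] tokens = line.split(",");
            unique.merge(tokens[0], Integer.parseInt(tokens[1]), Integer::sum);
        }
        // Step 2: spread the unique (K,V) pairs over M partitions (assumed hash rule)
        List<Map<String, Integer>> partitions = new ArrayList<>();
        for (int i = 0; i < m; i++) partitions.add(new HashMap<>());
        for (Map.Entry<String, Integer> e : unique.entrySet()) {
            partitions.get(Math.floorMod(e.getKey().hashCode(), m)).put(e.getKey(), e.getValue());
        }
        // Steps 3 and 4: local Top N per partition, then Top N over all local results
        SortedMap<Integer, String> finalTop = new TreeMap<>();
        for (Map<String, Integer> partition : partitions) {
            SortedMap<Integer, String> local = new TreeMap<>();
            for (Map.Entry<String, Integer> e : partition.entrySet()) {
                local.put(e.getValue(), e.getKey());
                if (local.size() > n) local.remove(local.firstKey());
            }
            for (Map.Entry<Integer, String> e : local.entrySet()) {
                finalTop.put(e.getKey(), e.getValue());
                if (finalTop.size() > n) finalTop.remove(finalTop.firstKey());
            }
        }
        return finalTop;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("A,2", "B,2", "C,3", "A,5", "B,4", "D,1");
        System.out.println(topN(lines, 3, 2)); // {6=B, 7=A}
    }
}
```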
Spark implementation for non-unique keys

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.broadcast.Broadcast;
import scala.Tuple2;

// the enclosing class name is illustrative; SparkUtil is the same book helper as above
public class Top10NonUnique {
    public static void main(String[] args) throws Exception {
        // handle input arguments
        if (args.length < 2) {
            System.err.println("Usage: Top10 <input-path> <topN>");
            System.exit(1);
        }
        System.out.println("args[0]: <input-path>=" + args[0]);
        System.out.println("args[1]: <topN>=" + args[1]);
        final int N = Integer.parseInt(args[1]);
        // create a JavaSparkContext
        JavaSparkContext ctx = SparkUtil.createJavaSparkContext();
        // broadcast N to all cluster nodes
        final Broadcast<Integer> topN = ctx.broadcast(N);
        // now topN is available to be read from all cluster nodes
        // create the first RDD; each line is <string-key><,><integer-value-count>, e.g. A,2 or C,3
        JavaRDD<String> lines = ctx.textFile(args[0], 1);
        lines.saveAsTextFile("/output/1"); // intermediate result, for debugging
        // coalesce() returns a new RDD with at most numPartitions partitions
        // rule of thumb: about 2 * num_executors * cores_per_executor partitions in total
        // without a shuffle coalesce() can only lower the partition count; to raise it
        // (e.g. from a single-partition input) use repartition(9) or coalesce(9, true)
        JavaRDD<String> rdd = lines.coalesce(9);
        // map the input (T) to (K,V) pairs
        // PairFunction<T, K, V>
        // T => Tuple2<K, V>
        JavaPairRDD<String, Integer> kv = rdd.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) {
                String[] tokens = s.split(","); // url,789
                return new Tuple2<String, Integer>(tokens[0], Integer.parseInt(tokens[1]));
            }
        });
        kv.saveAsTextFile("/output/2");
        // aggregate duplicate keys with a Function2 that sums their counts
        JavaPairRDD<String, Integer> uniqueKeys = kv.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
        });
        uniqueKeys.saveAsTextFile("/output/3");
        // build a local Top N for each partition
        JavaRDD<SortedMap<Integer, String>> partitions = uniqueKeys.mapPartitions(
            new FlatMapFunction<Iterator<Tuple2<String, Integer>>, SortedMap<Integer, String>>() {
                @Override
                public Iterator<SortedMap<Integer, String>> call(Iterator<Tuple2<String, Integer>> iter) {
                    final int N = topN.value(); // read the broadcast value on the executor
                    SortedMap<Integer, String> localTopN = new TreeMap<Integer, String>();
                    while (iter.hasNext()) {
                        Tuple2<String, Integer> tuple = iter.next();
                        localTopN.put(tuple._2, tuple._1);
                        // keep only top N
                        if (localTopN.size() > N) {
                            localTopN.remove(localTopN.firstKey());
                        }
                    }
                    return Collections.singletonList(localTopN).iterator();
                }
            });
        partitions.saveAsTextFile("/output/4");
        // compute the final Top N
        SortedMap<Integer, String> finalTopN = new TreeMap<Integer, String>();
        // collect the local Top N of every partition
        List<SortedMap<Integer, String>> allTopN = partitions.collect();
        for (SortedMap<Integer, String> localTopN : allTopN) {
            for (Map.Entry<Integer, String> entry : localTopN.entrySet()) {
                // count = entry.getKey()
                // url = entry.getValue()
                finalTopN.put(entry.getKey(), entry.getValue());
                // keep only top N
                if (finalTopN.size() > N) {
                    finalTopN.remove(finalTopN.firstKey());
                }
            }
        }
        // print the final Top N (ascending by count)
        for (Map.Entry<Integer, String> entry : finalTopN.entrySet()) {
            System.out.println(entry.getKey() + "--" + entry.getValue());
        }
        ctx.close();
        System.exit(0);
    }
}
```
Non-unique keys with takeOrdered()

```java
import java.io.Serializable;
import java.util.Comparator;
import java.util.List;
import scala.Tuple2;

// Step 8: get the global Top N with takeOrdered()
List<Tuple2<String, Integer>> topNResult = uniqueKeys.takeOrdered(N, MyTupleComparator.INSTANCE);

// takeOrdered() needs a Comparator, which must also be Serializable
// because Spark ships it to the executors
static class MyTupleComparator implements Comparator<Tuple2<String, Integer>>, Serializable {
    final static MyTupleComparator INSTANCE = new MyTupleComparator();
    @Override
    public int compare(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) {
        return t2._2.compareTo(t1._2);    // descending by count (use for Top N)
        // return t1._2.compareTo(t2._2); // ascending by count (use for Bottom N)
    }
}
```
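A local analogue of `takeOrdered(N, comparator)` can be written with a bounded `java.util.PriorityQueue`. This is a sketch under stated assumptions, not Spark's implementation: `TakeOrderedSketch` is hypothetical and `Map.Entry` replaces `Tuple2`. The heap is ordered by the reversed comparator, so its head is always the candidate to evict. Unlike the TreeMap versions, entries with equal counts do not overwrite each other.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

class TakeOrderedSketch {
    // descending by count, like MyTupleComparator above
    static final Comparator<Map.Entry<String, Integer>> BY_COUNT_DESC =
            (t1, t2) -> t2.getValue().compareTo(t1.getValue());

    // returns the first n elements of items in ord order
    static <T> List<T> takeOrdered(List<T> items, int n, Comparator<T> ord) {
        // reversed ord: the head is the element that would come last under ord
        PriorityQueue<T> heap = new PriorityQueue<>(Math.max(1, n), ord.reversed());
        for (T item : items) {
            heap.offer(item);
            if (heap.size() > n) {
                heap.poll(); // evict the current worst candidate
            }
        }
        List<T> result = new ArrayList<>(heap);
        result.sort(ord);
        return result;
    }

    static Map.Entry<String, Integer> kv(String k, int v) { return new SimpleEntry<>(k, v); }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> counts =
                Arrays.asList(kv("url1", 9), kv("url2", 14), kv("url3", 9), kv("url4", 2));
        List<Map.Entry<String, Integer>> top3 = takeOrdered(counts, 3, BY_COUNT_DESC);
        // url2=14 comes first; both count-9 entries are kept, in unspecified relative order
        System.out.println(top3);
    }
}
```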
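Non-unique keys with MapReduce
- First aggregate the input into unique <key,value> pairs the way WordCount does (sum the values of each key)
- Then run the unique-key Top N solution from the first section on that output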
Unique and non-unique keys in Scala

Unique keys

```scala
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.immutable.SortedMap

object TopN {
  def main(args: Array[String]): Unit = {
    if (args.size < 1) {
      println("Usage: TopN <input>")
      sys.exit(1)
    }
    // create the SparkConf and the SparkContext
    val sparkConf = new SparkConf().setAppName("TopN")
    val sc = new SparkContext(sparkConf)
    // broadcast N
    val N = sc.broadcast(10)
    val path = args(0)
    val input = sc.textFile(path)
    // note: the pair is (count, tokens), so the count (third column) becomes the key
    val pair = input.map(line => {
      val tokens = line.split(",")
      (tokens(2).toInt, tokens)
    })
    import Ordering.Implicits._
    val partitions = pair.mapPartitions(itr => {
      // SortedMap is ordered by key, and the key is the count, so entries are ordered by count
      var sortedMap = SortedMap.empty[Int, Array[String]]
      itr.foreach { tuple =>
        sortedMap += tuple
        if (sortedMap.size > N.value) {
          sortedMap = sortedMap.takeRight(N.value)
        }
      }
      // emit the N rightmost (largest) entries of this partition
      sortedMap.takeRight(N.value).toIterator
    })
    // collect the local Top N of every partition on the driver
    val alltop10 = partitions.collect()
    // merge them into one SortedMap, which orders all candidates by count
    val finaltop10 = SortedMap.empty[Int, Array[String]] ++ alltop10
    val resultUsingMapPartition = finaltop10.takeRight(N.value)
    // print the result (top 10) on the console
    resultUsingMapPartition.foreach {
      case (k, v) => println(s"$k \t ${v.mkString(",")}")
    }
    // Approach 2: group by count, sort the counts in descending order with sortByKey(false),
    // and take the first N groups; items with equal counts stay together in one group
    val moreConciseApproach = pair.groupByKey().sortByKey(false).take(N.value)
    // print the result (top 10) on the console
    moreConciseApproach.foreach {
      case (k, v) => println(s"$k \t ${v.flatten.mkString(",")}")
    }
    // done
    sc.stop()
  }
}
```
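Approach 2 behaves differently from the SortedMap version when counts tie: groupByKey() collects every item with the same count into one group, and take(N) then returns N groups, that is, N distinct counts. A plain-Java sketch of that behavior follows; `GroupSortTake` and `topGroups()` are hypothetical names, and a `TreeMap` ordered by `Comparator.reverseOrder()` stands in for sortByKey(false).

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class GroupSortTake {
    // group items by count, order counts descending, keep the first n groups
    static Map<Integer, List<String>> topGroups(Map<String, Integer> counts, int n) {
        TreeMap<Integer, List<String>> byCount = new TreeMap<>(Comparator.reverseOrder());
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            byCount.computeIfAbsent(e.getValue(), k -> new ArrayList<>()).add(e.getKey());
        }
        Map<Integer, List<String>> result = new LinkedHashMap<>();
        for (Map.Entry<Integer, List<String>> e : byCount.entrySet()) {
            if (result.size() == n) break;
            result.put(e.getKey(), e.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        counts.put("cat1", 8);
        counts.put("cat2", 20);
        counts.put("cat3", 8);
        counts.put("cat4", 5);
        System.out.println(topGroups(counts, 2)); // {20=[cat2], 8=[cat1, cat3]}
    }
}
```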
Non-unique keys

```scala
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.immutable.SortedMap

object TopNNonUnique {
  def main(args: Array[String]): Unit = {
    if (args.size < 1) {
      println("Usage: TopNNonUnique <input>")
      sys.exit(1)
    }
    val sparkConf = new SparkConf().setAppName("TopNNonUnique")
    val sc = new SparkContext(sparkConf)
    val N = sc.broadcast(2)
    val path = args(0)
    val input = sc.textFile(path)
    val kv = input.map(line => {
      val tokens = line.split(",")
      (tokens(0), tokens(1).toInt)
    })
    // sum the counts of duplicate keys so that every key is unique
    val uniqueKeys = kv.reduceByKey(_ + _)
    import Ordering.Implicits._
    val partitions = uniqueKeys.mapPartitions(itr => {
      // SortedMap keeps its entries ordered by key
      var sortedMap = SortedMap.empty[Int, String]
      itr.foreach { tuple =>
        // swap (key, count) into (count, key) and add it to the map
        sortedMap += tuple.swap
        if (sortedMap.size > N.value) {
          sortedMap = sortedMap.takeRight(N.value)
        }
      }
      sortedMap.takeRight(N.value).toIterator
    })
    val alltop10 = partitions.collect()
    val finaltop10 = SortedMap.empty[Int, String] ++ alltop10
    val resultUsingMapPartition = finaltop10.takeRight(N.value)
    // print the result (top N); v is a single String here, so print it as is
    // (v.mkString(",") would insert a comma between every character)
    resultUsingMapPartition.foreach {
      case (k, v) => println(s"$k \t $v")
    }
    // a more concise alternative: combineByKey() with an identity combiner and addition
    // as both mergeValue and mergeCombiners, then group by count and sort descending
    val createCombiner = (v: Int) => v
    val mergeValue = (a: Int, b: Int) => (a + b)
    val moreConciseApproach = kv.combineByKey(createCombiner, mergeValue, mergeValue)
      .map(_.swap)
      .groupByKey()
      .sortByKey(false)
      .take(N.value)
    // print the result (top N) on the console
    moreConciseApproach.foreach {
      case (k, v) => println(s"$k \t ${v.mkString(",")}")
    }
    // done
    sc.stop()
  }
}
```
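combineByKey() takes three functions: `createCombiner` turns the first value seen for a key within a partition into a combiner, `mergeValue` folds further values from the same partition into it, and `mergeCombiners` merges the per-partition results. The hypothetical plain-Java sketch below imitates those roles over an in-memory list of partitions; with the identity combiner and addition used above, it yields per-key sums, the same result as `reduceByKey(_ + _)`.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

class CombineByKeySketch {
    static <K, V, C> Map<K, C> combineByKey(List<List<Map.Entry<K, V>>> partitions,
                                            Function<V, C> createCombiner,
                                            BiFunction<C, V, C> mergeValue,
                                            BiFunction<C, C, C> mergeCombiners) {
        Map<K, C> result = new HashMap<>();
        for (List<Map.Entry<K, V>> partition : partitions) {
            // within one partition: createCombiner for a new key, mergeValue afterwards
            Map<K, C> local = new HashMap<>();
            for (Map.Entry<K, V> e : partition) {
                K k = e.getKey();
                if (local.containsKey(k)) {
                    local.put(k, mergeValue.apply(local.get(k), e.getValue()));
                } else {
                    local.put(k, createCombiner.apply(e.getValue()));
                }
            }
            // across partitions: mergeCombiners
            for (Map.Entry<K, C> e : local.entrySet()) {
                result.merge(e.getKey(), e.getValue(), mergeCombiners);
            }
        }
        return result;
    }

    static Map.Entry<String, Integer> kv(String k, int v) { return new SimpleEntry<>(k, v); }

    public static void main(String[] args) {
        List<List<Map.Entry<String, Integer>>> partitions = Arrays.asList(
                Arrays.asList(kv("A", 2), kv("B", 2), kv("A", 5)),
                Arrays.asList(kv("A", 1), kv("C", 3)));
        // same roles as the Scala code: identity combiner, addition for both merges
        Function<Integer, Integer> createCombiner = v -> v;
        BiFunction<Integer, Integer, Integer> mergeValue = Integer::sum;
        Map<String, Integer> totals = combineByKey(partitions, createCombiner, mergeValue, mergeValue);
        System.out.println(totals.get("A") + " " + totals.get("B") + " " + totals.get("C")); // 8 2 3
    }
}
```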