数据算法 Hadoop/Spark大数据处理---第三章

本章欲解决的问题为求TOP(N),共用到的方法有:

  • 假设输入键都是唯一的,也即给定的输入集合{(K,V)},所有的K都是唯一的,用Mapreduce/Hadoop方法
  • 假设输入键都是唯一的,也即给定的输入集合{(K,V)},所有的K都是唯一的,用spark方法
  • 假设输入键都不是唯一的,也即给定的输入集合{(K,V)},K是有重复的,用spark强大的排序算法top()函数和takeOrdered()等

主要用到的TOP N函数

java中实现Top N的方法最常用的是适用SortedMap<K,V>和TreeMap<K,V>,然后将L的所有元素增加到topN中,如果topN.size()>N,则删除第一个元素或最后一个元素

//TOP K 中最关键的算法
static SortMap<Integer,T> topN(List<Tuple2<T,Integer>> L,int N){
    if((L==null) || (L.isEmpty())){
        return null;
    }
    
    SortMap<Integer,T> topN = new TreeMap<Integer,T>();
    for(Tuple2<T,Integer> element : L){
        topN.put(element._1,element._2);
        if(topN.size() > N){
            topN.remove(topN.firstKey())
        }
    }
    return topN;
}


基于MapReduce实现的键唯一方法

类名 描述
TopN_Driver 提交作业的驱动器
TopN_Mapper 定义map()
TopN_Reduce 定义reduce()

  • 重写setup和cleanup函数,这里两个函数再每次启动映射器都会执行一次,setup用于获取N的值,cleanup用于发射每个映射器的TOP N到reduce端
  //获取N的值
  @Override
   protected void setup(Context context) throws IOException,
         InterruptedException {
      this.N = context.getConfiguration().getInt("N", 10); // default is top 10
   }
   
    //将结果发射,其中NullWritable.get()获取的值都相同,也即都映射到相同的reduce端
   @Override
   protected void cleanup(Context context) throws IOException,
         InterruptedException {
      for (String str : top.values()) {
         context.write(NullWritable.get(), new Text(str));
      }
   }

- Map函数,完成分区的TOP N求值

 @Override
   public void map(Text key, IntWritable value, Context context)
         throws IOException, InterruptedException {

      String keyAsString = key.toString();
      int frequency =  value.get();
      String compositeValue = keyAsString + "," + frequency;
      top.put(frequency, compositeValue);
      // keep only top N
      if (top.size() > N) {
         top.remove(top.firstKey());
      }
   }

- Reduce函数,完成所有的TOP N求值

 private int N = 10; // default
   private SortedMap<Integer, String> top = new TreeMap<Integer, String>();

   //同样的SortedMap<Integer, String>操作
   @Override
   public void reduce(NullWritable key, Iterable<Text> values, Context context) 
      throws IOException, InterruptedException {
      for (Text value : values) {
         String valueAsString = value.toString().trim();
         String[] tokens = valueAsString.split(",");
         String url = tokens[0];
         int frequency =  Integer.parseInt(tokens[1]);
         top.put(frequency, url);
         // keep only top N
         if (top.size() > N) {
            top.remove(top.firstKey());
         }
      }
      
      // 发射最终的 final top N
        List<Integer> keys = new ArrayList<Integer>(top.keySet());
        for(int i=keys.size()-1; i>=0; i--){
         context.write(new IntWritable(keys.get(i)), new Text(top.get(keys.get(i))));
      }
   }
   
   //也先执行setup获得N的值
   @Override
   protected void setup(Context context) 
      throws IOException, InterruptedException {
      this.N = context.getConfiguration().getInt("N", 10); // default is top 10
   }

- 驱动程序类TopNDriver.java

 Job job = new Job(getConf());
      HadoopUtil.addJarsToDistributedCache(job, "/lib/");
      int N = Integer.parseInt(args[0]); // top N
      job.getConfiguration().setInt("N", N);
      job.setJobName("TopNDriver");

      job.setInputFormatClass(SequenceFileInputFormat.class);
      job.setOutputFormatClass(SequenceFileOutputFormat.class);

      job.setMapperClass(TopNMapper.class);
      job.setReducerClass(TopNReducer.class);
      //设置reduce的数目为1个,也即所有的TOP N都到同一个Reduce
      job.setNumReduceTasks(1);

      // map()'s output (K,V)
      job.setMapOutputKeyClass(NullWritable.class);   
      job.setMapOutputValueClass(Text.class);   
      
      // reduce()'s output (K,V)
      job.setOutputKeyClass(IntWritable.class);
      job.setOutputValueClass(Text.class);


- 查找TOP 10 和 Bottom 10

//查找top10 
if(top10Cats.size()>10){
    top10Cats.remove(top10Cats.firstKey())
}

//查找Bottom10
if(top10Cats.size()>10){
    top10Cats.remove(top10Cats.lastKey())
}

基于Spark实现的键唯一方法

Java API使用的spark函数类
spark java类 函数类型
Function<T,R> T=>R
DoubleFunction<T> T=>Double
PairFunction<T,K,V> T=>Tuple2<K,V>
FlatMapFunction<T,R> T=>Iterable<R>
DoubleFlatMapFunction<T> T=>Iterable<Double>
PairFlatMapFunction<T,K,V> T=>Iterable<Tuple2<K,v>>
Function2<T1,T2,R> T1,T2 => R
在spark中使用setUp()和cleanUp()
 JavaRDD<SortedMap<Integer, String>> partitions =
 //使用mapPartitions方法
 pairs.mapPartitions(
  new FlatMapFunction<Iterator<Tuple2<K,V>>, SortedMap<K1, K2>>() {
      @Override
         public Iterator<Tuple2<K,V>> call(Iterator<Tuple2<K,V>> iter) {
             setup();
             while(iter.hasNext()){
                 //map()功能
             }
             cleanUp();
             return <the-result>
         }
  })

- 采用spark实现TOP N

 public static void main(String[] args) throws Exception {
  
      // 输入处理参数
      if (args.length < 1) {
         System.err.println("Usage: Top10 <input-file>");
         System.exit(1);
      }
      String inputPath = args[0];
      System.out.println("args[0]: <input-path>="+inputPath);

      // 连接到spark master
      JavaSparkContext ctx = SparkUtil.createJavaSparkContext();

      // 从HDFS中读取文件并创建第一个RDD
      //  <string-key><,><integer-value>,
      JavaRDD<String> lines = ctx.textFile(inputPath, 1);

    
      // 从现有的JavaRDD<String>创建一个新的成对的RDDJavaPairRDD<String,Integer>
      // Spark Java类:PairFunction<T, K, V>
      // 函数类型:T => Tuple2<K, V>
      //其实每一个JavaPairRDD<String,Integer>也即是Tuple2<String,Integer>()
      JavaPairRDD<String,Integer> pairs = lines.mapToPair(new PairFunction<String, String, Integer>() {
         @Override
         public Tuple2<String,Integer> call(String s) {
            String[] tokens = s.split(","); // cat7,234
            return new Tuple2<String,Integer>(tokens[0], Integer.parseInt(tokens[1]));
         }
      });

      List<Tuple2<String,Integer>> debug1 = pairs.collect();
      for (Tuple2<String,Integer> t2 : debug1) {
         System.out.println("key="+t2._1 + "\t value= " + t2._2);
      }

    
      // 为各个输入分区创建一个本地TOP 10列表
      JavaRDD<SortedMap<Integer, String>> partitions = pairs.mapPartitions(
         new FlatMapFunction<Iterator<Tuple2<String,Integer>>, SortedMap<Integer, String>>() {
         @Override
         public Iterator<SortedMap<Integer, String>> call(Iterator<Tuple2<String,Integer>> iter) {
             SortedMap<Integer, String> top10 = new TreeMap<Integer, String>();
             while (iter.hasNext()) {
                Tuple2<String,Integer> tuple = iter.next();
                top10.put(tuple._2, tuple._1);
                // keep only top N 
                if (top10.size() > 10) {
                   top10.remove(top10.firstKey());
                }  
             }
             //singletonList确保唯一性
             return Collections.singletonList(top10).iterator();
         }
      });

    
      SortedMap<Integer, String> finaltop10 = new TreeMap<Integer, String>();
      //使用collect得到所有TOP 10列表
      List<SortedMap<Integer, String>> alltop10 = partitions.collect();
      //获得最终所有的TOP 10
      for (SortedMap<Integer, String> localtop10 : alltop10) {
          //System.out.println(tuple._1 + ": " + tuple._2);
          // weight/count = tuple._1
          // catname/URL = tuple._2
          for (Map.Entry<Integer, String> entry : localtop10.entrySet()) {
              //   System.out.println(entry.getKey() + "--" + entry.getValue());
              finaltop10.put(entry.getKey(), entry.getValue());
              // keep only top 10 
              if (finaltop10.size() > 10) {
                 finaltop10.remove(finaltop10.firstKey());
              }
          }
      }
    
      // 输出最终的TOP 10列表
      for (Map.Entry<Integer, String> entry : finaltop10.entrySet()) {
         System.out.println(entry.getKey() + "--" + entry.getValue());
      }

      System.exit(0);
   }

全局指定TOP N 参数
  • 定义broadcastTopN:final Broadcast<Integer> broadcastTopN = context.broadcast(topN)
  • 获取N的值:final int topN = broadcastTopN.value();

基于Spark实现的键不唯一的方法

算法过程
  1. 要保证K是唯一的,要把输入映射到JavaPairRDD<K,V>对,然后交给reduceByKey()
  2. 将所有唯一的(K,V)对划分为M个分区
  3. 找到各个分区的TOP N (本地TOP N)
  4. 找出所有本地TOP N的最终TOP N

基于Spark实现的非唯一键方法

public static void main(String[] args) throws Exception {
      // 输入处理参数
      if (args.length < 2) {
         System.err.println("Usage: Top10 <input-path> <topN>");
         System.exit(1);
      }
      System.out.println("args[0]: <input-path>="+args[0]);
      System.out.println("args[1]: <topN>="+args[1]);
      final int N = Integer.parseInt(args[1]);

      // 创建一个javaSpark上下文对象
      JavaSparkContext ctx = SparkUtil.createJavaSparkContext();

      // 将TOP N 广播到所有集群节点
      final Broadcast<Integer> topN = ctx.broadcast(N);
      // now topN is available to be read from all cluster nodes

      // 创建第一个RDD,格式是这样的A,2 | B,2 |C,3这样
      //<string-key><,><integer-value-count>
      JavaRDD<String> lines = ctx.textFile(args[0], 1);
      lines.saveAsTextFile("/output/1");
    
      // RDD分区,返回一个新的RDD,归约到numPartitions分区
      //分区的原则:每个执行器使用(2*num_executors*cores_per_executor)个分区
      JavaRDD<String> rdd = lines.coalesce(9);
       
      // 将输入(T)映射到(K,V)对
      // PairFunction<T, K, V>   
      // T => Tuple2<K, V>
      JavaPairRDD<String,Integer> kv = rdd.mapToPair(new PairFunction<String,String,Integer>() {
         @Override
         public Tuple2<String,Integer> call(String s) {
            String[] tokens = s.split(","); // url,789
            return new Tuple2<String,Integer>(tokens[0], Integer.parseInt(tokens[1]));
         }
      });
      kv.saveAsTextFile("/output/2");

      //用Function函数对重复键进行归约
      JavaPairRDD<String, Integer> uniqueKeys = kv.reduceByKey(new Function2<Integer, Integer, Integer>() {
         @Override
         public Integer call(Integer i1, Integer i2) {
            return i1 + i2;
         }
      });
      uniqueKeys.saveAsTextFile("/output/3");
    
      // 为本地的partitions创建本地的TOP N
      JavaRDD<SortedMap<Integer, String>> partitions = uniqueKeys.mapPartitions(
          new FlatMapFunction<Iterator<Tuple2<String,Integer>>, SortedMap<Integer, String>>() {
          @Override
          public Iterator<SortedMap<Integer, String>> call(Iterator<Tuple2<String,Integer>> iter) {
             final int N = topN.value();
             SortedMap<Integer, String> localTopN = new TreeMap<Integer, String>();
             while (iter.hasNext()) {
                Tuple2<String,Integer> tuple = iter.next();
                localTopN.put(tuple._2, tuple._1);
                // keep only top N 
                if (localTopN.size() > N) {
                   localTopN.remove(localTopN.firstKey());
                } 
             }
             return Collections.singletonList(localTopN).iterator();
          }
      });
      partitions.saveAsTextFile("/output/4");

      // 获得最终的TOP N
      SortedMap<Integer, String> finalTopN = new TreeMap<Integer, String>();
      //获得所有分区的TOP N
      List<SortedMap<Integer, String>> allTopN = partitions.collect();
      for (SortedMap<Integer, String> localTopN : allTopN) {
         for (Map.Entry<Integer, String> entry : localTopN.entrySet()) {
             // count = entry.getKey()
             // url = entry.getValue()
             finalTopN.put(entry.getKey(), entry.getValue());
             // keep only top N 
             if (finalTopN.size() > N) {
                finalTopN.remove(finalTopN.firstKey());
             }
         }
      }
    
      //输出最终的TOP N
      for (Map.Entry<Integer, String> entry : finalTopN.entrySet()) {
         System.out.println(entry.getKey() + "--" + entry.getValue());
      }

      System.exit(0);
   }

基于takeOrdered实现的键不唯一的方法

//步骤8:获取全局TOP 10的使用
 List<Tuple2<String, Integer>> topNResult = uniqueKeys.takeOrdered(N, MyTupleComparator.INSTANCE);
 
//但需要实现排序方法
static class MyTupleComparator implements Comparator<Tuple2<String, Integer>> ,Serializable {
       final static MyTupleComparator INSTANCE = new MyTupleComparator();
       @Override
       public int compare(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) {
          return -t1._2.compareTo(t2._2);     // sorts RDD elements descending (use for Top-N)
          // return t1._2.compareTo(t2._2);   // sorts RDD elements ascending (use for Bottom-N)
       }
   }

用mapreduce求不唯一的方法

  • 先类似wordCount求出唯一的<key,value>
  • 之后用第一节唯一键求TOP N即可

使用Scala实现唯一键和不唯一键

唯一键的实现方法
 def main(args: Array[String]): Unit = {
    if (args.size < 1) {
      println("Usage: TopN <input>")
      sys.exit(1)
    }
    //获得sparkConf对象
    val sparkConf = new SparkConf().setAppName("TopN")
    val sc = new SparkContext(sparkConf)
    //广播N变量
    val N = sc.broadcast(10)
    val path = args(0)

    val input = sc.textFile(path)
    //注意:key和value倒过来了
    val pair = input.map(line => {
      val tokens = line.split(",")
      (tokens(2).toInt, tokens)
    })

    import Ordering.Implicits._
    val partitions = pair.mapPartitions(itr => {
    //sortedMap是对key进行排序的,也即对value排序了
      var sortedMap = SortedMap.empty[Int, Array[String]]
      itr.foreach { tuple =>
        {
          sortedMap += tuple
          if (sortedMap.size > N.value) {
            sortedMap = sortedMap.takeRight(N.value)
          }
        }
      }
      //获得分区右边的N个
      sortedMap.takeRight(N.value).toIterator
    })
    //获得所有分区
    val alltop10 = partitions.collect()
    //把所有分区连接上SortedMap,也即可所有分区都排序好了
    val finaltop10 = SortedMap.empty[Int, Array[String]].++:(alltop10)
    val resultUsingMapPartition = finaltop10.takeRight(N.value)
    
    //Prints result (top 10) on the console
    resultUsingMapPartition.foreach {
      case (k, v) => println(s"$k \t ${v.asInstanceOf[Array[String]].mkString(",")}")
    }

    // 方法二:sortByKey对key进行排序,以降序的方式
    val moreConciseApproach = pair.groupByKey().sortByKey(false).take(N.value)

    //Prints result (top 10) on the console
    moreConciseApproach.foreach {
      case (k, v) => println(s"$k \t ${v.flatten.mkString(",")}")
    }
    
    // done
    sc.stop()
  }
不唯一键的实现方法
def main(args: Array[String]): Unit = {
    if (args.size < 1) {
      println("Usage: TopNNonUnique <input>")
      sys.exit(1)
    }

    val sparkConf = new SparkConf().setAppName("TopNNonUnique")
    val sc = new SparkContext(sparkConf)

    val N = sc.broadcast(2)
    val path = args(0)

    val input = sc.textFile(path)
    val kv = input.map(line => {
      val tokens = line.split(",")
      (tokens(0), tokens(1).toInt)
    })

    val uniqueKeys = kv.reduceByKey(_ + _)
    import Ordering.Implicits._
    val partitions = uniqueKeys.mapPartitions(itr => {
      //SortedMap是一个对键进行排列
      var sortedMap = SortedMap.empty[Int, String]
      itr.foreach { tuple =>
        {
          //把元组的值相反再相加
          sortedMap += tuple.swap
          if (sortedMap.size > N.value) {
            sortedMap = sortedMap.takeRight(N.value)
          }
        }
      }
      sortedMap.takeRight(N.value).toIterator
    })

    val alltop10 = partitions.collect()
    val finaltop10 = SortedMap.empty[Int, String].++:(alltop10)
    val resultUsingMapPartition = finaltop10.takeRight(N.value)

    //Prints result (top 10) on the console
    resultUsingMapPartition.foreach { 
      case (k, v) => println(s"$k \t ${v.mkString(",")}") 
    }

    // Below is additional approach which is more concise
    val createCombiner = (v: Int) => v
    val mergeValue = (a: Int, b: Int) => (a + b)
    val moreConciseApproach = kv.combineByKey(createCombiner, mergeValue, mergeValue)
                                .map(_.swap)
                                .groupByKey()
                                .sortByKey(false)
                                .take(N.value)

    //Prints result (top 10) on the console
    moreConciseApproach.foreach {
      case (k, v) => println(s"$k \t ${v.mkString(",")}")
    }

    // done
    sc.stop()
  }

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,923评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,154评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,775评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,960评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,976评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,972评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,893评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,709评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,159评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,400评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,552评论 1 346
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,265评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,876评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,528评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,701评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,552评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,451评论 2 352

推荐阅读更多精彩内容