数据算法 Hadoop/Spark大数据处理---第八章

本章为求共同好友

主要的方法是

  • 先获得好友的列表,一般为:(100,100 200 300 400)之类
  • 进行转换变成[(100,200),(100,100 200 300 400)],[(100,300),(100,100 200 300 400)]之类。
  • 之后对key进行排序
  • 最后寻找合并之后的key中的value中相同的即可

本章一共有三种实现方式

  1. 基于传统mapreduce实现
  2. 基于传统spark实现
  3. 基于传统的Scala实现

++基于传统mapreduce实现++

//map函数
    //获得friend列表
    static String getFriends(String[] tokens) {
        if (tokens.length == 2) {
            return "";
        }
        StringBuilder builder = new StringBuilder();
        for (int i = 1; i < tokens.length; i++) {
            builder.append(tokens[i]);
            if (i < (tokens.length - 1)) {
                builder.append(",");
            }
        }
        return builder.toString();
    }
    //对key进行排序,确保相同的为一组
    static String buildSortedKey(String person, String friend) {
        long p = Long.parseLong(person);
        long f = Long.parseLong(friend);
        if (p < f) {
            return person + "," + friend;
        } else {
            return friend + "," + person;
        }
    }

    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // parse input, delimiter is a single space
        String[] tokens = StringUtils.split(value.toString(), " ");

        // create reducer value
        String friends = getFriends(tokens);
        REDUCER_VALUE.set(friends);
        //获得整串friend序列的host
        String person = tokens[0];
        for (int i = 1; i < tokens.length; i++) {
            String friend = tokens[i];
            //让person与每个friend逐一的成对,buildSortedKey返回一个string,也是一个text
            String reducerKeyAsString = buildSortedKey(person, friend);
            REDUCER_KEY.set(reducerKeyAsString);
            context.write(REDUCER_KEY, REDUCER_VALUE);
        }
    }

//reduce函数
 @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        Map<String, Integer> map = new HashMap<String, Integer>();
        //对value进行操作
        Iterator<Text> iterator = values.iterator();
        int numOfValues = 0;
        while (iterator.hasNext()) {
            String friends = iterator.next().toString();
            if (friends.equals("")) {
                context.write(key, new Text("[]"));
                return;
            }
            addFriends(map, friends);
            numOfValues++;
        }

        // now iterate the map to see how many have numOfValues
        List<String> commonFriends = new ArrayList<String>();
        for (Map.Entry<String, Integer> entry : map.entrySet()) {
            //numOfValues为总数,当entry.getValue()与之相用时,则证明两个人的好友列表都有
            if (entry.getValue() == numOfValues) {
                commonFriends.add(entry.getKey());
            }
        }

        // sen it to output
        context.write(key, new Text(commonFriends.toString()));
    }

    static void addFriends(Map<String, Integer> map, String friendsList) {
        //获得value中的friends数组
        String[] friends = StringUtils.split(friendsList, ",");
        for (String friend : friends) {
            Integer count = map.get(friend);
            if (count == null) {
                map.put(friend, 1);
            } else {
                map.put(friend, ++count);
            }
        }
    }

++基于传统spark实现++

//传统spark实现
public static void main(String[] args) throws Exception {
  
    if (args.length < 1) {
       // Spark master URL:
       // format:   spark://<spark-master-host-name>:7077
       // example:  spark://myserver00:7077
       System.err.println("Usage: FindCommonFriends <input-file>");
       System.exit(1);
    }
    
    //String sparkMasterURL = args[0];
    //System.out.println("sparkMasterURL="+sparkMasterURL);
    
    String hdfsInputFileName = args[0];
    System.out.println("hdfsInputFileName="+hdfsInputFileName);

    // create context object
    JavaSparkContext ctx = SparkUtil.createJavaSparkContext("FindCommonFriends");

    // create the first RDD from input file
    JavaRDD<String> records = ctx.textFile(hdfsInputFileName, 1);

    // debug0 
    List<String> debug0 = records.collect();
    for (String t : debug0) {
      System.out.println("debug0 record="+t);
    }


    // PairFlatMapFunction<T, K, V>
    // T => Iterable<Tuple2<K, V>>
    JavaPairRDD<Tuple2<Long,Long>,Iterable<Long>> pairs = 
          //                                            T       K                  V
          records.flatMapToPair(new PairFlatMapFunction<String, Tuple2<Long,Long>, Iterable<Long>>() {
      @Override
      public Iterator<Tuple2<Tuple2<Long,Long>,Iterable<Long>>> call(String s) {
          //String的输入格式为100,100 200 400 300 etc
         String[] tokens = s.split(",");
         long person = Long.parseLong(tokens[0]);
         String friendsAsString = tokens[1];
         String[] friendsTokenized = friendsAsString.split(" ");
         //只有一个好友的情况,即两人没有共同的好友
         if (friendsTokenized.length == 1) {
            Tuple2<Long,Long> key = buildSortedTuple(person, Long.parseLong(friendsTokenized[0]));
            return Arrays.asList(new Tuple2<Tuple2<Long,Long>,Iterable<Long>>(key, new ArrayList<Long>())).iterator();
         }
         //获得出person的好友列表
         List<Long> friends = new ArrayList<Long>();
         for (String f : friendsTokenized) {
            friends.add(Long.parseLong(f));
         }
         //发送特定格式的Tuple2
         List<Tuple2<Tuple2<Long, Long> ,Iterable<Long>>> result = 
             new ArrayList<Tuple2<Tuple2<Long, Long> ,Iterable<Long>>>();
         for (Long f : friends) {
            Tuple2<Long,Long> key = buildSortedTuple(person, f);
            result.add(new Tuple2<Tuple2<Long,Long>, Iterable<Long>>(key, friends));
         }
         return result.iterator();
      }
    });

    // 输出Tuple2
    List<Tuple2<Tuple2<Long, Long> ,Iterable<Long>>> debug1 = pairs.collect();
    for (Tuple2<Tuple2<Long,Long>,Iterable<Long>> t2 : debug1) {
      System.out.println("debug1 key="+t2._1+"\t value="+t2._2);
    }
    //对key进行排序
    JavaPairRDD<Tuple2<Long, Long>, Iterable<Iterable<Long>>> grouped = pairs.groupByKey();  

    // debug2
    List<Tuple2<Tuple2<Long, Long> ,Iterable<Iterable<Long>>>> debug2 = grouped.collect();
    for (Tuple2<Tuple2<Long,Long>, Iterable<Iterable<Long>>> t2 : debug2) {
      System.out.println("debug2 key="+t2._1+"\t value="+t2._2);
    }

    // Find intersection of all List<List<Long>>
    // mapValues[U](f: (V) => U): JavaPairRDD[K, U]
    // Pass each value in the key-value pair RDD through a map function without changing the keys; 
    // this also retains the original RDD's partitioning.
    JavaPairRDD<Tuple2<Long, Long>, Iterable<Long>> commonFriends = 
        grouped.mapValues(new Function< Iterable<Iterable<Long>>, // input
                                        Iterable<Long>            // output
                                      >() {  
      @Override
      public Iterable<Long> call(Iterable<Iterable<Long>> s) {
         Map<Long, Integer> countCommon = new HashMap<Long, Integer>();
         int size = 0; 
         for (Iterable<Long> iter : s) {
            size++;
            List<Long> list = iterableToList(iter);
            if ((list == null) || (list.isEmpty())) {
               continue;
            }
            //
            for (Long f : list) {
               Integer count = countCommon.get(f);
               if (count == null) {
                  countCommon.put(f, 1);
               }
               else {
                  countCommon.put(f, ++count);
               }
            }
         }
         
         // if countCommon.Entry<f, count> ==  countCommon.Entry<f, s.size()>
         // then that is a common friend
         List<Long> finalCommonFriends = new ArrayList<Long>();
         for (Map.Entry<Long, Integer> entry : countCommon.entrySet()){
             //size表示总数目,当entry的value等于的时候,则表示双方都有的共同好友
            if (entry.getValue() == size) {
               finalCommonFriends.add(entry.getKey());
            }
         } 
         return finalCommonFriends;
      }
    });
   
    // debug3
    List<Tuple2<Tuple2<Long, Long>, Iterable<Long>>> debug3 = commonFriends.collect();
    for (Tuple2<Tuple2<Long, Long>, Iterable<Long>> t2 : debug3) {
      System.out.println("debug3 key="+t2._1+ "\t value="+t2._2);
    }
    
    System.exit(0);
  }
  
  static Tuple2<Long,Long> buildSortedTuple(long a, long b) {
     if (a < b) {
        return new Tuple2<Long, Long>(a,b);
     }
     else {
        return new Tuple2<Long, Long>(b,a);
     }
  }

  static List<Long> iterableToList(Iterable<Long> iterable) {
    List<Long> list = new ArrayList<Long>(); 
    for (Long item : iterable) {      
       list.add(item);
    }
    return list;
  }

++基于传统的Scala实现++

//传统Scala实现
def main(args: Array[String]): Unit = {
    if (args.size < 2) {
      println("Usage: FindCommonFriends <input-dir> <output-dir>")
      sys.exit(1)
    }

    val sparkConf = new SparkConf().setAppName("FindCommonFriends")
    val sc = new SparkContext(sparkConf)

    val input = args(0)
    val output = args(1)

    val records = sc.textFile(input)

    val pairs = records.flatMap(s => {
      val tokens = s.split(",")
      //获得host
      val person = tokens(0).toLong
      //获得friend列表的list方式
      val friends = tokens(1).split("\\s+").map(_.toLong).toList
      val result = for {
        i <- 0 until friends.size
        friend = friends(i)
        //这里就以后把host与所有friend的全部生产出来了
      } yield {
        //这个判断person和friend的方法,也即是在完成key的排序
        if (person < friend)
          ((person, friend), friends)
        else
          ((friend, person), friends)
      }
      result
    })
    //对key进行分组
    val grouped = pairs.groupByKey()

    val commonFriends = grouped.mapValues(iter => {
      val friendCount = for {
        list <- iter
        if !list.isEmpty
        friend <- list
        //分组类所有人的friend都赋值上1
      } yield ((friend, 1))
      //分到相用组eg:[a,,1,1] 把这个序列打平编程[a,,(1,1)]
      //然后对第二个_.2进行相加,之后大于1的取出它的key
      friendCount.groupBy(_._1).mapValues(_.unzip._2.sum).filter(_._2 > 1).map(_._1)
    })

    // save the result to the file
    commonFriends.saveAsTextFile(output) 

    //Format result for easy viewing
    val formatedResult = commonFriends.map(
      f => s"(${f._1._1}, ${f._1._2})\t${f._2.mkString("[", ", ", "]")}"
    )
    
    formatedResult.foreach(println)

    // done!
    sc.stop()
  }

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 221,888评论 6 515
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 94,677评论 3 399
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 168,386评论 0 360
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,726评论 1 297
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,729评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 52,337评论 1 310
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,902评论 3 421
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,807评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,349评论 1 318
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,439评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,567评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 36,242评论 5 350
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,933评论 3 334
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,420评论 0 24
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,531评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,995评论 3 377
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,585评论 2 359

推荐阅读更多精彩内容