数据算法 Hadoop/Spark大数据处理---第十章

本章为推荐引擎

本章为基于电影内容的推荐,假设输入为<用户,电影,评分>,输入为<电影1,电影2><三种算法的相似度>。


本章实现方式

  1. 基于传统spark来实现
  2. 基于传统Scala来实现

本章实现方式的思路

- spark:
  • 1.先创建JavaSparkContext,textFile读入文件。
  • 2.对String进行切分,转换成key = Movie3 value=(User1,3)。
  • 3.之后对key进行分组,得到key=Movie2 value=[(User1,4),(User2,5),(User3,3),(User4,3)]
  • 4.对Uset进行groupByKey,得到key = user value = Tuple3[Movie2,4,4],Tuple3[Movie1,2,4]之类的。
  • 5.对value进行操作,转换成<Movie1,Movie2><1 10 2 20 2 1 4> <Movie1,Movie3><1 10 3 30 3 1 9>
  • 6.对<Movie1,Movie2>进行分组,转换成:<Movie1,Movie2> [<1 10 2 20 2 1 4> <2 10 3 20 6 4 9>]
  • 7.之后可以通过pearsonCorrelation,cosineCorrelation,jaccardCorrelation公式来求得两个movie之间的相似值

++基于传统spark来实现++

//对String进行切分
 JavaPairRDD<String, Tuple2<String, Integer>> moviesRDD = usersRatings.mapToPair(new PairFunction<String, String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Tuple2<String, Integer>> call(String s) throws Exception {
                String[] record = s.split("\t");
                String user = record[0];
                String movie = record[1];
                Integer rating = new Integer(record[2]);
                Tuple2<String, Integer> userAndRating = new Tuple2<>(user, rating);
                return new Tuple2<>(movie, userAndRating);
            }
        });
 //生成key=User1 value=Tuple3[Movie2,4,4]  key=User2 value=Tuple3[Movie2,5,4]
JavaPairRDD<String, Tuple3<String, Integer, Integer>> usersRDD = moviesGrouped.flatMapToPair(new PairFlatMapFunction<Tuple2<String, Iterable<Tuple2<String, Integer>>>, String, Tuple3<String, Integer, Integer>>() {
            @Override
            public Iterator<Tuple2<String, Tuple3<String, Integer, Integer>>> call(Tuple2<String, Iterable<Tuple2<String, Integer>>> s) throws Exception {
                ArrayList<Tuple2<String, Integer>> listOfUsersAndRating = new ArrayList<>();
                //保存movie
                String movie = s._1;
                Iterable<Tuple2<String, Integer>> pairsOfUserAndRating = s._2;
                //numberOfRates用于数一个movie有几个
                int numberOfRates = 0;
                for (Tuple2<String, Integer> t2 : pairsOfUserAndRating) {
                    numberOfRates++;
                    listOfUsersAndRating.add(t2);
                }
                ArrayList<Tuple2<String, Tuple3<String, Integer, Integer>>> results = new ArrayList<>();
                for (Tuple2<String, Integer> t2 : listOfUsersAndRating) {
                    String user = t2._1;
                    Integer rating = t2._2;
                    Tuple3<String, Integer, Integer> t3 = new Tuple3<>(movie, rating, numberOfRates);
                    results.add(new Tuple2<>(user, t3));
                }
                return results.iterator();
            }
        });
//<Movie1,Movie2><1 10 2 20 2 1 4> <Movie1,Movie3><1 10 3 30 3 1 9>的格式,之后就可以给其它的公式去求相似度了
 JavaPairRDD<Tuple2<String, String>, Tuple7<Integer, Integer, Integer, Integer, Integer, Integer, Integer>> moviePairs = groupedbyUser.flatMapToPair(new PairFlatMapFunction<Tuple2<String, Iterable<Tuple3<String, Integer, Integer>>>, Tuple2<String, String>,
                Tuple7<Integer, Integer, Integer, Integer, Integer, Integer, Integer>>() {
            @Override
            public Iterator<Tuple2<Tuple2<String, String>, Tuple7<Integer, Integer, Integer, Integer, Integer, Integer, Integer>>>
            call(Tuple2<String, Iterable<Tuple3<String, Integer, Integer>>> s) throws Exception {
                String user = s._1;
                Iterable<Tuple3<String, Integer, Integer>> movies = s._2;
                //把第二部分Iterable的Tuple3[Movie2,4,4]变成{movie2,4,4}的形式
                List<Tuple3<String, Integer, Integer>> listOfMovies = toList(movies);
                //返回的大概是以某个字母为开始,然后两两的组合Tuple3[Movie2,4,4],一个大的List包含着包含各个小字母的小的List的三元数组
                List<List<Tuple3<String, Integer, Integer>>> comb2 = Combination.findSortedCombinations(listOfMovies, 2);
                ArrayList<Tuple2<Tuple2<String, String>, Tuple7<Integer, Integer, Integer, Integer, Integer, Integer, Integer>>> result = new ArrayList<>();
                for (List<Tuple3<String, Integer, Integer>> twoMovies : comb2) {
                    //从组合的 movie对中获取
                    Tuple3<String, Integer, Integer> movie1 = twoMovies.get(0);
                    Tuple3<String, Integer, Integer> movie2 = twoMovies.get(1);
                    //生成两个Movies列表
                    Tuple2<String, String> k3 = new Tuple2<String, String>(movie1._1, movie2._1);
                    Tuple7<Integer, Integer, Integer, Integer, Integer, Integer, Integer> v3 = getTuple7(movie1, movie2);
                    //这一步生成大概像:<Movie1,Movie2><1 10 2 20 2 1 4> <Movie1,Movie3><1 10 3 30 3 1 9>
                    Tuple2<Tuple2<String, String>, Tuple7<Integer, Integer, Integer, Integer, Integer, Integer, Integer>> k3v3 = new Tuple2<>(k3, v3);
                    result.add(k3v3);
                }
                return result.iterator();
            }
        });

++基于传统Scala来实现++

//对String进行切分
val userMovieRating = userRatings.map(line =>{
      val tokens = line.split("\\s+")
      // user      movie      rate
      (tokens(0),tokens(1),tokens(2).toInt)
    })
    
//获得每部电影看的人数,对movie进行排序
    val numberOfRatePerMovie = userMovieRating.map(umr => (umr._2,1)).reduceByKey(_+_)
    
 //先从(umr._2, ((umr._1, umr._3),numberOfRatersPerMovie)) =(movie, ((user, rate),count))
    //再到(user,movie,rate,count)
    val userMovieRatingNumberOfRater = userMovieRating.map(umr =>(umr._2,(umr._1,umr._3))).join(numberOfRatePerMovie)
      .map(tuple =>(tuple._2._1._1,tuple._1,tuple._2._1._2,tuple._2._2))

//对user进行排序得到User1 [<Movie1,1,10>,<Movie2,2,20>,<Movie3,3,30>],后面是一个Iterable
    val groupByUser = userMovieRatingNumberOfRater.map(f =>(f._1,(f._2,f._3,f._4))).groupByKey()
    
//生成<Movie2><1 10 2 20 2 1 4>
val moviePairs = groupByUser.flatMap(tuple=>{
      //对movie进行排序,防止重复
      val sorted = tuple._2.toList.sortBy(f => f._1)
      val tuple7 = for{
        movie1 <- sorted
        movie2 <- sorted
        if(movie1._1 < movie2._1)
        ratingProduct = movie1._2 * movie2._2
        rating1Squared = movie1._2 * movie1._2
        rating2Squared = movie2._2 * movie2._2
      }yield {
        ((movie1._1, movie2._1), (movie1._2, movie1._3, movie2._2, movie2._3, ratingProduct, rating1Squared, rating2Squared))
      }
      tuple7
    })
//<Movie1,Movie2> [<1 10 2 20 2 1 4> <2 10 3 20 6 4 9>]
 val moviePairsGrouped = moviePairs.groupByKey()
 
//最后先用foldRight向右折叠求出各类总分,然后再用三个公式求
val result = moviePairsGrouped.mapValues(itr =>{
      val groupSize = itr.size
      //foldRight为向右折叠,把所有的值相加起来
      val (rating1, numOfRaters1, rating2, numOfRaters2, ratingProduct, rating1Squared, rating2Squared) =
        itr.foldRight((List[Int](), List[Int](), List[Int](), List[Int](), List[Int](), List[Int](), List[Int]()))((a, b) =>
          (
            a._1 :: b._1,
            a._2 :: b._2,
            a._3 :: b._3,
            a._4 :: b._4,
            a._5 :: b._5,
            a._6 :: b._6,
            a._7 :: b._7))
      val dotProduct = ratingProduct.sum // sum of ratingProd
      val rating1Sum = rating1.sum // sum of rating1
      val rating2Sum = rating2.sum // sum of rating2
      val rating1NormSq = rating1Squared.sum // sum of rating1Squared
      val rating2NormSq = rating2Squared.sum // sum of rating2Squared
      val maxNumOfumRaters1 = numOfRaters1.max // max of numOfRaters1
      val maxNumOfumRaters2 = numOfRaters2.max // max of numOfRaters2

      val numerator = groupSize * dotProduct - rating1Sum * rating2Sum
      val denominator = math.sqrt(groupSize * rating1NormSq - rating1Sum * rating1Sum) *
        math.sqrt(groupSize * rating2NormSq - rating2Sum * rating2Sum)
      //正则关联度
      val pearsonCorrelation = numerator / denominator
      //欧氏距离
      val cosineCorrelation = dotProduct / (math.sqrt(rating1NormSq) * math.sqrt(rating2NormSq))
      //曼哈顿距离
      val jaccardCorrelation = groupSize.toDouble / (maxNumOfumRaters1 + maxNumOfumRaters2 - groupSize)

      (pearsonCorrelation, cosineCorrelation, jaccardCorrelation)
    })
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,904评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,581评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,527评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,463评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,546评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,572评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,582评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,330评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,776评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,087评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,257评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,923评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,571评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,192评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,436评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,145评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,127评论 2 352

推荐阅读更多精彩内容