本章为推荐引擎
本章为基于电影内容的推荐,假设输入为<用户,电影,评分>,输出为<电影1,电影2><三种算法的相似度>。
本章实现方式
- 基于传统spark来实现
- 基于传统Scala来实现
本章实现方式的思路
- spark:
- 1.先创建JavaSparkContext,textFile读入文件。
- 2.对String进行切分,转换成key = Movie3 value=(User1,3)。
- 3.之后对key进行分组,得到key=Movie2 value=[(User1,4),(User2,5),(User3,3),(User4,3)]
- 4.对User进行groupByKey,得到key = user value = Tuple3[Movie2,4,4],Tuple3[Movie1,2,4]之类的。
- 5.对value进行操作,转换成<Movie1,Movie2><1 10 2 20 2 1 4> <Movie1,Movie3><1 10 3 30 3 1 9>
- 6.对<Movie1,Movie2>进行分组,转换成:<Movie1,Movie2> [<1 10 2 20 2 1 4> <2 10 3 20 6 4 9>]
- 7.之后可以通过pearsonCorrelation,cosineCorrelation,jaccardCorrelation公式来求得两个movie之间的相似值
++基于传统spark来实现++
// Split every tab-separated input line (user, movie, rating) and key it by movie:
// key = movie, value = (user, rating).
JavaPairRDD<String, Tuple2<String, Integer>> moviesRDD = usersRatings.mapToPair(new PairFunction<String, String, Tuple2<String, Integer>>() {
    @Override
    public Tuple2<String, Tuple2<String, Integer>> call(String s) throws Exception {
        String[] record = s.split("\t");
        String user = record[0];
        String movie = record[1];
        // Integer.valueOf replaces the deprecated Integer(String) constructor; it still
        // throws NumberFormatException when the rating field is not an integer.
        Integer rating = Integer.valueOf(record[2]);
        Tuple2<String, Integer> userAndRating = new Tuple2<>(user, rating);
        return new Tuple2<>(movie, userAndRating);
    }
});
// Re-key the per-movie groups by user. For every (user, rating) found under a movie, emit
// key = user, value = Tuple3(movie, rating, numberOfRaters), e.g.
// key=User1 value=Tuple3[Movie2,4,4], key=User2 value=Tuple3[Movie2,5,4].
JavaPairRDD<String, Tuple3<String, Integer, Integer>> usersRDD = moviesGrouped.flatMapToPair(new PairFlatMapFunction<Tuple2<String, Iterable<Tuple2<String, Integer>>>, String, Tuple3<String, Integer, Integer>>() {
    @Override
    public Iterator<Tuple2<String, Tuple3<String, Integer, Integer>>> call(Tuple2<String, Iterable<Tuple2<String, Integer>>> s) throws Exception {
        final String movieName = s._1;
        // Materialize the ratings first: the total number of ratings for this movie must be
        // known before any output record can be built.
        ArrayList<Tuple2<String, Integer>> ratings = new ArrayList<>();
        for (Tuple2<String, Integer> userAndRating : s._2) {
            ratings.add(userAndRating);
        }
        // How many ratings this movie received.
        final int raterCount = ratings.size();
        ArrayList<Tuple2<String, Tuple3<String, Integer, Integer>>> perUser = new ArrayList<>();
        for (Tuple2<String, Integer> userAndRating : ratings) {
            Tuple3<String, Integer, Integer> movieInfo = new Tuple3<>(movieName, userAndRating._2, raterCount);
            perUser.add(new Tuple2<>(userAndRating._1, movieInfo));
        }
        return perUser.iterator();
    }
});
// For every user, pair up the movies that user rated and emit one record per pair, shaped like
// <Movie1,Movie2><1 10 2 20 2 1 4> <Movie1,Movie3><1 10 3 30 3 1 9>,
// which the similarity formulas consume afterwards.
JavaPairRDD<Tuple2<String, String>, Tuple7<Integer, Integer, Integer, Integer, Integer, Integer, Integer>> moviePairs = groupedbyUser.flatMapToPair(new PairFlatMapFunction<Tuple2<String, Iterable<Tuple3<String, Integer, Integer>>>, Tuple2<String, String>,
        Tuple7<Integer, Integer, Integer, Integer, Integer, Integer, Integer>>() {
    @Override
    public Iterator<Tuple2<Tuple2<String, String>, Tuple7<Integer, Integer, Integer, Integer, Integer, Integer, Integer>>>
    call(Tuple2<String, Iterable<Tuple3<String, Integer, Integer>>> s) throws Exception {
        // Copy the user's Iterable of Tuple3[movie, rating, numberOfRaters] into a List.
        List<Tuple3<String, Integer, Integer>> ratedMovies = toList(s._2);
        // NOTE(review): findSortedCombinations is defined elsewhere; it presumably returns every
        // sorted 2-element combination of the list - confirm against Combination.
        List<List<Tuple3<String, Integer, Integer>>> pairs = Combination.findSortedCombinations(ratedMovies, 2);
        ArrayList<Tuple2<Tuple2<String, String>, Tuple7<Integer, Integer, Integer, Integer, Integer, Integer, Integer>>> emitted = new ArrayList<>();
        for (List<Tuple3<String, Integer, Integer>> pair : pairs) {
            Tuple3<String, Integer, Integer> first = pair.get(0);
            Tuple3<String, Integer, Integer> second = pair.get(1);
            // Key: the two movie names.
            Tuple2<String, String> pairKey = new Tuple2<String, String>(first._1, second._1);
            // Value: the 7-field statistics row built by getTuple7 (defined elsewhere).
            Tuple7<Integer, Integer, Integer, Integer, Integer, Integer, Integer> pairStats = getTuple7(first, second);
            Tuple2<Tuple2<String, String>, Tuple7<Integer, Integer, Integer, Integer, Integer, Integer, Integer>> record = new Tuple2<>(pairKey, pairStats);
            emitted.add(record);
        }
        return emitted.iterator();
    }
});
++基于传统Scala来实现++
// Parse each whitespace-separated input line into a (user, movie, rating) triple.
val userMovieRating = userRatings.map { line =>
  val fields = line.split("\\s+")
  val user = fields(0)
  val movie = fields(1)
  val rating = fields(2).toInt // NumberFormatException if the third field is not an integer
  (user, movie, rating)
}
// Count how many ratings each movie received: (movie, numberOfRaters).
val numberOfRatePerMovie = userMovieRating
  .map { case (_, movie, _) => (movie, 1) }
  .reduceByKey((left, right) => left + right)
// Attach each movie's rater count to every rating of that movie.
// The join yields (movie, ((user, rating), numberOfRaters)), which is then flattened
// into (user, movie, rating, numberOfRaters).
val userMovieRatingNumberOfRater = userMovieRating
  .map { case (user, movie, rating) => (movie, (user, rating)) }
  .join(numberOfRatePerMovie)
  .map { case (movie, ((user, rating), numberOfRaters)) => (user, movie, rating, numberOfRaters) }
// Group by user, giving e.g. User1 -> [<Movie1,1,10>, <Movie2,2,20>, <Movie3,3,30>];
// the value side is an Iterable of (movie, rating, numberOfRaters).
val groupByUser = userMovieRatingNumberOfRater
  .map { case (user, movie, rating, numberOfRaters) => (user, (movie, rating, numberOfRaters)) }
  .groupByKey()
// For every user, emit one record per pair of movies that user rated, shaped like
// <Movie1,Movie2><1 10 2 20 2 1 4>:
// ((movie1, movie2), (rating1, numOfRaters1, rating2, numOfRaters2,
//                     rating1 * rating2, rating1 squared, rating2 squared)).
val moviePairs = groupByUser.flatMap { case (_, ratedMovies) =>
  // Order by movie name; together with the strict "<" guard below, each unordered pair is
  // produced once, always with the smaller name first.
  val byName = ratedMovies.toList.sortBy(_._1)
  byName.flatMap { case (name1, score1, raters1) =>
    byName.collect {
      case (name2, score2, raters2) if name1 < name2 =>
        ((name1, name2), (score1, raters1, score2, raters2, score1 * score2, score1 * score1, score2 * score2))
    }
  }
}
// Collect every co-rating row of a movie pair, e.g.
// <Movie1,Movie2> [<1 10 2 20 2 1 4> <2 10 3 20 6 4 9>]
val moviePairsGrouped = moviePairs.groupByKey()
// For each movie pair, total the co-rating rows and derive three similarity scores,
// returned as (pearsonCorrelation, cosineCorrelation, jaccardCorrelation).
// Row layout: (rating1, numOfRaters1, rating2, numOfRaters2, ratingProduct, rating1Squared, rating2Squared).
val result = moviePairsGrouped.mapValues(itr => {
  val rows = itr.toSeq
  // Totals are accumulated as Long: groupSize * dotProduct, groupSize * ratingNormSq and
  // ratingSum * ratingSum grow quadratically with the number of co-raters and would
  // silently wrap around in Int arithmetic for heavily co-rated pairs.
  val groupSize = rows.size.toLong
  val dotProduct = rows.map(_._5.toLong).sum // sum of ratingProduct
  val rating1Sum = rows.map(_._1.toLong).sum // sum of rating1
  val rating2Sum = rows.map(_._3.toLong).sum // sum of rating2
  val rating1NormSq = rows.map(_._6.toLong).sum // sum of rating1Squared
  val rating2NormSq = rows.map(_._7.toLong).sum // sum of rating2Squared
  val maxNumOfRaters1 = rows.map(_._2).max // max of numOfRaters1
  val maxNumOfRaters2 = rows.map(_._4).max // max of numOfRaters2

  val numerator = groupSize * dotProduct - rating1Sum * rating2Sum
  val denominator = math.sqrt((groupSize * rating1NormSq - rating1Sum * rating1Sum).toDouble) *
    math.sqrt((groupSize * rating2NormSq - rating2Sum * rating2Sum).toDouble)
  // Pearson correlation. The result is not finite (NaN from 0.0 / 0.0) when the denominator
  // is zero, e.g. when the pair has a single co-rating row.
  val pearsonCorrelation = numerator.toDouble / denominator
  // Cosine similarity of the two rating vectors.
  val cosineCorrelation = dotProduct.toDouble / (math.sqrt(rating1NormSq.toDouble) * math.sqrt(rating2NormSq.toDouble))
  // Jaccard similarity: co-raters / (raters of movie1 + raters of movie2 - co-raters).
  val jaccardCorrelation = groupSize.toDouble / (maxNumOfRaters1 + maxNumOfRaters2 - groupSize).toDouble
  (pearsonCorrelation, cosineCorrelation, jaccardCorrelation)
})