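- 先获取每个人的好友列表，每行记录的格式一般为 `100,200 300 400`，表示用户 100 的好友是 200、300、400。
- 对每个好友生成一条键值对：key 为 (本人, 好友)，value 为本人的完整好友列表，例如 [(100,200),(200 300 400)]、[(100,300),(200 300 400)] 等。
- key 中的两个 id 按从小到大排序，使 (100,200) 与 (200,100) 归到同一个 key 下。
- 最后按 key 分组，对同一 key 下的所有好友列表求交集，交集即为这两人的共同好友。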
- 基于 Hadoop MapReduce 的实现
- 基于 Spark Java API 的实现
- 基于 Spark Scala API 的实现
static String getFriends(String[] tokens) {
if (tokens.length == 2) {
return "";
StringBuilder builder = new StringBuilder();
for (int i = 1; i < tokens.length; i++) {
if (i < (tokens.length - 1)) {
return builder.toString();
static String buildSortedKey(String person, String friend) {
long p = Long.parseLong(person);
long f = Long.parseLong(friend);
if (p < f) {
return person + "," + friend;
} else {
return friend + "," + person;
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// parse input, delimiter is a single space
String[] tokens = StringUtils.split(value.toString(), " ");
// create reducer value
String friends = getFriends(tokens);
String person = tokens[0];
for (int i = 1; i < tokens.length; i++) {
String friend = tokens[i];
String reducerKeyAsString = buildSortedKey(person, friend);
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
Map<String, Integer> map = new HashMap<String, Integer>();
Iterator<Text> iterator = values.iterator();
int numOfValues = 0;
while (iterator.hasNext()) {
String friends = iterator.next().toString();
if (friends.equals("")) {
context.write(key, new Text("[]"));
addFriends(map, friends);
// now iterate the map to see how many have numOfValues
List<String> commonFriends = new ArrayList<String>();
for (Map.Entry<String, Integer> entry : map.entrySet()) {
if (entry.getValue() == numOfValues) {
// sen it to output
context.write(key, new Text(commonFriends.toString()));
static void addFriends(Map<String, Integer> map, String friendsList) {
String[] friends = StringUtils.split(friendsList, ",");
for (String friend : friends) {
Integer count = map.get(friend);
if (count == null) {
map.put(friend, 1);
} else {
map.put(friend, ++count);
public static void main(String[] args) throws Exception {
if (args.length < 1) {
// Spark master URL:
// format: spark://<spark-master-host-name>:7077
// example: spark://myserver00:7077
System.err.println("Usage: FindCommonFriends <input-file>");
//String sparkMasterURL = args[0];
String hdfsInputFileName = args[0];
// create context object
JavaSparkContext ctx = SparkUtil.createJavaSparkContext("FindCommonFriends");
// create the first RDD from input file
JavaRDD<String> records = ctx.textFile(hdfsInputFileName, 1);
// debug0
List<String> debug0 = records.collect();
for (String t : debug0) {
System.out.println("debug0 record="+t);
// PairFlatMapFunction<T, K, V>
// T => Iterable<Tuple2<K, V>>
JavaPairRDD<Tuple2<Long,Long>,Iterable<Long>> pairs =
// T K V
records.flatMapToPair(new PairFlatMapFunction<String, Tuple2<Long,Long>, Iterable<Long>>() {
public Iterator<Tuple2<Tuple2<Long,Long>,Iterable<Long>>> call(String s) {
//String的输入格式为100,100 200 400 300 etc
String[] tokens = s.split(",");
long person = Long.parseLong(tokens[0]);
String friendsAsString = tokens[1];
String[] friendsTokenized = friendsAsString.split(" ");
if (friendsTokenized.length == 1) {
Tuple2<Long,Long> key = buildSortedTuple(person, Long.parseLong(friendsTokenized[0]));
return Arrays.asList(new Tuple2<Tuple2<Long,Long>,Iterable<Long>>(key, new ArrayList<Long>())).iterator();
List<Long> friends = new ArrayList<Long>();
for (String f : friendsTokenized) {
List<Tuple2<Tuple2<Long, Long> ,Iterable<Long>>> result =
new ArrayList<Tuple2<Tuple2<Long, Long> ,Iterable<Long>>>();
for (Long f : friends) {
Tuple2<Long,Long> key = buildSortedTuple(person, f);
result.add(new Tuple2<Tuple2<Long,Long>, Iterable<Long>>(key, friends));
return result.iterator();
// 输出Tuple2
List<Tuple2<Tuple2<Long, Long> ,Iterable<Long>>> debug1 = pairs.collect();
for (Tuple2<Tuple2<Long,Long>,Iterable<Long>> t2 : debug1) {
System.out.println("debug1 key="+t2._1+"\t value="+t2._2);
JavaPairRDD<Tuple2<Long, Long>, Iterable<Iterable<Long>>> grouped = pairs.groupByKey();
// debug2
List<Tuple2<Tuple2<Long, Long> ,Iterable<Iterable<Long>>>> debug2 = grouped.collect();
for (Tuple2<Tuple2<Long,Long>, Iterable<Iterable<Long>>> t2 : debug2) {
System.out.println("debug2 key="+t2._1+"\t value="+t2._2);
// Find intersection of all List<List<Long>>
// mapValues[U](f: (V) => U): JavaPairRDD[K, U]
// Pass each value in the key-value pair RDD through a map function without changing the keys;
// this also retains the original RDD's partitioning.
JavaPairRDD<Tuple2<Long, Long>, Iterable<Long>> commonFriends =
grouped.mapValues(new Function< Iterable<Iterable<Long>>, // input
Iterable<Long> // output
>() {
public Iterable<Long> call(Iterable<Iterable<Long>> s) {
Map<Long, Integer> countCommon = new HashMap<Long, Integer>();
int size = 0;
for (Iterable<Long> iter : s) {
List<Long> list = iterableToList(iter);
if ((list == null) || (list.isEmpty())) {
for (Long f : list) {
Integer count = countCommon.get(f);
if (count == null) {
countCommon.put(f, 1);
else {
countCommon.put(f, ++count);
// if countCommon.Entry<f, count> == countCommon.Entry<f, s.size()>
// then that is a common friend
List<Long> finalCommonFriends = new ArrayList<Long>();
for (Map.Entry<Long, Integer> entry : countCommon.entrySet()){
if (entry.getValue() == size) {
return finalCommonFriends;
// debug3
List<Tuple2<Tuple2<Long, Long>, Iterable<Long>>> debug3 = commonFriends.collect();
for (Tuple2<Tuple2<Long, Long>, Iterable<Long>> t2 : debug3) {
System.out.println("debug3 key="+t2._1+ "\t value="+t2._2);
static Tuple2<Long,Long> buildSortedTuple(long a, long b) {
if (a < b) {
return new Tuple2<Long, Long>(a,b);
else {
return new Tuple2<Long, Long>(b,a);
static List<Long> iterableToList(Iterable<Long> iterable) {
List<Long> list = new ArrayList<Long>();
for (Long item : iterable) {
return list;
/**
 * Spark (Scala API) driver that finds the common friends of every pair of connected people.
 *
 * Input lines look like `person,friend1 friend2 ...` (e.g. `100,200 300 400`).
 * - Each line expands into one record per (person, friend) pair.
 *   - The key is the pair with the smaller id first, so both sides of a friendship meet
 *     under the same key.
 *   - The value is the person's full friend list.
 * - Grouping by key and intersecting the grouped lists yields the pair's common friends.
 *
 * @param args args(0) = input path, args(1) = output directory
 */
def main(args: Array[String]): Unit = {
  if (args.length < 2) {
    println("Usage: FindCommonFriends <input-dir> <output-dir>")
    sys.exit(1)
  }
  val sparkConf = new SparkConf().setAppName("FindCommonFriends")
  val sc = new SparkContext(sparkConf)
  try {
    val input = args(0)
    val output = args(1)
    val records = sc.textFile(input)

    // line => ((min, max), friendsOfPerson) for every friend of the person
    val pairs = records.filter(_.trim.nonEmpty).flatMap { s =>
      val tokens = s.split(",")
      val person = tokens(0).trim.toLong
      // a line without a friend list (e.g. "100" or "100,") contributes no pairs
      val friends =
        if (tokens.length < 2) Nil
        else tokens(1).trim.split("\\s+").filter(_.nonEmpty).map(_.toLong).toList
      friends.map { friend =>
        val key = if (person < friend) (person, friend) else (friend, person)
        (key, friends)
      }
    }

    val grouped = pairs.groupByKey()

    // A common friend is one present in every friend list grouped under the pair.
    // Set intersection, unlike counting occurrences, is not fooled by an id repeated
    // inside one list or by a person's line appearing twice. Sorted for stable output.
    val commonFriends = grouped.mapValues { lists =>
      lists.map(_.toSet).reduceOption(_ intersect _).getOrElse(Set.empty[Long]).toList.sorted
    }

    // save the result to the file
    commonFriends.saveAsTextFile(output)

    // Format result for easy viewing
    val formatedResult = commonFriends.map(
      f => s"(${f._1._1}, ${f._1._2})\t${f._2.mkString("[", ", ", "]")}"
    )
    // RDD.foreach runs on the executors: in cluster mode these lines go to the
    // executor logs rather than the driver console.
    formatedResult.foreach(println)
  } finally {
    // done!
    sc.stop()
  }
}