本章介绍如何查找（无向）图中的所有三角形。
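查找三角形的算法思想：对每个顶点 v，把它的邻居两两配对，得到经过 v 的“开放三元组” a–v–b；如果边 a–b 同样存在，那么 {a, b, v} 就构成一个三角形。每个三角形会从它的三条边各被发现一次，因此最后还需要去重。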
本章的实现方式：
1. 基于 MapReduce 实现
2. 基于 Spark（Java API）实现
3. 基于 Spark（Scala API）实现
++基于传统MapReduce来实现++
1. MapReduce实现的过程
2. MapReduce的实现类
3. Map端代码实现
/**
 * Phase-1 mapper: reads one undirected edge per line ("a b", space separated)
 * and emits it in both directions, (a, b) and (b, a), so that the reducer sees
 * every vertex together with all of its neighbours.
 *
 * Blank or malformed lines (fewer than two tokens) are skipped instead of
 * failing the task with an ArrayIndexOutOfBoundsException. Self-loops (a == b)
 * are skipped as well: they can never be part of a triangle, and letting them
 * through would make the later phases report degenerate triangles such as (a, b, a).
 */
public class GraphEdgeMapper extends Mapper<LongWritable,Text,LongWritable,LongWritable>{
    // Writable instances reused for every record.
    LongWritable k2 = new LongWritable();
    LongWritable v2 = new LongWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String edge = value.toString().trim();
        String[] nodes = StringUtils.split(edge, " ");
        // Skip blank or malformed lines rather than crashing the whole task.
        if (nodes == null || nodes.length < 2) {
            return;
        }
        long nodeA = Long.parseLong(nodes[0]);
        long nodeB = Long.parseLong(nodes[1]);
        // A self-loop cannot belong to any triangle.
        if (nodeA == nodeB) {
            return;
        }
        k2.set(nodeA);
        v2.set(nodeB);
        // Emit the edge in both directions: (a, b) and (b, a).
        context.write(k2, v2);
        context.write(v2, k2);
    }
}
4. Reduce端代码实现
/**
 * Phase-1 reducer. For a vertex v (the key) and its neighbours N(v) it emits:
 * <ul>
 *   <li>((v, n), 0) for every neighbour n: marks that the edge v-n exists;</li>
 *   <li>((a, b), v) for every pair a &lt; b of neighbours: records the open path a-v-b.</li>
 * </ul>
 * The key (a, b) therefore closes a triangle {a, b, v} exactly when it receives
 * both the 0 marker and the vertex v.
 *
 * NOTE(review): the value 0 doubles as the "edge exists" marker, so a vertex whose
 * id is 0 would be indistinguishable from it. Confirm that vertex ids are non-zero.
 */
public class GraphEdgeReducer extends Reducer<LongWritable,LongWritable,PairOfLongs,LongWritable> {
    PairOfLongs k2 = new PairOfLongs();
    LongWritable v2 = new LongWritable();

    @Override
    protected void reduce(LongWritable key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        long vertex = key.get();
        // Sorted, de-duplicated neighbour set. Duplicate input edges would otherwise
        // produce repeated records and degenerate (n, n) pair keys. The vertex itself
        // is excluded so that a self-loop never takes part in a triangle.
        java.util.TreeSet<Long> neighbourSet = new java.util.TreeSet<>();
        for (LongWritable value : values) {
            long neighbour = value.get();
            if (neighbour != vertex) {
                neighbourSet.add(neighbour);
            }
        }
        ArrayList<Long> neighbours = new ArrayList<>(neighbourSet);

        // Edge markers: ((vertex, neighbour), 0).
        v2.set(0);
        for (long neighbour : neighbours) {
            k2.set(vertex, neighbour);
            context.write(k2, v2);
        }

        // Open paths: ((a, b), vertex) for every pair a < b of neighbours
        // (the list is ascending, so index i < j implies a < b).
        v2.set(vertex);
        for (int i = 0; i < neighbours.size(); i++) {
            for (int j = i + 1; j < neighbours.size(); j++) {
                k2.set(neighbours.get(i), neighbours.get(j));
                context.write(k2, v2);
            }
        }
    }
}
/**
 * Phase-2 reducer. The key (a, b) collects what GraphEdgeReducer emitted for it:
 * the marker 0 when the edge a-b exists, and a vertex v for every open path a-v-b.
 * When the marker is present, each such v closes the triangle {a, b, v}.
 *
 * Each triangle is written as "x,y,z" with its vertices in ascending order. A
 * triangle is found once via each of its three edges, and the canonical order
 * makes those copies identical lines. The final de-duplication step then only
 * has to drop exact duplicates.
 */
public class TriadsReducer extends Reducer<PairOfLongs, LongWritable, Text, Text> {
    static final Text EMPTY = new Text("");

    @Override
    protected void reduce(PairOfLongs key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        ArrayList<Long> middles = new ArrayList<>();
        boolean edgeExists = false;
        for (LongWritable value : values) {
            long node = value.get();
            // 0 is the "edge a-b exists" marker; any other value is a middle vertex.
            if (node == 0) {
                edgeExists = true;
            } else {
                middles.add(node);
            }
        }
        // Without the closing edge a-b, or without any open path, there is no triangle.
        if (!edgeExists || middles.isEmpty()) {
            return;
        }
        Text triangle = new Text();
        long[] vertices = new long[3];
        for (long node : middles) {
            vertices[0] = key.getLeftElement();
            vertices[1] = key.getRightElement();
            vertices[2] = node;
            // Canonical ascending order so duplicates of one triangle compare equal.
            java.util.Arrays.sort(vertices);
            triangle.set(vertices[0] + "," + vertices[1] + "," + vertices[2]);
            context.write(triangle, EMPTY);
        }
    }
}
5. 最后对三角形结果中的重复值去重即可（每个三角形会从它的三条边各被输出一次；只有把三个顶点按相同顺序排列后，这些重复行才能被直接去掉）。
++基于 Spark（Java API）来实现++
// Driver body: finds every triangle of an undirected graph stored as one edge
// per line ("a,b") in inputPath.
JavaSparkContext ctx = SparkUtil.createJavaSparkContext("count-triangles");
JavaRDD<String> lines = ctx.textFile(inputPath);

// Emit every edge in both directions so that groupByKey below brings each vertex
// together with all of its neighbours. Blank/short lines and self-loops are skipped:
// a blank line would otherwise throw NumberFormatException, and a self-loop can
// never belong to a triangle.
JavaPairRDD<Long, Long> edges = lines.flatMapToPair(new PairFlatMapFunction<String, Long, Long>() {
    @Override
    public Iterator<Tuple2<Long, Long>> call(String s) throws Exception {
        String[] tokens = s.split(",");
        if (tokens.length < 2) {
            return Collections.<Tuple2<Long, Long>>emptyIterator();
        }
        long start = Long.parseLong(tokens[0].trim());
        long end = Long.parseLong(tokens[1].trim());
        if (start == end) {
            return Collections.<Tuple2<Long, Long>>emptyIterator();
        }
        return Arrays.asList(new Tuple2<Long, Long>(start, end), new Tuple2<Long, Long>(end, start)).iterator();
    }
});

// Group by vertex: (v, all neighbours of v). groupByKey groups; it does not sort.
JavaPairRDD<Long, Iterable<Long>> triads = edges.groupByKey();

// For vertex v with neighbours N(v) emit
//   ((v, n), 0L) for each n in N(v)            -> the edge v-n exists
//   ((a, b), v)  for each pair a < b in N(v)   -> the open path a-v-b
// NOTE(review): 0L doubles as the "edge exists" marker, so a vertex with id 0
// would be misread as that marker. Confirm that ids are non-zero.
JavaPairRDD<Tuple2<Long, Long>, Long> possibleTriads = triads.flatMapToPair(new PairFlatMapFunction<Tuple2<Long, Iterable<Long>>, Tuple2<Long, Long>, Long>() {
    @Override
    public Iterator<Tuple2<Tuple2<Long, Long>, Long>> call(Tuple2<Long, Iterable<Long>> s) throws Exception {
        Long vertex = s._1;
        List<Tuple2<Tuple2<Long, Long>, Long>> result = new ArrayList<Tuple2<Tuple2<Long, Long>, Long>>();
        ArrayList<Long> neighbours = new ArrayList<>();
        for (Long neighbour : s._2) {
            neighbours.add(neighbour);
            result.add(new Tuple2<Tuple2<Long, Long>, Long>(new Tuple2<Long, Long>(vertex, neighbour), 0L));
        }
        // Sorting guarantees a < b for every emitted pair key (a, b).
        Collections.sort(neighbours);
        for (int i = 0; i < neighbours.size(); i++) {
            for (int j = i + 1; j < neighbours.size(); j++) {
                Tuple2<Long, Long> pair = new Tuple2<Long, Long>(neighbours.get(i), neighbours.get(j));
                result.add(new Tuple2<Tuple2<Long, Long>, Long>(pair, vertex));
            }
        }
        return result.iterator();
    }
});

JavaPairRDD<Tuple2<Long,Long>, Iterable<Long>> triadsGrouped = possibleTriads.groupByKey();

// Key (a, b) closes the triangle {a, b, v} for each recorded middle vertex v,
// provided the 0L "edge a-b exists" marker is present among its values.
JavaRDD<Tuple3<Long, Long, Long>> trianglesWithDuplicates = triadsGrouped.flatMap(new FlatMapFunction<Tuple2<Tuple2<Long, Long>, Iterable<Long>>, Tuple3<Long, Long, Long>>() {
    @Override
    public Iterator<Tuple3<Long, Long, Long>> call(Tuple2<Tuple2<Long, Long>, Iterable<Long>> s) throws Exception {
        Tuple2<Long, Long> key = s._1;
        List<Long> middles = new ArrayList<>();
        boolean edgeExists = false;
        for (Long node : s._2) {
            // The marker emitted by the previous stage is the value 0L (never null),
            // so it must be recognised by a numeric comparison.
            if (node.longValue() == 0L) {
                edgeExists = true;
            } else {
                middles.add(node);
            }
        }
        List<Tuple3<Long, Long, Long>> result = new ArrayList<Tuple3<Long, Long, Long>>();
        if (!edgeExists) {
            return result.iterator();
        }
        for (Long node : middles) {
            // Sort the vertices so the copies of a triangle found via each of its
            // three edges become equal tuples that distinct() can collapse.
            long[] aTriangle = {key._1, key._2, node};
            Arrays.sort(aTriangle);
            result.add(new Tuple3<Long, Long, Long>(aTriangle[0], aTriangle[1], aTriangle[2]));
        }
        return result.iterator();
    }
});

JavaRDD<Tuple3<Long, Long, Long>> uniqueTriangeles = trianglesWithDuplicates.distinct();
// NOTE(review): no action (collect / saveAsTextFile / count) is invoked on
// uniqueTriangeles in this excerpt. Spark transformations are lazy, so nothing is
// actually computed before the context is closed. Add an action here.
ctx.close();
System.exit(0);
++基于 Spark（Scala API）来实现++
// Driver body: finds every triangle of an undirected graph stored as one edge per
// line ("a b", whitespace separated) in args(0). It writes the distinct triangles,
// with vertices in ascending order, to args(1).
val sparkConf = new SparkConf().setAppName("CountTriangles")
val sc = new SparkContext(sparkConf)
val input = args(0)
val output = args(1)
val lines = sc.textFile(input)

// Emit each edge in both directions so groupByKey brings every vertex together with
// all of its neighbours. Blank/short lines and self-loops are skipped: a blank line
// would otherwise throw NumberFormatException, and a self-loop can never belong to
// a triangle.
val edges = lines.flatMap { line =>
  line.trim.split("\\s+") match {
    case Array(a, b, _*) =>
      val start = a.toLong
      val end = b.toLong
      if (start == end) Nil else (start, end) :: (end, start) :: Nil
    case _ => Nil
  }
}

// (vertex, neighbours of vertex)
val triads = edges.groupByKey()

// For vertex v with neighbours N(v) emit
//   ((v, n), 0L) for each n in N(v)            -> the edge v-n exists
//   ((a, b), v)  for each pair a < b in N(v)   -> the open path a-v-b
// NOTE(review): 0L doubles as the "edge exists" marker, so a vertex with id 0 would
// be misread as that marker. Confirm that ids are non-zero.
val possibleTriads = triads.flatMap { case (vertex, neighbours) =>
  // Distinct + sorted: duplicate input edges add nothing, and pairs come out as a < b.
  val sortedNeighbours = neighbours.toList.distinct.sorted
  val edgeMarkers = sortedNeighbours.map(n => ((vertex, n), 0L))
  val openPaths = sortedNeighbours.combinations(2).collect { case List(a, b) => ((a, b), vertex) }.toList
  openPaths ::: edgeMarkers
}

val triadsGrouped = possibleTriads.groupByKey()

// Key (a, b) closes the triangle {a, b, v} for each recorded middle vertex v, but
// only when the 0L marker shows that edge a-b exists. All three vertices are sorted
// so that the three copies of a triangle (one per edge) are identical tuples.
val triangleWithDuplicates = triadsGrouped.flatMap { case ((a, b), values) =>
  if (values.exists(_ == 0L)) {
    values.filter(_ != 0L).map { middle =>
      val sortedTriangle = List(a, b, middle).sorted
      (sortedTriangle(0), sortedTriangle(1), sortedTriangle(2))
    }
  } else {
    Nil
  }
}

val uniqueTriangles = triangleWithDuplicates.distinct()

// For debugging purpose; on a cluster this prints in the executor logs, not on the driver.
uniqueTriangles.foreach(println)
uniqueTriangles.saveAsTextFile(output)

// done
sc.stop()