常用图算法实现--Spark

使用Spark实现PageRank,强连通分量等图算法

PageRank

数据准备

边:

1 2
1 15
2 3
2 4
2 5
2 6
2 7
3 13
4 2
5 11
5 12
6 1
6 7
6 8
7 1
7 8
8 1
8 9
8 10
9 14
9 1
10 1
10 13
11 12
11 1
12 1
13 14
14 12
15 1

网页:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

将这两个文件放入HDFS:

hdfs dfs -mkdir input/PageRank
hdfs dfs -put links.txt input/PageRank
hdfs dfs -put pages.txt input/PageRank

编写程序

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import static java.lang.Math.abs;


public class PageRank {
    private static int MaxIteration = 100;
    private static final double DAMPENING_FACTOR = 0.85;
    private static final double EPSILON = 0.0001;

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("PageRank");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("WARN");

        String linksFile = "hdfs:///user/hadoop/input/PageRank/links.txt";
        String pagesFile = "hdfs:///user/hadoop/input/PageRank/pages.txt";
        String rankFile = "hdfs:///user/hadoop/output/Graph/SparkPageRank";

        /**
         *  neighborRDD: (from, s)
         *  linksRDD: tuple (from, [to,1/m])
         *  pageRDD: vertex
         *  pageRankRDD: (point, 1/n)
         */


        JavaPairRDD<Integer, Integer> neighborRDD = sc.textFile(linksFile)
                .mapToPair(
                        line -> new Tuple2<>(
                                Integer.parseInt(line.split(" ")[0]), 1))
                .reduceByKey((x, y) -> x + y);

        JavaPairRDD<Integer, Tuple2<Integer, Integer>> linksRDD = sc.textFile(linksFile)
                .mapToPair(
                        line -> new Tuple2<>(
                                Integer.parseInt(line.split(" ")[0]),
                                Integer.parseInt(line.split(" ")[1])
                        ))
                .join(neighborRDD);

        JavaRDD<Integer> pagesRDD = sc.textFile(pagesFile).map(line -> Integer.parseInt(line));
        long pageCount = pagesRDD.count();
        JavaPairRDD<Integer, Double> pageRankRDD = pagesRDD.mapToPair(
                vertex -> new Tuple2<>(vertex, 1.0 / pageCount)
        );

        int count = 0;
        while (count < MaxIteration) {
            JavaPairRDD<Integer, Double> NewPageRankRDD = linksRDD.join(pageRankRDD)
                    .mapToPair(
                            new PairFunction<Tuple2<Integer, Tuple2<Tuple2<Integer, Integer>, Double>>, Integer, Double>() {
                                @Override
                                public Tuple2<Integer, Double> call(Tuple2<Integer, Tuple2<Tuple2<Integer, Integer>, Double>> ans) throws Exception {
//                               // [ toNode, fraction * rank]
                                    return new Tuple2<>(ans._2._1._1, ans._2._2/ans._2._1._2);
                                }
                            })
                    .reduceByKey((v1, v2) -> v1 + v2)
                    .mapValues(
                            new Function<Double, Double>() {
                                double dampening = DAMPENING_FACTOR;
                                double randomJump = (1 - DAMPENING_FACTOR) / pageCount;

                                @Override
                                public Double call(Double value) throws Exception {
                                    value = value * dampening + randomJump;
                                    return value;
                                }
                            }
                    );
            count++;
            JavaPairRDD<Integer, Tuple2<Double, Double>> compare = pageRankRDD.join(NewPageRankRDD).filter(each -> abs(each._2._1 - each._2._2) > EPSILON);
            if (compare.isEmpty() || count > MaxIteration)
                break;
            pageRankRDD = NewPageRankRDD;

        }

        pageRankRDD.saveAsTextFile(rankFile);

    }
}

思路:

  1. 全部使用Lambda表达式进行,首先需要找到所有的边的条数,初始化Rank值
  2. 然后使用Join进行合并,并计算下一轮Rank
  3. 使用DAMPENING_FACTOR进行随机跳转

运行

spark-submit  --class PageRank PageRank-1.0.jar
hdfs dfs -cat output/Graph/SparkPageRank/*

结果为:

54622233513

ConnectedComponents

数据准备

提供基本数据集,与PageRank一样,指定顶点和边

vertices.txt

准备一些顶点,例如1-16

edges.txt

准备一些连接边:

1 2
2 3
2 4
3 5
6 7
8 9
8 10
5 11
11 12
10 13
9 14
13 14
1 15
16 1

将这两个文件放入HDFS:

hdfs dfs -mkdir input/ConnectedComponents
hdfs dfs -put edges.txt input/ConnectedComponents
hdfs dfs -put vertices.txt input/ConnectedComponents

编写程序

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import static java.lang.StrictMath.min;

public class ConnectedComponents {

    public static int MaxIteration = 100;

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("ConnectedComponents");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("WARN");

        String edgesFile = "hdfs:///user/hadoop/input/ConnectedComponents/edges.txt";
        String verticesFile = "hdfs:///user/hadoop/input/ConnectedComponents/vertices.txt";
        String outFile = "hdfs:///user/hadoop/output/Graph/SparkConnectedComponents";


        /**
         * edgesRDD: [x,y]
         * componentsRDD: [x,x] init
         */

        JavaPairRDD<Integer, Integer> edgesRDD = sc.textFile(edgesFile)
                .mapToPair(
                        line -> new Tuple2<>(
                                Integer.parseInt(line.split(" ")[0]),
                                Integer.parseInt(line.split(" ")[1])
                        )
                );

        JavaPairRDD<Integer, Integer> componentsRDD = sc.textFile(verticesFile)
                .mapToPair(
                        line -> new Tuple2<>(Integer.parseInt(line), Integer.parseInt(line))
                );

        int count = 0;

        while (count < MaxIteration) {
            JavaPairRDD<Integer, Integer> newcomponentsRDD = componentsRDD.join(edgesRDD)
                    .mapToPair(
                            x -> new Tuple2<>(x._2._2, x._2._1)
                    )
                    .reduceByKey(
                            (v1, v2) -> min(v1, v2)
                    );

            JavaPairRDD<Integer, Tuple2<Integer, Integer>> filterRDD = newcomponentsRDD.join(componentsRDD)
                    .filter(
                            each -> each._2._1 < each._2._2
                    );

            if (filterRDD.isEmpty())
                break;

            // update to componentsRDD
            componentsRDD = componentsRDD.leftOuterJoin(newcomponentsRDD).
                    mapValues(
                            v -> min(v._1, v._2.orElse(v._1))
                    );

            count++;
        }

        componentsRDD.saveAsTextFile(outFile);
    }
}

思路:

  1. 首先需要将每个点映射成自己的强连通分支
  2. 每次迭代,更新与自己相连的点的强连通分支,取最小值
  3. 使用左连接更新原始的强连通分支

运行

spark-submit  --class ConnectedComponents ConnectedComponents-1.0.jar
hdfs dfs -cat output/Graph/SparkConnectedComponents/*

查看结果:

54622728559

SingleSourceShortestPaths

数据准备

首先我们需要准备边和点

边:

1 2 12.0
1 3 13.0
2 3 23.0
3 4 34.0
3 5 35.0
4 5 45.0
5 1 51.0

点:

1
2
3
4
5

将这两个文件放入HDFS:

hdfs dfs -mkdir input/SingleSourceShortestPaths
hdfs dfs -put edges.txt input/SingleSourceShortestPaths
hdfs dfs -put vertices.txt input/SingleSourceShortestPaths

编写程序

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import javax.validation.constraints.Max;

import static java.lang.StrictMath.min;

public class SingleSourceShortestPaths {
    public static int sourceVerticeID = 1;
    public static int MaxIteration = 100;

    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setAppName("ConnectedComponents");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("WARN");

        String edgesFile = "hdfs:///user/hadoop/input/SingleSourceShortestPaths/edges.txt";
        String verticesFile = "hdfs:///user/hadoop/input/SingleSourceShortestPaths/vertices.txt";
        String outFile = "hdfs:///user/hadoop/output/Graph/SparkSingleSourceShortestPaths";

        /**
         * edgesRDD: [from, to, dis ]
         * verticesRDD: [vertice, dis]
         */


        JavaPairRDD<Integer, Tuple2<Integer, Double>> edgesRDD = sc.textFile(edgesFile)
                .mapToPair(
                        line -> {
                            int from = Integer.parseInt(line.split(" ")[0]);
                            int to = Integer.parseInt(line.split(" ")[1]);
                            double dis = Double.parseDouble(line.split(" ")[2]);
                            return new Tuple2<>(from, new Tuple2<>(to, dis));
                        }
                );

        JavaPairRDD<Integer, Double> verticesRDD = sc.textFile(verticesFile)
                .mapToPair(
                        line -> {
                            int vertice = Integer.parseInt(line);
                            if (vertice == sourceVerticeID)
                                return new Tuple2<>(vertice, 0.0);
                            return new Tuple2<>(vertice, Double.POSITIVE_INFINITY);
                        }
                );

        int count = 0;
        while (count < MaxIteration) {
            // get new dis
            JavaPairRDD<Integer, Double> newVerticesRDD = verticesRDD
                    .join(edgesRDD)
                    .mapToPair(
                            line -> {
                                if (line._2._1 != Double.POSITIVE_INFINITY)
                                    return new Tuple2<>(line._2._2._1, line._2._1 + line._2._2._2);
                                return new Tuple2<>(line._2._2._1, Double.POSITIVE_INFINITY);
                            }
                    ).reduceByKey(
                            (v1, v2) -> min(v1, v2));

            JavaPairRDD<Integer, Tuple2<Double, Double>> filterRDD = newVerticesRDD.join(verticesRDD)
                    .filter(
                            each -> each._2._1 < each._2._2);

            if (filterRDD.isEmpty())
                break;

            // update to verticesRDD
            verticesRDD = verticesRDD.leftOuterJoin(newVerticesRDD).
                    mapValues(
                            v -> min(v._1, v._2.orElse(v._1)));
        }
        verticesRDD.saveAsTextFile(outFile);

    }
}

思路:

  1. 首先需要初始化每个顶点的距离,将原始点设置为0,其余设置为无穷
  2. 每次迭代得到新的顶点距离,并使用reduceByKey最小化,比较是否更新
  3. 然后将更新得到的顶点距离加入原始RDD中

运行

spark-submit  --class SingleSourceShortestPaths SingleSourceShortestPaths-1.0.jar
hdfs dfs -cat output/Graph/SparkSingleSourceShortestPaths/*

查看结果:

54623040420
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,732评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,496评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,264评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,807评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,806评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,675评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,029评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,683评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,704评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,666评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,773评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,413评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,016评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,978评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,204评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,083评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,503评论 2 343

推荐阅读更多精彩内容