This post uses a simple example to explain some basic concepts and common operations in Spark GraphX.
Example
First, add the GraphX dependency to the pom:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-graphx_2.10</artifactId>
    <version>1.6.3</version>
</dependency>
Complete example code:
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object HelloGraphX {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Hello GraphX")
    val sc = new SparkContext(conf)
    // Create an RDD for the vertices: (id, (username, position))
    val users: RDD[(VertexId, (String, String))] =
      sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
        (5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
    // Create an RDD for the edges
    val relationships: RDD[Edge[String]] =
      sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
        Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"), Edge(5L, 0L, "colleague")))
    // Define a default user in case some relationships refer to a missing user
    val defaultUser = ("John Doe", "Missing")
    // Build the initial graph
    val graph = Graph(users, relationships, defaultUser)
    // Notice that there is a user 0 (for which we have no information) connected to user 5 (franklin)
    graph.triplets.map(
      triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
    ).collect.foreach(println(_))
    // Remove missing vertices as well as the edges connected to them
    val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
    // The valid subgraph disconnects user 5 from user 0 by removing user 0 (and the edge 5 -> 0)
    validGraph.vertices.collect.foreach(println(_))
    validGraph.triplets.map(
      triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
    ).collect.foreach(println(_))
    println("Count all users which are prof:" + validGraph.vertices.filter { case (id, (name, pos)) => pos == "prof" }.count)
    println("Count all the edges where src < dst:" + graph.edges.filter(e => e.srcId < e.dstId).count)
    sc.stop()
  }
}
The results are written to the driver's log:
rxin is the collab of jgonzal
franklin is the advisor of rxin
istoica is the colleague of franklin
franklin is the colleague of John Doe
franklin is the pi of jgonzal
......
istoica is the colleague of franklin
rxin is the collab of jgonzal
franklin is the advisor of rxin
franklin is the pi of jgonzal
......
Count all users which are prof:2
......
Count all the edges where src < dst:3
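The "franklin is the colleague of John Doe" line comes from the default vertex attribute. The plain-Scala sketch below models that behavior with ordinary collections instead of RDDs, so it runs without Spark: any id that appears in an edge but not in the vertex set receives the default attribute, and each edge is then rendered in the same format as the triplets in the example. `GraphBuildSketch`, `SimpleEdge`, `buildVertices` and `describeTriplets` are hypothetical names for illustration, not GraphX APIs.

```scala
object GraphBuildSketch {
  type Attr = (String, String)

  // Hypothetical stand-in for org.apache.spark.graphx.Edge, used only in this sketch
  final case class SimpleEdge(srcId: Long, dstId: Long, attr: String)

  val users: Map[Long, Attr] = Map(
    (3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
    (5L, ("franklin", "prof")), (2L, ("istoica", "prof")))

  val relationships: Seq[SimpleEdge] = Seq(
    SimpleEdge(3L, 7L, "collab"), SimpleEdge(5L, 3L, "advisor"),
    SimpleEdge(2L, 5L, "colleague"), SimpleEdge(5L, 7L, "pi"),
    SimpleEdge(5L, 0L, "colleague"))

  val defaultUser: Attr = ("John Doe", "Missing")

  // Ids referenced by an edge but absent from `vs` get `default`,
  // mirroring the role of the third argument to Graph(users, relationships, defaultUser)
  def buildVertices(vs: Map[Long, Attr], es: Seq[SimpleEdge], default: Attr): Map[Long, Attr] = {
    val referenced = es.flatMap(e => Seq(e.srcId, e.dstId)).toSet
    vs ++ (referenced -- vs.keySet).map(id => id -> default)
  }

  // Same string format as the triplets.map(...) call in the example
  def describeTriplets(vs: Map[Long, Attr], es: Seq[SimpleEdge]): Seq[String] =
    es.map(e => vs(e.srcId)._1 + " is the " + e.attr + " of " + vs(e.dstId)._1)

  def main(args: Array[String]): Unit = {
    val vertices = buildVertices(users, relationships, defaultUser)
    describeTriplets(vertices, relationships).foreach(println)
  }
}
```

The sketch keeps the input edge order; the log above shows the order in which GraphX actually returned the collected triplets.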
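Explanation
- The first step of the example builds a graph using the simplest approach: passing the vertex RDD and the edge RDD to the Graph constructor (Graph.apply). The resulting graph is shown below:
  Because the edges include Edge(5L, 0L, "colleague") but no vertex 0 is defined, GraphX uses the constructor's third argument and sets the attribute of vertex 0 to the default ("John Doe", "Missing").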
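- Filtering out the vertices whose attribute is "Missing" yields a new graph. The subgraph call also drops every edge whose source or destination vertex was removed, so the edge 5 -> 0 disappears as well. The new graph is then traversed once with the following code:
    validGraph.vertices.collect.foreach(println(_))
    validGraph.triplets.map(
      triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
    ).collect.foreach(println(_))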
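- graph.vertices and graph.edges split a graph into a vertex view and an edge view, returning a VertexRDD and an EdgeRDD respectively. Because these are RDDs, ordinary RDD methods such as filter can narrow down the vertices or edges further. Only users 5 and 2 have the position "prof", hence Count all users which are prof:2; likewise, 3 edges have a SrcId smaller than their DstId (3 -> 7, 2 -> 5 and 5 -> 7), hence Count all the edges where src < dst:3.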
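The subgraph and counting steps can be mirrored with plain Scala collections too. The sketch below is a simplified model under stated assumptions, not GraphX's implementation: a vertex is kept when it passes `vpred`, an edge is kept only when both endpoints survive (GraphX's optional `epred`, which receives an `EdgeTriplet`, is left out), and the two counts use the same predicates as the example. `SubgraphSketch` and its members are hypothetical names.

```scala
object SubgraphSketch {
  type Attr = (String, String)

  // Vertices of the initial graph, including the default-filled user 0
  val vertices: Map[Long, Attr] = Map(
    (3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
    (5L, ("franklin", "prof")), (2L, ("istoica", "prof")),
    (0L, ("John Doe", "Missing")))

  // (srcId, dstId) pairs of the initial graph; edge attributes are not needed here
  val edges: Seq[(Long, Long)] = Seq((3L, 7L), (5L, 3L), (2L, 5L), (5L, 7L), (5L, 0L))

  // Keep vertices that satisfy vpred, then keep only edges whose endpoints both survived
  def subgraph(vs: Map[Long, Attr], es: Seq[(Long, Long)])(
      vpred: (Long, Attr) => Boolean): (Map[Long, Attr], Seq[(Long, Long)]) = {
    val keptVertices = vs.filter { case (id, attr) => vpred(id, attr) }
    val keptEdges = es.filter { case (src, dst) =>
      keptVertices.contains(src) && keptVertices.contains(dst)
    }
    (keptVertices, keptEdges)
  }

  val (validVertices, validEdges) = subgraph(vertices, edges)((_, attr) => attr._2 != "Missing")

  // Same predicates as the two println calls in the example
  val profCount: Int = validVertices.count { case (_, (_, pos)) => pos == "prof" }
  val srcLtDstCount: Int = edges.count { case (src, dst) => src < dst }

  def main(args: Array[String]): Unit = {
    println("Count all users which are prof:" + profCount)
    println("Count all the edges where src < dst:" + srcLtDstCount)
  }
}
```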
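Summary
- This simple example showed how to build a graph with the GraphX API and perform some basic operations on it.
- The test environment for this post was HDP-2.6.0.3; the example is based on http://spark.apache.org/docs/1.6.3/graphx-programming-guide.html
- In production, the data for a graph more likely comes from external files, so the graph has to be loaded as shown below; see the appendix for the complete code for this case.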
    // Load my user data and parse into tuples of user id and attribute list
    val users = (sc.textFile("/tmp/users.txt")
      .map(line => line.split(",")).map(parts => (parts.head.toLong, parts.tail)))
    // Parse the edge data which is already in userId -> userId format
    val followerGraph = GraphLoader.edgeListFile(sc, "/tmp/followers.txt")
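To make the two input formats concrete, here is a plain-Scala sketch of how a single line of each file is interpreted. `parseUser` uses the same split as the code above. `parseEdge` is a hypothetical, simplified model of the format described in the GraphLoader.edgeListFile scaladoc (a source id and a target id per line, lines beginning with # skipped); it is not the loader's actual code.

```scala
object InputParsingSketch {
  // Same logic as the users RDD: "1,BarackObama,Barack Obama" -> (1, Array(BarackObama, Barack Obama))
  def parseUser(line: String): (Long, Array[String]) = {
    val parts = line.split(",")
    (parts.head.toLong, parts.tail)
  }

  // Simplified model of an edge-list line: "#" lines are comments, otherwise
  // the first two whitespace-separated tokens are the source and destination ids
  def parseEdge(line: String): Option[(Long, Long)] =
    if (line.startsWith("#")) None
    else {
      val tokens = line.trim.split("\\s+")
      require(tokens.length >= 2, s"Invalid line: $line")
      Some((tokens(0).toLong, tokens(1).toLong))
    }

  def main(args: Array[String]): Unit = {
    val (id, attrs) = parseUser("8,anonsys")
    println(s"$id -> ${attrs.mkString("[", ", ", "]")}")
    println(parseEdge("2 1"))
  }
}
```

A line such as 8,anonsys yields only one attribute, so it could not pass the attr.size == 2 restriction used in the appendix.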
Appendix
import org.apache.spark.graphx._
import org.apache.spark.{SparkConf, SparkContext}
object GraphXExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("GraphX Example")
    val sc = new SparkContext(conf)
    // Load my user data and parse into tuples of user id and attribute list
    val users = (sc.textFile("/tmp/users.txt")
      .map(line => line.split(",")).map(parts => (parts.head.toLong, parts.tail)))
    // Parse the edge data which is already in userId -> userId format
    val followerGraph = GraphLoader.edgeListFile(sc, "/tmp/followers.txt")
    // Attach the user attributes (the vertex attribute from edgeListFile is a placeholder value of 1)
    val graph = followerGraph.outerJoinVertices(users) {
      case (uid, deg, Some(attrList)) => attrList
      // Some users may not have attributes so we set them as empty
      case (uid, deg, None) => Array.empty[String]
    }
    // Restrict the graph to users with usernames and names
    val subgraph = graph.subgraph(vpred = (vid, attr) => attr.size == 2)
    // Compute the PageRank
    val pagerankGraph = subgraph.pageRank(0.001)
    // Get the attributes of the top pagerank users
    val userInfoWithPageRank = subgraph.outerJoinVertices(pagerankGraph.vertices) {
      case (uid, attrList, Some(pr)) => (pr, attrList.toList)
      case (uid, attrList, None) => (0.0, attrList.toList)
    }
    println(s"The result is :${userInfoWithPageRank.vertices.top(5)(Ordering.by(_._2._1)).mkString("\n")}")
    sc.stop()
  }
}
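outerJoinVertices keeps exactly the vertices already in followerGraph and passes the join function Some(attributes) or None depending on whether users.txt has a matching id. The sketch below models that contract with plain Scala maps; the generic `outerJoinVertices` helper and the `OuterJoinSketch` object are hypothetical, not the GraphX method. With the sample data files listed below, every vertex of the follower graph (1, 2, 3, 4, 6, 7) has a two-field attribute list, so the attr.size == 2 restriction keeps all six, and user 8 never enters the graph because it has no edges.

```scala
object OuterJoinSketch {
  // Model of the outerJoinVertices contract: the result has exactly the graph's vertex ids,
  // and f sees Some(value) when `other` has that id, otherwise None
  def outerJoinVertices[VD, U, VD2](graphVertices: Map[Long, VD], other: Map[Long, U])(
      f: (Long, VD, Option[U]) => VD2): Map[Long, VD2] =
    graphVertices.map { case (id, attr) => id -> f(id, attr, other.get(id)) }

  // Vertex ids that occur in followers.txt; edgeListFile gives each vertex the attribute 1
  val followerVertices: Map[Long, Int] = Seq(1L, 2L, 3L, 4L, 6L, 7L).map(id => id -> 1).toMap

  // users.txt parsed the same way as in the appendix
  val users: Map[Long, Array[String]] = Seq(
    "1,BarackObama,Barack Obama", "2,ladygaga,Goddess of Love", "3,jeresig,John Resig",
    "4,justinbieber,Justin Bieber", "6,matei_zaharia,Matei Zaharia", "7,odersky,Martin Odersky",
    "8,anonsys").map(_.split(",")).map(parts => parts.head.toLong -> parts.tail).toMap

  val graph: Map[Long, Array[String]] = outerJoinVertices(followerVertices, users) {
    case (_, _, Some(attrList)) => attrList
    case (_, _, None)           => Array.empty[String]
  }

  // Same restriction as subgraph(vpred = (vid, attr) => attr.size == 2)
  val restricted: Map[Long, Array[String]] = graph.filter { case (_, attr) => attr.size == 2 }

  def main(args: Array[String]): Unit =
    restricted.toSeq.sortBy(_._1).foreach { case (id, attr) => println(s"$id -> ${attr.mkString(", ")}") }
}
```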
The example uses two data files:
users.txt
1,BarackObama,Barack Obama
2,ladygaga,Goddess of Love
3,jeresig,John Resig
4,justinbieber,Justin Bieber
6,matei_zaharia,Matei Zaharia
7,odersky,Martin Odersky
8,anonsys
followers.txt
2 1
4 1
1 2
6 3
7 3
7 6
6 7
3 7
Both data files must be placed under the /tmp directory on HDFS. The results are written to the driver's log:
The result is :(1,(1.453834747463902,List(BarackObama, Barack Obama)))
(2,(1.3857595353443166,List(ladygaga, Goddess of Love)))
(7,(1.2892158818481694,List(odersky, Martin Odersky)))
(3,(0.9936187772892124,List(jeresig, John Resig)))
(6,(0.697916749785472,List(matei_zaharia, Matei Zaharia)))
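The ranks above are unnormalized. GraphX's PageRank scaladoc describes the recurrence PR(v) = resetProb + (1 - resetProb) * sum of PR(u) / outDeg(u) over the in-neighbors u of v, and pageRank(tol) uses resetProb = 0.15 by default; a vertex with no in-links therefore settles at 0.15, and for this data, where every vertex has at least one out-edge, the ranks sum to the number of vertices (6). The plain-Scala sketch below iterates that recurrence on followers.txt until no rank changes by more than the tolerance. It is a synchronous power-iteration sketch under those assumptions, not GraphX's Pregel-based implementation, so the exact digits differ slightly from the log, while the ordering of the top five users matches. `PageRankSketch` is a hypothetical name.

```scala
object PageRankSketch {
  // Edges from followers.txt as (srcId, dstId) pairs
  val followers: Seq[(Long, Long)] = Seq(
    (2L, 1L), (4L, 1L), (1L, 2L), (6L, 3L),
    (7L, 3L), (7L, 6L), (6L, 7L), (3L, 7L))

  // Unnormalized PageRank: PR(v) = resetProb + (1 - resetProb) * sum(PR(u) / outDeg(u)),
  // iterated synchronously until the largest per-vertex change drops below tol
  def pageRank(edges: Seq[(Long, Long)], tol: Double, resetProb: Double = 0.15): Map[Long, Double] = {
    val vertices = edges.flatMap { case (s, d) => Seq(s, d) }.distinct
    val outDeg: Map[Long, Int] = edges.groupBy(_._1).map { case (v, es) => v -> es.size }
    val inNbrs: Map[Long, Seq[Long]] = edges.groupBy(_._2).map { case (v, es) => v -> es.map(_._1) }
    var ranks: Map[Long, Double] = vertices.map(v => v -> 1.0).toMap
    var delta = Double.MaxValue
    while (delta >= tol) {
      val next = vertices.map { v =>
        val incoming = inNbrs.getOrElse(v, Seq.empty).map(u => ranks(u) / outDeg(u)).sum
        v -> (resetProb + (1 - resetProb) * incoming)
      }.toMap
      delta = vertices.map(v => math.abs(next(v) - ranks(v))).max
      ranks = next
    }
    ranks
  }

  def main(args: Array[String]): Unit =
    pageRank(followers, tol = 0.001).toSeq.sortBy(-_._2).foreach { case (id, pr) => println(s"$id -> $pr") }
}
```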