GraphX,GraphSON格式转换工具

摘要

转换器实现了:1.根据用户输入的SparkContext,和文件路径,读取GraphSON格式文件,转换为GraphX所接受的graphRDD;2.用户输入GraphX的graphRDD,在指定文件路径输出GraphSON格式文件。

相关版本

Apache TinkerPop 3.3.3
scala 2.11.8
spark-graphx 2.11

提示

假设读者较为熟悉GraphX中RDD相关操作,TinkerPop Gremlin图的遍历操作

背景

TinkerPop是一种图计算框架,用于图数据库和图分析处理。工业界已普遍采用TinkerPop进行图的存储,但TinkerPop实现的图计算的接口很少,不便于直接进行图分析处理。Spark GraphX已经实现了很多图计算的算法接口,但TinkerPop中无法直接使用。

如果在GraphX和TinkerPop之间存在桥梁,把TinkerPop中存储的图转换成GraphX图的格式,就可以利用GraphX丰富的图计算进行处理,最后将处理后的图转换回TinkerPop中进行存储。TinkerPop也没有直接转换为GraphX所需图格式的接口,如何自行搭建桥梁?我们利用TinkerPop输出读入文件的接口,将文件作为中转站,通过解析或构造文件内容,和GraphX的图进行相互转换。TinkerPop支持的文件格式很多,有Gryo,GraphSON,Script。我们选择GraphSON,是以json格式,文件便于我们直接查看,也便于程序解析。

现实中,普遍存在大量数据,如果数据量很大的情况下,如何确保数据转换间的高效,让数据在桥梁上快速往返。我们发现TinkerGraph中,提供了HDFS中的GraphSON文件到RDD之间转换的接口,从而利用Spark,Hadoop进行大数据高效处理。

总的来说,我们的工作意义在于结合两种系统各自的优点,TinkerPop图数据库存储的广泛性和GraphX图计算接口的多样性,从而满足更广泛的需求。

TinkerPop数据结构StarGraph

转换器中涉及到的重要数据结构就是StarGraph,它本身是一个很小的图,以单节点为中心,包含它自身的点属性,所有的邻接边(包括出边,入边)属性,以及邻接点的ID。每个点维护这样一个StarGraph,所有的点就构成了TinkerPop完整的图。
图对应一个完整的json文件,其中每一行就恰好是一个StarGraph。官方指导手册给出如下实例:
http://kelvinlawrence.net/book/Gremlin-Graph-Guide.html#_adjacency_list_format_graphson

{"id":0,"label":"airport","inE":{"route":[{"id":17,"outV":4}]}, ... }
{"id":2,"label":"airport","inE":{"route":[{"id":18,"outV":4}, ... ]}}
{"id":4,"label":"airport","inE":{"route":[{"id":15,"outV":2}]}, ... }
{"id":6,"label":"airport","inE":{"route":[{"id":16,"outV":4}, ... ]}}
{"id":8,"label":"airport","inE":{"route":[{"id":11,"outV":0}]}, ... }

其中每一行代表一个StarGraph,形如其名,包含中心点的id,标签,点属性,入边,出边,在转换器中解析,构造GraphSON文件都以每一个StarGraph为基本单位,展开如下:

{
    "id": 0,
    "label": "airport",
    "inE": {
        "route": [{
            "id": 17,
            "outV": 4
        }]
    },
    "outE": {
        "route": [{
            "id": 10,
            "inV": 2
        }, {
            "id": 11,
            "inV": 8
        }]
    },
    "properties": {
        "code": [{
            "id": 1,
            "value": "AUS"
        }]
    }
}

处理过程

从GraphSON转换为GraphX

大致上分为两步,第一步,系统有自带读取GraphSON文件的接口,得到以StarGraph为核心的中间RDD;第二步,处理中间RDD,分别生成GraphX所需要的VertexRDD和RDD[Edge]

读入GraphSON转换为JavaPairRDD

关键API
org.apache.tinkerpop.gremlin.spark.structure.io
Class InputFormatRDD

Modifier and Type Method and Description
<any> readGraphRDD(Configuration configuration, JavaSparkContext sparkContext)
Read the graphRDD from the underlying graph system.

输入部分代码如下,inputGraphFilePath为用户输入的文件路径,jsc为用户传入的sparkContext。官方文档中没有给出返回类型的明确格式,查看源码后得知,返回得到的vertexWritableJavaPairRDD中每一个元素的格式是 Tuple2[AnyRef, VertexWritable],其中VertexWritable通过get方法就可以返回Vertex类型

val inputGraphConf = new BaseConfiguration
inputGraphConf.setProperty("gremlin.graph", classOf[HadoopGraph].getName)
inputGraphConf.setProperty(Constants.GREMLIN_HADOOP_GRAPH_READER, classOf[GraphSONInputFormat].getName)
inputGraphConf.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, inputGraphFilePath)
inputGraphConf.setProperty(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR, inputGraphFilePath)
val jsc = JavaSparkContext.fromSparkContext(sc)
val graphRDDInput = new InputFormatRDD
val vertexWritableJavaPairRDD = graphRDDInput.readGraphRDD(inputGraphConf, jsc)
解析JavaPairRDD

从JavaPairRDD中得到的Vertex类型,每一个Vertex可以视作一个StarGraph,正如上面所提到的,从StarGraph中利用遍历对象可以获取到中心点ID,属性,所有边属性,邻接点ID,这些信息足以构建GraphX所需的VertexRDD和RDD[Edge]。

涉及到StarGraph构建API
org.apache.tinkerpop.gremlin.structure.util.star.StarGraph
Class StarGraph

Modifier and Type Method and Description
static StarGraph of(Vertex vertex)
Creates a new StarGraph from a Vertex.

构造VertexRDD的过程如下:

val vertexRDD:RDD[(Long,HashMap[String,java.io.Serializable])] = vertexWritableJavaPairRDD.rdd.map((tuple2: Tuple2[AnyRef, VertexWritable]) => {
 
      // Get the center vertex
      val v = tuple2._2.get
      val g = StarGraph.of(v)
      // In case the vertex id in TinkerGraph is not long type
      val vid = convertStringIDToLongID(v.id().toString)
 
      // Pass the vertex properties to GraphX vertex value map and remain the original vertex id
      var graphxValueMap : HashMap[String,java.io.Serializable] = new HashMap[String,java.io.Serializable]()
      graphxValueMap.put("originalID",v.id().toString)
      graphxValueMap.putAll(g.traversal.V(v.id).valueMap().next(1).get(0))
      (vid,graphxValueMap)
    })

注:
1.JavaPairRDD转换到rdd.RDD可以使用自带的rdd方法;
2.TinkerGraph中点ID属性类型可以是整形,也可以是字符串,在这里统一按字符串处理,使用Hashing工具转换为GraphX VertexID接收的Long型,避免信息丢失,原有TinkerGraph ID作为点属性以“originalID”为键存储在HashMap中。


构造RDD[Edge]的过程如下

val edge = vertexWritableJavaPairRDD.rdd.flatMap((tuple2: Tuple2[AnyRef, VertexWritable]) => {
      val v = tuple2._2.get
      val g = StarGraph.of(v)
      val edgelist:util.List[Edge] = g.traversal.V(v.id).outE().toList
 
      // Put all edges of the center vertex into the list
      val list = new collection.mutable.ArrayBuffer[graphx.Edge[util.HashMap[String,java.io.Serializable]]]()
      var x = 0
      for(x <- 0 until edgelist.size()){
        var srcId = edgelist.get(x).inVertex.id().toString
        var dstId = edgelist.get(x).outVertex.id().toString
        val md1 = convertStringIDToLongID(srcId)
        val md2 = convertStringIDToLongID(dstId)
        // Get the properties of the edge
        var edgeAttr = new util.HashMap[String,java.io.Serializable]()
        edgelist.get(x).properties().asScala.foreach((pro:Property[Nothing])=>
        {edgeAttr.put(pro.key(),pro.value().toString)})
        list.append(graphx.Edge(md1,md2,edgeAttr))
      }
      list
    })
val edgeRDD = edge.distinct()

注:
1.一个中心点可能有多个邻接边,利用ArrayBuffer将每个边按照Edge格式存储到list,flatMap将每一个list展开;
2.为了区别TinkerPop的Edge和GraphX的Edge,将GraphX的Edge格式用graphx.Edge表示。


构造GraphX graphRDD

已经具备了VertexRDD和RDD[Edge],构造GraphX轻而易举

graphx.Graph[util.HashMap[String,java.io.Serializable],
      HashMap[String,java.io.Serializable]](vertexRDD,edgeRDD,new HashMap[String,java.io.Serializable]())

注:
这里需要显式指定点属性,边属性的类型,就是GraphX官方文档中的VD和ED,我们转换器中,属性都使用util.HashMap[String,java.io.Serializable]类型来存储。

至此,我们从TinkerPop出发,顺利到达GraphX,之后可以利用丰富的图计算算子进行图分析处理,可以将得到的结果作为新的属性添加到点或边属性中,从而生成新的GraphX graphRDD。在GraphX玩久了突然想家,如何找到回家的路,转换回GraphSON,且听下节分解。

从GraphX转换为GraphSON

往返路线具有对称性,枢纽也是TinkerPop的StraGraph,解析GraphX的graphRDD,把元素构造成StarGraph的形式,再利用TinkerPop写文件的接口。

解析GraphX graphRDD

为了生成StarGraph,一定需要按每个点ID进行join的操作,从而生成中心点及其邻接边,邻接点的结构。
关键API
org.apache.spark.graphx
abstract class Graph[VD, ED] extends Serializable

Modifier and Type Method and Description
Graph[VD2, ED] outerJoinVertices[U, VD2](other: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, Option[U]) ⇒ VD2)(implicit arg0:ClassTag[U], arg1: ClassTag[VD2], eq: =:=[VD, VD2] = null)

Joins the vertices with entries in the table RDD and merges the results using mapFunc. The input table should contain at most one entry for each vertex. If no entry in other is provided for a particular vertex in the graph, the map function receives None.

class GraphOps[VD, ED] extends Serializable

Modifier and Type Method and Description
VertexRDD[Array[Edge[ED]]] collectEdges(edgeDirection: EdgeDirection)

Returns an RDD that contains for each vertex v its local edges, i.e., the edges that are incident on v, in the user-specified direction.

collectEdges将一个点所有的出边都加入到Array中,作为一个点的新属性,但点的原本属性被丢弃。我们于是使用outerJoinVertices,把点属性和边Array信息聚合构建StarGraph,并将之存储在每个点的属性中。

    // Tuple2 of the src vertex id and the array of all its out Edge
    val vertexRDDWithEdgeProperties = graphRDD.collectEdges(EdgeDirection.Out)
 
    // Join the vertex id ,vertex attribute and the array of all its out edges(as adjacent edges)
    val tinkerPopVertexRDD = graphRDD.outerJoinVertices(vertexRDDWithEdgeProperties) {
      case (centerVertexID, centerVertexAttr, adjs) => {
        // Create the StarGraph and its center
        val graph = StarGraph.open
        val cache = new util.HashMap[Long, Vertex]
        val centerVertex:Vertex = getOrCreateVertexForStarGraph(graph,cache,
                                                                centerVertexID,true,centerVertexAttr)
        // Add adjacent edges
        adjs.get.map(edge => {
 
          // Create the adjacent vertex
          val anotherVertexID = edge.dstId
          val edgeProperties = edge.attr
          val srcV = centerVertex
          val dstV :Vertex = getOrCreateVertexForStarGraph(graph,cache,anotherVertexID,false, null)
 
          // For both direction, add an edge between the both vertices
          val outedgeID:lang.Long = hashEdgeID(edge.srcId.toString,edge.dstId.toString)
          val outedge = srcV.addEdge(DEFAULT_EDGE_LABEL,dstV,T.id,outedgeID)
          if (outedge != null && edgeProperties.size > 0) addProperties(outedge, edgeProperties)
 
          val inedgeID:lang.Long = hashEdgeID(edge.dstId.toString,edge.srcId.toString)
          val inedge = dstV.addEdge(DEFAULT_EDGE_LABEL,srcV,T.id,inedgeID)
          if (inedge != null && edgeProperties.size > 0) addProperties(inedge, edgeProperties)
        })
 
        // Return the center vertex
        graph.getStarVertex
      }
    }.vertices.map {case(vid, vertex) => vertex}
 

注:
1.添加邻接边的过程中,需要指定边ID值,不同的边ID不同,一条边需要对称地添加在起点和终点,保证相同的ID值,才可以被TinkerPop Gremlin读入识别(写入GraphSON文件时,不会检查ID唯一性问题,在读取文件,创建Graph时候,会进行识别,如果有重复点ID或者边ID出现,会报错)。
2.对于无向图,两个点之间视为有两条方向不同的有向边,这两条边的ID生成策略如下,设两个点的ID分别是A,B,转换为字符串后,A到B的ID值为“A”拼接"B"的哈希值,B到A的ID值为“B”拼接“A”的哈希值。


在解析过程中,生成StarGraph是很重要的一部分
org.apache.tinkerpop.gremlin.structure.util.star.StarGraph
Class StarGraph

Modifier and Type Method and Description
static StarGraph open()
Creates an empty StarGraph.
StarGraph.StarVertex getStarVertex()
Gets the Vertex representative of the StarGraph.
Vertex addVertex(Object... keyValues)
Add a Vertex to the graph given an optional series of key/value pairs.

解析过程中创建StrarGraph的getOrCreateVertexForStarGraph函数如下

  def getOrCreateVertexForStarGraph(graph:StarGraph, cache:util.HashMap[Long, Vertex],
                      name: Long,isCenter: Boolean,
                      properties :util.HashMap[String, java.io.Serializable]):Vertex = {
 
    // Get the vertex contained in the cache or create one
    // Return the vertex
    if (cache.containsKey(name) && !isCenter) cache.get(name)
    else if (!cache.containsKey(name) && !isCenter) {
      val v = graph.addVertex(T.id, name:lang.Long, T.label, DEFAULT_VERTEX_LABEL)
      cache.put(name, v)
      v
    } else if (cache.containsKey(name) && isCenter) {
      val v = cache.get(name)
 
      // Add the properties only if the vertex is center vertex
      properties.asScala.foreach(pro => {
        v.property(pro._1, pro._2)
      })
      cache.replace(name, v)
      v
    } else {
      val v = graph.addVertex(T.id, name:lang.Long, T.label, DEFAULT_VERTEX_LABEL)
      properties.asScala.foreach(pro => {
        v.property(pro._1, pro._2)
      })
      cache.put(name, v)
      v
    }
  }

注:
点属性以HashMap形式保存,引入了cache,避免重复创建。中心点的邻接点(isCenter = false)创建时,不需要写入点属性,每个点都会被遍历为中心点,避免重复写入点属性。


写入GraphSON

和读入过程对称,需要构造Tuple2[AnyRef, VertexWritable]的JavaPairRDD。

org.apache.tinkerpop.gremlin.spark.structure.io
Class OutputFormatRDD

Modifier and Type Method and Description
void writeGraphRDD(Configuration configuration, <any> graphRDD)
Write the graphRDD to an output location.

根据vertex:VertexRDD构建元素是Tuple2[AnyRef, VertexWritable]的JavaPairRDD,输出到指定文件路径,完成最后的收尾工作。

// Change the form for adapting to the java interface
val tinkergraphRDD = tinkerPopVertexRDD.map(vertex => (AnyRef :AnyRef, new VertexWritable(vertex))).toJavaRDD()
 
///////// Output the VertexRDD
val outputConf = new BaseConfiguration
val tmpOutputPath = outputFilePath + "~"
val hadoopConf = new Configuration
val path = URI.create(outputFilePath)
outputConf.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER, classOf[GraphSONOutputFormat].getName)
outputConf.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, tmpOutputPath)
FileSystem.get(path,hadoopConf).delete(new Path(tmpOutputPath), true)
FileSystem.get(path,hadoopConf).delete(new Path(outputFilePath), true)
FileSystem.get(path,hadoopConf).deleteOnExit(new Path(tmpOutputPath))
val formatRDD = new OutputFormatRDD
formatRDD.writeGraphRDD(outputConf, JavaPairRDD.fromJavaRDD(tinkergraphRDD))
sc.stop()
FileSystem.get(path,hadoopConf).rename(new Path(tmpOutputPath, "~g"), new Path(outputFilePath))
FileSystem.get(path,hadoopConf).delete(new Path(tmpOutputPath), true)

注:
1.从rdd.RDD到JavaPairRDD,需要经过JavaRDD的中转,利用JavaPairRDD自带的fromJavaRDD方法;
2.writeGraphRDD的过程最后产生的文件名和用户提供的有微小差别,需要特别处理。

尾声

至此,我们已经完成了从GraphSON到GraphX之间图数据的旅程,意味着我们实现了通过GraphSON作为中转站,两个图框架的对接。下一步,我们的目标是直接从图数据库到GraphX的转换,希望可以绕开中转站,实现无缝对接。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,240评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,328评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,182评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,121评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,135评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,093评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,013评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,854评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,295评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,513评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,678评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,398评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,989评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,636评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,801评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,657评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,558评论 2 352

推荐阅读更多精彩内容