hugegraph 整合 spark+graphX

当数据量太大的时候,对hugegraph 进行一些统计查询或者算法遍历的时候,经常会超时,或者时间很久。
这个时候需要借助与大数据相关的技术。hugegraph 之前提供了一个hugegraph-spark的组件,
但是变态的是,目前该组件已经商业化,还好其团队在git上大概介绍了实现的思路。

  • 使用hugegraph 的切片查询功能。 因为底层使用cassandra,可以先查出整个集群有多少个切片。
  • 然后使用并行框架并发的去遍历每个切片,这样就可以快速的把整个集群的节点和边获取出来。
  • 把所有的节点和边导入到HDFS中。
  • 编写代码把导入的文件转化为graphx 支持的RDD格式,并生成graph对象,然后就可以调用graph对象的方法。
  • 把整个程序打包成jar包提交给spark平台,就可以分布式的调用graphx的相关方法,比如统计节点。

我们构建了一个3000万个节点,4000万边的数据集。在10个节点的spark集群上,大概需要1分钟。

大概的实现:

  • 使用java+scala的混合方式。
  • java线程池读遍历hugegraph分片,并导入数据到hdfs
edgeShards.forEach(shard->
            {
                try
                {
                    producerSemaphore.acquire();
                } catch (InterruptedException e)
                {
                    throw new RuntimeException("can't acquire consumer semaphore");
                }

                HugeEdgeHandler edgeHandler = new HugeEdgeHandler(shard,graphQueryDao,graphSerializer,
                        hadoopProperties,hdfsDir);
                CompletableFuture.runAsync(
                        edgeHandler,
                        producerExecutor
                ).whenComplete((r,e)->{ producerSemaphore.release();});
            }
        );
        
    @Override
    public void run()
    {
        //TODO add Retry
        List<Edge> edges = graphQueryDao.getEdgesByShard(this.currentShard);
        log.info("add {} edges to queue using shard {}", edges.size(), this.currentShard);
        Collection<String> lines = edges.stream().map(e-> graphxLine(graphSerializer.writeEdge(e))).collect(Collectors.toList());
        HadoopFileUtils.writeFromLocalToHdfs(hadoopProperties, lines.iterator(), getHdfsFileName());
        log.info("write {} lines into  hdfs file {}",lines.size(), getHdfsFileName());
    }
  • 构建GraphX的RDD对象,生成Graph并调用算法,这部分由scala实现。
object GraphXTraversal {
  def run(context : SparkContext, vertexFileName: String, edgeFileName: String, traversalFileName: String): Unit = {
    val vertexRdd : RDD[(VertexId, String)] = context.textFile(vertexFileName)
      .map(line => {
        val parts = line.split("\t")
        (parts(1).toLong, parts(0))
      })
    val edgeRdd : RDD[Edge[String]] = context.textFile(edgeFileName)
      .map(line => {
        val parts = line.split("\t")
        new Edge(parts(1).toLong, parts(2).toLong, parts(0))
      })

    val graph : Graph[String, String]  =  Graph(vertexRdd, edgeRdd)

    val vConunt = graph.vertices.count();
    val eCount = graph.edges.count();
    val output=context.makeRDD(List(vConunt,eCount));
    output.saveAsTextFile(traversalFileName);
  }


}
  • java+scala混合打包,pom.xml 的配置是关键,需要加入scala插件,并且显示指定main方法,否则 spark-submit的时候会报 class not found 的问题。
<plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>2.4.1</version>
            <configuration>
                <archive>
                    <manifest>
                        <mainClass>com.datayes.kgraph.job.GraphTraveralJob</mainClass>
                    </manifest>
                </archive>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <skipAssembly>false</skipAssembly>
            </configuration>
            <executions>
                <execution>
                    <id>package</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>

  • spark 集群自己调用hugegraph并生成rdd
    使用如下的方式 让spark 的集群并发获取shard的信息直接构建,RDD,这样是最快的。
 JavaRDD<Edge<String>> edgeRDD = rddShards.flatMap((shard)->
        {
            HugeClient  Xcv =new HugeClient("http://10.20.205.167:8080", "hugegraph",1200);
            List<com.baidu.hugegraph.structure.graph.Edge> baiduEdges =  Xcv.traverser().edges(shard);
            List<Edge<String>> sparkEdge = Lists.newArrayList();
            baiduEdges.stream().forEach(e->
            {
                sparkEdge.add(new Edge<String>(Long.parseLong(String.valueOf(e.source()))
                        ,Long.parseLong(String.valueOf(e.target())),e.label()));
            });
            return sparkEdge.iterator();
        }
        );

但是打包运行的时候遇到了各种问题

  • 其中一个是 Hugeclient checkVersion失败。

原因 hugegraph的组件启动的时候会去 自己的包里面 manifest.mf 里面找一个 implemnt-version的变量。
看和程序的版本是否一致。 hugegraph-client, hugegraph-common 的包里都有这个文件。

但是当使用 maven assembly 和maven shade的时候, 依赖的包都会被解压, manifest 都没有copy过来,或者说被覆盖了。

造成 hugegraph启动的时候一直就是 version 无法match。

  • 解决的办法, 不能unpack 依赖的jar 包,否则manifest信息会丢失。
    在不unpack 的情况下,只能将jar 拷贝到目录,然后指定 classpath。
 <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>2.4</version>
                <configuration>
                    <archive>
                        <manifest>
                         <addClasspath>true</addClasspath>
                         <classpathPrefix>lib/</classpathPrefix>
                         <mainClass>com.datayes.kgraph.job.GraphTraveralJob</mainClass>
                     </manifest>
                   </archive>
                </configuration>
            </plugin>
        <plugin>
             <groupId>org.apache.maven.plugins</groupId>
             <artifactId>maven-dependency-plugin</artifactId>
             <version>2.10</version>
             <executions>
                 <execution>
                     <id>copy-dependencies</id>
                     <phase>package</phase>
                     <goals>
                         <goal>copy-dependencies</goal>
                     </goals>
                     <configuration>
                         <outputDirectory>${project.build.directory}/lib</outputDirectory>
                     </configuration>
                 </execution>
             </executions>
         </plugin>

在spak-submit的时候,指定 --jar a.jar,b.jar myapplication.jar

--jar 参数一定要在 myapplication.jar 之前。 为啥还有这种限制,真的是无语了。坑巨多。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,658评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,482评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,213评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,395评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,487评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,523评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,525评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,300评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,753评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,048评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,223评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,905评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,541评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,168评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,417评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,094评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,088评论 2 352

推荐阅读更多精彩内容