当数据量太大的时候,对hugegraph 进行一些统计查询或者算法遍历的时候,经常会超时,或者时间很久。
这个时候需要借助与大数据相关的技术。hugegraph 之前提供了一个hugegraph-spark的组件,
但是变态的是,目前该组件已经商业化,还好其团队在git上大概介绍了实现的思路。
- 使用hugegraph 的切片查询功能。 因为底层使用cassandra,可以先查出整个集群有多少个切片。
- 然后使用并行框架并发的去遍历每个切片,这样就可以快速的把整个集群的节点和边获取出来。
- 把所有的节点和边导入到HDFS中。
- 编写代码把导入的文件转化为graphx 支持的RDD格式,并生成graph对象,然后就可以调用graph对象的方法。
- 把整个程序打包成jar包提交给spark平台,就可以分布式的调用graphx的相关方法,比如统计节点。
我们构建了一个3000万个节点,4000万边的数据集。在10个节点的spark集群上,大概需要1分钟。
大概的实现:
- 使用java+scala的混合方式。
- java线程池读遍历hugegraph分片,并导入数据到hdfs
edgeShards.forEach(shard->
{
try
{
producerSemaphore.acquire();
} catch (InterruptedException e)
{
throw new RuntimeException("can't acquire consumer semaphore");
}
HugeEdgeHandler edgeHandler = new HugeEdgeHandler(shard,graphQueryDao,graphSerializer,
hadoopProperties,hdfsDir);
CompletableFuture.runAsync(
edgeHandler,
producerExecutor
).whenComplete((r,e)->{ producerSemaphore.release();});
}
);
@Override
public void run()
{
//TODO add Retry
List<Edge> edges = graphQueryDao.getEdgesByShard(this.currentShard);
log.info("add {} edges to queue using shard {}", edges.size(), this.currentShard);
Collection<String> lines = edges.stream().map(e-> graphxLine(graphSerializer.writeEdge(e))).collect(Collectors.toList());
HadoopFileUtils.writeFromLocalToHdfs(hadoopProperties, lines.iterator(), getHdfsFileName());
log.info("write {} lines into hdfs file {}",lines.size(), getHdfsFileName());
}
- 构建GraphX的RDD对象,生成Graph并调用算法,这部分由scala实现。
object GraphXTraversal {
def run(context : SparkContext, vertexFileName: String, edgeFileName: String, traversalFileName: String): Unit = {
val vertexRdd : RDD[(VertexId, String)] = context.textFile(vertexFileName)
.map(line => {
val parts = line.split("\t")
(parts(1).toLong, parts(0))
})
val edgeRdd : RDD[Edge[String]] = context.textFile(edgeFileName)
.map(line => {
val parts = line.split("\t")
new Edge(parts(1).toLong, parts(2).toLong, parts(0))
})
val graph : Graph[String, String] = Graph(vertexRdd, edgeRdd)
val vConunt = graph.vertices.count();
val eCount = graph.edges.count();
val output=context.makeRDD(List(vConunt,eCount));
output.saveAsTextFile(traversalFileName);
}
}
- java+scala混合打包,pom.xml 的配置是关键,需要加入scala插件,并且显示指定main方法,否则 spark-submit的时候会报 class not found 的问题。
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.4.1</version>
<configuration>
<archive>
<manifest>
<mainClass>com.datayes.kgraph.job.GraphTraveralJob</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<skipAssembly>false</skipAssembly>
</configuration>
<executions>
<execution>
<id>package</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
- spark 集群自己调用hugegraph并生成rdd
使用如下的方式 让spark 的集群并发获取shard的信息直接构建,RDD,这样是最快的。
JavaRDD<Edge<String>> edgeRDD = rddShards.flatMap((shard)->
{
HugeClient Xcv =new HugeClient("http://10.20.205.167:8080", "hugegraph",1200);
List<com.baidu.hugegraph.structure.graph.Edge> baiduEdges = Xcv.traverser().edges(shard);
List<Edge<String>> sparkEdge = Lists.newArrayList();
baiduEdges.stream().forEach(e->
{
sparkEdge.add(new Edge<String>(Long.parseLong(String.valueOf(e.source()))
,Long.parseLong(String.valueOf(e.target())),e.label()));
});
return sparkEdge.iterator();
}
);
但是打包运行的时候遇到了各种问题
- 其中一个是 Hugeclient checkVersion失败。
原因 hugegraph的组件启动的时候会去 自己的包里面 manifest.mf 里面找一个 implemnt-version的变量。
看和程序的版本是否一致。 hugegraph-client, hugegraph-common 的包里都有这个文件。
但是当使用 maven assembly 和maven shade的时候, 依赖的包都会被解压, manifest 都没有copy过来,或者说被覆盖了。
造成 hugegraph启动的时候一直就是 version 无法match。
- 解决的办法, 不能unpack 依赖的jar 包,否则manifest信息会丢失。
在不unpack 的情况下,只能将jar 拷贝到目录,然后指定 classpath。
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.4</version>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<classpathPrefix>lib/</classpathPrefix>
<mainClass>com.datayes.kgraph.job.GraphTraveralJob</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.10</version>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
在spak-submit的时候,指定 --jar a.jar,b.jar myapplication.jar
--jar 参数一定要在 myapplication.jar 之前。 为啥还有这种限制,真的是无语了。坑巨多。