NebulaSparkConnector2.0
[参考链接]
[参考链接]:https://docs.nebula-graph.com.cn/spark-connector/sc-ug-what-is-spark-connector/
[代码]
[代码]:https://github.com/Loading-Life/nebula-spark-java-demo
说明
Nebula Spark Connector 是一个 Spark 连接器,提供了通过 Spark 标准形式读写 Nebula Graph 数据库的能力,由以下两部分组成:
Reader:为您提供了一个 Spark SQL 接口,您可以使用 Spark SQL 接口编程读取 Nebula Graph 图数据,单次读取一个点或边类型的数据,并将读取的结果组装成 Spark 的 DataFrame。
Writer:为您提供了一个 Spark SQL 接口,您可以使用 Spark SQL 接口编程将 DataFrame 格式的数据逐条或批量写入 Nebula Graph。
适用场景
- 在不同的 Nebula Graph 集群之间迁移数据。
- 在同一个 Nebula Graph 集群内不同图空间之间迁移数据。
- Nebula Graph 与其他数据源之间迁移数据。
环境
- NebulaGraph : 2.0.0
- Apache Spark™ : 2.4.4
- Scala : 2.11.12
- Java : 1.8
使用
-
clone & install
$ git clone https://github.com/vesoft-inc/nebula-spark-utils.git
$ cd nebula-spark-utils
$ git checkout v2.0.0
$ mvn clean install -Dgpg.skip -Dmaven.javadoc.skip=true -Dmaven.test.skip=true
创建你的maven项目
-
添加maven依赖
<dependency> <groupId>com.vesoft</groupId> <artifactId>nebula-spark</artifactId> <version>2.0.0</version> </dependency>
-
编写测试类并修改成你的配置
-
数据样例
edge
{"src":12345,"dst":23456,"degree":34, "descr": "aaa","timep": "2020-01-01"}
{"src":11111,"dst":22222,"degree":33, "descr": "aaa","timep": "2020-01-01"}
{"src":11111,"dst":33333,"degree":32, "descr": "a\baa","timep": "2020-01-01"}
{"src":11111,"dst":44444,"degree":31, "descr": "aaa","timep": "2020-01-01"}
{"src":22222,"dst":55555,"degree":30, "descr": "a\naa","timep": "2020-01-01"}
{"src":33333,"dst":44444,"degree":29, "descr": "aaa","timep": "2020-01-01"}
{"src":33333,"dst":55555,"degree":28, "descr": "aa\ta","timep": "2020-01-01"}
{"src":44444,"dst":22222,"degree":27, "descr": "aaa","timep": "2020-01-01"}
{"src":44444,"dst":55555,"degree":26, "descr": "aaa","timep": "2020-01-01"}
{"src":22222,"dst":66666,"degree":25, "descr": "aaa","timep": "2020-01-01"}
{"src":21,"dst":22,"degree":24, "descr": "aaa","timep": "2021-04-09"}
vertex
{"id":"aa","name":"Tom","age":10,"timep": "2020-01-01"}
{"id":"\ns","name":"Bob","age":11,"timep": "2020-01-02"}
{"id":14,"name":"Jane","age":12,"timep": "2020-01-03"}
{"id":15,"name":"Jena","age":13,"timep": "2020-01-04"}
{"id":16,"name":"Ni\tc","age":14,"timep": "2020-01-05"}
{"id":17,"name":"Mei","age":15,"timep": "2020-01-06"}
{"id":18,"name":"HH","age":16,"timep": "2020-01-07"}
{"id":19,"name":"Ty\nler","age":17,"timep": "2020-01-08"}
{"id":20,"name":"Ber","age":18,"timep": "2020-01-09"}
{"id":21,"name":"Mercy","age":19,"timep": "2020-01-10"}
{"id":22,"name":"why","age":27,"timep": "2021-04-09"}
-
write代码示例
package com.loading.nebula; import com.facebook.thrift.protocol.TCompactProtocol; import com.vesoft.nebula.connector.NebulaConnectionConfig; import com.vesoft.nebula.connector.WriteNebulaEdgeConfig; import com.vesoft.nebula.connector.WriteNebulaVertexConfig; import org.apache.spark.SparkConf; import org.apache.spark.SparkContext; import org.apache.spark.sql.DataFrameWriter; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.apache.spark.storage.StorageLevel; import com.vesoft.nebula.connector.connector.package$; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * desc: * * @author Lo_ading * @version 1.0.0 * @date 2021/4/9 */ public class NebulaSparkWriterExample { private final Logger logger = LoggerFactory.getLogger(NebulaSparkWriterExample.class); public static void main(String[] args) { SparkConf sparkConf = new SparkConf(); sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); Class<?>[] classes = {TCompactProtocol.class}; sparkConf.registerKryoClasses(classes); SparkContext sparkContext = new SparkContext("local", "NebulaSparkWriterExample", sparkConf); SparkSession sparkSession = new SparkSession(sparkContext); writeData(sparkSession); sparkSession.close(); System.exit(0); } private static void writeData(SparkSession sparkSession) { // build and setting nubula connection config NebulaConnectionConfig nebulaConnectionConfig = NebulaConnectionConfig .builder() .withMetaAddress("your-meta-host:meta-port") .withGraphAddress("your-graph-host:graph-port") .withConenctionRetry(2) .build(); //write vertex data System.out.println("Start to write nebula data [vertex]"); //your data file Dataset<Row> vertexDataset = sparkSession.read().json("/data/vertex"); vertexDataset.show(); vertexDataset.persist(StorageLevel.MEMORY_ONLY_SER()); WriteNebulaVertexConfig writeNebulaVertexConfig = WriteNebulaVertexConfig .builder() .withSpace("test") .withTag("person") 
.withVidField("id") .withVidAsProp(true) .withBatch(1000) .build(); DataFrameWriter<Row> vertexDataFrameWriter = new DataFrameWriter<>(vertexDataset); package$.MODULE$.NebulaDataFrameWriter(vertexDataFrameWriter) .nebula(nebulaConnectionConfig, writeNebulaVertexConfig).writeVertices(); System.out.println("End to write nebula data [vertex]"); //写入edge数据 System.out.println("Start to write nebula data [edge]"); Dataset<Row> edgeDataset = sparkSession.read().json("/data/edge"); edgeDataset.show(); edgeDataset.persist(StorageLevel.MEMORY_ONLY_SER()); WriteNebulaEdgeConfig writeNebulaEdgeConfig = WriteNebulaEdgeConfig .builder() .withSpace("test") .withEdge("friend") .withSrcIdField("src") .withDstIdField("dst") .withRankField("degree") .withSrcAsProperty(true) .withDstAsProperty(true) .withRankAsProperty(true) .withBatch(1000) .build(); DataFrameWriter<Row> edgeDataFrameWriter = new DataFrameWriter<>(edgeDataset); package$.MODULE$.NebulaDataFrameWriter(edgeDataFrameWriter) .nebula(nebulaConnectionConfig, writeNebulaEdgeConfig).writeEdges(); System.out.println("End to write nebula data [edge]"); } }
-
read代码示例
package com.loading.nebula; import com.facebook.thrift.protocol.TCompactProtocol; import com.vesoft.nebula.connector.NebulaConnectionConfig; import com.vesoft.nebula.connector.ReadNebulaConfig; import org.apache.spark.SparkConf; import org.apache.spark.SparkContext; import org.apache.spark.graphx.Edge; import org.apache.spark.rdd.RDD; import org.apache.spark.sql.DataFrameReader; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.Tuple2; import scala.collection.immutable.List; import com.vesoft.nebula.connector.connector.package$; import scala.collection.immutable.List$; import scala.collection.mutable.StringBuilder; /** * desc: * * @author Lo_ading * @version 1.0.0 * @date 2021/4/9 */ public class NebulaSparkReaderExample { private final Logger logger = LoggerFactory.getLogger(NebulaSparkWriterExample.class); public static void main(String[] args) { SparkConf sparkConf = new SparkConf(); sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); Class<?>[] classes = {TCompactProtocol.class}; sparkConf.registerKryoClasses(classes); SparkContext sparkContext = new SparkContext("local", "NebulaSparkReaderExample", sparkConf); SparkSession sparkSession = new SparkSession(sparkContext); readData(sparkSession); sparkSession.close(); System.exit(0); } private static void readData(SparkSession sparkSession) { // build connection config NebulaConnectionConfig nebulaConnectionConfig = NebulaConnectionConfig .builder() .withMetaAddress("your-meta-host:meta-port") .withConenctionRetry(2) .withTimeout(600) .build(); readVertex(sparkSession, nebulaConnectionConfig); readEdges(sparkSession, nebulaConnectionConfig); readVertexGraph(sparkSession, nebulaConnectionConfig); readEdgeGraph(sparkSession, nebulaConnectionConfig); } private static void readVertex(SparkSession sparkSession, NebulaConnectionConfig nebulaConnectionConfig) { 
List<String> cols = List$.MODULE$.empty(); cols.addString(new StringBuilder("name")); cols.addString(new StringBuilder("age")); cols.addString(new StringBuilder("id")); cols.addString(new StringBuilder("timep")); ReadNebulaConfig readNebulaConfig = ReadNebulaConfig .builder() .withSpace("test") .withLabel("person") .withNoColumn(false) .withReturnCols(cols) .build(); DataFrameReader dataFrameReader = new DataFrameReader(sparkSession); Dataset<Row> dataset = package$.MODULE$.NebulaDataFrameReader(dataFrameReader) .nebula(nebulaConnectionConfig, readNebulaConfig).loadVerticesToDF(); System.out.println("Vertices schema"); dataset.printSchema(); dataset.show(20); System.out.println("Vertices nums:" + dataset.count()); } private static void readEdges(SparkSession sparkSession, NebulaConnectionConfig nebulaConnectionConfig) { ReadNebulaConfig readNebulaConfig = ReadNebulaConfig .builder() .withSpace("test") .withLabel("friend") .withNoColumn(true) .build(); DataFrameReader dataFrameReader = new DataFrameReader(sparkSession); Dataset<Row> dataset = package$.MODULE$.NebulaDataFrameReader(dataFrameReader) .nebula(nebulaConnectionConfig, readNebulaConfig).loadEdgesToDF(); System.out.println("Edge schema"); dataset.printSchema(); dataset.show(20); System.out.println("Edge nums:" + dataset.count()); } private static void readVertexGraph(SparkSession sparkSession, NebulaConnectionConfig nebulaConnectionConfig) { List<String> cols = List$.MODULE$.empty(); cols.addString(new StringBuilder("name")); ReadNebulaConfig readNebulaConfig = ReadNebulaConfig .builder() .withSpace("test") .withLabel("person") .withReturnCols(cols) .build(); DataFrameReader dataFrameReader = new DataFrameReader(sparkSession); RDD<Tuple2<Object, List<Object>>> vertexRdd = package$.MODULE$ .NebulaDataFrameReader(dataFrameReader).nebula(nebulaConnectionConfig, readNebulaConfig) .loadVerticesToGraphx(); System.out.println("Vertices RDD nums:" + vertexRdd.count()); } private static void 
readEdgeGraph(SparkSession sparkSession, NebulaConnectionConfig nebulaConnectionConfig) { ReadNebulaConfig readNebulaConfig = ReadNebulaConfig .builder() .withSpace("test") .withLabel("friend") .withNoColumn(true) .build(); DataFrameReader dataFrameReader = new DataFrameReader(sparkSession); RDD<Edge<Tuple2<Object, List<Object>>>> dataset = package$.MODULE$.NebulaDataFrameReader(dataFrameReader) .nebula(nebulaConnectionConfig, readNebulaConfig).loadEdgesToGraphx(); System.out.println("Edge RDD nums:" + dataset.count()); } }
-
运行demo
2.0 版本的写入默认为批量写入,构造 writeNebulaVertexConfig / writeNebulaEdgeConfig 时使用 withBatch 设置每批写入的条数
写入时需提前创建好图空间、Edge、Vertex 及相关属性