[nebula-graph] nebula spark for java

Nebula Spark Connector 2.0

[Reference]
[Reference]:https://docs.nebula-graph.com.cn/spark-connector/sc-ug-what-is-spark-connector/

[Code]
[Code]:https://github.com/Loading-Life/nebula-spark-java-demo

Overview

Nebula Spark Connector is a Spark connector that provides the ability to read and write Nebula Graph data through standard Spark interfaces. It consists of two parts:

  • Reader: provides a Spark SQL interface that you can use to read Nebula Graph data. Each read fetches the data of one tag or edge type and assembles the result into a Spark DataFrame.

  • Writer: provides a Spark SQL interface that you can use to write data in DataFrame format into Nebula Graph, either row by row or in batches.

Use cases

  • Migrating data between different Nebula Graph clusters (see the sketch after this list).
  • Migrating data between graph spaces within the same Nebula Graph cluster.
  • Migrating data between Nebula Graph and other data sources.
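
For example, migrating one tag between clusters (the first use case) reduces to reading it into a DataFrame from the source cluster and writing that DataFrame to the target. Below is a minimal sketch using the connector APIs demonstrated later in this post; the addresses, space, and tag names are placeholders, and the "_vertexId" column name is an assumption about the reader's output that you should verify against your data:

    package com.loading.nebula;

    import com.facebook.thrift.protocol.TCompactProtocol;
    import com.vesoft.nebula.connector.NebulaConnectionConfig;
    import com.vesoft.nebula.connector.ReadNebulaConfig;
    import com.vesoft.nebula.connector.WriteNebulaVertexConfig;
    import com.vesoft.nebula.connector.connector.package$;
    import org.apache.spark.SparkConf;
    import org.apache.spark.sql.DataFrameReader;
    import org.apache.spark.sql.DataFrameWriter;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    /** Hypothetical cluster-to-cluster migration of one tag; all names are placeholders. */
    public class NebulaMigrationSketch {

      public static void main(String[] args) {
        SparkConf conf = new SparkConf()
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .registerKryoClasses(new Class<?>[]{TCompactProtocol.class});
        SparkSession spark = SparkSession.builder()
            .master("local").appName("NebulaMigrationSketch").config(conf).getOrCreate();

        NebulaConnectionConfig source = NebulaConnectionConfig.builder()
            .withMetaAddress("source-meta-host:meta-port")
            .build();
        NebulaConnectionConfig target = NebulaConnectionConfig.builder()
            .withMetaAddress("target-meta-host:meta-port")
            .withGraphAddress("target-graph-host:graph-port")
            .build();

        // read one tag from the source cluster into a DataFrame;
        // empty return columns with noColumn=false fetch every property (assumption)
        ReadNebulaConfig read = ReadNebulaConfig.builder()
            .withSpace("source_space")
            .withLabel("person")
            .withNoColumn(false)
            .build();
        Dataset<Row> df = package$.MODULE$
            .NebulaDataFrameReader(new DataFrameReader(spark))
            .nebula(source, read)
            .loadVerticesToDF();

        // write the same DataFrame into the target cluster
        WriteNebulaVertexConfig write = WriteNebulaVertexConfig.builder()
            .withSpace("target_space")
            .withTag("person")
            .withVidField("_vertexId")  // vid column produced by the reader (assumption)
            .build();
        package$.MODULE$
            .NebulaDataFrameWriter(new DataFrameWriter<>(df))
            .nebula(target, write)
            .writeVertices();

        spark.close();
      }
    }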

Environment

  • NebulaGraph : 2.0.0
  • Apache Spark™ : 2.4.4
  • Scala : 2.11.12
  • Java : 1.8

Usage

  1. Clone & install

     $ git clone https://github.com/vesoft-inc/nebula-spark-utils.git
     $ cd nebula-spark-utils
     $ git checkout v2.0.0
     $ mvn clean install -Dgpg.skip -Dmaven.javadoc.skip=true -Dmaven.test.skip=true
    
  2. Create your Maven project

  3. Add the Maven dependency

     <dependency>
       <groupId>com.vesoft</groupId>
       <artifactId>nebula-spark-connector</artifactId>
       <version>2.0.0</version>
     </dependency>
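
     The demo also needs Spark on the classpath. One possible addition, assuming the versions from the Environment section (spark-sql pulls in spark-core transitively; spark-graphx is only needed for the GraphX read examples):

     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-sql_2.11</artifactId>
       <version>2.4.4</version>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-graphx_2.11</artifactId>
       <version>2.4.4</version>
     </dependency>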
    
  4. Write a test class and adjust the configuration to your environment

    • Sample data

      edge

        {"src":12345,"dst":23456,"degree":34, "descr": "aaa","timep": "2020-01-01"}
        {"src":11111,"dst":22222,"degree":33, "descr": "aaa","timep": "2020-01-01"}
        {"src":11111,"dst":33333,"degree":32, "descr": "a\baa","timep": "2020-01-01"}
        {"src":11111,"dst":44444,"degree":31, "descr": "aaa","timep": "2020-01-01"}
        {"src":22222,"dst":55555,"degree":30, "descr": "a\naa","timep": "2020-01-01"}
        {"src":33333,"dst":44444,"degree":29, "descr": "aaa","timep": "2020-01-01"}
        {"src":33333,"dst":55555,"degree":28, "descr": "aa\ta","timep": "2020-01-01"}
        {"src":44444,"dst":22222,"degree":27, "descr": "aaa","timep": "2020-01-01"}
        {"src":44444,"dst":55555,"degree":26, "descr": "aaa","timep": "2020-01-01"}
        {"src":22222,"dst":66666,"degree":25, "descr": "aaa","timep": "2020-01-01"}
        {"src":21,"dst":22,"degree":24, "descr": "aaa","timep": "2021-04-09"}
      

      vertex

        {"id":"aa","name":"Tom","age":10,"timep": "2020-01-01"}
        {"id":"\ns","name":"Bob","age":11,"timep": "2020-01-02"}
        {"id":14,"name":"Jane","age":12,"timep": "2020-01-03"}
        {"id":15,"name":"Jena","age":13,"timep": "2020-01-04"}
        {"id":16,"name":"Ni\tc","age":14,"timep": "2020-01-05"}
        {"id":17,"name":"Mei","age":15,"timep": "2020-01-06"}
        {"id":18,"name":"HH","age":16,"timep": "2020-01-07"}
        {"id":19,"name":"Ty\nler","age":17,"timep": "2020-01-08"}
        {"id":20,"name":"Ber","age":18,"timep": "2020-01-09"}
        {"id":21,"name":"Mercy","age":19,"timep": "2020-01-10"}
        {"id":22,"name":"why","age":27,"timep": "2021-04-09"}
      
    • Write example

        package com.loading.nebula;
      
        import com.facebook.thrift.protocol.TCompactProtocol;
        import com.vesoft.nebula.connector.NebulaConnectionConfig;
        import com.vesoft.nebula.connector.WriteNebulaEdgeConfig;
        import com.vesoft.nebula.connector.WriteNebulaVertexConfig;
        import org.apache.spark.SparkConf;
        import org.apache.spark.SparkContext;
        import org.apache.spark.sql.DataFrameWriter;
        import org.apache.spark.sql.Dataset;
        import org.apache.spark.sql.Row;
        import org.apache.spark.sql.SparkSession;
        import org.apache.spark.storage.StorageLevel;
        import com.vesoft.nebula.connector.connector.package$;
        import org.slf4j.Logger;
        import org.slf4j.LoggerFactory;
        
        
        /**
         * desc: writes the sample vertex and edge DataFrames into Nebula Graph
         *
         * @author Lo_ading
         * @version 1.0.0
         * @date 2021/4/9
         */
        public class NebulaSparkWriterExample {
        
          private static final Logger LOGGER = LoggerFactory.getLogger(NebulaSparkWriterExample.class);
        
          public static void main(String[] args) {
        
            SparkConf sparkConf = new SparkConf();
            sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        
            Class<?>[] classes = {TCompactProtocol.class};
            sparkConf.registerKryoClasses(classes);
        
            SparkContext sparkContext = new SparkContext("local", "NebulaSparkWriterExample", sparkConf);
            SparkSession sparkSession = new SparkSession(sparkContext);
        
            writeData(sparkSession);
        
            sparkSession.close();
            System.exit(0);
        
          }
        
          private static void writeData(SparkSession sparkSession) {
        
            // build and set the Nebula connection config
            NebulaConnectionConfig nebulaConnectionConfig = NebulaConnectionConfig
                .builder()
                .withMetaAddress("your-meta-host:meta-port")
                .withGraphAddress("your-graph-host:graph-port")
                .withConenctionRetry(2)
                .build();
        
            // write vertex data
            System.out.println("Start to write nebula data [vertex]");

            // your data file
            Dataset<Row> vertexDataset = sparkSession.read().json("/data/vertex");
            vertexDataset.show();
            vertexDataset.persist(StorageLevel.MEMORY_ONLY_SER());
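            // "id" becomes the vertex ID; withVidAsProp(true) also keeps it as a
            // tag property, and withBatch sets how many rows go into one batch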
            WriteNebulaVertexConfig writeNebulaVertexConfig = WriteNebulaVertexConfig
                .builder()
                .withSpace("test")
                .withTag("person")
                .withVidField("id")
                .withVidAsProp(true)
                .withBatch(1000)
                .build();
            DataFrameWriter<Row> vertexDataFrameWriter = new DataFrameWriter<>(vertexDataset);
            package$.MODULE$.NebulaDataFrameWriter(vertexDataFrameWriter)
                .nebula(nebulaConnectionConfig, writeNebulaVertexConfig).writeVertices();
            System.out.println("End to write nebula data [vertex]");
        
            // write edge data
            System.out.println("Start to write nebula data [edge]");
            Dataset<Row> edgeDataset = sparkSession.read().json("/data/edge");
            edgeDataset.show();
            edgeDataset.persist(StorageLevel.MEMORY_ONLY_SER());
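            // "src"/"dst" map to the edge endpoints and "degree" to the rank;
            // the *AsProperty flags additionally store those columns as properties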
            WriteNebulaEdgeConfig writeNebulaEdgeConfig = WriteNebulaEdgeConfig
                .builder()
                .withSpace("test")
                .withEdge("friend")
                .withSrcIdField("src")
                .withDstIdField("dst")
                .withRankField("degree")
                .withSrcAsProperty(true)
                .withDstAsProperty(true)
                .withRankAsProperty(true)
                .withBatch(1000)
                .build();
            DataFrameWriter<Row> edgeDataFrameWriter = new DataFrameWriter<>(edgeDataset);
            package$.MODULE$.NebulaDataFrameWriter(edgeDataFrameWriter)
                .nebula(nebulaConnectionConfig, writeNebulaEdgeConfig).writeEdges();
            System.out.println("End to write nebula data [edge]");
      
          }
        
        
        }
      
    • Read example

        package com.loading.nebula;
        
        import com.facebook.thrift.protocol.TCompactProtocol;
        import com.vesoft.nebula.connector.NebulaConnectionConfig;
        import com.vesoft.nebula.connector.ReadNebulaConfig;
        import org.apache.spark.SparkConf;
        import org.apache.spark.SparkContext;
        import org.apache.spark.graphx.Edge;
        import org.apache.spark.rdd.RDD;
        import org.apache.spark.sql.DataFrameReader;
        import org.apache.spark.sql.Dataset;
        import org.apache.spark.sql.Row;
        import org.apache.spark.sql.SparkSession;
        import org.slf4j.Logger;
        import org.slf4j.LoggerFactory;
        import java.util.Arrays;
        import scala.Tuple2;
        import scala.collection.JavaConverters;
        import scala.collection.immutable.List;
        import com.vesoft.nebula.connector.connector.package$;
        
        /**
         * desc: reads vertices and edges from Nebula Graph into DataFrames and GraphX RDDs
         *
         * @author Lo_ading
         * @version 1.0.0
         * @date 2021/4/9
         */
        public class NebulaSparkReaderExample {
        
          private static final Logger LOGGER = LoggerFactory.getLogger(NebulaSparkReaderExample.class);
        
          public static void main(String[] args) {
        
            SparkConf sparkConf = new SparkConf();
            sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        
            Class<?>[] classes = {TCompactProtocol.class};
            sparkConf.registerKryoClasses(classes);
        
            SparkContext sparkContext = new SparkContext("local", "NebulaSparkReaderExample", sparkConf);
            SparkSession sparkSession = new SparkSession(sparkContext);
        
            readData(sparkSession);
        
            sparkSession.close();
            System.exit(0);
        
          }
        
          private static void readData(SparkSession sparkSession) {
        
            // build connection config
            NebulaConnectionConfig nebulaConnectionConfig = NebulaConnectionConfig
                .builder()
                .withMetaAddress("your-meta-host:meta-port")
                .withConenctionRetry(2)
                .withTimeout(600)
                .build();
        
            readVertex(sparkSession, nebulaConnectionConfig);
            readEdges(sparkSession, nebulaConnectionConfig);
            readVertexGraph(sparkSession, nebulaConnectionConfig);
            readEdgeGraph(sparkSession, nebulaConnectionConfig);
          }
        
          private static void readVertex(SparkSession sparkSession,
              NebulaConnectionConfig nebulaConnectionConfig) {
        
            // build the scala immutable List of property names to return
            List<String> cols = JavaConverters
                .asScalaBufferConverter(Arrays.asList("name", "age", "id", "timep"))
                .asScala()
                .toList();
            ReadNebulaConfig readNebulaConfig = ReadNebulaConfig
                .builder()
                .withSpace("test")
                .withLabel("person")
                .withNoColumn(false)
                .withReturnCols(cols)
                .build();
            DataFrameReader dataFrameReader = new DataFrameReader(sparkSession);
            Dataset<Row> dataset = package$.MODULE$.NebulaDataFrameReader(dataFrameReader)
                .nebula(nebulaConnectionConfig, readNebulaConfig).loadVerticesToDF();
            System.out.println("Vertices schema");
            dataset.printSchema();
            dataset.show(20);
            System.out.println("Vertices nums:" + dataset.count());
        
          }
        
          private static void readEdges(SparkSession sparkSession,
              NebulaConnectionConfig nebulaConnectionConfig) {
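            // withNoColumn(true): read only the endpoints and rank, no property columns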
            ReadNebulaConfig readNebulaConfig = ReadNebulaConfig
                .builder()
                .withSpace("test")
                .withLabel("friend")
                .withNoColumn(true)
                .build();
            DataFrameReader dataFrameReader = new DataFrameReader(sparkSession);
            Dataset<Row> dataset = package$.MODULE$.NebulaDataFrameReader(dataFrameReader)
                .nebula(nebulaConnectionConfig, readNebulaConfig).loadEdgesToDF();
            System.out.println("Edge schema");
            dataset.printSchema();
            dataset.show(20);
            System.out.println("Edge nums:" + dataset.count());
          }
        
          private static void readVertexGraph(SparkSession sparkSession,
              NebulaConnectionConfig nebulaConnectionConfig) {
            List<String> cols = JavaConverters
                .asScalaBufferConverter(Arrays.asList("name"))
                .asScala()
                .toList();
            ReadNebulaConfig readNebulaConfig = ReadNebulaConfig
                .builder()
                .withSpace("test")
                .withLabel("person")
                .withReturnCols(cols)
                .build();
            DataFrameReader dataFrameReader = new DataFrameReader(sparkSession);
            RDD<Tuple2<Object, List<Object>>> vertexRdd = package$.MODULE$
                .NebulaDataFrameReader(dataFrameReader).nebula(nebulaConnectionConfig, readNebulaConfig)
                .loadVerticesToGraphx();
            System.out.println("Vertices RDD nums:" + vertexRdd.count());
          }
        
          private static void readEdgeGraph(SparkSession sparkSession,
              NebulaConnectionConfig nebulaConnectionConfig) {
            ReadNebulaConfig readNebulaConfig = ReadNebulaConfig
                .builder()
                .withSpace("test")
                .withLabel("friend")
                .withNoColumn(true)
                .build();
            DataFrameReader dataFrameReader = new DataFrameReader(sparkSession);
            RDD<Edge<Tuple2<Object, List<Object>>>> dataset = package$.MODULE$.NebulaDataFrameReader(dataFrameReader)
                .nebula(nebulaConnectionConfig, readNebulaConfig).loadEdgesToGraphx();
            System.out.println("Edge RDD nums:" + dataset.count());
        
          }
        
        }
      
  5. Run the demo
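
     One way to launch the examples locally (illustrative; it assumes the Spark dependencies above are on the runtime classpath, i.e. not scoped provided):

     $ mvn compile exec:java -Dexec.mainClass="com.loading.nebula.NebulaSparkWriterExample"
     $ mvn compile exec:java -Dexec.mainClass="com.loading.nebula.NebulaSparkReaderExample"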

In version 2.0 writes are batched by default; use withBatch when building WriteNebulaVertexConfig or WriteNebulaEdgeConfig to set the number of rows per batch.
Before writing, the graph space, the tag and edge type, and their properties must already be created in Nebula Graph.
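
A possible schema for the sample data above, written in nGQL (a sketch only; partition_num, replica_factor, vid_type, and the property types are assumptions to adjust for your deployment):

    CREATE SPACE test (partition_num = 10, replica_factor = 1, vid_type = INT64);
    USE test;
    CREATE TAG person (id int, name string, age int, timep string);
    CREATE EDGE friend (src int, dst int, degree int, descr string, timep string);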
