使用spark 程序 读取 mongodb

原因:有同事需要连接 mongodb ,保存数据到 大数据平台。之前尝试了 hive 建立外部表的方式。但是一直不成功。报错原因不明。尝试 mongo Hadoop 中 的 spark 例子,提交到 集群中会报 任务无法序列化的错误。因此采用了 mongo spark 连接器来做一个测试程序

安装 mongodb

主机环境

CentOS Linux release 7.4.1708 (Core)

关闭防火墙

关闭selinux

192.168.14.7

1.下载安装包

https://repo.mongodb.org/yum/redhat/7/mongodb-org/3.4/x86_64/RPMS/mongodb-org-3.4.18-1.el7.x86_64.rpm

https://repo.mongodb.org/yum/redhat/7/mongodb-org/3.4/x86_64/RPMS/mongodb-org-mongos-3.4.18-1.el7.x86_64.rpm

https://repo.mongodb.org/yum/redhat/7/mongodb-org/3.4/x86_64/RPMS/mongodb-org-server-3.4.18-1.el7.x86_64.rpm

https://repo.mongodb.org/yum/redhat/7/mongodb-org/3.4/x86_64/RPMS/mongodb-org-shell-3.4.18-1.el7.x86_64.rpm

https://repo.mongodb.org/yum/redhat/7/mongodb-org/3.4/x86_64/RPMS/mongodb-org-tools-3.4.18-1.el7.x86_64.rpm

2.安装 httpd createrepo

yum -y install httpd

3.建立目录

mkdir /var/www/html/mongodb

4.移动rpm包到 /var/www/html/mongodb 目录

mv *rpm /var/www/html/mongodb

5.进入 mkdir /var/www/html/mongodb 目录,建立 repo 档案

cd /var/www/html/mongodb

createrepo .

6.启动 httpd 服务并设置开机启动

systemctl start httpd.service

systemctl enable httpd.service

7.建立 mongodb 的 repo 文件

vi /etc/yum.repos.d/mongodb.repo

[mongodb]

name=mongodb

baseurl=http://192.168.14.7/mongodb

enable=1

gpgcheck=0

8.建立 yum 缓存

yum makecache

9.安装 mongodb server 和 shell

yum -y install mongodb-org-server.x86_64 mongodb-org-shell.x86_64

10.修改 /etc/mongod.conf 配置文件

#  bindIp: 127.0.0.1

11.修改系统配置

echo never > /sys/kernel/mm/transparent_hugepage/defrag

echo never > /sys/kernel/mm/transparent_hugepage/enabled

12.启动 mongodb,并设置开机启动

systemctl start mongod.service

systemctl enable mongod.service

13.新建 test 库

use test

14.往 test 表中插入数据

db.test.insertOne({"123" : "123-val", "a" : 34, "b" : 36})

db.test.insertOne({"456" : "456-val", "a" : 45, "b" : 67})

15.检查 test 表中的数据

db.test.find()

spark集群搭建

主机环境

CentOS Linux release 7.4.1708 (Core)

关闭防火墙

关闭 selinux

192.168.14.8

1.安装 jdk

tar -zxvf jdk-8u171-linux-x64.tar.gz

mv jdk1.8.0_171/ /opt/jdk

2.安装 spark

tar -zxvf spark-2.4.5-bin-hadoop2.7.tgz

mv spark-2.4.5-bin-hadoop2.7/  /opt/spark

3.安装 maven

tar -zxvf apache-maven-3.5.2-bin.tar.gz

mv apache-maven-3.5.2/ /opt/maven

3.修改环境变量

vi ~/.bashrc

export JAVA_HOME=/opt/jdk

export SPARK_HOME=/opt/spark

export M2_HOME=/opt/maven

export PATH=$PATH:$JAVA_HOME/bin:$SPARK_HOME/bin:$SPARK_HOME/sbin:$M2_HOME/bin

4.使环境变量生效

source ~/.bashrc

5.重命名 spark 配置文件

cd /opt/spark/conf/

mv spark-env.sh.template spark-env.sh

mv spark-defaults.conf.template spark-defaults.conf

6.修改 spark 配置

vi spark-defaults.conf

spark.master                    spark://192.168.14.8:7077

vi spark-env.sh

SPARK_MASTER_HOST=192.168.14.8

7.启动 spark master

start-master.sh

8.启动 slave

start-slave.sh spark://192.168.14.8:7077

9.查看 web 页面

浏览器访问 http://192.168.14.8:8080/

10.新建 maven 项目 进入 目录

mvn archetype:generate -DgroupId=com.packt.samples -DartifactId=com.packt.samples.archetype -Dversion=1.0.0 -DinteractiveMode=false -DarchetypeCatalog=internal

cd com.packt.samples

11.编辑 pom.xml

    <dependency>

      <groupId>org.apache.spark</groupId>

      <artifactId>spark-core_2.11</artifactId>

      <version>2.4.5</version>

    </dependency>

    <dependency>

      <groupId>org.apache.spark</groupId>

      <artifactId>spark-sql_2.11</artifactId>

      <version>2.4.5</version>

    </dependency>

        <dependency>

            <groupId>org.mongodb.spark</groupId>

            <artifactId>mongo-spark-connector_2.11</artifactId>

            <version>2.2.1</version>

        </dependency>

<build>

  <plugins>

    <plugin>

      <groupId>org.apache.maven.plugins</groupId>

      <artifactId>maven-assembly-plugin</artifactId>

      <version>2.4</version>

      <configuration>

        <descriptorRefs>

          <descriptorRef>jar-with-dependencies</descriptorRef>

        </descriptorRefs>

        <archive>

          <manifest>

            <addClasspath>true</addClasspath>

            <mainClass>com.packt.samples.App</mainClass>

          </manifest>

        </archive>

      </configuration>

      <executions>

        <execution>

          <id>assemble-all</id>

          <phase>package</phase>

          <goals>

            <goal>single</goal>

          </goals>

        </execution>

      </executions>

    </plugin>

  </plugins>

</build>

12.编辑源代码

vi src/main/java/com/packt/samples/App.java

package com.packt.samples;

import com.mongodb.MongoClient;

import com.mongodb.MongoClientURI;

import com.mongodb.client.MongoDatabase;

import com.mongodb.spark.MongoConnector;

import com.mongodb.spark.MongoSpark;

import com.mongodb.spark.config.ReadConfig;

import com.mongodb.spark.config.WriteConfig;

import com.mongodb.spark.rdd.api.java.JavaMongoRDD;

import com.mongodb.spark.sql.helpers.StructFields;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.Function;

import org.apache.spark.sql.Dataset;

import org.apache.spark.sql.Row;

import org.apache.spark.sql.SparkSession;

import org.apache.spark.sql.types.DataTypes;

import org.apache.spark.sql.types.StructType;

import org.bson.Document;

import org.bson.types.ObjectId;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import static java.lang.String.format;

import static java.util.Arrays.asList;

import static java.util.Collections.singletonList;

public final class App {

    public static void main(final String[] args) throws InterruptedException {

        //构建 java spark 上下文对象

        JavaSparkContext jsc = createJavaSparkContext(args);

        //删除测试库

        dropDatabase(getMongoClientURI(args));

        // Add Sample Data,制造一些测试数据

        List<String> characters = asList(

            "{'name': 'Bilbo Baggins', 'age': 50}",

            "{'name': 'Gandalf', 'age': 1000}",

            "{'name': 'Thorin', 'age': 195}",

            "{'name': 'Balin', 'age': 178}",

            "{'name': 'Kíli', 'age': 77}",

            "{'name': 'Dwalin', 'age': 169}",

            "{'name': 'Óin', 'age': 167}",

            "{'name': 'Glóin', 'age': 158}",

            "{'name': 'Fíli', 'age': 82}",

            "{'name': 'Bombur'}"

        );

        //将测试数据集合先并行和,构造成 rdd,然后将 集合中的字符串数据,转换成 Document 对象,保存到 mongodb 中

        MongoSpark.save(jsc.parallelize(characters).map(new Function<String, Document>() {

            @Override

            public Document call(final String json) throws Exception {

                return Document.parse(json);

            }

        }));

        // Load inferring schema

        //将 jsc 连接的 mongodb 中的 rdd 转换成 dataset

        Dataset<Row> df = MongoSpark.load(jsc).toDF();

        //打印出这个 dataset 的 schema

        df.printSchema();

        //展示 dataset 中的数据

        df.show();

        //将这个 dataset 注册为临时表

        df.registerTempTable("characters2");

        //构造一个 spark session

        SparkSession sparkSession = SparkSession.builder().getOrCreate();

        //使用 sql 方法查询 dataset 中的数据

        Dataset<Row> centenarians2 = sparkSession.sql("SELECT name, age FROM characters2 WHERE age >= 100");

        //展示查询的结果

        centenarians2.show();

    }

    private static JavaSparkContext createJavaSparkContext(final String[] args) {

        //先获取 mongodb 地址字符串

        String uri = getMongoClientURI(args);

        //删除测试库

        dropDatabase(uri);

        //构建 spark conf 对象

        SparkConf conf = new SparkConf()

                .setAppName("MongoSparkConnectorTour")

                .set("spark.app.id", "MongoSparkConnectorTour")

                .set("spark.mongodb.input.uri", uri)

                .set("spark.mongodb.output.uri", uri);

        //构建 java spark 上下文对象

        return new JavaSparkContext(conf);

    }

    private static String getMongoClientURI(final String[] args) {

        String uri;

        if (args.length == 0) {

            uri = "mongodb://192.168.14.7/test.coll"; // 测试库地址

        } else {

            uri = args[0];

        }

        return uri;

    }

    private static void dropDatabase(final String connectionString) {

        MongoClientURI uri = new MongoClientURI(connectionString);

        new MongoClient(uri).dropDatabase(uri.getDatabase());

    }

}

13.编译

mvn clean package -DskipTests

14.提交任务

spark-submit --class com.packt.samples.App --master spark://192.168.14.8:7077  --executor-memory 1G --total-executor-cores 1 /root/com.packt.samples.archetype/target/com.packt.samples.archetype-1.0.0-jar-with-dependencies.jar

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容