Apache Spark连接MongoDB

大数据处理引擎Apache Spark与Mongodb相结合,构建一个复杂的实时分析系统。通过spark-mongodb连接器可以将spark与mongodb数据库连接起来。

image.png

1.前提

  • 安装并运行Mongodb
  • Spark 2.1
  • Scala 2.11

2.安装MongoDB(通过yum安装)

MongoDB安装教程

2.1配置yum源

vim /etc/yum.repos.d/mongodb-org-3.4.repo

添加以下内容:

[mongodb-org-3.4]
name=MongoDB Repository
baseurl=https://repo.mongodb.org/yum/redhat/$releasever/mongodb-org/3.4/x86_64/
gpgcheck=1
enabled=1
gpgkey=https://www.mongodb.org/static/pgp/server-3.4.asc

2.2安装

yum install -y mongodb-org

安装过程中出现验证错误的情况,则取消gpgcheck的验证。修改mongodb-org-3.4.repo文件后重新安装:

//取消验证
gpgcheck=0

2.3yum安装后的文件位置

安装完后配置文件在:/etc/mongod.conf

数据文件在:/var/lib/mongo

日志文件在:/var/log/mongodb

2.4启动服务

// 启动 
systemctl start  mongod.service
// 检查是否启动
systemctl status  mongod.service
// 关闭 
systemctl stop  mongod.service
// 重启服务
systemctl restart  mongod.service
// 设置开机启动
systemctl enable  mongod.service

2.5访问权限控制

MongoDB服务默认只绑定在本机IP上,即只有本机才能访问MongoDB,我们可以修改访问权限控制让外网也能访问。

修改配置文件/etc/mongod.conf将其中的bindip:127.0.0.1注释即可。

image.png

2.6进入mongodb命令行

image.png

3.运行mongo-spark

本文使用mongodb官方的MongoDB Connector for Apache Spark,先下载MongoDB Connector for Apache Spark ,MongoDB Connector for Apache Spark的使用方法非常简单,到spark的安装的bin目录下执行下面语句:

按照官方文档启动Spark Connector Scala Guide

./spark-shell  --conf "spark.mongodb.input.uri=mongodb://localhost:27017/test.test?authSource=admin" --conf "spark.mongodb.output.uri=mongodb://localhost:27017/test.test?authSource=admin" --packages org.mongodb.spark:mongo-spark-connector_2.10:2.2.1

//test是数据库名称。myCollection是集合名称
//读配置
The spark.mongodb.input.uri specifies the MongoDB server address (127.0.0.1), the database to connect (test), and the collection (myCollection) from which to read data, and the read preference.
//写配置
The spark.mongodb.output.uri specifies the MongoDB server address (127.0.0.1), the database to connect (test), and the collection (myCollection) to which to write data. Connects to port 27017 by default.
//这里mongo-spark的包的要换成对应的版本号
The packages option specifies the Spark Connector’s Maven coordinates, in the format groupId:artifactId:version.
image.png

4.运行实例

导入MongoDB连接包

import com.mongodb.spark._

在读写mongodb数据库时会自动连接mongodb。

import org.bson.Document

使用Datasets and SQL,对mongodb数据库进行增删改查。

import org.apache.spark.sql.SparkSession

val sparkSession = SparkSession.builder().master("local").appName("MongoSparkConnectorIntro").config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.test").config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.test").getOrCreate()

import com.mongodb.spark._
import org.bson.Document
val df = MongoSpark.load(sparkSession)
df.printSchema() 

在Spark使用结束时,务必使用 :quit退出。否则将导致错误。

5.MongoDB的基本操作

传统数据库与Mongodb数据库结构区别图

image.png

创建数据库

// 如果数据库不存在则创建,否则切换到指定数据库(当没有进行操作然后离开的时候会自动删除该数据库)
use DATABASE_NAME  
db 查看当前数据库名
show dbs 查看所有数据库
show tables 查看集合

删除数据库

db.dropDatabase() 删除当前数据库
db.collection.drop() 删除集合,collection为集合名,例db.student.drop()

创建集合

>db.test.insert({"id":"1002", 
    "name": "lisi",
})
image.png

test是我们的集合名,如果该集合不在数据库中,Mongodb会自动创建该集合并插入文档。

//查看已插入的文档
db.test.find();

删除集合

db.test.drop();
image.png
插入文档
//单条文档插入
>db.test.insert({"id":10,"name":"zhangsan"})
//循环批量插入文档
>for(var i=1;i<=20;i++) {db.users.insert({"id":10,"name":"zhangsan"})}
更新文档

update()方法

db.collection.update(
    <query>,  // 查询条件
    <update>,  //update的对象和一些更新的操作符
    {
        upsert: <boolean>,  // 如果不存在update的记录,是否插入objNew,true为插入,默认是false,不插入
        multi: <boolean>,   // mongodb 默认是false,只更新找到的第一条记录,如果这个参数为true,就把按条件查出来多条记录全部更新。
        writeConcern: <document>  // 可选,抛出异常的级别
    }
)

所有修改器包括:
 $set 对某一个键值进行修改,如果没有就添加 ex:db.person.update({_id:1},{$set:{name:"lwd"}})
 $inc 只使用于数值类型,对其进行加减操作 ex:db.person.update({_id:1},{$inc:{id:1}})
 $unset 删除某指定的键 ex:db.person.update({_id:1},{$unset{id:1}})
 $push 只使用于数组类型,在数组里面加值 ex:db.person.update({_id=1},{$push:{class:"1"}})
 $pushAll 只使用于数组类型,批量在数组价值 ex:db.person.update({_id=1},{$pullAll:class:["1","2","3"]})
 $addToSet 只使用于数组类型,当没有的时候才会进行操作 ex:db.person.update({_id=1},{$addToSet:{name:"lwd"}})
 $pop 只使用于数组,删除某一个元素-1为第一个 1为最后一个 ex:db.person.update({_id=1},{$pop:{name:1}})
 $pull 删除某一个元素 直接写要删除的东西 ex:db.person.update({_id=1},{$pull:{name:"lwd"}})
 $pullAll 一次性删除某元素 ex:db.person.update({_id:1},{$pullAll:{class:["1","2"]}})
 $addToSet与$each批量数组更新 ex: db.person.update({_id:1},{$adaToSet:{class:{$each:["1","2","3"]}}})

例子
db.test.update({"name":"lisi"},{$set:{"name":"wangwu"}})

以上语句只会修改第一条发现的文档,如果你要修改多条相同的文档,则需要设置multi参数为true

image.png

save()方法

db.collection.save(
    <document>, // 传入文档用来替换之前的文档
    {
        writeConcern: <document>
    }
)

//实例
db.test.save(
    {"id":"1003","name":"zhaoliu"}
)
image.png

save()方法与insert的区别:

save()方法可以插入相同的id的数据(id为系统自动生成的那个_id),而insert插入会出错。

image.png
删除文档

remove:删除表中的记录(根据条件删除)

db.collection.remove(
    <query>,       // 可选,删除的文档的条件
    {
        justOne:<boolean>,   // 可选,如果为true或1,则只删除一个文档
        writeConcern: <document>  // 可选,抛出异常级别
    }
)

例子
db.test.remove({"id":"1003"})

db.test.remove({}) 删除所有数据

image.png
查询文档
db.test.find() 查看已插入的文档
db.test.find().pretty() 输出好看的格式
db.test.findOne() 只返回一个文档,一般是返回第一个文档
指定返回的键:db.文档名.find({条件},{键指定}) 
image.png

条件操作符

(>) 大于 – $gt
(<) 小于 – $lt
(>=) 大于等于 – $gte
(<= ) 小于等于 – $lte
db.test.find({"id" : {$gt : "1002"}}) 查找likes大于100的数据
image.png

AND条件

传入多个键值然后逗号隔开
db.test.find({key1: value1, key2: value2}).pretty()
db.test.find({"id":"1002","name":"wangwu"})
image.png

OR条件

 >db.test.find({$or:[{"id":"1002"}, {"name":"jerry"}]}).pretty()
image.png

limit and skip

db.test.find().limit(2) 只读取两条
db.test.find().limit(1).skip(1) 跳过第一条,只显示第二条
skip默认为0
image.png

sort()方法

1升序,-1降序
db.test.find().sort({"id": 1})
//第一个括弧中时查询条件,第二个括弧中显示内容:1会显示出来字段,0不会显示字段
db.test.find({}, {"id": 1, "_id": 0}).sort({"id": -1})
image.png

参考资料

https://www.mongodb.com/products/spark-connector
https://docs.mongodb.com/spark-connector/current/
https://university.mongodb.com/activate?key=03adb7a09af048bdb68e9703bb11c512&utm_campaign=verification&utm_medium=email&utm_source=university
https://github.com/mongodb/mongo-spark?jmp=hero
http://blog.csdn.net/chenguohong88/article/details/77850882
http://www.36dsj.com/archives/80662

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,937评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,503评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,712评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,668评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,677评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,601评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,975评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,637评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,881评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,621评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,710评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,387评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,971评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,947评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,189评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,805评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,449评论 2 342

推荐阅读更多精彩内容