Spring MongoTemplate批量操作源码跟踪与最佳实践

MongoTemplate介绍

MongoTemplate是Spring-data-mongodb实现的接口,用于对mongodb数据库的操作。绝大部分操作都包含在内。本文使用的包版本为spring-data-mongodb-2.1.5.RELEASE.jar,其他版本实现核心逻辑大致不变

MongoDb批量操作

db.collection.insertMany()

给定一个文档数组,insertMany() 将数组中的每个文档插入集合中。默认情况下,按顺序插入文档。如果ordered设置为false,则文档将以无序格式插入,并且可以通过重新排序来提高性能。如果使用无序,则应用程序不应依赖于插入的顺序。每个组中的操作数不能超过maxWriteBatchSize数据库的值。从MongoDB 3.6开始,此值为100,000。该值显示在isMaster.maxWriteBatchSize字段中。当然,很多高级API会对insert内容进行分割。

db.collection.bulkWrite()

批量操作,可以批量执行一组语句,与redis事务类似,默认为有序。当ordered设置为true的时候,当执行语句遇到错误则中断后续执行,当ordered设置为false的时候遇到错误继续执行。需要注意的是bulkWrite()只支持以下操作

  • insertOne
  • updateOne
  • updateMany
  • deleteOne
  • deleteMany
  • replaceOne

MongoTemplate中的批量操作

insertAll方法

方法定义为:
public <T> Collection<T> insertAll(Collection<? extends T> objectsToSave)
用于插入默认文档,入参与返回均为集合类型。
实现源码如下,部分代码已打注释

protected <T> Collection<T> doInsertAll(Collection<? extends T> listToSave, MongoWriter<T> writer) {
      //创建以collection为key的插入内容的List 
      Map<String, List<T>> elementsByCollection = new HashMap();
       //创建用于保存结果的数组
       List<T> savedObjects = new ArrayList(listToSave.size());
       //开始遍历
       Iterator var5 = listToSave.iterator();
       while(var5.hasNext()) {
           T element = var5.next();
           if (element != null) {
               MongoPersistentEntity<?> entity = (MongoPersistentEntity)this.mappingContext.getRequiredPersistentEntity(element.getClass());
               String collection = entity.getCollection();
               List<T> collectionElements = (List)elementsByCollection.get(collection);
              //拿到以collection为key的插入内容的List ,如果为空则新建
               if (null == collectionElements) {
                   collectionElements = new ArrayList();
                   elementsByCollection.put(collection, collectionElements);
               }
               //把插入内容逐条add到List
               ((List)collectionElements).add(element);
           }
       }
        //遍历map,组个collection调用this.doInsertBatch方法批量插入
       var5 = elementsByCollection.entrySet().iterator();
       while(var5.hasNext()) {
           Entry<String, List<T>> entry = (Entry)var5.next();
           savedObjects.addAll(this.doInsertBatch((String)entry.getKey(), (Collection)entry.getValue(), this.mongoConverter));
       }

       return savedObjects;
   }

从源码可以看出,insertAll核心逻辑为把方法入参的list按照collection分组,并按组执行this.doInsertBatch方法。这个方法为protected方法,只能内部及子类、同包调用。这个方法实现后续会说到

insert方法

MongoTemplate自带的insert方法也有批量插入的实现,其中的一个接口如下:

<T> Collection<T> insert(Collection<? extends T> var1, String var2);

入参为要插入mongodb的集合以及collection的name
方法实现为:

public <T> Collection<T> insert(Collection<? extends T> batchToSave, String collectionName) {
        Assert.notNull(batchToSave, "BatchToSave must not be null!");
        Assert.notNull(collectionName, "CollectionName must not be null!");
        return this.doInsertBatch(collectionName, batchToSave, this.mongoConverter);
    }

可以看出这个方法直接断言了入参不为空,然后调用了和insertAll分类后调用的同一个方法doInsertBatch();从这个角度看如果只插入一个collection,二者的实际底层实现是一样的

doInsertBatch方法

首先这个方法为protected 方法,不能从外部直接调用。这个方法为一个通用的批量插入方法。实现如下

    protected <T> Collection<T> doInsertBatch(String collectionName, Collection<? extends T> batchToSave, MongoWriter<T> writer) {
        Assert.notNull(writer, "MongoWriter must not be null!");
      //这个是用于插入的list
        List<Document> documentList = new ArrayList();
        List<T> initializedBatchToSave = new ArrayList(batchToSave.size());
        Iterator var6 = batchToSave.iterator();

        Object saved;
       //遍历要插入的集合,并做一系列的处理
        while(var6.hasNext()) {
            T uninitialized = var6.next();
            BeforeConvertEvent<T> event = new BeforeConvertEvent(uninitialized, collectionName);
            T toConvert = ((BeforeConvertEvent)this.maybeEmitEvent(event)).getSource();
            AdaptibleEntity<T> entity = this.operations.forEntity(toConvert, this.mongoConverter.getConversionService());
            entity.assertUpdateableIdIfNotSet();
            saved = entity.initializeVersionProperty();
            Document document = entity.toMappedDocument(writer).getDocument();
            this.maybeEmitEvent(new BeforeSaveEvent(saved, document, collectionName));
            documentList.add(document);
            initializedBatchToSave.add(saved);
        }
       //这里是关键,处理完之后调用insertDocumentList执行批量插入操作,并返回_id主键列表
        List<Object> ids = this.insertDocumentList(collectionName, documentList);
        List<T> savedObjects = new ArrayList(documentList.size());
        int i = 0;

        for(Iterator var17 = initializedBatchToSave.iterator(); var17.hasNext(); ++i) {
            T obj = var17.next();
            if (i < ids.size()) {
                saved = this.populateIdIfNecessary(obj, ids.get(i));
                this.maybeEmitEvent(new AfterSaveEvent(saved, (Document)documentList.get(i), collectionName));
                savedObjects.add(saved);
            } else {
                savedObjects.add(obj);
            }
        }

        return savedObjects;
    }
/**
这个是真正执行批量插入的方法
**/
protected List<Object> insertDocumentList(String collectionName, List<Document> documents) {
        if (documents.isEmpty()) {
            return Collections.emptyList();
        } else {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Inserting list of Documents containing {} items", documents.size());
            }

            this.execute(collectionName, (collection) -> {
                MongoAction mongoAction = new MongoAction(this.writeConcern, MongoActionOperation.INSERT_LIST, collectionName, (Class)null, (Document)null, (Document)null);
                WriteConcern writeConcernToUse = this.prepareWriteConcern(mongoAction);
                if (writeConcernToUse == null) {
                    collection.insertMany(documents);
                } else {
                    collection.withWriteConcern(writeConcernToUse).insertMany(documents);
                }

                return null;
            });
            return MappedDocument.toIds(documents);
        }
    }

从源码中可以看出在doInsertBatch对集合进行处理之后调用了insertDocumentList来执行批量操作,并使用了MongoCollection的insertMany方法即java实现db.collection.insertMany()的接口。在这里执行批量插入后返回插入后主键 “_id”的集合。

insert方法和insertAll

可以看出外部可以调用的两个批量插入操作方法实际上底部逻辑是一样的,如果是针对某一MongoDB的集合进行批量操作,使用insert方法效率稍微高一些,免去了不必要的分类操作。这两个方法中核心的批量插入数据库方法底层都是调用的db.collection.insertMany()方法。

bulkOps方法

为批量操作方法,底层使用db.collection.bulkWrite();bulkOps方法定义之一如下

public BulkOperations bulkOps(BulkMode bulkMode, String collectionName)

其中BulkMode为批量模式,下方代码可以看到主要是有ordered和unordered两种模式,对应mongoDb的ordered和unordered,如果使用ordered,则进行顺序操作,如果使用unordered则不按顺序会根据情况重新排序。当使用ordered的时候如果前一条操作命令失败则终止,如果使用unordered模式,执行失败的语句会跳过,直至全部语句执行完毕。理论上来说unordered语句效率高于ordered语句

public static enum BulkMode {
        ORDERED,
        UNORDERED;

        private BulkMode() {
        }
    }

bulkOps()方法返回一个BulkOperations类型的对象,通过操作这个对象来添加批量语句。BulkOperations对象接口定义如下

public interface BulkOperations {
    BulkOperations insert(Object var1);

    BulkOperations insert(List<? extends Object> var1);

    BulkOperations updateOne(Query var1, Update var2);

    BulkOperations updateOne(List<Pair<Query, Update>> var1);

    BulkOperations updateMulti(Query var1, Update var2);

    BulkOperations updateMulti(List<Pair<Query, Update>> var1);

    BulkOperations upsert(Query var1, Update var2);

    BulkOperations upsert(List<Pair<Query, Update>> var1);

    BulkOperations remove(Query var1);

    BulkOperations remove(List<Query> var1);

    BulkWriteResult execute();

    public static enum BulkMode {
        ORDERED,
        UNORDERED;

        private BulkMode() {
        }
    }
}

mongoDb的db.collection.bulkWrite()批量操作的语句只支持以下命令 insertOne、 updateOne、updateMany、deleteOne、deleteMany、 replaceOne。可以看出BulkOperations 包含了所有的操作并且多出一个 insert(List<? extends Object> var1)方法,因为mongoDb的db.collection.bulkWrite()不支持insertMany,所以推断insert(List<? extends Object> var1)应该是java层做的封装。查看源码,在insert(List<? extends Object> var1)方法中使用了foreach直接把单行插入操作方法BulkOperations insert(Object var1)封装成多行。

public BulkOperations insert(List<? extends Object> documents) {
        Assert.notNull(documents, "Documents must not be null!");
        documents.forEach(this::insert);
        return this;
    }

bulkWrite()方法与insert/insertAll批量插入方法的区别

  • bulkWrite()原理是一次性提交多行语句然后一次性执行,横向对比来说,类似redis的事务、mysql的source导入执行sql文件。
  • insert/insertAll原理是语句级别的多行插入。类似mysql的insetNSERT INTO 表名([列名],[列名]) VALUES
    ([列值],[列值])),([列值],[列值])),([列值],[列值])),........([列值],[列值]));

单从批量插入效率来看,理论上insert/insertAll会快于bulkWrite()。后文将做测试来测试效率。
从排序来看,因为Spring的MongoTemplate类中的insert以及insertAll方法底层使用了MongoCollection的void insertMany(List<? extends TDocument> var1);方法。其中的实现如下:

public void insertMany(List<? extends TDocument> documents) {
        this.insertMany(documents, new InsertManyOptions());
    }

其中操作直接new了一个InsertManyOptions。而InsertManyOptions的默认ordered选项为true。

public final class InsertManyOptions {
    private boolean ordered = true;
    private Boolean bypassDocumentValidation;

    public InsertManyOptions() {
    }
    ...
    ...
    ...
    }

而insertMany方法最终会调用mongodb的db.collection.insertMany(document,writeConcern,ordered)方法,所以在spring-data-mongodb-2.1.5.RELEASE.jar这个版本的包使用insert/insertAll执行批量操作一定是有序操作。而使用bulkWrite()则可以选择有序也可以选择非有序。

MongoTemplate语句使用demo

//批量插入
 mongoTemplate.insert(new ArrayList<>(),"collectionDemoName");
//批量插入
 mongoTemplate.insertAll(new ArrayList<>());
//初始化BulkOperations 对象为ORDERED模式
BulkOperations orderedOperations = mongoTemplate.bulkOps(BulkOperations.BulkMode.ORDERED, "collectionDemoName");
orderedOperations.insert("demo1");
orderedOperations.insert("demo2");
orderedOperations.insert(new ArrayList<String>(Arrays.asList("demo3", "demo4")));
//提交批量执行
orderedOperations.execute();
//初始化BulkOperations 对象为UNORDERED模式
BulkOperations unorderedOperations = mongoTemplate.bulkOps(BulkOperations.BulkMode.UNORDERED, "collectionDemoName");
orderedOperations.insert("demo5");
orderedOperations.insert("demo6");
orderedOperations.insert(new ArrayList<String>(Arrays.asList("demo7", "demo8")));
//提交批量执行
orderedOperations.execute();

批量插入效率测试

总结

mongoTemplate方法 mongoDb方法 特点 建议
insert(Collection<? extends T> var1, String var2) db.collection.insertMany 批量插入,效率高,有序 在追求高效插入的时候使用
insertAll(Collection<? extends T> var1) db.collection.insertMany 批量查询,自动分类,有序 在批量插入到多个集合的时候使用
bulkOps(BulkMode bulkMode, String collectionName).execute() db.collection.bulkWrite() 灵活,批量提交语句,有序可选 在需要批量提交不同语句,或者需要批量灵活使用语句的场景
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,142评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,298评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,068评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,081评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,099评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,071评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,990评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,832评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,274评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,488评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,649评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,378评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,979评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,625评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,643评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,545评论 2 352

推荐阅读更多精彩内容

  • 一、MongoDB简介 1.概述 ​ MongoDB是一个基于分布式文件存储的数据库,由C++语言编写。旨在为WE...
    郑元吉阅读 977评论 0 2
  • MongoDB介绍 MongoDB是一个基于分布式文件存储的开源文档数据库。由C++语言编写。旨在为WEB应用提供...
    小厨笔记阅读 1,305评论 0 2
  • MongoDB讲义 为0 何要学习MongoDB 灵活的数据模型 MongoDB的文档数据模型使开发人员和数...
    小虎_9f76阅读 435评论 1 3
  • 客户端https://robomongo.org/ 连接mongodb://[username:password@...
    加勒比兔Z阅读 324评论 0 0
  • 1. 简介 数据库分类(1)关系型数据库,是指采用了关系模型来组织数据的数据库。关系型数据库遵循ACID规则。(2...
    nimw阅读 1,077评论 0 0