mongo recordid src

The abstract class RecordStore encapsulates the interfaces for operating on records. Its original definition is as follows:

/**
 * An abstraction used for storing documents in a collection or entries in an index.
 *
 * In storage engines implementing the KVEngine, record stores are also used for implementing
 * catalogs.
 *
 * Many methods take an OperationContext parameter. This contains the RecoveryUnit, with
 * all RecordStore specific transaction information, as well as the LockState. Methods that take
 * an OperationContext may throw a WriteConflictException.
 *
 * This class must be thread-safe for document-level locking storage engines. In addition, for
 * storage engines implementing the KVEngine some methods must be thread safe, see DurableCatalog.
 * Only for MMAPv1 is this class not thread-safe.
 */
class RecordStore

Source notes on inserting a record.

Interface definitions for inserting records

    /**
     * Inserts the specified records into this RecordStore by copying the passed-in record data and
     * updates 'inOutRecords' to contain the ids of the inserted records.
     */
    virtual Status insertRecords(OperationContext* opCtx,                   // operation context
                                 std::vector<Record>* inOutRecords,         // records in; assigned ids written back
                                 const std::vector<Timestamp>& timestamps) = 0;  // one timestamp per record

    /**
     * A thin wrapper around insertRecords() to simplify handling of single document inserts.
     */
    StatusWith<RecordId> insertRecord(OperationContext* opCtx,
                                      const char* data,
                                      int len,
                                      Timestamp timestamp) {
        // Build one Record from a null RecordId and the caller's data, and wrap it in a
        // one-element vector; the real id is assigned by insertRecords().
        std::vector<Record> inOutRecords{Record{RecordId(), RecordData(data, len)}};
        // Delegate to the batch interface, passing the timestamp as a one-element vector.
        Status status = insertRecords(opCtx, &inOutRecords, std::vector<Timestamp>{timestamp});
        if (!status.isOK())
            return status;
        return inOutRecords.front().id;
    }
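To make the contract between insertRecords() and insertRecord() concrete, here is a minimal in-memory sketch. Everything in it is hypothetical and simplified: MiniRecord, MiniRecordStore, the std::string payload, and the bool / std::optional return types (standing in for Status / StatusWith<RecordId>) are assumptions, not MongoDB API. It only shows the same pattern: the batch call writes the assigned ids back into the caller's vector, and the single-record wrapper builds a one-element vector and reads back front().id.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <optional>
#include <string>
#include <vector>

// Hypothetical, simplified stand-ins for MongoDB's Record / RecordStore.
struct MiniRecord {
    int64_t id = 0;    // 0 plays the role of a null RecordId
    std::string data;  // owned copy of the document bytes
};

class MiniRecordStore {
public:
    // Batch insert: copies each record's data and writes the assigned id back
    // into the caller's vector (the "inOut" contract). The bool stands in for Status.
    bool insertRecords(std::vector<MiniRecord>* inOutRecords) {
        for (auto& rec : *inOutRecords) {
            rec.id = _nextId++;        // ids only ever grow
            _rows[rec.id] = rec.data;  // copy the payload into the store
        }
        return true;
    }

    // Thin single-document wrapper in the style of insertRecord():
    // wrap in a one-element vector, delegate, return the filled-in id.
    std::optional<int64_t> insertRecord(const char* data, int len) {
        std::vector<MiniRecord> inOut{MiniRecord{0, std::string(data, len)}};
        if (!insertRecords(&inOut))
            return std::nullopt;
        return inOut.front().id;
    }

    // Returns the stored bytes for an id, or nullptr if absent.
    const std::string* find(int64_t id) const {
        auto it = _rows.find(id);
        return it == _rows.end() ? nullptr : &it->second;
    }

private:
    int64_t _nextId = 1;  // normal ids start above the null value 0
    std::map<int64_t, std::string> _rows;
};
```

Calling insertRecord("abc", 3) and then insertRecord("de", 2) on a fresh store yields ids 1 and 2; a later two-element batch receives 3 and 4, written back into the vector it was given.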

Data structures used by insert

The Record struct

/**
 * The data items stored in a RecordStore.
 */
struct Record {
    RecordId id;
    RecordData data;
};

The RecordId class

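RecordId uniquely identifies a record within a collection or RecordStore. It wraps a single int64_t, and its value space is divided into ranges: the null id (0), "normal" ids for user data (greater than 0 and below kMinReservedRepr), a reserved range at the top of the space for internal use, and the sentinel bounds min() (LLONG_MIN) and max() (LLONG_MAX). The class also provides a Hasher functor that computes a hash value for a RecordId.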
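The ranges above can be checked directly against the constants in the class. The sketch below copies those constants and classifies a raw int64 value the same way isNull(), isNormal() and isReserved() do; classify() is a hypothetical helper, not part of MongoDB. Note that kMinReservedRepr = (2^63 - 1) - 2^20 = 9223372036853727231, so the reserved range holds exactly 2^20 ids, and that the null id 0 is neither normal nor reserved, so it is not isValid().

```cpp
#include <cassert>
#include <climits>
#include <cstdint>
#include <string>

// Boundary constants copied from RecordId.
constexpr int64_t kNullRepr = 0;
constexpr int64_t kMinRepr = LLONG_MIN;
constexpr int64_t kMaxRepr = LLONG_MAX;
constexpr int64_t kMinReservedRepr = kMaxRepr - (1024 * 1024);

// Hypothetical helper mirroring isNull() / isNormal() / isReserved().
// Anything else (min(), max(), any negative value) is a sentinel, i.e. !isValid().
std::string classify(int64_t repr) {
    if (repr == kNullRepr)
        return "null";
    if (repr > 0 && repr < kMinReservedRepr)
        return "normal";
    if (repr >= kMinReservedRepr && repr < kMaxRepr)
        return "reserved";
    return "sentinel";
}
```

For example, classify(1) is "normal", classify(kMinReservedRepr) is "reserved", and classify(-5) is "sentinel".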

/**
 * The key that uniquely identifies a Record in a Collection or RecordStore.
 */
class RecordId {
public:
    // This set of constants define the boundaries of the 'normal' and 'reserved' id ranges.
    static constexpr int64_t kNullRepr = 0;
    static constexpr int64_t kMinRepr = LLONG_MIN;
    static constexpr int64_t kMaxRepr = LLONG_MAX;
    static constexpr int64_t kMinReservedRepr = kMaxRepr - (1024 * 1024);

    /**
     * Enumerates all ids in the reserved range that have been allocated for a specific purpose.
     */
    enum class ReservedId : int64_t { kWildcardMultikeyMetadataId = kMinReservedRepr };

    /**
     * Constructs a Null RecordId.
     */
    RecordId() : _repr(kNullRepr) {}

    explicit RecordId(int64_t repr) : _repr(repr) {}

    explicit RecordId(ReservedId repr) : RecordId(static_cast<int64_t>(repr)) {}

    /**
     * Construct a RecordId from two halves.
     * TODO consider removing.
     */
    RecordId(int high, int low) : _repr((uint64_t(high) << 32) | uint32_t(low)) {}

    /**
     * A RecordId that compares less than all ids that represent documents in a collection.
     */
    static RecordId min() {
        return RecordId(kMinRepr);
    }

    /**
     * A RecordId that compares greater than all ids that represent documents in a collection.
     */
    static RecordId max() {
        return RecordId(kMaxRepr);
    }

    /**
     * Returns the first record in the reserved id range at the top of the RecordId space.
     */
    static RecordId minReserved() {
        return RecordId(kMinReservedRepr);
    }

    bool isNull() const {
        return _repr == 0;
    }

    int64_t repr() const {
        return _repr;
    }

    /**
     * Valid RecordIds are the only ones which may be used to represent Records. The range of valid
     * RecordIds includes both "normal" ids that refer to user data, and "reserved" ids that are
     * used internally. All RecordIds outside of the valid range are sentinel values.
     */
    bool isValid() const {
        return isNormal() || isReserved();
    }

    /**
     * Normal RecordIds are those which fall within the range used to represent normal user data,
     * excluding the reserved range at the top of the RecordId space.
     */
    bool isNormal() const {
        return _repr > 0 && _repr < kMinReservedRepr;
    }

    /**
     * Returns true if this RecordId falls within the reserved range at the top of the record space.
     */
    bool isReserved() const {
        return _repr >= kMinReservedRepr && _repr < kMaxRepr;
    }

    int compare(RecordId rhs) const {
        return _repr == rhs._repr ? 0 : _repr < rhs._repr ? -1 : 1;
    }

    /**
     * Hash value for this RecordId. The hash implementation may be modified, and its behavior
     * may differ across platforms. Hash values should not be persisted.
     */
    struct Hasher {
        size_t operator()(RecordId rid) const {
            size_t hash = 0;
            // TODO consider better hashes
            boost::hash_combine(hash, rid.repr());
            return hash;
        }
    };

    /// members for Sorter
    struct SorterDeserializeSettings {};  // unused
    void serializeForSorter(BufBuilder& buf) const {
        buf.appendNum(static_cast<long long>(_repr));
    }
    static RecordId deserializeForSorter(BufReader& buf, const SorterDeserializeSettings&) {
        return RecordId(buf.read<LittleEndian<int64_t>>());
    }
    int memUsageForSorter() const {
        return sizeof(RecordId);
    }
    RecordId getOwned() const {
        return *this;
    }

private:
    int64_t _repr;
};
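Two details of the class are easy to miss: the RecordId(int high, int low) constructor packs two 32-bit halves into one int64 (high half in the upper 32 bits), and Hasher lets RecordId be used as a key in unordered containers. The sketch below reproduces both on a minimal stand-in type. Rid, fromHalves and RidHasher are hypothetical names, and RidHasher uses std::hash in place of boost::hash_combine; that substitution is acceptable only because, as the source comment says, hash values must not be persisted.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <unordered_set>

// Minimal stand-in for RecordId: just the int64 representation.
struct Rid {
    int64_t repr;

    // Same packing as RecordId(int high, int low). 'low' is converted to
    // uint32_t first so a negative low half does not sign-extend and
    // overwrite the high 32 bits.
    static Rid fromHalves(int high, int low) {
        return Rid{static_cast<int64_t>((uint64_t(high) << 32) | uint32_t(low))};
    }

    bool operator==(const Rid& other) const { return repr == other.repr; }
};

// Hasher in the style of RecordId::Hasher, using std::hash (a substitution).
struct RidHasher {
    size_t operator()(Rid rid) const { return std::hash<int64_t>{}(rid.repr); }
};
```

With non-negative halves, packed ids sort like (high, low) pairs: fromHalves(1, 0) compares greater than fromHalves(0, 2147483647). An std::unordered_set<Rid, RidHasher> treats fromHalves(1, 2) and Rid{4294967298} as the same key.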

The RecordData class

/**
 * A replacement for the Record class. This class represents data in a record store.
 * The _ownedData attribute is used to manage memory ownership.
 */
class RecordData {
public:
    RecordData() : _data(NULL), _size(0) {}
    RecordData(const char* data, int size) : _data(data), _size(size) {}

    RecordData(SharedBuffer ownedData, int size)
        : _data(ownedData.get()), _size(size), _ownedData(std::move(ownedData)) {}

    const char* data() const {
        return _data;
    }

    int size() const {
        return _size;
    }

    /**
     * Returns true if this owns its own memory, and false otherwise
     */
    bool isOwned() const {
        return _ownedData.get();
    }

    SharedBuffer releaseBuffer() {
        return std::move(_ownedData);
    }

    BSONObj toBson() const& {
        return isOwned() ? BSONObj(_ownedData) : BSONObj(_data);
    }

    BSONObj releaseToBson() {
        return isOwned() ? BSONObj(releaseBuffer()) : BSONObj(_data);
    }

    BSONObj toBson() && {
        return releaseToBson();
    }

    RecordData getOwned() const {
        if (isOwned())
            return *this;
        auto buffer = SharedBuffer::allocate(_size);
        memcpy(buffer.get(), _data, _size);
        return RecordData(buffer, _size);
    }

    void makeOwned() {
        if (isOwned())
            return;
        *this = getOwned();
    }

private:
    const char* _data;
    int _size;
    SharedBuffer _ownedData;
};
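RecordData is either a non-owning view (a pointer plus size into memory someone else manages) or an owner of its bytes via _ownedData. A view is only valid while the underlying memory is; getOwned() copies the bytes so the data can outlive it. The sketch below models that with std::shared_ptr<char[]> standing in for MongoDB's SharedBuffer (an assumption; SharedBuffer is MongoDB's own reference-counted buffer). MiniRecordData is a hypothetical name.

```cpp
#include <cassert>
#include <cstring>
#include <memory>
#include <utility>

// Simplified RecordData: a (pointer, size) view plus optional owned storage.
class MiniRecordData {
public:
    // Non-owning view: the caller must keep 'data' alive.
    MiniRecordData(const char* data, int size) : _data(data), _size(size) {}

    const char* data() const { return _data; }
    int size() const { return _size; }
    bool isOwned() const { return static_cast<bool>(_owned); }

    // Same idea as RecordData::getOwned(): if already owned, return a copy that
    // shares the buffer; otherwise copy the bytes into a new owned buffer.
    MiniRecordData getOwned() const {
        if (isOwned())
            return *this;
        std::shared_ptr<char[]> buf(new char[_size]);
        std::memcpy(buf.get(), _data, _size);
        return MiniRecordData(std::move(buf), _size);
    }

private:
    // Members are initialized in declaration order, so _data takes owned.get()
    // before _owned takes ownership of the buffer.
    MiniRecordData(std::shared_ptr<char[]> owned, int size)
        : _data(owned.get()), _size(size), _owned(std::move(owned)) {}

    const char* _data;
    int _size;
    std::shared_ptr<char[]> _owned;
};
```

If the source buffer is modified after getOwned(), the view sees the change but the owned copy does not; calling getOwned() on an already-owned object shares the same buffer rather than copying again.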

WiredTiger's implementation of the insert interface

virtual Status insertRecords

Status WiredTigerRecordStore::insertRecords(OperationContext* opCtx,
                                            std::vector<Record>* records,
                                            const std::vector<Timestamp>& timestamps) {
    return _insertRecords(opCtx, records->data(), timestamps.data(), records->size());
}


Status WiredTigerRecordStore::_insertRecords(OperationContext* opCtx,
                                             Record* records,
                                             const Timestamp* timestamps,
                                             size_t nRecords) {
    dassert(opCtx->lockState()->isWriteLocked()); // debug builds only: the caller must hold a write lock

    // We are kind of cheating on capped collections since we write all of them at once ....
    // Simplest way out would be to just block vector writes for everything except oplog ?
    int64_t totalLength = 0;
    for (size_t i = 0; i < nRecords; i++)
        totalLength += records[i].data.size();

    // If this is a capped collection and the batch as a whole is larger than cappedMaxSize,
    // reject it. The caller will retry one element at a time.
    if (_isCapped && totalLength > _cappedMaxSize)
        return Status(ErrorCodes::BadValue, "object to insert exceeds cappedMaxSize");

    // Open a WiredTiger cursor on this record store's table (identified by _uri and _tableId).
    WiredTigerCursor curwrap(_uri, _tableId, true, opCtx);
    curwrap.assertInActiveTxn();
    WT_CURSOR* c = curwrap.get();
    invariant(c);

    RecordId highestId = RecordId(); // starts as the null id; tracks the highest id in this batch
    dassert(nRecords != 0);
    for (size_t i = 0; i < nRecords; i++) {
        auto& record = records[i];
        if (_isOplog) {
            // Oplog: the RecordId is extracted from the entry itself (its timestamp)
            // rather than allocated from a counter.
            StatusWith<RecordId> status =
                oploghack::extractKey(record.data.data(), record.data.size());
            if (!status.isOK())
                return status.getStatus();
            record.id = status.getValue();
        } else {
            // Regular collection: allocate a new id. _nextId() hands out monotonically
            // increasing ids, each one greater than the previous one.
            record.id = _nextId(opCtx);
        }
        dassert(record.id > highestId);
        highestId = record.id;
    }

    for (size_t i = 0; i < nRecords; i++) {
        auto& record = records[i];
        Timestamp ts;
        if (timestamps[i].isNull() && _isOplog) {
            // If the timestamp is 0, that probably means someone inserted a document directly
            // into the oplog.  In this case, use the RecordId as the timestamp, since they are
            // one and the same. Setting this transaction to be unordered will trigger a journal
            // flush. Because these are direct writes into the oplog, the machinery to trigger a
            // journal flush is bypassed. A followup oplog read will require a fresh visibility
            // value to make progress.
            ts = Timestamp(record.id.repr());
            opCtx->recoveryUnit()->setOrderedCommit(false);
        } else {
            ts = timestamps[i];
        }
        if (!ts.isNull()) {
            LOG(4) << "inserting record with timestamp " << ts;
            fassert(39001, opCtx->recoveryUnit()->setTimestamp(ts));
        }
        setKey(c, record.id);
        WiredTigerItem value(record.data.data(), record.data.size());
        c->set_value(c, value.Get());
        int ret = WT_OP_CHECK(c->insert(c));
        if (ret)
            return wtRCToStatus(ret, "WiredTigerRecordStore::insertRecord");
    }

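    // Update the cached record count and data size.
    _changeNumRecords(opCtx, nRecords);
    _increaseDataSize(opCtx, totalLength);

    if (_oplogStones) {
        // Oplog: on commit, account for the new records in the current oplog "stone".
        _oplogStones->updateCurrentStoneAfterInsertOnCommit(
            opCtx, totalLength, highestId, nRecords);
    } else {
        // Capped collections: delete the oldest documents if the collection is over its limits.
        _cappedDeleteAsNeeded(opCtx, highestId);
    }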

    return Status::OK();
}
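The core of _insertRecords() is a size check followed by two passes: first assign every id (from the entry's timestamp for the oplog, from _nextId() otherwise) while checking that ids strictly increase, then write each key/value pair through the cursor. The sketch below models just that flow in memory. MiniWtStore, Rec and its secs/inc fields are hypothetical; the oplog id is built as (secs << 32) | inc, an assumption based on the source comment that an oplog RecordId and its timestamp are "one and the same". A std::map stands in for the WiredTiger table, and a duplicate key stands in for a failed c->insert(c). The sketch does not model rollback; in MongoDB the batch runs inside a WiredTiger transaction.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <vector>

// Hypothetical, simplified record: 'secs'/'inc' are only used for oplog entries.
struct Rec {
    int64_t id = 0;
    std::string data;
    uint32_t secs = 0, inc = 0;
};

// Hypothetical model of the two-pass loop in _insertRecords().
struct MiniWtStore {
    bool isOplog = false;
    bool isCapped = false;
    int64_t cappedMaxSize = 0;
    int64_t nextId = 1;                    // stands in for _nextId()
    std::map<int64_t, std::string> table;  // stands in for the WiredTiger table

    // Returns false where the real code returns a non-OK Status.
    bool insertRecords(std::vector<Rec>& recs) {
        int64_t totalLength = 0;
        for (const auto& r : recs)
            totalLength += static_cast<int64_t>(r.data.size());
        if (isCapped && totalLength > cappedMaxSize)
            return false;  // caller retries one document at a time

        // Pass 1: assign ids; they must be strictly increasing within the batch.
        int64_t highestId = 0;  // null id
        for (auto& r : recs) {
            r.id = isOplog ? static_cast<int64_t>((uint64_t(r.secs) << 32) | r.inc)
                           : nextId++;
            assert(r.id > highestId);
            highestId = r.id;
        }

        // Pass 2: write each (id, data) pair; a failed write aborts the batch.
        for (const auto& r : recs) {
            if (!table.emplace(r.id, r.data).second)
                return false;
        }
        return true;
    }
};
```

A capped store with cappedMaxSize = 2 rejects a single 3-byte document before any id is assigned, while an oplog store given entries with timestamps (100, 1) and (100, 2) assigns ids that sort in timestamp order.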



© Copyright belongs to the author. For reprints or content collaboration, please contact the author.