Reading "PostgreSQL Source Code Explained": Inserting Row Data (Part 3)

0. Preface

This article is a reading summary of EthanHe's series on interpreting the PostgreSQL source code (https://www.jianshu.com/u/6b8fc3f18f72), with some of my own understanding and my own tests added. It is not entirely original; it exists only to reinforce my understanding and to make later reference easier.

The "Reading 'PostgreSQL Source Code Explained': Inserting Row Data" series consists of 12 articles. It analyzes the source code of PG's row-insertion path, using gdb to trace the code for testing and supporting analysis. This article studies the source logic of heap_insert, the function that inserts a single row.

The research method used in this series, and in later studies of the PG source, is as follows [quoted from EthanHe's posts]:

Start from the smallest method/function involved in inserting a row via psql (PageAddItemExtended). After understanding that function thoroughly, use gdb to trace its call stack, and work upward level by level toward the top-level entry or main function. At each level, fully understand the data structures, macro definitions, and dependent subroutines of that function. Through this process, you build up the body of knowledge surrounding data insertion: the Page storage layout, buffer management, WAL management, SQL parsing and execution, the frontend/backend interface, and so on. With this thread, and with the relevant data structures as a foundation, understanding other operations becomes much easier: DML such as UPDATE/DELETE, DDL such as CREATE TABLE/ALTER TABLE, queries such as SELECT, and the rest.

The PG source is vast. Rather than going top-down and getting lost, it is better to adopt EthanHe's approach: bottom-up, tracing to the source step by step. Once one branch is fully understood, the others follow by analogy. It is an excellent way to study the code.

1. Data Structures, Macros, and Function Declarations

\src\include\c.h
typedef uint32 CommandId;
typedef uint32 TransactionId;

\src\backend\access\transam\xact.c
/*
 *  GetCurrentTransactionId
 *
 *  Return the current transaction ID, assigning one first if this
 *  transaction does not have one yet
 */
TransactionId
GetCurrentTransactionId(void)
{
    TransactionState s = CurrentTransactionState;

    if (!TransactionIdIsValid(s->transactionId))
        AssignTransactionId(s);
    return s->transactionId;
}

\src\include\access\heapam.h
/* "options" flag bits for heap_insert */
#define HEAP_INSERT_SKIP_WAL    0x0001
#define HEAP_INSERT_SKIP_FSM    0x0002
#define HEAP_INSERT_FROZEN      0x0004
#define HEAP_INSERT_SPECULATIVE 0x0008
#define HEAP_INSERT_NO_LOGICAL  0x0010
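
To make the flag mechanics concrete, here is a minimal standalone C sketch (not PG code) showing how these bit flags combine and are tested, the same pattern heap_insert uses in expressions like (options & HEAP_INSERT_SPECULATIVE) != 0:

#include <stdio.h>

#define HEAP_INSERT_SKIP_WAL    0x0001
#define HEAP_INSERT_SKIP_FSM    0x0002
#define HEAP_INSERT_FROZEN      0x0004

int main(void)
{
    int options = HEAP_INSERT_SKIP_WAL | HEAP_INSERT_SKIP_FSM;  /* combine flags */

    /* each flag occupies its own bit, so testing is a simple mask */
    printf("skip WAL: %d\n", (options & HEAP_INSERT_SKIP_WAL) != 0); /* 1 */
    printf("frozen:   %d\n", (options & HEAP_INSERT_FROZEN) != 0);   /* 0 */
    return 0;
}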

typedef struct BulkInsertStateData *BulkInsertState;

\src\include\access\hio.h
/*
 * State for bulk inserts --- private to heapam.c and hio.c
 *
 * If current_buf isn't InvalidBuffer, we are holding an extra pin on it
 *
 * "typedef struct BulkInsertStateData *BulkInsertState" is in heapam.h
 */
typedef struct BulkInsertStateData
{
    BufferAccessStrategy strategy;  /* our BULKWRITE strategy object */
    Buffer      current_buf;    /* current insertion target page */
}           BulkInsertStateData;

\src\include\access\htup_details.h
Maximum tuple size, including the tuple header and the zero padding added for alignment:
#define MaxHeapTupleSize  (BLCKSZ - MAXALIGN(SizeOfPageHeaderData + sizeof(ItemIdData)))
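
As a worked example, the following standalone sketch computes MaxHeapTupleSize under common assumptions (BLCKSZ = 8192, 8-byte MAXALIGN, a 24-byte page header, 4-byte line pointers); these constants are assumptions baked into the sketch, not values read from a real build:

#include <stdio.h>
#include <stddef.h>

#define BLCKSZ 8192
#define MAXALIGN(LEN) (((size_t) (LEN) + 7) & ~(size_t) 7)  /* 8-byte alignment */
#define SizeOfPageHeaderData 24   /* offsetof(PageHeaderData, pd_linp) */
#define SizeOfItemIdData 4        /* sizeof(ItemIdData): one line pointer */

int main(void)
{
    /* one tuple needs at least one line pointer besides the page header */
    size_t max = BLCKSZ - MAXALIGN(SizeOfPageHeaderData + SizeOfItemIdData);
    printf("MaxHeapTupleSize = %zu\n", max);  /* 8192 - MAXALIGN(28) = 8160 */
    return 0;
}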

Get a tuple's OID:
#define HeapTupleGetOid(tuple) \
        HeapTupleHeaderGetOid((tuple)->t_data)
Set a tuple's OID:
#define HeapTupleSetOid(tuple, oid) \
        HeapTupleHeaderSetOid((tuple)->t_data, (oid))

Get the OID from the tuple header:
#define HeapTupleHeaderGetOid(tup) \
( \
    ((tup)->t_infomask & HEAP_HASOID) ? \
        *((Oid *) ((char *)(tup) + (tup)->t_hoff - sizeof(Oid))) \
    : \
        InvalidOid \
)

Set the OID in the tuple header:
#define HeapTupleHeaderSetOid(tup, oid) \
do { \
    Assert((tup)->t_infomask & HEAP_HASOID); \
    *((Oid *) ((char *)(tup) + (tup)->t_hoff - sizeof(Oid))) = (oid); \
} while (0)
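
The two header macros rely on a single layout fact: when HEAP_HASOID is set, the OID occupies the last 4 bytes of the tuple header, at offset t_hoff - sizeof(Oid). The sketch below imitates that pointer arithmetic on a mock byte buffer; the buffer and the t_hoff value are illustrative assumptions, not the real HeapTupleHeaderData:

#include <stdio.h>
#include <stdint.h>
#include <string.h>

typedef uint32_t Oid;

int main(void)
{
    unsigned char header[32];   /* pretend the header ends at t_hoff == 32 */
    uint8_t t_hoff = 32;
    memset(header, 0, sizeof(header));

    /* "SetOid": write the OID into the last 4 bytes before the user data */
    Oid oid = 16384;
    memcpy(header + t_hoff - sizeof(Oid), &oid, sizeof(Oid));

    /* "GetOid": read it back from the same location */
    Oid got;
    memcpy(&got, header + t_hoff - sizeof(Oid), sizeof(Oid));
    printf("oid = %u\n", got);  /* 16384 */
    return 0;
}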

\src\include\utils\rel.h
/*
 * RelationGetTargetBlock
 *      Fetch the relation's current insertion target block (cached in rd_smgr).
 */
#define RelationGetTargetBlock(relation) \
    ( (relation)->rd_smgr != NULL ? (relation)->rd_smgr->smgr_targblock : InvalidBlockNumber )


\src\backend\storage\freespace\freespace.c
/*
 * GetPageWithFreeSpace - find the block number of a page with at least
 *      the requested amount of free space.
 */
BlockNumber
GetPageWithFreeSpace(Relation rel, Size spaceNeeded)
{
    uint8       min_cat = fsm_space_needed_to_cat(spaceNeeded);

    return fsm_search(rel, min_cat);
}


/*
 * Return the free-space category (range 1 - 255) for the requested amount of space
 */
static uint8
fsm_space_needed_to_cat(Size needed)
{
    int         cat;

    /* the requested amount of free space must not exceed MaxFSMRequestSize */
    if (needed > MaxFSMRequestSize)
        elog(ERROR, "invalid FSM request size %zu", needed);

    if (needed == 0)
        return 1;

    /* category computation; FSM_CAT_STEP is 32 (BLCKSZ / 256) */
    cat = (needed + FSM_CAT_STEP - 1) / FSM_CAT_STEP;

    if (cat > 255)
        cat = 255;

    return (uint8) cat;
}
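
A quick check of the rounding: one category step is FSM_CAT_STEP bytes (32 with the default 8K block size), and the request is rounded up, so any page in the returned category is guaranteed to hold it. A standalone sketch, assuming FSM_CAT_STEP = 32:

#include <stdio.h>

#define FSM_CAT_STEP 32

static unsigned space_needed_to_cat(unsigned needed)
{
    unsigned cat;

    if (needed == 0)
        return 1;
    cat = (needed + FSM_CAT_STEP - 1) / FSM_CAT_STEP;   /* ceiling division */
    return cat > 255 ? 255 : cat;
}

int main(void)
{
    printf("%u\n", space_needed_to_cat(200)); /* ceil(200/32) = 7 */
    printf("%u\n", space_needed_to_cat(32));  /* 1 */
    printf("%u\n", space_needed_to_cat(33));  /* 2 */
    return 0;
}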

/*
 * Search the three-level tree of FSM pages (each organized as a max-heap)
 * for a heap block whose free-space category is at least min_cat; return
 * its BlockNumber, or InvalidBlockNumber if there is none.
 */
static BlockNumber
fsm_search(Relation rel, uint8 min_cat)
{
    int         restarts = 0;
    FSMAddress  addr = FSM_ROOT_ADDRESS;

    for (;;)
    {
        int         slot;
        Buffer      buf;
        uint8       max_avail = 0;

        /* Read the FSM page. */
        buf = fsm_readbuf(rel, addr, false);

        /* Search within the page */
        if (BufferIsValid(buf))
        {
            LockBuffer(buf, BUFFER_LOCK_SHARE);
            slot = fsm_search_avail(buf, min_cat,
                                    (addr.level == FSM_BOTTOM_LEVEL),
                                    false);
            if (slot == -1)
                max_avail = fsm_get_max_avail(BufferGetPage(buf));
            UnlockReleaseBuffer(buf);
        }
        else
            slot = -1;

        if (slot != -1)
        {
            if (addr.level == FSM_BOTTOM_LEVEL)
                return fsm_get_heap_blk(addr, slot);

            addr = fsm_get_child(addr, slot);
        }
        else if (addr.level == FSM_ROOT_LEVEL)
        {
            return InvalidBlockNumber;
        }
        else
        {
            uint16      parentslot;
            FSMAddress  parent;

            parent = fsm_get_parent(addr, &parentslot);
            fsm_set_and_search(rel, parent, parentslot, max_avail, 0);

            if (restarts++ > 10000)
                return InvalidBlockNumber;

            addr = FSM_ROOT_ADDRESS;
        }
    }
}
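
Within one FSM page the categories form a complete binary tree stored in an array, and the search descends from the root toward any leaf that satisfies the request. The sketch below illustrates the idea on a tiny 8-leaf max-heap; it is not PG's fsm_search_avail, which additionally rotates its starting point for fairness:

#include <stdio.h>

#define LEAVES 8

/* heap[1] is the root; heap[LEAVES .. 2*LEAVES-1] are the leaves */
static unsigned char heap[2 * LEAVES];

static int search(unsigned char min_cat)
{
    int i = 1;

    if (heap[i] < min_cat)
        return -1;                      /* no slot on this page is enough */
    while (i < LEAVES)                  /* descend toward a satisfying leaf */
        i = (heap[2 * i] >= min_cat) ? 2 * i : 2 * i + 1;
    return i - LEAVES;                  /* leaf slot number */
}

int main(void)
{
    unsigned char cats[LEAVES] = {3, 0, 9, 2, 0, 7, 0, 1};
    int i;

    for (i = 0; i < LEAVES; i++)
        heap[LEAVES + i] = cats[i];
    for (i = LEAVES - 1; i >= 1; i--)   /* each inner node = max of children */
        heap[i] = heap[2 * i] > heap[2 * i + 1] ? heap[2 * i] : heap[2 * i + 1];

    printf("slot for cat >= 5:  %d\n", search(5));  /* 2 (category 9) */
    printf("slot for cat >= 10: %d\n", search(10)); /* -1 */
    return 0;
}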

\src\backend\access\heap\hio.c
/*
 * Read a buffer for the given block of the relation; if bistate isn't NULL, go through the bulk-insert state.
 */
static Buffer
ReadBufferBI(Relation relation, BlockNumber targetBlock,
             BulkInsertState bistate)
{
    Buffer      buffer;

    /* If not bulk-insert, exactly like ReadBuffer */
    if (!bistate)
        return ReadBuffer(relation, targetBlock);

    /* If we have the desired block already pinned, re-pin and return it */
    if (bistate->current_buf != InvalidBuffer)
    {
        if (BufferGetBlockNumber(bistate->current_buf) == targetBlock)
        {
            IncrBufferRefCount(bistate->current_buf);
            return bistate->current_buf;
        }
        /* ... else drop the old buffer */
        ReleaseBuffer(bistate->current_buf);
        bistate->current_buf = InvalidBuffer;
    }

    /* Perform a read using the buffer strategy */
    buffer = ReadBufferExtended(relation, MAIN_FORKNUM, targetBlock,
                                RBM_NORMAL, bistate->strategy);

    /* Save the selected block as target for future inserts */
    IncrBufferRefCount(buffer);
    bistate->current_buf = buffer;

    return buffer;
}

\src\include\access\visibilitymap.h
/* Number of bits for one heap block */
#define BITS_PER_HEAPBLOCK 2

\src\backend\access\heap\visibilitymap.c
/*#define TRACE_VISIBILITYMAP */

/*
 * Size of the bitmap on each visibility map page, in bytes. There's no
 * extra headers, so the whole page minus the standard page header is
 * used for the bitmap.
 */
#define MAPSIZE (BLCKSZ - MAXALIGN(SizeOfPageHeaderData))

/* Number of heap blocks we can represent in one byte */
#define HEAPBLOCKS_PER_BYTE (BITS_PER_BYTE / BITS_PER_HEAPBLOCK)

/* Number of heap blocks we can represent in one visibility map page. */
#define HEAPBLOCKS_PER_PAGE (MAPSIZE * HEAPBLOCKS_PER_BYTE)

/* Mapping from heap block number to the right bit in the visibility map */
#define HEAPBLK_TO_MAPBLOCK(x) ((x) / HEAPBLOCKS_PER_PAGE)
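
Putting the macros together: a standalone sketch, assuming BLCKSZ = 8192 and a 24-byte page header, that maps a heap block number to its VM page, byte, and bit offset. HEAPBLK_TO_MAPBYTE and HEAPBLK_TO_OFFSET are the companion macros used by visibilitymap_clear below:

#include <stdio.h>
#include <stdint.h>

#define BLCKSZ 8192
#define SizeOfPageHeaderData 24
#define MAPSIZE (BLCKSZ - SizeOfPageHeaderData)         /* 8168 bitmap bytes */
#define BITS_PER_HEAPBLOCK 2
#define BITS_PER_BYTE 8
#define HEAPBLOCKS_PER_BYTE (BITS_PER_BYTE / BITS_PER_HEAPBLOCK)   /* 4 */
#define HEAPBLOCKS_PER_PAGE (MAPSIZE * HEAPBLOCKS_PER_BYTE)        /* 32672 */

#define HEAPBLK_TO_MAPBLOCK(x) ((x) / HEAPBLOCKS_PER_PAGE)
#define HEAPBLK_TO_MAPBYTE(x)  (((x) % HEAPBLOCKS_PER_PAGE) / HEAPBLOCKS_PER_BYTE)
#define HEAPBLK_TO_OFFSET(x)   (((x) % HEAPBLOCKS_PER_BYTE) * BITS_PER_HEAPBLOCK)

int main(void)
{
    uint32_t heapBlk = 100000;

    printf("mapBlock=%u mapByte=%u mapOffset=%u\n",
           (unsigned) HEAPBLK_TO_MAPBLOCK(heapBlk),  /* 100000 / 32672 = 3 */
           (unsigned) HEAPBLK_TO_MAPBYTE(heapBlk),   /* 1984 / 4 = 496 */
           (unsigned) HEAPBLK_TO_OFFSET(heapBlk));   /* (100000 mod 4) * 2 = 0 */
    return 0;
}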

2. Source Code Analysis

  1. heap_insert
/*
 *  heap_insert     - insert tuple into a heap
 *
 * The new tuple is stamped with current transaction ID and the specified
 * command ID.
 *
 * If the HEAP_INSERT_SKIP_WAL option is specified, the new tuple is not
 * logged in WAL, even for a non-temp relation.  Safe usage of this behavior
 * requires that we arrange that all new tuples go into new pages not
 * containing any tuples from other transactions, and that the relation gets
 * fsync'd before commit.  (See also heap_sync() comments)
 *
 * The HEAP_INSERT_SKIP_FSM option is passed directly to
 * RelationGetBufferForTuple, which see for more info.
 *
 * HEAP_INSERT_FROZEN should only be specified for inserts into
 * relfilenodes created during the current subtransaction and when
 * there are no prior snapshots or pre-existing portals open.
 * This causes rows to be frozen, which is an MVCC violation and
 * requires explicit options chosen by user.
 *
 * HEAP_INSERT_SPECULATIVE is used on so-called "speculative insertions",
 * which can be backed out afterwards without aborting the whole transaction.
 * Other sessions can wait for the speculative insertion to be confirmed,
 * turning it into a regular tuple, or aborted, as if it never existed.
 * Speculatively inserted tuples behave as "value locks" of short duration,
 * used to implement INSERT .. ON CONFLICT.
 *
 * HEAP_INSERT_NO_LOGICAL force-disables the emitting of logical decoding
 * information for the tuple. This should solely be used during table rewrites
 * where RelationIsLogicallyLogged(relation) is not yet accurate for the new
 * relation.
 *
 * Note that most of these options will be applied when inserting into the
 * heap's TOAST table, too, if the tuple requires any out-of-line data.  Only
 * HEAP_INSERT_SPECULATIVE is explicitly ignored, as the toast data does
 * not partake in speculative insertion.
 *
 * The BulkInsertState object (if any; bistate can be NULL for default
 * behavior) is also just passed through to RelationGetBufferForTuple.
 *
 * The return value is the OID assigned to the tuple (either here or by the
 * caller), or InvalidOid if no OID.  The header fields of *tup are updated
 * to match the stored tuple; in particular tup->t_self receives the actual
 * TID where the tuple was stored.  But note that any toasting of fields
 * within the tuple data is NOT reflected into *tup.
 */
/*
Inputs:
    relation - the target relation (table)
    tup      - the heap tuple (header plus data), i.e. the row to insert
    cid      - command ID (ordering within the transaction)
    options  - option flags (see above)
    bistate  - bulk-insert state, or NULL
Output:
    the OID assigned to the tuple, or InvalidOid if the table has no OIDs
*/
Oid
heap_insert(Relation relation, HeapTuple tup, CommandId cid,
            int options, BulkInsertState bistate)
{
    TransactionId xid = GetCurrentTransactionId();//transaction ID
    HeapTuple   heaptup;//the tuple actually stored (possibly toasted), i.e. the row
    Buffer      buffer;//data buffer
    Buffer      vmbuffer = InvalidBuffer;//visibility map buffer
    bool        all_visible_cleared = false;//whether PD_ALL_VISIBLE was cleared

    /*
     * Fill in tuple header fields, assign an OID, and toast the tuple if
     * necessary.
     *
     * Note: below this point, heaptup is the data we actually intend to store
     * into the relation; tup is the caller's original untoasted data.
     */
    //preparation before the insert: set the t_infomask flags, assign the OID, toast if necessary
    heaptup = heap_prepare_insert(relation, tup, xid, cid, options);

    /*
     * Find buffer to insert this tuple into.  If the page is all visible,
     * this will also pin the requisite visibility map page.
     */
    //get a suitable buffer (see the analysis of this function below)
    buffer = RelationGetBufferForTuple(relation, heaptup->t_len,
                                       InvalidBuffer, options, bistate,
                                       &vmbuffer, NULL);

    /*
     * We're about to do the actual insert -- but check for conflict first, to
     * avoid possibly having to roll back work we've just done.
     *
     * This is safe without a recheck as long as there is no possibility of
     * another process scanning the page between this check and the insert
     * being visible to the scan (i.e., an exclusive buffer content lock is
     * continuously held from this point until the tuple insert is visible).
     *
     * For a heap insert, we only need to check for table-level SSI locks. Our
     * new tuple can't possibly conflict with existing tuple locks, and heap
     * page locks are only consolidated versions of tuple locks; they do not
     * lock "gaps" as index page locks do.  So we don't need to specify a
     * buffer when making the call, which makes for a faster check.
     */
    //check for serialization (SSI) conflicts
    CheckForSerializableConflictIn(relation, NULL, InvalidBuffer);

    /* NO EREPORT(ERROR) from here till changes are logged */
    //enter the critical section (CritSectionCount++)
    START_CRIT_SECTION();
    //put the tuple on the page (see the previous article's analysis of this function)
    RelationPutHeapTuple(relation, buffer, heaptup,
                         (options & HEAP_INSERT_SPECULATIVE) != 0);
    //if the page is marked all-visible
    if (PageIsAllVisible(BufferGetPage(buffer)))
    {
        //clear the flag and the VM bits
        all_visible_cleared = true;
        PageClearAllVisible(BufferGetPage(buffer));
        visibilitymap_clear(relation,
                            ItemPointerGetBlockNumber(&(heaptup->t_self)),
                            vmbuffer, VISIBILITYMAP_VALID_BITS);
    }

    /*
     * XXX Should we set PageSetPrunable on this page ?
     *
     * The inserting transaction may eventually abort thus making this tuple
     * DEAD and hence available for pruning. Though we don't want to optimize
     * for aborts, if no other tuple in this page is UPDATEd/DELETEd, the
     * aborted tuple will never be pruned until next vacuum is triggered.
     *
     * If you do add PageSetPrunable here, add it in heap_xlog_insert too.
     */
    //mark the buffer dirty
    MarkBufferDirty(buffer);

    /* XLOG stuff */
    //write WAL
    if (!(options & HEAP_INSERT_SKIP_WAL) && RelationNeedsWAL(relation))
    {
        xl_heap_insert xlrec;
        xl_heap_header xlhdr;
        XLogRecPtr  recptr;
        Page        page = BufferGetPage(buffer);
        uint8       info = XLOG_HEAP_INSERT;
        int         bufflags = 0;

        /*
         * If this is a catalog, we need to transmit combocids to properly
         * decode, so log that as well.
         */
        if (RelationIsAccessibleInLogicalDecoding(relation))
            log_heap_new_cid(relation, heaptup);

        /*
         * If this is the single and first tuple on page, we can reinit the
         * page instead of restoring the whole thing.  Set flag, and hide
         * buffer references from XLogInsert.
         */
        if (ItemPointerGetOffsetNumber(&(heaptup->t_self)) == FirstOffsetNumber &&
            PageGetMaxOffsetNumber(page) == FirstOffsetNumber)
        {
            info |= XLOG_HEAP_INIT_PAGE;
            bufflags |= REGBUF_WILL_INIT;
        }

        xlrec.offnum = ItemPointerGetOffsetNumber(&heaptup->t_self);
        xlrec.flags = 0;
        if (all_visible_cleared)
            xlrec.flags |= XLH_INSERT_ALL_VISIBLE_CLEARED;
        if (options & HEAP_INSERT_SPECULATIVE)
            xlrec.flags |= XLH_INSERT_IS_SPECULATIVE;
        Assert(ItemPointerGetBlockNumber(&heaptup->t_self) == BufferGetBlockNumber(buffer));

        /*
         * For logical decoding, we need the tuple even if we're doing a full
         * page write, so make sure it's included even if we take a full-page
         * image. (XXX We could alternatively store a pointer into the FPW).
         */
        if (RelationIsLogicallyLogged(relation))
        {
            xlrec.flags |= XLH_INSERT_CONTAINS_NEW_TUPLE;
            bufflags |= REGBUF_KEEP_DATA;
        }

        XLogBeginInsert();
        XLogRegisterData((char *) &xlrec, SizeOfHeapInsert);

        xlhdr.t_infomask2 = heaptup->t_data->t_infomask2;
        xlhdr.t_infomask = heaptup->t_data->t_infomask;
        xlhdr.t_hoff = heaptup->t_data->t_hoff;

        /*
         * note we mark xlhdr as belonging to buffer; if XLogInsert decides to
         * write the whole page to the xlog, we don't need to store
         * xl_heap_header in the xlog.
         */
        XLogRegisterBuffer(0, buffer, REGBUF_STANDARD | bufflags);
        XLogRegisterBufData(0, (char *) &xlhdr, SizeOfHeapHeader);
        /* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */
        XLogRegisterBufData(0,
                            (char *) heaptup->t_data + SizeofHeapTupleHeader,
                            heaptup->t_len - SizeofHeapTupleHeader);

        /* filtering by origin on a row level is much more efficient */
        XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);

        recptr = XLogInsert(RM_HEAP_ID, info);

        PageSetLSN(page, recptr);
    }
    //done with the critical section
    END_CRIT_SECTION();
    //unlock and release the buffer; also release the vm buffer if pinned
    UnlockReleaseBuffer(buffer);
    if (vmbuffer != InvalidBuffer)
        ReleaseBuffer(vmbuffer);

    /*
     * If tuple is cachable, mark it for invalidation from the caches in case
     * we abort.  Note it is OK to do this after releasing the buffer, because
     * the heaptup data structure is all in local memory, not in the shared
     * buffer.
     */
    //invalidate the tuple in the caches in case we abort
    CacheInvalidateHeapTuple(relation, heaptup, NULL);

    /* Note: speculative insertions are counted too, even if aborted later */
    //update statistics
    pgstat_count_heap_insert(relation, 1);

    /*
     * If heaptup is a private copy, release it.  Don't forget to copy t_self
     * back to the caller's image, too.
     */
    if (heaptup != tup)
    {
        tup->t_self = heaptup->t_self;
        heap_freetuple(heaptup);
    }

    return HeapTupleGetOid(tup);
}
  2. heap_prepare_insert
/*
 * Subroutine of heap_insert(). Prepares a tuple for insertion. This sets
 * the tuple header fields, assigns an OID, and toasts the tuple if
 * necessary. Returns the original tuple, or a toasted version of it.
 * Note that in any case, the header fields of tup->t_data are updated.
 */
static HeapTuple
heap_prepare_insert(Relation relation, HeapTuple tup, TransactionId xid,
                    CommandId cid, int options)
{
    /*
     * Inserts are not supported in parallel mode
     */
    if (IsInParallelMode())
        ereport(ERROR,
                (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
                 errmsg("cannot insert tuples during a parallel operation")));
        
    if (relation->rd_rel->relhasoids)
    {
#ifdef NOT_USED
        /* this is redundant with an Assert in HeapTupleSetOid */
        Assert(tup->t_data->t_infomask & HEAP_HASOID);
#endif

        /*
         * If the object id of this tuple has already been assigned, trust the
         * caller.  There are a couple of ways this can happen.  At initial db
         * creation, the backend program sets oids for tuples. When we define
         * an index, we set the oid.  Finally, in the future, we may allow
         * users to set their own object ids in order to support a persistent
         * object store (objects need to contain pointers to one another).
         */
        if (!OidIsValid(HeapTupleGetOid(tup)))
            HeapTupleSetOid(tup, GetNewOid(relation));
    }
    else
    {
        /* check there is not space for an OID */
        Assert(!(tup->t_data->t_infomask & HEAP_HASOID));
    }

    tup->t_data->t_infomask &= ~(HEAP_XACT_MASK);
    tup->t_data->t_infomask2 &= ~(HEAP2_XACT_MASK);
    tup->t_data->t_infomask |= HEAP_XMAX_INVALID;
    HeapTupleHeaderSetXmin(tup->t_data, xid);
    if (options & HEAP_INSERT_FROZEN)
        HeapTupleHeaderSetXminFrozen(tup->t_data);

    HeapTupleHeaderSetCmin(tup->t_data, cid);
    HeapTupleHeaderSetXmax(tup->t_data, 0); /* for cleanliness */
    tup->t_tableOid = RelationGetRelid(relation);

    /*
     * If the new tuple is too big for storage or contains already toasted
     * out-of-line attributes from some other relation, invoke the toaster.
     */
    if (relation->rd_rel->relkind != RELKIND_RELATION &&
        relation->rd_rel->relkind != RELKIND_MATVIEW)
    {
        /* toast table entries should never be recursively toasted */
        Assert(!HeapTupleHasExternal(tup));
        return tup;
    }
    else if (HeapTupleHasExternal(tup) || tup->t_len > TOAST_TUPLE_THRESHOLD)
        return toast_insert_or_update(relation, tup, NULL, options);
    else
        return tup;
}
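
The TOAST cutoff in the last branch is worth quantifying. TOAST_TUPLE_THRESHOLD is sized so that roughly TOAST_TUPLES_PER_PAGE = 4 tuples still fit on one page. A standalone sketch of the derivation, assuming BLCKSZ = 8192, 8-byte MAXALIGN, a 24-byte page header, and 4-byte line pointers:

#include <stdio.h>
#include <stddef.h>

#define BLCKSZ 8192
#define MAXALIGN(LEN)      (((size_t) (LEN) + 7) & ~(size_t) 7)
#define MAXALIGN_DOWN(LEN) ((size_t) (LEN) & ~(size_t) 7)
#define SizeOfPageHeaderData 24
#define SizeOfItemIdData 4
#define TOAST_TUPLES_PER_PAGE 4

int main(void)
{
    /* divide the usable page space by the desired tuples per page */
    size_t threshold = MAXALIGN_DOWN(
        (BLCKSZ - MAXALIGN(SizeOfPageHeaderData +
                           TOAST_TUPLES_PER_PAGE * SizeOfItemIdData))
        / TOAST_TUPLES_PER_PAGE);
    printf("TOAST_TUPLE_THRESHOLD = %zu\n", threshold);  /* 2032 */
    return 0;
}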
  3. RelationGetBufferForTuple
\src\backend\access\heap\hio.c
/*
Inputs:
    relation - the target relation
    len      - size of the space needed
    otherBuffer - for the UPDATE case: the buffer pinned for the old tuple
    options  - option flags
    bistate  - bulk-insert state
    vmbuffer - the first vm (visibilitymap) buffer
    vmbuffer_other - for the UPDATE case: the vm (visibilitymap) buffer of the previously pinned buffer
Notes:
    An UPDATE is not done in place: the old tuple is kept (its xmax is set)
    and the new version is inserted. If the old and new tuples end up in
    different blocks, locking the blocks can deadlock. For example: session A
    updates row 1 of table T, which lives in Block 0, with the new version
    going to Block 2; session B updates row 2, also in Block 0, with its new
    version also going to Block 2. Both sessions must lock Block 0 and
    Block 2 to complete their UPDATE:
    if session A locks Block 2 first while session B locks Block 0 first,
    and then A tries to lock Block 0 while B tries to lock Block 2,
    we have a deadlock.
    To avoid this, PG requires that buffers of the same relation be locked
    in increasing block-number order: to lock Blocks 0 and 2, Block 0 must
    be locked first, then Block 2.
Output:
    the buffer allocated for the tuple
*/
Buffer
RelationGetBufferForTuple(Relation relation, Size len,
                          Buffer otherBuffer, int options,
                          BulkInsertState bistate,
                          Buffer *vmbuffer, Buffer *vmbuffer_other)
{
    bool        use_fsm = !(options & HEAP_INSERT_SKIP_FSM);
    Buffer      buffer = InvalidBuffer;
    Page        page;
    Size        pageFreeSpace = 0,
                saveFreeSpace = 0;
    BlockNumber targetBlock,
                otherBlock;
    bool        needLock;

    len = MAXALIGN(len);        /* be conservative: align the length */

    /* for an UPDATE (otherBuffer valid), bistate must be NULL */
    Assert(otherBuffer == InvalidBuffer || !bistate);

    /*
     * The tuple must not be larger than the maximum tuple size MaxHeapTupleSize
     */
    if (len > MaxHeapTupleSize)
        ereport(ERROR,
                (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
                 errmsg("row is too big: size %zu, maximum size %zu",
                        len, MaxHeapTupleSize)));

    /* compute the extra free space to reserve on each page, per the fillfactor */
    saveFreeSpace = RelationGetTargetPageFreeSpace(relation,
                                                   HEAP_DEFAULT_FILLFACTOR);

    /* UPDATE case: get the block of the previously pinned buffer */
    if (otherBuffer != InvalidBuffer)
        otherBlock = BufferGetBlockNumber(otherBuffer);
    else
        otherBlock = InvalidBlockNumber;    /* just to keep compiler quiet */

    /*
     * First try to insert into the block that received the previous
     * insertion; the target block number comes from the cached bistate
     * or from the relation itself.
     * If that doesn't work, consult the FSM for a target block. Because
     * FSM information can be out of date, we may need to loop and retry.
     * If the FSM cannot supply a target block either, extend the relation.
     *
     * When use_fsm is off, we either put the tuple onto an existing page
     * or extend the relation.
     */
    if (len + saveFreeSpace > MaxHeapTupleSize)
    {
        /* tuple length + reserved space > maximum tuple size: no point in consulting the FSM */
        targetBlock = InvalidBlockNumber;
        use_fsm = false;
    }
    // bulk insert: take the target block from bistate
    else if (bistate && bistate->current_buf != InvalidBuffer)
        targetBlock = BufferGetBlockNumber(bistate->current_buf);
    else  // ordinary insert: take the target block cached in the relation structure
        targetBlock = RelationGetTargetBlock(relation);

    // no target block from the cached structures, and the FSM may be used: consult the FSM
    if (targetBlock == InvalidBlockNumber && use_fsm)
    {
        targetBlock = GetPageWithFreeSpace(relation, len + saveFreeSpace);

        /*
         * If the FSM has no target block either, try the last existing
         * block before extending.
         */
        if (targetBlock == InvalidBlockNumber)
        {
            BlockNumber nblocks = RelationGetNumberOfBlocks(relation);

            if (nblocks > 0)
                targetBlock = nblocks - 1;
        }
    }

loop:
    while (targetBlock != InvalidBlockNumber)
    {
        /*
         * Read and exclusive-lock the target block, as well as the other
         * block if one was given, taking suitable care with lock ordering and
         * the possibility they are the same block.
         *
         * If the page-level all-visible flag is set, caller will need to
         * clear both that and the corresponding visibility map bit.  However,
         * by the time we return, we'll have x-locked the buffer, and we don't
         * want to do any I/O while in that state.  So we check the bit here
         * before taking the lock, and pin the page if it appears necessary.
         * Checking without the lock creates a risk of getting the wrong
         * answer, so we'll have to recheck after acquiring the lock.
         */
        if (otherBuffer == InvalidBuffer)   // not an UPDATE
        {
            /* easy case */
            buffer = ReadBufferBI(relation, targetBlock, bistate); //get the buffer
            //if the page is all-visible, pin the matching VM page now so its bit can be
            //cleared later without doing I/O while the exclusive buffer lock is held
            if (PageIsAllVisible(BufferGetPage(buffer)))
                visibilitymap_pin(relation, targetBlock, vmbuffer);
            LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);  //lock the buffer
        }
        else if (otherBlock == targetBlock)   //UPDATE: the new tuple goes into the same block as the old one
        {
            /* also easy case */
            buffer = otherBuffer;
            if (PageIsAllVisible(BufferGetPage(buffer)))
                visibilitymap_pin(relation, targetBlock, vmbuffer);
            LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
        }
        else if (otherBlock < targetBlock) //UPDATE: the old tuple's block < the new tuple's block
        {
            /* lock other buffer first */
            buffer = ReadBuffer(relation, targetBlock);
            if (PageIsAllVisible(BufferGetPage(buffer)))
                visibilitymap_pin(relation, targetBlock, vmbuffer);
            LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);  //lock the buffer with the lower block number first
            LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
        }
        else //UPDATE: the old tuple's block > the new tuple's block
        {
            /* lock target buffer first */
            buffer = ReadBuffer(relation, targetBlock);
            if (PageIsAllVisible(BufferGetPage(buffer)))
                visibilitymap_pin(relation, targetBlock, vmbuffer);
            LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); //lock the buffer with the lower block number first
            LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
        }

        /*
         * We now have the target page (and the other buffer, if any) pinned
         * and locked.  However, since our initial PageIsAllVisible checks
         * were performed before acquiring the lock, the results might now be
         * out of date, either for the selected victim buffer, or for the
         * other buffer passed by the caller.  In that case, we'll need to
         * give up our locks, go get the pin(s) we failed to get earlier, and
         * re-lock.  That's pretty painful, but hopefully shouldn't happen
         * often.
         *
         * Note that there's a small possibility that we didn't pin the page
         * above but still have the correct page pinned anyway, either because
         * we've already made a previous pass through this loop, or because
         * caller passed us the right page anyway.
         *
         * Note also that it's possible that by the time we get the pin and
         * retake the buffer locks, the visibility map bit will have been
         * cleared by some other backend anyway.  In that case, we'll have
         * done a bit of extra work for no gain, but there's no real harm
         * done.
         */
        if (otherBuffer == InvalidBuffer || targetBlock <= otherBlock)
            GetVisibilityMapPins(relation, buffer, otherBuffer,
                                 targetBlock, otherBlock, vmbuffer,
                                 vmbuffer_other);
        else
            GetVisibilityMapPins(relation, otherBuffer, buffer,
                                 otherBlock, targetBlock, vmbuffer_other,
                                 vmbuffer);

    
        page = BufferGetPage(buffer);
        pageFreeSpace = PageGetHeapFreeSpace(page);
        if (len + saveFreeSpace <= pageFreeSpace)   //enough space: use this buffer
        {
            /* use this page as future insert target, too */
            RelationSetTargetBlock(relation, targetBlock);    
            return buffer;
        }

        /*
         * Not enough space, so we must give up our page locks
         */
        LockBuffer(buffer, BUFFER_LOCK_UNLOCK);  //unlock the buffer
        if (otherBuffer == InvalidBuffer)  //not an UPDATE: release the buffer
            ReleaseBuffer(buffer);
        else if (otherBlock != targetBlock)  //UPDATE with a different target block: unlock the other buffer, release this one
        {
            LockBuffer(otherBuffer, BUFFER_LOCK_UNLOCK);
            ReleaseBuffer(buffer);
        }

        /* Without FSM, always fall out of the loop and extend */
        if (!use_fsm)  //without the FSM, fall out of the loop and extend
            break;

        //use the FSM to get the next candidate block, recording the actual free space of the block we just tried
        //note: if no block in the FSM satisfies the request, targetBlock = InvalidBlockNumber and the loop ends
        targetBlock = RecordAndGetPageWithFreeSpace(relation,
                                                    targetBlock,
                                                    pageFreeSpace,
                                                    len + saveFreeSpace);
    }

    /*
     * Extend the relation. Newly created and temp relations need no lock; all other cases do.
     *
     * We have to use a lock to ensure no one else is extending the rel at the
     * same time, else we will both try to initialize the same new page.  We
     * can skip locking for new or temp relations, however, since no one else
     * could be accessing them.
     */
    needLock = !RELATION_IS_LOCAL(relation);   //does this relation need the extension lock?

    /*
     * If we need the lock but are not able to acquire it immediately, we'll
     * consider extending the relation by multiple blocks at a time to manage
     * contention on the relation extension lock.  However, this only makes
     * sense if we're using the FSM; otherwise, there's no point.
     */
    if (needLock)
    {
        if (!use_fsm)
            LockRelationForExtension(relation, ExclusiveLock);
        else if (!ConditionalLockRelationForExtension(relation, ExclusiveLock))
        {
            /* Couldn't get the lock immediately; wait for it. */
            LockRelationForExtension(relation, ExclusiveLock);

            /*
             * If another process extended the relation meanwhile, we may
             * now be able to find a suitable targetBlock
             */
            targetBlock = GetPageWithFreeSpace(relation, len + saveFreeSpace);

            /*
             * If some other waiter has already extended the relation, we
             * don't need to do so; just use the existing freespace.
             */
            if (targetBlock != InvalidBlockNumber)
            {
                UnlockRelationForExtension(relation, ExclusiveLock);
                goto loop;
            }

            /* Time to bulk-extend. */
            RelationAddExtraBlocks(relation, bistate);
        }
    }

    /*
     * In addition to whatever extension we performed above, we always add at
     * least one block to satisfy our own request.
     *
     * XXX This does an lseek - rather expensive - but at the moment it is the
     * only way to accurately determine how many blocks are in a relation.  Is
     * it worth keeping an accurate file length in shared memory someplace,
     * rather than relying on the kernel to do it for us?
     */
    buffer = ReadBufferBI(relation, P_NEW, bistate);

    /*
     * We can be certain that locking the otherBuffer first is OK, since it
     * must have a lower page number.
     */
    if (otherBuffer != InvalidBuffer)
        LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);

    /*
     * Now acquire lock on the new page.
     */
    LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);

    /*
     * Release the file-extension lock; it's now OK for someone else to extend
     * the relation some more.  Note that we cannot release this lock before
     * we have buffer lock on the new page, or we risk a race condition
     * against vacuumlazy.c --- see comments therein.
     */
    if (needLock)
        UnlockRelationForExtension(relation, ExclusiveLock);

    /*
     * We need to initialize the empty new page.  Double-check that it really
     * is empty (this should never happen, but if it does we don't want to
     * risk wiping out valid data).
     */
    page = BufferGetPage(buffer);

    if (!PageIsNew(page))
        elog(ERROR, "page %u of relation \"%s\" should be empty but is not",
             BufferGetBlockNumber(buffer),
             RelationGetRelationName(relation));

    PageInit(page, BufferGetPageSize(buffer), 0);

    if (len > PageGetHeapFreeSpace(page))
    {
        /* We should not get here given the test at the top */
        elog(PANIC, "tuple is too big: size %zu", len);
    }

    /*
     * Remember the new page as our target for future insertions.
     *
     * XXX should we enter the new page into the free space map immediately,
     * or just keep it for this backend's exclusive use in the short run
     * (until VACUUM sees it)?  Seems to depend on whether you expect the
     * current backend to make more insertions or not, which is probably a
     * good bet most of the time.  So for now, don't add it to FSM yet.
     */
    RelationSetTargetBlock(relation, BufferGetBlockNumber(buffer));

    return buffer;
}
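
A note on saveFreeSpace above: RelationGetTargetPageFreeSpace reserves BLCKSZ * (100 - fillfactor) / 100 bytes per page, and HEAP_DEFAULT_FILLFACTOR is 100, so an ordinary heap table reserves nothing by default. A standalone sketch of the arithmetic, assuming BLCKSZ = 8192:

#include <stdio.h>

#define BLCKSZ 8192

static int target_page_free_space(int fillfactor)
{
    return BLCKSZ * (100 - fillfactor) / 100;
}

int main(void)
{
    printf("fillfactor 100 -> reserve %d bytes\n", target_page_free_space(100)); /* 0 */
    printf("fillfactor 70  -> reserve %d bytes\n", target_page_free_space(70));  /* 2457 */
    return 0;
}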

  4. CheckForSerializableConflictIn
/*
 * CheckForSerializableConflictIn
 *      We are writing the given tuple.  If that indicates a rw-conflict
 *      in from another serializable transaction, take appropriate action.
 *
 * Skip checking for any granularity for which a parameter is missing.
 *
 * A tuple update or delete is in conflict if we have a predicate lock
 * against the relation or page in which the tuple exists, or against the
 * tuple itself.
 */
void
CheckForSerializableConflictIn(Relation relation, HeapTuple tuple,
                               Buffer buffer)
{
    PREDICATELOCKTARGETTAG targettag;

    if (!SerializationNeededForWrite(relation))
        return;

    /* Check if someone else has already decided that we need to die */
    if (SxactIsDoomed(MySerializableXact))
        ereport(ERROR,
                (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
                 errmsg("could not serialize access due to read/write dependencies among transactions"),
                 errdetail_internal("Reason code: Canceled on identification as a pivot, during conflict in checking."),
                 errhint("The transaction might succeed if retried.")));

    /*
     * We're doing a write which might cause rw-conflicts now or later.
     * Memorize that fact.
     */
    MyXactDidWrite = true;

    /*
     * It is important that we check for locks from the finest granularity to
     * the coarsest granularity, so that granularity promotion doesn't cause
     * us to miss a lock.  The new (coarser) lock will be acquired before the
     * old (finer) locks are released.
     *
     * It is not possible to take and hold a lock across the checks for all
     * granularities because each target could be in a separate partition.
     */
    if (tuple != NULL)
    {
        SET_PREDICATELOCKTARGETTAG_TUPLE(targettag,
                                         relation->rd_node.dbNode,
                                         relation->rd_id,
                                         ItemPointerGetBlockNumber(&(tuple->t_self)),
                                         ItemPointerGetOffsetNumber(&(tuple->t_self)));
        CheckTargetForConflictsIn(&targettag);
    }

    if (BufferIsValid(buffer))
    {
        SET_PREDICATELOCKTARGETTAG_PAGE(targettag,
                                        relation->rd_node.dbNode,
                                        relation->rd_id,
                                        BufferGetBlockNumber(buffer));
        CheckTargetForConflictsIn(&targettag);
    }

    SET_PREDICATELOCKTARGETTAG_RELATION(targettag,
                                        relation->rd_node.dbNode,
                                        relation->rd_id);
    CheckTargetForConflictsIn(&targettag);
}
  5. START_CRIT_SECTION
volatile uint32 CritSectionCount = 0;
#define START_CRIT_SECTION()  (CritSectionCount++)
  6. PageIsAllVisible
#define PageIsAllVisible(page) \
    (((PageHeader) (page))->pd_flags & PD_ALL_VISIBLE)
  7. PageClearAllVisible
#define PageClearAllVisible(page) \
    (((PageHeader) (page))->pd_flags &= ~PD_ALL_VISIBLE)
  8. visibilitymap_clear
/*
 *  visibilitymap_clear - clear the specified bits for one heap block in the visibility map
 *
 * You must pass a buffer containing the correct map page to this function.
 * Call visibilitymap_pin first to pin the right one. This function doesn't do
 * any I/O.  Returns true if any bits have been cleared and false otherwise.
 */
bool
visibilitymap_clear(Relation rel, BlockNumber heapBlk, Buffer buf, uint8 flags)
{
    BlockNumber mapBlock = HEAPBLK_TO_MAPBLOCK(heapBlk);
    int         mapByte = HEAPBLK_TO_MAPBYTE(heapBlk);
    int         mapOffset = HEAPBLK_TO_OFFSET(heapBlk);
    uint8       mask = flags << mapOffset;
    char       *map;
    bool        cleared = false;

    Assert(flags & VISIBILITYMAP_VALID_BITS);

#ifdef TRACE_VISIBILITYMAP
    elog(DEBUG1, "vm_clear %s %d", RelationGetRelationName(rel), heapBlk);
#endif

    if (!BufferIsValid(buf) || BufferGetBlockNumber(buf) != mapBlock)
        elog(ERROR, "wrong buffer passed to visibilitymap_clear");

    LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
    map = PageGetContents(BufferGetPage(buf));

    if (map[mapByte] & mask)
    {
        map[mapByte] &= ~mask;

        MarkBufferDirty(buf);
        cleared = true;
    }

    LockBuffer(buf, BUFFER_LOCK_UNLOCK);

    return cleared;
}

  9. MarkBufferDirty
/*
 * MarkBufferDirty
 *
 *      Marks buffer contents as dirty (actual write happens later).
 *
 * Buffer must be pinned and exclusive-locked.  (If caller does not hold
 * exclusive lock, then somebody could be in process of writing the buffer,
 * leading to risk of bad data written to disk.)
 */
void
MarkBufferDirty(Buffer buffer)
{
    BufferDesc *bufHdr;
    uint32      buf_state;
    uint32      old_buf_state;

    if (!BufferIsValid(buffer))
        elog(ERROR, "bad buffer ID: %d", buffer);

    if (BufferIsLocal(buffer))
    {
        MarkLocalBufferDirty(buffer);
        return;
    }

    bufHdr = GetBufferDescriptor(buffer - 1);

    Assert(BufferIsPinned(buffer));
    Assert(LWLockHeldByMeInMode(BufferDescriptorGetContentLock(bufHdr),
                                LW_EXCLUSIVE));

    old_buf_state = pg_atomic_read_u32(&bufHdr->state);
    for (;;)
    {
        if (old_buf_state & BM_LOCKED)
            old_buf_state = WaitBufHdrUnlocked(bufHdr);

        buf_state = old_buf_state;

        Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
        buf_state |= BM_DIRTY | BM_JUST_DIRTIED;

        if (pg_atomic_compare_exchange_u32(&bufHdr->state, &old_buf_state,
                                           buf_state))
            break;
    }

    /*
     * If the buffer was not dirty already, do vacuum accounting.
     */
    if (!(old_buf_state & BM_DIRTY))
    {
        VacuumPageDirty++;
        pgBufferUsage.shared_blks_dirtied++;
        if (VacuumCostActive)
            VacuumCostBalance += VacuumCostPageDirty;
    }
}
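
The loop above is a classic compare-and-swap retry loop: read the state word, OR in the flag bits, and attempt an atomic exchange, retrying if another backend changed the word in the meantime. A minimal standalone sketch of the same pattern using C11 atomics; the bit values are illustrative, not PG's actual buffer-state layout:

#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>

#define BM_DIRTY        (1u << 0)   /* illustrative bit positions */
#define BM_JUST_DIRTIED (1u << 1)

static _Atomic uint32_t buf_state;

static void mark_dirty(void)
{
    uint32_t old_state = atomic_load(&buf_state);

    for (;;)
    {
        uint32_t new_state = old_state | BM_DIRTY | BM_JUST_DIRTIED;

        /* on failure, old_state is refreshed with the current value and
         * we retry; on success the flags are set atomically */
        if (atomic_compare_exchange_weak(&buf_state, &old_state, new_state))
            break;
    }
}

int main(void)
{
    mark_dirty();
    printf("state = 0x%x\n", (unsigned) atomic_load(&buf_state));  /* 0x3 */
    return 0;
}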
