0. Preface
This article is a reading summary of EthanHe's PostgreSQL source-code walkthrough series (https://www.jianshu.com/u/6b8fc3f18f72), with some of my own understanding and testing added. It is not entirely original work; its purpose is only to reinforce my understanding and serve as a reference for later review.
The "Reading 'PostgreSQL Source Code Walkthrough': Inserting Row Data" series consists of 12 articles that analyze the part of the PG source code responsible for inserting row data, supported by gdb tracing of the source for testing and analysis. This article focuses on the source-code logic of heap_insert, the function PG uses to insert a single row.
The research method used in this series, and in later studies of the PG source, is as follows (excerpted from EthanHe's posts):
Start from the smallest method/function involved in inserting a row through psql (PageAddItemExtended). After understanding that function thoroughly, use gdb to trace its call stack and, following the functions in the stack, work upward level by level to the top-level entry or main function. At each level, fully understand the data structures, macros, and helper functions that level depends on. This process builds up the body of knowledge associated with inserting data: the Page storage layout, buffer management, WAL management, SQL parsing and execution, frontend/backend interfaces, and so on. With this thread established and the related data structures as a foundation, understanding other operations, such as DML like UPDATE/DELETE, DDL like CREATE TABLE/ALTER TABLE, and queries like SELECT, becomes much easier.
Because the PG source code is vast, rather than working top-down and getting lost, it is better to adopt EthanHe's bottom-up approach and trace upward step by step. Once one branch is fully understood, the others follow by analogy. It is an excellent way to study the code.
1. Data structures, macros, and function declarations
\src\include\c.h
typedef uint32 CommandId;
typedef uint32 TransactionId;
\src\backend\access\transam\xact.c
/*
* GetCurrentTransactionId
*
* Return the current transaction ID, assigning one first if necessary.
*/
TransactionId
GetCurrentTransactionId(void)
{
TransactionState s = CurrentTransactionState;
if (!TransactionIdIsValid(s->transactionId))
AssignTransactionId(s);
return s->transactionId;
}
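GetCurrentTransactionId assigns the transaction ID lazily: AssignTransactionId is only called the first time an XID is actually needed. For reference, the validity test used above comes from \src\include\access\transam.h and looks roughly like this (quoted from memory; treat it as a sketch, not an authoritative listing):
#define InvalidTransactionId ((TransactionId) 0)
#define TransactionIdIsValid(xid) ((xid) != InvalidTransactionId)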
\src\include\access\heapam.h
/* "options" flag bits for heap_insert */
#define HEAP_INSERT_SKIP_WAL 0x0001
#define HEAP_INSERT_SKIP_FSM 0x0002
#define HEAP_INSERT_FROZEN 0x0004
#define HEAP_INSERT_SPECULATIVE 0x0008
#define HEAP_INSERT_NO_LOGICAL 0x0010
typedef struct BulkInsertStateData *BulkInsertState;
\src\include\access\hio.h
/*
* State for bulk inserts --- private to heapam.c and hio.c
*
* If current_buf is valid, we hold an extra pin on that buffer
*
* "typedef struct BulkInsertStateData *BulkInsertState" is in heapam.h
*/
typedef struct BulkInsertStateData
{
BufferAccessStrategy strategy; /* our BULKWRITE strategy object */
Buffer current_buf; /* current insertion target page */
} BulkInsertStateData;
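For context, the bulk-insert state is created by GetBulkInsertState() in heapam.c, which pairs a BULKWRITE buffer access strategy with an initially invalid current buffer. Roughly (quoted from memory of the PG source; treat as a sketch):
BulkInsertState
GetBulkInsertState(void)
{
    BulkInsertState bistate;

    bistate = (BulkInsertState) palloc(sizeof(BulkInsertStateData));
    bistate->strategy = GetAccessStrategy(BAS_BULKWRITE);
    bistate->current_buf = InvalidBuffer;
    return bistate;
}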
\src\include\access\htup_details.h
Maximum length of a tuple, including the tuple header and the zero padding added for alignment
#define MaxHeapTupleSize (BLCKSZ - MAXALIGN(SizeOfPageHeaderData + sizeof(ItemIdData)))
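With the default 8 KB block size this works out to 8160 bytes. A standalone sketch of the arithmetic, assuming BLCKSZ = 8192, a 24-byte SizeOfPageHeaderData, a 4-byte ItemIdData and 8-byte MAXALIGN (typical of 64-bit builds):
/* Minimal sketch: evaluate MaxHeapTupleSize under the stated assumptions. */
#include <stdio.h>

#define BLCKSZ 8192
#define SIZE_OF_PAGE_HEADER 24   /* assumed SizeOfPageHeaderData */
#define SIZE_OF_ITEMID 4         /* assumed sizeof(ItemIdData) */
#define MAXALIGN(x) (((x) + 7) & ~((unsigned long) 7))   /* 8-byte alignment */

int main(void)
{
    unsigned long max_tuple = BLCKSZ - MAXALIGN(SIZE_OF_PAGE_HEADER + SIZE_OF_ITEMID);

    printf("MaxHeapTupleSize = %lu\n", max_tuple);   /* prints 8160 */
    return 0;
}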
Get the tuple's OID
#define HeapTupleGetOid(tuple) \
HeapTupleHeaderGetOid((tuple)->t_data)
Set the tuple's OID
#define HeapTupleSetOid(tuple, oid) \
HeapTupleHeaderSetOid((tuple)->t_data, (oid))
Get the tuple's OID from the tuple header
#define HeapTupleHeaderGetOid(tup) \
( \
((tup)->t_infomask & HEAP_HASOID) ? \
*((Oid *) ((char *)(tup) + (tup)->t_hoff - sizeof(Oid))) \
: \
InvalidOid \
)
Set the OID in the tuple header
#define HeapTupleHeaderSetOid(tup, oid) \
do { \
Assert((tup)->t_infomask & HEAP_HASOID); \
*((Oid *) ((char *)(tup) + (tup)->t_hoff - sizeof(Oid))) = (oid); \
} while (0)
\src\include\utils\rel.h
/*
* RelationGetTargetBlock
* Fetch the current insertion target block cached in the relation.
*/
#define RelationGetTargetBlock(relation) \
( (relation)->rd_smgr != NULL ? (relation)->rd_smgr->smgr_targblock : InvalidBlockNumber )
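RelationGetBufferForTuple, analyzed below, also uses the companion macro RelationSetTargetBlock to cache the chosen block back into the relation. For reference, it looks roughly like this in the same header (quoted from memory; treat as a sketch):
#define RelationSetTargetBlock(relation, targblock) \
do { \
RelationOpenSmgr(relation); \
(relation)->rd_smgr->smgr_targblock = (targblock); \
} while (0)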
\src\backend\storage\freespace\freespace.c
/*
* GetPageWithFreeSpace - get the block number of a heap block that has at
* least the specified amount of free space.
*/
BlockNumber
GetPageWithFreeSpace(Relation rel, Size spaceNeeded)
{
uint8 min_cat = fsm_space_needed_to_cat(spaceNeeded);
return fsm_search(rel, min_cat);
}
/*
* Return the category (range 0 - 255) that the requested amount of free space falls into
*/
static uint8
fsm_space_needed_to_cat(Size needed)
{
int cat;
/* The requested free space must not exceed the maximum, MaxFSMRequestSize */
if (needed > MaxFSMRequestSize)
elog(ERROR, "invalid FSM request size %zu", needed);
if (needed == 0)
return 1;
/* How the FSM category is computed; FSM_CAT_STEP is 32 */
cat = (needed + FSM_CAT_STEP - 1) / FSM_CAT_STEP;
if (cat > 255)
cat = 255;
return (uint8) cat;
}
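In other words, each 32 bytes of requested space raises the category by one, and category 255 stands for a whole page's worth of free space. A small standalone sketch (assuming FSM_CAT_STEP = 32) that shows the mapping:
#include <stdio.h>

#define FSM_CAT_STEP 32

/* Same mapping as fsm_space_needed_to_cat above, minus the error check. */
static int space_to_cat(size_t needed)
{
    int cat;

    if (needed == 0)
        return 1;
    cat = (int) ((needed + FSM_CAT_STEP - 1) / FSM_CAT_STEP);   /* ceiling division */
    if (cat > 255)
        cat = 255;
    return cat;
}

int main(void)
{
    printf("%d %d %d %d\n",
           space_to_cat(1),      /* 1: anything up to 32 bytes is category 1 */
           space_to_cat(32),     /* 1 */
           space_to_cat(33),     /* 2 */
           space_to_cat(8160));  /* 255: a whole 8 KB page's worth of space */
    return 0;
}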
/*
* Search the three-level max-heap of FSM pages for a heap block whose
* free-space category is at least min_cat, and return that block's BlockNumber.
*/
static BlockNumber
fsm_search(Relation rel, uint8 min_cat)
{
int restarts = 0;
FSMAddress addr = FSM_ROOT_ADDRESS;
for (;;)
{
int slot;
Buffer buf;
uint8 max_avail = 0;
/* Read the FSM page. */
buf = fsm_readbuf(rel, addr, false);
/* Search within the page */
if (BufferIsValid(buf))
{
LockBuffer(buf, BUFFER_LOCK_SHARE);
slot = fsm_search_avail(buf, min_cat,
(addr.level == FSM_BOTTOM_LEVEL),
false);
if (slot == -1)
max_avail = fsm_get_max_avail(BufferGetPage(buf));
UnlockReleaseBuffer(buf);
}
else
slot = -1;
if (slot != -1)
{
if (addr.level == FSM_BOTTOM_LEVEL)
return fsm_get_heap_blk(addr, slot);
addr = fsm_get_child(addr, slot);
}
else if (addr.level == FSM_ROOT_LEVEL)
{
return InvalidBlockNumber;
}
else
{
uint16 parentslot;
FSMAddress parent;
parent = fsm_get_parent(addr, &parentslot);
fsm_set_and_search(rel, parent, parentslot, max_avail, 0);
if (restarts++ > 10000)
return InvalidBlockNumber;
addr = FSM_ROOT_ADDRESS;
}
}
}
\src\backend\access\heap\hio.c
/*
* Read a buffer for the given block of the relation; if bistate is not NULL, obtain it according to the bulk-insert state.
*/
static Buffer
ReadBufferBI(Relation relation, BlockNumber targetBlock,
BulkInsertState bistate)
{
Buffer buffer;
/* If not bulk-insert, exactly like ReadBuffer */
if (!bistate)
return ReadBuffer(relation, targetBlock);
/* If we have the desired block already pinned, re-pin and return it */
if (bistate->current_buf != InvalidBuffer)
{
if (BufferGetBlockNumber(bistate->current_buf) == targetBlock)
{
IncrBufferRefCount(bistate->current_buf);
return bistate->current_buf;
}
/* ... else drop the old buffer */
ReleaseBuffer(bistate->current_buf);
bistate->current_buf = InvalidBuffer;
}
/* Perform a read using the buffer strategy */
buffer = ReadBufferExtended(relation, MAIN_FORKNUM, targetBlock,
RBM_NORMAL, bistate->strategy);
/* Save the selected block as target for future inserts */
IncrBufferRefCount(buffer);
bistate->current_buf = buffer;
return buffer;
}
\src\include\access\visibilitymap.h
/* Number of bits needed per heap block */
#define BITS_PER_HEAPBLOCK 2
\src\backend\access\heap\visibilitymap.c
/*#define TRACE_VISIBILITYMAP */
/*
* Size of the bitmap on each visibility map page, in bytes. There's no
* extra headers, so the whole page minus the standard page header is
* used for the bitmap.
*/
#define MAPSIZE (BLCKSZ - MAXALIGN(SizeOfPageHeaderData))
/* Number of heap blocks that one byte can represent */
#define HEAPBLOCKS_PER_BYTE (BITS_PER_BYTE / BITS_PER_HEAPBLOCK)
/* Number of heap blocks that one visibility map page can represent. */
#define HEAPBLOCKS_PER_PAGE (MAPSIZE * HEAPBLOCKS_PER_BYTE)
/* Mapping from a heap block number to the VM page holding its bits */
#define HEAPBLK_TO_MAPBLOCK(x) ((x) / HEAPBLOCKS_PER_PAGE)
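visibilitymap_clear, analyzed below, additionally uses the companion macros HEAPBLK_TO_MAPBYTE and HEAPBLK_TO_OFFSET from the same file. The standalone sketch below reproduces the mapping assuming an 8 KB block (so MAPSIZE = 8192 - 24 = 8168) and works one example through it; the two extra macros are quoted from memory, so treat them as a sketch:
#include <stdio.h>

#define BITS_PER_BYTE 8
#define BITS_PER_HEAPBLOCK 2
#define MAPSIZE 8168                                              /* assumed for BLCKSZ = 8192 */
#define HEAPBLOCKS_PER_BYTE (BITS_PER_BYTE / BITS_PER_HEAPBLOCK)  /* 4 */
#define HEAPBLOCKS_PER_PAGE (MAPSIZE * HEAPBLOCKS_PER_BYTE)       /* 32672 */
#define HEAPBLK_TO_MAPBLOCK(x) ((x) / HEAPBLOCKS_PER_PAGE)
#define HEAPBLK_TO_MAPBYTE(x) (((x) % HEAPBLOCKS_PER_PAGE) / HEAPBLOCKS_PER_BYTE)
#define HEAPBLK_TO_OFFSET(x) (((x) % HEAPBLOCKS_PER_BYTE) * BITS_PER_HEAPBLOCK)

int main(void)
{
    unsigned int heapBlk = 40000;

    printf("heap block %u -> VM page %u, byte %u, bit offset %u\n",
           heapBlk,
           HEAPBLK_TO_MAPBLOCK(heapBlk),   /* 1 */
           HEAPBLK_TO_MAPBYTE(heapBlk),    /* 1832 */
           HEAPBLK_TO_OFFSET(heapBlk));    /* 0 */
    return 0;
}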
3. Source code analysis
- heap_insert
/*
* heap_insert - insert tuple into a heap
*
* The new tuple is stamped with current transaction ID and the specified
* command ID.
*
* If the HEAP_INSERT_SKIP_WAL option is specified, the new tuple is not
* logged in WAL, even for a non-temp relation. Safe usage of this behavior
* requires that we arrange that all new tuples go into new pages not
* containing any tuples from other transactions, and that the relation gets
* fsync'd before commit. (See also heap_sync() comments)
*
* The HEAP_INSERT_SKIP_FSM option is passed directly to
* RelationGetBufferForTuple, which see for more info.
*
* HEAP_INSERT_FROZEN should only be specified for inserts into
* relfilenodes created during the current subtransaction and when
* there are no prior snapshots or pre-existing portals open.
* This causes rows to be frozen, which is an MVCC violation and
* requires explicit options chosen by user.
*
* HEAP_INSERT_SPECULATIVE is used on so-called "speculative insertions",
* which can be backed out afterwards without aborting the whole transaction.
* Other sessions can wait for the speculative insertion to be confirmed,
* turning it into a regular tuple, or aborted, as if it never existed.
* Speculatively inserted tuples behave as "value locks" of short duration,
* used to implement INSERT .. ON CONFLICT.
*
* HEAP_INSERT_NO_LOGICAL force-disables the emitting of logical decoding
* information for the tuple. This should solely be used during table rewrites
* where RelationIsLogicallyLogged(relation) is not yet accurate for the new
* relation.
*
* Note that most of these options will be applied when inserting into the
* heap's TOAST table, too, if the tuple requires any out-of-line data. Only
* HEAP_INSERT_SPECULATIVE is explicitly ignored, as the toast data does
* not partake in speculative insertion.
*
* The BulkInsertState object (if any; bistate can be NULL for default
* behavior) is also just passed through to RelationGetBufferForTuple.
*
* The return value is the OID assigned to the tuple (either here or by the
* caller), or InvalidOid if no OID. The header fields of *tup are updated
* to match the stored tuple; in particular tup->t_self receives the actual
* TID where the tuple was stored. But note that any toasting of fields
* within the tuple data is NOT reflected into *tup.
*/
/*
Input:
relation - the target relation (table) descriptor
tup - the heap tuple (including its header), i.e. the row to insert
cid - command ID (the command's sequence number within the transaction)
options - insert options
bistate - bulk-insert state
Output:
Oid - the OID assigned to the inserted tuple, or InvalidOid if the table has no OIDs
*/
Oid
heap_insert(Relation relation, HeapTuple tup, CommandId cid,
int options, BulkInsertState bistate)
{
TransactionId xid = GetCurrentTransactionId();//the inserting transaction's ID
HeapTuple heaptup;//the tuple actually stored (possibly a toasted copy of tup)
Buffer buffer;//heap page buffer
Buffer vmbuffer = InvalidBuffer;//visibility map buffer
bool all_visible_cleared = false;//flag: did we clear PD_ALL_VISIBLE?
/*
* Fill in tuple header fields, assign an OID, and toast the tuple if
* necessary.
*
* Note: below this point, heaptup is the data we actually intend to store
* into the relation; tup is the caller's original untoasted data.
*/
//Pre-insert preparation: set the t_infomask flags, assign an OID, toast if necessary
heaptup = heap_prepare_insert(relation, tup, xid, cid, options);
/*
* Find buffer to insert this tuple into. If the page is all visible,
* this will also pin the requisite visibility map page.
*/
//Get the buffer to insert into; see the analysis of RelationGetBufferForTuple below
buffer = RelationGetBufferForTuple(relation, heaptup->t_len,
InvalidBuffer, options, bistate,
&vmbuffer, NULL);
/*
* We're about to do the actual insert -- but check for conflict first, to
* avoid possibly having to roll back work we've just done.
*
* This is safe without a recheck as long as there is no possibility of
* another process scanning the page between this check and the insert
* being visible to the scan (i.e., an exclusive buffer content lock is
* continuously held from this point until the tuple insert is visible).
*
* For a heap insert, we only need to check for table-level SSI locks. Our
* new tuple can't possibly conflict with existing tuple locks, and heap
* page locks are only consolidated versions of tuple locks; they do not
* lock "gaps" as index page locks do. So we don't need to specify a
* buffer when making the call, which makes for a faster check.
*/
//Check for serializable (SSI) conflicts
CheckForSerializableConflictIn(relation, NULL, InvalidBuffer);
/* NO EREPORT(ERROR) from here till changes are logged */
//Enter the critical section (CritSectionCount++)
START_CRIT_SECTION();
//Physically put the tuple on the page (see the previous article's analysis of RelationPutHeapTuple)
RelationPutHeapTuple(relation, buffer, heaptup,
(options & HEAP_INSERT_SPECULATIVE) != 0);
//If the page is marked all-visible, clear that state
if (PageIsAllVisible(BufferGetPage(buffer)))
{
//remember that we cleared it, then clear the page flag and the VM bits
all_visible_cleared = true;
PageClearAllVisible(BufferGetPage(buffer));
visibilitymap_clear(relation,
ItemPointerGetBlockNumber(&(heaptup->t_self)),
vmbuffer, VISIBILITYMAP_VALID_BITS);
}
/*
* XXX Should we set PageSetPrunable on this page ?
*
* The inserting transaction may eventually abort thus making this tuple
* DEAD and hence available for pruning. Though we don't want to optimize
* for aborts, if no other tuple in this page is UPDATEd/DELETEd, the
* aborted tuple will never be pruned until next vacuum is triggered.
*
* If you do add PageSetPrunable here, add it in heap_xlog_insert too.
*/
//Mark the buffer dirty
MarkBufferDirty(buffer);
/* XLOG stuff */
//Write the WAL record if required
if (!(options & HEAP_INSERT_SKIP_WAL) && RelationNeedsWAL(relation))
{
xl_heap_insert xlrec;
xl_heap_header xlhdr;
XLogRecPtr recptr;
Page page = BufferGetPage(buffer);
uint8 info = XLOG_HEAP_INSERT;
int bufflags = 0;
/*
* If this is a catalog, we need to transmit combocids to properly
* decode, so log that as well.
*/
if (RelationIsAccessibleInLogicalDecoding(relation))
log_heap_new_cid(relation, heaptup);
/*
* If this is the single and first tuple on page, we can reinit the
* page instead of restoring the whole thing. Set flag, and hide
* buffer references from XLogInsert.
*/
if (ItemPointerGetOffsetNumber(&(heaptup->t_self)) == FirstOffsetNumber &&
PageGetMaxOffsetNumber(page) == FirstOffsetNumber)
{
info |= XLOG_HEAP_INIT_PAGE;
bufflags |= REGBUF_WILL_INIT;
}
xlrec.offnum = ItemPointerGetOffsetNumber(&heaptup->t_self);
xlrec.flags = 0;
if (all_visible_cleared)
xlrec.flags |= XLH_INSERT_ALL_VISIBLE_CLEARED;
if (options & HEAP_INSERT_SPECULATIVE)
xlrec.flags |= XLH_INSERT_IS_SPECULATIVE;
Assert(ItemPointerGetBlockNumber(&heaptup->t_self) == BufferGetBlockNumber(buffer));
/*
* For logical decoding, we need the tuple even if we're doing a full
* page write, so make sure it's included even if we take a full-page
* image. (XXX We could alternatively store a pointer into the FPW).
*/
if (RelationIsLogicallyLogged(relation))
{
xlrec.flags |= XLH_INSERT_CONTAINS_NEW_TUPLE;
bufflags |= REGBUF_KEEP_DATA;
}
XLogBeginInsert();
XLogRegisterData((char *) &xlrec, SizeOfHeapInsert);
xlhdr.t_infomask2 = heaptup->t_data->t_infomask2;
xlhdr.t_infomask = heaptup->t_data->t_infomask;
xlhdr.t_hoff = heaptup->t_data->t_hoff;
/*
* note we mark xlhdr as belonging to buffer; if XLogInsert decides to
* write the whole page to the xlog, we don't need to store
* xl_heap_header in the xlog.
*/
XLogRegisterBuffer(0, buffer, REGBUF_STANDARD | bufflags);
XLogRegisterBufData(0, (char *) &xlhdr, SizeOfHeapHeader);
/* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */
XLogRegisterBufData(0,
(char *) heaptup->t_data + SizeofHeapTupleHeader,
heaptup->t_len - SizeofHeapTupleHeader);
/* filtering by origin on a row level is much more efficient */
XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
recptr = XLogInsert(RM_HEAP_ID, info);
PageSetLSN(page, recptr);
}
//End of the critical section
END_CRIT_SECTION();
//Unlock and release the data buffer; release the VM buffer if we pinned one
UnlockReleaseBuffer(buffer);
if (vmbuffer != InvalidBuffer)
ReleaseBuffer(vmbuffer);
/*
* If tuple is cachable, mark it for invalidation from the caches in case
* we abort. Note it is OK to do this after releasing the buffer, because
* the heaptup data structure is all in local memory, not in the shared
* buffer.
*/
//Register a cache invalidation for the tuple, in case we abort
CacheInvalidateHeapTuple(relation, heaptup, NULL);
/* Note: speculative insertions are counted too, even if aborted later */
//Update statistics (one tuple inserted)
pgstat_count_heap_insert(relation, 1);
/*
* If heaptup is a private copy, release it. Don't forget to copy t_self
* back to the caller's image, too.
*/
if (heaptup != tup)
{
tup->t_self = heaptup->t_self;
heap_freetuple(heaptup);
}
return HeapTupleGetOid(tup);
}
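For context on how these parameters are typically filled in, the simplest in-tree caller is simple_heap_insert in the same file, which just uses the current command ID, no options and no bulk-insert state (paraphrased from the PG 11-era source; treat as a sketch):
Oid
simple_heap_insert(Relation relation, HeapTuple tup)
{
    return heap_insert(relation, tup, GetCurrentCommandId(true), 0, NULL);
}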
- heap_prepare_insert
/*
* Subroutine of heap_insert(). Does the preparatory work for inserting a tuple:
* sets the tuple header fields, assigns an OID, and TOASTs oversized attributes
* if necessary.
* The return value is the original tuple or a TOASTed copy of it (HeapTuple).
* Note that in either case the tuple header (tup->t_data) is updated.
*/
static HeapTuple
heap_prepare_insert(Relation relation, HeapTuple tup, TransactionId xid,
CommandId cid, int options)
{
/*
* Inserting is not allowed in parallel mode (yet)
*/
if (IsInParallelMode())
ereport(ERROR,
(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
errmsg("cannot insert tuples during a parallel operation")));
if (relation->rd_rel->relhasoids)
{
#ifdef NOT_USED
/* this is redundant with an Assert in HeapTupleSetOid */
Assert(tup->t_data->t_infomask & HEAP_HASOID);
#endif
/*
* If the object id of this tuple has already been assigned, trust the
* caller. There are a couple of ways this can happen. At initial db
* creation, the backend program sets oids for tuples. When we define
* an index, we set the oid. Finally, in the future, we may allow
* users to set their own object ids in order to support a persistent
* object store (objects need to contain pointers to one another).
*/
if (!OidIsValid(HeapTupleGetOid(tup)))
HeapTupleSetOid(tup, GetNewOid(relation));
}
else
{
/* check there is not space for an OID */
Assert(!(tup->t_data->t_infomask & HEAP_HASOID));
}
tup->t_data->t_infomask &= ~(HEAP_XACT_MASK);
tup->t_data->t_infomask2 &= ~(HEAP2_XACT_MASK);
tup->t_data->t_infomask |= HEAP_XMAX_INVALID;
HeapTupleHeaderSetXmin(tup->t_data, xid);
if (options & HEAP_INSERT_FROZEN)
HeapTupleHeaderSetXminFrozen(tup->t_data);
HeapTupleHeaderSetCmin(tup->t_data, cid);
HeapTupleHeaderSetXmax(tup->t_data, 0); /* for cleanliness */
tup->t_tableOid = RelationGetRelid(relation);
/*
* If the new tuple is too big for storage or contains already toasted
* out-of-line attributes from some other relation, invoke the toaster.
*/
if (relation->rd_rel->relkind != RELKIND_RELATION &&
relation->rd_rel->relkind != RELKIND_MATVIEW)
{
/* toast table entries should never be recursively toasted */
Assert(!HeapTupleHasExternal(tup));
return tup;
}
else if (HeapTupleHasExternal(tup) || tup->t_len > TOAST_TUPLE_THRESHOLD)
return toast_insert_or_update(relation, tup, NULL, options);
else
return tup;
}
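The TOAST_TUPLE_THRESHOLD used in the last branch is about 2 KB with default settings. A standalone sketch of where that number comes from, assuming BLCKSZ = 8192, a 24-byte page header, 4-byte line pointers, TOAST_TUPLES_PER_PAGE = 4 and 8-byte MAXALIGN (the formula is paraphrased from MaximumBytesPerTuple in htup_details.h):
#include <stdio.h>

#define BLCKSZ 8192
#define PAGE_HEADER 24           /* assumed SizeOfPageHeaderData */
#define ITEMID 4                 /* assumed sizeof(ItemIdData) */
#define TUPLES_PER_PAGE 4        /* assumed TOAST_TUPLES_PER_PAGE */
#define MAXALIGN(x) (((x) + 7) & ~((unsigned long) 7))
#define MAXALIGN_DOWN(x) ((x) & ~((unsigned long) 7))

int main(void)
{
    unsigned long threshold =
        MAXALIGN_DOWN((BLCKSZ - MAXALIGN(PAGE_HEADER + TUPLES_PER_PAGE * ITEMID))
                      / TUPLES_PER_PAGE);

    printf("TOAST_TUPLE_THRESHOLD ~= %lu\n", threshold);   /* prints 2032 */
    return 0;
}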
- RelationGetBufferForTuple
\src\backend\access\heap\hio.c
/*
Input:
relation - the target relation
len - the amount of space needed
otherBuffer - used in the UPDATE case: the previously pinned buffer (holding the old tuple version)
options - processing options
bistate - bulk-insert state
vmbuffer - the first visibility map buffer
vmbuffer_other - used in the UPDATE case: the VM buffer for the previously pinned buffer
Note:
An UPDATE is not done in place: the old version is kept (its xmax is updated) and the new version is inserted.
If the old and new versions live in different blocks, locking those blocks can deadlock.
For example: session A updates row 1 of table T, which lives in Block 0, and the new version goes into Block 2;
session B updates row 2 of T, which also lives in Block 0, and its new version also goes into Block 2.
Both Block 0 and Block 2 must be locked to complete the UPDATE:
if session A locks Block 2 first while session B locks Block 0 first,
then A tries to lock Block 0 and B tries to lock Block 2, and we deadlock.
To avoid this, PG requires that buffers of the same relation be locked in block-number order:
to lock Blocks 0 and 2, Block 0 must be locked before Block 2.
Output:
the buffer allocated for the tuple
*/
Buffer
RelationGetBufferForTuple(Relation relation, Size len,
Buffer otherBuffer, int options,
BulkInsertState bistate,
Buffer *vmbuffer, Buffer *vmbuffer_other)
{
bool use_fsm = !(options & HEAP_INSERT_SKIP_FSM);
Buffer buffer = InvalidBuffer;
Page page;
Size pageFreeSpace = 0,
saveFreeSpace = 0;
BlockNumber targetBlock,
otherBlock;
bool needLock;
len = MAXALIGN(len); /* be conservative: round the length up to MAXALIGN */
/* For an update (otherBuffer valid), bistate must be NULL */
Assert(otherBuffer == InvalidBuffer || !bistate);
/*
* The tuple to insert must not exceed the maximum tuple size, MaxHeapTupleSize
*/
if (len > MaxHeapTupleSize)
ereport(ERROR,
(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
errmsg("row is too big: size %zu, maximum size %zu",
len, MaxHeapTupleSize)));
/* Compute the extra free space to keep on the page according to the fillfactor (reserved space) */
saveFreeSpace = RelationGetTargetPageFreeSpace(relation,
HEAP_DEFAULT_FILLFACTOR);
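/* Worked example (assuming 8 KB pages; illustrative numbers, not from the
 * original article): with the default fillfactor of 100 this reserves 0 bytes,
 * while e.g. "CREATE TABLE ... WITH (fillfactor = 90)" reserves roughly
 * BLCKSZ * 10% ~= 819 bytes on every page. */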
/* UPDATE case: get the block number of the previously pinned buffer */
if (otherBuffer != InvalidBuffer)
otherBlock = BufferGetBlockNumber(otherBuffer);
else
otherBlock = InvalidBlockNumber; /* just to keep compiler quiet */
/*
* We first try to continue inserting into the block the previous tuple went into;
* the target block number can come from the cached bistate or from the relation.
* If that does not work, look up a target block through the FSM.
* Since the information in the FSM can be out of date, we may have to
* loop and retry several times.
* If the FSM cannot supply a target block either, extend the relation.
*
* If use_fsm is disabled, we either insert the tuple into the existing target
* block or extend the relation.
*/
if (len + saveFreeSpace > MaxHeapTupleSize)
{
/* If the tuple length plus the reserved space exceeds the maximum tuple size, there is no point asking the FSM for a target block */
targetBlock = InvalidBlockNumber;
use_fsm = false;
}
// For bulk inserts, take the target block number from bistate
else if (bistate && bistate->current_buf != InvalidBuffer)
targetBlock = BufferGetBlockNumber(bistate->current_buf);
else // Ordinary INSERT (not bulk): take the target block cached in the relation structure
targetBlock = RelationGetTargetBlock(relation);
// If no target block could be obtained from the cached structures and the FSM may be used, ask the FSM
if (targetBlock == InvalidBlockNumber && use_fsm)
{
targetBlock = GetPageWithFreeSpace(relation, len + saveFreeSpace);
/*
* If the FSM cannot supply a target block either, try the last existing
* block before extending the relation.
*/
if (targetBlock == InvalidBlockNumber)
{
BlockNumber nblocks = RelationGetNumberOfBlocks(relation);
if (nblocks > 0)
targetBlock = nblocks - 1;
}
}
loop:
while (targetBlock != InvalidBlockNumber)
{
/*
* Read and exclusive-lock the target block, as well as the other
* block if one was given, taking suitable care with lock ordering and
* the possibility they are the same block.
*
* If the page-level all-visible flag is set, caller will need to
* clear both that and the corresponding visibility map bit. However,
* by the time we return, we'll have x-locked the buffer, and we don't
* want to do any I/O while in that state. So we check the bit here
* before taking the lock, and pin the page if it appears necessary.
* Checking without the lock creates a risk of getting the wrong
* answer, so we'll have to recheck after acquiring the lock.
*/
if (otherBuffer == InvalidBuffer) // not an UPDATE
{
/* easy case */
buffer = ReadBufferBI(relation, targetBlock, bistate); //read the buffer
//If the heap page is marked all-visible, pin the corresponding visibility map page now, so its bit can be cleared later without doing I/O while holding the exclusive lock
if (PageIsAllVisible(BufferGetPage(buffer)))
visibilitymap_pin(relation, targetBlock, vmbuffer);
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); //exclusive-lock the buffer
}
else if (otherBlock == targetBlock) //UPDATE: the new tuple goes into the same block as the old one
{
/* also easy case */
buffer = otherBuffer;
if (PageIsAllVisible(BufferGetPage(buffer)))
visibilitymap_pin(relation, targetBlock, vmbuffer);
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
}
else if (otherBlock < targetBlock) //UPDATE: the old tuple's block number is smaller than the new tuple's
{
/* lock other buffer first */
buffer = ReadBuffer(relation, targetBlock);
if (PageIsAllVisible(BufferGetPage(buffer)))
visibilitymap_pin(relation, targetBlock, vmbuffer);
LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE); //lock the buffer with the smaller block number first
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
}
else //UPDATE: the old tuple's block number is larger than the new tuple's
{
/* lock target buffer first */
buffer = ReadBuffer(relation, targetBlock);
if (PageIsAllVisible(BufferGetPage(buffer)))
visibilitymap_pin(relation, targetBlock, vmbuffer);
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); //again, the smaller block number is locked first
LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
}
/*
* We now have the target page (and the other buffer, if any) pinned
* and locked. However, since our initial PageIsAllVisible checks
* were performed before acquiring the lock, the results might now be
* out of date, either for the selected victim buffer, or for the
* other buffer passed by the caller. In that case, we'll need to
* give up our locks, go get the pin(s) we failed to get earlier, and
* re-lock. That's pretty painful, but hopefully shouldn't happen
* often.
*
* Note that there's a small possibility that we didn't pin the page
* above but still have the correct page pinned anyway, either because
* we've already made a previous pass through this loop, or because
* caller passed us the right page anyway.
*
* Note also that it's possible that by the time we get the pin and
* retake the buffer locks, the visibility map bit will have been
* cleared by some other backend anyway. In that case, we'll have
* done a bit of extra work for no gain, but there's no real harm
* done.
*/
if (otherBuffer == InvalidBuffer || targetBlock <= otherBlock)
GetVisibilityMapPins(relation, buffer, otherBuffer,
targetBlock, otherBlock, vmbuffer,
vmbuffer_other);
else
GetVisibilityMapPins(relation, otherBuffer, buffer,
otherBlock, targetBlock, vmbuffer_other,
vmbuffer);
page = BufferGetPage(buffer);
pageFreeSpace = PageGetHeapFreeSpace(page);
if (len + saveFreeSpace <= pageFreeSpace) //enough space on this page: use it and return the buffer
{
/* use this page as future insert target, too */
RelationSetTargetBlock(relation, targetBlock);
return buffer;
}
/*
* Not enough space; we must give up the lock on this page
*/
LockBuffer(buffer, BUFFER_LOCK_UNLOCK); //release the lock on the buffer
if (otherBuffer == InvalidBuffer) //not an UPDATE: release the buffer
ReleaseBuffer(buffer);
else if (otherBlock != targetBlock) //UPDATE with a different target block: unlock the previously locked buffer and release the current one
{
LockBuffer(otherBuffer, BUFFER_LOCK_UNLOCK);
ReleaseBuffer(buffer);
}
/* Without FSM, always fall out of the loop and extend */
if (!use_fsm) //not using the FSM to find free space: fall out of the loop and extend
break;
//Use the FSM to get the next candidate block.
//Note: if a full scan finds no suitable block, targetBlock becomes InvalidBlockNumber and the loop ends.
targetBlock = RecordAndGetPageWithFreeSpace(relation,
targetBlock,
pageFreeSpace,
len + saveFreeSpace);
}
/*
* Extend the relation. Newly created or temporary relations need no lock;
* in all other cases one is required.
*
* We have to use a lock to ensure no one else is extending the rel at the
* same time, else we will both try to initialize the same new page. We
* can skip locking for new or temp relations, however, since no one else
* could be accessing them.
*/
needLock = !RELATION_IS_LOCAL(relation); //does the current relation need the extension lock?
/*
* If we need the lock but are not able to acquire it immediately, we'll
* consider extending the relation by multiple blocks at a time to manage
* contention on the relation extension lock. However, this only makes
* sense if we're using the FSM; otherwise, there's no point.
*/
if (needLock)
{
if (!use_fsm)
LockRelationForExtension(relation, ExclusiveLock);
else if (!ConditionalLockRelationForExtension(relation, ExclusiveLock))
{
/* Couldn't get the lock immediately; wait for it. */
LockRelationForExtension(relation, ExclusiveLock);
/*
* If another process has extended the relation in the meantime, we may now
* be able to find a suitable targetBlock.
*/
targetBlock = GetPageWithFreeSpace(relation, len + saveFreeSpace);
/*
* If some other waiter has already extended the relation, we
* don't need to do so; just use the existing freespace.
*/
if (targetBlock != InvalidBlockNumber)
{
UnlockRelationForExtension(relation, ExclusiveLock);
goto loop;
}
/* Time to bulk-extend. */
RelationAddExtraBlocks(relation, bistate);
}
}
/*
* In addition to whatever extension we performed above, we always add at
* least one block to satisfy our own request.
*
* XXX This does an lseek - rather expensive - but at the moment it is the
* only way to accurately determine how many blocks are in a relation. Is
* it worth keeping an accurate file length in shared memory someplace,
* rather than relying on the kernel to do it for us?
*/
buffer = ReadBufferBI(relation, P_NEW, bistate);
/*
* We can be certain that locking the otherBuffer first is OK, since it
* must have a lower page number.
*/
if (otherBuffer != InvalidBuffer)
LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
/*
* Now acquire lock on the new page.
*/
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
/*
* Release the file-extension lock; it's now OK for someone else to extend
* the relation some more. Note that we cannot release this lock before
* we have buffer lock on the new page, or we risk a race condition
* against vacuumlazy.c --- see comments therein.
*/
if (needLock)
UnlockRelationForExtension(relation, ExclusiveLock);
/*
* We need to initialize the empty new page. Double-check that it really
* is empty (this should never happen, but if it does we don't want to
* risk wiping out valid data).
*/
page = BufferGetPage(buffer);
if (!PageIsNew(page))
elog(ERROR, "page %u of relation \"%s\" should be empty but is not",
BufferGetBlockNumber(buffer),
RelationGetRelationName(relation));
PageInit(page, BufferGetPageSize(buffer), 0);
if (len > PageGetHeapFreeSpace(page))
{
/* We should not get here given the test at the top */
elog(PANIC, "tuple is too big: size %zu", len);
}
/*
* Remember the new page as our target for future insertions.
*
* XXX should we enter the new page into the free space map immediately,
* or just keep it for this backend's exclusive use in the short run
* (until VACUUM sees it)? Seems to depend on whether you expect the
* current backend to make more insertions or not, which is probably a
* good bet most of the time. So for now, don't add it to FSM yet.
*/
RelationSetTargetBlock(relation, BufferGetBlockNumber(buffer));
return buffer;
}
- CheckForSerializableConflictIn
/*
* CheckForSerializableConflictIn
* We are writing the given tuple. If that indicates a rw-conflict
* in from another serializable transaction, take appropriate action.
*
* Skip checking for any granularity for which a parameter is missing.
*
* A tuple update or delete is in conflict if we have a predicate lock
* against the relation or page in which the tuple exists, or against the
* tuple itself.
*/
void
CheckForSerializableConflictIn(Relation relation, HeapTuple tuple,
Buffer buffer)
{
PREDICATELOCKTARGETTAG targettag;
if (!SerializationNeededForWrite(relation))
return;
/* Check if someone else has already decided that we need to die */
if (SxactIsDoomed(MySerializableXact))
ereport(ERROR,
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
errmsg("could not serialize access due to read/write dependencies among transactions"),
errdetail_internal("Reason code: Canceled on identification as a pivot, during conflict in checking."),
errhint("The transaction might succeed if retried.")));
/*
* We're doing a write which might cause rw-conflicts now or later.
* Memorize that fact.
*/
MyXactDidWrite = true;
/*
* It is important that we check for locks from the finest granularity to
* the coarsest granularity, so that granularity promotion doesn't cause
* us to miss a lock. The new (coarser) lock will be acquired before the
* old (finer) locks are released.
*
* It is not possible to take and hold a lock across the checks for all
* granularities because each target could be in a separate partition.
*/
if (tuple != NULL)
{
SET_PREDICATELOCKTARGETTAG_TUPLE(targettag,
relation->rd_node.dbNode,
relation->rd_id,
ItemPointerGetBlockNumber(&(tuple->t_self)),
ItemPointerGetOffsetNumber(&(tuple->t_self)));
CheckTargetForConflictsIn(&targettag);
}
if (BufferIsValid(buffer))
{
SET_PREDICATELOCKTARGETTAG_PAGE(targettag,
relation->rd_node.dbNode,
relation->rd_id,
BufferGetBlockNumber(buffer));
CheckTargetForConflictsIn(&targettag);
}
SET_PREDICATELOCKTARGETTAG_RELATION(targettag,
relation->rd_node.dbNode,
relation->rd_id);
CheckTargetForConflictsIn(&targettag);
}
- START_CRIT_SECTION
volatile uint32 CritSectionCount = 0;
#define START_CRIT_SECTION() (CritSectionCount++)
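Its counterpart END_CRIT_SECTION, called near the end of heap_insert above, simply decrements the counter. While CritSectionCount is non-zero, any elog(ERROR) is escalated to PANIC, which is why the code insists on "NO EREPORT(ERROR)" between the two calls. Roughly, from miscadmin.h (quoted from memory; treat as a sketch):
#define END_CRIT_SECTION() \
do { \
Assert(CritSectionCount > 0); \
CritSectionCount--; \
} while(0)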
- PageIsAllVisible
#define PageIsAllVisible(page) \
(((PageHeader) (page))->pd_flags & PD_ALL_VISIBLE)
- PageClearAllVisible
#define PageClearAllVisible(page) \
(((PageHeader) (page))->pd_flags &= ~PD_ALL_VISIBLE)
- visibilitymap_clear
/*
* visibilitymap_clear - clear the specified bits for a given heap block in the visibility map
*
* You must pass a buffer containing the correct map page to this function.
* Call visibilitymap_pin first to pin the right one. This function doesn't do
* any I/O. Returns true if any bits have been cleared and false otherwise.
*/
bool
visibilitymap_clear(Relation rel, BlockNumber heapBlk, Buffer buf, uint8 flags)
{
BlockNumber mapBlock = HEAPBLK_TO_MAPBLOCK(heapBlk);
int mapByte = HEAPBLK_TO_MAPBYTE(heapBlk);
int mapOffset = HEAPBLK_TO_OFFSET(heapBlk);
uint8 mask = flags << mapOffset;
char *map;
bool cleared = false;
Assert(flags & VISIBILITYMAP_VALID_BITS);
#ifdef TRACE_VISIBILITYMAP
elog(DEBUG1, "vm_clear %s %d", RelationGetRelationName(rel), heapBlk);
#endif
if (!BufferIsValid(buf) || BufferGetBlockNumber(buf) != mapBlock)
elog(ERROR, "wrong buffer passed to visibilitymap_clear");
LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
map = PageGetContents(BufferGetPage(buf));
if (map[mapByte] & mask)
{
map[mapByte] &= ~mask;
MarkBufferDirty(buf);
cleared = true;
}
LockBuffer(buf, BUFFER_LOCK_UNLOCK);
return cleared;
}
- MarkBufferDirty
/*
* MarkBufferDirty
*
* Marks buffer contents as dirty (actual write happens later).
*
* Buffer must be pinned and exclusive-locked. (If caller does not hold
* exclusive lock, then somebody could be in process of writing the buffer,
* leading to risk of bad data written to disk.)
*/
void
MarkBufferDirty(Buffer buffer)
{
BufferDesc *bufHdr;
uint32 buf_state;
uint32 old_buf_state;
if (!BufferIsValid(buffer))
elog(ERROR, "bad buffer ID: %d", buffer);
if (BufferIsLocal(buffer))
{
MarkLocalBufferDirty(buffer);
return;
}
bufHdr = GetBufferDescriptor(buffer - 1);
Assert(BufferIsPinned(buffer));
Assert(LWLockHeldByMeInMode(BufferDescriptorGetContentLock(bufHdr),
LW_EXCLUSIVE));
old_buf_state = pg_atomic_read_u32(&bufHdr->state);
for (;;)
{
if (old_buf_state & BM_LOCKED)
old_buf_state = WaitBufHdrUnlocked(bufHdr);
buf_state = old_buf_state;
Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
buf_state |= BM_DIRTY | BM_JUST_DIRTIED;
if (pg_atomic_compare_exchange_u32(&bufHdr->state, &old_buf_state,
buf_state))
break;
}
/*
* If the buffer was not dirty already, do vacuum accounting.
*/
if (!(old_buf_state & BM_DIRTY))
{
VacuumPageDirty++;
pgBufferUsage.shared_blks_dirtied++;
if (VacuumCostActive)
VacuumCostBalance += VacuumCostPageDirty;
}
}