242 memset(&hash_ctl, 0,
sizeof(hash_ctl));
405 if (change->
data.
tp.newtuple)
411 if (change->
data.
tp.oldtuple)
477 if (alloc_len <= MaxHeapTupleSize && rb->nr_cached_tuplebufs)
483 #ifdef USE_ASSERT_CHECKING
488 #ifdef USE_ASSERT_CHECKING
498 MAXIMUM_ALIGNOF + alloc_len);
540 bool *is_new,
XLogRecPtr lsn,
bool create_as_top)
644 bool transactional,
const char *prefix,
645 Size message_size,
const char *message)
659 change->
data.
msg.message_size = message_size;
661 memcpy(change->
data.
msg.message, message, message_size);
670 volatile Snapshot snapshot_now = snapshot;
679 rb->
message(rb, txn, lsn,
false, prefix, message_size, message);
696 #ifdef USE_ASSERT_CHECKING
711 Assert(prev_first_lsn < cur_txn->first_lsn);
779 elog(
ERROR,
"existing subxact assigned to unknown toplevel xact");
807 elog(
ERROR,
"subxact logged without previous toplevel record");
866 else if (pos_a == pos_b)
915 for (off = 0; off < state->
nr_txns; off++)
1054 elog(
DEBUG2,
"restored %u/%u changes from disk",
1083 for (off = 0; off < state->
nr_txns; off++)
1202 memset(&hash_ctl, 0,
sizeof(hash_ctl));
1438 change = specinsert;
1448 change->
data.
tp.relnode.relNode);
1459 elog(
ERROR,
"could not map filenode \"%s\" to relation OID",
1465 if (relation ==
NULL)
1466 elog(
ERROR,
"could not open relation with OID %u (for filenode \"%s\")",
1494 if (change->
data.
tp.clear_toast_afterwards)
1519 if (specinsert !=
NULL)
1525 if (relation !=
NULL)
1548 if (specinsert !=
NULL)
1556 specinsert = change;
1562 change->
data.
msg.message_size,
1570 if (snapshot_now->
copied)
1602 if (command_id < change->data.command_id)
1606 if (!snapshot_now->
copied)
1613 snapshot_now->
curcid = command_id;
1629 elog(
ERROR,
"tuplecid value in changequeue");
1650 rb->
commit(rb, txn, commit_lsn);
1654 elog(
ERROR,
"output plugin used XID %u",
1674 if (snapshot_now->
copied)
1700 if (snapshot_now->
copied)
1847 for (i = 0; i < ninvalidations; i++)
1971 elog(
ERROR,
"only ever add one set of invalidations");
2105 elog(
DEBUG2,
"spill %u changes in XID %u to disk",
2142 sprintf(path,
"pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
2148 O_CREAT | O_WRONLY | O_APPEND |
PG_BINARY,
2154 errmsg(
"could not open file \"%s\": %m",
2202 oldtup = change->
data.
tp.oldtuple;
2203 newtup = change->
data.
tp.newtuple;
2248 Size prefix_size = strlen(change->
data.
msg.prefix) + 1;
2250 sz += prefix_size + change->
data.
msg.message_size +
2257 memcpy(data, &prefix_size,
sizeof(
Size));
2258 data +=
sizeof(
Size);
2259 memcpy(data, change->
data.
msg.prefix,
2261 data += prefix_size;
2264 memcpy(data, &change->
data.
msg.message_size,
sizeof(
Size));
2265 data +=
sizeof(
Size);
2266 memcpy(data, change->
data.
msg.message,
2267 change->
data.
msg.message_size);
2268 data += change->
data.
msg.message_size;
2295 memcpy(data, snap->
xip,
2302 memcpy(data, snap->
subxip,
2322 errmsg(
"could not write to data file for XID %u: %m",
2380 sprintf(path,
"pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
2385 if (*fd < 0 && errno == ENOENT)
2394 errmsg(
"could not open file \"%s\": %m",
2415 else if (readBytes < 0)
2418 errmsg(
"could not read from reorderbuffer spill file: %m")));
2422 errmsg(
"could not read from reorderbuffer spill file: read %d instead of %u bytes",
2438 errmsg(
"could not read from reorderbuffer spill file: %m")));
2442 errmsg(
"could not read from reorderbuffer spill file: read %d instead of %u bytes",
2489 if (change->
data.
tp.oldtuple)
2493 change->
data.
tp.oldtuple =
2497 memcpy(&change->
data.
tp.oldtuple->tuple, data,
2502 change->
data.
tp.oldtuple->tuple.t_data =
2506 memcpy(change->
data.
tp.oldtuple->tuple.t_data, data, tuplelen);
2510 if (change->
data.
tp.newtuple)
2518 change->
data.
tp.newtuple =
2522 memcpy(&change->
data.
tp.newtuple->tuple, data,
2527 change->
data.
tp.newtuple->tuple.t_data =
2531 memcpy(change->
data.
tp.newtuple->tuple.t_data, data, tuplelen);
2541 memcpy(&prefix_size, data,
sizeof(
Size));
2542 data +=
sizeof(
Size);
2545 memcpy(change->
data.
msg.prefix, data, prefix_size);
2547 data += prefix_size;
2550 memcpy(&change->
data.
msg.message_size, data,
sizeof(
Size));
2551 data +=
sizeof(
Size);
2553 change->
data.
msg.message_size);
2554 memcpy(change->
data.
msg.message, data,
2555 change->
data.
msg.message_size);
2556 data += change->
data.
msg.message_size;
2576 memcpy(newsnap, data, size);
2611 for (cur = first; cur <= last; cur++)
2618 sprintf(path,
"pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
2621 if (
unlink(path) != 0 && errno != ENOENT)
2624 errmsg(
"could not remove file \"%s\": %m", path)));
2636 struct dirent *logical_de;
2642 while ((logical_de =
ReadDir(logical_dir,
"pg_replslot")) !=
NULL)
2644 struct stat statbuf;
2647 if (strcmp(logical_de->
d_name,
".") == 0 ||
2648 strcmp(logical_de->
d_name,
"..") == 0)
2659 sprintf(path,
"pg_replslot/%s", logical_de->
d_name);
2662 if (
lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
2666 while ((spill_de =
ReadDir(spill_dir, path)) !=
NULL)
2668 if (strcmp(spill_de->
d_name,
".") == 0 ||
2669 strcmp(spill_de->
d_name,
"..") == 0)
2673 if (strncmp(spill_de->
d_name,
"xid", 3) == 0)
2675 sprintf(path,
"pg_replslot/%s/%s", logical_de->
d_name,
2681 errmsg(
"could not remove file \"%s\": %m",
2705 memset(&hash_ctl, 0,
sizeof(hash_ctl));
2738 newtup = change->
data.
tp.newtuple;
2760 elog(
ERROR,
"got sequence entry %d for toast chunk %u instead of seq 0",
2761 chunk_seq, chunk_id);
2764 elog(
ERROR,
"got sequence entry %d for toast chunk %u instead of seq %d",
2777 elog(
ERROR,
"unexpected type of toast chunk");
2779 ent->
size += chunksize;
2826 newtup = change->
data.
tp.newtuple;
2830 for (natt = 0; natt < desc->
natts; natt++)
2839 struct varlena *new_datum =
NULL;
2840 struct varlena *reconstructed;
2845 if (attr->attnum < 0)
2848 if (attr->attisdropped)
2852 if (attr->attlen != -1)
2897 ctup = cchange->
data.
tp.newtuple;
2905 memcpy(
VARDATA(reconstructed) + data_done,
2918 memset(&redirect_pointer, 0,
sizeof(redirect_pointer));
2919 redirect_pointer.
pointer = reconstructed;
2923 sizeof(redirect_pointer));
2946 for (natt = 0; natt < desc->
natts; natt++)
3041 elog(
DEBUG3,
"mapping: node: %u/%u/%u tid: %u/%u cmin: %u, cmax: %u",
3068 sprintf(path,
"pg_logical/mappings/%s", fname);
3072 (
errmsg(
"could not open file \"%s\": %m", path)));
3090 errmsg(
"could not read file \"%s\": %m",
3092 else if (readBytes == 0)
3097 errmsg(
"could not read from file \"%s\": read %d instead of %d bytes",
3153 return bsearch(&xid, xip, num,
3168 else if (a->
lsn > b->
lsn)
3181 struct dirent *mapping_de;
3189 while ((mapping_de =
ReadDir(mapping_dir,
"pg_logical/mappings")) !=
NULL)
3200 if (strcmp(mapping_de->
d_name,
".") == 0 ||
3201 strcmp(mapping_de->
d_name,
"..") == 0)
3205 if (strncmp(mapping_de->
d_name,
"map-", 4) != 0)
3209 &f_dboid, &f_relid, &f_hi, &f_lo,
3210 &f_mapped_xid, &f_create_xid) != 6)
3213 f_lsn = ((uint64) f_hi) << 32 | f_lo;
3216 if (f_dboid != dboid)
3220 if (f_relid != relid)
3242 foreach(file, files)
3244 files_a[off++] =
lfirst(file);
3276 bool updated_mapping =
false;
3279 memset(&key, 0,
sizeof(key));
3309 if (ent ==
NULL && !updated_mapping)
3313 updated_mapping =
true;
3316 else if (ent ==
NULL)
static void ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid)
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
#define BlockIdGetBlockNumber(blockId)
struct ReorderBufferToastEnt ReorderBufferToastEnt
void AbortCurrentTransaction(void)
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
#define SizeofHeapTupleHeader
bool IsToastRelation(Relation relation)
void hash_destroy(HTAB *hashp)
dlist_head cached_changes
void ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, Snapshot snapshot, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message)
#define relpathperm(rnode, forknum)
#define InvalidXLogRecPtr
ReorderBufferApplyChangeCB apply_change
void MemoryContextDelete(MemoryContext context)
HeapTupleData * HeapTuple
#define VALGRIND_MAKE_MEM_DEFINED(addr, size)
static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
void StartupReorderBuffer(void)
#define fastgetattr(tup, attnum, tupleDesc, isnull)
static const Size max_cached_tuplebufs
void ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunningXid)
#define dlist_foreach_modify(iter, lhead)
void ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
void ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
#define RelationGetDescr(relation)
static void dlist_push_head(dlist_head *head, dlist_node *node)
#define VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer)
TransactionId by_txn_last_xid
#define VALGRIND_MAKE_MEM_UNDEFINED(addr, size)
#define PointerGetDatum(X)
static bool TransactionIdInArray(TransactionId xid, TransactionId *xip, Size num)
#define dlist_foreach(iter, lhead)
XLogRecPtr current_restart_decoding_lsn
#define DatumGetObjectId(X)
char * pstrdup(const char *in)
static ReorderBufferTXN * ReorderBufferGetTXN(ReorderBuffer *rb)
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Oid RelidByRelfilenode(Oid reltablespace, Oid relfilenode)
Form_pg_attribute * attrs
void ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
void ReorderBufferFree(ReorderBuffer *rb)
static void slist_push_head(slist_head *head, slist_node *node)
struct ReorderBufferChange::@51::@53 msg
bool IsTransactionOrTransactionBlock(void)
void binaryheap_replace_first(binaryheap *heap, Datum d)
void ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, RelFileNode node, ItemPointerData tid, CommandId cmin, CommandId cmax, CommandId combocid)
static void ReorderBufferIterTXNFinish(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
void TeardownHistoricSnapshot(bool is_error)
ReorderBufferCommitCB commit
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
#define RelationIsLogicallyLogged(relation)
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
bool TransactionIdDidCommit(TransactionId transactionId)
ReplicationSlotPersistentData data
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, int *fd, XLogSegNo *segno)
struct ReorderBufferTupleCidKey ReorderBufferTupleCidKey
struct SnapshotData * Snapshot
XLogRecPtr base_snapshot_lsn
enum ReorderBufferChangeType action
void binaryheap_add_unordered(binaryheap *heap, Datum d)
static int fd(const char *x, int i)
#define VARDATA_EXTERNAL(PTR)
#define ALLOCSET_DEFAULT_MINSIZE
static void ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname)
static int file_sort_by_lsn(const void *a_p, const void *b_p)
#define XLogSegNoOffsetToRecPtr(segno, offset, dest)
void ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr)
#define VARATT_IS_EXTERNAL(PTR)
struct ReorderBufferChange::@51::@54 tuplecid
bool ReplicationSlotValidateName(const char *name, int elevel)
static dlist_node * dlist_next_node(dlist_head *head, dlist_node *node)
struct ReorderBufferChange::@51::@52 tp
#define ReorderBufferTupleBufData(p)
#define dlist_container(type, membername, ptr)
void pfree(void *pointer)
static void slist_init(slist_head *head)
Size nr_cached_transactions
#define VARATT_IS_SHORT(PTR)
Datum binaryheap_first(binaryheap *heap)
ReorderBufferTupleCidKey key
TransactionId GetCurrentTransactionId(void)
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
struct varlena * reconstructed
void RollbackAndReleaseCurrentSubTransaction(void)
#define SET_VARTAG_EXTERNAL(PTR, tag)
static const Size max_cached_changes
int OpenTransientFile(FileName fileName, int fileFlags, int fileMode)
int errcode_for_file_access(void)
struct SnapshotData SnapshotData
dlist_head cached_transactions
TransactionId GetCurrentTransactionIdIfAny(void)
#define InvalidTransactionId
static const Size max_cached_transactions
FormData_pg_attribute * Form_pg_attribute
bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, TransactionId xid)
DIR * AllocateDir(const char *dirname)
void RelationClose(Relation relation)
MemoryContext CurrentMemoryContext
static void dlist_delete(dlist_node *node)
ReorderBufferMessageCB message
int unlink(const char *filename)
#define ereport(elevel, rest)
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
#define VARSIZE_SHORT(PTR)
static slist_node * slist_pop_head_node(slist_head *head)
List * lappend(List *list, void *datum)
static HTAB * tuplecid_data
int CloseTransientFile(int fd)
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
struct ReorderBufferTupleCidEnt ReorderBufferTupleCidEnt
void ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)
#define INDIRECT_POINTER_SIZE
static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, char *change)
#define dlist_head_element(type, membername, lhead)
ReorderBufferChange * change
static int ReorderBufferIterCompare(Datum a, Datum b, void *arg)
#define slist_container(type, membername, ptr)
void ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn)
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
void * palloc0(Size size)
static bool dlist_has_next(dlist_head *head, dlist_node *node)
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
ReorderBufferTXN * by_txn_last_txn
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void cleanup(void)
dlist_head toplevel_by_lsn
struct RewriteMappingFile RewriteMappingFile
static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
bool IsSharedRelation(Oid relationId)
void * MemoryContextAllocZero(MemoryContext context, Size size)
static void AssertTXNLsnOrder(ReorderBuffer *rb)
void ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
static void dlist_init(dlist_head *head)
struct ReorderBufferIterTXNState ReorderBufferIterTXNState
static void ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn)
struct ReorderBufferDiskChange ReorderBufferDiskChange
void binaryheap_build(binaryheap *heap)
struct ReorderBufferIterTXNEntry ReorderBufferIterTXNEntry
ReplicationSlot * MyReplicationSlot
#define XLByteToSeg(xlrp, logSegNo)
#define Assert(condition)
ReorderBufferTXN * ReorderBufferGetOldestTXN(ReorderBuffer *rb)
static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, int fd, ReorderBufferChange *change)
static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
union ReorderBufferChange::@51 data
struct dirent * ReadDir(DIR *dir, const char *dirname)
static ReorderBufferChange * ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
#define VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr)
void StartTransactionCommand(void)
void ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
static bool dlist_is_empty(dlist_head *head)
void binaryheap_free(binaryheap *heap)
SharedInvalidationMessage * invalidations
static int list_length(const List *l)
void BeginInternalSubTransaction(char *name)
#define BufferIsLocal(buffer)
void ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
ReorderBuffer * ReorderBufferAllocate(void)
void * hash_seq_search(HASH_SEQ_STATUS *status)
void * repalloc(void *pointer, Size size)
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
static void ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change)
void SnapBuildSnapDecRefcount(Snapshot snap)
#define LOGICAL_REWRITE_FORMAT
binaryheap * binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg)
#define VARATT_IS_EXTENDED(PTR)
#define DatumGetPointer(X)
void heap_deform_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *values, bool *isnull)
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
int errmsg(const char *fmt,...)
static void UpdateLogicalMappings(HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
void * MemoryContextAlloc(MemoryContext context, Size size)
static void ReorderBufferCheckSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
void ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
static dlist_node * dlist_pop_head_node(dlist_head *head)
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
#define ALLOCSET_DEFAULT_INITSIZE
XLogRecPtr restart_decoding_lsn
#define SET_VARSIZE_COMPRESSED(PTR, len)
Datum binaryheap_remove_first(binaryheap *heap)
static const Size max_changes_in_memory
#define ALLOCSET_DEFAULT_MAXSIZE
struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
void ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
#define SET_VARSIZE(PTR, len)
#define ItemPointerGetBlockNumber(pointer)
static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define qsort(a, b, c, d)
#define TransactionIdIsValid(xid)
ReorderBufferChange change
ReorderBufferBeginCB begin
static void ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
void BufferGetTag(Buffer buffer, RelFileNode *rnode, ForkNumber *forknum, BlockNumber *blknum)
#define XLByteInSeg(xlrp, logSegNo)
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
void ReorderBufferProcessXid(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
static ReorderBufferIterTXNState * ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
Relation RelationIdGetRelation(Oid relationId)
struct HeapTupleData HeapTupleData
#define offsetof(type, field)
slist_head cached_tuplebufs
#define ItemPointerCopy(fromPointer, toPointer)
int xidComparator(const void *arg1, const void *arg2)
void ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, CommandId cid)
ReorderBufferTupleBuf * ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len)
bool ResolveCminCmaxDuringDecoding(HTAB *tuplecid_data, Snapshot snapshot, HeapTuple htup, Buffer buffer, CommandId *cmin, CommandId *cmax)