62 #define SizeOfCommitTimestampEntry (offsetof(CommitTimestampEntry, nodeid) + \
65 #define COMMIT_TS_XACTS_PER_PAGE \
66 (BLCKSZ / SizeOfCommitTimestampEntry)
68 #define TransactionIdToCTsPage(xid) \
69 ((xid) / (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
70 #define TransactionIdToCTsEntry(xid) \
71 ((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
78 #define CommitTsCtl (&CommitTsCtlData)
175 newestXact = subxids[nsubxids - 1];
186 for (i = 0, headxid = xid;;)
191 for (j = i; j < nsubxids; j++)
202 if (j + 1 >= nsubxids)
209 headxid = subxids[j];
242 for (i = 0; i < nsubxids; i++)
294 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
295 errmsg(
"cannot retrieve commit timestamp for transaction %u", xid)));
384 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
385 errmsg(
"could not get commit timestamp data"),
387 errhint(
"Make sure the configuration parameter \"%s\" is set on the master server.",
388 "track_commit_timestamp") :
389 errhint(
"Make sure the configuration parameter \"%s\" is set.",
390 "track_commit_timestamp")));
438 memset(nulls,
true,
sizeof(nulls));
488 CommitTsControlLock,
"pg_commit_ts",
996 elog(
PANIC,
"commit_ts_redo: unknown op code %u", info);
#define COMMIT_TS_ZEROPAGE
#define PG_GETARG_UINT32(n)
CommitTimestampEntry dataLastCommit
Datum pg_last_committed_xact(PG_FUNCTION_ARGS)
int errhint(const char *fmt,...)
#define TransactionIdEquals(id1, id2)
void SetCommitTsLimit(TransactionId oldestXact, TransactionId newestXact)
bool SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int segpage, void *data)
static void WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids, TransactionId *subxids, TimestampTz timestamp, RepOriginId nodeid)
#define SizeOfCommitTimestampEntry
static void SetXidCommitTsInPage(TransactionId xid, int nsubxids, TransactionId *subxids, TimestampTz ts, RepOriginId nodeid, int pageno)
static int ZeroCommitTsPage(int pageno, bool writeXlog)
void SimpleLruTruncate(SlruCtl ctl, int cutoffPage)
void StartupCommitTs(void)
int errcode(int sqlerrcode)
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
bool RecoveryInProgress(void)
Datum pg_xact_commit_timestamp(PG_FUNCTION_ARGS)
Size SimpleLruShmemSize(int nslots, int nlsns)
#define PG_RETURN_TIMESTAMPTZ(x)
void SimpleLruFlush(SlruCtl ctl, bool allow_redirtied)
Size CommitTsShmemBuffers(void)
void AdvanceOldestCommitTsXid(TransactionId oldestXact)
void CompleteCommitTsInitialization(void)
void LWLockRelease(LWLock *lock)
void pfree(void *pointer)
#define XLogRecGetData(decoder)
#define FirstNormalTransactionId
static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts, RepOriginId nodeid, int slotno)
#define XLogRecGetDataLen(decoder)
#define COMMIT_TS_XACTS_PER_PAGE
void CommitTsParameterChange(bool newvalue, bool oldvalue)
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
#define TimestampTzGetDatum(X)
int SimpleLruReadPage(SlruCtl ctl, int pageno, bool write_ok, TransactionId xid)
TupleDesc BlessTupleDesc(TupleDesc tupdesc)
bool track_commit_timestamp
#define SizeOfCommitTsSet
void commit_ts_redo(XLogReaderState *record)
VariableCache ShmemVariableCache
#define InvalidTransactionId
TransactionId ReadNewTransactionId(void)
#define TIMESTAMP_NOBEGIN(j)
static void ActivateCommitTs(void)
bool SimpleLruDoesPhysicalPageExist(SlruCtl ctl, int pageno)
void SimpleLruWritePage(SlruCtl ctl, int slotno)
static void WriteZeroPageXlogRec(int pageno)
void TupleDescInitEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
#define ereport(elevel, rest)
struct CommitTimestampEntry CommitTimestampEntry
TransactionId oldestCommitTsXid
#define XLogRecGetInfo(decoder)
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
void TruncateCommitTs(TransactionId oldestXact)
void XLogRegisterData(char *data, int len)
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
#define TransactionIdGetDatum(X)
bool SlruScanDirCbReportPresence(SlruCtl ctl, char *filename, int segpage, void *data)
bool TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts, RepOriginId *nodeid)
#define TransactionIdToCTsPage(xid)
#define PG_RETURN_DATUM(x)
#define COMMIT_TS_TRUNCATE
static bool CommitTsPagePrecedes(int page1, int page2)
TransactionId xidLastCommit
int SimpleLruReadPage_ReadOnly(SlruCtl ctl, int pageno, TransactionId xid)
void CheckPointCommitTs(void)
void ShutdownCommitTs(void)
#define Assert(condition)
void ExtendCommitTs(TransactionId newestXact)
bool SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data)
TransactionId newestCommitTsXid
Size CommitTsShmemSize(void)
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
#define HeapTupleGetDatum(tuple)
TupleDesc CreateTemplateTupleDesc(int natts, bool hasoid)
#define TransactionIdToCTsEntry(xid)
static SlruCtlData CommitTsCtlData
#define InvalidRepOriginId
static void DeactivateCommitTs(void)
static Datum values[MAXATTR]
int errmsg(const char *fmt,...)
static void WriteTruncateXlogRec(int pageno)
#define XLogRecHasAnyBlockRefs(decoder)
#define TransactionIdIsValid(xid)
void BootStrapCommitTs(void)
#define TransactionIdIsNormal(xid)
void XLogBeginInsert(void)
void TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids, TransactionId *subxids, TimestampTz timestamp, RepOriginId nodeid, bool write_xlog)
CommitTimestampShared * commitTsShared
static void error_commit_ts_disabled(void)
int SimpleLruZeroPage(SlruCtl ctl, int pageno)
#define offsetof(type, field)
TransactionId GetLatestCommitTsData(TimestampTz *ts, RepOriginId *nodeid)
void CommitTsShmemInit(void)
struct CommitTimestampShared CommitTimestampShared
void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, LWLock *ctllock, const char *subdir, int tranche_id)