PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
commit_ts.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * commit_ts.c
4  * PostgreSQL commit timestamp manager
5  *
6  * This module is a pg_clog-like system that stores the commit timestamp
7  * for each transaction.
8  *
9  * XLOG interactions: this module generates an XLOG record whenever a new
10  * CommitTs page is initialized to zeroes. Also, one XLOG record is
11  * generated for setting of values when the caller requests it; this allows
12  * us to support values coming from places other than transaction commit.
13  * Other writes of CommitTS come from recording of transaction commit in
14  * xact.c, which generates its own XLOG records for these events and will
15  * re-perform the status update on redo; so we need make no additional XLOG
16  * entry here.
17  *
18  * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
19  * Portions Copyright (c) 1994, Regents of the University of California
20  *
21  * src/backend/access/transam/commit_ts.c
22  *
23  *-------------------------------------------------------------------------
24  */
25 #include "postgres.h"
26 
27 #include "access/commit_ts.h"
28 #include "access/htup_details.h"
29 #include "access/slru.h"
30 #include "access/transam.h"
31 #include "catalog/pg_type.h"
32 #include "funcapi.h"
33 #include "miscadmin.h"
34 #include "pg_trace.h"
35 #include "utils/builtins.h"
36 #include "utils/snapmgr.h"
37 #include "utils/timestamp.h"
38 
39 /*
40  * Defines for CommitTs page sizes. A page is the same BLCKSZ as is used
41  * everywhere else in Postgres.
42  *
43  * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
44  * CommitTs page numbering also wraps around at
45  * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE, and CommitTs segment numbering at
46  * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT. We need take no
47  * explicit notice of that fact in this module, except when comparing segment
48  * and page numbers in TruncateCommitTs (see CommitTsPagePrecedes).
49  */
50 
51 /*
52  * We need 8+2 bytes per xact. Note that enlarging this struct might mean
53  * the largest possible file name is more than 5 chars long; see
54  * SlruScanDirectory.
55  */
56 typedef struct CommitTimestampEntry
57 {
61 
62 #define SizeOfCommitTimestampEntry (offsetof(CommitTimestampEntry, nodeid) + \
63  sizeof(RepOriginId))
64 
65 #define COMMIT_TS_XACTS_PER_PAGE \
66  (BLCKSZ / SizeOfCommitTimestampEntry)
67 
68 #define TransactionIdToCTsPage(xid) \
69  ((xid) / (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
70 #define TransactionIdToCTsEntry(xid) \
71  ((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
72 
73 /*
74  * Link to shared-memory data structures for CommitTs control
75  */
77 
78 #define CommitTsCtl (&CommitTsCtlData)
79 
80 /*
81  * We keep a cache of the last value set in shared memory.
82  *
83  * This is also a good place to keep the activation status. We keep this
84  * separate from the GUC so that the standby can activate the module if the
85  * primary has it active independently of the value of the GUC.
86  *
87  * This is protected by CommitTsLock. In some places, we use commitTsActive
88  * without acquiring the lock; where this happens, a comment explains the
89  * rationale for it.
90  */
91 typedef struct CommitTimestampShared
92 {
97 
99 
100 
101 /* GUC variable */
103 
104 static void SetXidCommitTsInPage(TransactionId xid, int nsubxids,
105  TransactionId *subxids, TimestampTz ts,
106  RepOriginId nodeid, int pageno);
108  RepOriginId nodeid, int slotno);
109 static void error_commit_ts_disabled(void);
110 static int ZeroCommitTsPage(int pageno, bool writeXlog);
111 static bool CommitTsPagePrecedes(int page1, int page2);
112 static void ActivateCommitTs(void);
113 static void DeactivateCommitTs(void);
114 static void WriteZeroPageXlogRec(int pageno);
115 static void WriteTruncateXlogRec(int pageno);
116 static void WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids,
118  RepOriginId nodeid);
119 
120 /*
121  * TransactionTreeSetCommitTsData
122  *
123  * Record the final commit timestamp of transaction entries in the commit log
124  * for a transaction and its subtransaction tree, as efficiently as possible.
125  *
126  * xid is the top level transaction id.
127  *
128  * subxids is an array of xids of length nsubxids, representing subtransactions
129  * in the tree of xid. In various cases nsubxids may be zero.
130  * The reason why tracking just the parent xid commit timestamp is not enough
131  * is that the subtrans SLRU does not stay valid across crashes (it's not
132  * permanent) so we need to keep the information about them here. If the
133  * subtrans implementation changes in the future, we might want to revisit the
134  * decision of storing timestamp info for each subxid.
135  *
136  * The write_xlog parameter tells us whether to include an XLog record of this
137  * or not. Normally, this is called from transaction commit routines (both
138  * normal and prepared) and the information will be stored in the transaction
139  * commit XLog record, and so they should pass "false" for this. The XLog redo
140  * code should use "false" here as well. Other callers probably want to pass
141  * true, so that the given values persist in case of crashes.
142  */
143 void
146  RepOriginId nodeid, bool write_xlog)
147 {
148  int i;
149  TransactionId headxid;
150  TransactionId newestXact;
151 
152  /*
153  * No-op if the module is not active.
154  *
155  * An unlocked read here is fine, because in a standby (the only place
156  * where the flag can change in flight) this routine is only called by
157  * the recovery process, which is also the only process which can change
158  * the flag.
159  */
160  if (!commitTsShared->commitTsActive)
161  return;
162 
163  /*
164  * Comply with the WAL-before-data rule: if caller specified it wants this
165  * value to be recorded in WAL, do so before touching the data.
166  */
167  if (write_xlog)
168  WriteSetTimestampXlogRec(xid, nsubxids, subxids, timestamp, nodeid);
169 
170  /*
171  * Figure out the latest Xid in this batch: either the last subxid if
172  * there's any, otherwise the parent xid.
173  */
174  if (nsubxids > 0)
175  newestXact = subxids[nsubxids - 1];
176  else
177  newestXact = xid;
178 
179  /*
180  * We split the xids to set the timestamp to in groups belonging to the
181  * same SLRU page; the first element in each such set is its head. The
182  * first group has the main XID as the head; subsequent sets use the first
183  * subxid not on the previous page as head. This way, we only have to
184  * lock/modify each SLRU page once.
185  */
186  for (i = 0, headxid = xid;;)
187  {
188  int pageno = TransactionIdToCTsPage(headxid);
189  int j;
190 
191  for (j = i; j < nsubxids; j++)
192  {
193  if (TransactionIdToCTsPage(subxids[j]) != pageno)
194  break;
195  }
196  /* subxids[i..j] are on the same page as the head */
197 
198  SetXidCommitTsInPage(headxid, j - i, subxids + i, timestamp, nodeid,
199  pageno);
200 
201  /* if we wrote out all subxids, we're done. */
202  if (j + 1 >= nsubxids)
203  break;
204 
205  /*
206  * Set the new head and skip over it, as well as over the subxids we
207  * just wrote.
208  */
209  headxid = subxids[j];
210  i += j - i + 1;
211  }
212 
213  /* update the cached value in shared memory */
214  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
215  commitTsShared->xidLastCommit = xid;
216  commitTsShared->dataLastCommit.time = timestamp;
217  commitTsShared->dataLastCommit.nodeid = nodeid;
218 
219  /* and move forwards our endpoint, if needed */
222  LWLockRelease(CommitTsLock);
223 }
224 
225 /*
226  * Record the commit timestamp of transaction entries in the commit log for all
227  * entries on a single page. Atomic only on this page.
228  */
229 static void
231  TransactionId *subxids, TimestampTz ts,
232  RepOriginId nodeid, int pageno)
233 {
234  int slotno;
235  int i;
236 
237  LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
238 
239  slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, xid);
240 
241  TransactionIdSetCommitTs(xid, ts, nodeid, slotno);
242  for (i = 0; i < nsubxids; i++)
243  TransactionIdSetCommitTs(subxids[i], ts, nodeid, slotno);
244 
245  CommitTsCtl->shared->page_dirty[slotno] = true;
246 
247  LWLockRelease(CommitTsControlLock);
248 }
249 
250 /*
251  * Sets the commit timestamp of a single transaction.
252  *
253  * Must be called with CommitTsControlLock held
254  */
255 static void
257  RepOriginId nodeid, int slotno)
258 {
259  int entryno = TransactionIdToCTsEntry(xid);
260  CommitTimestampEntry entry;
261 
263 
264  entry.time = ts;
265  entry.nodeid = nodeid;
266 
267  memcpy(CommitTsCtl->shared->page_buffer[slotno] +
268  SizeOfCommitTimestampEntry * entryno,
270 }
271 
272 /*
273  * Interrogate the commit timestamp of a transaction.
274  *
275  * The return value indicates whether a commit timestamp record was found for
276  * the given xid. The timestamp value is returned in *ts (which may not be
277  * null), and the origin node for the Xid is returned in *nodeid, if it's not
278  * null.
279  */
280 bool
282  RepOriginId *nodeid)
283 {
284  int pageno = TransactionIdToCTsPage(xid);
285  int entryno = TransactionIdToCTsEntry(xid);
286  int slotno;
287  CommitTimestampEntry entry;
288  TransactionId oldestCommitTsXid;
289  TransactionId newestCommitTsXid;
290 
291  /* error if the given Xid doesn't normally commit */
292  if (!TransactionIdIsNormal(xid))
293  ereport(ERROR,
294  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
295  errmsg("cannot retrieve commit timestamp for transaction %u", xid)));
296 
297  LWLockAcquire(CommitTsLock, LW_SHARED);
298 
299  /* Error if module not enabled */
300  if (!commitTsShared->commitTsActive)
302 
303  /*
304  * If we're asked for the cached value, return that. Otherwise, fall
305  * through to read from SLRU.
306  */
307  if (commitTsShared->xidLastCommit == xid)
308  {
309  *ts = commitTsShared->dataLastCommit.time;
310  if (nodeid)
311  *nodeid = commitTsShared->dataLastCommit.nodeid;
312 
313  LWLockRelease(CommitTsLock);
314  return *ts != 0;
315  }
316 
317  oldestCommitTsXid = ShmemVariableCache->oldestCommitTsXid;
318  newestCommitTsXid = ShmemVariableCache->newestCommitTsXid;
319  /* neither is invalid, or both are */
320  Assert(TransactionIdIsValid(oldestCommitTsXid) == TransactionIdIsValid(newestCommitTsXid));
321  LWLockRelease(CommitTsLock);
322 
323  /*
324  * Return empty if the requested value is outside our valid range.
325  */
326  if (!TransactionIdIsValid(oldestCommitTsXid) ||
327  TransactionIdPrecedes(xid, oldestCommitTsXid) ||
328  TransactionIdPrecedes(newestCommitTsXid, xid))
329  {
330  *ts = 0;
331  if (nodeid)
332  *nodeid = InvalidRepOriginId;
333  return false;
334  }
335 
336  /* lock is acquired by SimpleLruReadPage_ReadOnly */
337  slotno = SimpleLruReadPage_ReadOnly(CommitTsCtl, pageno, xid);
338  memcpy(&entry,
339  CommitTsCtl->shared->page_buffer[slotno] +
340  SizeOfCommitTimestampEntry * entryno,
342 
343  *ts = entry.time;
344  if (nodeid)
345  *nodeid = entry.nodeid;
346 
347  LWLockRelease(CommitTsControlLock);
348  return *ts != 0;
349 }
350 
351 /*
352  * Return the Xid of the latest committed transaction. (As far as this module
353  * is concerned, anyway; it's up to the caller to ensure the value is useful
354  * for its purposes.)
355  *
356  * ts and nodeid are filled with the corresponding data; they can be passed
357  * as NULL if not wanted.
358  */
361 {
362  TransactionId xid;
363 
364  LWLockAcquire(CommitTsLock, LW_SHARED);
365 
366  /* Error if module not enabled */
367  if (!commitTsShared->commitTsActive)
369 
370  xid = commitTsShared->xidLastCommit;
371  if (ts)
372  *ts = commitTsShared->dataLastCommit.time;
373  if (nodeid)
374  *nodeid = commitTsShared->dataLastCommit.nodeid;
375  LWLockRelease(CommitTsLock);
376 
377  return xid;
378 }
379 
380 static void
382 {
383  ereport(ERROR,
384  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
385  errmsg("could not get commit timestamp data"),
387  errhint("Make sure the configuration parameter \"%s\" is set on the master server.",
388  "track_commit_timestamp") :
389  errhint("Make sure the configuration parameter \"%s\" is set.",
390  "track_commit_timestamp")));
391 }
392 
393 /*
394  * SQL-callable wrapper to obtain commit time of a transaction
395  */
396 Datum
398 {
400  TimestampTz ts;
401  bool found;
402 
403  found = TransactionIdGetCommitTsData(xid, &ts, NULL);
404 
405  if (!found)
406  PG_RETURN_NULL();
407 
409 }
410 
411 
412 Datum
414 {
415  TransactionId xid;
416  TimestampTz ts;
417  Datum values[2];
418  bool nulls[2];
419  TupleDesc tupdesc;
420  HeapTuple htup;
421 
422  /* and construct a tuple with our data */
423  xid = GetLatestCommitTsData(&ts, NULL);
424 
425  /*
426  * Construct a tuple descriptor for the result row. This must match this
427  * function's pg_proc entry!
428  */
429  tupdesc = CreateTemplateTupleDesc(2, false);
430  TupleDescInitEntry(tupdesc, (AttrNumber) 1, "xid",
431  XIDOID, -1, 0);
432  TupleDescInitEntry(tupdesc, (AttrNumber) 2, "timestamp",
433  TIMESTAMPTZOID, -1, 0);
434  tupdesc = BlessTupleDesc(tupdesc);
435 
436  if (!TransactionIdIsNormal(xid))
437  {
438  memset(nulls, true, sizeof(nulls));
439  }
440  else
441  {
442  values[0] = TransactionIdGetDatum(xid);
443  nulls[0] = false;
444 
445  values[1] = TimestampTzGetDatum(ts);
446  nulls[1] = false;
447  }
448 
449  htup = heap_form_tuple(tupdesc, values, nulls);
450 
452 }
453 
454 
455 /*
456  * Number of shared CommitTS buffers.
457  *
458  * We use a very similar logic as for the number of CLOG buffers; see comments
459  * in CLOGShmemBuffers.
460  */
461 Size
463 {
464  return Min(16, Max(4, NBuffers / 1024));
465 }
466 
467 /*
468  * Shared memory sizing for CommitTs
469  */
470 Size
472 {
474  sizeof(CommitTimestampShared);
475 }
476 
477 /*
478  * Initialize CommitTs at system startup (postmaster start or standalone
479  * backend)
480  */
481 void
483 {
484  bool found;
485 
486  CommitTsCtl->PagePrecedes = CommitTsPagePrecedes;
487  SimpleLruInit(CommitTsCtl, "commit_timestamp", CommitTsShmemBuffers(), 0,
488  CommitTsControlLock, "pg_commit_ts",
490 
491  commitTsShared = ShmemInitStruct("CommitTs shared",
492  sizeof(CommitTimestampShared),
493  &found);
494 
495  if (!IsUnderPostmaster)
496  {
497  Assert(!found);
498 
499  commitTsShared->xidLastCommit = InvalidTransactionId;
500  TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
501  commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
502  commitTsShared->commitTsActive = false;
503  }
504  else
505  Assert(found);
506 }
507 
508 /*
509  * This function must be called ONCE on system install.
510  *
511  * (The CommitTs directory is assumed to have been created by initdb, and
512  * CommitTsShmemInit must have been called already.)
513  */
514 void
516 {
517  /*
518  * Nothing to do here at present, unlike most other SLRU modules; segments
519  * are created when the server is started with this module enabled. See
520  * ActivateCommitTs.
521  */
522 }
523 
524 /*
525  * Initialize (or reinitialize) a page of CommitTs to zeroes.
526  * If writeXlog is TRUE, also emit an XLOG record saying we did this.
527  *
528  * The page is not actually written, just set up in shared memory.
529  * The slot number of the new page is returned.
530  *
531  * Control lock must be held at entry, and will be held at exit.
532  */
533 static int
534 ZeroCommitTsPage(int pageno, bool writeXlog)
535 {
536  int slotno;
537 
538  slotno = SimpleLruZeroPage(CommitTsCtl, pageno);
539 
540  if (writeXlog)
541  WriteZeroPageXlogRec(pageno);
542 
543  return slotno;
544 }
545 
546 /*
547  * This must be called ONCE during postmaster or standalone-backend startup,
548  * after StartupXLOG has initialized ShmemVariableCache->nextXid.
549  */
550 void
552 {
554 }
555 
556 /*
557  * This must be called ONCE during postmaster or standalone-backend startup,
558  * after recovery has finished.
559  */
560 void
562 {
563  /*
564  * If the feature is not enabled, turn it off for good. This also removes
565  * any leftover data.
566  *
567  * Conversely, we activate the module if the feature is enabled. This is
568  * not necessary in a master system because we already did it earlier, but
569  * if we're in a standby server that got promoted which had the feature
570  * enabled and was following a master that had the feature disabled, this
571  * is where we turn it on locally.
572  */
575  else
577 }
578 
579 /*
580  * Activate or deactivate CommitTs upon reception of an XLOG_PARAMETER_CHANGE
581  * XLog record in a standby.
582  */
583 void
584 CommitTsParameterChange(bool newvalue, bool oldvalue)
585 {
586  /*
587  * If the commit_ts module is disabled in this server and we get word from
588  * the master server that it is enabled there, activate it so that we can
589  * replay future WAL records involving it; also mark it as active on
590  * pg_control. If the old value was already set, we already did this, so
591  * don't do anything.
592  *
593  * If the module is disabled in the master, disable it here too, unless
594  * the module is enabled locally.
595  *
596  * Note this only runs in the recovery process, so an unlocked read is
597  * fine.
598  */
599  if (newvalue)
600  {
601  if (!commitTsShared->commitTsActive)
603  }
604  else if (commitTsShared->commitTsActive)
606 }
607 
608 /*
609  * Activate this module whenever necessary.
610  * This must happen during postmaster or standalone-backend startup,
611  * or during WAL replay anytime the track_commit_timestamp setting is
612  * changed in the master.
613  *
614  * The reason why this SLRU needs separate activation/deactivation functions is
615  * that it can be enabled/disabled during start and the activation/deactivation
616  * on master is propagated to slave via replay. Other SLRUs don't have this
617  * property and they can be just initialized during normal startup.
618  *
619  * This is in charge of creating the currently active segment, if it's not
620  * already there. The reason for this is that the server might have been
621  * running with this module disabled for a while and thus might have skipped
622  * the normal creation point.
623  */
624 static void
626 {
627  TransactionId xid;
628  int pageno;
629 
630  /* If we've done this already, there's nothing to do */
631  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
632  if (commitTsShared->commitTsActive)
633  {
634  LWLockRelease(CommitTsLock);
635  return;
636  }
637  LWLockRelease(CommitTsLock);
638 
640  pageno = TransactionIdToCTsPage(xid);
641 
642  /*
643  * Re-Initialize our idea of the latest page number.
644  */
645  LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
646  CommitTsCtl->shared->latest_page_number = pageno;
647  LWLockRelease(CommitTsControlLock);
648 
649  /*
650  * If CommitTs is enabled, but it wasn't in the previous server run, we
651  * need to set the oldest and newest values to the next Xid; that way, we
652  * will not try to read data that might not have been set.
653  *
654  * XXX does this have a problem if a server is started with commitTs
655  * enabled, then started with commitTs disabled, then restarted with it
656  * enabled again? It doesn't look like it does, because there should be a
657  * checkpoint that sets the value to InvalidTransactionId at end of
658  * recovery; and so any chance of injecting new transactions without
659  * CommitTs values would occur after the oldestCommitTsXid has been set to
660  * Invalid temporarily.
661  */
662  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
664  {
667  }
668  LWLockRelease(CommitTsLock);
669 
670  /* Create the current segment file, if necessary */
672  {
673  int slotno;
674 
675  LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
676  slotno = ZeroCommitTsPage(pageno, false);
678  Assert(!CommitTsCtl->shared->page_dirty[slotno]);
679  LWLockRelease(CommitTsControlLock);
680  }
681 
682  /* Change the activation status in shared memory. */
683  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
684  commitTsShared->commitTsActive = true;
685  LWLockRelease(CommitTsLock);
686 }
687 
688 /*
689  * Deactivate this module.
690  *
691  * This must be called when the track_commit_timestamp parameter is turned off.
692  * This happens during postmaster or standalone-backend startup, or during WAL
693  * replay.
694  *
695  * Resets CommitTs into invalid state to make sure we don't hand back
696  * possibly-invalid data; also removes segments of old data.
697  */
698 static void
700 {
701  /*
702  * Cleanup the status in the shared memory.
703  *
704  * We reset everything in the commitTsShared record to prevent user from
705  * getting confusing data about last committed transaction on the standby
706  * when the module was activated repeatedly on the primary.
707  */
708  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
709 
710  commitTsShared->commitTsActive = false;
711  commitTsShared->xidLastCommit = InvalidTransactionId;
712  TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
713  commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
714 
717 
718  LWLockRelease(CommitTsLock);
719 
720  /*
721  * Remove *all* files. This is necessary so that there are no leftover
722  * files; in the case where this feature is later enabled after running
723  * with it disabled for some time there may be a gap in the file sequence.
724  * (We can probably tolerate out-of-sequence files, as they are going to
725  * be overwritten anyway when we wrap around, but it seems better to be
726  * tidy.)
727  */
728  LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
730  LWLockRelease(CommitTsControlLock);
731 }
732 
733 /*
734  * This must be called ONCE during postmaster or standalone-backend shutdown
735  */
736 void
738 {
739  /* Flush dirty CommitTs pages to disk */
740  SimpleLruFlush(CommitTsCtl, false);
741 }
742 
743 /*
744  * Perform a checkpoint --- either during shutdown, or on-the-fly
745  */
746 void
748 {
749  /* Flush dirty CommitTs pages to disk */
751 }
752 
753 /*
754  * Make sure that CommitTs has room for a newly-allocated XID.
755  *
756  * NB: this is called while holding XidGenLock. We want it to be very fast
757  * most of the time; even when it's not so fast, no actual I/O need happen
758  * unless we're forced to write out a dirty CommitTs or xlog page to make room
759  * in shared memory.
760  *
761  * NB: the current implementation relies on track_commit_timestamp being
762  * PGC_POSTMASTER.
763  */
764 void
766 {
767  int pageno;
768 
769  /*
770  * Nothing to do if module not enabled. Note we do an unlocked read of the
771  * flag here, which is okay because this routine is only called from
772  * GetNewTransactionId, which is never called in a standby.
773  */
774  Assert(!InRecovery);
775  if (!commitTsShared->commitTsActive)
776  return;
777 
778  /*
779  * No work except at first XID of a page. But beware: just after
780  * wraparound, the first XID of page zero is FirstNormalTransactionId.
781  */
782  if (TransactionIdToCTsEntry(newestXact) != 0 &&
784  return;
785 
786  pageno = TransactionIdToCTsPage(newestXact);
787 
788  LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
789 
790  /* Zero the page and make an XLOG entry about it */
791  ZeroCommitTsPage(pageno, !InRecovery);
792 
793  LWLockRelease(CommitTsControlLock);
794 }
795 
796 /*
797  * Remove all CommitTs segments before the one holding the passed
798  * transaction ID.
799  *
800  * Note that we don't need to flush XLOG here.
801  */
802 void
804 {
805  int cutoffPage;
806 
807  /*
808  * The cutoff point is the start of the segment containing oldestXact. We
809  * pass the *page* containing oldestXact to SimpleLruTruncate.
810  */
811  cutoffPage = TransactionIdToCTsPage(oldestXact);
812 
813  /* Check to see if there's any files that could be removed */
815  &cutoffPage))
816  return; /* nothing to remove */
817 
818  /* Write XLOG record */
819  WriteTruncateXlogRec(cutoffPage);
820 
821  /* Now we can remove the old CommitTs segment(s) */
822  SimpleLruTruncate(CommitTsCtl, cutoffPage);
823 }
824 
825 /*
826  * Set the limit values between which commit TS can be consulted.
827  */
828 void
830 {
831  /*
832  * Be careful not to overwrite values that are either further into the
833  * "future" or signal a disabled committs.
834  */
835  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
837  {
842  }
843  else
844  {
846  }
847  LWLockRelease(CommitTsLock);
848 }
849 
850 /*
851  * Move forwards the oldest commitTS value that can be consulted
852  */
853 void
855 {
856  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
860  LWLockRelease(CommitTsLock);
861 }
862 
863 
864 /*
865  * Decide which of two CommitTs page numbers is "older" for truncation purposes.
866  *
867  * We need to use comparison of TransactionIds here in order to do the right
868  * thing with wraparound XID arithmetic. However, if we are asked about
869  * page number zero, we don't want to hand InvalidTransactionId to
870  * TransactionIdPrecedes: it'll get weird about permanent xact IDs. So,
871  * offset both xids by FirstNormalTransactionId to avoid that.
872  */
873 static bool
874 CommitTsPagePrecedes(int page1, int page2)
875 {
876  TransactionId xid1;
877  TransactionId xid2;
878 
879  xid1 = ((TransactionId) page1) * COMMIT_TS_XACTS_PER_PAGE;
880  xid1 += FirstNormalTransactionId;
881  xid2 = ((TransactionId) page2) * COMMIT_TS_XACTS_PER_PAGE;
882  xid2 += FirstNormalTransactionId;
883 
884  return TransactionIdPrecedes(xid1, xid2);
885 }
886 
887 
888 /*
889  * Write a ZEROPAGE xlog record
890  */
891 static void
893 {
894  XLogBeginInsert();
895  XLogRegisterData((char *) (&pageno), sizeof(int));
896  (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_ZEROPAGE);
897 }
898 
899 /*
900  * Write a TRUNCATE xlog record
901  */
902 static void
904 {
905  XLogBeginInsert();
906  XLogRegisterData((char *) (&pageno), sizeof(int));
907  (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_TRUNCATE);
908 }
909 
910 /*
911  * Write a SETTS xlog record
912  */
913 static void
916  RepOriginId nodeid)
917 {
918  xl_commit_ts_set record;
919 
920  record.timestamp = timestamp;
921  record.nodeid = nodeid;
922  record.mainxid = mainxid;
923 
924  XLogBeginInsert();
925  XLogRegisterData((char *) &record,
926  offsetof(xl_commit_ts_set, mainxid) +
927  sizeof(TransactionId));
928  XLogRegisterData((char *) subxids, nsubxids * sizeof(TransactionId));
929  XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_SETTS);
930 }
931 
932 /*
933  * CommitTS resource manager's routines
934  */
935 void
937 {
938  uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
939 
940  /* Backup blocks are not used in commit_ts records */
941  Assert(!XLogRecHasAnyBlockRefs(record));
942 
943  if (info == COMMIT_TS_ZEROPAGE)
944  {
945  int pageno;
946  int slotno;
947 
948  memcpy(&pageno, XLogRecGetData(record), sizeof(int));
949 
950  LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
951 
952  slotno = ZeroCommitTsPage(pageno, false);
954  Assert(!CommitTsCtl->shared->page_dirty[slotno]);
955 
956  LWLockRelease(CommitTsControlLock);
957  }
958  else if (info == COMMIT_TS_TRUNCATE)
959  {
960  int pageno;
961 
962  memcpy(&pageno, XLogRecGetData(record), sizeof(int));
963 
964  /*
965  * During XLOG replay, latest_page_number isn't set up yet; insert a
966  * suitable value to bypass the sanity test in SimpleLruTruncate.
967  */
968  CommitTsCtl->shared->latest_page_number = pageno;
969 
971  }
972  else if (info == COMMIT_TS_SETTS)
973  {
974  xl_commit_ts_set *setts = (xl_commit_ts_set *) XLogRecGetData(record);
975  int nsubxids;
976  TransactionId *subxids;
977 
978  nsubxids = ((XLogRecGetDataLen(record) - SizeOfCommitTsSet) /
979  sizeof(TransactionId));
980  if (nsubxids > 0)
981  {
982  subxids = palloc(sizeof(TransactionId) * nsubxids);
983  memcpy(subxids,
985  sizeof(TransactionId) * nsubxids);
986  }
987  else
988  subxids = NULL;
989 
990  TransactionTreeSetCommitTsData(setts->mainxid, nsubxids, subxids,
991  setts->timestamp, setts->nodeid, true);
992  if (subxids)
993  pfree(subxids);
994  }
995  else
996  elog(PANIC, "commit_ts_redo: unknown op code %u", info);
997 }
#define COMMIT_TS_ZEROPAGE
Definition: commit_ts.h:49
#define TIMESTAMPTZOID
Definition: pg_type.h:513
#define PG_GETARG_UINT32(n)
Definition: fmgr.h:226
CommitTimestampEntry dataLastCommit
Definition: commit_ts.c:94
Datum pg_last_committed_xact(PG_FUNCTION_ARGS)
Definition: commit_ts.c:413
int errhint(const char *fmt,...)
Definition: elog.c:987
#define COMMIT_TS_SETTS
Definition: commit_ts.h:51
#define TransactionIdEquals(id1, id2)
Definition: transam.h:43
uint32 TransactionId
Definition: c.h:393
void SetCommitTsLimit(TransactionId oldestXact, TransactionId newestXact)
Definition: commit_ts.c:829
bool SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int segpage, void *data)
Definition: slru.c:1340
static void WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids, TransactionId *subxids, TimestampTz timestamp, RepOriginId nodeid)
Definition: commit_ts.c:914
#define SizeOfCommitTimestampEntry
Definition: commit_ts.c:62
static void SetXidCommitTsInPage(TransactionId xid, int nsubxids, TransactionId *subxids, TimestampTz ts, RepOriginId nodeid, int pageno)
Definition: commit_ts.c:230
#define CommitTsCtl
Definition: commit_ts.c:78
static int ZeroCommitTsPage(int pageno, bool writeXlog)
Definition: commit_ts.c:534
void SimpleLruTruncate(SlruCtl ctl, int cutoffPage)
Definition: slru.c:1155
bool InRecovery
Definition: xlog.c:187
#define Min(x, y)
Definition: c.h:798
unsigned char uint8
Definition: c.h:263
uint16 RepOriginId
Definition: xlogdefs.h:51
void StartupCommitTs(void)
Definition: commit_ts.c:551
int errcode(int sqlerrcode)
Definition: elog.c:575
#define XIDOID
Definition: pg_type.h:336
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:692
bool RecoveryInProgress(void)
Definition: xlog.c:7547
Datum pg_xact_commit_timestamp(PG_FUNCTION_ARGS)
Definition: commit_ts.c:397
#define PANIC
Definition: elog.h:53
Size SimpleLruShmemSize(int nslots, int nlsns)
Definition: slru.c:144
#define PG_RETURN_TIMESTAMPTZ(x)
Definition: timestamp.h:60
void SimpleLruFlush(SlruCtl ctl, bool allow_redirtied)
Definition: slru.c:1092
Size CommitTsShmemBuffers(void)
Definition: commit_ts.c:462
void AdvanceOldestCommitTsXid(TransactionId oldestXact)
Definition: commit_ts.c:854
TimestampTz timestamp
Definition: commit_ts.h:55
void CompleteCommitTsInitialization(void)
Definition: commit_ts.c:561
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1774
double TimestampTz
Definition: timestamp.h:51
void pfree(void *pointer)
Definition: mcxt.c:995
#define XLogRecGetData(decoder)
Definition: xlogreader.h:201
#define FirstNormalTransactionId
Definition: transam.h:34
static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts, RepOriginId nodeid, int slotno)
Definition: commit_ts.c:256
#define ERROR
Definition: elog.h:43
#define XLogRecGetDataLen(decoder)
Definition: xlogreader.h:202
#define COMMIT_TS_XACTS_PER_PAGE
Definition: commit_ts.c:65
void CommitTsParameterChange(bool newvalue, bool oldvalue)
Definition: commit_ts.c:584
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:334
#define TimestampTzGetDatum(X)
Definition: timestamp.h:52
TransactionId nextXid
Definition: transam.h:117
int SimpleLruReadPage(SlruCtl ctl, int pageno, bool write_ok, TransactionId xid)
Definition: slru.c:372
TupleDesc BlessTupleDesc(TupleDesc tupdesc)
Definition: execTuples.c:1081
bool track_commit_timestamp
Definition: commit_ts.c:102
#define SizeOfCommitTsSet
Definition: commit_ts.h:61
bool IsUnderPostmaster
Definition: globals.c:98
void commit_ts_redo(XLogReaderState *record)
Definition: commit_ts.c:936
VariableCache ShmemVariableCache
Definition: varsup.c:34
#define InvalidTransactionId
Definition: transam.h:31
TransactionId ReadNewTransactionId(void)
Definition: varsup.c:250
#define TIMESTAMP_NOBEGIN(j)
Definition: timestamp.h:134
static void ActivateCommitTs(void)
Definition: commit_ts.c:625
bool SimpleLruDoesPhysicalPageExist(SlruCtl ctl, int pageno)
Definition: slru.c:587
void SimpleLruWritePage(SlruCtl ctl, int slotno)
Definition: slru.c:575
static void WriteZeroPageXlogRec(int pageno)
Definition: commit_ts.c:892
void TupleDescInitEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:492
#define ereport(elevel, rest)
Definition: elog.h:122
struct CommitTimestampEntry CommitTimestampEntry
TransactionId oldestCommitTsXid
Definition: transam.h:129
#define XLogRecGetInfo(decoder)
Definition: xlogreader.h:197
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
void TruncateCommitTs(TransactionId oldestXact)
Definition: commit_ts.c:803
void XLogRegisterData(char *data, int len)
Definition: xloginsert.c:323
TimestampTz time
Definition: commit_ts.c:58
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:408
#define TransactionIdGetDatum(X)
Definition: postgres.h:529
bool SlruScanDirCbReportPresence(SlruCtl ctl, char *filename, int segpage, void *data)
Definition: slru.c:1308
bool TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts, RepOriginId *nodeid)
Definition: commit_ts.c:281
#define TransactionIdToCTsPage(xid)
Definition: commit_ts.c:68
uintptr_t Datum
Definition: postgres.h:374
#define PG_RETURN_DATUM(x)
Definition: fmgr.h:297
#define COMMIT_TS_TRUNCATE
Definition: commit_ts.h:50
static bool CommitTsPagePrecedes(int page1, int page2)
Definition: commit_ts.c:874
TransactionId xidLastCommit
Definition: commit_ts.c:93
int SimpleLruReadPage_ReadOnly(SlruCtl ctl, int pageno, TransactionId xid)
Definition: slru.c:464
void CheckPointCommitTs(void)
Definition: commit_ts.c:747
#define Max(x, y)
Definition: c.h:792
TransactionId mainxid
Definition: commit_ts.h:57
void ShutdownCommitTs(void)
Definition: commit_ts.c:737
#define NULL
Definition: c.h:226
#define Assert(condition)
Definition: c.h:667
#define XLR_INFO_MASK
Definition: xlogrecord.h:62
void ExtendCommitTs(TransactionId newestXact)
Definition: commit_ts.c:765
bool SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data)
Definition: slru.c:1363
TransactionId newestCommitTsXid
Definition: transam.h:130
size_t Size
Definition: c.h:352
Size CommitTsShmemSize(void)
Definition: commit_ts.c:471
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1167
#define HeapTupleGetDatum(tuple)
Definition: funcapi.h:222
double timestamp
TupleDesc CreateTemplateTupleDesc(int natts, bool hasoid)
Definition: tupdesc.c:40
#define TransactionIdToCTsEntry(xid)
Definition: commit_ts.c:70
static SlruCtlData CommitTsCtlData
Definition: commit_ts.c:76
#define InvalidRepOriginId
Definition: origin.h:34
static void DeactivateCommitTs(void)
Definition: commit_ts.c:699
static Datum values[MAXATTR]
Definition: bootstrap.c:160
void * palloc(Size size)
Definition: mcxt.c:894
int errmsg(const char *fmt,...)
Definition: elog.c:797
RepOriginId nodeid
Definition: commit_ts.h:56
int i
int NBuffers
Definition: globals.c:120
static void WriteTruncateXlogRec(int pageno)
Definition: commit_ts.c:903
#define XLogRecHasAnyBlockRefs(decoder)
Definition: xlogreader.h:203
#define PG_FUNCTION_ARGS
Definition: fmgr.h:150
#define elog
Definition: elog.h:218
#define TransactionIdIsValid(xid)
Definition: transam.h:41
void BootStrapCommitTs(void)
Definition: commit_ts.c:515
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
RepOriginId nodeid
Definition: commit_ts.c:59
void XLogBeginInsert(void)
Definition: xloginsert.c:120
void TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids, TransactionId *subxids, TimestampTz timestamp, RepOriginId nodeid, bool write_xlog)
Definition: commit_ts.c:144
CommitTimestampShared * commitTsShared
Definition: commit_ts.c:98
int16 AttrNumber
Definition: attnum.h:21
static void error_commit_ts_disabled(void)
Definition: commit_ts.c:381
int SimpleLruZeroPage(SlruCtl ctl, int pageno)
Definition: slru.c:260
#define PG_RETURN_NULL()
Definition: fmgr.h:289
#define offsetof(type, field)
Definition: c.h:547
TransactionId GetLatestCommitTsData(TimestampTz *ts, RepOriginId *nodeid)
Definition: commit_ts.c:360
void CommitTsShmemInit(void)
Definition: commit_ts.c:482
struct CommitTimestampShared CommitTimestampShared
void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, LWLock *ctllock, const char *subdir, int tranche_id)
Definition: slru.c:164