105 #include <sys/stat.h>
282 memset(&hash_ctl, 0,
sizeof(hash_ctl));
429 memset(&hashkey, 0,
sizeof(hashkey));
481 old_tid = old_tuple->
t_self;
490 new_tid = new_tuple->
t_self;
511 memset(&hashkey, 0,
sizeof(hashkey));
513 hashkey.
tid = old_tid;
518 if (unresolved !=
NULL)
527 new_tuple = unresolved->
tuple;
599 memset(&hashkey, 0,
sizeof(hashkey));
606 if (unresolved !=
NULL)
665 (
errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
666 errmsg(
"row is too big: size %zu, maximum size %zu",
678 if (len + saveFreeSpace > pageFreeSpace)
832 memset(&hash_ctl, 0,
sizeof(hash_ctl));
860 elog(
DEBUG1,
"flushing %u logical rewrite mapping entries",
891 waldata_start = waldata =
palloc(len);
902 memcpy(waldata, &pmap->
map,
sizeof(pmap->
map));
903 waldata +=
sizeof(pmap->
map);
915 Assert(waldata == waldata_start + len);
925 errmsg(
"could not write to file \"%s\", wrote %d of %d: %m", src->
path,
936 pfree(waldata_start);
965 errmsg(
"could not fsync file \"%s\": %m", src->
path)));
1013 memcpy(src->
path, path,
sizeof(path));
1015 O_CREAT | O_EXCL | O_WRONLY |
PG_BINARY,
1020 errmsg(
"could not create file \"%s\": %m", path)));
1050 bool do_log_xmin =
false;
1051 bool do_log_xmax =
false;
1086 if (!do_log_xmin && !do_log_xmax)
1140 errmsg(
"could not create file \"%s\": %m", path)));
1149 errmsg(
"could not truncate file \"%s\" to %u: %m",
1153 if (lseek(fd, xlrec->
offset, SEEK_SET) != xlrec->
offset)
1156 errmsg(
"could not seek to end of file \"%s\": %m",
1164 if (
write(fd, data, len) != len)
1167 errmsg(
"could not write to file \"%s\": %m", path)));
1177 errmsg(
"could not fsync file \"%s\": %m", path)));
1198 struct dirent *mapping_de;
1214 mappings_dir =
AllocateDir(
"pg_logical/mappings");
1215 while ((mapping_de =
ReadDir(mappings_dir,
"pg_logical/mappings")) !=
NULL)
1217 struct stat statbuf;
1226 if (strcmp(mapping_de->
d_name,
".") == 0 ||
1227 strcmp(mapping_de->
d_name,
"..") == 0)
1231 if (
lstat(path, &statbuf) == 0 && !S_ISREG(statbuf.st_mode))
1235 if (strncmp(mapping_de->
d_name,
"map-", 4) != 0)
1239 &dboid, &relid, &hi, &lo, &rewrite_xid, &create_xid) != 6)
1242 lsn = ((uint64) hi) << 32 | lo;
1246 elog(
DEBUG1,
"removing logical rewrite file \"%s\"", path);
1250 errmsg(
"could not remove file \"%s\": %m", path)));
1264 errmsg(
"could not open file \"%s\": %m", path)));
1274 errmsg(
"could not fsync file \"%s\": %m", path)));
int FileWrite(File file, char *buffer, int amount)
#define HeapTupleHeaderGetUpdateXid(tup)
bool HeapTupleHeaderIsOnlyLocked(HeapTupleHeader tuple)
#define ItemPointerIsValid(pointer)
HeapTuple heap_copytuple(HeapTuple tuple)
#define TOAST_TUPLE_THRESHOLD
#define InvalidXLogRecPtr
File PathNameOpenFile(FileName fileName, int fileFlags, int fileMode)
void MemoryContextDelete(MemoryContext context)
void end_heap_rewrite(RewriteState state)
void heap_xlog_logical_rewrite(XLogReaderState *r)
TransactionId rs_logical_xmin
OffsetNumber PageAddItem(Page page, Item item, Size size, OffsetNumber offsetNumber, bool overwrite, bool is_heap)
#define TransactionIdEquals(id1, id2)
#define dlist_foreach_modify(iter, lhead)
RewriteState begin_heap_rewrite(Relation old_heap, Relation new_heap, TransactionId oldest_xmin, TransactionId freeze_xid, MultiXactId cutoff_multi, bool use_wal)
static void logical_rewrite_heap_tuple(RewriteState state, ItemPointerData old_tid, HeapTuple new_tuple)
static void logical_end_heap_rewrite(RewriteState state)
HeapTuple toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup, int options)
struct RewriteStateData RewriteStateData
HeapTupleHeaderData * HeapTupleHeader
struct SMgrRelationData * rd_smgr
TransactionId rs_freeze_xid
#define XLOG_HEAP2_REWRITE
static void dlist_push_tail(dlist_head *head, dlist_node *node)
MultiXactId rs_cutoff_multi
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
int errcode(int sqlerrcode)
UnresolvedTupData * UnresolvedTup
void heap_sync(Relation rel)
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
#define HEAP_INSERT_SKIP_WAL
void CheckPointLogicalRewriteHeap(void)
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
void heap_freetuple(HeapTuple htup)
void ProcArrayGetReplicationSlotXmin(TransactionId *xmin, TransactionId *catalog_xmin)
static int fd(const char *x, int i)
#define ALLOCSET_DEFAULT_MINSIZE
bool heap_freeze_tuple(HeapTupleHeader tuple, TransactionId cutoff_xid, TransactionId cutoff_multi)
#define RelationOpenSmgr(relation)
#define dlist_container(type, membername, ptr)
void pfree(void *pointer)
uint32 rs_num_rewrite_mappings
#define XLogRecGetData(decoder)
Size PageGetHeapFreeSpace(Page page)
#define HEAP_XMAX_INVALID
XLogRecPtr GetXLogInsertRecPtr(void)
HTAB * rs_unresolved_tups
TransactionId GetCurrentTransactionId(void)
static void logical_rewrite_log_mapping(RewriteState state, TransactionId xid, LogicalRewriteMappingData *map)
int OpenTransientFile(FileName fileName, int fileFlags, int fileMode)
int errcode_for_file_access(void)
XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void)
#define InvalidTransactionId
DIR * AllocateDir(const char *dirname)
MemoryContext CurrentMemoryContext
static void dlist_delete(dlist_node *node)
int unlink(const char *filename)
#define ereport(elevel, rest)
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
#define RelationGetTargetPageFreeSpace(relation, defaultff)
HTAB * rs_logical_mappings
int CloseTransientFile(int fd)
TransactionId rs_oldest_xmin
#define PageGetItemId(page, offsetNumber)
void XLogRegisterData(char *data, int len)
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
#define XLogRecGetXid(decoder)
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
void * palloc0(Size size)
#define RELKIND_TOASTVALUE
#define RelationIsAccessibleInLogicalDecoding(relation)
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
#define HEAP_XMAX_IS_LOCKED_ONLY(infomask)
#define RelationGetNumberOfBlocks(reln)
#define InvalidOffsetNumber
static void dlist_init(dlist_head *head)
TransactionId MultiXactId
void FileClose(File file)
static void logical_begin_heap_rewrite(RewriteState state)
#define Assert(condition)
HTAB * rs_old_new_tid_map
struct dirent * ReadDir(DIR *dir, const char *dirname)
#define HeapTupleHeaderGetXmin(tup)
void PageSetChecksumInplace(Page page, BlockNumber blkno)
XLogRecPtr GetRedoRecPtr(void)
void * hash_seq_search(HASH_SEQ_STATUS *status)
#define RelationNeedsWAL(relation)
bool ItemPointerEquals(ItemPointer pointer1, ItemPointer pointer2)
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
static void logical_heap_rewrite_flush_mappings(RewriteState state)
struct RewriteMappingFile RewriteMappingFile
#define LOGICAL_REWRITE_FORMAT
void smgrextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char *buffer, bool skipFsync)
#define HEAP_INSERT_SKIP_FSM
bool rewrite_heap_dead_tuple(RewriteState state, HeapTuple old_tuple)
#define ItemPointerSetInvalid(pointer)
#define HeapTupleHasExternal(tuple)
int errmsg(const char *fmt,...)
void * MemoryContextAlloc(MemoryContext context, Size size)
LogicalRewriteMappingData map
XLogRecPtr log_newpage(RelFileNode *rnode, ForkNumber forkNum, BlockNumber blkno, Page page, bool page_std)
#define ALLOCSET_DEFAULT_INITSIZE
static void raw_heap_insert(RewriteState state, HeapTuple tup)
struct LogicalRewriteMappingData LogicalRewriteMappingData
#define ALLOCSET_DEFAULT_MAXSIZE
#define HEAP_DEFAULT_FILLFACTOR
struct RewriteMappingDataEntry RewriteMappingDataEntry
#define TransactionIdIsNormal(xid)
void XLogBeginInsert(void)
void rewrite_heap_tuple(RewriteState state, HeapTuple old_tuple, HeapTuple new_tuple)
#define RelationGetRelid(relation)
OldToNewMappingData * OldToNewMapping
#define PageGetItem(page, itemId)
#define ItemPointerSet(pointer, blockNumber, offNum)
void PageInit(Page page, Size pageSize, Size specialSize)