67 const char *prefix,
Size message_size,
const char *message);
82 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
83 errmsg(
"logical decoding requires wal_level >= logical")));
87 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
88 errmsg(
"logical decoding requires a database connection")));
105 (
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
106 errmsg(
"logical decoding cannot be used while in recovery")));
130 "Logical Decoding Context",
169 (
errcode(ERRCODE_OUT_OF_MEMORY),
170 errmsg(
"out of memory")));
188 ctx->
write = do_write;
214 List *output_plugin_options,
229 elog(
ERROR,
"cannot perform logical decoding without an acquired slot");
232 elog(
ERROR,
"cannot initialize logical decoding without a specified plugin");
237 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
238 errmsg(
"cannot use physical replication slot for logical decoding")));
242 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
243 errmsg(
"replication slot \"%s\" was not created in this database",
249 (
errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
250 errmsg(
"cannot create logical replication slot in transaction that has performed writes")));
293 read_page, prepare_write, do_write);
330 List *output_plugin_options,
344 elog(
ERROR,
"cannot perform logical decoding without an acquired slot");
349 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
350 (
errmsg(
"cannot use physical replication slot for logical decoding"))));
354 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
355 (
errmsg(
"replication slot \"%s\" was not created in this database",
363 else if (start_lsn < slot->data.confirmed_flush)
373 elog(
DEBUG1,
"cannot stream from %X/%X, minimum is %X/%X, forwarding",
383 read_page, prepare_write, do_write);
392 (
errmsg(
"starting logical decoding for slot \"%s\"",
394 errdetail(
"streaming transactions committing after %X/%X, reading WAL from %X/%X",
423 elog(
DEBUG1,
"searching for logical decoding starting point, starting at %X/%X",
477 elog(
ERROR,
"writes are only accepted in commit, begin and change callbacks");
490 elog(
ERROR,
"OutputPluginPrepareWrite needs to be called before OutputPluginWrite");
508 if (plugin_init ==
NULL)
509 elog(
ERROR,
"output plugins have to declare the _PG_output_plugin_init symbol");
512 plugin_init(callbacks);
515 elog(
ERROR,
"output plugins have to register a begin callback");
517 elog(
ERROR,
"output plugins have to register a change callback");
519 elog(
ERROR,
"output plugins have to register a commit callback");
529 errcontext(
"slot \"%s\", output plugin \"%s\", in the %s callback, associated LSN %X/%X",
536 errcontext(
"slot \"%s\", output plugin \"%s\", in the %s callback",
553 errcallback.
arg = (
void *) &state;
578 errcallback.
arg = (
void *) &state;
609 errcallback.
arg = (
void *) &state;
638 errcallback.
arg = (
void *) &state;
667 errcallback.
arg = (
void *) &state;
701 errcallback.
arg = (
void *) &state;
720 const char *prefix,
Size message_size,
const char *message)
734 errcallback.
arg = (
void *) &state;
745 message_size, message);
762 bool updated_xmin =
false;
784 else if (current_lsn <= slot->data.confirmed_flush)
819 bool updated_lsn =
false;
831 if (restart_lsn <= slot->data.restart_lsn)
839 else if (current_lsn <= slot->data.confirmed_flush)
858 elog(
DEBUG1,
"got new restart lsn %X/%X at %X/%X",
864 elog(
DEBUG1,
"failed to increase restart lsn: proposed %X/%X, after %X/%X, current candidate %X/%X, current after %X/%X, flushed up to %X/%X",
894 bool updated_xmin =
false;
895 bool updated_restart =
false;
932 updated_restart =
true;
938 if (updated_xmin || updated_restart)
942 elog(
DEBUG1,
"updated xmin: %u restart: %u", updated_xmin, updated_restart);
XLogReaderState * XLogReaderAllocate(XLogPageReadCB pagereadfunc, void *private_data)
static const char * plugin
void CheckSlotRequirements(void)
TransactionId candidate_catalog_xmin
#define InvalidXLogRecPtr
ReorderBufferApplyChangeCB apply_change
void MemoryContextDelete(MemoryContext context)
static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
void(* LogicalOutputPluginInit)(struct OutputPluginCallbacks *cb)
struct ReorderBuffer * reorder
#define PROC_IN_LOGICAL_DECODING
SnapBuildState SnapBuildCurrentState(SnapBuild *builder)
static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, char *plugin)
bool DecodingContextReady(LogicalDecodingContext *ctx)
struct LogicalErrorCallbackState LogicalErrorCallbackState
static void commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
OutputPluginOptions options
void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart_lsn)
StringInfo makeStringInfo(void)
LogicalDecodeMessageCB message_cb
OutputPluginCallbacks callbacks
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
void ReorderBufferFree(ReorderBuffer *rb)
int errcode(int sqlerrcode)
bool IsTransactionOrTransactionBlock(void)
void ReplicationSlotSave(void)
List * output_plugin_options
ReorderBufferCommitCB commit
ReplicationSlotPersistentData data
bool RecoveryInProgress(void)
struct ErrorContextCallback * previous
#define SlotIsPhysical(slot)
LogicalOutputPluginWriterWrite LogicalOutputPluginWriterPrepareWrite
const char * callback_name
#define ALLOCSET_DEFAULT_MINSIZE
XLogRecord * XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
XLogRecPtr confirmed_flush
XLogRecPtr write_location
void LWLockRelease(LWLock *lock)
ErrorContextCallback * error_context_stack
#define SpinLockAcquire(lock)
void ReplicationSlotReserveWal(void)
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
void ReplicationSlotsComputeRequiredLSN(void)
bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
static void startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
LogicalOutputPluginWriterPrepareWrite prepare_write
LogicalDecodeCommitCB commit_cb
static void shutdown_cb_wrapper(LogicalDecodingContext *ctx)
XLogRecPtr candidate_restart_valid
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write)
void(* LogicalOutputPluginWriterWrite)(struct LogicalDecodingContext *lr, XLogRecPtr Ptr, TransactionId xid, bool last_write)
int errdetail(const char *fmt,...)
TransactionId catalog_xmin
#define InvalidTransactionId
MemoryContext CurrentMemoryContext
bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
TransactionId GetTopTransactionIdIfAny(void)
ReorderBufferMessageCB message
#define ereport(elevel, rest)
static LogicalDecodingContext * StartupDecodingContext(List *output_plugin_options, XLogRecPtr start_lsn, TransactionId xmin_horizon, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write)
PGFunction load_external_function(char *filename, char *funcname, bool signalNotFound, void **filehandle)
void XLogReaderFree(XLogReaderState *state)
#define SpinLockRelease(lock)
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
void * palloc0(Size size)
LogicalDecodeChangeCB change_cb
TransactionId effective_catalog_xmin
void FreeSnapshotBuilder(SnapBuild *builder)
static void begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
SnapBuild * AllocateSnapshotBuilder(ReorderBuffer *reorder, TransactionId xmin_horizon, XLogRecPtr start_lsn)
struct SnapBuild * snapshot_builder
ReplicationSlot * MyReplicationSlot
#define Assert(condition)
#define StrNCpy(dst, src, len)
void FreeDecodingContext(LogicalDecodingContext *ctx)
void LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
int(* XLogPageReadCB)(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *readBuf, TimeLineID *pageTLI)
LogicalDecodeShutdownCB shutdown_cb
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
bool IsTransactionState(void)
ReorderBuffer * ReorderBufferAllocate(void)
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
LogicalDecodeStartupCB startup_cb
TransactionId GetOldestSafeDecodingTransactionId(void)
XLogRecPtr candidate_xmin_lsn
XLogRecPtr report_location
void(* callback)(void *arg)
int errmsg(const char *fmt,...)
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
LogicalOutputPluginWriterWrite write
#define ALLOCSET_DEFAULT_INITSIZE
LogicalDecodingContext * CreateInitDecodingContext(char *plugin, List *output_plugin_options, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write)
LogicalDecodeBeginCB begin_cb
#define ALLOCSET_DEFAULT_MAXSIZE
#define CHECK_FOR_INTERRUPTS()
static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size message_size, const char *message)
#define TransactionIdIsValid(xid)
LogicalDecodingContext * ctx
ReorderBufferBeginCB begin
static void output_plugin_error_callback(void *arg)
LogicalDecodeFilterByOriginCB filter_by_origin_cb
void CheckLogicalDecodingRequirements(void)
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
XLogRecPtr candidate_restart_lsn
void ReplicationSlotMarkDirty(void)