69 bool transactional,
const char *prefix,
70 Size sz,
const char *message);
104 "text conversion context",
123 if (strcmp(elem->
defname,
"include-xids") == 0)
130 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
131 errmsg(
"could not parse value \"%s\" for parameter \"%s\"",
134 else if (strcmp(elem->
defname,
"include-timestamp") == 0)
140 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
141 errmsg(
"could not parse value \"%s\" for parameter \"%s\"",
144 else if (strcmp(elem->
defname,
"force-binary") == 0)
152 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
153 errmsg(
"could not parse value \"%s\" for parameter \"%s\"",
159 else if (strcmp(elem->
defname,
"skip-empty-xacts") == 0)
166 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
167 errmsg(
"could not parse value \"%s\" for parameter \"%s\"",
170 else if (strcmp(elem->
defname,
"only-local") == 0)
177 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
178 errmsg(
"could not parse value \"%s\" for parameter \"%s\"",
184 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
185 errmsg(
"option \"%s\" = \"%s\" is unknown",
291 if (strcmp(outputstr,
"t") == 0)
299 for (valptr = outputstr; *valptr; valptr++)
326 for (natt = 0; natt < tupdesc->
natts; natt++)
335 attr = tupdesc->
attrs[natt];
341 if (attr->attisdropped)
348 if (attr->attnum < 0)
351 typid = attr->atttypid;
354 origval =
heap_getattr(tuple, natt + 1, tupdesc, &isnull);
356 if (isnull && skip_nulls)
370 &typoutput, &typisvarlena);
380 else if (!typisvarlena)
427 NameStr(class_form->relname)));
438 &change->
data.
tp.newtuple->tuple,
447 &change->
data.
tp.oldtuple->tuple,
456 &change->
data.
tp.newtuple->tuple,
468 &change->
data.
tp.oldtuple->tuple,
484 const char *prefix,
Size sz,
const char *message)
488 transactional, prefix, sz);
static void tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls)
static void pg_decode_shutdown(LogicalDecodingContext *ctx)
#define VARATT_IS_EXTERNAL_ONDISK(PTR)
#define IsA(nodeptr, _type_)
void MemoryContextDelete(MemoryContext context)
void(* LogicalOutputPluginInit)(struct OutputPluginCallbacks *cb)
void getTypeOutputInfo(Oid type, Oid *typOutput, bool *typIsVarlena)
static bool pg_decode_filter(LogicalDecodingContext *ctx, RepOriginId origin_id)
const char * quote_identifier(const char *ident)
#define RelationGetDescr(relation)
static void pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
#define PointerGetDatum(X)
#define RelationGetForm(relation)
LogicalDecodeMessageCB message_cb
Oid get_rel_namespace(Oid relid)
Form_pg_attribute * attrs
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
int errcode(int sqlerrcode)
void * output_plugin_private
void MemoryContextReset(MemoryContext context)
List * output_plugin_options
bool parse_bool(const char *value, bool *result)
enum ReorderBufferChangeType action
#define ALLOCSET_DEFAULT_MINSIZE
OutputPluginOutputType output_type
struct ReorderBufferChange::@51::@52 tp
void appendStringInfo(StringInfo str, const char *fmt,...)
static void print_literal(StringInfo s, Oid typid, char *outputstr)
LogicalDecodeCommitCB commit_cb
static void pg_decode_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
void appendStringInfoString(StringInfo str, const char *s)
char * get_namespace_name(Oid nspid)
FormData_pg_attribute * Form_pg_attribute
#define ereport(elevel, rest)
void appendStringInfoChar(StringInfo str, char ch)
char * quote_qualified_identifier(const char *qualifier, const char *ident)
#define heap_getattr(tup, attnum, tupleDesc, isnull)
static void pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
void * palloc0(Size size)
LogicalDecodeChangeCB change_cb
static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation rel, ReorderBufferChange *change)
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
#define Assert(condition)
union ReorderBufferChange::@51 data
static void pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
LogicalDecodeShutdownCB shutdown_cb
LogicalDecodeStartupCB startup_cb
#define InvalidRepOriginId
FormData_pg_class * Form_pg_class
#define HeapTupleHeaderGetOid(tup)
char * OidOutputFunctionCall(Oid functionId, Datum val)
int errmsg(const char *fmt,...)
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
#define ALLOCSET_DEFAULT_INITSIZE
#define SQL_STR_DOUBLE(ch, escape_backslash)
LogicalDecodeBeginCB begin_cb
#define PG_DETOAST_DATUM(datum)
#define ALLOCSET_DEFAULT_MAXSIZE
LogicalDecodeFilterByOriginCB filter_by_origin_cb
#define RelationGetRelid(relation)
void appendBinaryStringInfo(StringInfo str, const char *data, int datalen)
void _PG_output_plugin_init(OutputPluginCallbacks *cb)
const char * timestamptz_to_str(TimestampTz t)
#define AssertVariableIsOfType(varname, typename)