PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
test_decoding.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * test_decoding.c
4  * example logical decoding output plugin
5  *
6  * Copyright (c) 2012-2016, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  * contrib/test_decoding/test_decoding.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14 
15 #include "access/sysattr.h"
16 
17 #include "catalog/pg_class.h"
18 #include "catalog/pg_type.h"
19 
20 #include "nodes/parsenodes.h"
21 
23 #include "replication/logical.h"
24 #include "replication/message.h"
25 #include "replication/origin.h"
26 
27 #include "utils/builtins.h"
28 #include "utils/lsyscache.h"
29 #include "utils/memutils.h"
30 #include "utils/rel.h"
31 #include "utils/relcache.h"
32 #include "utils/syscache.h"
33 #include "utils/typcache.h"
34 
36 
37 /* These must be available to pg_dlsym() */
38 extern void _PG_init(void);
40 
41 typedef struct
42 {
48  bool only_local;
50 
52  bool is_init);
55  ReorderBufferTXN *txn);
57  TestDecodingData *data,
58  ReorderBufferTXN *txn,
59  bool last_write);
61  ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
63  ReorderBufferTXN *txn, Relation rel,
64  ReorderBufferChange *change);
66  RepOriginId origin_id);
68  ReorderBufferTXN *txn, XLogRecPtr message_lsn,
69  bool transactional, const char *prefix,
70  Size sz, const char *message);
71 
72 void
73 _PG_init(void)
74 {
75  /* other plugins can perform things here */
76 }
77 
78 /* specify output plugin callbacks */
79 void
81 {
83 
91 }
92 
93 
94 /* initialize this plugin */
95 static void
97  bool is_init)
98 {
100  TestDecodingData *data;
101 
102  data = palloc0(sizeof(TestDecodingData));
104  "text conversion context",
108  data->include_xids = true;
109  data->include_timestamp = false;
110  data->skip_empty_xacts = false;
111  data->only_local = false;
112 
113  ctx->output_plugin_private = data;
114 
116 
117  foreach(option, ctx->output_plugin_options)
118  {
119  DefElem *elem = lfirst(option);
120 
121  Assert(elem->arg == NULL || IsA(elem->arg, String));
122 
123  if (strcmp(elem->defname, "include-xids") == 0)
124  {
125  /* if option does not provide a value, it means its value is true */
126  if (elem->arg == NULL)
127  data->include_xids = true;
128  else if (!parse_bool(strVal(elem->arg), &data->include_xids))
129  ereport(ERROR,
130  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
131  errmsg("could not parse value \"%s\" for parameter \"%s\"",
132  strVal(elem->arg), elem->defname)));
133  }
134  else if (strcmp(elem->defname, "include-timestamp") == 0)
135  {
136  if (elem->arg == NULL)
137  data->include_timestamp = true;
138  else if (!parse_bool(strVal(elem->arg), &data->include_timestamp))
139  ereport(ERROR,
140  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
141  errmsg("could not parse value \"%s\" for parameter \"%s\"",
142  strVal(elem->arg), elem->defname)));
143  }
144  else if (strcmp(elem->defname, "force-binary") == 0)
145  {
146  bool force_binary;
147 
148  if (elem->arg == NULL)
149  continue;
150  else if (!parse_bool(strVal(elem->arg), &force_binary))
151  ereport(ERROR,
152  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
153  errmsg("could not parse value \"%s\" for parameter \"%s\"",
154  strVal(elem->arg), elem->defname)));
155 
156  if (force_binary)
158  }
159  else if (strcmp(elem->defname, "skip-empty-xacts") == 0)
160  {
161 
162  if (elem->arg == NULL)
163  data->skip_empty_xacts = true;
164  else if (!parse_bool(strVal(elem->arg), &data->skip_empty_xacts))
165  ereport(ERROR,
166  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
167  errmsg("could not parse value \"%s\" for parameter \"%s\"",
168  strVal(elem->arg), elem->defname)));
169  }
170  else if (strcmp(elem->defname, "only-local") == 0)
171  {
172 
173  if (elem->arg == NULL)
174  data->only_local = true;
175  else if (!parse_bool(strVal(elem->arg), &data->only_local))
176  ereport(ERROR,
177  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
178  errmsg("could not parse value \"%s\" for parameter \"%s\"",
179  strVal(elem->arg), elem->defname)));
180  }
181  else
182  {
183  ereport(ERROR,
184  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
185  errmsg("option \"%s\" = \"%s\" is unknown",
186  elem->defname,
187  elem->arg ? strVal(elem->arg) : "(null)")));
188  }
189  }
190 }
191 
192 /* cleanup this plugin's resources */
193 static void
195 {
197 
198  /* cleanup our own resources via memory context reset */
200 }
201 
202 /* BEGIN callback */
203 static void
205 {
207 
208  data->xact_wrote_changes = false;
209  if (data->skip_empty_xacts)
210  return;
211 
212  pg_output_begin(ctx, data, txn, true);
213 }
214 
215 static void
217 {
218  OutputPluginPrepareWrite(ctx, last_write);
219  if (data->include_xids)
220  appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
221  else
222  appendStringInfoString(ctx->out, "BEGIN");
223  OutputPluginWrite(ctx, last_write);
224 }
225 
226 /* COMMIT callback */
227 static void
229  XLogRecPtr commit_lsn)
230 {
232 
233  if (data->skip_empty_xacts && !data->xact_wrote_changes)
234  return;
235 
236  OutputPluginPrepareWrite(ctx, true);
237  if (data->include_xids)
238  appendStringInfo(ctx->out, "COMMIT %u", txn->xid);
239  else
240  appendStringInfoString(ctx->out, "COMMIT");
241 
242  if (data->include_timestamp)
243  appendStringInfo(ctx->out, " (at %s)",
245 
246  OutputPluginWrite(ctx, true);
247 }
248 
249 static bool
251  RepOriginId origin_id)
252 {
254 
255  if (data->only_local && origin_id != InvalidRepOriginId)
256  return true;
257  return false;
258 }
259 
260 /*
261  * Print literal `outputstr' already represented as string of type `typid'
262  * into stringbuf `s'.
263  *
264  * Some builtin types aren't quoted, the rest is quoted. Escaping is done as
265  * if standard_conforming_strings were enabled.
266  */
267 static void
268 print_literal(StringInfo s, Oid typid, char *outputstr)
269 {
270  const char *valptr;
271 
272  switch (typid)
273  {
274  case INT2OID:
275  case INT4OID:
276  case INT8OID:
277  case OIDOID:
278  case FLOAT4OID:
279  case FLOAT8OID:
280  case NUMERICOID:
281  /* NB: We don't care about Inf, NaN et al. */
282  appendStringInfoString(s, outputstr);
283  break;
284 
285  case BITOID:
286  case VARBITOID:
287  appendStringInfo(s, "B'%s'", outputstr);
288  break;
289 
290  case BOOLOID:
291  if (strcmp(outputstr, "t") == 0)
292  appendStringInfoString(s, "true");
293  else
294  appendStringInfoString(s, "false");
295  break;
296 
297  default:
298  appendStringInfoChar(s, '\'');
299  for (valptr = outputstr; *valptr; valptr++)
300  {
301  char ch = *valptr;
302 
303  if (SQL_STR_DOUBLE(ch, false))
304  appendStringInfoChar(s, ch);
305  appendStringInfoChar(s, ch);
306  }
307  appendStringInfoChar(s, '\'');
308  break;
309  }
310 }
311 
312 /* print the tuple 'tuple' into the StringInfo s */
313 static void
314 tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls)
315 {
316  int natt;
317  Oid oid;
318 
319  /* print oid of tuple, it's not included in the TupleDesc */
320  if ((oid = HeapTupleHeaderGetOid(tuple->t_data)) != InvalidOid)
321  {
322  appendStringInfo(s, " oid[oid]:%u", oid);
323  }
324 
325  /* print all columns individually */
326  for (natt = 0; natt < tupdesc->natts; natt++)
327  {
328  Form_pg_attribute attr; /* the attribute itself */
329  Oid typid; /* type of current attribute */
330  Oid typoutput; /* output function */
331  bool typisvarlena;
332  Datum origval; /* possibly toasted Datum */
333  bool isnull; /* column is null? */
334 
335  attr = tupdesc->attrs[natt];
336 
337  /*
338  * don't print dropped columns, we can't be sure everything is
339  * available for them
340  */
341  if (attr->attisdropped)
342  continue;
343 
344  /*
345  * Don't print system columns, oid will already have been printed if
346  * present.
347  */
348  if (attr->attnum < 0)
349  continue;
350 
351  typid = attr->atttypid;
352 
353  /* get Datum from tuple */
354  origval = heap_getattr(tuple, natt + 1, tupdesc, &isnull);
355 
356  if (isnull && skip_nulls)
357  continue;
358 
359  /* print attribute name */
360  appendStringInfoChar(s, ' ');
361  appendStringInfoString(s, quote_identifier(NameStr(attr->attname)));
362 
363  /* print attribute type */
364  appendStringInfoChar(s, '[');
366  appendStringInfoChar(s, ']');
367 
368  /* query output function */
369  getTypeOutputInfo(typid,
370  &typoutput, &typisvarlena);
371 
372  /* print separator */
373  appendStringInfoChar(s, ':');
374 
375  /* print data */
376  if (isnull)
377  appendStringInfoString(s, "null");
378  else if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval))
379  appendStringInfoString(s, "unchanged-toast-datum");
380  else if (!typisvarlena)
381  print_literal(s, typid,
382  OidOutputFunctionCall(typoutput, origval));
383  else
384  {
385  Datum val; /* definitely detoasted Datum */
386 
387  val = PointerGetDatum(PG_DETOAST_DATUM(origval));
388  print_literal(s, typid, OidOutputFunctionCall(typoutput, val));
389  }
390  }
391 }
392 
393 /*
394  * callback for individual changed tuples
395  */
396 static void
398  Relation relation, ReorderBufferChange *change)
399 {
400  TestDecodingData *data;
401  Form_pg_class class_form;
402  TupleDesc tupdesc;
403  MemoryContext old;
404 
405  data = ctx->output_plugin_private;
406 
407  /* output BEGIN if we haven't yet */
408  if (data->skip_empty_xacts && !data->xact_wrote_changes)
409  {
410  pg_output_begin(ctx, data, txn, false);
411  }
412  data->xact_wrote_changes = true;
413 
414  class_form = RelationGetForm(relation);
415  tupdesc = RelationGetDescr(relation);
416 
417  /* Avoid leaking memory by using and resetting our own context */
418  old = MemoryContextSwitchTo(data->context);
419 
420  OutputPluginPrepareWrite(ctx, true);
421 
422  appendStringInfoString(ctx->out, "table ");
427  NameStr(class_form->relname)));
428  appendStringInfoChar(ctx->out, ':');
429 
430  switch (change->action)
431  {
433  appendStringInfoString(ctx->out, " INSERT:");
434  if (change->data.tp.newtuple == NULL)
435  appendStringInfoString(ctx->out, " (no-tuple-data)");
436  else
437  tuple_to_stringinfo(ctx->out, tupdesc,
438  &change->data.tp.newtuple->tuple,
439  false);
440  break;
442  appendStringInfoString(ctx->out, " UPDATE:");
443  if (change->data.tp.oldtuple != NULL)
444  {
445  appendStringInfoString(ctx->out, " old-key:");
446  tuple_to_stringinfo(ctx->out, tupdesc,
447  &change->data.tp.oldtuple->tuple,
448  true);
449  appendStringInfoString(ctx->out, " new-tuple:");
450  }
451 
452  if (change->data.tp.newtuple == NULL)
453  appendStringInfoString(ctx->out, " (no-tuple-data)");
454  else
455  tuple_to_stringinfo(ctx->out, tupdesc,
456  &change->data.tp.newtuple->tuple,
457  false);
458  break;
460  appendStringInfoString(ctx->out, " DELETE:");
461 
462  /* if there was no PK, we only know that a delete happened */
463  if (change->data.tp.oldtuple == NULL)
464  appendStringInfoString(ctx->out, " (no-tuple-data)");
465  /* In DELETE, only the replica identity is present; display that */
466  else
467  tuple_to_stringinfo(ctx->out, tupdesc,
468  &change->data.tp.oldtuple->tuple,
469  true);
470  break;
471  default:
472  Assert(false);
473  }
474 
477 
478  OutputPluginWrite(ctx, true);
479 }
480 
481 static void
483  ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
484  const char *prefix, Size sz, const char *message)
485 {
486  OutputPluginPrepareWrite(ctx, true);
487  appendStringInfo(ctx->out, "message: transactional: %d prefix: %s, sz: %zu content:",
488  transactional, prefix, sz);
489  appendBinaryStringInfo(ctx->out, message, sz);
490  OutputPluginWrite(ctx, true);
491 }
static void tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls)
static void pg_decode_shutdown(LogicalDecodingContext *ctx)
TimestampTz commit_time
#define VARATT_IS_EXTERNAL_ONDISK(PTR)
Definition: postgres.h:317
#define IsA(nodeptr, _type_)
Definition: nodes.h:542
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:203
void(* LogicalOutputPluginInit)(struct OutputPluginCallbacks *cb)
Definition: output_plugin.h:35
void getTypeOutputInfo(Oid type, Oid *typOutput, bool *typIsVarlena)
Definition: lsyscache.c:2600
static bool pg_decode_filter(LogicalDecodingContext *ctx, RepOriginId origin_id)
const char * quote_identifier(const char *ident)
Definition: ruleutils.c:9518
#define RelationGetDescr(relation)
Definition: rel.h:353
#define OIDOID
Definition: pg_type.h:328
static void pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
#define PointerGetDatum(X)
Definition: postgres.h:564
#define NUMERICOID
Definition: pg_type.h:542
#define RelationGetForm(relation)
Definition: rel.h:335
PG_MODULE_MAGIC
Definition: test_decoding.c:35
LogicalDecodeMessageCB message_cb
Oid get_rel_namespace(Oid relid)
Definition: lsyscache.c:1718
Form_pg_attribute * attrs
Definition: tupdesc.h:74
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
#define INT4OID
Definition: pg_type.h:316
uint16 RepOriginId
Definition: xlogdefs.h:51
#define VARBITOID
Definition: pg_type.h:534
#define strVal(v)
Definition: value.h:54
int errcode(int sqlerrcode)
Definition: elog.c:575
void * output_plugin_private
Definition: logical.h:62
char * format_type_be(Oid type_oid)
Definition: format_type.c:94
MemoryContext context
Definition: logical.h:32
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:138
List * output_plugin_options
Definition: logical.h:46
bool parse_bool(const char *value, bool *result)
Definition: bool.c:30
unsigned int Oid
Definition: postgres_ext.h:31
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:77
int natts
Definition: tupdesc.h:73
#define ALLOCSET_DEFAULT_MINSIZE
Definition: memutils.h:142
HeapTupleHeader t_data
Definition: htup.h:67
OutputPluginOutputType output_type
Definition: output_plugin.h:28
struct ReorderBufferChange::@51::@52 tp
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:78
static void print_literal(StringInfo s, Oid typid, char *outputstr)
#define ERROR
Definition: elog.h:43
LogicalDecodeCommitCB commit_cb
#define INT2OID
Definition: pg_type.h:308
static void pg_decode_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:157
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3006
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:184
#define ereport(elevel, rest)
Definition: elog.h:122
Node * arg
Definition: parsenodes.h:666
void appendStringInfoChar(StringInfo str, char ch)
Definition: stringinfo.c:169
char * quote_qualified_identifier(const char *qualifier, const char *ident)
Definition: ruleutils.c:9604
MemoryContext context
Definition: test_decoding.c:43
#define heap_getattr(tup, attnum, tupleDesc, isnull)
Definition: htup_details.h:744
static void pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
#define FLOAT4OID
Definition: pg_type.h:408
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: aset.c:436
void * palloc0(Size size)
Definition: mcxt.c:923
LogicalDecodeChangeCB change_cb
uintptr_t Datum
Definition: postgres.h:374
TransactionId xid
static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
Definition: test_decoding.c:96
#define InvalidOid
Definition: postgres_ext.h:36
#define INT8OID
Definition: pg_type.h:304
static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation rel, ReorderBufferChange *change)
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:474
#define BITOID
Definition: pg_type.h:530
#define NULL
Definition: c.h:226
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:667
#define lfirst(lc)
Definition: pg_list.h:106
union ReorderBufferChange::@51 data
size_t Size
Definition: c.h:352
static void pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
LogicalDecodeShutdownCB shutdown_cb
#define FLOAT8OID
Definition: pg_type.h:411
void _PG_init(void)
Definition: test_decoding.c:73
#define BOOLOID
Definition: pg_type.h:288
LogicalDecodeStartupCB startup_cb
#define InvalidRepOriginId
Definition: origin.h:34
FormData_pg_class * Form_pg_class
Definition: pg_class.h:92
#define HeapTupleHeaderGetOid(tup)
Definition: htup_details.h:440
char * OidOutputFunctionCall(Oid functionId, Datum val)
Definition: fmgr.c:2048
int errmsg(const char *fmt,...)
Definition: elog.c:797
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:487
StringInfo out
Definition: logical.h:57
#define ALLOCSET_DEFAULT_INITSIZE
Definition: memutils.h:143
#define NameStr(name)
Definition: c.h:494
#define SQL_STR_DOUBLE(ch, escape_backslash)
Definition: c.h:502
LogicalDecodeBeginCB begin_cb
#define PG_DETOAST_DATUM(datum)
Definition: fmgr.h:196
char * defname
Definition: parsenodes.h:665
#define ALLOCSET_DEFAULT_MAXSIZE
Definition: memutils.h:144
LogicalDecodeFilterByOriginCB filter_by_origin_cb
#define RelationGetRelid(relation)
Definition: rel.h:341
void appendBinaryStringInfo(StringInfo str, const char *data, int datalen)
Definition: stringinfo.c:208
long val
Definition: informix.c:689
void _PG_output_plugin_init(OutputPluginCallbacks *cb)
Definition: test_decoding.c:80
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1804
#define AssertVariableIsOfType(varname, typename)
Definition: c.h:775