PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
tqueue.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * tqueue.c
4  * Use shm_mq to send & receive tuples between parallel backends
5  *
6  * A DestReceiver of type DestTupleQueue, which is a TQueueDestReceiver
7  * under the hood, writes tuples from the executor to a shm_mq. If
8  * necessary, it also writes control messages describing transient
9  * record types used within the tuple.
10  *
11  * A TupleQueueReader reads tuples, and control messages if any are sent,
12  * from a shm_mq and returns the tuples. If transient record types are
13  * in use, it registers those types based on the received control messages
14  * and rewrites the typemods sent by the remote side to the corresponding
15  * local record typemods.
16  *
17  * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
18  * Portions Copyright (c) 1994, Regents of the University of California
19  *
20  * IDENTIFICATION
21  * src/backend/executor/tqueue.c
22  *
23  *-------------------------------------------------------------------------
24  */
25 
26 #include "postgres.h"
27 
28 #include "access/htup_details.h"
29 #include "catalog/pg_type.h"
30 #include "executor/tqueue.h"
31 #include "funcapi.h"
32 #include "lib/stringinfo.h"
33 #include "miscadmin.h"
34 #include "utils/array.h"
35 #include "utils/lsyscache.h"
36 #include "utils/memutils.h"
37 #include "utils/rangetypes.h"
38 #include "utils/syscache.h"
39 #include "utils/typcache.h"
40 
41 typedef enum /* how an attribute's values must be searched for transient record typmods */
42 {
43  TQUEUE_REMAP_NONE, /* no special processing required */
44  TQUEUE_REMAP_ARRAY, /* array: each non-null element is walked/remapped */
45  TQUEUE_REMAP_RANGE, /* range: each finite bound is walked/remapped */
46  TQUEUE_REMAP_RECORD /* composite type, named or anonymous */
47 } RemapClass;
48 
49 typedef struct /* per-tupledesc remap plan, built by BuildRemapInfo */
50 {
51  int natts; /* number of entries in mapping[]; copied from the tupledesc */
52  RemapClass mapping[FLEXIBLE_ARRAY_MEMBER]; /* remap class of each attribute, by attribute index */
53 } RemapInfo;
54 
55 typedef struct
56 {
61  char mode;
65 
66 typedef struct RecordTypemodMap
67 {
71 
73 {
75  char mode;
79 };
80 
81 #define TUPLE_QUEUE_MODE_CONTROL 'c' /* one-byte mode message: following messages describe transient record typmods */
82 #define TUPLE_QUEUE_MODE_DATA 'd' /* one-byte mode message: following messages are tuples */
83 
84 static void tqueueWalk(TQueueDestReceiver * tqueue, RemapClass walktype,
85  Datum value);
86 static void tqueueWalkRecord(TQueueDestReceiver * tqueue, Datum value);
87 static void tqueueWalkArray(TQueueDestReceiver * tqueue, Datum value);
88 static void tqueueWalkRange(TQueueDestReceiver * tqueue, Datum value);
89 static void tqueueSendTypmodInfo(TQueueDestReceiver * tqueue, int typmod,
90  TupleDesc tupledesc);
92  Size nbytes, char *data);
94  Size nbytes, HeapTupleHeader data);
96  TupleDesc tupledesc, RemapInfo * remapinfo,
97  HeapTuple tuple);
98 static Datum TupleQueueRemap(TupleQueueReader *reader, RemapClass remapclass,
99  Datum value);
103 static RemapClass GetRemapClass(Oid typeid);
104 static RemapInfo *BuildRemapInfo(TupleDesc tupledesc);
105 
106 /*
107  * Receive a tuple.
108  *
109  * This is, at core, pretty simple: just send the tuple to the designated
110  * shm_mq. The complicated part is that if the tuple contains transient
111  * record types (see lookup_rowtype_tupdesc), we need to send control
112  * information to the shm_mq receiver so that those typemods can be correctly
113  * interpreted, as they are merely held in a backend-local cache. Worse, the
114  * record type may not be at the top level: we could have a range over an array
115  * type over a range type over a range type over an array type over a record,
116  * or something like that.
117  */
118 static void
120 {
121  TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
122  TupleDesc tupledesc = slot->tts_tupleDescriptor;
123  HeapTuple tuple;
124 
125  /*
126  * Test to see whether the tupledesc has changed; if so, set up for the
127  * new tupledesc. This is a strange test both because the executor really
128  * shouldn't change the tupledesc, and also because it would be unsafe if
129  * the old tupledesc could be freed and a new one allocated at the same
130  * address. But since some very old code in printtup.c uses a similar
131  * test, we adopt it here as well.
132  */
133  if (tqueue->tupledesc != tupledesc)
134  {
135  if (tqueue->remapinfo != NULL)
136  pfree(tqueue->remapinfo);
137  tqueue->remapinfo = BuildRemapInfo(tupledesc);
138  tqueue->tupledesc = tupledesc;
139  }
140 
141  tuple = ExecMaterializeSlot(slot);
142 
143  /*
144  * When, because of the types being transmitted, no record typemod mapping
145  * can be needed, we can skip a good deal of work.
146  */
147  if (tqueue->remapinfo != NULL)
148  {
149  RemapInfo *remapinfo = tqueue->remapinfo;
150  AttrNumber i;
151  MemoryContext oldcontext = NULL;
152 
153  /* Deform the tuple so we can examine it, if not done already. */
154  slot_getallattrs(slot);
155 
156  /* Iterate over each attribute and search it for transient typemods. */
157  Assert(slot->tts_tupleDescriptor->natts == remapinfo->natts);
158  for (i = 0; i < remapinfo->natts; ++i)
159  {
160  /* Ignore nulls and types that don't need special handling. */
161  if (slot->tts_isnull[i] ||
162  remapinfo->mapping[i] == TQUEUE_REMAP_NONE)
163  continue;
164 
165  /* Switch to temporary memory context to avoid leaking. */
166  if (oldcontext == NULL)
167  {
168  if (tqueue->tmpcontext == NULL)
169  tqueue->tmpcontext =
171  "tqueue temporary context",
175  oldcontext = MemoryContextSwitchTo(tqueue->tmpcontext);
176  }
177 
178  /* Invoke the appropriate walker function. */
179  tqueueWalk(tqueue, remapinfo->mapping[i], slot->tts_values[i]);
180  }
181 
182  /* If we used the temp context, reset it and restore prior context. */
183  if (oldcontext != NULL)
184  {
185  MemoryContextSwitchTo(oldcontext);
187  }
188 
189  /* If we entered control mode, switch back to data mode. */
190  if (tqueue->mode != TUPLE_QUEUE_MODE_DATA)
191  {
192  tqueue->mode = TUPLE_QUEUE_MODE_DATA;
193  shm_mq_send(tqueue->handle, sizeof(char), &tqueue->mode, false);
194  }
195  }
196 
197  /* Send the tuple itself. */
198  shm_mq_send(tqueue->handle, tuple->t_len, tuple->t_data, false);
199 }
200 
201 /*
202  * Invoke the appropriate walker function based on the given RemapClass.
203  */
204 static void
206 {
208 
209  switch (walktype)
210  {
211  case TQUEUE_REMAP_NONE:
212  break;
213  case TQUEUE_REMAP_ARRAY:
214  tqueueWalkArray(tqueue, value);
215  break;
216  case TQUEUE_REMAP_RANGE:
217  tqueueWalkRange(tqueue, value);
218  break;
219  case TQUEUE_REMAP_RECORD:
220  tqueueWalkRecord(tqueue, value);
221  break;
222  }
223 }
224 
225 /*
226  * Walk a record and send control messages for transient record types
227  * contained therein.
228  */
229 static void
231 {
232  HeapTupleHeader tup;
233  Oid typeid;
234  Oid typmod;
235  TupleDesc tupledesc;
236  RemapInfo *remapinfo;
237 
238  /* Extract typmod from tuple. */
239  tup = DatumGetHeapTupleHeader(value);
240  typeid = HeapTupleHeaderGetTypeId(tup);
241  typmod = HeapTupleHeaderGetTypMod(tup);
242 
243  /* Look up tuple descriptor in typecache. */
244  tupledesc = lookup_rowtype_tupdesc(typeid, typmod);
245 
246  /*
247  * If this is a transient record type, send its TupleDesc as a control
248  * message. (tqueueSendTypmodInfo is smart enough to do this only once
249  * per typmod.)
250  */
251  if (typeid == RECORDOID)
252  tqueueSendTypmodInfo(tqueue, typmod, tupledesc);
253 
254  /*
255  * Build the remap information for this tupledesc. We might want to think
256  * about keeping a cache of this information keyed by typeid and typemod,
257  * but let's keep it simple for now.
258  */
259  remapinfo = BuildRemapInfo(tupledesc);
260 
261  /*
262  * If remapping is required, deform the tuple and process each field. When
263  * BuildRemapInfo returns NULL, the data types are such that there can be no
264  * transient record types here, so we can skip all this work.
265  */
266  if (remapinfo != NULL)
267  {
268  Datum *values;
269  bool *isnull;
270  HeapTupleData tdata;
271  AttrNumber i;
272 
273  /* Deform the tuple so we can check each column within. */
274  values = palloc(tupledesc->natts * sizeof(Datum));
275  isnull = palloc(tupledesc->natts * sizeof(bool));
277  ItemPointerSetInvalid(&(tdata.t_self));
278  tdata.t_tableOid = InvalidOid;
279  tdata.t_data = tup;
280  heap_deform_tuple(&tdata, tupledesc, values, isnull);
281 
282  /* Recursively check each non-NULL attribute. */
283  for (i = 0; i < tupledesc->natts; ++i)
284  if (!isnull[i])
285  tqueueWalk(tqueue, remapinfo->mapping[i], values[i]);
286  }
287 
288  /* Release reference count acquired by lookup_rowtype_tupdesc. */
289  DecrTupleDescRefCount(tupledesc);
290 }
291 
292 /*
293  * Walk an array and send control messages for transient record types
294  * contained therein.
295  */
296 static void
298 {
299  ArrayType *arr = DatumGetArrayTypeP(value);
300  Oid typeid = ARR_ELEMTYPE(arr);
301  RemapClass remapclass;
302  int16 typlen;
303  bool typbyval;
304  char typalign;
305  Datum *elem_values;
306  bool *elem_nulls;
307  int num_elems;
308  int i;
309 
310  remapclass = GetRemapClass(typeid);
311 
312  /*
313  * If the elements of the array don't need to be walked, we shouldn't have
314  * been called in the first place: GetRemapClass should have returned
315  * TQUEUE_REMAP_NONE when asked about this array type.
316  */
317  Assert(remapclass != TQUEUE_REMAP_NONE);
318 
319  /* Deconstruct the array. */
320  get_typlenbyvalalign(typeid, &typlen, &typbyval, &typalign);
321  deconstruct_array(arr, typeid, typlen, typbyval, typalign,
322  &elem_values, &elem_nulls, &num_elems);
323 
324  /* Walk each element. */
325  for (i = 0; i < num_elems; ++i)
326  if (!elem_nulls[i])
327  tqueueWalk(tqueue, remapclass, elem_values[i]);
328 }
329 
330 /*
331  * Walk a range type and send control messages for transient record types
332  * contained therein.
333  */
334 static void
336 {
338  Oid typeid = RangeTypeGetOid(range);
339  RemapClass remapclass;
340  TypeCacheEntry *typcache;
343  bool empty;
344 
345  /*
346  * Extract the lower and upper bounds. It might be worth implementing
347  * some caching scheme here so that we don't look up the same typeids in
348  * the type cache repeatedly, but for now let's keep it simple.
349  */
350  typcache = lookup_type_cache(typeid, TYPECACHE_RANGE_INFO);
351  if (typcache->rngelemtype == NULL)
352  elog(ERROR, "type %u is not a range type", typeid);
353  range_deserialize(typcache, range, &lower, &upper, &empty);
354 
355  /* Nothing to do for an empty range. */
356  if (empty)
357  return;
358 
359  /*
360  * If the range bounds don't need to be walked, we shouldn't have been
361  * called in the first place: GetRemapClass should have returned
362  * TQUEUE_REMAP_NONE when asked about this range type.
363  */
364  remapclass = GetRemapClass(typeid);
365  Assert(remapclass != TQUEUE_REMAP_NONE);
366 
367  /* Walk each bound, if present. */
368  if (!upper.infinite)
369  tqueueWalk(tqueue, remapclass, upper.val);
370  if (!lower.infinite)
371  tqueueWalk(tqueue, remapclass, lower.val);
372 }
373 
374 /*
375  * Send tuple descriptor information for a transient typemod, unless we've
376  * already done so previously.
377  */
378 static void
380  TupleDesc tupledesc)
381 {
383  bool found;
384  AttrNumber i;
385 
386  /* Initialize hash table if not done yet. */
387  if (tqueue->recordhtab == NULL)
388  {
389  HASHCTL ctl;
390 
391  ctl.keysize = sizeof(int);
392  ctl.entrysize = sizeof(int);
393  ctl.hcxt = TopMemoryContext;
394  tqueue->recordhtab = hash_create("tqueue record hashtable",
395  100, &ctl, HASH_ELEM | HASH_CONTEXT);
396  }
397 
398  /* Have we already seen this record type? If not, must report it. */
399  hash_search(tqueue->recordhtab, &typmod, HASH_ENTER, &found);
400  if (found)
401  return;
402 
403  /* If message queue is in data mode, switch to control mode. */
404  if (tqueue->mode != TUPLE_QUEUE_MODE_CONTROL)
405  {
406  tqueue->mode = TUPLE_QUEUE_MODE_CONTROL;
407  shm_mq_send(tqueue->handle, sizeof(char), &tqueue->mode, false);
408  }
409 
410  /* Assemble a control message. */
411  initStringInfo(&buf);
412  appendBinaryStringInfo(&buf, (char *) &typmod, sizeof(int));
413  appendBinaryStringInfo(&buf, (char *) &tupledesc->natts, sizeof(int));
414  appendBinaryStringInfo(&buf, (char *) &tupledesc->tdhasoid,
415  sizeof(bool));
416  for (i = 0; i < tupledesc->natts; ++i)
417  appendBinaryStringInfo(&buf, (char *) tupledesc->attrs[i],
418  sizeof(FormData_pg_attribute));
419 
420  /* Send control message. */
421  shm_mq_send(tqueue->handle, buf.len, buf.data, false);
422 }
423 
424 /*
425  * Prepare to receive tuples from executor. This is a no-op: none of the arguments are used.
426  */
427 static void
428 tqueueStartupReceiver(DestReceiver *self, int operation, TupleDesc typeinfo)
429 {
430  /* nothing to set up here */
431 }
432 
433 /*
434  * Clean up at end of an executor run
435  */
436 static void
438 {
439  TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
440 
442 }
443 
444 /*
445  * Destroy receiver when done with it
446  */
447 static void
449 {
450  TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
451 
452  if (tqueue->tmpcontext != NULL)
454  if (tqueue->recordhtab != NULL)
455  hash_destroy(tqueue->recordhtab);
456  if (tqueue->remapinfo != NULL)
457  pfree(tqueue->remapinfo);
458  pfree(self);
459 }
460 
461 /*
462  * Create a DestReceiver that writes tuples to a tuple queue.
463  */
464 DestReceiver *
466 {
467  TQueueDestReceiver *self;
468 
469  self = (TQueueDestReceiver *) palloc0(sizeof(TQueueDestReceiver));
470 
471  self->pub.receiveSlot = tqueueReceiveSlot;
472  self->pub.rStartup = tqueueStartupReceiver;
473  self->pub.rShutdown = tqueueShutdownReceiver;
474  self->pub.rDestroy = tqueueDestroyReceiver;
475  self->pub.mydest = DestTupleQueue;
476  self->handle = handle;
477  self->tmpcontext = NULL;
478  self->recordhtab = NULL;
479  self->mode = TUPLE_QUEUE_MODE_DATA;
480  self->remapinfo = NULL;
481 
482  return (DestReceiver *) self;
483 }
484 
485 /*
486  * Create a tuple queue reader.
487  */
490 {
491  TupleQueueReader *reader = palloc0(sizeof(TupleQueueReader));
492 
493  reader->queue = handle;
494  reader->mode = TUPLE_QUEUE_MODE_DATA;
495  reader->tupledesc = tupledesc;
496  reader->remapinfo = BuildRemapInfo(tupledesc);
497 
498  return reader;
499 }
500 
501 /*
502  * Destroy a tuple queue reader.
503  */
504 void
506 {
508  if (reader->remapinfo != NULL)
509  pfree(reader->remapinfo);
510  pfree(reader);
511 }
512 
513 /*
514  * Fetch a tuple from a tuple queue reader.
515  *
516  * Even when shm_mq_receive() returns SHM_MQ_WOULD_BLOCK, this can still
517  * accumulate bytes from a partially-read message, so it's useful to call
518  * this with nowait = true even if nothing is returned.
519  *
520  * The return value is NULL if the queue has been detached or if
521  * nowait = true and no tuple is ready to return. *done, if not NULL,
522  * is set to true when the queue is detached and otherwise to false.
523  */
524 HeapTuple
525 TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done)
526 {
527  shm_mq_result result;
528  /* Report "not detached" unless shm_mq_receive tells us otherwise below. */
529  if (done != NULL)
530  *done = false;
531  /* Mode-switch and control messages are consumed inside this loop; only a tuple, a detach, or would-block returns. */
532  for (;;)
533  {
534  Size nbytes;
535  void *data;
536 
537  /* Attempt to read a message. */
538  result = shm_mq_receive(reader->queue, &nbytes, &data, nowait);
539 
540  /* If queue is detached, set *done and return NULL. */
541  if (result == SHM_MQ_DETACHED)
542  {
543  if (done != NULL)
544  *done = true;
545  return NULL;
546  }
547 
548  /* In non-blocking mode, bail out if no message ready yet. */
549  if (result == SHM_MQ_WOULD_BLOCK)
550  return NULL;
551  Assert(result == SHM_MQ_SUCCESS);
552 
553  /*
554  * OK, we got a message. Process it.
555  *
556  * One-byte messages are mode switch messages, so that we can switch
557  * between "control" and "data" mode. When in "data" mode, each
558  * message (unless exactly one byte) is a tuple. When in "control"
559  * mode, each message provides a transient-typmod-to-tupledesc mapping
560  * so we can interpret future tuples.
561  */
562  if (nbytes == 1)
563  {
564  /* Mode switch message; the new mode is not validated until the next longer message arrives. */
565  reader->mode = ((char *) data)[0];
566  }
567  else if (reader->mode == TUPLE_QUEUE_MODE_DATA)
568  {
569  /* Tuple data. */
570  return TupleQueueHandleDataMessage(reader, nbytes, data);
571  }
572  else if (reader->mode == TUPLE_QUEUE_MODE_CONTROL)
573  {
574  /* Control message, describing a transient record type. */
575  TupleQueueHandleControlMessage(reader, nbytes, data);
576  }
577  else
578  elog(ERROR, "invalid mode: %d", (int) reader->mode);
579  }
580 }
581 
582 /*
583  * Handle a data message - that is, a tuple - from the remote side.
584  */
585 static HeapTuple
587  Size nbytes,
588  HeapTupleHeader data)
589 {
590  HeapTupleData htup;
591 
593  htup.t_tableOid = InvalidOid;
594  htup.t_len = nbytes;
595  htup.t_data = data;
596 
597  return TupleQueueRemapTuple(reader, reader->tupledesc, reader->remapinfo,
598  &htup);
599 }
600 
601 /*
602  * Remap tuple typmods per control information received from remote side.
603  */
604 static HeapTuple
606  RemapInfo * remapinfo, HeapTuple tuple)
607 {
608  Datum *values;
609  bool *isnull;
610  int i;
611 
612  /*
613  * If no remapping is necessary, just copy the tuple into a single
614  * palloc'd chunk, as caller will expect.
615  */
616  if (remapinfo == NULL)
617  return heap_copytuple(tuple);
618 
619  /* Deform tuple so we can remap record typmods for individual attrs. */
620  values = palloc(tupledesc->natts * sizeof(Datum));
621  isnull = palloc(tupledesc->natts * sizeof(bool));
622  heap_deform_tuple(tuple, tupledesc, values, isnull);
623  Assert(tupledesc->natts == remapinfo->natts);
624 
625  /* Recursively check each non-NULL attribute. */
626  for (i = 0; i < tupledesc->natts; ++i)
627  {
628  if (isnull[i] || remapinfo->mapping[i] == TQUEUE_REMAP_NONE)
629  continue;
630  values[i] = TupleQueueRemap(reader, remapinfo->mapping[i], values[i]);
631  }
632 
633  /* Reform the modified tuple. */
634  return heap_form_tuple(tupledesc, values, isnull);
635 }
636 
637 /*
638  * Remap a value based on the specified remap class.
639  */
640 static Datum
642 {
644 
645  switch (remapclass)
646  {
647  case TQUEUE_REMAP_NONE:
648  /* caller probably shouldn't have called us at all, but... */
649  return value;
650 
651  case TQUEUE_REMAP_ARRAY:
652  return TupleQueueRemapArray(reader, value);
653 
654  case TQUEUE_REMAP_RANGE:
655  return TupleQueueRemapRange(reader, value);
656 
657  case TQUEUE_REMAP_RECORD:
658  return TupleQueueRemapRecord(reader, value);
659  }
660 
661  elog(ERROR, "unknown remap class: %d", (int) remapclass);
662  return (Datum) 0;
663 }
664 
665 /*
666  * Remap an array.
667  */
668 static Datum
670 {
671  ArrayType *arr = DatumGetArrayTypeP(value);
672  Oid typeid = ARR_ELEMTYPE(arr);
673  RemapClass remapclass;
674  int16 typlen;
675  bool typbyval;
676  char typalign;
677  Datum *elem_values;
678  bool *elem_nulls;
679  int num_elems;
680  int i;
681 
682  remapclass = GetRemapClass(typeid);
683 
684  /*
685  * If the elements of the array don't need to be walked, we shouldn't have
686  * been called in the first place: GetRemapClass should have returned
687  * TQUEUE_REMAP_NONE when asked about this array type.
688  */
689  Assert(remapclass != TQUEUE_REMAP_NONE);
690 
691  /* Deconstruct the array. */
692  get_typlenbyvalalign(typeid, &typlen, &typbyval, &typalign);
693  deconstruct_array(arr, typeid, typlen, typbyval, typalign,
694  &elem_values, &elem_nulls, &num_elems);
695 
696  /* Remap each element. */
697  for (i = 0; i < num_elems; ++i)
698  if (!elem_nulls[i])
699  elem_values[i] = TupleQueueRemap(reader, remapclass,
700  elem_values[i]);
701 
702  /* Reconstruct and return the array. */
703  arr = construct_md_array(elem_values, elem_nulls,
704  ARR_NDIM(arr), ARR_DIMS(arr), ARR_LBOUND(arr),
705  typeid, typlen, typbyval, typalign);
706  return PointerGetDatum(arr);
707 }
708 
709 /*
710  * Remap a range type.
711  */
712 static Datum
714 {
716  Oid typeid = RangeTypeGetOid(range);
717  RemapClass remapclass;
718  TypeCacheEntry *typcache;
721  bool empty;
722 
723  /*
724  * Extract the lower and upper bounds. As in tqueueWalkRange, some
725  * caching might be a good idea here.
726  */
727  typcache = lookup_type_cache(typeid, TYPECACHE_RANGE_INFO);
728  if (typcache->rngelemtype == NULL)
729  elog(ERROR, "type %u is not a range type", typeid);
730  range_deserialize(typcache, range, &lower, &upper, &empty);
731 
732  /* Nothing to do for an empty range. */
733  if (empty)
734  return value;
735 
736  /*
737  * If the range bounds don't need to be walked, we shouldn't have been
738  * called in the first place: GetRemapClass should have returned
739  * TQUEUE_REMAP_NONE when asked about this range type.
740  */
741  remapclass = GetRemapClass(typeid);
742  Assert(remapclass != TQUEUE_REMAP_NONE);
743 
744  /* Remap each bound, if present. */
745  if (!upper.infinite)
746  upper.val = TupleQueueRemap(reader, remapclass, upper.val);
747  if (!lower.infinite)
748  lower.val = TupleQueueRemap(reader, remapclass, lower.val);
749 
750  /* And reserialize. */
751  range = range_serialize(typcache, &lower, &upper, empty);
752  return RangeTypeGetDatum(range);
753 }
754 
755 /*
756  * Remap a record.
757  */
758 static Datum
760 {
761  HeapTupleHeader tup;
762  Oid typeid;
763  int typmod;
764  RecordTypemodMap *mapent;
765  TupleDesc tupledesc;
766  RemapInfo *remapinfo;
767  HeapTupleData htup;
768  HeapTuple atup;
769 
770  /* Fetch type OID and typemod. */
771  tup = DatumGetHeapTupleHeader(value);
772  typeid = HeapTupleHeaderGetTypeId(tup);
773  typmod = HeapTupleHeaderGetTypMod(tup);
774 
775  /* If transient record, replace remote typmod with local typmod. */
776  if (typeid == RECORDOID)
777  {
778  Assert(reader->typmodmap != NULL);
779  mapent = hash_search(reader->typmodmap, &typmod,
780  HASH_FIND, NULL);
781  if (mapent == NULL)
782  elog(ERROR, "found unrecognized remote typmod %d", typmod);
783  typmod = mapent->localtypmod;
784  }
785 
786  /*
787  * Fetch tupledesc and compute remap info. We should probably cache this
788  * so that we don't have to keep recomputing it.
789  */
790  tupledesc = lookup_rowtype_tupdesc(typeid, typmod);
791  remapinfo = BuildRemapInfo(tupledesc);
792  DecrTupleDescRefCount(tupledesc);
793 
794  /* Remap tuple. */
796  htup.t_tableOid = InvalidOid;
798  htup.t_data = tup;
799  atup = TupleQueueRemapTuple(reader, tupledesc, remapinfo, &htup);
800  HeapTupleHeaderSetTypeId(atup->t_data, typeid);
801  HeapTupleHeaderSetTypMod(atup->t_data, typmod);
803 
804  /* And return the results. */
805  return HeapTupleHeaderGetDatum(atup->t_data);
806 }
807 
808 /*
809  * Handle a control message received from the tuple queue.
810  *
811  * Control messages are sent when the remote side is sending tuples that
812  * contain transient record types. We need to arrange to bless those
813  * record types locally and translate between remote and local typmods.
814  */
815 static void
817  char *data)
818 {
819  int natts;
820  int remotetypmod;
821  bool hasoid;
822  char *buf = data;
823  int rc = 0;
824  int i;
825  Form_pg_attribute *attrs;
826  MemoryContext oldcontext;
827  TupleDesc tupledesc;
828  RecordTypemodMap *mapent;
829  bool found;
830 
831  /* Extract remote typmod. */
832  memcpy(&remotetypmod, &buf[rc], sizeof(int));
833  rc += sizeof(int);
834 
835  /* Extract attribute count. */
836  memcpy(&natts, &buf[rc], sizeof(int));
837  rc += sizeof(int);
838 
839  /* Extract hasoid flag. */
840  memcpy(&hasoid, &buf[rc], sizeof(bool));
841  rc += sizeof(bool);
842 
843  /* Extract attribute details. */
845  attrs = palloc(natts * sizeof(Form_pg_attribute));
846  for (i = 0; i < natts; ++i)
847  {
848  attrs[i] = palloc(sizeof(FormData_pg_attribute));
849  memcpy(attrs[i], &buf[rc], sizeof(FormData_pg_attribute));
850  rc += sizeof(FormData_pg_attribute);
851  }
852  MemoryContextSwitchTo(oldcontext);
853 
854  /* We should have read the whole message. */
855  Assert(rc == nbytes);
856 
857  /* Construct TupleDesc. */
858  tupledesc = CreateTupleDesc(natts, hasoid, attrs);
859  tupledesc = BlessTupleDesc(tupledesc);
860 
861  /* Create map if it doesn't exist already. */
862  if (reader->typmodmap == NULL)
863  {
864  HASHCTL ctl;
865 
866  ctl.keysize = sizeof(int);
867  ctl.entrysize = sizeof(RecordTypemodMap);
869  reader->typmodmap = hash_create("typmodmap hashtable",
870  100, &ctl, HASH_ELEM | HASH_CONTEXT);
871  }
872 
873  /* Create map entry. */
874  mapent = hash_search(reader->typmodmap, &remotetypmod, HASH_ENTER,
875  &found);
876  if (found)
877  elog(ERROR, "duplicate message for typmod %d",
878  remotetypmod);
879  mapent->localtypmod = tupledesc->tdtypmod;
880  elog(DEBUG3, "mapping remote typmod %d to local typmod %d",
881  remotetypmod, tupledesc->tdtypmod);
882 }
883 
884 /*
885  * Build a mapping indicating what remapping class applies to each attribute
886  * described by a tupledesc.
887  */
888 static RemapInfo *
890 {
891  RemapInfo *remapinfo;
892  Size size;
893  AttrNumber i;
894  bool noop = true;
895 
896  size = offsetof(RemapInfo, mapping) +
897  sizeof(RemapClass) * tupledesc->natts;
898  remapinfo = MemoryContextAllocZero(TopMemoryContext, size);
899  remapinfo->natts = tupledesc->natts;
900  for (i = 0; i < tupledesc->natts; ++i)
901  {
902  Form_pg_attribute attr = tupledesc->attrs[i];
903 
904  if (attr->attisdropped)
905  {
906  remapinfo->mapping[i] = TQUEUE_REMAP_NONE;
907  continue;
908  }
909 
910  remapinfo->mapping[i] = GetRemapClass(attr->atttypid);
911  if (remapinfo->mapping[i] != TQUEUE_REMAP_NONE)
912  noop = false;
913  }
914 
915  if (noop)
916  {
917  pfree(remapinfo);
918  remapinfo = NULL;
919  }
920 
921  return remapinfo;
922 }
923 
924 /*
925  * Determine the remap class associated with a particular data type.
926  *
927  * Transient record types need to have the typmod applied on the sending side
928  * replaced with a value on the receiving side that has the same meaning.
929  *
930  * Arrays, range types, and all record types (including named composite types)
931  * need to be searched for transient record values buried within them.
932  * Surprisingly, a walker is required even when the indicated type is a
933  * composite type, because the actual value may be a compatible transient
934  * record type.
935  */
936 static RemapClass
938 {
939  RemapClass forceResult = TQUEUE_REMAP_NONE;
940  RemapClass innerResult = TQUEUE_REMAP_NONE;
941 
942  for (;;)
943  {
944  HeapTuple tup;
945  Form_pg_type typ;
946 
947  /* Simple cases. */
948  if (typeid == RECORDOID)
949  {
950  innerResult = TQUEUE_REMAP_RECORD;
951  break;
952  }
953  if (typeid == RECORDARRAYOID)
954  {
955  innerResult = TQUEUE_REMAP_ARRAY;
956  break;
957  }
958 
959  /* Otherwise, we need a syscache lookup to figure it out. */
960  tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(typeid));
961  if (!HeapTupleIsValid(tup))
962  elog(ERROR, "cache lookup failed for type %u", typeid);
963  typ = (Form_pg_type) GETSTRUCT(tup);
964 
965  /* Look through domains to underlying base type. */
966  if (typ->typtype == TYPTYPE_DOMAIN)
967  {
968  typeid = typ->typbasetype;
969  ReleaseSysCache(tup);
970  continue;
971  }
972 
973  /*
974  * Look through arrays to underlying base type, but the final return
975  * value must be either TQUEUE_REMAP_ARRAY or TQUEUE_REMAP_NONE. (If
976  * this is an array of integers, for example, we don't need to walk
977  * it.)
978  */
979  if (OidIsValid(typ->typelem) && typ->typlen == -1)
980  {
981  typeid = typ->typelem;
982  ReleaseSysCache(tup);
983  if (forceResult == TQUEUE_REMAP_NONE)
984  forceResult = TQUEUE_REMAP_ARRAY;
985  continue;
986  }
987 
988  /*
989  * Similarly, look through ranges to the underlying base type, but the
990  * final return value must be either TQUEUE_REMAP_RANGE or
991  * TQUEUE_REMAP_NONE.
992  */
993  if (typ->typtype == TYPTYPE_RANGE)
994  {
995  ReleaseSysCache(tup);
996  if (forceResult == TQUEUE_REMAP_NONE)
997  forceResult = TQUEUE_REMAP_RANGE;
998  typeid = get_range_subtype(typeid);
999  continue;
1000  }
1001 
1002  /* Walk composite types. Nothing else needs special handling. */
1003  if (typ->typtype == TYPTYPE_COMPOSITE)
1004  innerResult = TQUEUE_REMAP_RECORD;
1005  ReleaseSysCache(tup);
1006  break;
1007  }
1008 
1009  if (innerResult != TQUEUE_REMAP_NONE && forceResult != TQUEUE_REMAP_NONE)
1010  return forceResult;
1011  return innerResult;
1012 }
#define RECORDARRAYOID
Definition: pg_type.h:670
signed short int16
Definition: c.h:252
HeapTuple heap_copytuple(HeapTuple tuple)
Definition: heaptuple.c:608
#define TYPTYPE_DOMAIN
Definition: pg_type.h:710
static struct @76 value
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:795
static void tqueueDestroyReceiver(DestReceiver *self)
Definition: tqueue.c:448
int natts
Definition: tqueue.c:51
static struct cvec * range(struct vars *v, celt a, celt b, int cases)
Definition: regc_locale.c:403
void DestroyTupleQueueReader(TupleQueueReader *reader)
Definition: tqueue.c:505
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:203
#define HeapTupleHeaderSetTypeId(tup, typeid)
Definition: htup_details.h:425
#define TYPECACHE_RANGE_INFO
Definition: typcache.h:121
#define RangeTypeGetDatum(X)
Definition: rangetypes.h:73
#define GETSTRUCT(TUP)
Definition: htup_details.h:631
MemoryContext tmpcontext
Definition: tqueue.c:59
bool tdhasoid
Definition: tupdesc.h:79
static void tqueueStartupReceiver(DestReceiver *self, int operation, TupleDesc typeinfo)
Definition: tqueue.c:428
static HeapTuple TupleQueueHandleDataMessage(TupleQueueReader *reader, Size nbytes, HeapTupleHeader data)
Definition: tqueue.c:586
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
TupleDesc lookup_rowtype_tupdesc(Oid type_id, int32 typmod)
Definition: typcache.c:1243
MemoryContext hcxt
Definition: hsearch.h:78
#define RangeTypeGetOid(r)
Definition: rangetypes.h:33
shm_mq_handle * queue
Definition: tqueue.c:74
#define DEBUG3
Definition: elog.h:23
Datum lower(PG_FUNCTION_ARGS)
Definition: oracle_compat.c:43
#define TYPTYPE_COMPOSITE
Definition: pg_type.h:709
void get_typlenbyvalalign(Oid typid, int16 *typlen, bool *typbyval, char *typalign)
Definition: lsyscache.c:1989
#define PointerGetDatum(X)
Definition: postgres.h:564
TupleDesc tupledesc
Definition: tqueue.c:62
static HeapTuple TupleQueueRemapTuple(TupleQueueReader *reader, TupleDesc tupledesc, RemapInfo *remapinfo, HeapTuple tuple)
Definition: tqueue.c:605
Form_pg_attribute * attrs
Definition: tupdesc.h:74
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
#define DatumGetRangeType(X)
Definition: rangetypes.h:71
Datum val
Definition: rangetypes.h:62
Size entrysize
Definition: hsearch.h:73
MemoryContext CurTransactionContext
Definition: mcxt.c:49
HeapTuple TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done)
Definition: tqueue.c:525
Datum * tts_values
Definition: tuptable.h:125
static Datum TupleQueueRemapArray(TupleQueueReader *reader, Datum value)
Definition: tqueue.c:669
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:138
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:692
Datum upper(PG_FUNCTION_ARGS)
Definition: oracle_compat.c:74
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:887
FormData_pg_type * Form_pg_type
Definition: pg_type.h:233
TupleDesc CreateTupleDesc(int natts, bool hasoid, Form_pg_attribute *attrs)
Definition: tupdesc.c:111
unsigned int Oid
Definition: postgres_ext.h:31
static RemapInfo * BuildRemapInfo(TupleDesc tupledesc)
Definition: tqueue.c:889
#define OidIsValid(objectId)
Definition: c.h:530
#define DatumGetHeapTupleHeader(X)
Definition: fmgr.h:254
int natts
Definition: tupdesc.h:73
#define ALLOCSET_DEFAULT_MINSIZE
Definition: memutils.h:142
#define SearchSysCache1(cacheId, key1)
Definition: syscache.h:141
int32 tdtypmod
Definition: tupdesc.h:78
char bool
Definition: c.h:199
TupleDesc tupledesc
Definition: tqueue.c:76
#define HeapTupleHeaderSetDatumLength(tup, len)
Definition: htup_details.h:417
void shm_mq_detach(shm_mq *mq)
Definition: shm_mq.c:749
#define ARR_LBOUND(a)
Definition: array.h:277
HeapTupleHeader t_data
Definition: htup.h:67
struct RecordTypemodMap RecordTypemodMap
#define HeapTupleHeaderGetTypMod(tup)
Definition: htup_details.h:430
Definition: dynahash.c:193
HTAB * recordhtab
Definition: tqueue.c:60
void pfree(void *pointer)
Definition: mcxt.c:995
static Datum TupleQueueRemapRange(TupleQueueReader *reader, Datum value)
Definition: tqueue.c:713
#define ObjectIdGetDatum(X)
Definition: postgres.h:515
#define ERROR
Definition: elog.h:43
RemapInfo * remapinfo
Definition: tqueue.c:63
static Datum TupleQueueRemapRecord(TupleQueueReader *reader, Datum value)
Definition: tqueue.c:759
#define ARR_DIMS(a)
Definition: array.h:275
RemapInfo * remapinfo
Definition: tqueue.c:77
ItemPointerData t_self
Definition: htup.h:65
HTAB * typmodmap
Definition: tqueue.c:78
static void tqueueWalk(TQueueDestReceiver *tqueue, RemapClass walktype, Datum value)
Definition: tqueue.c:205
TupleDesc BlessTupleDesc(TupleDesc tupdesc)
Definition: execTuples.c:1081
uint32 t_len
Definition: htup.h:64
static void tqueueWalkArray(TQueueDestReceiver *tqueue, Datum value)
Definition: tqueue.c:297
static char * buf
Definition: pg_test_fsync.c:65
bool * tts_isnull
Definition: tuptable.h:126
void check_stack_depth(void)
Definition: postgres.c:3095
static Datum TupleQueueRemap(TupleQueueReader *reader, RemapClass remapclass, Datum value)
Definition: tqueue.c:641
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:184
Oid t_tableOid
Definition: htup.h:66
#define RECORDOID
Definition: pg_type.h:668
static void tqueueWalkRange(TQueueDestReceiver *tqueue, Datum value)
Definition: tqueue.c:335
RangeType * range_serialize(TypeCacheEntry *typcache, RangeBound *lower, RangeBound *upper, bool empty)
Definition: rangetypes.c:1513
void slot_getallattrs(TupleTableSlot *slot)
Definition: heaptuple.c:1171
MemoryContext TopMemoryContext
Definition: mcxt.c:43
struct TypeCacheEntry * rngelemtype
Definition: typcache.h:84
#define TYPTYPE_RANGE
Definition: pg_type.h:713
void initStringInfo(StringInfo str)
Definition: stringinfo.c:46
int localtypmod
Definition: tqueue.c:69
void range_deserialize(TypeCacheEntry *typcache, RangeType *range, RangeBound *lower, RangeBound *upper, bool *empty)
Definition: rangetypes.c:1642
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:121
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: aset.c:436
void * palloc0(Size size)
Definition: mcxt.c:923
static void tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
Definition: tqueue.c:119
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:301
uintptr_t Datum
Definition: postgres.h:374
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:990
#define HeapTupleHeaderSetTypMod(tup, typmod)
Definition: htup_details.h:435
#define HeapTupleHeaderGetTypeId(tup)
Definition: htup_details.h:420
Size keysize
Definition: hsearch.h:72
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:787
TypeCacheEntry * lookup_type_cache(Oid type_id, int flags)
Definition: typcache.c:191
#define InvalidOid
Definition: postgres_ext.h:36
FormData_pg_attribute
Definition: pg_attribute.h:168
shm_mq_result
Definition: shm_mq.h:36
RemapClass
Definition: tqueue.c:41
#define HeapTupleIsValid(tuple)
Definition: htup.h:77
#define NULL
Definition: c.h:226
#define Assert(condition)
Definition: c.h:667
int remotetypmod
Definition: tqueue.c:68
void DecrTupleDescRefCount(TupleDesc tupdesc)
Definition: tupdesc.c:333
size_t Size
Definition: c.h:352
shm_mq_result shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait)
Definition: shm_mq.c:320
static void tqueueShutdownReceiver(DestReceiver *self)
Definition: tqueue.c:437
shm_mq * shm_mq_get_queue(shm_mq_handle *mqh)
Definition: shm_mq.c:773
shm_mq_handle * handle
Definition: tqueue.c:58
bool infinite
Definition: rangetypes.h:63
HeapTuple ExecMaterializeSlot(TupleTableSlot *slot)
Definition: execTuples.c:729
#define ARR_NDIM(a)
Definition: array.h:271
RemapClass mapping[FLEXIBLE_ARRAY_MEMBER]
Definition: tqueue.c:52
void heap_deform_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *values, bool *isnull)
Definition: heaptuple.c:867
void deconstruct_array(ArrayType *array, Oid elmtype, int elmlen, bool elmbyval, char elmalign, Datum **elemsp, bool **nullsp, int *nelemsp)
Definition: arrayfuncs.c:3475
#define TUPLE_QUEUE_MODE_CONTROL
Definition: tqueue.c:81
static Datum values[MAXATTR]
Definition: bootstrap.c:160
static void TupleQueueHandleControlMessage(TupleQueueReader *reader, Size nbytes, char *data)
Definition: tqueue.c:816
#define ItemPointerSetInvalid(pointer)
Definition: itemptr.h:135
Datum HeapTupleHeaderGetDatum(HeapTupleHeader tuple)
Definition: execTuples.c:1251
void * palloc(Size size)
Definition: mcxt.c:894
static void tqueueWalkRecord(TQueueDestReceiver *tqueue, Datum value)
Definition: tqueue.c:230
static void tqueueSendTypmodInfo(TQueueDestReceiver *tqueue, int typmod, TupleDesc tupledesc)
Definition: tqueue.c:379
#define ALLOCSET_DEFAULT_INITSIZE
Definition: memutils.h:143
int i
TupleQueueReader * CreateTupleQueueReader(shm_mq_handle *handle, TupleDesc tupledesc)
Definition: tqueue.c:489
#define TUPLE_QUEUE_MODE_DATA
Definition: tqueue.c:82
#define ALLOCSET_DEFAULT_MAXSIZE
Definition: memutils.h:144
DestReceiver pub
Definition: tqueue.c:57
#define elog
Definition: elog.h:218
DestReceiver * CreateTupleQueueDestReceiver(shm_mq_handle *handle)
Definition: tqueue.c:465
ArrayType * construct_md_array(Datum *elems, bool *nulls, int ndims, int *dims, int *lbs, Oid elmtype, int elmlen, bool elmbyval, char elmalign)
Definition: arrayfuncs.c:3340
shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
Definition: shm_mq.c:489
#define ARR_ELEMTYPE(a)
Definition: array.h:273
int16 AttrNumber
Definition: attnum.h:21
Oid get_range_subtype(Oid rangeOid)
Definition: lsyscache.c:3047
void appendBinaryStringInfo(StringInfo str, const char *data, int datalen)
Definition: stringinfo.c:208
#define offsetof(type, field)
Definition: c.h:547
static RemapClass GetRemapClass(Oid typeid)
Definition: tqueue.c:937
#define HeapTupleHeaderGetDatumLength(tup)
Definition: htup_details.h:414
#define DatumGetArrayTypeP(X)
Definition: array.h:242