PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
postgres_fdw.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * postgres_fdw.c
4  * Foreign-data wrapper for remote PostgreSQL servers
5  *
6  * Portions Copyright (c) 2012-2016, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  * contrib/postgres_fdw/postgres_fdw.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14 
15 #include "postgres_fdw.h"
16 
17 #include "access/htup_details.h"
18 #include "access/sysattr.h"
19 #include "commands/defrem.h"
20 #include "commands/explain.h"
21 #include "commands/vacuum.h"
22 #include "foreign/fdwapi.h"
23 #include "funcapi.h"
24 #include "miscadmin.h"
25 #include "nodes/makefuncs.h"
26 #include "nodes/nodeFuncs.h"
27 #include "optimizer/cost.h"
28 #include "optimizer/pathnode.h"
29 #include "optimizer/paths.h"
30 #include "optimizer/planmain.h"
31 #include "optimizer/restrictinfo.h"
32 #include "optimizer/var.h"
33 #include "optimizer/tlist.h"
34 #include "parser/parsetree.h"
35 #include "utils/builtins.h"
36 #include "utils/guc.h"
37 #include "utils/lsyscache.h"
38 #include "utils/memutils.h"
39 #include "utils/rel.h"
40 #include "utils/sampling.h"
41 
43 
44 /* Default CPU cost to start up a foreign query. */
45 #define DEFAULT_FDW_STARTUP_COST 100.0
46 
47 /* Default CPU cost to process 1 row (above and beyond cpu_tuple_cost). */
48 #define DEFAULT_FDW_TUPLE_COST 0.01
49 
50 /* If no remote estimates, assume a sort costs 20% extra */
51 #define DEFAULT_FDW_SORT_MULTIPLIER 1.2
52 
53 /*
54  * Indexes of FDW-private information stored in fdw_private lists.
55  *
56  * These items are indexed with the enum FdwScanPrivateIndex, so an item
57  * can be fetched with list_nth(). For example, to get the SELECT statement:
58  * sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
59  */
61 {
62  /* SQL statement to execute remotely (as a String node) */
64  /* List of restriction clauses that can be executed remotely */
66  /* Integer list of attribute numbers retrieved by the SELECT */
68  /* Integer representing the desired fetch_size */
70  /* Oid of user mapping to be used while connecting to the foreign server */
72 
73  /*
74  * String describing the join, i.e. the names of the relations being joined
75  * and the types of join; added only when the scan is a join
76  */
78 };
79 
80 /*
81  * Similarly, this enum describes what's kept in the fdw_private list for
82  * a ModifyTable node referencing a postgres_fdw foreign table. We store:
83  *
84  * 1) INSERT/UPDATE/DELETE statement text to be sent to the remote server
85  * 2) Integer list of target attribute numbers for INSERT/UPDATE
86  * (NIL for a DELETE)
87  * 3) Boolean flag showing if the remote query has a RETURNING clause
88  * 4) Integer list of attribute numbers retrieved by RETURNING, if any
89  */
91 {
92  /* SQL statement to execute remotely (as a String node) */
94  /* Integer list of target attribute numbers for INSERT/UPDATE */
96  /* has-returning flag (as an integer Value node) */
98  /* Integer list of attribute numbers retrieved by RETURNING */
100 };
101 
102 /*
103  * Similarly, this enum describes what's kept in the fdw_private list for
104  * a ForeignScan node that modifies a foreign table directly. We store:
105  *
106  * 1) UPDATE/DELETE statement text to be sent to the remote server
107  * 2) Boolean flag showing if the remote query has a RETURNING clause
108  * 3) Integer list of attribute numbers retrieved by RETURNING, if any
109  * 4) Boolean flag showing whether we set the command's es_processed
110  */
112 {
113  /* SQL statement to execute remotely (as a String node) */
115  /* has-returning flag (as an integer Value node) */
117  /* Integer list of attribute numbers retrieved by RETURNING */
119  /* set-processed flag (as an integer Value node) */
121 };
122 
123 /*
124  * Execution state of a foreign scan using postgres_fdw.
125  */
126 typedef struct PgFdwScanState
127 {
128  Relation rel; /* relcache entry for the foreign table. NULL
129  * for a foreign join scan. */
130  TupleDesc tupdesc; /* tuple descriptor of scan */
131  AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
132 
133  /* extracted fdw_private data */
134  char *query; /* text of SELECT command */
135  List *retrieved_attrs; /* list of retrieved attribute numbers */
136 
137  /* for remote query execution */
138  PGconn *conn; /* connection for the scan */
139  unsigned int cursor_number; /* quasi-unique ID for my cursor */
140  bool cursor_exists; /* have we created the cursor? */
141  int numParams; /* number of parameters passed to query */
142  FmgrInfo *param_flinfo; /* output conversion functions for them */
143  List *param_exprs; /* executable expressions for param values */
144  const char **param_values; /* textual values of query parameters */
145 
146  /* for storing result tuples */
147  HeapTuple *tuples; /* array of currently-retrieved tuples */
148  int num_tuples; /* # of tuples in array */
149  int next_tuple; /* index of next one to return */
150 
151  /* batch-level state, for optimizing rewinds and avoiding useless fetch */
152  int fetch_ct_2; /* Min(# of fetches done, 2) */
153  bool eof_reached; /* true if last fetch reached EOF */
154 
155  /* working memory contexts */
156  MemoryContext batch_cxt; /* context holding current batch of tuples */
157  MemoryContext temp_cxt; /* context for per-tuple temporary data */
158 
159  int fetch_size; /* number of tuples per fetch */
161 
162 /*
163  * Execution state of a foreign insert/update/delete operation.
164  */
165 typedef struct PgFdwModifyState
166 {
167  Relation rel; /* relcache entry for the foreign table */
168  AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
169 
170  /* for remote query execution */
171  PGconn *conn; /* connection for the scan */
172  char *p_name; /* name of prepared statement, if created */
173 
174  /* extracted fdw_private data */
175  char *query; /* text of INSERT/UPDATE/DELETE command */
176  List *target_attrs; /* list of target attribute numbers */
177  bool has_returning; /* is there a RETURNING clause? */
178  List *retrieved_attrs; /* attr numbers retrieved by RETURNING */
179 
180  /* info about parameters for prepared statement */
181  AttrNumber ctidAttno; /* attnum of input resjunk ctid column */
182  int p_nums; /* number of parameters to transmit */
183  FmgrInfo *p_flinfo; /* output conversion functions for them */
184 
185  /* working memory context */
186  MemoryContext temp_cxt; /* context for per-tuple temporary data */
188 
189 /*
190  * Execution state of a foreign scan that modifies a foreign table directly.
191  */
193 {
194  Relation rel; /* relcache entry for the foreign table */
195  AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
196 
197  /* extracted fdw_private data */
198  char *query; /* text of UPDATE/DELETE command */
199  bool has_returning; /* is there a RETURNING clause? */
200  List *retrieved_attrs; /* attr numbers retrieved by RETURNING */
201  bool set_processed; /* do we set the command es_processed? */
202 
203  /* for remote query execution */
204  PGconn *conn; /* connection for the update */
205  int numParams; /* number of parameters passed to query */
206  FmgrInfo *param_flinfo; /* output conversion functions for them */
207  List *param_exprs; /* executable expressions for param values */
208  const char **param_values; /* textual values of query parameters */
209 
210  /* for storing result tuples */
211  PGresult *result; /* result for query */
212  int num_tuples; /* # of result tuples */
213  int next_tuple; /* index of next one to return */
214 
215  /* working memory context */
216  MemoryContext temp_cxt; /* context for per-tuple temporary data */
218 
219 /*
220  * Workspace for analyzing a foreign table.
221  */
222 typedef struct PgFdwAnalyzeState
223 {
224  Relation rel; /* relcache entry for the foreign table */
225  AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
226  List *retrieved_attrs; /* attr numbers retrieved by query */
227 
228  /* collected sample rows */
229  HeapTuple *rows; /* array of size targrows */
230  int targrows; /* target # of sample rows */
231  int numrows; /* # of sample rows collected */
232 
233  /* for random sampling */
234  double samplerows; /* # of rows fetched */
235  double rowstoskip; /* # of rows to skip before next sample */
236  ReservoirStateData rstate; /* state for reservoir sampling */
237 
238  /* working memory contexts */
239  MemoryContext anl_cxt; /* context for per-analyze lifespan data */
240  MemoryContext temp_cxt; /* context for per-tuple temporary data */
242 
243 /*
244  * Identify the attribute where data conversion fails.
245  */
246 typedef struct ConversionLocation
247 {
248  Relation rel; /* foreign table's relcache entry. */
249  AttrNumber cur_attno; /* attribute number being processed, or 0 */
250 
251  /*
252  * In case of foreign join push down, fdw_scan_tlist is used to identify
253  * the Var node corresponding to the error location and
254  * fsstate->ss.ps.state gives access to the RTEs of corresponding relation
255  * to get the relation name and attribute name.
256  */
259 
260 /* Callback argument for ec_member_matches_foreign */
261 typedef struct
262 {
263  Expr *current; /* current expr, or NULL if not yet found */
264  List *already_used; /* expressions already dealt with */
266 
267 /*
268  * SQL functions
269  */
271 
272 /*
273  * FDW callback routines
274  */
275 static void postgresGetForeignRelSize(PlannerInfo *root,
276  RelOptInfo *baserel,
277  Oid foreigntableid);
278 static void postgresGetForeignPaths(PlannerInfo *root,
279  RelOptInfo *baserel,
280  Oid foreigntableid);
282  RelOptInfo *baserel,
283  Oid foreigntableid,
284  ForeignPath *best_path,
285  List *tlist,
286  List *scan_clauses,
287  Plan *outer_plan);
288 static void postgresBeginForeignScan(ForeignScanState *node, int eflags);
291 static void postgresEndForeignScan(ForeignScanState *node);
292 static void postgresAddForeignUpdateTargets(Query *parsetree,
293  RangeTblEntry *target_rte,
294  Relation target_relation);
296  ModifyTable *plan,
297  Index resultRelation,
298  int subplan_index);
299 static void postgresBeginForeignModify(ModifyTableState *mtstate,
300  ResultRelInfo *resultRelInfo,
301  List *fdw_private,
302  int subplan_index,
303  int eflags);
305  ResultRelInfo *resultRelInfo,
306  TupleTableSlot *slot,
307  TupleTableSlot *planSlot);
309  ResultRelInfo *resultRelInfo,
310  TupleTableSlot *slot,
311  TupleTableSlot *planSlot);
313  ResultRelInfo *resultRelInfo,
314  TupleTableSlot *slot,
315  TupleTableSlot *planSlot);
316 static void postgresEndForeignModify(EState *estate,
317  ResultRelInfo *resultRelInfo);
319 static bool postgresPlanDirectModify(PlannerInfo *root,
320  ModifyTable *plan,
321  Index resultRelation,
322  int subplan_index);
323 static void postgresBeginDirectModify(ForeignScanState *node, int eflags);
325 static void postgresEndDirectModify(ForeignScanState *node);
327  ExplainState *es);
329  ResultRelInfo *rinfo,
330  List *fdw_private,
331  int subplan_index,
332  ExplainState *es);
334  ExplainState *es);
335 static bool postgresAnalyzeForeignTable(Relation relation,
336  AcquireSampleRowsFunc *func,
337  BlockNumber *totalpages);
339  Oid serverOid);
340 static void postgresGetForeignJoinPaths(PlannerInfo *root,
341  RelOptInfo *joinrel,
342  RelOptInfo *outerrel,
343  RelOptInfo *innerrel,
344  JoinType jointype,
345  JoinPathExtraData *extra);
347  TupleTableSlot *slot);
348 
349 /*
350  * Helper functions
351  */
352 static void estimate_path_cost_size(PlannerInfo *root,
353  RelOptInfo *baserel,
354  List *join_conds,
355  List *pathkeys,
356  double *p_rows, int *p_width,
357  Cost *p_startup_cost, Cost *p_total_cost);
358 static void get_remote_estimate(const char *sql,
359  PGconn *conn,
360  double *rows,
361  int *width,
362  Cost *startup_cost,
363  Cost *total_cost);
364 static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
366  void *arg);
367 static void create_cursor(ForeignScanState *node);
368 static void fetch_more_data(ForeignScanState *node);
369 static void close_cursor(PGconn *conn, unsigned int cursor_number);
370 static void prepare_foreign_modify(PgFdwModifyState *fmstate);
371 static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
372  ItemPointer tupleid,
373  TupleTableSlot *slot);
374 static void store_returning_result(PgFdwModifyState *fmstate,
375  TupleTableSlot *slot, PGresult *res);
376 static void execute_dml_stmt(ForeignScanState *node);
378 static void prepare_query_params(PlanState *node,
379  List *fdw_exprs,
380  int numParams,
381  FmgrInfo **param_flinfo,
382  List **param_exprs,
383  const char ***param_values);
384 static void process_query_params(ExprContext *econtext,
385  FmgrInfo *param_flinfo,
386  List *param_exprs,
387  const char **param_values);
388 static int postgresAcquireSampleRowsFunc(Relation relation, int elevel,
389  HeapTuple *rows, int targrows,
390  double *totalrows,
391  double *totaldeadrows);
392 static void analyze_row_processor(PGresult *res, int row,
393  PgFdwAnalyzeState *astate);
395  int row,
396  Relation rel,
397  AttInMetadata *attinmeta,
398  List *retrieved_attrs,
399  ForeignScanState *fsstate,
400  MemoryContext temp_context);
401 static void conversion_error_callback(void *arg);
402 static bool foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel,
403  JoinType jointype, RelOptInfo *outerrel, RelOptInfo *innerrel,
404  JoinPathExtraData *extra);
406  RelOptInfo *rel);
409  Path *epq_path);
410 
411 
412 /*
413  * Foreign-data wrapper handler function: return a struct with pointers
414  * to my callback routines.
415  */
416 Datum
418 {
419  FdwRoutine *routine = makeNode(FdwRoutine);
420 
421  /* Functions for scanning foreign tables */
429 
430  /* Functions for updating foreign tables */
443 
444  /* Function for EvalPlanQual rechecks */
446  /* Support functions for EXPLAIN */
450 
451  /* Support functions for ANALYZE */
453 
454  /* Support functions for IMPORT FOREIGN SCHEMA */
456 
457  /* Support functions for join push-down */
459 
460  PG_RETURN_POINTER(routine);
461 }
462 
463 /*
464  * postgresGetForeignRelSize
465  * Estimate # of rows and width of the result of the scan
466  *
467  * We should consider the effect of all baserestrictinfo clauses here, but
468  * not any join clauses.
 *
 * A zeroed PgFdwRelationInfo is allocated here and stored in
 * baserel->fdw_private; it is then filled with the catalog lookups, option
 * values, remote/local condition lists, attributes needed, and cost/size
 * estimates computed below.
 *
 * root is the planner state, baserel is the RelOptInfo of the foreign table
 * being planned, and foreigntableid is the OID of that foreign table.
 *
 * When remote estimates are enabled, baserel->rows and
 * baserel->reltarget->width are set directly from the values returned by
 * estimate_path_cost_size(); otherwise set_baserel_size_estimates() is
 * called to derive the size from local statistics.
 *
 * NOTE(review): the embedded listing numbers skip 471, 501-502, 578, 632 and
 * 659 inside this function, so those source lines (including the one that
 * carries the function name) are not present in this copy.  Restore them
 * from the upstream file before relying on this text.
469  */
470 static void
472  RelOptInfo *baserel,
473  Oid foreigntableid)
474 {
475  PgFdwRelationInfo *fpinfo;
476  ListCell *lc;
477  RangeTblEntry *rte = planner_rt_fetch(baserel->relid, root);
478  const char *namespace;
479  const char *relname;
480  const char *refname;
481 
482  /*
483  * We use PgFdwRelationInfo to pass various information to subsequent
484  * functions.  palloc0 leaves every field not set below zeroed.
485  */
486  fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
487  baserel->fdw_private = (void *) fpinfo;
488 
489  /* Base foreign tables need to be pushed down always. */
490  fpinfo->pushdown_safe = true;
491 
492  /* Look up foreign-table catalog info. */
493  fpinfo->table = GetForeignTable(foreigntableid);
494  fpinfo->server = GetForeignServer(fpinfo->table->serverid);
495 
496  /*
497  * Extract user-settable option values. Note that per-table setting of
498  * use_remote_estimate overrides per-server setting.
  *
  * The table options are scanned after the server options, so a per-table
  * use_remote_estimate or fetch_size replaces the per-server value.
  * fdw_startup_cost, fdw_tuple_cost and extensions are looked for only
  * among the server options.
  *
  * The numeric options are parsed with strtod()/strtol() and a NULL end
  * pointer, so malformed values are not detected here; presumably they
  * are validated when the option is defined -- TODO confirm.
499  */
500  fpinfo->use_remote_estimate = false;
503  fpinfo->shippable_extensions = NIL;
504  fpinfo->fetch_size = 100; /* used when no fetch_size option is found */
505 
506  foreach(lc, fpinfo->server->options)
507  {
508  DefElem *def = (DefElem *) lfirst(lc);
509 
510  if (strcmp(def->defname, "use_remote_estimate") == 0)
511  fpinfo->use_remote_estimate = defGetBoolean(def);
512  else if (strcmp(def->defname, "fdw_startup_cost") == 0)
513  fpinfo->fdw_startup_cost = strtod(defGetString(def), NULL);
514  else if (strcmp(def->defname, "fdw_tuple_cost") == 0)
515  fpinfo->fdw_tuple_cost = strtod(defGetString(def), NULL);
516  else if (strcmp(def->defname, "extensions") == 0)
517  fpinfo->shippable_extensions =
518  ExtractExtensionList(defGetString(def), false);
519  else if (strcmp(def->defname, "fetch_size") == 0)
520  fpinfo->fetch_size = strtol(defGetString(def), NULL, 10);
521  }
522  foreach(lc, fpinfo->table->options)
523  {
524  DefElem *def = (DefElem *) lfirst(lc);
525 
526  if (strcmp(def->defname, "use_remote_estimate") == 0)
527  fpinfo->use_remote_estimate = defGetBoolean(def);
528  else if (strcmp(def->defname, "fetch_size") == 0)
529  fpinfo->fetch_size = strtol(defGetString(def), NULL, 10);
530  }
531 
532  /*
533  * If the table or the server is configured to use remote estimates,
534  * identify which user to do remote access as during planning. This
535  * should match what ExecCheckRTEPerms() does. If we fail due to lack of
536  * permissions, the query would have failed at runtime anyway.
  *
  * The RTE's checkAsUser is used when it is set (nonzero); otherwise the
  * current user ID is used.  Without remote estimates no user mapping is
  * looked up and fpinfo->user is left NULL.
537  */
538  if (fpinfo->use_remote_estimate)
539  {
540  Oid userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
541 
542  fpinfo->user = GetUserMapping(userid, fpinfo->server->serverid);
543  }
544  else
545  fpinfo->user = NULL;
546 
547  /*
548  * Identify which baserestrictinfo clauses can be sent to the remote
549  * server and which can't.
550  */
551  classifyConditions(root, baserel, baserel->baserestrictinfo,
552  &fpinfo->remote_conds, &fpinfo->local_conds);
553 
554  /*
555  * Identify which attributes will need to be retrieved from the remote
556  * server. These include all attrs needed for joins or final output, plus
557  * all attrs used in the local_conds. (Note: if we end up using a
558  * parameterized scan, it's possible that some of the join clauses will be
559  * sent to the remote and thus we wouldn't really need to retrieve the
560  * columns used in them. Doesn't seem worth detecting that case though.)
561  */
562  fpinfo->attrs_used = NULL;
563  pull_varattnos((Node *) baserel->reltarget->exprs, baserel->relid,
564  &fpinfo->attrs_used);
565  foreach(lc, fpinfo->local_conds)
566  {
567  RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
568 
569  pull_varattnos((Node *) rinfo->clause, baserel->relid,
570  &fpinfo->attrs_used);
571  }
572 
573  /*
574  * Compute the selectivity and cost of the local_conds, so we don't have
575  * to do it over again for each path. The best we can do for these
576  * conditions is to estimate selectivity on the basis of local statistics.
577  */
579  fpinfo->local_conds,
580  baserel->relid,
581  JOIN_INNER,
582  NULL);
583 
584  cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root);
585 
586  /*
587  * Set cached relation costs to some negative value, so that we can detect
588  * when they are set to some sensible costs during one (usually the first)
589  * of the calls to estimate_path_cost_size().
590  */
591  fpinfo->rel_startup_cost = -1;
592  fpinfo->rel_total_cost = -1;
593 
594  /*
595  * If the table or the server is configured to use remote estimates,
596  * connect to the foreign server and execute EXPLAIN to estimate the
597  * number of rows selected by the restriction clauses, as well as the
598  * average row width. Otherwise, estimate using whatever statistics we
599  * have locally, in a way similar to ordinary tables.
600  */
601  if (fpinfo->use_remote_estimate)
602  {
603  /*
604  * Get cost/size estimates with help of remote server. Save the
605  * values in fpinfo so we don't need to do it again to generate the
606  * basic foreign path.
607  */
608  estimate_path_cost_size(root, baserel, NIL, NIL,
609  &fpinfo->rows, &fpinfo->width,
610  &fpinfo->startup_cost, &fpinfo->total_cost);
611 
612  /* Report estimated baserel size to planner. */
613  baserel->rows = fpinfo->rows;
614  baserel->reltarget->width = fpinfo->width;
615  }
616  else
617  {
618  /*
619  * If the foreign table has never been ANALYZEd, it will have relpages
620  * and reltuples equal to zero, which most likely has nothing to do
621  * with reality. We can't do a whole lot about that if we're not
622  * allowed to consult the remote server, but we can use a hack similar
623  * to plancat.c's treatment of empty relations: use a minimum size
624  * estimate of 10 pages, and divide by the column-datatype-based width
625  * estimate to get the corresponding number of tuples.
626  */
627  if (baserel->pages == 0 && baserel->tuples == 0)
628  {
629  baserel->pages = 10;
630  baserel->tuples =
631  (10 * BLCKSZ) / (baserel->reltarget->width +
633  }
634 
635  /* Estimate baserel size as best we can with local statistics. */
636  set_baserel_size_estimates(root, baserel);
637 
638  /* Fill in basically-bogus cost estimates for use later. */
639  estimate_path_cost_size(root, baserel, NIL, NIL,
640  &fpinfo->rows, &fpinfo->width,
641  &fpinfo->startup_cost, &fpinfo->total_cost);
642  }
643 
644  /*
645  * Set the name of relation in fpinfo, while we are constructing it here.
646  * It will be used to build the string describing the join relation in
647  * EXPLAIN output. We can't know whether VERBOSE option is specified or
648  * not, so always schema-qualify the foreign table name.
  *
  * The RTE alias is appended after the qualified name only when it is
  * non-empty and differs from the table's own name.
649  */
650  fpinfo->relation_name = makeStringInfo();
651  namespace = get_namespace_name(get_rel_namespace(foreigntableid));
652  relname = get_rel_name(foreigntableid);
653  refname = rte->eref->aliasname;
654  appendStringInfo(fpinfo->relation_name, "%s.%s",
655  quote_identifier(namespace),
656  quote_identifier(relname));
657  if (*refname && strcmp(refname, relname) != 0)
658  appendStringInfo(fpinfo->relation_name, " %s",
660 }
661 
662 /*
663  * get_useful_ecs_for_relation
664  * Determine which EquivalenceClasses might be involved in useful
665  * orderings of this relation.
666  *
667  * This function is in some respects a mirror image of the core function
668  * pathkeys_useful_for_merging: for a regular table, we know what indexes
669  * we have and want to test whether any of them are useful. For a foreign
670  * table, we don't know what indexes are present on the remote side but
671  * want to speculate about which ones we'd like to use if they existed.
672  *
673  * This function returns a list of potentially-useful equivalence classes,
674  * but it does not guarantee that an EquivalenceMember exists which contains
675  * Vars only from the given relation. For example, given ft1 JOIN t1 ON
676  * ft1.x + t1.x = 0, this function will say that the equivalence class
677  * containing ft1.x + t1.x is potentially useful. Supposing ft1 is remote and
678  * t1 is local (or on a different server), it will turn out that no useful
679  * ORDER BY clause can be generated. It's not our job to figure that out
680  * here; we're only interested in identifying relevant ECs.
 *
 * The result is a List of EquivalenceClass pointers, or NIL when neither
 * the EC scan nor the joininfo scan below finds a candidate.  The ECs
 * themselves are not copied; the list holds the planner's own pointers.
 *
 * NOTE(review): the embedded listing numbers skip 683 and 713 inside this
 * function, so the line carrying the function name and parameters, and the
 * "if" that pairs with the "else" near the child-rel comment, are not
 * present in this copy.  Restore them from the upstream file.
681  */
682 static List *
684 {
685  List *useful_eclass_list = NIL;
686  ListCell *lc;
687  Relids relids;
688 
689  /*
690  * First, consider whether any active EC is potentially useful for a merge
691  * join against this relation.
692  */
693  if (rel->has_eclass_joins)
694  {
695  foreach(lc, root->eq_classes)
696  {
697  EquivalenceClass *cur_ec = (EquivalenceClass *) lfirst(lc);
698 
699  if (eclass_useful_for_merging(root, cur_ec, rel))
700  useful_eclass_list = lappend(useful_eclass_list, cur_ec);
701  }
702  }
703 
704  /*
705  * Next, consider whether there are any non-EC derivable join clauses that
706  * are merge-joinable. If the joininfo list is empty, we can exit
707  * quickly.
708  */
709  if (rel->joininfo == NIL)
710  return useful_eclass_list;
711 
712  /* If this is a child rel, we must use the topmost parent rel to search. */
714  relids = find_childrel_top_parent(root, rel)->relids;
715  else
716  relids = rel->relids;
717 
718  /* Check each join clause in turn. */
719  foreach(lc, rel->joininfo)
720  {
721  RestrictInfo *restrictinfo = (RestrictInfo *) lfirst(lc);
722 
723  /* Consider only mergejoinable clauses */
724  if (restrictinfo->mergeopfamilies == NIL)
725  continue;
726 
727  /* Make sure we've got canonical ECs. */
728  update_mergeclause_eclasses(root, restrictinfo);
729 
730  /*
731  * restrictinfo->mergeopfamilies != NIL is sufficient to guarantee
732  * that left_ec and right_ec will be initialized, per comments in
733  * distribute_qual_to_rels.
734  *
735  * We want to identify which side of this merge-joinable clause
736  * contains columns from the relation produced by this RelOptInfo. We
737  * test for overlap, not containment, because there could be extra
738  * relations on either side. For example, suppose we've got something
739  * like ((A JOIN B ON A.x = B.x) JOIN C ON A.y = C.y) LEFT JOIN D ON
740  * A.y = D.y. The input rel might be the joinrel between A and B, and
741  * we'll consider the join clause A.y = D.y. relids contains a
742  * relation not involved in the join clause (B) and the equivalence
743  * class for the left-hand side of the clause contains a relation not
744  * involved in the input rel (C). Despite the fact that we have only
745  * overlap and not containment in either direction, A.y is potentially
746  * useful as a sort column.
747  *
748  * Note that it's even possible that relids overlaps neither side of
749  * the join clause. For example, consider A LEFT JOIN B ON A.x = B.x
750  * AND A.x = 1. The clause A.x = 1 will appear in B's joininfo list,
751  * but overlaps neither side of B. In that case, we just skip this
752  * join clause, since it doesn't suggest a useful sort order for this
753  * relation.
  *
  * The right-hand EC is tested first; when it overlaps, the left-hand
  * EC is not examined for this clause.  list_append_unique_ptr is used
  * here rather than lappend, presumably so that an EC already collected
  * (above, or from an earlier clause) is not listed twice.
754  */
755  if (bms_overlap(relids, restrictinfo->right_ec->ec_relids))
756  useful_eclass_list = list_append_unique_ptr(useful_eclass_list,
757  restrictinfo->right_ec);
758  else if (bms_overlap(relids, restrictinfo->left_ec->ec_relids))
759  useful_eclass_list = list_append_unique_ptr(useful_eclass_list,
760  restrictinfo->left_ec);
761  }
762 
763  return useful_eclass_list;
764 }
765 
766 /*
767  * get_useful_pathkeys_for_relation
768  * Determine which orderings of a relation might be useful.
769  *
770  * Getting data in sorted order can be useful either because the requested
771  * order matches the final output ordering for the overall query we're
772  * planning, or because it enables an efficient merge join. Here, we try
773  * to figure out which pathkeys to consider.
 *
 * The result is a List whose elements are themselves pathkey Lists: at most
 * one copy of the full root->query_pathkeys, followed (only when remote
 * estimates are enabled) by single-element lists, one per useful
 * EquivalenceClass.  NIL is returned when no ordering qualifies.
 *
 * NOTE(review): the embedded listing numbers skip 776, 780 and 869 inside
 * this function.  The line carrying the function name and parameters, the
 * declaration of "fpinfo" (which is used below), and one argument of the
 * make_canonical_pathkey() call are therefore not present in this copy.
 * Restore them from the upstream file.
774  */
775 static List *
777 {
778  List *useful_pathkeys_list = NIL;
779  List *useful_eclass_list;
781  EquivalenceClass *query_ec = NULL;
782  ListCell *lc;
783 
784  /*
785  * Pushing the query_pathkeys to the remote server is always worth
786  * considering, because it might let us avoid a local sort.
787  */
788  if (root->query_pathkeys)
789  {
790  bool query_pathkeys_ok = true;
791 
792  foreach(lc, root->query_pathkeys)
793  {
794  PathKey *pathkey = (PathKey *) lfirst(lc);
795  EquivalenceClass *pathkey_ec = pathkey->pk_eclass;
796  Expr *em_expr;
797 
798  /*
799  * The planner and executor don't have any clever strategy for
800  * taking data sorted by a prefix of the query's pathkeys and
801  * getting it to be sorted by all of those pathkeys. We'll just
802  * end up re-sorting the entire data set. So, unless we can push
803  * down all of the query pathkeys, forget it.
804  *
805  * is_foreign_expr would detect volatile expressions as well, but
806  * checking ec_has_volatile here saves some cycles.
807  */
808  if (pathkey_ec->ec_has_volatile ||
809  !(em_expr = find_em_expr_for_rel(pathkey_ec, rel)) ||
810  !is_foreign_expr(root, rel, em_expr))
811  {
812  query_pathkeys_ok = false;
813  break;
814  }
815  }
816 
 /* All query pathkeys are pushable: offer a copy of the whole list. */
817  if (query_pathkeys_ok)
818  useful_pathkeys_list = list_make1(list_copy(root->query_pathkeys));
819  }
820 
821  /*
822  * Even if we're not using remote estimates, having the remote side do the
823  * sort generally won't be any worse than doing it locally, and it might
824  * be much better if the remote side can generate data in the right order
825  * without needing a sort at all. However, what we're going to do next is
826  * try to generate pathkeys that seem promising for possible merge joins,
827  * and that's more speculative. A wrong choice might hurt quite a bit, so
828  * bail out if we can't use remote estimates.
829  */
830  if (!fpinfo->use_remote_estimate)
831  return useful_pathkeys_list;
832 
833  /* Get the list of interesting EquivalenceClasses. */
834  useful_eclass_list = get_useful_ecs_for_relation(root, rel);
835 
836  /*
  * Extract unique EC for query, if any, so we don't consider it again.
  * query_ec stays NULL unless there is exactly one query pathkey, so a
  * multi-key query ordering never suppresses a single-key candidate below.
  */
837  if (list_length(root->query_pathkeys) == 1)
838  {
839  PathKey *query_pathkey = linitial(root->query_pathkeys);
840 
841  query_ec = query_pathkey->pk_eclass;
842  }
843 
844  /*
845  * As a heuristic, the only pathkeys we consider here are those of length
846  * one. It's surely possible to consider more, but since each one we
847  * choose to consider will generate a round-trip to the remote side, we
848  * need to be a bit cautious here. It would sure be nice to have a local
849  * cache of information about remote index definitions...
850  */
851  foreach(lc, useful_eclass_list)
852  {
853  EquivalenceClass *cur_ec = lfirst(lc);
854  Expr *em_expr;
855  PathKey *pathkey;
856 
857  /* If redundant with what we did above, skip it. */
858  if (cur_ec == query_ec)
859  continue;
860 
861  /* If no pushable expression for this rel, skip it. */
862  em_expr = find_em_expr_for_rel(cur_ec, rel);
863  if (em_expr == NULL || !is_foreign_expr(root, rel, em_expr))
864  continue;
865 
866  /*
  * Looks like we can generate a pathkey, so let's do it.  The first
  * operator family listed for the EC is the one used.
  */
867  pathkey = make_canonical_pathkey(root, cur_ec,
868  linitial_oid(cur_ec->ec_opfamilies),
870  false);
871  useful_pathkeys_list = lappend(useful_pathkeys_list,
872  list_make1(pathkey));
873  }
874 
875  return useful_pathkeys_list;
876 }
877 
878 /*
879  * postgresGetForeignPaths
880  * Create possible scan paths for a scan on the foreign table
881  */
882 static void
884  RelOptInfo *baserel,
885  Oid foreigntableid)
886 {
887  PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) baserel->fdw_private;
888  ForeignPath *path;
889  List *ppi_list;
890  ListCell *lc;
891 
892  /*
893  * Create simplest ForeignScan path node and add it to baserel. This path
894  * corresponds to SeqScan path of regular tables (though depending on what
895  * baserestrict conditions we were able to send to remote, there might
896  * actually be an indexscan happening there). We already did all the work
897  * to estimate cost and size of this path.
898  */
899  path = create_foreignscan_path(root, baserel,
900  NULL, /* default pathtarget */
901  fpinfo->rows,
902  fpinfo->startup_cost,
903  fpinfo->total_cost,
904  NIL, /* no pathkeys */
905  NULL, /* no outer rel either */
906  NULL, /* no extra plan */
907  NIL); /* no fdw_private list */
908  add_path(baserel, (Path *) path);
909 
910  /* Add paths with pathkeys */
911  add_paths_with_pathkeys_for_rel(root, baserel, NULL);
912 
913  /*
914  * If we're not using remote estimates, stop here. We have no way to
915  * estimate whether any join clauses would be worth sending across, so
916  * don't bother building parameterized paths.
917  */
918  if (!fpinfo->use_remote_estimate)
919  return;
920 
921  /*
922  * Thumb through all join clauses for the rel to identify which outer
923  * relations could supply one or more safe-to-send-to-remote join clauses.
924  * We'll build a parameterized path for each such outer relation.
925  *
926  * It's convenient to manage this by representing each candidate outer
927  * relation by the ParamPathInfo node for it. We can then use the
928  * ppi_clauses list in the ParamPathInfo node directly as a list of the
929  * interesting join clauses for that rel. This takes care of the
930  * possibility that there are multiple safe join clauses for such a rel,
931  * and also ensures that we account for unsafe join clauses that we'll
932  * still have to enforce locally (since the parameterized-path machinery
933  * insists that we handle all movable clauses).
934  */
935  ppi_list = NIL;
936  foreach(lc, baserel->joininfo)
937  {
938  RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
939  Relids required_outer;
940  ParamPathInfo *param_info;
941 
942  /* Check if clause can be moved to this rel */
943  if (!join_clause_is_movable_to(rinfo, baserel))
944  continue;
945 
946  /* See if it is safe to send to remote */
947  if (!is_foreign_expr(root, baserel, rinfo->clause))
948  continue;
949 
950  /* Calculate required outer rels for the resulting path */
951  required_outer = bms_union(rinfo->clause_relids,
952  baserel->lateral_relids);
953  /* We do not want the foreign rel itself listed in required_outer */
954  required_outer = bms_del_member(required_outer, baserel->relid);
955 
956  /*
957  * required_outer probably can't be empty here, but if it were, we
958  * couldn't make a parameterized path.
959  */
960  if (bms_is_empty(required_outer))
961  continue;
962 
963  /* Get the ParamPathInfo */
964  param_info = get_baserel_parampathinfo(root, baserel,
965  required_outer);
966  Assert(param_info != NULL);
967 
968  /*
969  * Add it to list unless we already have it. Testing pointer equality
970  * is OK since get_baserel_parampathinfo won't make duplicates.
971  */
972  ppi_list = list_append_unique_ptr(ppi_list, param_info);
973  }
974 
975  /*
976  * The above scan examined only "generic" join clauses, not those that
977  * were absorbed into EquivalenceClauses. See if we can make anything out
978  * of EquivalenceClauses.
979  */
980  if (baserel->has_eclass_joins)
981  {
982  /*
983  * We repeatedly scan the eclass list looking for column references
984  * (or expressions) belonging to the foreign rel. Each time we find
985  * one, we generate a list of equivalence joinclauses for it, and then
986  * see if any are safe to send to the remote. Repeat till there are
987  * no more candidate EC members.
988  */
990 
991  arg.already_used = NIL;
992  for (;;)
993  {
994  List *clauses;
995 
996  /* Make clauses, skipping any that join to lateral_referencers */
997  arg.current = NULL;
999  baserel,
1001  (void *) &arg,
1002  baserel->lateral_referencers);
1003 
1004  /* Done if there are no more expressions in the foreign rel */
1005  if (arg.current == NULL)
1006  {
1007  Assert(clauses == NIL);
1008  break;
1009  }
1010 
1011  /* Scan the extracted join clauses */
1012  foreach(lc, clauses)
1013  {
1014  RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
1015  Relids required_outer;
1016  ParamPathInfo *param_info;
1017 
1018  /* Check if clause can be moved to this rel */
1019  if (!join_clause_is_movable_to(rinfo, baserel))
1020  continue;
1021 
1022  /* See if it is safe to send to remote */
1023  if (!is_foreign_expr(root, baserel, rinfo->clause))
1024  continue;
1025 
1026  /* Calculate required outer rels for the resulting path */
1027  required_outer = bms_union(rinfo->clause_relids,
1028  baserel->lateral_relids);
1029  required_outer = bms_del_member(required_outer, baserel->relid);
1030  if (bms_is_empty(required_outer))
1031  continue;
1032 
1033  /* Get the ParamPathInfo */
1034  param_info = get_baserel_parampathinfo(root, baserel,
1035  required_outer);
1036  Assert(param_info != NULL);
1037 
1038  /* Add it to list unless we already have it */
1039  ppi_list = list_append_unique_ptr(ppi_list, param_info);
1040  }
1041 
1042  /* Try again, now ignoring the expression we found this time */
1043  arg.already_used = lappend(arg.already_used, arg.current);
1044  }
1045  }
1046 
1047  /*
1048  * Now build a path for each useful outer relation.
1049  */
1050  foreach(lc, ppi_list)
1051  {
1052  ParamPathInfo *param_info = (ParamPathInfo *) lfirst(lc);
1053  double rows;
1054  int width;
1055  Cost startup_cost;
1056  Cost total_cost;
1057 
1058  /* Get a cost estimate from the remote */
1059  estimate_path_cost_size(root, baserel,
1060  param_info->ppi_clauses, NIL,
1061  &rows, &width,
1062  &startup_cost, &total_cost);
1063 
1064  /*
1065  * ppi_rows currently won't get looked at by anything, but still we
1066  * may as well ensure that it matches our idea of the rowcount.
1067  */
1068  param_info->ppi_rows = rows;
1069 
1070  /* Make the path */
1071  path = create_foreignscan_path(root, baserel,
1072  NULL, /* default pathtarget */
1073  rows,
1074  startup_cost,
1075  total_cost,
1076  NIL, /* no pathkeys */
1077  param_info->ppi_req_outer,
1078  NULL,
1079  NIL); /* no fdw_private list */
1080  add_path(baserel, (Path *) path);
1081  }
1082 }
1083 
1084 /*
1085  * postgresGetForeignPlan
1086  * Create ForeignScan plan node which implements selected best path
1087  */
1088 static ForeignScan *
1090  RelOptInfo *foreignrel,
1091  Oid foreigntableid,
1092  ForeignPath *best_path,
1093  List *tlist,
1094  List *scan_clauses,
1095  Plan *outer_plan)
1096 {
1097  PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
1098  Index scan_relid;
1099  List *fdw_private;
1100  List *remote_conds = NIL;
1101  List *remote_exprs = NIL;
1102  List *local_exprs = NIL;
1103  List *params_list = NIL;
1104  List *retrieved_attrs;
1105  StringInfoData sql;
1106  ListCell *lc;
1107  List *fdw_scan_tlist = NIL;
1108 
1109  /*
1110  * For base relations, set scan_relid as the relid of the relation. For
1111  * other kinds of relations set it to 0.
1112  */
1113  if (foreignrel->reloptkind == RELOPT_BASEREL ||
1114  foreignrel->reloptkind == RELOPT_OTHER_MEMBER_REL)
1115  scan_relid = foreignrel->relid;
1116  else
1117  {
1118  scan_relid = 0;
1119 
1120  /*
1121  * create_scan_plan() and create_foreignscan_plan() pass
1122  * rel->baserestrictinfo + parameterization clauses through
1123  * scan_clauses. For a join rel->baserestrictinfo is NIL and we are
1124  * not considering parameterization right now, so there should be no
1125  * scan_clauses for a joinrel.
1126  */
1127  Assert(!scan_clauses);
1128  }
1129 
1130  /*
1131  * Separate the scan_clauses into those that can be executed remotely and
1132  * those that can't. baserestrictinfo clauses that were previously
1133  * determined to be safe or unsafe by classifyConditions are shown in
1134  * fpinfo->remote_conds and fpinfo->local_conds. Anything else in the
1135  * scan_clauses list will be a join clause, which we have to check for
1136  * remote-safety.
1137  *
1138  * Note: the join clauses we see here should be the exact same ones
1139  * previously examined by postgresGetForeignPaths. Possibly it'd be worth
1140  * passing forward the classification work done then, rather than
1141  * repeating it here.
1142  *
1143  * This code must match "extract_actual_clauses(scan_clauses, false)"
1144  * except for the additional decision about remote versus local execution.
1145  * Note however that we don't strip the RestrictInfo nodes from the
1146  * remote_conds list, since appendWhereClause expects a list of
1147  * RestrictInfos.
1148  */
1149  foreach(lc, scan_clauses)
1150  {
1151  RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
1152 
1153  Assert(IsA(rinfo, RestrictInfo));
1154 
1155  /* Ignore any pseudoconstants, they're dealt with elsewhere */
1156  if (rinfo->pseudoconstant)
1157  continue;
1158 
1159  if (list_member_ptr(fpinfo->remote_conds, rinfo))
1160  {
1161  remote_conds = lappend(remote_conds, rinfo);
1162  remote_exprs = lappend(remote_exprs, rinfo->clause);
1163  }
1164  else if (list_member_ptr(fpinfo->local_conds, rinfo))
1165  local_exprs = lappend(local_exprs, rinfo->clause);
1166  else if (is_foreign_expr(root, foreignrel, rinfo->clause))
1167  {
1168  remote_conds = lappend(remote_conds, rinfo);
1169  remote_exprs = lappend(remote_exprs, rinfo->clause);
1170  }
1171  else
1172  local_exprs = lappend(local_exprs, rinfo->clause);
1173  }
1174 
1175  if (foreignrel->reloptkind == RELOPT_JOINREL)
1176  {
1177  /* For a join relation, get the conditions from fdw_private structure */
1178  remote_conds = fpinfo->remote_conds;
1179  local_exprs = fpinfo->local_conds;
1180 
1181  /* Build the list of columns to be fetched from the foreign server. */
1182  fdw_scan_tlist = build_tlist_to_deparse(foreignrel);
1183 
1184  /*
1185  * Ensure that the outer plan produces a tuple whose descriptor
1186  * matches our scan tuple slot. This is safe because all scans and
1187  * joins support projection, so we never need to insert a Result node.
1188  * Also, remove the local conditions from outer plan's quals, lest
1189  * they will be evaluated twice, once by the local plan and once by
1190  * the scan.
1191  */
1192  if (outer_plan)
1193  {
1194  ListCell *lc;
1195 
1196  outer_plan->targetlist = fdw_scan_tlist;
1197 
1198  foreach(lc, local_exprs)
1199  {
1200  Join *join_plan = (Join *) outer_plan;
1201  Node *qual = lfirst(lc);
1202 
1203  outer_plan->qual = list_delete(outer_plan->qual, qual);
1204 
1205  /*
1206  * For an inner join the local conditions of foreign scan plan
1207  * can be part of the joinquals as well.
1208  */
1209  if (join_plan->jointype == JOIN_INNER)
1210  join_plan->joinqual = list_delete(join_plan->joinqual,
1211  qual);
1212  }
1213  }
1214  }
1215 
1216  /*
1217  * Build the query string to be sent for execution, and identify
1218  * expressions to be sent as parameters.
1219  */
1220  initStringInfo(&sql);
1221  deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist,
1222  remote_conds, best_path->path.pathkeys,
1223  &retrieved_attrs, &params_list);
1224 
1225  /*
1226  * Build the fdw_private list that will be available to the executor.
1227  * Items in the list must match order in enum FdwScanPrivateIndex.
1228  */
1229  fdw_private = list_make5(makeString(sql.data),
1230  remote_conds,
1231  retrieved_attrs,
1232  makeInteger(fpinfo->fetch_size),
1233  makeInteger(foreignrel->umid));
1234  if (foreignrel->reloptkind == RELOPT_JOINREL)
1235  fdw_private = lappend(fdw_private,
1236  makeString(fpinfo->relation_name->data));
1237 
1238  /*
1239  * Create the ForeignScan node for the given relation.
1240  *
1241  * Note that the remote parameter expressions are stored in the fdw_exprs
1242  * field of the finished plan node; we can't keep them in private state
1243  * because then they wouldn't be subject to later planner processing.
1244  */
1245  return make_foreignscan(tlist,
1246  local_exprs,
1247  scan_relid,
1248  params_list,
1249  fdw_private,
1250  fdw_scan_tlist,
1251  remote_exprs,
1252  outer_plan);
1253 }
1254 
1255 /*
1256  * postgresBeginForeignScan
1257  * Initiate an executor scan of a foreign PostgreSQL table.
1258  */
1259 static void
1261 {
1262  ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
1263  EState *estate = node->ss.ps.state;
1264  PgFdwScanState *fsstate;
1265  UserMapping *user;
1266  int numParams;
1267 
1268  /*
1269  * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL.
1270  */
1271  if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
1272  return;
1273 
1274  /*
1275  * We'll save private state in node->fdw_state.
1276  */
1277  fsstate = (PgFdwScanState *) palloc0(sizeof(PgFdwScanState));
1278  node->fdw_state = (void *) fsstate;
1279 
1280  /*
1281  * Obtain the foreign server where to connect and user mapping to use for
1282  * connection. For base relations we obtain this information from
1283  * catalogs. For join relations, this information is frozen at the time of
1284  * planning to ensure that the join is safe to pushdown. In case the
1285  * information goes stale between planning and execution, plan will be
1286  * invalidated and replanned.
1287  */
1288  if (fsplan->scan.scanrelid > 0)
1289  {
1290  ForeignTable *table;
1291 
1292  /*
1293  * Identify which user to do the remote access as. This should match
1294  * what ExecCheckRTEPerms() does.
1295  */
1296  RangeTblEntry *rte = rt_fetch(fsplan->scan.scanrelid, estate->es_range_table);
1297  Oid userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
1298 
1299  fsstate->rel = node->ss.ss_currentRelation;
1300  table = GetForeignTable(RelationGetRelid(fsstate->rel));
1301 
1302  user = GetUserMapping(userid, table->serverid);
1303  }
1304  else
1305  {
1307 
1308  user = GetUserMappingById(umid);
1309  Assert(fsplan->fs_server == user->serverid);
1310  }
1311 
1312  /*
1313  * Get connection to the foreign server. Connection manager will
1314  * establish new connection if necessary.
1315  */
1316  fsstate->conn = GetConnection(user, false);
1317 
1318  /* Assign a unique ID for my cursor */
1319  fsstate->cursor_number = GetCursorNumber(fsstate->conn);
1320  fsstate->cursor_exists = false;
1321 
1322  /* Get private info created by planner functions. */
1323  fsstate->query = strVal(list_nth(fsplan->fdw_private,
1325  fsstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
1327  fsstate->fetch_size = intVal(list_nth(fsplan->fdw_private,
1329 
1330  /* Create contexts for batches of tuples and per-tuple temp workspace. */
1331  fsstate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt,
1332  "postgres_fdw tuple data",
1336  fsstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
1337  "postgres_fdw temporary data",
1341 
1342  /*
1343  * Get info we'll need for converting data fetched from the foreign server
1344  * into local representation and error reporting during that process.
1345  */
1346  if (fsplan->scan.scanrelid > 0)
1347  fsstate->tupdesc = RelationGetDescr(fsstate->rel);
1348  else
1349  fsstate->tupdesc = node->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
1350 
1351  fsstate->attinmeta = TupleDescGetAttInMetadata(fsstate->tupdesc);
1352 
1353  /*
1354  * Prepare for processing of parameters used in remote query, if any.
1355  */
1356  numParams = list_length(fsplan->fdw_exprs);
1357  fsstate->numParams = numParams;
1358  if (numParams > 0)
1360  fsplan->fdw_exprs,
1361  numParams,
1362  &fsstate->param_flinfo,
1363  &fsstate->param_exprs,
1364  &fsstate->param_values);
1365 }
1366 
1367 /*
1368  * postgresIterateForeignScan
1369  * Retrieve next row from the result set, or clear tuple slot to indicate
1370  * EOF.
1371  */
1372 static TupleTableSlot *
1374 {
1375  PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1376  TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
1377 
1378  /*
1379  * If this is the first call after Begin or ReScan, we need to create the
1380  * cursor on the remote side.
1381  */
1382  if (!fsstate->cursor_exists)
1383  create_cursor(node);
1384 
1385  /*
1386  * Get some more tuples, if we've run out.
1387  */
1388  if (fsstate->next_tuple >= fsstate->num_tuples)
1389  {
1390  /* No point in another fetch if we already detected EOF, though. */
1391  if (!fsstate->eof_reached)
1392  fetch_more_data(node);
1393  /* If we didn't get any tuples, must be end of data. */
1394  if (fsstate->next_tuple >= fsstate->num_tuples)
1395  return ExecClearTuple(slot);
1396  }
1397 
1398  /*
1399  * Return the next tuple.
1400  */
1401  ExecStoreTuple(fsstate->tuples[fsstate->next_tuple++],
1402  slot,
1403  InvalidBuffer,
1404  false);
1405 
1406  return slot;
1407 }
1408 
1409 /*
1410  * postgresReScanForeignScan
1411  * Restart the scan.
1412  */
1413 static void
1415 {
1416  PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1417  char sql[64];
1418  PGresult *res;
1419 
1420  /* If we haven't created the cursor yet, nothing to do. */
1421  if (!fsstate->cursor_exists)
1422  return;
1423 
1424  /*
1425  * If any internal parameters affecting this node have changed, we'd
1426  * better destroy and recreate the cursor. Otherwise, rewinding it should
1427  * be good enough. If we've only fetched zero or one batch, we needn't
1428  * even rewind the cursor, just rescan what we have.
1429  */
1430  if (node->ss.ps.chgParam != NULL)
1431  {
1432  fsstate->cursor_exists = false;
1433  snprintf(sql, sizeof(sql), "CLOSE c%u",
1434  fsstate->cursor_number);
1435  }
1436  else if (fsstate->fetch_ct_2 > 1)
1437  {
1438  snprintf(sql, sizeof(sql), "MOVE BACKWARD ALL IN c%u",
1439  fsstate->cursor_number);
1440  }
1441  else
1442  {
1443  /* Easy: just rescan what we already have in memory, if anything */
1444  fsstate->next_tuple = 0;
1445  return;
1446  }
1447 
1448  /*
1449  * We don't use a PG_TRY block here, so be careful not to throw error
1450  * without releasing the PGresult.
1451  */
1452  res = pgfdw_exec_query(fsstate->conn, sql);
1453  if (PQresultStatus(res) != PGRES_COMMAND_OK)
1454  pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
1455  PQclear(res);
1456 
1457  /* Now force a fresh FETCH. */
1458  fsstate->tuples = NULL;
1459  fsstate->num_tuples = 0;
1460  fsstate->next_tuple = 0;
1461  fsstate->fetch_ct_2 = 0;
1462  fsstate->eof_reached = false;
1463 }
1464 
1465 /*
1466  * postgresEndForeignScan
1467  * Finish scanning foreign table and dispose objects used for this scan
1468  */
1469 static void
1471 {
1472  PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1473 
1474  /* if fsstate is NULL, we are in EXPLAIN; nothing to do */
1475  if (fsstate == NULL)
1476  return;
1477 
1478  /* Close the cursor if open, to prevent accumulation of cursors */
1479  if (fsstate->cursor_exists)
1480  close_cursor(fsstate->conn, fsstate->cursor_number);
1481 
1482  /* Release remote connection */
1483  ReleaseConnection(fsstate->conn);
1484  fsstate->conn = NULL;
1485 
1486  /* MemoryContexts will be deleted automatically. */
1487 }
1488 
1489 /*
1490  * postgresAddForeignUpdateTargets
1491  * Add resjunk column(s) needed for update/delete on a foreign table
1492  */
1493 static void
1495  RangeTblEntry *target_rte,
1496  Relation target_relation)
1497 {
1498  Var *var;
1499  const char *attrname;
1500  TargetEntry *tle;
1501 
1502  /*
1503  * In postgres_fdw, what we need is the ctid, same as for a regular table.
1504  */
1505 
1506  /* Make a Var representing the desired value */
1507  var = makeVar(parsetree->resultRelation,
1509  TIDOID,
1510  -1,
1511  InvalidOid,
1512  0);
1513 
1514  /* Wrap it in a resjunk TLE with the right name ... */
1515  attrname = "ctid";
1516 
1517  tle = makeTargetEntry((Expr *) var,
1518  list_length(parsetree->targetList) + 1,
1519  pstrdup(attrname),
1520  true);
1521 
1522  /* ... and add it to the query's targetlist */
1523  parsetree->targetList = lappend(parsetree->targetList, tle);
1524 }
1525 
1526 /*
1527  * postgresPlanForeignModify
1528  * Plan an insert/update/delete operation on a foreign table
1529  */
1530 static List *
1532  ModifyTable *plan,
1533  Index resultRelation,
1534  int subplan_index)
1535 {
1536  CmdType operation = plan->operation;
1537  RangeTblEntry *rte = planner_rt_fetch(resultRelation, root);
1538  Relation rel;
1539  StringInfoData sql;
1540  List *targetAttrs = NIL;
1541  List *returningList = NIL;
1542  List *retrieved_attrs = NIL;
1543  bool doNothing = false;
1544 
1545  initStringInfo(&sql);
1546 
1547  /*
1548  * Core code already has some lock on each rel being planned, so we can
1549  * use NoLock here.
1550  */
1551  rel = heap_open(rte->relid, NoLock);
1552 
1553  /*
1554  * In an INSERT, we transmit all columns that are defined in the foreign
1555  * table. In an UPDATE, we transmit only columns that were explicitly
1556  * targets of the UPDATE, so as to avoid unnecessary data transmission.
1557  * (We can't do that for INSERT since we would miss sending default values
1558  * for columns not listed in the source statement.)
1559  */
1560  if (operation == CMD_INSERT)
1561  {
1562  TupleDesc tupdesc = RelationGetDescr(rel);
1563  int attnum;
1564 
1565  for (attnum = 1; attnum <= tupdesc->natts; attnum++)
1566  {
1567  Form_pg_attribute attr = tupdesc->attrs[attnum - 1];
1568 
1569  if (!attr->attisdropped)
1570  targetAttrs = lappend_int(targetAttrs, attnum);
1571  }
1572  }
1573  else if (operation == CMD_UPDATE)
1574  {
1575  int col;
1576 
1577  col = -1;
1578  while ((col = bms_next_member(rte->updatedCols, col)) >= 0)
1579  {
1580  /* bit numbers are offset by FirstLowInvalidHeapAttributeNumber */
1582 
1583  if (attno <= InvalidAttrNumber) /* shouldn't happen */
1584  elog(ERROR, "system-column update is not supported");
1585  targetAttrs = lappend_int(targetAttrs, attno);
1586  }
1587  }
1588 
1589  /*
1590  * Extract the relevant RETURNING list if any.
1591  */
1592  if (plan->returningLists)
1593  returningList = (List *) list_nth(plan->returningLists, subplan_index);
1594 
1595  /*
1596  * ON CONFLICT DO UPDATE and DO NOTHING case with inference specification
1597  * should have already been rejected in the optimizer, as presently there
1598  * is no way to recognize an arbiter index on a foreign table. Only DO
1599  * NOTHING is supported without an inference specification.
1600  */
1601  if (plan->onConflictAction == ONCONFLICT_NOTHING)
1602  doNothing = true;
1603  else if (plan->onConflictAction != ONCONFLICT_NONE)
1604  elog(ERROR, "unexpected ON CONFLICT specification: %d",
1605  (int) plan->onConflictAction);
1606 
1607  /*
1608  * Construct the SQL command string.
1609  */
1610  switch (operation)
1611  {
1612  case CMD_INSERT:
1613  deparseInsertSql(&sql, root, resultRelation, rel,
1614  targetAttrs, doNothing, returningList,
1615  &retrieved_attrs);
1616  break;
1617  case CMD_UPDATE:
1618  deparseUpdateSql(&sql, root, resultRelation, rel,
1619  targetAttrs, returningList,
1620  &retrieved_attrs);
1621  break;
1622  case CMD_DELETE:
1623  deparseDeleteSql(&sql, root, resultRelation, rel,
1624  returningList,
1625  &retrieved_attrs);
1626  break;
1627  default:
1628  elog(ERROR, "unexpected operation: %d", (int) operation);
1629  break;
1630  }
1631 
1632  heap_close(rel, NoLock);
1633 
1634  /*
1635  * Build the fdw_private list that will be available to the executor.
1636  * Items in the list must match enum FdwModifyPrivateIndex, above.
1637  */
1638  return list_make4(makeString(sql.data),
1639  targetAttrs,
1640  makeInteger((retrieved_attrs != NIL)),
1641  retrieved_attrs);
1642 }
1643 
1644 /*
1645  * postgresBeginForeignModify
1646  * Begin an insert/update/delete operation on a foreign table
1647  */
1648 static void
1650  ResultRelInfo *resultRelInfo,
1651  List *fdw_private,
1652  int subplan_index,
1653  int eflags)
1654 {
1655  PgFdwModifyState *fmstate;
1656  EState *estate = mtstate->ps.state;
1657  CmdType operation = mtstate->operation;
1658  Relation rel = resultRelInfo->ri_RelationDesc;
1659  RangeTblEntry *rte;
1660  Oid userid;
1661  ForeignTable *table;
1662  UserMapping *user;
1663  AttrNumber n_params;
1664  Oid typefnoid;
1665  bool isvarlena;
1666  ListCell *lc;
1667 
1668  /*
1669  * Do nothing in EXPLAIN (no ANALYZE) case. resultRelInfo->ri_FdwState
1670  * stays NULL.
1671  */
1672  if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
1673  return;
1674 
1675  /* Begin constructing PgFdwModifyState. */
1676  fmstate = (PgFdwModifyState *) palloc0(sizeof(PgFdwModifyState));
1677  fmstate->rel = rel;
1678 
1679  /*
1680  * Identify which user to do the remote access as. This should match what
1681  * ExecCheckRTEPerms() does.
1682  */
1683  rte = rt_fetch(resultRelInfo->ri_RangeTableIndex, estate->es_range_table);
1684  userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
1685 
1686  /* Get info about foreign table. */
1687  table = GetForeignTable(RelationGetRelid(rel));
1688  user = GetUserMapping(userid, table->serverid);
1689 
1690  /* Open connection; report that we'll create a prepared statement. */
1691  fmstate->conn = GetConnection(user, true);
1692  fmstate->p_name = NULL; /* prepared statement not made yet */
1693 
1694  /* Deconstruct fdw_private data. */
1695  fmstate->query = strVal(list_nth(fdw_private,
1697  fmstate->target_attrs = (List *) list_nth(fdw_private,
1699  fmstate->has_returning = intVal(list_nth(fdw_private,
1701  fmstate->retrieved_attrs = (List *) list_nth(fdw_private,
1703 
1704  /* Create context for per-tuple temp workspace. */
1705  fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
1706  "postgres_fdw temporary data",
1710 
1711  /* Prepare for input conversion of RETURNING results. */
1712  if (fmstate->has_returning)
1714 
1715  /* Prepare for output conversion of parameters used in prepared stmt. */
1716  n_params = list_length(fmstate->target_attrs) + 1;
1717  fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params);
1718  fmstate->p_nums = 0;
1719 
1720  if (operation == CMD_UPDATE || operation == CMD_DELETE)
1721  {
1722  /* Find the ctid resjunk column in the subplan's result */
1723  Plan *subplan = mtstate->mt_plans[subplan_index]->plan;
1724 
1726  "ctid");
1727  if (!AttributeNumberIsValid(fmstate->ctidAttno))
1728  elog(ERROR, "could not find junk ctid column");
1729 
1730  /* First transmittable parameter will be ctid */
1731  getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena);
1732  fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
1733  fmstate->p_nums++;
1734  }
1735 
1736  if (operation == CMD_INSERT || operation == CMD_UPDATE)
1737  {
1738  /* Set up for remaining transmittable parameters */
1739  foreach(lc, fmstate->target_attrs)
1740  {
1741  int attnum = lfirst_int(lc);
1742  Form_pg_attribute attr = RelationGetDescr(rel)->attrs[attnum - 1];
1743 
1744  Assert(!attr->attisdropped);
1745 
1746  getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena);
1747  fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
1748  fmstate->p_nums++;
1749  }
1750  }
1751 
1752  Assert(fmstate->p_nums <= n_params);
1753 
1754  resultRelInfo->ri_FdwState = fmstate;
1755 }
1756 
1757 /*
1758  * postgresExecForeignInsert
1759  * Insert one row into a foreign table
1760  */
1761 static TupleTableSlot *
1763  ResultRelInfo *resultRelInfo,
1764  TupleTableSlot *slot,
1765  TupleTableSlot *planSlot)
1766 {
1767  PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1768  const char **p_values;
1769  PGresult *res;
1770  int n_rows;
1771 
1772  /* Set up the prepared statement on the remote server, if we didn't yet */
1773  if (!fmstate->p_name)
1774  prepare_foreign_modify(fmstate);
1775 
1776  /* Convert parameters needed by prepared statement to text form */
1777  p_values = convert_prep_stmt_params(fmstate, NULL, slot);
1778 
1779  /*
1780  * Execute the prepared statement.
1781  */
1782  if (!PQsendQueryPrepared(fmstate->conn,
1783  fmstate->p_name,
1784  fmstate->p_nums,
1785  p_values,
1786  NULL,
1787  NULL,
1788  0))
1789  pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
1790 
1791  /*
1792  * Get the result, and check for success.
1793  *
1794  * We don't use a PG_TRY block here, so be careful not to throw error
1795  * without releasing the PGresult.
1796  */
1797  res = pgfdw_get_result(fmstate->conn, fmstate->query);
1798  if (PQresultStatus(res) !=
1800  pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
1801 
1802  /* Check number of rows affected, and fetch RETURNING tuple if any */
1803  if (fmstate->has_returning)
1804  {
1805  n_rows = PQntuples(res);
1806  if (n_rows > 0)
1807  store_returning_result(fmstate, slot, res);
1808  }
1809  else
1810  n_rows = atoi(PQcmdTuples(res));
1811 
1812  /* And clean up */
1813  PQclear(res);
1814 
1815  MemoryContextReset(fmstate->temp_cxt);
1816 
1817  /* Return NULL if nothing was inserted on the remote end */
1818  return (n_rows > 0) ? slot : NULL;
1819 }
1820 
1821 /*
1822  * postgresExecForeignUpdate
1823  * Update one row in a foreign table
1824  */
1825 static TupleTableSlot *
1827  ResultRelInfo *resultRelInfo,
1828  TupleTableSlot *slot,
1829  TupleTableSlot *planSlot)
1830 {
1831  PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1832  Datum datum;
1833  bool isNull;
1834  const char **p_values;
1835  PGresult *res;
1836  int n_rows;
1837 
1838  /* Set up the prepared statement on the remote server, if we didn't yet */
1839  if (!fmstate->p_name)
1840  prepare_foreign_modify(fmstate);
1841 
1842  /* Get the ctid that was passed up as a resjunk column */
1843  datum = ExecGetJunkAttribute(planSlot,
1844  fmstate->ctidAttno,
1845  &isNull);
1846  /* shouldn't ever get a null result... */
1847  if (isNull)
1848  elog(ERROR, "ctid is NULL");
1849 
1850  /* Convert parameters needed by prepared statement to text form */
1851  p_values = convert_prep_stmt_params(fmstate,
1852  (ItemPointer) DatumGetPointer(datum),
1853  slot);
1854 
1855  /*
1856  * Execute the prepared statement.
1857  */
1858  if (!PQsendQueryPrepared(fmstate->conn,
1859  fmstate->p_name,
1860  fmstate->p_nums,
1861  p_values,
1862  NULL,
1863  NULL,
1864  0))
1865  pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
1866 
1867  /*
1868  * Get the result, and check for success.
1869  *
1870  * We don't use a PG_TRY block here, so be careful not to throw error
1871  * without releasing the PGresult.
1872  */
1873  res = pgfdw_get_result(fmstate->conn, fmstate->query);
1874  if (PQresultStatus(res) !=
1876  pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
1877 
1878  /* Check number of rows affected, and fetch RETURNING tuple if any */
1879  if (fmstate->has_returning)
1880  {
1881  n_rows = PQntuples(res);
1882  if (n_rows > 0)
1883  store_returning_result(fmstate, slot, res);
1884  }
1885  else
1886  n_rows = atoi(PQcmdTuples(res));
1887 
1888  /* And clean up */
1889  PQclear(res);
1890 
1891  MemoryContextReset(fmstate->temp_cxt);
1892 
1893  /* Return NULL if nothing was updated on the remote end */
1894  return (n_rows > 0) ? slot : NULL;
1895 }
1896 
1897 /*
1898  * postgresExecForeignDelete
1899  * Delete one row from a foreign table
1900  */
1901 static TupleTableSlot *
1903  ResultRelInfo *resultRelInfo,
1904  TupleTableSlot *slot,
1905  TupleTableSlot *planSlot)
1906 {
1907  PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1908  Datum datum;
1909  bool isNull;
1910  const char **p_values;
1911  PGresult *res;
1912  int n_rows;
1913 
1914  /* Set up the prepared statement on the remote server, if we didn't yet */
1915  if (!fmstate->p_name)
1916  prepare_foreign_modify(fmstate);
1917 
1918  /* Get the ctid that was passed up as a resjunk column */
1919  datum = ExecGetJunkAttribute(planSlot,
1920  fmstate->ctidAttno,
1921  &isNull);
1922  /* shouldn't ever get a null result... */
1923  if (isNull)
1924  elog(ERROR, "ctid is NULL");
1925 
1926  /* Convert parameters needed by prepared statement to text form */
1927  p_values = convert_prep_stmt_params(fmstate,
1928  (ItemPointer) DatumGetPointer(datum),
1929  NULL);
1930 
1931  /*
1932  * Execute the prepared statement.
1933  */
1934  if (!PQsendQueryPrepared(fmstate->conn,
1935  fmstate->p_name,
1936  fmstate->p_nums,
1937  p_values,
1938  NULL,
1939  NULL,
1940  0))
1941  pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
1942 
1943  /*
1944  * Get the result, and check for success.
1945  *
1946  * We don't use a PG_TRY block here, so be careful not to throw error
1947  * without releasing the PGresult.
1948  */
1949  res = pgfdw_get_result(fmstate->conn, fmstate->query);
1950  if (PQresultStatus(res) !=
1952  pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
1953 
1954  /* Check number of rows affected, and fetch RETURNING tuple if any */
1955  if (fmstate->has_returning)
1956  {
1957  n_rows = PQntuples(res);
1958  if (n_rows > 0)
1959  store_returning_result(fmstate, slot, res);
1960  }
1961  else
1962  n_rows = atoi(PQcmdTuples(res));
1963 
1964  /* And clean up */
1965  PQclear(res);
1966 
1967  MemoryContextReset(fmstate->temp_cxt);
1968 
1969  /* Return NULL if nothing was deleted on the remote end */
1970  return (n_rows > 0) ? slot : NULL;
1971 }
1972 
1973 /*
1974  * postgresEndForeignModify
1975  * Finish an insert/update/delete operation on a foreign table
1976  */
1977 static void
1979  ResultRelInfo *resultRelInfo)
1980 {
1981  PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1982 
1983  /* If fmstate is NULL, we are in EXPLAIN; nothing to do */
1984  if (fmstate == NULL)
1985  return;
1986 
1987  /* If we created a prepared statement, destroy it */
1988  if (fmstate->p_name)
1989  {
1990  char sql[64];
1991  PGresult *res;
1992 
1993  snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name);
1994 
1995  /*
1996  * We don't use a PG_TRY block here, so be careful not to throw error
1997  * without releasing the PGresult.
1998  */
1999  res = pgfdw_exec_query(fmstate->conn, sql);
2000  if (PQresultStatus(res) != PGRES_COMMAND_OK)
2001  pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
2002  PQclear(res);
2003  fmstate->p_name = NULL;
2004  }
2005 
2006  /* Release remote connection */
2007  ReleaseConnection(fmstate->conn);
2008  fmstate->conn = NULL;
2009 }
2010 
2011 /*
2012  * postgresIsForeignRelUpdatable
2013  * Determine whether a foreign table supports INSERT, UPDATE and/or
2014  * DELETE.
2015  */
2016 static int
2018 {
2019  bool updatable;
2020  ForeignTable *table;
2021  ForeignServer *server;
2022  ListCell *lc;
2023 
2024  /*
2025  * By default, all postgres_fdw foreign tables are assumed updatable. This
2026  * can be overridden by a per-server setting, which in turn can be
2027  * overridden by a per-table setting.
2028  */
2029  updatable = true;
2030 
2031  table = GetForeignTable(RelationGetRelid(rel));
2032  server = GetForeignServer(table->serverid);
2033 
2034  foreach(lc, server->options)
2035  {
2036  DefElem *def = (DefElem *) lfirst(lc);
2037 
2038  if (strcmp(def->defname, "updatable") == 0)
2039  updatable = defGetBoolean(def);
2040  }
2041  foreach(lc, table->options)
2042  {
2043  DefElem *def = (DefElem *) lfirst(lc);
2044 
2045  if (strcmp(def->defname, "updatable") == 0)
2046  updatable = defGetBoolean(def);
2047  }
2048 
2049  /*
2050  * Currently "updatable" means support for INSERT, UPDATE and DELETE.
2051  */
2052  return updatable ?
2053  (1 << CMD_INSERT) | (1 << CMD_UPDATE) | (1 << CMD_DELETE) : 0;
2054 }
2055 
2056 /*
2057  * postgresRecheckForeignScan
2058  * Execute a local join execution plan for a foreign join
2059  */
2060 static bool
2062 {
2063  Index scanrelid = ((Scan *) node->ss.ps.plan)->scanrelid;
2065  TupleTableSlot *result;
2066 
2067  /* For base foreign relations, it suffices to set fdw_recheck_quals */
2068  if (scanrelid > 0)
2069  return true;
2070 
2071  Assert(outerPlan != NULL);
2072 
2073  /* Execute a local join execution plan */
2074  result = ExecProcNode(outerPlan);
2075  if (TupIsNull(result))
2076  return false;
2077 
2078  /* Store result in the given slot */
2079  ExecCopySlot(slot, result);
2080 
2081  return true;
2082 }
2083 
2084 /*
2085  * postgresPlanDirectModify
2086  * Consider a direct foreign table modification
2087  *
2088  * Decide whether it is safe to modify a foreign table directly, and if so,
2089  * rewrite subplan accordingly.
2090  */
2091 static bool
2093  ModifyTable *plan,
2094  Index resultRelation,
2095  int subplan_index)
2096 {
2097  CmdType operation = plan->operation;
2098  Plan *subplan = (Plan *) list_nth(plan->plans, subplan_index);
2099  RangeTblEntry *rte = planner_rt_fetch(resultRelation, root);
2100  Relation rel;
2101  StringInfoData sql;
2102  ForeignScan *fscan;
2103  List *targetAttrs = NIL;
2104  List *remote_conds;
2105  List *params_list = NIL;
2106  List *returningList = NIL;
2107  List *retrieved_attrs = NIL;
2108 
2109  /*
2110  * Decide whether it is safe to modify a foreign table directly.
2111  */
2112 
2113  /*
2114  * The table modification must be an UPDATE or DELETE.
2115  */
2116  if (operation != CMD_UPDATE && operation != CMD_DELETE)
2117  return false;
2118 
2119  /*
2120  * It's unsafe to modify a foreign table directly if there are any local
2121  * joins needed.
2122  */
2123  if (!IsA(subplan, ForeignScan))
2124  return false;
2125 
2126  /*
2127  * It's unsafe to modify a foreign table directly if there are any quals
2128  * that should be evaluated locally.
2129  */
2130  if (subplan->qual != NIL)
2131  return false;
2132 
2133  /*
2134  * We can't handle an UPDATE or DELETE on a foreign join for now.
2135  */
2136  fscan = (ForeignScan *) subplan;
2137  if (fscan->scan.scanrelid == 0)
2138  return false;
2139 
2140  /*
2141  * It's unsafe to update a foreign table directly, if any expressions to
2142  * assign to the target columns are unsafe to evaluate remotely.
2143  */
2144  if (operation == CMD_UPDATE)
2145  {
2146  RelOptInfo *baserel = root->simple_rel_array[resultRelation];
2147  int col;
2148 
2149  /*
2150  * We transmit only columns that were explicitly targets of the
2151  * UPDATE, so as to avoid unnecessary data transmission.
2152  */
2153  col = -1;
2154  while ((col = bms_next_member(rte->updatedCols, col)) >= 0)
2155  {
2156  /* bit numbers are offset by FirstLowInvalidHeapAttributeNumber */
2158  TargetEntry *tle;
2159 
2160  if (attno <= InvalidAttrNumber) /* shouldn't happen */
2161  elog(ERROR, "system-column update is not supported");
2162 
2163  tle = get_tle_by_resno(subplan->targetlist, attno);
2164 
2165  if (!tle)
2166  elog(ERROR, "attribute number %d not found in subplan targetlist",
2167  attno);
2168 
2169  if (!is_foreign_expr(root, baserel, (Expr *) tle->expr))
2170  return false;
2171 
2172  targetAttrs = lappend_int(targetAttrs, attno);
2173  }
2174  }
2175 
2176  /*
2177  * Ok, rewrite subplan so as to modify the foreign table directly.
2178  */
2179  initStringInfo(&sql);
2180 
2181  /*
2182  * Core code already has some lock on each rel being planned, so we can
2183  * use NoLock here.
2184  */
2185  rel = heap_open(rte->relid, NoLock);
2186 
2187  /*
2188  * Extract the baserestrictinfo clauses that can be evaluated remotely.
2189  */
2190  remote_conds = (List *) list_nth(fscan->fdw_private,
2192 
2193  /*
2194  * Extract the relevant RETURNING list if any.
2195  */
2196  if (plan->returningLists)
2197  returningList = (List *) list_nth(plan->returningLists, subplan_index);
2198 
2199  /*
2200  * Construct the SQL command string.
2201  */
2202  switch (operation)
2203  {
2204  case CMD_UPDATE:
2205  deparseDirectUpdateSql(&sql, root, resultRelation, rel,
2206  ((Plan *) fscan)->targetlist,
2207  targetAttrs,
2208  remote_conds, &params_list,
2209  returningList, &retrieved_attrs);
2210  break;
2211  case CMD_DELETE:
2212  deparseDirectDeleteSql(&sql, root, resultRelation, rel,
2213  remote_conds, &params_list,
2214  returningList, &retrieved_attrs);
2215  break;
2216  default:
2217  elog(ERROR, "unexpected operation: %d", (int) operation);
2218  break;
2219  }
2220 
2221  /*
2222  * Update the operation info.
2223  */
2224  fscan->operation = operation;
2225 
2226  /*
2227  * Update the fdw_exprs list that will be available to the executor.
2228  */
2229  fscan->fdw_exprs = params_list;
2230 
2231  /*
2232  * Update the fdw_private list that will be available to the executor.
2233  * Items in the list must match enum FdwDirectModifyPrivateIndex, above.
2234  */
2235  fscan->fdw_private = list_make4(makeString(sql.data),
2236  makeInteger((retrieved_attrs != NIL)),
2237  retrieved_attrs,
2238  makeInteger(plan->canSetTag));
2239 
2240  heap_close(rel, NoLock);
2241  return true;
2242 }
2243 
2244 /*
2245  * postgresBeginDirectModify
2246  * Prepare a direct foreign table modification
2247  */
2248 static void
2250 {
2251  ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
2252  EState *estate = node->ss.ps.state;
2253  PgFdwDirectModifyState *dmstate;
2254  RangeTblEntry *rte;
2255  Oid userid;
2256  ForeignTable *table;
2257  UserMapping *user;
2258  int numParams;
2259 
2260  /*
2261  * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL.
2262  */
2263  if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
2264  return;
2265 
2266  /*
2267  * We'll save private state in node->fdw_state.
2268  */
2269  dmstate = (PgFdwDirectModifyState *) palloc0(sizeof(PgFdwDirectModifyState));
2270  node->fdw_state = (void *) dmstate;
2271 
2272  /*
2273  * Identify which user to do the remote access as. This should match what
2274  * ExecCheckRTEPerms() does.
2275  */
2276  rte = rt_fetch(fsplan->scan.scanrelid, estate->es_range_table);
2277  userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
2278 
2279  /* Get info about foreign table. */
2280  dmstate->rel = node->ss.ss_currentRelation;
2281  table = GetForeignTable(RelationGetRelid(dmstate->rel));
2282  user = GetUserMapping(userid, table->serverid);
2283 
2284  /*
2285  * Get connection to the foreign server. Connection manager will
2286  * establish new connection if necessary.
2287  */
2288  dmstate->conn = GetConnection(user, false);
2289 
2290  /* Initialize state variable */
2291  dmstate->num_tuples = -1; /* -1 means not set yet */
2292 
2293  /* Get private info created by planner functions. */
2294  dmstate->query = strVal(list_nth(fsplan->fdw_private,
2296  dmstate->has_returning = intVal(list_nth(fsplan->fdw_private,
2298  dmstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
2300  dmstate->set_processed = intVal(list_nth(fsplan->fdw_private,
2302 
2303  /* Create context for per-tuple temp workspace. */
2304  dmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
2305  "postgres_fdw temporary data",
2309 
2310  /* Prepare for input conversion of RETURNING results. */
2311  if (dmstate->has_returning)
2312  dmstate->attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(dmstate->rel));
2313 
2314  /*
2315  * Prepare for processing of parameters used in remote query, if any.
2316  */
2317  numParams = list_length(fsplan->fdw_exprs);
2318  dmstate->numParams = numParams;
2319  if (numParams > 0)
2321  fsplan->fdw_exprs,
2322  numParams,
2323  &dmstate->param_flinfo,
2324  &dmstate->param_exprs,
2325  &dmstate->param_values);
2326 }
2327 
2328 /*
2329  * postgresIterateDirectModify
2330  * Execute a direct foreign table modification
2331  */
2332 static TupleTableSlot *
2334 {
2336  EState *estate = node->ss.ps.state;
2337  ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
2338 
2339  /*
2340  * If this is the first call after Begin, execute the statement.
2341  */
2342  if (dmstate->num_tuples == -1)
2343  execute_dml_stmt(node);
2344 
2345  /*
2346  * If the local query doesn't specify RETURNING, just clear tuple slot.
2347  */
2348  if (!resultRelInfo->ri_projectReturning)
2349  {
2350  TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
2351  Instrumentation *instr = node->ss.ps.instrument;
2352 
2353  Assert(!dmstate->has_returning);
2354 
2355  /* Increment the command es_processed count if necessary. */
2356  if (dmstate->set_processed)
2357  estate->es_processed += dmstate->num_tuples;
2358 
2359  /* Increment the tuple count for EXPLAIN ANALYZE if necessary. */
2360  if (instr)
2361  instr->tuplecount += dmstate->num_tuples;
2362 
2363  return ExecClearTuple(slot);
2364  }
2365 
2366  /*
2367  * Get the next RETURNING tuple.
2368  */
2369  return get_returning_data(node);
2370 }
2371 
2372 /*
2373  * postgresEndDirectModify
2374  * Finish a direct foreign table modification
2375  */
2376 static void
2378 {
2380 
2381  /* if dmstate is NULL, we are in EXPLAIN; nothing to do */
2382  if (dmstate == NULL)
2383  return;
2384 
2385  /* Release PGresult */
2386  if (dmstate->result)
2387  PQclear(dmstate->result);
2388 
2389  /* Release remote connection */
2390  ReleaseConnection(dmstate->conn);
2391  dmstate->conn = NULL;
2392 
2393  /* MemoryContext will be deleted automatically. */
2394 }
2395 
2396 /*
2397  * postgresExplainForeignScan
2398  * Produce extra output for EXPLAIN of a ForeignScan on a foreign table
2399  */
2400 static void
2402 {
2403  List *fdw_private;
2404  char *sql;
2405  char *relations;
2406 
2407  fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private;
2408 
2409  /*
2410  * Add names of relation handled by the foreign scan when the scan is a
2411  * join
2412  */
2413  if (list_length(fdw_private) > FdwScanPrivateRelations)
2414  {
2415  relations = strVal(list_nth(fdw_private, FdwScanPrivateRelations));
2416  ExplainPropertyText("Relations", relations, es);
2417  }
2418 
2419  /*
2420  * Add remote query, when VERBOSE option is specified.
2421  */
2422  if (es->verbose)
2423  {
2424  sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
2425  ExplainPropertyText("Remote SQL", sql, es);
2426  }
2427 }
2428 
2429 /*
2430  * postgresExplainForeignModify
2431  * Produce extra output for EXPLAIN of a ModifyTable on a foreign table
2432  */
2433 static void
2435  ResultRelInfo *rinfo,
2436  List *fdw_private,
2437  int subplan_index,
2438  ExplainState *es)
2439 {
2440  if (es->verbose)
2441  {
2442  char *sql = strVal(list_nth(fdw_private,
2444 
2445  ExplainPropertyText("Remote SQL", sql, es);
2446  }
2447 }
2448 
2449 /*
2450  * postgresExplainDirectModify
2451  * Produce extra output for EXPLAIN of a ForeignScan that modifies a
2452  * foreign table directly
2453  */
2454 static void
2456 {
2457  List *fdw_private;
2458  char *sql;
2459 
2460  if (es->verbose)
2461  {
2462  fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private;
2463  sql = strVal(list_nth(fdw_private, FdwDirectModifyPrivateUpdateSql));
2464  ExplainPropertyText("Remote SQL", sql, es);
2465  }
2466 }
2467 
2468 
2469 /*
2470  * estimate_path_cost_size
2471  * Get cost and size estimates for a foreign scan on given foreign relation
2472  * either a base relation or a join between foreign relations.
2473  *
2474  * param_join_conds are the parameterization clauses with outer relations.
2475  * pathkeys specify the expected sort order if any for given path being costed.
2476  *
2477  * The function returns the cost and size estimates in p_row, p_width,
2478  * p_startup_cost and p_total_cost variables.
2479  */
2480 static void
2482  RelOptInfo *foreignrel,
2483  List *param_join_conds,
2484  List *pathkeys,
2485  double *p_rows, int *p_width,
2486  Cost *p_startup_cost, Cost *p_total_cost)
2487 {
2488  PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
2489  double rows;
2490  double retrieved_rows;
2491  int width;
2492  Cost startup_cost;
2493  Cost total_cost;
2494  Cost cpu_per_tuple;
2495 
2496  /*
2497  * If the table or the server is configured to use remote estimates,
2498  * connect to the foreign server and execute EXPLAIN to estimate the
2499  * number of rows selected by the restriction+join clauses. Otherwise,
2500  * estimate rows using whatever statistics we have locally, in a way
2501  * similar to ordinary tables.
2502  */
2503  if (fpinfo->use_remote_estimate)
2504  {
2505  List *remote_param_join_conds;
2506  List *local_param_join_conds;
2507  StringInfoData sql;
2508  PGconn *conn;
2509  Selectivity local_sel;
2510  QualCost local_cost;
2511  List *fdw_scan_tlist = NIL;
2512  List *remote_conds;
2513 
2514  /* Required only to be passed to deparseSelectStmtForRel */
2515  List *retrieved_attrs;
2516 
2517  /*
2518  * param_join_conds might contain both clauses that are safe to send
2519  * across, and clauses that aren't.
2520  */
2521  classifyConditions(root, foreignrel, param_join_conds,
2522  &remote_param_join_conds, &local_param_join_conds);
2523 
2524  /* Build the list of columns to be fetched from the foreign server. */
2525  if (foreignrel->reloptkind == RELOPT_JOINREL)
2526  fdw_scan_tlist = build_tlist_to_deparse(foreignrel);
2527  else
2528  fdw_scan_tlist = NIL;
2529 
2530  /*
2531  * The complete list of remote conditions includes everything from
2532  * baserestrictinfo plus any extra join_conds relevant to this
2533  * particular path.
2534  */
2535  remote_conds = list_concat(list_copy(remote_param_join_conds),
2536  fpinfo->remote_conds);
2537 
2538  /*
2539  * Construct EXPLAIN query including the desired SELECT, FROM, and
2540  * WHERE clauses. Params and other-relation Vars are replaced by dummy
2541  * values, so don't request params_list.
2542  */
2543  initStringInfo(&sql);
2544  appendStringInfoString(&sql, "EXPLAIN ");
2545  deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist,
2546  remote_conds, pathkeys, &retrieved_attrs,
2547  NULL);
2548 
2549  /* Get the remote estimate */
2550  conn = GetConnection(fpinfo->user, false);
2551  get_remote_estimate(sql.data, conn, &rows, &width,
2552  &startup_cost, &total_cost);
2553  ReleaseConnection(conn);
2554 
2555  retrieved_rows = rows;
2556 
2557  /* Factor in the selectivity of the locally-checked quals */
2558  local_sel = clauselist_selectivity(root,
2559  local_param_join_conds,
2560  foreignrel->relid,
2561  JOIN_INNER,
2562  NULL);
2563  local_sel *= fpinfo->local_conds_sel;
2564 
2565  rows = clamp_row_est(rows * local_sel);
2566 
2567  /* Add in the eval cost of the locally-checked quals */
2568  startup_cost += fpinfo->local_conds_cost.startup;
2569  total_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
2570  cost_qual_eval(&local_cost, local_param_join_conds, root);
2571  startup_cost += local_cost.startup;
2572  total_cost += local_cost.per_tuple * retrieved_rows;
2573  }
2574  else
2575  {
2576  Cost run_cost = 0;
2577 
2578  /*
2579  * We don't support join conditions in this mode (hence, no
2580  * parameterized paths can be made).
2581  */
2582  Assert(param_join_conds == NIL);
2583 
2584  /*
2585  * Use rows/width estimates made by set_baserel_size_estimates() for
2586  * base foreign relations and set_joinrel_size_estimates() for join
2587  * between foreign relations.
2588  */
2589  rows = foreignrel->rows;
2590  width = foreignrel->reltarget->width;
2591 
2592  /* Back into an estimate of the number of retrieved rows. */
2593  retrieved_rows = clamp_row_est(rows / fpinfo->local_conds_sel);
2594 
2595  /*
2596  * We will come here again and again with different set of pathkeys
2597  * that caller wants to cost. We don't need to calculate the cost of
2598  * bare scan each time. Instead, use the costs if we have cached them
2599  * already.
2600  */
2601  if (fpinfo->rel_startup_cost > 0 && fpinfo->rel_total_cost > 0)
2602  {
2603  startup_cost = fpinfo->rel_startup_cost;
2604  run_cost = fpinfo->rel_total_cost - fpinfo->rel_startup_cost;
2605  }
2606  else if (foreignrel->reloptkind != RELOPT_JOINREL)
2607  {
2608  /* Clamp retrieved rows estimates to at most foreignrel->tuples. */
2609  retrieved_rows = Min(retrieved_rows, foreignrel->tuples);
2610 
2611  /*
2612  * Cost as though this were a seqscan, which is pessimistic. We
2613  * effectively imagine the local_conds are being evaluated
2614  * remotely, too.
2615  */
2616  startup_cost = 0;
2617  run_cost = 0;
2618  run_cost += seq_page_cost * foreignrel->pages;
2619 
2620  startup_cost += foreignrel->baserestrictcost.startup;
2621  cpu_per_tuple = cpu_tuple_cost + foreignrel->baserestrictcost.per_tuple;
2622  run_cost += cpu_per_tuple * foreignrel->tuples;
2623  }
2624  else
2625  {
2626  PgFdwRelationInfo *fpinfo_i;
2627  PgFdwRelationInfo *fpinfo_o;
2628  QualCost join_cost;
2629  QualCost remote_conds_cost;
2630  double nrows;
2631 
2632  /* For join we expect inner and outer relations set */
2633  Assert(fpinfo->innerrel && fpinfo->outerrel);
2634 
2635  fpinfo_i = (PgFdwRelationInfo *) fpinfo->innerrel->fdw_private;
2636  fpinfo_o = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;
2637 
2638  /* Estimate of number of rows in cross product */
2639  nrows = fpinfo_i->rows * fpinfo_o->rows;
2640  /* Clamp retrieved rows estimate to at most size of cross product */
2641  retrieved_rows = Min(retrieved_rows, nrows);
2642 
2643  /*
2644  * The cost of foreign join is estimated as cost of generating
2645  * rows for the joining relations + cost for applying quals on the
2646  * rows.
2647  */
2648 
2649  /* Calculate the cost of clauses pushed down the foreign server */
2650  cost_qual_eval(&remote_conds_cost, fpinfo->remote_conds, root);
2651  /* Calculate the cost of applying join clauses */
2652  cost_qual_eval(&join_cost, fpinfo->joinclauses, root);
2653 
2654  /*
2655  * Startup cost includes startup cost of joining relations and the
2656  * startup cost for join and other clauses. We do not include the
2657  * startup cost specific to join strategy (e.g. setting up hash
2658  * tables) since we do not know what strategy the foreign server
2659  * is going to use.
2660  */
2661  startup_cost = fpinfo_i->rel_startup_cost + fpinfo_o->rel_startup_cost;
2662  startup_cost += join_cost.startup;
2663  startup_cost += remote_conds_cost.startup;
2664  startup_cost += fpinfo->local_conds_cost.startup;
2665 
2666  /*
2667  * Run time cost includes:
2668  *
2669  * 1. Run time cost (total_cost - startup_cost) of relations being
2670  * joined
2671  *
2672  * 2. Run time cost of applying join clauses on the cross product
2673  * of the joining relations.
2674  *
2675  * 3. Run time cost of applying pushed down other clauses on the
2676  * result of join
2677  *
2678  * 4. Run time cost of applying nonpushable other clauses locally
2679  * on the result fetched from the foreign server.
2680  */
2681  run_cost = fpinfo_i->rel_total_cost - fpinfo_i->rel_startup_cost;
2682  run_cost += fpinfo_o->rel_total_cost - fpinfo_o->rel_startup_cost;
2683  run_cost += nrows * join_cost.per_tuple;
2684  nrows = clamp_row_est(nrows * fpinfo->joinclause_sel);
2685  run_cost += nrows * remote_conds_cost.per_tuple;
2686  run_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
2687  }
2688 
2689  /*
2690  * Without remote estimates, we have no real way to estimate the cost
2691  * of generating sorted output. It could be free if the query plan
2692  * the remote side would have chosen generates properly-sorted output
2693  * anyway, but in most cases it will cost something. Estimate a value
2694  * high enough that we won't pick the sorted path when the ordering
2695  * isn't locally useful, but low enough that we'll err on the side of
2696  * pushing down the ORDER BY clause when it's useful to do so.
2697  */
2698  if (pathkeys != NIL)
2699  {
2700  startup_cost *= DEFAULT_FDW_SORT_MULTIPLIER;
2701  run_cost *= DEFAULT_FDW_SORT_MULTIPLIER;
2702  }
2703 
2704  total_cost = startup_cost + run_cost;
2705  }
2706 
2707  /*
2708  * Cache the costs for scans without any pathkeys or parameterization
2709  * before adding the costs for transferring data from the foreign server.
2710  * These costs are useful for costing the join between this relation and
2711  * another foreign relation or to calculate the costs of paths with
2712  * pathkeys for this relation, when the costs can not be obtained from the
2713  * foreign server. This function will be called at least once for every
2714  * foreign relation without pathkeys and parameterization.
2715  */
2716  if (pathkeys == NIL && param_join_conds == NIL)
2717  {
2718  fpinfo->rel_startup_cost = startup_cost;
2719  fpinfo->rel_total_cost = total_cost;
2720  }
2721 
2722  /*
2723  * Add some additional cost factors to account for connection overhead
2724  * (fdw_startup_cost), transferring data across the network
2725  * (fdw_tuple_cost per retrieved row), and local manipulation of the data
2726  * (cpu_tuple_cost per retrieved row).
2727  */
2728  startup_cost += fpinfo->fdw_startup_cost;
2729  total_cost += fpinfo->fdw_startup_cost;
2730  total_cost += fpinfo->fdw_tuple_cost * retrieved_rows;
2731  total_cost += cpu_tuple_cost * retrieved_rows;
2732 
2733  /* Return results. */
2734  *p_rows = rows;
2735  *p_width = width;
2736  *p_startup_cost = startup_cost;
2737  *p_total_cost = total_cost;
2738 }
2739 
2740 /*
2741  * Estimate costs of executing a SQL statement remotely.
2742  * The given "sql" must be an EXPLAIN command.
2743  */
2744 static void
2745 get_remote_estimate(const char *sql, PGconn *conn,
2746  double *rows, int *width,
2747  Cost *startup_cost, Cost *total_cost)
2748 {
2749  PGresult *volatile res = NULL;
2750 
2751  /* PGresult must be released before leaving this function. */
2752  PG_TRY();
2753  {
2754  char *line;
2755  char *p;
2756  int n;
2757 
2758  /*
2759  * Execute EXPLAIN remotely.
2760  */
2761  res = pgfdw_exec_query(conn, sql);
2762  if (PQresultStatus(res) != PGRES_TUPLES_OK)
2763  pgfdw_report_error(ERROR, res, conn, false, sql);
2764 
2765  /*
2766  * Extract cost numbers for topmost plan node. Note we search for a
2767  * left paren from the end of the line to avoid being confused by
2768  * other uses of parentheses.
2769  */
2770  line = PQgetvalue(res, 0, 0);
2771  p = strrchr(line, '(');
2772  if (p == NULL)
2773  elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
2774  n = sscanf(p, "(cost=%lf..%lf rows=%lf width=%d)",
2775  startup_cost, total_cost, rows, width);
2776  if (n != 4)
2777  elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
2778 
2779  PQclear(res);
2780  res = NULL;
2781  }
2782  PG_CATCH();
2783  {
2784  if (res)
2785  PQclear(res);
2786  PG_RE_THROW();
2787  }
2788  PG_END_TRY();
2789 }
2790 
2791 /*
2792  * Detect whether we want to process an EquivalenceClass member.
2793  *
2794  * This is a callback for use by generate_implied_equalities_for_column.
2795  */
2796 static bool
2799  void *arg)
2800 {
2802  Expr *expr = em->em_expr;
2803 
2804  /*
2805  * If we've identified what we're processing in the current scan, we only
2806  * want to match that expression.
2807  */
2808  if (state->current != NULL)
2809  return equal(expr, state->current);
2810 
2811  /*
2812  * Otherwise, ignore anything we've already processed.
2813  */
2814  if (list_member(state->already_used, expr))
2815  return false;
2816 
2817  /* This is the new target to process. */
2818  state->current = expr;
2819  return true;
2820 }
2821 
2822 /*
2823  * Create cursor for node's query with current parameter values.
2824  */
2825 static void
2827 {
2828  PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
2829  ExprContext *econtext = node->ss.ps.ps_ExprContext;
2830  int numParams = fsstate->numParams;
2831  const char **values = fsstate->param_values;
2832  PGconn *conn = fsstate->conn;
2834  PGresult *res;
2835 
2836  /*
2837  * Construct array of query parameter values in text format. We do the
2838  * conversions in the short-lived per-tuple context, so as not to cause a
2839  * memory leak over repeated scans.
2840  */
2841  if (numParams > 0)
2842  {
2843  MemoryContext oldcontext;
2844 
2845  oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
2846 
2847  process_query_params(econtext,
2848  fsstate->param_flinfo,
2849  fsstate->param_exprs,
2850  values);
2851 
2852  MemoryContextSwitchTo(oldcontext);
2853  }
2854 
2855  /* Construct the DECLARE CURSOR command */
2856  initStringInfo(&buf);
2857  appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s",
2858  fsstate->cursor_number, fsstate->query);
2859 
2860  /*
2861  * Notice that we pass NULL for paramTypes, thus forcing the remote server
2862  * to infer types for all parameters. Since we explicitly cast every
2863  * parameter (see deparse.c), the "inference" is trivial and will produce
2864  * the desired result. This allows us to avoid assuming that the remote
2865  * server has the same OIDs we do for the parameters' types.
2866  */
2867  if (!PQsendQueryParams(conn, buf.data, numParams,
2868  NULL, values, NULL, NULL, 0))
2869  pgfdw_report_error(ERROR, NULL, conn, false, buf.data);
2870 
2871  /*
2872  * Get the result, and check for success.
2873  *
2874  * We don't use a PG_TRY block here, so be careful not to throw error
2875  * without releasing the PGresult.
2876  */
2877  res = pgfdw_get_result(conn, buf.data);
2878  if (PQresultStatus(res) != PGRES_COMMAND_OK)
2879  pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
2880  PQclear(res);
2881 
2882  /* Mark the cursor as created, and show no tuples have been retrieved */
2883  fsstate->cursor_exists = true;
2884  fsstate->tuples = NULL;
2885  fsstate->num_tuples = 0;
2886  fsstate->next_tuple = 0;
2887  fsstate->fetch_ct_2 = 0;
2888  fsstate->eof_reached = false;
2889 
2890  /* Clean up */
2891  pfree(buf.data);
2892 }
2893 
2894 /*
2895  * Fetch some more rows from the node's cursor.
2896  */
2897 static void
2899 {
2900  PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
2901  PGresult *volatile res = NULL;
2902  MemoryContext oldcontext;
2903 
2904  /*
2905  * We'll store the tuples in the batch_cxt. First, flush the previous
2906  * batch.
2907  */
2908  fsstate->tuples = NULL;
2909  MemoryContextReset(fsstate->batch_cxt);
2910  oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt);
2911 
2912  /* PGresult must be released before leaving this function. */
2913  PG_TRY();
2914  {
2915  PGconn *conn = fsstate->conn;
2916  char sql[64];
2917  int numrows;
2918  int i;
2919 
2920  snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
2921  fsstate->fetch_size, fsstate->cursor_number);
2922 
2923  res = pgfdw_exec_query(conn, sql);
2924  /* On error, report the original query, not the FETCH. */
2925  if (PQresultStatus(res) != PGRES_TUPLES_OK)
2926  pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
2927 
2928  /* Convert the data into HeapTuples */
2929  numrows = PQntuples(res);
2930  fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
2931  fsstate->num_tuples = numrows;
2932  fsstate->next_tuple = 0;
2933 
2934  for (i = 0; i < numrows; i++)
2935  {
2936  Assert(IsA(node->ss.ps.plan, ForeignScan));
2937 
2938  fsstate->tuples[i] =
2940  fsstate->rel,
2941  fsstate->attinmeta,
2942  fsstate->retrieved_attrs,
2943  node,
2944  fsstate->temp_cxt);
2945  }
2946 
2947  /* Update fetch_ct_2 */
2948  if (fsstate->fetch_ct_2 < 2)
2949  fsstate->fetch_ct_2++;
2950 
2951  /* Must be EOF if we didn't get as many tuples as we asked for. */
2952  fsstate->eof_reached = (numrows < fsstate->fetch_size);
2953 
2954  PQclear(res);
2955  res = NULL;
2956  }
2957  PG_CATCH();
2958  {
2959  if (res)
2960  PQclear(res);
2961  PG_RE_THROW();
2962  }
2963  PG_END_TRY();
2964 
2965  MemoryContextSwitchTo(oldcontext);
2966 }
2967 
2968 /*
2969  * Force assorted GUC parameters to settings that ensure that we'll output
2970  * data values in a form that is unambiguous to the remote server.
2971  *
2972  * This is rather expensive and annoying to do once per row, but there's
2973  * little choice if we want to be sure values are transmitted accurately;
2974  * we can't leave the settings in place between rows for fear of affecting
2975  * user-visible computations.
2976  *
2977  * We use the equivalent of a function SET option to allow the settings to
2978  * persist only until the caller calls reset_transmission_modes(). If an
2979  * error is thrown in between, guc.c will take care of undoing the settings.
2980  *
2981  * The return value is the nestlevel that must be passed to
2982  * reset_transmission_modes() to undo things.
2983  */
2984 int
2986 {
2987  int nestlevel = NewGUCNestLevel();
2988 
2989  /*
2990  * The values set here should match what pg_dump does. See also
2991  * configure_remote_session in connection.c.
2992  */
2993  if (DateStyle != USE_ISO_DATES)
2994  (void) set_config_option("datestyle", "ISO",
2996  GUC_ACTION_SAVE, true, 0, false);
2998  (void) set_config_option("intervalstyle", "postgres",
3000  GUC_ACTION_SAVE, true, 0, false);
3001  if (extra_float_digits < 3)
3002  (void) set_config_option("extra_float_digits", "3",
3004  GUC_ACTION_SAVE, true, 0, false);
3005 
3006  return nestlevel;
3007 }
3008 
3009 /*
3010  * Undo the effects of set_transmission_modes().
3011  */
3012 void
3014 {
3015  AtEOXact_GUC(true, nestlevel);
3016 }
3017 
3018 /*
3019  * Utility routine to close a cursor.
3020  */
3021 static void
3023 {
3024  char sql[64];
3025  PGresult *res;
3026 
3027  snprintf(sql, sizeof(sql), "CLOSE c%u", cursor_number);
3028 
3029  /*
3030  * We don't use a PG_TRY block here, so be careful not to throw error
3031  * without releasing the PGresult.
3032  */
3033  res = pgfdw_exec_query(conn, sql);
3034  if (PQresultStatus(res) != PGRES_COMMAND_OK)
3035  pgfdw_report_error(ERROR, res, conn, true, sql);
3036  PQclear(res);
3037 }
3038 
3039 /*
3040  * prepare_foreign_modify
3041  * Establish a prepared statement for execution of INSERT/UPDATE/DELETE
3042  */
3043 static void
3045 {
3046  char prep_name[NAMEDATALEN];
3047  char *p_name;
3048  PGresult *res;
3049 
3050  /* Construct name we'll use for the prepared statement. */
3051  snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u",
3052  GetPrepStmtNumber(fmstate->conn));
3053  p_name = pstrdup(prep_name);
3054 
3055  /*
3056  * We intentionally do not specify parameter types here, but leave the
3057  * remote server to derive them by default. This avoids possible problems
3058  * with the remote server using different type OIDs than we do. All of
3059  * the prepared statements we use in this module are simple enough that
3060  * the remote server will make the right choices.
3061  */
3062  if (!PQsendPrepare(fmstate->conn,
3063  p_name,
3064  fmstate->query,
3065  0,
3066  NULL))
3067  pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
3068 
3069  /*
3070  * Get the result, and check for success.
3071  *
3072  * We don't use a PG_TRY block here, so be careful not to throw error
3073  * without releasing the PGresult.
3074  */
3075  res = pgfdw_get_result(fmstate->conn, fmstate->query);
3076  if (PQresultStatus(res) != PGRES_COMMAND_OK)
3077  pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
3078  PQclear(res);
3079 
3080  /* This action shows that the prepare has been done. */
3081  fmstate->p_name = p_name;
3082 }
3083 
3084 /*
3085  * convert_prep_stmt_params
3086  * Create array of text strings representing parameter values
3087  *
3088  * tupleid is ctid to send, or NULL if none
3089  * slot is slot to get remaining parameters from, or NULL if none
3090  *
3091  * Data is constructed in temp_cxt; caller should reset that after use.
3092  */
3093 static const char **
3095  ItemPointer tupleid,
3096  TupleTableSlot *slot)
3097 {
3098  const char **p_values;
3099  int pindex = 0;
3100  MemoryContext oldcontext;
3101 
3102  oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt);
3103 
3104  p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums);
3105 
3106  /* 1st parameter should be ctid, if it's in use */
3107  if (tupleid != NULL)
3108  {
3109  /* don't need set_transmission_modes for TID output */
3110  p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
3111  PointerGetDatum(tupleid));
3112  pindex++;
3113  }
3114 
3115  /* get following parameters from slot */
3116  if (slot != NULL && fmstate->target_attrs != NIL)
3117  {
3118  int nestlevel;
3119  ListCell *lc;
3120 
3121  nestlevel = set_transmission_modes();
3122 
3123  foreach(lc, fmstate->target_attrs)
3124  {
3125  int attnum = lfirst_int(lc);
3126  Datum value;
3127  bool isnull;
3128 
3129  value = slot_getattr(slot, attnum, &isnull);
3130  if (isnull)
3131  p_values[pindex] = NULL;
3132  else
3133  p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
3134  value);
3135  pindex++;
3136  }
3137 
3138  reset_transmission_modes(nestlevel);
3139  }
3140 
3141  Assert(pindex == fmstate->p_nums);
3142 
3143  MemoryContextSwitchTo(oldcontext);
3144 
3145  return p_values;
3146 }
3147 
3148 /*
3149  * store_returning_result
3150  * Store the result of a RETURNING clause
3151  *
3152  * On error, be sure to release the PGresult on the way out. Callers do not
3153  * have PG_TRY blocks to ensure this happens.
3154  */
3155 static void
3157  TupleTableSlot *slot, PGresult *res)
3158 {
3159  PG_TRY();
3160  {
3161  HeapTuple newtup;
3162 
3163  newtup = make_tuple_from_result_row(res, 0,
3164  fmstate->rel,
3165  fmstate->attinmeta,
3166  fmstate->retrieved_attrs,
3167  NULL,
3168  fmstate->temp_cxt);
3169  /* tuple will be deleted when it is cleared from the slot */
3170  ExecStoreTuple(newtup, slot, InvalidBuffer, true);
3171  }
3172  PG_CATCH();
3173  {
3174  if (res)
3175  PQclear(res);
3176  PG_RE_THROW();
3177  }
3178  PG_END_TRY();
3179 }
3180 
3181 /*
3182  * Execute a direct UPDATE/DELETE statement.
3183  */
3184 static void
3186 {
3188  ExprContext *econtext = node->ss.ps.ps_ExprContext;
3189  int numParams = dmstate->numParams;
3190  const char **values = dmstate->param_values;
3191 
3192  /*
3193  * Construct array of query parameter values in text format.
3194  */
3195  if (numParams > 0)
3196  process_query_params(econtext,
3197  dmstate->param_flinfo,
3198  dmstate->param_exprs,
3199  values);
3200 
3201  /*
3202  * Notice that we pass NULL for paramTypes, thus forcing the remote server
3203  * to infer types for all parameters. Since we explicitly cast every
3204  * parameter (see deparse.c), the "inference" is trivial and will produce
3205  * the desired result. This allows us to avoid assuming that the remote
3206  * server has the same OIDs we do for the parameters' types.
3207  */
3208  if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams,
3209  NULL, values, NULL, NULL, 0))
3210  pgfdw_report_error(ERROR, NULL, dmstate->conn, false, dmstate->query);
3211 
3212  /*
3213  * Get the result, and check for success.
3214  *
3215  * We don't use a PG_TRY block here, so be careful not to throw error
3216  * without releasing the PGresult.
3217  */
3218  dmstate->result = pgfdw_get_result(dmstate->conn, dmstate->query);
3219  if (PQresultStatus(dmstate->result) !=
3221  pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true,
3222  dmstate->query);
3223 
3224  /* Get the number of rows affected. */
3225  if (dmstate->has_returning)
3226  dmstate->num_tuples = PQntuples(dmstate->result);
3227  else
3228  dmstate->num_tuples = atoi(PQcmdTuples(dmstate->result));
3229 }
3230 
3231 /*
3232  * Get the result of a RETURNING clause.
3233  */
3234 static TupleTableSlot *
3236 {
3238  EState *estate = node->ss.ps.state;
3239  ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
3240  TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
3241 
3242  Assert(resultRelInfo->ri_projectReturning);
3243 
3244  /* If we didn't get any tuples, must be end of data. */
3245  if (dmstate->next_tuple >= dmstate->num_tuples)
3246  return ExecClearTuple(slot);
3247 
3248  /* Increment the command es_processed count if necessary. */
3249  if (dmstate->set_processed)
3250  estate->es_processed += 1;
3251 
3252  /*
3253  * Store a RETURNING tuple. If has_returning is false, just emit a dummy
3254  * tuple. (has_returning is false when the local query is of the form
3255  * "UPDATE/DELETE .. RETURNING 1" for example.)
3256  */
3257  if (!dmstate->has_returning)
3258  ExecStoreAllNullTuple(slot);
3259  else
3260  {
3261  /*
3262  * On error, be sure to release the PGresult on the way out. Callers
3263  * do not have PG_TRY blocks to ensure this happens.
3264  */
3265  PG_TRY();
3266  {
3267  HeapTuple newtup;
3268 
3269  newtup = make_tuple_from_result_row(dmstate->result,
3270  dmstate->next_tuple,
3271  dmstate->rel,
3272  dmstate->attinmeta,
3273  dmstate->retrieved_attrs,
3274  NULL,
3275  dmstate->temp_cxt);
3276  ExecStoreTuple(newtup, slot, InvalidBuffer, false);
3277  }
3278  PG_CATCH();
3279  {
3280  if (dmstate->result)
3281  PQclear(dmstate->result);
3282  PG_RE_THROW();
3283  }
3284  PG_END_TRY();
3285  }
3286  dmstate->next_tuple++;
3287 
3288  /* Make slot available for evaluation of the local query RETURNING list. */
3289  resultRelInfo->ri_projectReturning->pi_exprContext->ecxt_scantuple = slot;
3290 
3291  return slot;
3292 }
3293 
3294 /*
3295  * Prepare for processing of parameters used in remote query.
3296  */
3297 static void
3299  List *fdw_exprs,
3300  int numParams,
3301  FmgrInfo **param_flinfo,
3302  List **param_exprs,
3303  const char ***param_values)
3304 {
3305  int i;
3306  ListCell *lc;
3307 
3308  Assert(numParams > 0);
3309 
3310  /* Prepare for output conversion of parameters used in remote query. */
3311  *param_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * numParams);
3312 
3313  i = 0;
3314  foreach(lc, fdw_exprs)
3315  {
3316  Node *param_expr = (Node *) lfirst(lc);
3317  Oid typefnoid;
3318  bool isvarlena;
3319 
3320  getTypeOutputInfo(exprType(param_expr), &typefnoid, &isvarlena);
3321  fmgr_info(typefnoid, &(*param_flinfo)[i]);
3322  i++;
3323  }
3324 
3325  /*
3326  * Prepare remote-parameter expressions for evaluation. (Note: in
3327  * practice, we expect that all these expressions will be just Params, so
3328  * we could possibly do something more efficient than using the full
3329  * expression-eval machinery for this. But probably there would be little
3330  * benefit, and it'd require postgres_fdw to know more than is desirable
3331  * about Param evaluation.)
3332  */
3333  *param_exprs = (List *) ExecInitExpr((Expr *) fdw_exprs, node);
3334 
3335  /* Allocate buffer for text form of query parameters. */
3336  *param_values = (const char **) palloc0(numParams * sizeof(char *));
3337 }
3338 
3339 /*
3340  * Construct array of query parameter values in text format.
3341  */
3342 static void
3344  FmgrInfo *param_flinfo,
3345  List *param_exprs,
3346  const char **param_values)
3347 {
3348  int nestlevel;
3349  int i;
3350  ListCell *lc;
3351 
3352  nestlevel = set_transmission_modes();
3353 
3354  i = 0;
3355  foreach(lc, param_exprs)
3356  {
3357  ExprState *expr_state = (ExprState *) lfirst(lc);
3358  Datum expr_value;
3359  bool isNull;
3360 
3361  /* Evaluate the parameter expression */
3362  expr_value = ExecEvalExpr(expr_state, econtext, &isNull, NULL);
3363 
3364  /*
3365  * Get string representation of each parameter value by invoking
3366  * type-specific output function, unless the value is null.
3367  */
3368  if (isNull)
3369  param_values[i] = NULL;
3370  else
3371  param_values[i] = OutputFunctionCall(&param_flinfo[i], expr_value);
3372 
3373  i++;
3374  }
3375 
3376  reset_transmission_modes(nestlevel);
3377 }
3378 
3379 /*
3380  * postgresAnalyzeForeignTable
3381  * Test whether analyzing this foreign table is supported
3382  */
3383 static bool
3385  AcquireSampleRowsFunc *func,
3386  BlockNumber *totalpages)
3387 {
3388  ForeignTable *table;
3389  UserMapping *user;
3390  PGconn *conn;
3391  StringInfoData sql;
3392  PGresult *volatile res = NULL;
3393 
3394  /* Return the row-analysis function pointer */
3396 
3397  /*
3398  * Now we have to get the number of pages. It's annoying that the ANALYZE
3399  * API requires us to return that now, because it forces some duplication
3400  * of effort between this routine and postgresAcquireSampleRowsFunc. But
3401  * it's probably not worth redefining that API at this point.
3402  */
3403 
3404  /*
3405  * Get the connection to use. We do the remote access as the table's
3406  * owner, even if the ANALYZE was started by some other user.
3407  */
3408  table = GetForeignTable(RelationGetRelid(relation));
3409  user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
3410  conn = GetConnection(user, false);
3411 
3412  /*
3413  * Construct command to get page count for relation.
3414  */
3415  initStringInfo(&sql);
3416  deparseAnalyzeSizeSql(&sql, relation);
3417 
3418  /* In what follows, do not risk leaking any PGresults. */
3419  PG_TRY();
3420  {
3421  res = pgfdw_exec_query(conn, sql.data);
3422  if (PQresultStatus(res) != PGRES_TUPLES_OK)
3423  pgfdw_report_error(ERROR, res, conn, false, sql.data);
3424 
3425  if (PQntuples(res) != 1 || PQnfields(res) != 1)
3426  elog(ERROR, "unexpected result from deparseAnalyzeSizeSql query");
3427  *totalpages = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
3428 
3429  PQclear(res);
3430  res = NULL;
3431  }
3432  PG_CATCH();
3433  {
3434  if (res)
3435  PQclear(res);
3436  PG_RE_THROW();
3437  }
3438  PG_END_TRY();
3439 
3440  ReleaseConnection(conn);
3441 
3442  return true;
3443 }
3444 
3445 /*
3446  * Acquire a random sample of rows from foreign table managed by postgres_fdw.
3447  *
3448  * We fetch the whole table from the remote side and pick out some sample rows.
3449  *
3450  * Selected rows are returned in the caller-allocated array rows[],
3451  * which must have at least targrows entries.
3452  * The actual number of rows selected is returned as the function result.
3453  * We also count the total number of rows in the table and return it into
3454  * *totalrows. Note that *totaldeadrows is always set to 0.
3455  *
3456  * Note that the returned list of rows is not always in order by physical
3457  * position in the table. Therefore, correlation estimates derived later
3458  * may be meaningless, but it's OK because we don't use the estimates
3459  * currently (the planner only pays attention to correlation for indexscans).
3460  */
3461 static int
3463  HeapTuple *rows, int targrows,
3464  double *totalrows,
3465  double *totaldeadrows)
3466 {
3467  PgFdwAnalyzeState astate;
3468  ForeignTable *table;
3469  ForeignServer *server;
3470  UserMapping *user;
3471  PGconn *conn;
3472  unsigned int cursor_number;
3473  StringInfoData sql;
3474  PGresult *volatile res = NULL;
3475 
3476  /* Initialize workspace state */
3477  astate.rel = relation;
3479 
3480  astate.rows = rows;
3481  astate.targrows = targrows;
3482  astate.numrows = 0;
3483  astate.samplerows = 0;
3484  astate.rowstoskip = -1; /* -1 means not set yet */
3485  reservoir_init_selection_state(&astate.rstate, targrows);
3486 
3487  /* Remember ANALYZE context, and create a per-tuple temp context */
3488  astate.anl_cxt = CurrentMemoryContext;
3490  "postgres_fdw temporary data",
3494 
3495  /*
3496  * Get the connection to use. We do the remote access as the table's
3497  * owner, even if the ANALYZE was started by some other user.
3498  */
3499  table = GetForeignTable(RelationGetRelid(relation));
3500  server = GetForeignServer(table->serverid);
3501  user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
3502  conn = GetConnection(user, false);
3503 
3504  /*
3505  * Construct cursor that retrieves whole rows from remote.
3506  */
3507  cursor_number = GetCursorNumber(conn);
3508  initStringInfo(&sql);
3509  appendStringInfo(&sql, "DECLARE c%u CURSOR FOR ", cursor_number);
3510  deparseAnalyzeSql(&sql, relation, &astate.retrieved_attrs);
3511 
3512  /* In what follows, do not risk leaking any PGresults. */
3513  PG_TRY();
3514  {
3515  res = pgfdw_exec_query(conn, sql.data);
3516  if (PQresultStatus(res) != PGRES_COMMAND_OK)
3517  pgfdw_report_error(ERROR, res, conn, false, sql.data);
3518  PQclear(res);
3519  res = NULL;
3520 
3521  /* Retrieve and process rows a batch at a time. */
3522  for (;;)
3523  {
3524  char fetch_sql[64];
3525  int fetch_size;
3526  int numrows;
3527  int i;
3528  ListCell *lc;
3529 
3530  /* Allow users to cancel long query */
3532 
3533  /*
3534  * XXX possible future improvement: if rowstoskip is large, we
3535  * could issue a MOVE rather than physically fetching the rows,
3536  * then just adjust rowstoskip and samplerows appropriately.
3537  */
3538 
3539  /* The fetch size is arbitrary, but shouldn't be enormous. */
3540  fetch_size = 100;
3541  foreach(lc, server->options)
3542  {
3543  DefElem *def = (DefElem *) lfirst(lc);
3544 
3545  if (strcmp(def->defname, "fetch_size") == 0)
3546  {
3547  fetch_size = strtol(defGetString(def), NULL, 10);
3548  break;
3549  }
3550  }
3551  foreach(lc, table->options)
3552  {
3553  DefElem *def = (DefElem *) lfirst(lc);
3554 
3555  if (strcmp(def->defname, "fetch_size") == 0)
3556  {
3557  fetch_size = strtol(defGetString(def), NULL, 10);
3558  break;
3559  }
3560  }
3561 
3562  /* Fetch some rows */
3563  snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u",
3564  fetch_size, cursor_number);
3565 
3566  res = pgfdw_exec_query(conn, fetch_sql);
3567  /* On error, report the original query, not the FETCH. */
3568  if (PQresultStatus(res) != PGRES_TUPLES_OK)
3569  pgfdw_report_error(ERROR, res, conn, false, sql.data);
3570 
3571  /* Process whatever we got. */
3572  numrows = PQntuples(res);
3573  for (i = 0; i < numrows; i++)
3574  analyze_row_processor(res, i, &astate);
3575 
3576  PQclear(res);
3577  res = NULL;
3578 
3579  /* Must be EOF if we didn't get all the rows requested. */
3580  if (numrows < fetch_size)
3581  break;
3582  }
3583 
3584  /* Close the cursor, just to be tidy. */
3585  close_cursor(conn, cursor_number);
3586  }
3587  PG_CATCH();
3588  {
3589  if (res)
3590  PQclear(res);
3591  PG_RE_THROW();
3592  }
3593  PG_END_TRY();
3594 
3595  ReleaseConnection(conn);
3596 
3597  /* We assume that we have no dead tuple. */
3598  *totaldeadrows = 0.0;
3599 
3600  /* We've retrieved all living tuples from foreign server. */
3601  *totalrows = astate.samplerows;
3602 
3603  /*
3604  * Emit some interesting relation info
3605  */
3606  ereport(elevel,
3607  (errmsg("\"%s\": table contains %.0f rows, %d rows in sample",
3608  RelationGetRelationName(relation),
3609  astate.samplerows, astate.numrows)));
3610 
3611  return astate.numrows;
3612 }
3613 
3614 /*
3615  * Collect sample rows from the result of query.
3616  * - Use all tuples in sample until target # of samples are collected.
3617  * - Subsequently, replace already-sampled tuples randomly.
3618  */
3619 static void
3621 {
3622  int targrows = astate->targrows;
3623  int pos; /* array index to store tuple in */
3624  MemoryContext oldcontext;
3625 
3626  /* Always increment sample row counter. */
3627  astate->samplerows += 1;
3628 
3629  /*
3630  * Determine the slot where this sample row should be stored. Set pos to
3631  * negative value to indicate the row should be skipped.
3632  */
3633  if (astate->numrows < targrows)
3634  {
3635  /* First targrows rows are always included into the sample */
3636  pos = astate->numrows++;
3637  }
3638  else
3639  {
3640  /*
3641  * Now we start replacing tuples in the sample until we reach the end
3642  * of the relation. Same algorithm as in acquire_sample_rows in
3643  * analyze.c; see Jeff Vitter's paper.
3644  */
3645  if (astate->rowstoskip < 0)
3646  astate->rowstoskip = reservoir_get_next_S(&astate->rstate, astate->samplerows, targrows);
3647 
3648  if (astate->rowstoskip <= 0)
3649  {
3650  /* Choose a random reservoir element to replace. */
3651  pos = (int) (targrows * sampler_random_fract(astate->rstate.randstate));
3652  Assert(pos >= 0 && pos < targrows);
3653  heap_freetuple(astate->rows[pos]);
3654  }
3655  else
3656  {
3657  /* Skip this tuple. */
3658  pos = -1;
3659  }
3660 
3661  astate->rowstoskip -= 1;
3662  }
3663 
3664  if (pos >= 0)
3665  {
3666  /*
3667  * Create sample tuple from current result row, and store it in the
3668  * position determined above. The tuple has to be created in anl_cxt.
3669  */
3670  oldcontext = MemoryContextSwitchTo(astate->anl_cxt);
3671 
3672  astate->rows[pos] = make_tuple_from_result_row(res, row,
3673  astate->rel,
3674  astate->attinmeta,
3675  astate->retrieved_attrs,
3676  NULL,
3677  astate->temp_cxt);
3678 
3679  MemoryContextSwitchTo(oldcontext);
3680  }
3681 }
3682 
3683 /*
3684  * Import a foreign schema
3685  */
3686 static List *
3688 {
3689  List *commands = NIL;
3690  bool import_collate = true;
3691  bool import_default = false;
3692  bool import_not_null = true;
3693  ForeignServer *server;
3694  UserMapping *mapping;
3695  PGconn *conn;
3697  PGresult *volatile res = NULL;
3698  int numrows,
3699  i;
3700  ListCell *lc;
3701 
3702  /* Parse statement options */
3703  foreach(lc, stmt->options)
3704  {
3705  DefElem *def = (DefElem *) lfirst(lc);
3706 
3707  if (strcmp(def->defname, "import_collate") == 0)
3708  import_collate = defGetBoolean(def);
3709  else if (strcmp(def->defname, "import_default") == 0)
3710  import_default = defGetBoolean(def);
3711  else if (strcmp(def->defname, "import_not_null") == 0)
3712  import_not_null = defGetBoolean(def);
3713  else
3714  ereport(ERROR,
3715  (errcode(ERRCODE_FDW_INVALID_OPTION_NAME),
3716  errmsg("invalid option \"%s\"", def->defname)));
3717  }
3718 
3719  /*
3720  * Get connection to the foreign server. Connection manager will
3721  * establish new connection if necessary.
3722  */
3723  server = GetForeignServer(serverOid);
3724  mapping = GetUserMapping(GetUserId(), server->serverid);
3725  conn = GetConnection(mapping, false);
3726 
3727  /* Don't attempt to import collation if remote server hasn't got it */
3728  if (PQserverVersion(conn) < 90100)
3729  import_collate = false;
3730 
3731  /* Create workspace for strings */
3732  initStringInfo(&buf);
3733 
3734  /* In what follows, do not risk leaking any PGresults. */
3735  PG_TRY();
3736  {
3737  /* Check that the schema really exists */
3738  appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = ");
3739  deparseStringLiteral(&buf, stmt->remote_schema);
3740 
3741  res = pgfdw_exec_query(conn, buf.data);
3742  if (PQresultStatus(res) != PGRES_TUPLES_OK)
3743  pgfdw_report_error(ERROR, res, conn, false, buf.data);
3744 
3745  if (PQntuples(res) != 1)
3746  ereport(ERROR,
3747  (errcode(ERRCODE_FDW_SCHEMA_NOT_FOUND),
3748  errmsg("schema \"%s\" is not present on foreign server \"%s\"",
3749  stmt->remote_schema, server->servername)));
3750 
3751  PQclear(res);
3752  res = NULL;
3753  resetStringInfo(&buf);
3754 
3755  /*
3756  * Fetch all table data from this schema, possibly restricted by
3757  * EXCEPT or LIMIT TO. (We don't actually need to pay any attention
3758  * to EXCEPT/LIMIT TO here, because the core code will filter the
3759  * statements we return according to those lists anyway. But it
3760  * should save a few cycles to not process excluded tables in the
3761  * first place.)
3762  *
3763  * Note: because we run the connection with search_path restricted to
3764  * pg_catalog, the format_type() and pg_get_expr() outputs will always
3765  * include a schema name for types/functions in other schemas, which
3766  * is what we want.
3767  */
3768  if (import_collate)
3770  "SELECT relname, "
3771  " attname, "
3772  " format_type(atttypid, atttypmod), "
3773  " attnotnull, "
3774  " pg_get_expr(adbin, adrelid), "
3775  " collname, "
3776  " collnsp.nspname "
3777  "FROM pg_class c "
3778  " JOIN pg_namespace n ON "
3779  " relnamespace = n.oid "
3780  " LEFT JOIN pg_attribute a ON "
3781  " attrelid = c.oid AND attnum > 0 "
3782  " AND NOT attisdropped "
3783  " LEFT JOIN pg_attrdef ad ON "
3784  " adrelid = c.oid AND adnum = attnum "
3785  " LEFT JOIN pg_collation coll ON "
3786  " coll.oid = attcollation "
3787  " LEFT JOIN pg_namespace collnsp ON "
3788  " collnsp.oid = collnamespace ");
3789  else
3791  "SELECT relname, "
3792  " attname, "
3793  " format_type(atttypid, atttypmod), "
3794  " attnotnull, "
3795  " pg_get_expr(adbin, adrelid), "
3796  " NULL, NULL "
3797  "FROM pg_class c "
3798  " JOIN pg_namespace n ON "
3799  " relnamespace = n.oid "
3800  " LEFT JOIN pg_attribute a ON "
3801  " attrelid = c.oid AND attnum > 0 "
3802  " AND NOT attisdropped "
3803  " LEFT JOIN pg_attrdef ad ON "
3804  " adrelid = c.oid AND adnum = attnum ");
3805 
3807  "WHERE c.relkind IN ('r', 'v', 'f', 'm') "
3808  " AND n.nspname = ");
3809  deparseStringLiteral(&buf, stmt->remote_schema);
3810 
3811  /* Apply restrictions for LIMIT TO and EXCEPT */
3812  if (stmt->list_type == FDW_IMPORT_SCHEMA_LIMIT_TO ||
3814  {
3815  bool first_item = true;
3816 
3817  appendStringInfoString(&buf, " AND c.relname ");
3818  if (stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT)
3819  appendStringInfoString(&buf, "NOT ");
3820  appendStringInfoString(&buf, "IN (");
3821 
3822  /* Append list of table names within IN clause */
3823  foreach(lc, stmt->table_list)
3824  {
3825  RangeVar *rv = (RangeVar *) lfirst(lc);
3826 
3827  if (first_item)
3828  first_item = false;
3829  else
3830  appendStringInfoString(&buf, ", ");
3831  deparseStringLiteral(&buf, rv->relname);
3832  }
3833  appendStringInfoChar(&buf, ')');
3834  }
3835 
3836  /* Append ORDER BY at the end of query to ensure output ordering */
3837  appendStringInfoString(&buf, " ORDER BY c.relname, a.attnum");
3838 
3839  /* Fetch the data */
3840  res = pgfdw_exec_query(conn, buf.data);
3841  if (PQresultStatus(res) != PGRES_TUPLES_OK)
3842  pgfdw_report_error(ERROR, res, conn, false, buf.data);
3843 
3844  /* Process results */
3845  numrows = PQntuples(res);
3846  /* note: incrementation of i happens in inner loop's while() test */
3847  for (i = 0; i < numrows;)
3848  {
3849  char *tablename = PQgetvalue(res, i, 0);
3850  bool first_item = true;
3851 
3852  resetStringInfo(&buf);
3853  appendStringInfo(&buf, "CREATE FOREIGN TABLE %s (\n",
3854  quote_identifier(tablename));
3855 
3856  /* Scan all rows for this table */
3857  do
3858  {
3859  char *attname;
3860  char *typename;
3861  char *attnotnull;
3862  char *attdefault;
3863  char *collname;
3864  char *collnamespace;
3865 
3866  /* If table has no columns, we'll see nulls here */
3867  if (PQgetisnull(res, i, 1))
3868  continue;
3869 
3870  attname = PQgetvalue(res, i, 1);
3871  typename = PQgetvalue(res, i, 2);
3872  attnotnull = PQgetvalue(res, i, 3);
3873  attdefault = PQgetisnull(res, i, 4) ? (char *) NULL :
3874  PQgetvalue(res, i, 4);
3875  collname = PQgetisnull(res, i, 5) ? (char *) NULL :
3876  PQgetvalue(res, i, 5);
3877  collnamespace = PQgetisnull(res, i, 6) ? (char *) NULL :
3878  PQgetvalue(res, i, 6);
3879 
3880  if (first_item)
3881  first_item = false;
3882  else
3883  appendStringInfoString(&buf, ",\n");
3884 
3885  /* Print column name and type */
3886  appendStringInfo(&buf, " %s %s",
3887  quote_identifier(attname),
3888  typename);
3889 
3890  /*
3891  * Add column_name option so that renaming the foreign table's
3892  * column doesn't break the association to the underlying
3893  * column.
3894  */
3895  appendStringInfoString(&buf, " OPTIONS (column_name ");
3896  deparseStringLiteral(&buf, attname);
3897  appendStringInfoChar(&buf, ')');
3898 
3899  /* Add COLLATE if needed */
3900  if (import_collate && collname != NULL && collnamespace != NULL)
3901  appendStringInfo(&buf, " COLLATE %s.%s",
3902  quote_identifier(collnamespace),
3903  quote_identifier(collname));
3904 
3905  /* Add DEFAULT if needed */
3906  if (import_default && attdefault != NULL)
3907  appendStringInfo(&buf, " DEFAULT %s", attdefault);
3908 
3909  /* Add NOT NULL if needed */
3910  if (import_not_null && attnotnull[0] == 't')
3911  appendStringInfoString(&buf, " NOT NULL");
3912  }
3913  while (++i < numrows &&
3914  strcmp(PQgetvalue(res, i, 0), tablename) == 0);
3915 
3916  /*
3917  * Add server name and table-level options. We specify remote
3918  * schema and table name as options (the latter to ensure that
3919  * renaming the foreign table doesn't break the association).
3920  */
3921  appendStringInfo(&buf, "\n) SERVER %s\nOPTIONS (",
3922  quote_identifier(server->servername));
3923 
3924  appendStringInfoString(&buf, "schema_name ");
3925  deparseStringLiteral(&buf, stmt->remote_schema);
3926  appendStringInfoString(&buf, ", table_name ");
3927  deparseStringLiteral(&buf, tablename);
3928 
3929  appendStringInfoString(&buf, ");");
3930 
3931  commands = lappend(commands, pstrdup(buf.data));
3932  }
3933 
3934  /* Clean up */
3935  PQclear(res);
3936  res = NULL;
3937  }
3938  PG_CATCH();
3939  {
3940  if (res)
3941  PQclear(res);
3942  PG_RE_THROW();
3943  }
3944  PG_END_TRY();
3945 
3946  ReleaseConnection(conn);
3947 
3948  return commands;
3949 }
3950 
3951 /*
3952  * Assess whether the join between inner and outer relations can be pushed down
3953  * to the foreign server. As a side effect, save information we obtain in this
3954  * function to PgFdwRelationInfo passed in.
3955  *
3956  * Joins that satisfy conditions below are safe to push down.
3957  *
3958  * 1) Join type is INNER or OUTER (one of LEFT/RIGHT/FULL)
3959  * 2) Both outer and inner portions are safe to push-down
3960  * 3) All join conditions are safe to push down
3961  * 4) No relation has local filter (this can be relaxed for INNER JOIN, if we
3962  * can move unpushable clauses upwards in the join tree).
3963  */
3964 static bool
3966  RelOptInfo *outerrel, RelOptInfo *innerrel,
3967  JoinPathExtraData *extra)
3968 {
3969  PgFdwRelationInfo *fpinfo;
3970  PgFdwRelationInfo *fpinfo_o;
3971  PgFdwRelationInfo *fpinfo_i;
3972  ListCell *lc;
3973  List *joinclauses;
3974  List *otherclauses;
3975 
3976  /*
3977  * Core code may call GetForeignJoinPaths hook even when the join relation
3978  * doesn't have a valid user mapping associated with it. See
3979  * build_join_rel() for details. We can't push down such join, since there
3980  * doesn't exist a user mapping which can be used to connect to the
3981  * foreign server.
3982  */
3983  if (!OidIsValid(joinrel->umid))
3984  return false;
3985 
3986  /*
3987  * We support pushing down INNER, LEFT, RIGHT and FULL OUTER joins.
3988  * Constructing queries representing SEMI and ANTI joins is hard, hence
3989  * not considered right now.
3990  */
3991  if (jointype != JOIN_INNER && jointype != JOIN_LEFT &&
3992  jointype != JOIN_RIGHT && jointype != JOIN_FULL)
3993  return false;
3994 
3995  /*
3996  * If either of the joining relations is marked as unsafe to pushdown, the
3997  * join can not be pushed down.
3998  */
3999  fpinfo = (PgFdwRelationInfo *) joinrel->fdw_private;
4000  fpinfo_o = (PgFdwRelationInfo *) outerrel->fdw_private;
4001  fpinfo_i = (PgFdwRelationInfo *) innerrel->fdw_private;
4002  if (!fpinfo_o || !fpinfo_o->pushdown_safe ||
4003  !fpinfo_i || !fpinfo_i->pushdown_safe)
4004  return false;
4005 
4006  /*
4007  * If joining relations have local conditions, those conditions are
4008  * required to be applied before joining the relations. Hence the join can
4009  * not be pushed down.
4010  */
4011  if (fpinfo_o->local_conds || fpinfo_i->local_conds)
4012  return false;
4013 
4014  /* Separate restrict list into join quals and quals on join relation */
4015  if (IS_OUTER_JOIN(jointype))
4016  extract_actual_join_clauses(extra->restrictlist, &joinclauses, &otherclauses);
4017  else
4018  {
4019  /*
4020  * Unlike an outer join, for inner join, the join result contains only
4021  * the rows which satisfy join clauses, similar to the other clause.
4022  * Hence all clauses can be treated as other quals. This helps to push
4023  * a join down to the foreign server even if some of its join quals
4024  * are not safe to pushdown.
4025  */
4026  otherclauses = extract_actual_clauses(extra->restrictlist, false);
4027  joinclauses = NIL;
4028  }
4029 
4030  /* Join quals must be safe to push down. */
4031  foreach(lc, joinclauses)
4032  {
4033  Expr *expr = (Expr *) lfirst(lc);
4034 
4035  if (!is_foreign_expr(root, joinrel, expr))
4036  return false;
4037  }
4038 
4039  /* Save the join clauses, for later use. */
4040  fpinfo->joinclauses = joinclauses;
4041 
4042  /*
4043  * Other clauses are applied after the join has been performed and thus
4044  * need not be all pushable. We will push those which can be pushed to
4045  * reduce the number of rows fetched from the foreign server. Rest of them
4046  * will be applied locally after fetching join result. Add them to fpinfo
4047  * so that other joins involving this joinrel will know that this joinrel
4048  * has local clauses.
4049  */
4050  foreach(lc, otherclauses)
4051  {
4052  Expr *expr = (Expr *) lfirst(lc);
4053 
4054  if (!is_foreign_expr(root, joinrel, expr))
4055  fpinfo->local_conds = lappend(fpinfo->local_conds, expr);
4056  else
4057  fpinfo->remote_conds = lappend(fpinfo->remote_conds, expr);
4058  }
4059 
4060  fpinfo->outerrel = outerrel;
4061  fpinfo->innerrel = innerrel;
4062  fpinfo->jointype = jointype;
4063 
4064  /*
4065  * Pull the other remote conditions from the joining relations into join
4066  * clauses or other remote clauses (remote_conds) of this relation wherever
4067  * possible. This avoids building subqueries at every join step, which is
4068  * not currently supported by the deparser logic.
4069  *
4070  * For an inner join, clauses from both the relations are added to the
4071  * other remote clauses. For LEFT and RIGHT OUTER join, the clauses from the
4072  * outer side are added to remote_conds since those can be evaluated after
4073  * the join is evaluated. The clauses from inner side are added to the
4074  * joinclauses, since they need to be evaluated while constructing the join.
4075  *
4076  * For a FULL OUTER JOIN, the other clauses from either relation can not be
4077  * added to the joinclauses or remote_conds, since each relation acts as an
4078  * outer relation for the other. Consider such full outer join as
4079  * unshippable because of the reasons mentioned above in this comment.
4080  *
4081  * The joining sides can not have local conditions, thus no need to test
4082  * shippability of the clauses being pulled up.
4083  */
4084  switch (jointype)
4085  {
4086  case JOIN_INNER:
4087  fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
4088  list_copy(fpinfo_i->remote_conds));
4089  fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
4090  list_copy(fpinfo_o->remote_conds));
4091  break;
4092 
4093  case JOIN_LEFT:
4094  fpinfo->joinclauses = list_concat(fpinfo->joinclauses,
4095  list_copy(fpinfo_i->remote_conds));
4096  fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
4097  list_copy(fpinfo_o->remote_conds));
4098  break;
4099 
4100  case JOIN_RIGHT:
4101  fpinfo->joinclauses = list_concat(fpinfo->joinclauses,
4102  list_copy(fpinfo_o->remote_conds));
4103  fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
4104  list_copy(fpinfo_i->remote_conds));
4105  break;
4106 
4107  case JOIN_FULL:
4108  if (fpinfo_i->remote_conds || fpinfo_o->remote_conds)
4109  return false;
4110  break;
4111 
4112  default:
4113  /* Should not happen, we have just checked this above */
4114  elog(ERROR, "unsupported join type %d", jointype);
4115  }
4116 
4117  /*
4118  * For an inner join, as explained above all restrictions can be treated
4119  * alike. Treating the pushed down conditions as join conditions allows a
4120  * top level full outer join to be deparsed without requiring subqueries.
4121  */
4122  if (jointype == JOIN_INNER)
4123  {
4124  Assert(!fpinfo->joinclauses);
4125  fpinfo->joinclauses = fpinfo->remote_conds;
4126  fpinfo->remote_conds = NIL;
4127  }
4128 
4129  /* Mark that this join can be pushed down safely */
4130  fpinfo->pushdown_safe = true;
4131 
4132  /*
4133  * If user is willing to estimate cost for a scan of either of the joining
4134  * relations using EXPLAIN, he intends to estimate scans on that relation
4135  * more accurately. Then, it makes sense to estimate the cost the join
4136  * with that relation more accurately using EXPLAIN.
4137  */
4138  fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate ||
4139  fpinfo_i->use_remote_estimate;
4140 
4141  /*
4142  * Since both the joining relations come from the same server, the server
4143  * level options should have same value for both the relations. Pick from
4144  * any side.
4145  */
4146  fpinfo->fdw_startup_cost = fpinfo_o->fdw_startup_cost;
4147  fpinfo->fdw_tuple_cost = fpinfo_o->fdw_tuple_cost;
4148 
4149  /*
4150  * Set cached relation costs to some negative value, so that we can detect
4151  * when they are set to some sensible costs, during one (usually the
4152  * first) of the calls to estimate_path_cost_size().
4153  */
4154  fpinfo->rel_startup_cost = -1;
4155  fpinfo->rel_total_cost = -1;
4156 
4157  /*
4158  * Set fetch size to maximum of the joining sides, since we are expecting
4159  * the rows returned by the join to be proportional to the relation sizes.
4160  */
4161  if (fpinfo_o->fetch_size > fpinfo_i->fetch_size)
4162  fpinfo->fetch_size = fpinfo_o->fetch_size;
4163  else
4164  fpinfo->fetch_size = fpinfo_i->fetch_size;
4165 
4166  /*
4167  * Set the string describing this join relation to be used in EXPLAIN
4168  * output of corresponding ForeignScan.
4169  */
4170  fpinfo->relation_name = makeStringInfo();
4171  appendStringInfo(fpinfo->relation_name, "(%s) %s JOIN (%s)",
4172  fpinfo_o->relation_name->data,
4173  get_jointype_name(fpinfo->jointype),
4174  fpinfo_i->relation_name->data);
4175 
4176  return true;
4177 }
4178 
4179 static void
4181  Path *epq_path)
4182 {
4183  List *useful_pathkeys_list = NIL; /* List of all pathkeys */
4184  ListCell *lc;
4185 
4186  useful_pathkeys_list = get_useful_pathkeys_for_relation(root, rel);
4187 
4188  /* Create one path for each set of pathkeys we found above. */
4189  foreach(lc, useful_pathkeys_list)
4190  {
4191  double rows;
4192  int width;
4193  Cost startup_cost;
4194  Cost total_cost;
4195  List *useful_pathkeys = lfirst(lc);
4196 
4197  estimate_path_cost_size(root, rel, NIL, useful_pathkeys,
4198  &rows, &width, &startup_cost, &total_cost);
4199 
4200  add_path(rel, (Path *)
4201  create_foreignscan_path(root, rel,
4202  NULL,
4203  rows,
4204  startup_cost,
4205  total_cost,
4206  useful_pathkeys,
4207  NULL,
4208  epq_path,
4209  NIL));
4210  }
4211 }
4212 
4213 /*
4214  * postgresGetForeignJoinPaths
4215  * Add possible ForeignPath to joinrel, if join is safe to push down.
4216  */
4217 static void
4219  RelOptInfo *joinrel,
4220  RelOptInfo *outerrel,
4221  RelOptInfo *innerrel,
4222  JoinType jointype,
4223  JoinPathExtraData *extra)
4224 {
4225  PgFdwRelationInfo *fpinfo;
4226  ForeignPath *joinpath;
4227  double rows;
4228  int width;
4229  Cost startup_cost;
4230  Cost total_cost;
4231  Path *epq_path; /* Path to create plan to be executed when
4232  * EvalPlanQual gets triggered. */
4233 
4234  /*
4235  * Skip if this join combination has been considered already.
4236  */
4237  if (joinrel->fdw_private)
4238  return;
4239 
4240  /*
4241  * Create unfinished PgFdwRelationInfo entry which is used to indicate
4242  * that the join relation is already considered, so that we won't waste
4243  * time in judging safety of join pushdown and adding the same paths again
4244  * if found safe. Once we know that this join can be pushed down, we fill
4245  * the entry.
4246  */
4247  fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
4248  fpinfo->pushdown_safe = false;
4249  joinrel->fdw_private = fpinfo;
4250  /* attrs_used is only for base relations. */
4251  fpinfo->attrs_used = NULL;
4252 
4253  /*
4254  * In case there is a possibility that EvalPlanQual will be executed, we
4255  * should be able to reconstruct the row, from base relations applying all
4256  * the conditions. We create a local plan from a suitable local path
4257  * available in the path list. In case such a path doesn't exist, we can
4258  * not push the join to the foreign server since we won't be able to
4259  * reconstruct the row for EvalPlanQual(). Find an alternative local path
4260  * before we add ForeignPath, lest the new path would kick possibly the
4261  * only local path. Do this before calling foreign_join_ok(), since that
4262  * function updates fpinfo and marks it as pushable if the join is found
4263  * to be pushable.
4264  */
4265  if (root->parse->commandType == CMD_DELETE ||
4266  root->parse->commandType == CMD_UPDATE ||
4267  root->rowMarks)
4268  {
4269  epq_path = GetExistingLocalJoinPath(joinrel);
4270  if (!epq_path)
4271  {
4272  elog(DEBUG3, "could not push down foreign join because a local path suitable for EPQ checks was not found");
4273  return;
4274  }
4275  }
4276  else
4277  epq_path = NULL;
4278 
4279  if (!foreign_join_ok(root, joinrel, jointype, outerrel, innerrel, extra))
4280  {
4281  /* Free path required for EPQ if we copied one; we don't need it now */
4282  if (epq_path)
4283  pfree(epq_path);
4284  return;
4285  }
4286 
4287  /*
4288  * Compute the selectivity and cost of the local_conds, so we don't have
4289  * to do it over again for each path. The best we can do for these
4290  * conditions is to estimate selectivity on the basis of local statistics.
4291  * The local conditions are applied after the join has been computed on
4292  * the remote side like quals in WHERE clause, so pass jointype as
4293  * JOIN_INNER.
4294  */
4295  fpinfo->local_conds_sel = clauselist_selectivity(root,
4296  fpinfo->local_conds,
4297  0,
4298  JOIN_INNER,
4299  NULL);
4300  cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root);
4301 
4302  /*
4303  * If we are going to estimate the costs using EXPLAIN, we will need
4304  * connection information. Fill it here.
4305  */
4306  if (fpinfo->use_remote_estimate)
4307  fpinfo->user = GetUserMappingById(joinrel->umid);
4308  else
4309  {
4310  fpinfo->user = NULL;
4311 
4312  /*
4313  * If we are going to estimate costs locally, estimate the join clause
4314  * selectivity here while we have special join info.
4315  */
4316  fpinfo->joinclause_sel = clauselist_selectivity(root, fpinfo->joinclauses,
4317  0, fpinfo->jointype,
4318  extra->sjinfo);
4319 
4320  }
4321  fpinfo->server = GetForeignServer(joinrel->serverid);
4322 
4323  /* Estimate costs for bare join relation */
4324  estimate_path_cost_size(root, joinrel, NIL, NIL, &rows,
4325  &width, &startup_cost, &total_cost);
4326  /* Now update this information in the joinrel */
4327  joinrel->rows = rows;
4328  joinrel->reltarget->width = width;
4329  fpinfo->rows = rows;
4330  fpinfo->width = width;
4331  fpinfo->startup_cost = startup_cost;
4332  fpinfo->total_cost = total_cost;
4333 
4334  /*
4335  * Create a new join path and add it to the joinrel which represents a
4336  * join between foreign tables.
4337  */
4338  joinpath = create_foreignscan_path(root,
4339  joinrel,
4340  NULL, /* default pathtarget */
4341  rows,
4342  startup_cost,
4343  total_cost,
4344  NIL, /* no pathkeys */
4345  NULL, /* no required_outer */
4346  epq_path,
4347  NULL); /* no fdw_private */
4348 
4349  /* Add generated path into joinrel by add_path(). */
4350  add_path(joinrel, (Path *) joinpath);
4351 
4352  /* Consider pathkeys for the join relation */
4353  add_paths_with_pathkeys_for_rel(root, joinrel, epq_path);
4354 
4355  /* XXX Consider parameterized paths for the join relation */
4356 }
4357 
4358 /*
4359  * Create a tuple from the specified row of the PGresult.
4360  *
4361  * rel is the local representation of the foreign table, attinmeta is
4362  * conversion data for the rel's tupdesc, and retrieved_attrs is an
4363  * integer list of the table column numbers present in the PGresult.
4364  * temp_context is a working context that can be reset after each tuple.
4365  */
4366 static HeapTuple
4368  int row,
4369  Relation rel,
4370  AttInMetadata *attinmeta,
4371  List *retrieved_attrs,
4372  ForeignScanState *fsstate,
4373  MemoryContext temp_context)
4374 {
4375  HeapTuple tuple;
4376  TupleDesc tupdesc;
4377  Datum *values;
4378  bool *nulls;
4379  ItemPointer ctid = NULL;
4380  ConversionLocation errpos;
4381  ErrorContextCallback errcallback;
4382  MemoryContext oldcontext;
4383  ListCell *lc;
4384  int j;
4385 
4386  Assert(row < PQntuples(res));
4387 
4388  /*
4389  * Do the following work in a temp context that we reset after each tuple.
4390  * This cleans up not only the data we have direct access to, but any
4391  * cruft the I/O functions might leak.
4392  */
4393  oldcontext = MemoryContextSwitchTo(temp_context);
4394 
4395  if (rel)
4396  tupdesc = RelationGetDescr(rel);
4397  else
4398  {
4399  PgFdwScanState *fdw_sstate;
4400 
4401  Assert(fsstate);
4402  fdw_sstate = (PgFdwScanState *) fsstate->fdw_state;
4403  tupdesc = fdw_sstate->tupdesc;
4404  }
4405 
4406  values = (Datum *) palloc0(tupdesc->natts * sizeof(Datum));
4407  nulls = (bool *) palloc(tupdesc->natts * sizeof(bool));
4408  /* Initialize to nulls for any columns not present in result */
4409  memset(nulls, true, tupdesc->natts * sizeof(bool));
4410 
4411  /*
4412  * Set up and install callback to report where conversion error occurs.
4413  */
4414  errpos.rel = rel;
4415  errpos.cur_attno = 0;
4416  errpos.fsstate = fsstate;
4417  errcallback.callback = conversion_error_callback;
4418  errcallback.arg = (void *) &errpos;
4419  errcallback.previous = error_context_stack;
4420  error_context_stack = &errcallback;
4421 
4422  /*
4423  * i indexes columns in the relation, j indexes columns in the PGresult.
4424  */
4425  j = 0;
4426  foreach(lc, retrieved_attrs)
4427  {
4428  int i = lfirst_int(lc);
4429  char *valstr;
4430 
4431  /* fetch next column's textual value */
4432  if (PQgetisnull(res, row, j))
4433  valstr = NULL;
4434  else
4435  valstr = PQgetvalue(res, row, j);
4436 
4437  /* convert value to internal representation */
4438  errpos.cur_attno = i;
4439  if (i > 0)
4440  {
4441  /* ordinary column */
4442  Assert(i <= tupdesc->natts);
4443  nulls[i - 1] = (valstr == NULL);
4444  /* Apply the input function even to nulls, to support domains */
4445  values[i - 1] = InputFunctionCall(&attinmeta->attinfuncs[i - 1],
4446  valstr,
4447  attinmeta->attioparams[i - 1],
4448  attinmeta->atttypmods[i - 1]);
4449  }
4450  else if (i == SelfItemPointerAttributeNumber)
4451  {
4452  /* ctid --- note we ignore any other system column in result */
4453  if (valstr != NULL)
4454  {
4455  Datum datum;
4456 
4457  datum = DirectFunctionCall1(tidin, CStringGetDatum(valstr));
4458  ctid = (ItemPointer) DatumGetPointer(datum);
4459  }
4460  }
4461  errpos.cur_attno = 0;
4462 
4463  j++;
4464  }
4465 
4466  /* Uninstall error context callback. */
4467  error_context_stack = errcallback.previous;
4468 
4469  /*
4470  * Check we got the expected number of columns. Note: j == 0 and
4471  * PQnfields == 1 is expected, since deparse emits a NULL if no columns.
4472  */
4473  if (j > 0 && j != PQnfields(res))
4474  elog(ERROR, "remote query result does not match the foreign table");
4475 
4476  /*
4477  * Build the result tuple in caller's memory context.
4478  */
4479  MemoryContextSwitchTo(oldcontext);
4480 
4481  tuple = heap_form_tuple(tupdesc, values, nulls);
4482 
4483  /*
4484  * If we have a CTID to return, install it in both t_self and t_ctid.
4485  * t_self is the normal place, but if the tuple is converted to a
4486  * composite Datum, t_self will be lost; setting t_ctid allows CTID to be
4487  * preserved during EvalPlanQual re-evaluations (see ROW_MARK_COPY code).
4488  */
4489  if (ctid)
4490  tuple->t_self = tuple->t_data->t_ctid = *ctid;
4491 
4492  /*
4493  * Stomp on the xmin, xmax, and cmin fields from the tuple created by
4494  * heap_form_tuple. heap_form_tuple actually creates the tuple with
4495  * DatumTupleFields, not HeapTupleFields, but the executor expects
4496  * HeapTupleFields and will happily extract system columns on that
4497  * assumption. If we don't do this then, for example, the tuple length
4498  * ends up in the xmin field, which isn't what we want.
4499  */
4503 
4504  /* Clean up */
4505  MemoryContextReset(temp_context);
4506 
4507  return tuple;
4508 }
4509 
4510 /*
4511  * Callback function which is called when error occurs during column value
4512  * conversion. Print names of column and relation.
4513  */
4514 static void
4516 {
4517  const char *attname = NULL;
4518  const char *relname = NULL;
4519  ConversionLocation *errpos = (ConversionLocation *) arg;
4520 
4521  if (errpos->rel)
4522  {
4523  /* error occurred in a scan against a foreign table */
4524  TupleDesc tupdesc = RelationGetDescr(errpos->rel);
4525 
4526  if (errpos->cur_attno > 0 && errpos->cur_attno <= tupdesc->natts)
4527  attname = NameStr(tupdesc->attrs[errpos->cur_attno - 1]->attname);
4528  else if (errpos->cur_attno == SelfItemPointerAttributeNumber)
4529  attname = "ctid";
4530 
4531  relname = RelationGetRelationName(errpos->rel);
4532  }
4533  else
4534  {
4535  /* error occurred in a scan against a foreign join */
4536  ForeignScanState *fsstate = errpos->fsstate;
4537  ForeignScan *fsplan = (ForeignScan *) fsstate->ss.ps.plan;
4538  EState *estate = fsstate->ss.ps.state;
4539  TargetEntry *tle;
4540  Var *var;
4541  RangeTblEntry *rte;
4542 
4543  Assert(IsA(fsplan, ForeignScan));
4544  tle = (TargetEntry *) list_nth(fsplan->fdw_scan_tlist,
4545  errpos->cur_attno - 1);
4546  Assert(IsA(tle, TargetEntry));
4547  var = (Var *) tle->expr;
4548  Assert(IsA(var, Var));
4549 
4550  rte = rt_fetch(var->varno, estate->es_range_table);
4551  relname = get_rel_name(rte->relid);
4552  attname = get_relid_attribute_name(rte->relid, var->varattno);
4553  }
4554 
4555  if (attname && relname)
4556  errcontext("column \"%s\" of foreign table \"%s\"", attname, relname);
4557 }
4558 
4559 /*
4560  * Find an equivalence class member expression, all of whose Vars, come from
4561  * the indicated relation.
4562  */
4563 extern Expr *
4565 {
4566  ListCell *lc_em;
4567 
4568  foreach(lc_em, ec->ec_members)
4569  {
4570  EquivalenceMember *em = lfirst(lc_em);
4571 
4572  if (bms_is_subset(em->em_relids, rel->relids))
4573  {
4574  /*
4575  * If there is more than one equivalence member whose Vars are
4576  * taken entirely from this relation, we'll be content to choose
4577  * any one of those.
4578  */
4579  return em->em_expr;
4580  }
4581  }
4582 
4583  /* We didn't find any suitable equivalence class expression */
4584  return NULL;
4585 }
GetForeignPlan_function GetForeignPlan
Definition: fdwapi.h:173
bool has_eclass_joins
Definition: relation.h:541
Value * makeString(char *str)
Definition: value.c:53
BeginForeignScan_function BeginForeignScan
Definition: fdwapi.h:174
ExecForeignDelete_function ExecForeignDelete
Definition: fdwapi.h:196
#define PG_RETURN_POINTER(x)
Definition: fmgr.h:305
RelOptInfo * find_childrel_top_parent(PlannerInfo *root, RelOptInfo *rel)
Definition: relnode.c:940
EndDirectModify_function EndDirectModify
Definition: fdwapi.h:202
#define NIL
Definition: pg_list.h:69
List * rowMarks
Definition: relation.h:250
ScanState ss
Definition: execnodes.h:1588
void deparseUpdateSql(StringInfo buf, PlannerInfo *root, Index rtindex, Relation rel, List *targetAttrs, List *returningList, List **retrieved_attrs)
Definition: deparse.c:1283
TupleTableSlot * ExecStoreTuple(HeapTuple tuple, TupleTableSlot *slot, Buffer buffer, bool shouldFree)
Definition: execTuples.c:323
Datum postgres_fdw_handler(PG_FUNCTION_ARGS)
Definition: postgres_fdw.c:417
static List * get_useful_pathkeys_for_relation(PlannerInfo *root, RelOptInfo *rel)
Definition: postgres_fdw.c:776
Definition: fmgr.h:53
List * qual
Definition: plannodes.h:122
static struct @76 value
int PQnfields(const PGresult *res)
Definition: fe-exec.c:2664
#define SizeofHeapTupleHeader
Definition: htup_details.h:170
FdwModifyPrivateIndex
Definition: postgres_fdw.c:90
Relation ri_RelationDesc
Definition: execnodes.h:328
TupleTableSlot * ExecProcNode(PlanState *node)
Definition: execProcnode.c:374
#define IsA(nodeptr, _type_)
Definition: nodes.h:542
static ForeignScan * postgresGetForeignPlan(PlannerInfo *root, RelOptInfo *baserel, Oid foreigntableid, ForeignPath *best_path, List *tlist, List *scan_clauses, Plan *outer_plan)
static void postgresExplainForeignModify(ModifyTableState *mtstate, ResultRelInfo *rinfo, List *fdw_private, int subplan_index, ExplainState *es)
Query * parse
Definition: relation.h:151
void getTypeOutputInfo(Oid type, Oid *typOutput, bool *typIsVarlena)
Definition: lsyscache.c:2600
void add_path(RelOptInfo *parent_rel, Path *new_path)
Definition: pathnode.c:412
static List * postgresPlanForeignModify(PlannerInfo *root, ModifyTable *plan, Index resultRelation, int subplan_index)
ParamPathInfo * get_baserel_parampathinfo(PlannerInfo *root, RelOptInfo *baserel, Relids required_outer)
Definition: relnode.c:999
ExplainForeignScan_function ExplainForeignScan
Definition: fdwapi.h:210
RelOptKind reloptkind
Definition: relation.h:478
Index scanrelid
Definition: plannodes.h:287
List * query_pathkeys
Definition: relation.h:254
Instrumentation * instrument
Definition: execnodes.h:1033
int PQsendQueryParams(PGconn *conn, const char *command, int nParams, const Oid *paramTypes, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition: fe-exec.c:1165
Oid fs_server
Definition: plannodes.h:536
const char * quote_identifier(const char *ident)
Definition: ruleutils.c:9518
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3050
static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel, EquivalenceClass *ec, EquivalenceMember *em, void *arg)
static TupleTableSlot * postgresExecForeignUpdate(EState *estate, ResultRelInfo *resultRelInfo, TupleTableSlot *slot, TupleTableSlot *planSlot)
static void add_paths_with_pathkeys_for_rel(PlannerInfo *root, RelOptInfo *rel, Path *epq_path)
static HeapTuple make_tuple_from_result_row(PGresult *res, int row, Relation rel, AttInMetadata *attinmeta, List *retrieved_attrs, ForeignScanState *fsstate, MemoryContext temp_context)
void extract_actual_join_clauses(List *restrictinfo_list, List **joinquals, List **otherquals)
Definition: restrictinfo.c:420
TupleTableSlot * ExecStoreAllNullTuple(TupleTableSlot *slot)
Definition: execTuples.c:515
HeapTuple * rows
Definition: postgres_fdw.c:229
AttrNumber ExecFindJunkAttributeInTlist(List *targetlist, const char *attrName)
Definition: execJunk.c:221
bool equal(const void *a, const void *b)
Definition: equalfuncs.c:2716
#define RelationGetDescr(relation)
Definition: rel.h:353
#define DEBUG3
Definition: elog.h:23
Oid GetUserId(void)
Definition: miscinit.c:282
AnalyzeForeignTable_function AnalyzeForeignTable
Definition: fdwapi.h:215
static void create_cursor(ForeignScanState *node)
const char ** param_values
Definition: postgres_fdw.c:208
Definition: plannodes.h:96
#define PointerGetDatum(X)
Definition: postgres.h:564
char * PQcmdTuples(PGresult *res)
Definition: fe-exec.c:2997
static void prepare_query_params(PlanState *node, List *fdw_exprs, int numParams, FmgrInfo **param_flinfo, List **param_exprs, const char ***param_values)
List * param_exprs
Definition: postgres_fdw.c:143
int(* AcquireSampleRowsFunc)(Relation relation, int elevel, HeapTuple *rows, int targrows, double *totalrows, double *totaldeadrows)
Definition: fdwapi.h:132
ExecForeignInsert_function ExecForeignInsert
Definition: fdwapi.h:194
bool eclass_useful_for_merging(PlannerInfo *root, EquivalenceClass *eclass, RelOptInfo *rel)
Definition: equivclass.c:2265
char * pstrdup(const char *in)
Definition: mcxt.c:1168
ExprContext * ps_ExprContext
Definition: execnodes.h:1058
static void postgresBeginForeignScan(ForeignScanState *node, int eflags)
double tuples
Definition: relation.h:521
List * baserestrictinfo
Definition: relation.h:536
List * fdw_exprs
Definition: plannodes.h:537
PG_FUNCTION_INFO_V1(postgres_fdw_handler)
Relids clause_relids
Definition: relation.h:1595
AttInMetadata * attinmeta
Definition: postgres_fdw.c:195
StringInfo makeStringInfo(void)
Definition: stringinfo.c:28
void deparseSelectStmtForRel(StringInfo buf, PlannerInfo *root, RelOptInfo *rel, List *tlist, List *remote_conds, List *pathkeys, List **retrieved_attrs, List **params_list)
Definition: deparse.c:763
#define Min(x, y)
Definition: c.h:798
int bms_next_member(const Bitmapset *a, int prevbit)
Definition: bitmapset.c:907
ForeignServer * server
Definition: postgres_fdw.h:78
bool pseudoconstant
Definition: relation.h:1592
int resultRelation
Definition: parsenodes.h:114
List * fdw_private
Definition: plannodes.h:538
static List * get_useful_ecs_for_relation(PlannerInfo *root, RelOptInfo *rel)
Definition: postgres_fdw.c:683
TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: execTuples.c:442
Form_pg_attribute * attrs
Definition: tupdesc.h:74
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
#define IS_OUTER_JOIN(jointype)
Definition: nodes.h:676
double sampler_random_fract(SamplerRandomState randstate)
Definition: sampling.c:238
static void process_query_params(ExprContext *econtext, FmgrInfo *param_flinfo, List *param_exprs, const char **param_values)
uint64 fetch_size
Definition: logging.c:21
#define InvalidBuffer
Definition: buf.h:25
void classifyConditions(PlannerInfo *root, RelOptInfo *baserel, List *input_conds, List **remote_conds, List **local_conds)
Definition: deparse.c:171
int set_transmission_modes(void)
List * list_copy(const List *oldlist)
Definition: list.c:1160
Definition: nodes.h:491
#define strVal(v)
Definition: value.h:54
int errcode(int sqlerrcode)
Definition: elog.c:575
ForeignTable * GetForeignTable(Oid relid)
Definition: foreign.c:326
int IntervalStyle
Definition: globals.c:106
static void store_returning_result(PgFdwModifyState *fmstate, TupleTableSlot *slot, PGresult *res)
bool canSetTag
Definition: plannodes.h:184
CmdType operation
Definition: execnodes.h:1121
List * list_concat(List *list1, List *list2)
Definition: list.c:321
int32 * atttypmods
Definition: funcapi.h:47
PathKey * make_canonical_pathkey(PlannerInfo *root, EquivalenceClass *eclass, Oid opfamily, int strategy, bool nulls_first)
Definition: pathkeys.c:51
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
#define FirstLowInvalidHeapAttributeNumber
Definition: sysattr.h:28
TupleTableSlot * ss_ScanTupleSlot
Definition: execnodes.h:1251
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:138
uint32 BlockNumber
Definition: block.h:31
EquivalenceClass * right_ec
Definition: relation.h:1629
List * retrieved_attrs
Definition: postgres_fdw.c:135
void reservoir_init_selection_state(ReservoirState rs, int n)
Definition: sampling.c:129
List * fdw_scan_tlist
Definition: plannodes.h:539
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:692
#define heap_close(r, l)
Definition: heapam.h:97
#define DirectFunctionCall1(func, arg1)
Definition: fmgr.h:548
double Selectivity
Definition: nodes.h:593
Relation ss_currentRelation
Definition: execnodes.h:1249
EState * state
Definition: execnodes.h:1029
List * es_range_table
Definition: execnodes.h:362
static void postgresGetForeignRelSize(PlannerInfo *root, RelOptInfo *baserel, Oid foreigntableid)
Definition: postgres_fdw.c:471
Form_pg_class rd_rel
Definition: rel.h:83
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1306
unsigned int Oid
Definition: postgres_ext.h:31
int PQserverVersion(const PGconn *conn)
Definition: fe-connect.c:5434
static int postgresIsForeignRelUpdatable(Relation rel)
Definition: primnodes.h:148
int PQntuples(const PGresult *res)
Definition: fe-exec.c:2656
static bool postgresAnalyzeForeignTable(Relation relation, AcquireSampleRowsFunc *func, BlockNumber *totalpages)
struct ErrorContextCallback * previous
Definition: elog.h:237
#define OidIsValid(objectId)
Definition: c.h:530
Oid * attioparams
Definition: funcapi.h:44
static bool foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel, JoinType jointype, RelOptInfo *outerrel, RelOptInfo *innerrel, JoinPathExtraData *extra)
List * retrieved_attrs
Definition: postgres_fdw.c:178
List * mergeopfamilies
Definition: relation.h:1625
static void postgresExplainDirectModify(ForeignScanState *node, ExplainState *es)
int natts
Definition: tupdesc.h:73
CmdType operation
Definition: plannodes.h:535
List * plans
Definition: plannodes.h:188
static TupleTableSlot * postgresExecForeignInsert(EState *estate, ResultRelInfo *resultRelInfo, TupleTableSlot *slot, TupleTableSlot *planSlot)
RelOptInfo * outerrel
Definition: postgres_fdw.h:91
static void close_cursor(PGconn *conn, unsigned int cursor_number)
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:2579
Cost startup
Definition: relation.h:45
void pull_varattnos(Node *node, Index varno, Bitmapset **varattnos)
Definition: var.c:219
#define ALLOCSET_DEFAULT_MINSIZE
Definition: memutils.h:142
AddForeignUpdateTargets_function AddForeignUpdateTargets
Definition: fdwapi.h:191
Index ri_RangeTableIndex
Definition: execnodes.h:327
#define ALLOCSET_SMALL_MINSIZE
Definition: memutils.h:150
static void postgresEndDirectModify(ForeignScanState *node)
#define USE_ISO_DATES
Definition: miscadmin.h:209
JoinType
Definition: nodes.h:627
List * targetList
Definition: parsenodes.h:131
void deparseInsertSql(StringInfo buf, PlannerInfo *root, Index rtindex, Relation rel, List *targetAttrs, bool doNothing, List *returningList, List **retrieved_attrs)
Definition: deparse.c:1220
unsigned int cursor_number
Definition: postgres_fdw.c:139
struct RelOptInfo ** simple_rel_array
Definition: relation.h:175
char * OutputFunctionCall(FmgrInfo *flinfo, Datum val)
Definition: fmgr.c:1943
ItemPointerData * ItemPointer
Definition: itemptr.h:52
HeapTupleHeader t_data
Definition: htup.h:67
ErrorContextCallback * error_context_stack
Definition: elog.c:89
void ReleaseConnection(PGconn *conn)
Definition: connection.c:411
static char * relname(char const *dir, char const *base)
Definition: zic.c:755
RecheckForeignScan_function RecheckForeignScan
Definition: fdwapi.h:207
IterateDirectModify_function IterateDirectModify
Definition: fdwapi.h:201
#define list_make1(x1)
Definition: pg_list.h:133
#define NAMEDATALEN
PlanState ps
Definition: execnodes.h:1248
char * relname
Definition: primnodes.h:74
Value * makeInteger(long i)
Definition: value.c:23
void ExplainPropertyText(const char *qlabel, const char *value, ExplainState *es)
Definition: explain.c:2991
JoinType jointype
Definition: plannodes.h:598
static void postgresEndForeignScan(ForeignScanState *node)
Relids lateral_relids
Definition: relation.h:506
bool defGetBoolean(DefElem *def)
Definition: define.c:111
Cost per_tuple
Definition: relation.h:46
#define TIDOID
Definition: pg_type.h:332
ExprState * ExecInitExpr(Expr *node, PlanState *parent)
Definition: execQual.c:4452
void pfree(void *pointer)
Definition: mcxt.c:995
MemoryContext es_query_cxt
Definition: execnodes.h:386
void deparseDeleteSql(StringInfo buf, PlannerInfo *root, Index rtindex, Relation rel, List *returningList, List **retrieved_attrs)
Definition: deparse.c:1392
SpecialJoinInfo * sjinfo
Definition: relation.h:1994
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:78
ForeignScanState * fsstate
Definition: postgres_fdw.c:257
#define linitial(l)
Definition: pg_list.h:110
#define planner_rt_fetch(rti, root)
Definition: relation.h:314
#define ERROR
Definition: elog.h:43
PlanState ps
Definition: execnodes.h:1120
GetForeignJoinPaths_function GetForeignJoinPaths
Definition: fdwapi.h:185
const char ** param_values
Definition: postgres_fdw.c:144
bool list_member(const List *list, const void *datum)
Definition: list.c:444
List * list_append_unique_ptr(List *list, void *datum)
Definition: list.c:975
#define lfirst_int(lc)
Definition: pg_list.h:107
void cost_qual_eval(QualCost *cost, List *quals, PlannerInfo *root)
Definition: costsize.c:3187
PGconn * conn
Definition: streamutil.c:45
void fmgr_info(Oid functionId, FmgrInfo *finfo)
Definition: fmgr.c:160
char * defGetString(DefElem *def)
Definition: define.c:49
ItemPointerData t_ctid
Definition: htup_details.h:150
ItemPointerData t_self
Definition: htup.h:65
Selectivity local_conds_sel
Definition: postgres_fdw.h:56
static List * postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
bool bms_is_subset(const Bitmapset *a, const Bitmapset *b)
Definition: bitmapset.c:307
#define outerPlanState(node)
Definition: execnodes.h:1072
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:157
static void postgresBeginDirectModify(ForeignScanState *node, int eflags)
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3006
void * list_nth(const List *list, int n)
Definition: list.c:410
#define NoLock
Definition: lockdefs.h:34
List * ExtractExtensionList(const char *extensionsString, bool warnOnMissing)
Definition: option.c:327
Selectivity joinclause_sel
Definition: postgres_fdw.h:59
static char * buf
Definition: pg_test_fsync.c:65
ForeignPath * create_foreignscan_path(PlannerInfo *root, RelOptInfo *rel, PathTarget *target, double rows, Cost startup_cost, Cost total_cost, List *pathkeys, Relids required_outer, Path *fdw_outerpath, List *fdw_private)
Definition: pathnode.c:1847
List * joininfo
Definition: relation.h:539
Relids ec_relids
Definition: relation.h:697
void AtEOXact_GUC(bool isCommit, int nestLevel)
Definition: guc.c:4977
const char * get_jointype_name(JoinType jointype)
Definition: deparse.c:1064
GetForeignRelSize_function GetForeignRelSize
Definition: fdwapi.h:171
EndForeignScan_function EndForeignScan
Definition: fdwapi.h:177
#define CStringGetDatum(X)
Definition: postgres.h:586
ExplainDirectModify_function ExplainDirectModify
Definition: fdwapi.h:212
#define HeapTupleHeaderSetXmax(tup, xid)
Definition: htup_details.h:349
static int postgresAcquireSampleRowsFunc(Relation relation, int elevel, HeapTuple *rows, int targrows, double *totalrows, double *totaldeadrows)
#define InvalidTransactionId
Definition: transam.h:31
#define RelationGetRelationName(relation)
Definition: rel.h:361
#define TupIsNull(slot)
Definition: tuptable.h:138
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:184
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:62
struct PgFdwDirectModifyState PgFdwDirectModifyState
void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql)
Definition: connection.c:537
Relids relids
Definition: relation.h:481
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
ImportForeignSchema_function ImportForeignSchema
Definition: fdwapi.h:218
MemoryContext temp_cxt
Definition: postgres_fdw.c:216
FmgrInfo * param_flinfo
Definition: postgres_fdw.c:142
#define list_make5(x1, x2, x3, x4, x5)
Definition: pg_list.h:137
PlanForeignModify_function PlanForeignModify
Definition: fdwapi.h:192
static unsigned int cursor_number
Definition: connection.c:59
AttInMetadata * attinmeta
Definition: postgres_fdw.c:131
EndForeignModify_function EndForeignModify
Definition: fdwapi.h:197
PG_MODULE_MAGIC
Definition: postgres_fdw.c:42
void deparseDirectDeleteSql(StringInfo buf, PlannerInfo *root, Index rtindex, Relation rel, List *remote_conds, List **params_list, List *returningList, List **retrieved_attrs)
Definition: deparse.c:1414
#define ereport(elevel, rest)
Definition: elog.h:122
#define rt_fetch(rangetable_index, rangetable)
Definition: parsetree.h:31
TargetEntry * makeTargetEntry(Expr *expr, AttrNumber resno, char *resname, bool resjunk)
Definition: makefuncs.c:235
Var * makeVar(Index varno, AttrNumber varattno, Oid vartype, int32 vartypmod, Oid varcollid, Index varlevelsup)
Definition: makefuncs.c:67
ReservoirStateData rstate
Definition: postgres_fdw.c:236
void deparseAnalyzeSql(StringInfo buf, Relation rel, List **retrieved_attrs)
Definition: deparse.c:1508
List * lappend_int(List *list, int datum)
Definition: list.c:146
Index relid
Definition: relation.h:509
Bitmapset * chgParam
Definition: execnodes.h:1052
AttrNumber ctidAttno
Definition: postgres_fdw.c:181
#define outerPlan(node)
Definition: plannodes.h:151
List * lappend(List *list, void *datum)
Definition: list.c:128
GetForeignPaths_function GetForeignPaths
Definition: fdwapi.h:172
Relids lateral_referencers
Definition: relation.h:517
Expr * clause
Definition: relation.h:1584
static bool postgresPlanDirectModify(PlannerInfo *root, ModifyTable *plan, Index resultRelation, int subplan_index)
static TupleTableSlot * postgresIterateForeignScan(ForeignScanState *node)
bool bms_is_empty(const Bitmapset *a)
Definition: bitmapset.c:633
PlanDirectModify_function PlanDirectModify
Definition: fdwapi.h:199
void appendStringInfoChar(StringInfo str, char ch)
Definition: stringinfo.c:169
void initStringInfo(StringInfo str)
Definition: stringinfo.c:46
PlanState ** mt_plans
Definition: execnodes.h:1124
#define ALLOCSET_SMALL_INITSIZE
Definition: memutils.h:151
Oid serverid
Definition: relation.h:528
char * get_relid_attribute_name(Oid relid, AttrNumber attnum)
Definition: lsyscache.c:801
List * ec_opfamilies
Definition: relation.h:692
AttInMetadata * attinmeta
Definition: postgres_fdw.c:225
List * exprs
Definition: relation.h:802
double tuplecount
Definition: instrument.h:54
static void postgresGetForeignPaths(PlannerInfo *root, RelOptInfo *baserel, Oid foreigntableid)
Definition: postgres_fdw.c:883
#define AttributeNumberIsValid(attributeNumber)
Definition: attnum.h:34
static int elevel
Definition: vacuumlazy.c:130
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:121
FmgrInfo * p_flinfo
Definition: postgres_fdw.c:183
UserMapping * user
Definition: postgres_fdw.h:79
BeginDirectModify_function BeginDirectModify
Definition: fdwapi.h:200
void deparseStringLiteral(StringInfo buf, const char *val)
Definition: deparse.c:1748
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: aset.c:436
void * palloc0(Size size)
Definition: mcxt.c:923
ForeignServer * GetForeignServer(Oid serverid)
Definition: foreign.c:98
static void conversion_error_callback(void *arg)
uintptr_t Datum
Definition: postgres.h:374
void set_baserel_size_estimates(PlannerInfo *root, RelOptInfo *rel)
Definition: costsize.c:3754
AttInMetadata * TupleDescGetAttInMetadata(TupleDesc tupdesc)
Definition: execTuples.c:1071
Relids em_relids
Definition: relation.h:741
Relation heap_open(Oid relationId, LOCKMODE lockmode)
Definition: heapam.c:1298
bool verbose
Definition: explain.h:31
static TupleTableSlot * get_returning_data(ForeignScanState *node)
List * restrictlist
Definition: relation.h:1992
unsigned int Index
Definition: c.h:361
MemoryContext temp_cxt
Definition: postgres_fdw.c:157
Datum InputFunctionCall(FmgrInfo *flinfo, char *str, Oid typioparam, int32 typmod)
Definition: fmgr.c:1893
Plan * plan
Definition: execnodes.h:1027
FdwScanPrivateIndex
Definition: postgres_fdw.c:60
double rows
Definition: relation.h:484
#define InvalidOid
Definition: postgres_ext.h:36
Oid umid
Definition: relation.h:529
int extra_float_digits
Definition: float.c:68
void * ri_FdwState
Definition: execnodes.h:337
Bitmapset * updatedCols
Definition: parsenodes.h:871
StringInfo relation_name
Definition: postgres_fdw.h:88
bool list_member_ptr(const List *list, const void *datum)
Definition: list.c:465
void PQclear(PGresult *res)
Definition: fe-exec.c:650
void * fdw_private
Definition: relation.h:533
ExecForeignUpdate_function ExecForeignUpdate
Definition: fdwapi.h:195
CmdType commandType
Definition: parsenodes.h:103
struct ConversionLocation ConversionLocation
static TupleTableSlot * postgresExecForeignDelete(EState *estate, ResultRelInfo *resultRelInfo, TupleTableSlot *slot, TupleTableSlot *planSlot)
PGconn * GetConnection(UserMapping *user, bool will_prep_stmt)
Definition: connection.c:96
List * pathkeys
Definition: relation.h:882
#define PG_CATCH()
Definition: elog.h:292
MemoryContext temp_cxt
Definition: postgres_fdw.c:186
TupleTableSlot * ExecCopySlot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
Definition: execTuples.c:798
PGresult * pgfdw_get_result(PGconn *conn, const char *query)
Definition: connection.c:483
#define makeNode(_type_)
Definition: nodes.h:539
Bitmapset * attrs_used
Definition: postgres_fdw.h:52
MemoryContext temp_cxt
Definition: postgres_fdw.c:240
Datum tidin(PG_FUNCTION_ARGS)
Definition: tid.c:52
BlockNumber pages
Definition: relation.h:520
#define NULL
Definition: c.h:226
static void estimate_path_cost_size(PlannerInfo *root, RelOptInfo *baserel, List *join_conds, List *pathkeys, double *p_rows, int *p_width, Cost *p_startup_cost, Cost *p_total_cost)
Expr * find_em_expr_for_rel(EquivalenceClass *ec, RelOptInfo *rel)
#define Assert(condition)
Definition: c.h:667
#define lfirst(lc)
Definition: pg_list.h:106
char * aliasname
Definition: primnodes.h:41
ReScanForeignScan_function ReScanForeignScan
Definition: fdwapi.h:176
List * eq_classes
Definition: relation.h:231
static const char ** convert_prep_stmt_params(PgFdwModifyState *fmstate, ItemPointer tupleid, TupleTableSlot *slot)
Definition: regguts.h:313
ForeignTable * table
Definition: postgres_fdw.h:77
OnConflictAction onConflictAction
Definition: plannodes.h:195
static void fetch_more_data(ForeignScanState *node)
Expr * expr
Definition: primnodes.h:1280
HeapTuple * tuples
Definition: postgres_fdw.c:147
IterateForeignScan_function IterateForeignScan
Definition: fdwapi.h:175
Path * GetExistingLocalJoinPath(RelOptInfo *joinrel)
Definition: foreign.c:826
struct PgFdwScanState PgFdwScanState
EquivalenceClass * pk_eclass
Definition: relation.h:769
List * ppi_clauses
Definition: relation.h:829
void deparseAnalyzeSizeSql(StringInfo buf, Relation rel)
Definition: deparse.c:1488
#define linitial_oid(l)
Definition: pg_list.h:112
Oid serverid
Definition: foreign.h:67
#define DEFAULT_FDW_SORT_MULTIPLIER
Definition: postgres_fdw.c:51
Bitmapset * bms_union(const Bitmapset *a, const Bitmapset *b)
Definition: bitmapset.c:217
unsigned int GetCursorNumber(PGconn *conn)
Definition: connection.c:432
void reset_transmission_modes(int nestlevel)
Oid exprType(const Node *expr)
Definition: nodeFuncs.c:41
static int list_length(const List *l)
Definition: pg_list.h:89
bool ec_has_volatile
Definition: relation.h:700
MemoryContext anl_cxt
Definition: postgres_fdw.c:239
List * extract_actual_clauses(List *restrictinfo_list, bool pseudoconstant)
Definition: restrictinfo.c:391
#define MAXALIGN(LEN)
Definition: c.h:580
TupleDesc tupdesc
Definition: postgres_fdw.c:130
int DateStyle
Definition: globals.c:104
ForeignScan * make_foreignscan(List *qptlist, List *qpqual, Index scanrelid, List *fdw_exprs, List *fdw_private, List *fdw_scan_tlist, List *fdw_recheck_quals, Plan *outer_plan)
Definition: createplan.c:4903
#define PG_RE_THROW()
Definition: elog.h:313
double cpu_tuple_cost
Definition: costsize.c:106
#define InvalidAttrNumber
Definition: attnum.h:23
List * targetlist
Definition: plannodes.h:121
int PQsendQueryPrepared(PGconn *conn, const char *stmtName, int nParams, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition: fe-exec.c:1307
double ppi_rows
Definition: relation.h:828
static bool postgresRecheckForeignScan(ForeignScanState *node, TupleTableSlot *slot)
#define DatumGetPointer(X)
Definition: postgres.h:557
bool join_clause_is_movable_to(RestrictInfo *rinfo, RelOptInfo *baserel)
Definition: restrictinfo.c:476
static void postgresExplainForeignScan(ForeignScanState *node, ExplainState *es)
#define list_make4(x1, x2, x3, x4)
Definition: pg_list.h:136
bool bms_overlap(const Bitmapset *a, const Bitmapset *b)
Definition: bitmapset.c:442
static Datum values[MAXATTR]
Definition: bootstrap.c:160
static void get_remote_estimate(const char *sql, PGconn *conn, double *rows, int *width, Cost *startup_cost, Cost *total_cost)
List * options
Definition: foreign.h:68
RelOptInfo * innerrel
Definition: postgres_fdw.h:92
int NewGUCNestLevel(void)
Definition: guc.c:4963
int width
Definition: relation.h:805
List * build_tlist_to_deparse(RelOptInfo *foreignrel)
Definition: deparse.c:725
static char * user
Definition: pg_regress.c:90
void(* callback)(void *arg)
Definition: elog.h:238
#define intVal(v)
Definition: value.h:52
static void postgresBeginForeignModify(ModifyTableState *mtstate, ResultRelInfo *resultRelInfo, List *fdw_private, int subplan_index, int eflags)
static void prepare_foreign_modify(PgFdwModifyState *fmstate)
#define ALLOCSET_SMALL_MAXSIZE
Definition: memutils.h:152
void * palloc(Size size)
Definition: mcxt.c:894
EquivalenceClass * left_ec
Definition: relation.h:1628
int errmsg(const char *fmt,...)
Definition: elog.c:797
CmdType operation
Definition: plannodes.h:183
static void analyze_row_processor(PGresult *res, int row, PgFdwAnalyzeState *astate)
void deparseDirectUpdateSql(StringInfo buf, PlannerInfo *root, Index rtindex, Relation rel, List *targetlist, List *targetAttrs, List *remote_conds, List **params_list, List *returningList, List **retrieved_attrs)
Definition: deparse.c:1325
FdwDirectModifyPrivateIndex
Definition: postgres_fdw.c:111
Datum ExecGetJunkAttribute(TupleTableSlot *slot, AttrNumber attno, bool *isNull)
Definition: execJunk.c:248
char * servername
Definition: foreign.h:50
ExplainForeignModify_function ExplainForeignModify
Definition: fdwapi.h:211
static void postgresAddForeignUpdateTargets(Query *parsetree, RangeTblEntry *target_rte, Relation target_relation)
bool is_foreign_expr(PlannerInfo *root, RelOptInfo *baserel, Expr *expr)
Definition: deparse.c:197
#define ALLOCSET_DEFAULT_INITSIZE
Definition: memutils.h:143
int i
UserMapping * GetUserMappingById(Oid umid)
Definition: foreign.c:167
static void postgresEndForeignModify(EState *estate, ResultRelInfo *resultRelInfo)
struct PgFdwModifyState PgFdwModifyState
TargetEntry * get_tle_by_resno(List *tlist, AttrNumber resno)
Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
Definition: heaptuple.c:1075
IsForeignRelUpdatable_function IsForeignRelUpdatable
Definition: fdwapi.h:198
#define errcontext
Definition: elog.h:164
#define NameStr(name)
Definition: c.h:494
UserMapping * GetUserMapping(Oid userid, Oid serverid)
Definition: foreign.c:219
static TupleTableSlot * postgresIterateDirectModify(ForeignScanState *node)
void * arg
AttInMetadata * attinmeta
Definition: postgres_fdw.c:168
List * returningLists
Definition: plannodes.h:190
static void execute_dml_stmt(ForeignScanState *node)
#define PG_FUNCTION_ARGS
Definition: fmgr.h:150
char * defname
Definition: parsenodes.h:665
#define ALLOCSET_DEFAULT_MAXSIZE
Definition: memutils.h:144
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:97
FmgrInfo * attinfuncs
Definition: funcapi.h:41
Relids ppi_req_outer
Definition: relation.h:827
#define SelfItemPointerAttributeNumber
Definition: sysattr.h:21
#define DEFAULT_FDW_STARTUP_COST
Definition: postgres_fdw.c:45
#define elog
Definition: elog.h:218
Alias * eref
Definition: parsenodes.h:863
unsigned int GetPrepStmtNumber(PGconn *conn)
Definition: connection.c:446
Selectivity clauselist_selectivity(PlannerInfo *root, List *clauses, int varRelid, JoinType jointype, SpecialJoinInfo *sjinfo)
Definition: clausesel.c:92
AttrNumber cur_attno
Definition: postgres_fdw.c:249
List * list_delete(List *list, void *datum)
Definition: list.c:567
int PQgetisnull(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3075
#define PG_TRY()
Definition: elog.h:283
MemoryContext batch_cxt
Definition: postgres_fdw.c:156
#define BTLessStrategyNumber
Definition: stratnum.h:29
static void postgresGetForeignJoinPaths(PlannerInfo *root, RelOptInfo *joinrel, RelOptInfo *outerrel, RelOptInfo *innerrel, JoinType jointype, JoinPathExtraData *extra)
BeginForeignModify_function BeginForeignModify
Definition: fdwapi.h:193
struct PgFdwAnalyzeState PgFdwAnalyzeState
double clamp_row_est(double nrows)
Definition: costsize.c:165
double seq_page_cost
Definition: costsize.c:104
List * shippable_extensions
Definition: postgres_fdw.h:74
Bitmapset * bms_del_member(Bitmapset *a, int x)
Definition: bitmapset.c:705
int set_config_option(const char *name, const char *value, GucContext context, GucSource source, GucAction action, bool changeVal, int elevel, bool is_reload)
Definition: guc.c:5806
PGresult * pgfdw_exec_query(PGconn *conn, const char *query)
Definition: connection.c:459
Definition: pg_list.h:45
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1694
struct PathTarget * reltarget
Definition: relation.h:492
List * options
Definition: foreign.h:53
#define EXEC_FLAG_EXPLAIN_ONLY
Definition: executor.h:57
int16 AttrNumber
Definition: attnum.h:21
#define RelationGetRelid(relation)
Definition: rel.h:341
QualCost baserestrictcost
Definition: relation.h:538
void update_mergeclause_eclasses(PlannerInfo *root, RestrictInfo *restrictinfo)
Definition: pathkeys.c:928
CmdType
Definition: nodes.h:603
List * joinqual
Definition: plannodes.h:599
Oid serverid
Definition: foreign.h:47
#define PG_END_TRY()
Definition: elog.h:299
Definition: relation.h:862
static void postgresReScanForeignScan(ForeignScanState *node)
#define HeapTupleHeaderSetCmin(tup, cid)
Definition: htup_details.h:366
#define DEFAULT_FDW_TUPLE_COST
Definition: postgres_fdw.c:48
double Cost
Definition: nodes.h:594
int PQsendPrepare(PGconn *conn, const char *stmtName, const char *query, int nParams, const Oid *paramTypes)
Definition: fe-exec.c:1210
QualCost local_conds_cost
Definition: postgres_fdw.h:55
SamplerRandomState randstate
Definition: sampling.h:50
double reservoir_get_next_S(ReservoirState rs, double t, int n)
Definition: sampling.c:142
List * ec_members
Definition: relation.h:694
List * generate_implied_equalities_for_column(PlannerInfo *root, RelOptInfo *rel, ec_matches_callback_type callback, void *callback_arg, Relids prohibited_rels)
Definition: equivclass.c:2044
#define INTSTYLE_POSTGRES
Definition: miscadmin.h:229
ImportForeignSchemaType list_type
Definition: parsenodes.h:2040
#define ExecEvalExpr(expr, econtext, isNull, isDone)
Definition: executor.h:72
#define HeapTupleHeaderSetXmin(tup, xid)
Definition: htup_details.h:288