PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
postgres_fdw.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * postgres_fdw.c
4  * Foreign-data wrapper for remote PostgreSQL servers
5  *
6  * Portions Copyright (c) 2012-2014, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  * contrib/postgres_fdw/postgres_fdw.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14 
15 #include "postgres_fdw.h"
16 
17 #include "access/htup_details.h"
18 #include "access/sysattr.h"
19 #include "commands/defrem.h"
20 #include "commands/explain.h"
21 #include "commands/vacuum.h"
22 #include "foreign/fdwapi.h"
23 #include "funcapi.h"
24 #include "miscadmin.h"
25 #include "nodes/makefuncs.h"
26 #include "nodes/nodeFuncs.h"
27 #include "optimizer/cost.h"
28 #include "optimizer/pathnode.h"
29 #include "optimizer/paths.h"
30 #include "optimizer/planmain.h"
31 #include "optimizer/prep.h"
32 #include "optimizer/restrictinfo.h"
33 #include "optimizer/var.h"
34 #include "parser/parsetree.h"
35 #include "utils/builtins.h"
36 #include "utils/guc.h"
37 #include "utils/lsyscache.h"
38 #include "utils/memutils.h"
39 
40 
42 
43 /* Default CPU cost to start up a foreign query. */
44 #define DEFAULT_FDW_STARTUP_COST 100.0
45 
46 /* Default CPU cost to process 1 row (above and beyond cpu_tuple_cost). */
47 #define DEFAULT_FDW_TUPLE_COST 0.01
48 
49 /*
50  * FDW-specific planner information kept in RelOptInfo.fdw_private for a
51  * foreign table. This information is collected by postgresGetForeignRelSize.
52  */
53 typedef struct PgFdwRelationInfo
54 {
55  /* baserestrictinfo clauses, broken down into safe and unsafe subsets. */
58 
59  /* Bitmap of attr numbers we need to fetch from the remote server. */
61 
62  /* Cost and selectivity of local_conds. */
65 
66  /* Estimated size and cost for a scan with baserestrictinfo quals. */
67  double rows;
68  int width;
71 
72  /* Options extracted from catalogs. */
76 
77  /* Cached catalog information. */
80  UserMapping *user; /* only set in use_remote_estimate mode */
82 
83 /*
84  * Indexes of FDW-private information stored in fdw_private lists.
85  *
86  * We store various information in ForeignScan.fdw_private to pass it from
87  * planner to executor. Currently we store:
88  *
89  * 1) SELECT statement text to be sent to the remote server
90  * 2) Integer list of attribute numbers retrieved by the SELECT
91  *
92  * These items are indexed with the enum FdwScanPrivateIndex, so an item
93  * can be fetched with list_nth(). For example, to get the SELECT statement:
94  * sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
95  */
97 {
98  /* SQL statement to execute remotely (as a String node) */
100  /* Integer list of attribute numbers retrieved by the SELECT */
102 };
103 
104 /*
105  * Similarly, this enum describes what's kept in the fdw_private list for
106  * a ModifyTable node referencing a postgres_fdw foreign table. We store:
107  *
108  * 1) INSERT/UPDATE/DELETE statement text to be sent to the remote server
109  * 2) Integer list of target attribute numbers for INSERT/UPDATE
110  * (NIL for a DELETE)
111  * 3) Boolean flag showing if there's a RETURNING clause
112  * 4) Integer list of attribute numbers retrieved by RETURNING, if any
113  */
115 {
116  /* SQL statement to execute remotely (as a String node) */
118  /* Integer list of target attribute numbers for INSERT/UPDATE */
120  /* has-returning flag (as an integer Value node) */
122  /* Integer list of attribute numbers retrieved by RETURNING */
124 };
125 
126 /*
127  * Execution state of a foreign scan using postgres_fdw.
128  */
129 typedef struct PgFdwScanState
130 {
131  Relation rel; /* relcache entry for the foreign table */
132  AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
133 
134  /* extracted fdw_private data */
135  char *query; /* text of SELECT command */
136  List *retrieved_attrs; /* list of retrieved attribute numbers */
137 
138  /* for remote query execution */
139  PGconn *conn; /* connection for the scan */
140  unsigned int cursor_number; /* quasi-unique ID for my cursor */
141  bool cursor_exists; /* have we created the cursor? */
142  int numParams; /* number of parameters passed to query */
143  FmgrInfo *param_flinfo; /* output conversion functions for them */
144  List *param_exprs; /* executable expressions for param values */
145  const char **param_values; /* textual values of query parameters */
146 
147  /* for storing result tuples */
148  HeapTuple *tuples; /* array of currently-retrieved tuples */
149  int num_tuples; /* # of tuples in array */
150  int next_tuple; /* index of next one to return */
151 
152  /* batch-level state, for optimizing rewinds and avoiding useless fetch */
153  int fetch_ct_2; /* Min(# of fetches done, 2) */
154  bool eof_reached; /* true if last fetch reached EOF */
155 
156  /* working memory contexts */
157  MemoryContext batch_cxt; /* context holding current batch of tuples */
158  MemoryContext temp_cxt; /* context for per-tuple temporary data */
160 
161 /*
162  * Execution state of a foreign insert/update/delete operation.
163  */
164 typedef struct PgFdwModifyState
165 {
166  Relation rel; /* relcache entry for the foreign table */
167  AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
168 
169  /* for remote query execution */
170  PGconn *conn; /* connection for the scan */
171  char *p_name; /* name of prepared statement, if created */
172 
173  /* extracted fdw_private data */
174  char *query; /* text of INSERT/UPDATE/DELETE command */
175  List *target_attrs; /* list of target attribute numbers */
176  bool has_returning; /* is there a RETURNING clause? */
177  List *retrieved_attrs; /* attr numbers retrieved by RETURNING */
178 
179  /* info about parameters for prepared statement */
180  AttrNumber ctidAttno; /* attnum of input resjunk ctid column */
181  int p_nums; /* number of parameters to transmit */
182  FmgrInfo *p_flinfo; /* output conversion functions for them */
183 
184  /* working memory context */
185  MemoryContext temp_cxt; /* context for per-tuple temporary data */
187 
188 /*
189  * Workspace for analyzing a foreign table.
190  */
191 typedef struct PgFdwAnalyzeState
192 {
193  Relation rel; /* relcache entry for the foreign table */
194  AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
195  List *retrieved_attrs; /* attr numbers retrieved by query */
196 
197  /* collected sample rows */
198  HeapTuple *rows; /* array of size targrows */
199  int targrows; /* target # of sample rows */
200  int numrows; /* # of sample rows collected */
201 
202  /* for random sampling */
203  double samplerows; /* # of rows fetched */
204  double rowstoskip; /* # of rows to skip before next sample */
205  double rstate; /* random state */
206 
207  /* working memory contexts */
208  MemoryContext anl_cxt; /* context for per-analyze lifespan data */
209  MemoryContext temp_cxt; /* context for per-tuple temporary data */
211 
212 /*
213  * Identify the attribute where data conversion fails.
214  */
215 typedef struct ConversionLocation
216 {
217  Relation rel; /* foreign table's relcache entry */
218  AttrNumber cur_attno; /* attribute number being processed, or 0 */
220 
221 /* Callback argument for ec_member_matches_foreign */
222 typedef struct
223 {
224  Expr *current; /* current expr, or NULL if not yet found */
225  List *already_used; /* expressions already dealt with */
227 
228 /*
229  * SQL functions
230  */
232 
234 
235 /*
236  * FDW callback routines
237  */
238 static void postgresGetForeignRelSize(PlannerInfo *root,
239  RelOptInfo *baserel,
240  Oid foreigntableid);
241 static void postgresGetForeignPaths(PlannerInfo *root,
242  RelOptInfo *baserel,
243  Oid foreigntableid);
245  RelOptInfo *baserel,
246  Oid foreigntableid,
247  ForeignPath *best_path,
248  List *tlist,
249  List *scan_clauses);
250 static void postgresBeginForeignScan(ForeignScanState *node, int eflags);
253 static void postgresEndForeignScan(ForeignScanState *node);
254 static void postgresAddForeignUpdateTargets(Query *parsetree,
255  RangeTblEntry *target_rte,
256  Relation target_relation);
258  ModifyTable *plan,
259  Index resultRelation,
260  int subplan_index);
261 static void postgresBeginForeignModify(ModifyTableState *mtstate,
262  ResultRelInfo *resultRelInfo,
263  List *fdw_private,
264  int subplan_index,
265  int eflags);
267  ResultRelInfo *resultRelInfo,
268  TupleTableSlot *slot,
269  TupleTableSlot *planSlot);
271  ResultRelInfo *resultRelInfo,
272  TupleTableSlot *slot,
273  TupleTableSlot *planSlot);
275  ResultRelInfo *resultRelInfo,
276  TupleTableSlot *slot,
277  TupleTableSlot *planSlot);
278 static void postgresEndForeignModify(EState *estate,
279  ResultRelInfo *resultRelInfo);
282  ExplainState *es);
284  ResultRelInfo *rinfo,
285  List *fdw_private,
286  int subplan_index,
287  ExplainState *es);
288 static bool postgresAnalyzeForeignTable(Relation relation,
289  AcquireSampleRowsFunc *func,
290  BlockNumber *totalpages);
291 
292 /*
293  * Helper functions
294  */
295 static void estimate_path_cost_size(PlannerInfo *root,
296  RelOptInfo *baserel,
297  List *join_conds,
298  double *p_rows, int *p_width,
299  Cost *p_startup_cost, Cost *p_total_cost);
300 static void get_remote_estimate(const char *sql,
301  PGconn *conn,
302  double *rows,
303  int *width,
304  Cost *startup_cost,
305  Cost *total_cost);
306 static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
308  void *arg);
309 static void create_cursor(ForeignScanState *node);
310 static void fetch_more_data(ForeignScanState *node);
311 static void close_cursor(PGconn *conn, unsigned int cursor_number);
312 static void prepare_foreign_modify(PgFdwModifyState *fmstate);
313 static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
314  ItemPointer tupleid,
315  TupleTableSlot *slot);
316 static void store_returning_result(PgFdwModifyState *fmstate,
317  TupleTableSlot *slot, PGresult *res);
318 static int postgresAcquireSampleRowsFunc(Relation relation, int elevel,
319  HeapTuple *rows, int targrows,
320  double *totalrows,
321  double *totaldeadrows);
322 static void analyze_row_processor(PGresult *res, int row,
323  PgFdwAnalyzeState *astate);
325  int row,
326  Relation rel,
327  AttInMetadata *attinmeta,
328  List *retrieved_attrs,
329  MemoryContext temp_context);
330 static void conversion_error_callback(void *arg);
331 
332 
333 /*
334  * Foreign-data wrapper handler function: return a struct with pointers
335  * to my callback routines.
336  */
337 Datum
339 {
340  FdwRoutine *routine = makeNode(FdwRoutine);
341 
342  /* Functions for scanning foreign tables */
350 
351  /* Functions for updating foreign tables */
360 
361  /* Support functions for EXPLAIN */
364 
365  /* Support functions for ANALYZE */
367 
368  PG_RETURN_POINTER(routine);
369 }
370 
371 /*
372  * postgresGetForeignRelSize
373  * Estimate # of rows and width of the result of the scan
374  *
375  * We should consider the effect of all baserestrictinfo clauses here, but
376  * not any join clauses.
377  */
378 static void
380  RelOptInfo *baserel,
381  Oid foreigntableid)
382 {
383  PgFdwRelationInfo *fpinfo;
384  ListCell *lc;
385 
386  /*
387  * We use PgFdwRelationInfo to pass various information to subsequent
388  * functions.
389  */
390  fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
391  baserel->fdw_private = (void *) fpinfo;
392 
393  /* Look up foreign-table catalog info. */
394  fpinfo->table = GetForeignTable(foreigntableid);
395  fpinfo->server = GetForeignServer(fpinfo->table->serverid);
396 
397  /*
398  * Extract user-settable option values. Note that per-table setting of
399  * use_remote_estimate overrides per-server setting.
400  */
401  fpinfo->use_remote_estimate = false;
404 
405  foreach(lc, fpinfo->server->options)
406  {
407  DefElem *def = (DefElem *) lfirst(lc);
408 
409  if (strcmp(def->defname, "use_remote_estimate") == 0)
410  fpinfo->use_remote_estimate = defGetBoolean(def);
411  else if (strcmp(def->defname, "fdw_startup_cost") == 0)
412  fpinfo->fdw_startup_cost = strtod(defGetString(def), NULL);
413  else if (strcmp(def->defname, "fdw_tuple_cost") == 0)
414  fpinfo->fdw_tuple_cost = strtod(defGetString(def), NULL);
415  }
416  foreach(lc, fpinfo->table->options)
417  {
418  DefElem *def = (DefElem *) lfirst(lc);
419 
420  if (strcmp(def->defname, "use_remote_estimate") == 0)
421  {
422  fpinfo->use_remote_estimate = defGetBoolean(def);
423  break; /* only need the one value */
424  }
425  }
426 
427  /*
428  * If the table or the server is configured to use remote estimates,
429  * identify which user to do remote access as during planning. This
430  * should match what ExecCheckRTEPerms() does. If we fail due to lack of
431  * permissions, the query would have failed at runtime anyway.
432  */
433  if (fpinfo->use_remote_estimate)
434  {
435  RangeTblEntry *rte = planner_rt_fetch(baserel->relid, root);
436  Oid userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
437 
438  fpinfo->user = GetUserMapping(userid, fpinfo->server->serverid);
439  }
440  else
441  fpinfo->user = NULL;
442 
443  /*
444  * Identify which baserestrictinfo clauses can be sent to the remote
445  * server and which can't.
446  */
447  classifyConditions(root, baserel,
448  &fpinfo->remote_conds, &fpinfo->local_conds);
449 
450  /*
451  * Identify which attributes will need to be retrieved from the remote
452  * server. These include all attrs needed for joins or final output, plus
453  * all attrs used in the local_conds. (Note: if we end up using a
454  * parameterized scan, it's possible that some of the join clauses will be
455  * sent to the remote and thus we wouldn't really need to retrieve the
456  * columns used in them. Doesn't seem worth detecting that case though.)
457  */
458  fpinfo->attrs_used = NULL;
459  pull_varattnos((Node *) baserel->reltargetlist, baserel->relid,
460  &fpinfo->attrs_used);
461  foreach(lc, fpinfo->local_conds)
462  {
463  RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
464 
465  pull_varattnos((Node *) rinfo->clause, baserel->relid,
466  &fpinfo->attrs_used);
467  }
468 
469  /*
470  * Compute the selectivity and cost of the local_conds, so we don't have
471  * to do it over again for each path. The best we can do for these
472  * conditions is to estimate selectivity on the basis of local statistics.
473  */
475  fpinfo->local_conds,
476  baserel->relid,
477  JOIN_INNER,
478  NULL);
479 
480  cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root);
481 
482  /*
483  * If the table or the server is configured to use remote estimates,
484  * connect to the foreign server and execute EXPLAIN to estimate the
485  * number of rows selected by the restriction clauses, as well as the
486  * average row width. Otherwise, estimate using whatever statistics we
487  * have locally, in a way similar to ordinary tables.
488  */
489  if (fpinfo->use_remote_estimate)
490  {
491  /*
492  * Get cost/size estimates with help of remote server. Save the
493  * values in fpinfo so we don't need to do it again to generate the
494  * basic foreign path.
495  */
496  estimate_path_cost_size(root, baserel, NIL,
497  &fpinfo->rows, &fpinfo->width,
498  &fpinfo->startup_cost, &fpinfo->total_cost);
499 
500  /* Report estimated baserel size to planner. */
501  baserel->rows = fpinfo->rows;
502  baserel->width = fpinfo->width;
503  }
504  else
505  {
506  /*
507  * If the foreign table has never been ANALYZEd, it will have relpages
508  * and reltuples equal to zero, which most likely has nothing to do
509  * with reality. We can't do a whole lot about that if we're not
510  * allowed to consult the remote server, but we can use a hack similar
511  * to plancat.c's treatment of empty relations: use a minimum size
512  * estimate of 10 pages, and divide by the column-datatype-based width
513  * estimate to get the corresponding number of tuples.
514  */
515  if (baserel->pages == 0 && baserel->tuples == 0)
516  {
517  baserel->pages = 10;
518  baserel->tuples =
519  (10 * BLCKSZ) / (baserel->width + sizeof(HeapTupleHeaderData));
520  }
521 
522  /* Estimate baserel size as best we can with local statistics. */
523  set_baserel_size_estimates(root, baserel);
524 
525  /* Fill in basically-bogus cost estimates for use later. */
526  estimate_path_cost_size(root, baserel, NIL,
527  &fpinfo->rows, &fpinfo->width,
528  &fpinfo->startup_cost, &fpinfo->total_cost);
529  }
530 }
531 
532 /*
533  * postgresGetForeignPaths
534  * Create possible scan paths for a scan on the foreign table
535  */
536 static void
538  RelOptInfo *baserel,
539  Oid foreigntableid)
540 {
541  PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) baserel->fdw_private;
542  ForeignPath *path;
543  List *join_quals;
544  Relids required_outer;
545  double rows;
546  int width;
547  Cost startup_cost;
548  Cost total_cost;
549  ListCell *lc;
550 
551  /*
552  * Create simplest ForeignScan path node and add it to baserel. This path
553  * corresponds to SeqScan path of regular tables (though depending on what
554  * baserestrict conditions we were able to send to remote, there might
555  * actually be an indexscan happening there). We already did all the work
556  * to estimate cost and size of this path.
557  */
558  path = create_foreignscan_path(root, baserel,
559  fpinfo->rows,
560  fpinfo->startup_cost,
561  fpinfo->total_cost,
562  NIL, /* no pathkeys */
563  NULL, /* no outer rel either */
564  NIL); /* no fdw_private list */
565  add_path(baserel, (Path *) path);
566 
567  /*
568  * If we're not using remote estimates, stop here. We have no way to
569  * estimate whether any join clauses would be worth sending across, so
570  * don't bother building parameterized paths.
571  */
572  if (!fpinfo->use_remote_estimate)
573  return;
574 
575  /*
576  * As a crude first hack, we consider each available join clause and try
577  * to make a parameterized path using just that clause. Later we should
578  * consider combinations of clauses, probably.
579  */
580 
581  /* Scan the rel's join clauses */
582  foreach(lc, baserel->joininfo)
583  {
584  RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
585 
586  /* Check if clause can be moved to this rel */
587  if (!join_clause_is_movable_to(rinfo, baserel))
588  continue;
589 
590  /* See if it is safe to send to remote */
591  if (!is_foreign_expr(root, baserel, rinfo->clause))
592  continue;
593 
594  /*
595  * OK, get a cost estimate from the remote, and make a path.
596  */
597  join_quals = list_make1(rinfo);
598  estimate_path_cost_size(root, baserel, join_quals,
599  &rows, &width,
600  &startup_cost, &total_cost);
601 
602  /* Must calculate required outer rels for this path */
603  required_outer = bms_union(rinfo->clause_relids,
604  baserel->lateral_relids);
605  /* We do not want the foreign rel itself listed in required_outer */
606  required_outer = bms_del_member(required_outer, baserel->relid);
607  /* Enforce convention that required_outer is exactly NULL if empty */
608  if (bms_is_empty(required_outer))
609  required_outer = NULL;
610 
611  path = create_foreignscan_path(root, baserel,
612  rows,
613  startup_cost,
614  total_cost,
615  NIL, /* no pathkeys */
616  required_outer,
617  NIL); /* no fdw_private list */
618  add_path(baserel, (Path *) path);
619  }
620 
621  /*
622  * The above scan examined only "generic" join clauses, not those that
623  * were absorbed into EquivalenceClasses. See if we can make anything out
624  * of EquivalenceClasses.
625  */
626  if (baserel->has_eclass_joins)
627  {
628  /*
629  * We repeatedly scan the eclass list looking for column references
630  * (or expressions) belonging to the foreign rel. Each time we find
631  * one, we generate a list of equivalence joinclauses for it, and then
632  * try to make those into foreign paths. Repeat till there are no
633  * more candidate EC members.
634  */
636 
637  arg.already_used = NIL;
638  for (;;)
639  {
640  List *clauses;
641 
642  /* Make clauses, skipping any that join to lateral_referencers */
643  arg.current = NULL;
645  baserel,
647  (void *) &arg,
648  baserel->lateral_referencers);
649 
650  /* Done if there are no more expressions in the foreign rel */
651  if (arg.current == NULL)
652  {
653  Assert(clauses == NIL);
654  break;
655  }
656 
657  /* Scan the extracted join clauses */
658  foreach(lc, clauses)
659  {
660  RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
661 
662  /* Check if clause can be moved to this rel */
663  if (!join_clause_is_movable_to(rinfo, baserel))
664  continue;
665 
666  /* See if it is safe to send to remote */
667  if (!is_foreign_expr(root, baserel, rinfo->clause))
668  continue;
669 
670  /*
671  * OK, get a cost estimate from the remote, and make a path.
672  */
673  join_quals = list_make1(rinfo);
674  estimate_path_cost_size(root, baserel, join_quals,
675  &rows, &width,
676  &startup_cost, &total_cost);
677 
678  /* Must calculate required outer rels for this path */
679  required_outer = bms_union(rinfo->clause_relids,
680  baserel->lateral_relids);
681  required_outer = bms_del_member(required_outer, baserel->relid);
682  if (bms_is_empty(required_outer))
683  required_outer = NULL;
684 
685  path = create_foreignscan_path(root, baserel,
686  rows,
687  startup_cost,
688  total_cost,
689  NIL, /* no pathkeys */
690  required_outer,
691  NIL); /* no fdw_private */
692  add_path(baserel, (Path *) path);
693  }
694 
695  /* Try again, now ignoring the expression we found this time */
696  arg.already_used = lappend(arg.already_used, arg.current);
697  }
698  }
699 }
700 
701 /*
702  * postgresGetForeignPlan
703  * Create ForeignScan plan node which implements selected best path
704  */
705 static ForeignScan *
707  RelOptInfo *baserel,
708  Oid foreigntableid,
709  ForeignPath *best_path,
710  List *tlist,
711  List *scan_clauses)
712 {
713  PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) baserel->fdw_private;
714  Index scan_relid = baserel->relid;
715  List *fdw_private;
716  List *remote_conds = NIL;
717  List *local_exprs = NIL;
718  List *params_list = NIL;
719  List *retrieved_attrs;
720  StringInfoData sql;
721  ListCell *lc;
722 
723  /*
724  * Separate the scan_clauses into those that can be executed remotely and
725  * those that can't. baserestrictinfo clauses that were previously
726  * determined to be safe or unsafe by classifyConditions are shown in
727  * fpinfo->remote_conds and fpinfo->local_conds. Anything else in the
728  * scan_clauses list should be a join clause that was found safe by
729  * postgresGetForeignPaths.
730  *
731  * Note: for clauses extracted from EquivalenceClasses, it's possible that
732  * what we get here is a different representation of the clause than what
733  * postgresGetForeignPaths saw; for example we might get a commuted
734  * version of the clause. So we can't insist on simple equality as we do
735  * for the baserestrictinfo clauses.
736  *
737  * This code must match "extract_actual_clauses(scan_clauses, false)"
738  * except for the additional decision about remote versus local execution.
739  * Note however that we only strip the RestrictInfo nodes from the
740  * local_exprs list, since appendWhereClause expects a list of
741  * RestrictInfos.
742  */
743  foreach(lc, scan_clauses)
744  {
745  RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
746 
747  Assert(IsA(rinfo, RestrictInfo));
748 
749  /* Ignore any pseudoconstants, they're dealt with elsewhere */
750  if (rinfo->pseudoconstant)
751  continue;
752 
753  if (list_member_ptr(fpinfo->remote_conds, rinfo))
754  remote_conds = lappend(remote_conds, rinfo);
755  else if (list_member_ptr(fpinfo->local_conds, rinfo))
756  local_exprs = lappend(local_exprs, rinfo->clause);
757  else
758  {
759  Assert(is_foreign_expr(root, baserel, rinfo->clause));
760  remote_conds = lappend(remote_conds, rinfo);
761  }
762  }
763 
764  /*
765  * Build the query string to be sent for execution, and identify
766  * expressions to be sent as parameters.
767  */
768  initStringInfo(&sql);
769  deparseSelectSql(&sql, root, baserel, fpinfo->attrs_used,
770  &retrieved_attrs);
771  if (remote_conds)
772  appendWhereClause(&sql, root, baserel, remote_conds,
773  true, &params_list);
774 
775  /*
776  * Add FOR UPDATE/SHARE if appropriate. We apply locking during the
777  * initial row fetch, rather than later on as is done for local tables.
778  * The extra roundtrips involved in trying to duplicate the local
779  * semantics exactly don't seem worthwhile (see also comments for
780  * RowMarkType).
781  *
782  * Note: because we actually run the query as a cursor, this assumes that
783  * DECLARE CURSOR ... FOR UPDATE is supported, which it isn't before 8.3.
784  */
785  if (baserel->relid == root->parse->resultRelation &&
786  (root->parse->commandType == CMD_UPDATE ||
787  root->parse->commandType == CMD_DELETE))
788  {
789  /* Relation is UPDATE/DELETE target, so use FOR UPDATE */
790  appendStringInfoString(&sql, " FOR UPDATE");
791  }
792  else
793  {
794  RowMarkClause *rc = get_parse_rowmark(root->parse, baserel->relid);
795 
796  if (rc)
797  {
798  /*
799  * Relation is specified as a FOR UPDATE/SHARE target, so handle
800  * that.
801  *
802  * For now, just ignore any [NO] KEY specification, since (a) it's
803  * not clear what that means for a remote table that we don't have
804  * complete information about, and (b) it wouldn't work anyway on
805  * older remote servers. Likewise, we don't worry about NOWAIT.
806  */
807  switch (rc->strength)
808  {
809  case LCS_FORKEYSHARE:
810  case LCS_FORSHARE:
811  appendStringInfoString(&sql, " FOR SHARE");
812  break;
813  case LCS_FORNOKEYUPDATE:
814  case LCS_FORUPDATE:
815  appendStringInfoString(&sql, " FOR UPDATE");
816  break;
817  }
818  }
819  }
820 
821  /*
822  * Build the fdw_private list that will be available to the executor.
823  * Items in the list must match enum FdwScanPrivateIndex, above.
824  */
825  fdw_private = list_make2(makeString(sql.data),
826  retrieved_attrs);
827 
828  /*
829  * Create the ForeignScan node from target list, local filtering
830  * expressions, remote parameter expressions, and FDW private information.
831  *
832  * Note that the remote parameter expressions are stored in the fdw_exprs
833  * field of the finished plan node; we can't keep them in private state
834  * because then they wouldn't be subject to later planner processing.
835  */
836  return make_foreignscan(tlist,
837  local_exprs,
838  scan_relid,
839  params_list,
840  fdw_private);
841 }
842 
843 /*
844  * postgresBeginForeignScan
845  * Initiate an executor scan of a foreign PostgreSQL table.
846  */
847 static void
849 {
850  ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
851  EState *estate = node->ss.ps.state;
852  PgFdwScanState *fsstate;
853  RangeTblEntry *rte;
854  Oid userid;
855  ForeignTable *table;
856  ForeignServer *server;
857  UserMapping *user;
858  int numParams;
859  int i;
860  ListCell *lc;
861 
862  /*
863  * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL.
864  */
865  if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
866  return;
867 
868  /*
869  * We'll save private state in node->fdw_state.
870  */
871  fsstate = (PgFdwScanState *) palloc0(sizeof(PgFdwScanState));
872  node->fdw_state = (void *) fsstate;
873 
874  /*
875  * Identify which user to do the remote access as. This should match what
876  * ExecCheckRTEPerms() does.
877  */
878  rte = rt_fetch(fsplan->scan.scanrelid, estate->es_range_table);
879  userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
880 
881  /* Get info about foreign table. */
882  fsstate->rel = node->ss.ss_currentRelation;
883  table = GetForeignTable(RelationGetRelid(fsstate->rel));
884  server = GetForeignServer(table->serverid);
885  user = GetUserMapping(userid, server->serverid);
886 
887  /*
888  * Get connection to the foreign server. Connection manager will
889  * establish new connection if necessary.
890  */
891  fsstate->conn = GetConnection(server, user, false);
892 
893  /* Assign a unique ID for my cursor */
894  fsstate->cursor_number = GetCursorNumber(fsstate->conn);
895  fsstate->cursor_exists = false;
896 
897  /* Get private info created by planner functions. */
898  fsstate->query = strVal(list_nth(fsplan->fdw_private,
900  fsstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
902 
903  /* Create contexts for batches of tuples and per-tuple temp workspace. */
904  fsstate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt,
905  "postgres_fdw tuple data",
909  fsstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
910  "postgres_fdw temporary data",
914 
915  /* Get info we'll need for input data conversion. */
916  fsstate->attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(fsstate->rel));
917 
918  /* Prepare for output conversion of parameters used in remote query. */
919  numParams = list_length(fsplan->fdw_exprs);
920  fsstate->numParams = numParams;
921  fsstate->param_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * numParams);
922 
923  i = 0;
924  foreach(lc, fsplan->fdw_exprs)
925  {
926  Node *param_expr = (Node *) lfirst(lc);
927  Oid typefnoid;
928  bool isvarlena;
929 
930  getTypeOutputInfo(exprType(param_expr), &typefnoid, &isvarlena);
931  fmgr_info(typefnoid, &fsstate->param_flinfo[i]);
932  i++;
933  }
934 
935  /*
936  * Prepare remote-parameter expressions for evaluation. (Note: in
937  * practice, we expect that all these expressions will be just Params, so
938  * we could possibly do something more efficient than using the full
939  * expression-eval machinery for this. But probably there would be little
940  * benefit, and it'd require postgres_fdw to know more than is desirable
941  * about Param evaluation.)
942  */
943  fsstate->param_exprs = (List *)
944  ExecInitExpr((Expr *) fsplan->fdw_exprs,
945  (PlanState *) node);
946 
947  /*
948  * Allocate buffer for text form of query parameters, if any.
949  */
950  if (numParams > 0)
951  fsstate->param_values = (const char **) palloc0(numParams * sizeof(char *));
952  else
953  fsstate->param_values = NULL;
954 }
955 
956 /*
957  * postgresIterateForeignScan
958  * Retrieve next row from the result set, or clear tuple slot to indicate
959  * EOF.
960  */
961 static TupleTableSlot *
963 {
964  PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
965  TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
966 
967  /*
968  * If this is the first call after Begin or ReScan, we need to create the
969  * cursor on the remote side.
970  */
971  if (!fsstate->cursor_exists)
972  create_cursor(node);
973 
974  /*
975  * Get some more tuples, if we've run out.
976  */
977  if (fsstate->next_tuple >= fsstate->num_tuples)
978  {
979  /* No point in another fetch if we already detected EOF, though. */
980  if (!fsstate->eof_reached)
981  fetch_more_data(node);
982  /* If we didn't get any tuples, must be end of data. */
983  if (fsstate->next_tuple >= fsstate->num_tuples)
984  return ExecClearTuple(slot);
985  }
986 
987  /*
988  * Return the next tuple.
989  */
990  ExecStoreTuple(fsstate->tuples[fsstate->next_tuple++],
991  slot,
993  false);
994 
995  return slot;
996 }
997 
998 /*
999  * postgresReScanForeignScan
1000  * Restart the scan.
1001  */
/* NOTE(review): listing line 1003 (the function declarator) is missing here. */
1002 static void
1004 {
1005  PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1006  char sql[64];
1007  PGresult *res;
1008 
1009  /* If we haven't created the cursor yet, nothing to do. */
1010  if (!fsstate->cursor_exists)
1011  return;
1012 
1013  /*
1014  * If any internal parameters affecting this node have changed, we'd
1015  * better destroy and recreate the cursor. Otherwise, rewinding it should
1016  * be good enough. If we've only fetched zero or one batch, we needn't
1017  * even rewind the cursor, just rescan what we have.
1018  */
/*
 * Three cases:
 * - Parameters changed: CLOSE the cursor and clear cursor_exists, so the
 *   next Iterate call re-runs create_cursor() with freshly evaluated
 *   parameter values.
 * - More than one batch fetched: rewind the cursor with MOVE BACKWARD ALL.
 * - Otherwise: restart within the batch already buffered in memory.
 * fetch_more_data() caps fetch_ct_2 at 2, so "> 1" means two or more batches.
 */
1019  if (node->ss.ps.chgParam != NULL)
1020  {
1021  fsstate->cursor_exists = false;
1022  snprintf(sql, sizeof(sql), "CLOSE c%u",
1023  fsstate->cursor_number);
1024  }
1025  else if (fsstate->fetch_ct_2 > 1)
1026  {
1027  snprintf(sql, sizeof(sql), "MOVE BACKWARD ALL IN c%u",
1028  fsstate->cursor_number);
1029  }
1030  else
1031  {
1032  /* Easy: just rescan what we already have in memory, if anything */
1033  fsstate->next_tuple = 0;
1034  return;
1035  }
1036 
1037  /*
1038  * We don't use a PG_TRY block here, so be careful not to throw error
1039  * without releasing the PGresult.
1040  */
1041  res = PQexec(fsstate->conn, sql);
1042  if (PQresultStatus(res) != PGRES_COMMAND_OK)
1043  pgfdw_report_error(ERROR, res, true, sql);
1044  PQclear(res);
1045 
1046  /* Now force a fresh FETCH. */
1047  fsstate->tuples = NULL;
1048  fsstate->num_tuples = 0;
1049  fsstate->next_tuple = 0;
1050  fsstate->fetch_ct_2 = 0;
1051  fsstate->eof_reached = false;
1052 }
1053 
1054 /*
1055  * postgresEndForeignScan
1056  * Finish scanning foreign table and dispose objects used for this scan
1057  */
/* NOTE(review): listing line 1059 (the function declarator) is missing here. */
1058 static void
1060 {
1061  PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1062 
1063  /* if fsstate is NULL, we are in EXPLAIN; nothing to do */
1064  if (fsstate == NULL)
1065  return;
1066 
1067  /* Close the cursor if open, to prevent accumulation of cursors */
1068  if (fsstate->cursor_exists)
1069  close_cursor(fsstate->conn, fsstate->cursor_number);
1070 
1071  /* Release remote connection */
/* conn is cleared afterwards so fsstate no longer references the released connection. */
1072  ReleaseConnection(fsstate->conn);
1073  fsstate->conn = NULL;
1074 
1075  /* MemoryContexts will be deleted automatically. */
1076 }
1077 
1078 /*
1079  * postgresAddForeignUpdateTargets
1080  * Add resjunk column(s) needed for update/delete on a foreign table
1081  */
/*
 * NOTE(review): listing line 1083 (the declarator with the first parameter)
 * is missing; the body uses a Query *parsetree -- confirm upstream.
 */
1082 static void
1084  RangeTblEntry *target_rte,
1085  Relation target_relation)
1086 {
1087  Var *var;
1088  const char *attrname;
1089  TargetEntry *tle;
1090 
1091  /*
1092  * In postgres_fdw, what we need is the ctid, same as for a regular table.
1093  */
1094 
1095  /* Make a Var representing the desired value */
/*
 * NOTE(review): listing line 1097 (makeVar's attribute-number argument) is
 * missing; presumably the ctid system attribute number -- confirm.
 */
1096  var = makeVar(parsetree->resultRelation,
1098  TIDOID,
1099  -1,
1100  InvalidOid,
1101  0);
1102 
1103  /* Wrap it in a resjunk TLE with the right name ... */
1104  attrname = "ctid";
1105 
/*
 * postgresBeginForeignModify looks up a junk column with exactly this
 * name ("ctid"). The resno places the new entry after all existing entries.
 */
1106  tle = makeTargetEntry((Expr *) var,
1107  list_length(parsetree->targetList) + 1,
1108  pstrdup(attrname),
1109  true);
1110 
1111  /* ... and add it to the query's targetlist */
1112  parsetree->targetList = lappend(parsetree->targetList, tle);
1113 }
1114 
1115 /*
1116  * postgresPlanForeignModify
1117  * Plan an insert/update/delete operation on a foreign table
1118  *
1119  * Note: currently, the plan tree generated for UPDATE/DELETE will always
1120  * include a ForeignScan that retrieves ctids (using SELECT FOR UPDATE)
1121  * and then the ModifyTable node will have to execute individual remote
1122  * UPDATE/DELETE commands. If there are no local conditions or joins
1123  * needed, it'd be better to let the scan node do UPDATE/DELETE RETURNING
1124  * and then do nothing at ModifyTable. Room for future optimization ...
1125  */
/*
 * Returns the fdw_private list (remote SQL text, target attnums, has-returning
 * flag, retrieved attrs) consumed by postgresBeginForeignModify.
 * NOTE(review): listing line 1127 (the declarator with the first parameter)
 * is missing; the body uses a PlannerInfo *root -- confirm upstream.
 */
1126 static List *
1128  ModifyTable *plan,
1129  Index resultRelation,
1130  int subplan_index)
1131 {
1132  CmdType operation = plan->operation;
1133  RangeTblEntry *rte = planner_rt_fetch(resultRelation, root);
1134  Relation rel;
1135  StringInfoData sql;
1136  List *targetAttrs = NIL;
1137  List *returningList = NIL;
1138  List *retrieved_attrs = NIL;
1139 
1140  initStringInfo(&sql);
1141 
1142  /*
1143  * Core code already has some lock on each rel being planned, so we can
1144  * use NoLock here.
1145  */
1146  rel = heap_open(rte->relid, NoLock);
1147 
1148  /*
1149  * In an INSERT, we transmit all columns that are defined in the foreign
1150  * table. In an UPDATE, we transmit only columns that were explicitly
1151  * targets of the UPDATE, so as to avoid unnecessary data transmission.
1152  * (We can't do that for INSERT since we would miss sending default values
1153  * for columns not listed in the source statement.)
1154  */
1155  if (operation == CMD_INSERT)
1156  {
1157  TupleDesc tupdesc = RelationGetDescr(rel);
1158  int attnum;
1159 
1160  for (attnum = 1; attnum <= tupdesc->natts; attnum++)
1161  {
1162  Form_pg_attribute attr = tupdesc->attrs[attnum - 1];
1163 
1164  if (!attr->attisdropped)
1165  targetAttrs = lappend_int(targetAttrs, attnum);
1166  }
1167  }
1168  else if (operation == CMD_UPDATE)
1169  {
1170  Bitmapset *tmpset = bms_copy(rte->modifiedCols);
1171  AttrNumber col;
1172 
/*
 * Iterate over a copy, presumably because bms_first_member consumes the set
 * it scans; rte->modifiedCols itself is left intact.
 */
1173  while ((col = bms_first_member(tmpset)) >= 0)
1174  {
/*
 * NOTE(review): listing line 1175 is missing; it presumably converts the
 * bitmapset member back to an attribute number before this check -- confirm.
 */
1176  if (col <= InvalidAttrNumber) /* shouldn't happen */
1177  elog(ERROR, "system-column update is not supported");
1178  targetAttrs = lappend_int(targetAttrs, col);
1179  }
1180  }
1181 
1182  /*
1183  * Extract the relevant RETURNING list if any.
1184  */
1185  if (plan->returningLists)
1186  returningList = (List *) list_nth(plan->returningLists, subplan_index);
1187 
1188  /*
1189  * Construct the SQL command string.
1190  */
1191  switch (operation)
1192  {
1193  case CMD_INSERT:
1194  deparseInsertSql(&sql, root, resultRelation, rel,
1195  targetAttrs, returningList,
1196  &retrieved_attrs);
1197  break;
1198  case CMD_UPDATE:
1199  deparseUpdateSql(&sql, root, resultRelation, rel,
1200  targetAttrs, returningList,
1201  &retrieved_attrs);
1202  break;
1203  case CMD_DELETE:
1204  deparseDeleteSql(&sql, root, resultRelation, rel,
1205  returningList,
1206  &retrieved_attrs);
1207  break;
1208  default:
1209  elog(ERROR, "unexpected operation: %d", (int) operation);
1210  break;
1211  }
1212 
1213  heap_close(rel, NoLock);
1214 
1215  /*
1216  * Build the fdw_private list that will be available to the executor.
1217  * Items in the list must match enum FdwModifyPrivateIndex, above.
1218  */
/* The has-RETURNING flag is stored as an Integer node (0 or 1) so it can sit in the node list. */
1219  return list_make4(makeString(sql.data),
1220  targetAttrs,
1221  makeInteger((returningList != NIL)),
1222  retrieved_attrs);
1223 }
1224 
1225 /*
1226  * postgresBeginForeignModify
1227  * Begin an insert/update/delete operation on a foreign table
1228  */
/*
 * NOTE(review): several lines of this function are missing from the listing:
 * - 1230: the declarator; the body uses a ModifyTableState *mtstate.
 * - 1279, 1281, 1283, 1285: the fdw_private index arguments of list_nth.
 * - 1290-1292: the remaining AllocSetContextCreate arguments.
 * - 1296: the statement guarded by "if (fmstate->has_returning)".
 * - 1308: the assignment of fmstate->ctidAttno.
 * Confirm all of these against upstream.
 */
1229 static void
1231  ResultRelInfo *resultRelInfo,
1232  List *fdw_private,
1233  int subplan_index,
1234  int eflags)
1235 {
1236  PgFdwModifyState *fmstate;
1237  EState *estate = mtstate->ps.state;
1238  CmdType operation = mtstate->operation;
1239  Relation rel = resultRelInfo->ri_RelationDesc;
1240  RangeTblEntry *rte;
1241  Oid userid;
1242  ForeignTable *table;
1243  ForeignServer *server;
1244  UserMapping *user;
1245  AttrNumber n_params;
1246  Oid typefnoid;
1247  bool isvarlena;
1248  ListCell *lc;
1249 
1250  /*
1251  * Do nothing in EXPLAIN (no ANALYZE) case. resultRelInfo->ri_FdwState
1252  * stays NULL.
1253  */
1254  if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
1255  return;
1256 
1257  /* Begin constructing PgFdwModifyState. */
1258  fmstate = (PgFdwModifyState *) palloc0(sizeof(PgFdwModifyState));
1259  fmstate->rel = rel;
1260 
1261  /*
1262  * Identify which user to do the remote access as. This should match what
1263  * ExecCheckRTEPerms() does.
1264  */
1265  rte = rt_fetch(resultRelInfo->ri_RangeTableIndex, estate->es_range_table);
1266  userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
1267 
1268  /* Get info about foreign table. */
1269  table = GetForeignTable(RelationGetRelid(rel));
1270  server = GetForeignServer(table->serverid);
1271  user = GetUserMapping(userid, server->serverid);
1272 
1273  /* Open connection; report that we'll create a prepared statement. */
1274  fmstate->conn = GetConnection(server, user, true);
1275  fmstate->p_name = NULL; /* prepared statement not made yet */
1276 
1277  /* Deconstruct fdw_private data. */
1278  fmstate->query = strVal(list_nth(fdw_private,
1280  fmstate->target_attrs = (List *) list_nth(fdw_private,
1282  fmstate->has_returning = intVal(list_nth(fdw_private,
1284  fmstate->retrieved_attrs = (List *) list_nth(fdw_private,
1286 
1287  /* Create context for per-tuple temp workspace. */
1288  fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
1289  "postgres_fdw temporary data",
1293 
1294  /* Prepare for input conversion of RETURNING results. */
1295  if (fmstate->has_returning)
1297 
1298  /* Prepare for output conversion of parameters used in prepared stmt. */
/*
 * One output-function slot per target attribute, plus one for the ctid.
 * The Assert at the end checks that p_nums never exceeds this.
 */
1299  n_params = list_length(fmstate->target_attrs) + 1;
1300  fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params);
1301  fmstate->p_nums = 0;
1302 
/*
 * Parameter order: the ctid comes first (UPDATE/DELETE only), then the
 * target_attrs in list order. convert_prep_stmt_params() fills p_values in
 * this same order.
 */
1303  if (operation == CMD_UPDATE || operation == CMD_DELETE)
1304  {
1305  /* Find the ctid resjunk column in the subplan's result */
1306  Plan *subplan = mtstate->mt_plans[subplan_index]->plan;
1307 
1309  "ctid");
1310  if (!AttributeNumberIsValid(fmstate->ctidAttno))
1311  elog(ERROR, "could not find junk ctid column");
1312 
1313  /* First transmittable parameter will be ctid */
1314  getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena);
1315  fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
1316  fmstate->p_nums++;
1317  }
1318 
1319  if (operation == CMD_INSERT || operation == CMD_UPDATE)
1320  {
1321  /* Set up for remaining transmittable parameters */
1322  foreach(lc, fmstate->target_attrs)
1323  {
1324  int attnum = lfirst_int(lc);
1325  Form_pg_attribute attr = RelationGetDescr(rel)->attrs[attnum - 1];
1326 
1327  Assert(!attr->attisdropped);
1328 
1329  getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena);
1330  fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
1331  fmstate->p_nums++;
1332  }
1333  }
1334 
1335  Assert(fmstate->p_nums <= n_params);
1336 
1337  resultRelInfo->ri_FdwState = fmstate;
1338 }
1339 
1340 /*
1341  * postgresExecForeignInsert
1342  * Insert one row into a foreign table
1343  */
/*
 * Returns the slot (filled from RETURNING output when requested) if the
 * remote side reported at least one row, else NULL.
 * NOTE(review): listing line 1345 (the declarator with the first parameter)
 * is missing here.
 */
1344 static TupleTableSlot *
1346  ResultRelInfo *resultRelInfo,
1347  TupleTableSlot *slot,
1348  TupleTableSlot *planSlot)
1349 {
1350  PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1351  const char **p_values;
1352  PGresult *res;
1353  int n_rows;
1354 
1355  /* Set up the prepared statement on the remote server, if we didn't yet */
1356  if (!fmstate->p_name)
1357  prepare_foreign_modify(fmstate);
1358 
1359  /* Convert parameters needed by prepared statement to text form */
1360  p_values = convert_prep_stmt_params(fmstate, NULL, slot);
1361 
1362  /*
1363  * Execute the prepared statement, and check for success.
1364  *
1365  * We don't use a PG_TRY block here, so be careful not to throw error
1366  * without releasing the PGresult.
1367  */
1368  res = PQexecPrepared(fmstate->conn,
1369  fmstate->p_name,
1370  fmstate->p_nums,
1371  p_values,
1372  NULL,
1373  NULL,
1374  0);
/*
 * NOTE(review): the expected-status operand of this comparison (listing
 * line 1376) is missing; presumably it depends on has_returning -- confirm.
 */
1375  if (PQresultStatus(res) !=
1377  pgfdw_report_error(ERROR, res, true, fmstate->query);
1378 
1379  /* Check number of rows affected, and fetch RETURNING tuple if any */
/* With RETURNING, count result tuples; otherwise parse the count from the command tag. */
1380  if (fmstate->has_returning)
1381  {
1382  n_rows = PQntuples(res);
1383  if (n_rows > 0)
1384  store_returning_result(fmstate, slot, res);
1385  }
1386  else
1387  n_rows = atoi(PQcmdTuples(res));
1388 
1389  /* And clean up */
1390  PQclear(res);
1391 
/* p_values was built in temp_cxt by convert_prep_stmt_params(); release it. */
1392  MemoryContextReset(fmstate->temp_cxt);
1393 
1394  /* Return NULL if nothing was inserted on the remote end */
1395  return (n_rows > 0) ? slot : NULL;
1396 }
1397 
1398 /*
1399  * postgresExecForeignUpdate
1400  * Update one row in a foreign table
1401  */
/*
 * The target row is identified by the ctid junk column of planSlot; the new
 * column values come from slot. Returns slot if the remote side reported at
 * least one row, else NULL.
 * NOTE(review): listing line 1403 (the declarator with the first parameter)
 * is missing here.
 */
1402 static TupleTableSlot *
1404  ResultRelInfo *resultRelInfo,
1405  TupleTableSlot *slot,
1406  TupleTableSlot *planSlot)
1407 {
1408  PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1409  Datum datum;
1410  bool isNull;
1411  const char **p_values;
1412  PGresult *res;
1413  int n_rows;
1414 
1415  /* Set up the prepared statement on the remote server, if we didn't yet */
1416  if (!fmstate->p_name)
1417  prepare_foreign_modify(fmstate);
1418 
1419  /* Get the ctid that was passed up as a resjunk column */
1420  datum = ExecGetJunkAttribute(planSlot,
1421  fmstate->ctidAttno,
1422  &isNull);
1423  /* shouldn't ever get a null result... */
1424  if (isNull)
1425  elog(ERROR, "ctid is NULL");
1426 
1427  /* Convert parameters needed by prepared statement to text form */
1428  p_values = convert_prep_stmt_params(fmstate,
1429  (ItemPointer) DatumGetPointer(datum),
1430  slot);
1431 
1432  /*
1433  * Execute the prepared statement, and check for success.
1434  *
1435  * We don't use a PG_TRY block here, so be careful not to throw error
1436  * without releasing the PGresult.
1437  */
1438  res = PQexecPrepared(fmstate->conn,
1439  fmstate->p_name,
1440  fmstate->p_nums,
1441  p_values,
1442  NULL,
1443  NULL,
1444  0);
/*
 * NOTE(review): the expected-status operand of this comparison (listing
 * line 1446) is missing; presumably it depends on has_returning -- confirm.
 */
1445  if (PQresultStatus(res) !=
1447  pgfdw_report_error(ERROR, res, true, fmstate->query);
1448 
1449  /* Check number of rows affected, and fetch RETURNING tuple if any */
1450  if (fmstate->has_returning)
1451  {
1452  n_rows = PQntuples(res);
1453  if (n_rows > 0)
1454  store_returning_result(fmstate, slot, res);
1455  }
1456  else
1457  n_rows = atoi(PQcmdTuples(res));
1458 
1459  /* And clean up */
1460  PQclear(res);
1461 
/* p_values was built in temp_cxt by convert_prep_stmt_params(); release it. */
1462  MemoryContextReset(fmstate->temp_cxt);
1463 
1464  /* Return NULL if nothing was updated on the remote end */
1465  return (n_rows > 0) ? slot : NULL;
1466 }
1467 
1468 /*
1469  * postgresExecForeignDelete
1470  * Delete one row from a foreign table
1471  */
/*
 * Only the ctid is transmitted (slot is not consulted for parameters); slot
 * receives the RETURNING row, if any. Returns slot if the remote side
 * reported at least one row, else NULL.
 * NOTE(review): listing line 1473 (the declarator with the first parameter)
 * is missing here.
 */
1472 static TupleTableSlot *
1474  ResultRelInfo *resultRelInfo,
1475  TupleTableSlot *slot,
1476  TupleTableSlot *planSlot)
1477 {
1478  PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1479  Datum datum;
1480  bool isNull;
1481  const char **p_values;
1482  PGresult *res;
1483  int n_rows;
1484 
1485  /* Set up the prepared statement on the remote server, if we didn't yet */
1486  if (!fmstate->p_name)
1487  prepare_foreign_modify(fmstate);
1488 
1489  /* Get the ctid that was passed up as a resjunk column */
1490  datum = ExecGetJunkAttribute(planSlot,
1491  fmstate->ctidAttno,
1492  &isNull);
1493  /* shouldn't ever get a null result... */
1494  if (isNull)
1495  elog(ERROR, "ctid is NULL");
1496 
1497  /* Convert parameters needed by prepared statement to text form */
1498  p_values = convert_prep_stmt_params(fmstate,
1499  (ItemPointer) DatumGetPointer(datum),
1500  NULL);
1501 
1502  /*
1503  * Execute the prepared statement, and check for success.
1504  *
1505  * We don't use a PG_TRY block here, so be careful not to throw error
1506  * without releasing the PGresult.
1507  */
1508  res = PQexecPrepared(fmstate->conn,
1509  fmstate->p_name,
1510  fmstate->p_nums,
1511  p_values,
1512  NULL,
1513  NULL,
1514  0);
/*
 * NOTE(review): the expected-status operand of this comparison (listing
 * line 1516) is missing; presumably it depends on has_returning -- confirm.
 */
1515  if (PQresultStatus(res) !=
1517  pgfdw_report_error(ERROR, res, true, fmstate->query);
1518 
1519  /* Check number of rows affected, and fetch RETURNING tuple if any */
1520  if (fmstate->has_returning)
1521  {
1522  n_rows = PQntuples(res);
1523  if (n_rows > 0)
1524  store_returning_result(fmstate, slot, res);
1525  }
1526  else
1527  n_rows = atoi(PQcmdTuples(res));
1528 
1529  /* And clean up */
1530  PQclear(res);
1531 
/* p_values was built in temp_cxt by convert_prep_stmt_params(); release it. */
1532  MemoryContextReset(fmstate->temp_cxt);
1533 
1534  /* Return NULL if nothing was deleted on the remote end */
1535  return (n_rows > 0) ? slot : NULL;
1536 }
1537 
1538 /*
1539  * postgresEndForeignModify
1540  * Finish an insert/update/delete operation on a foreign table
1541  */
/* NOTE(review): listing line 1543 (the declarator with the first parameter) is missing here. */
1542 static void
1544  ResultRelInfo *resultRelInfo)
1545 {
1546  PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1547 
1548  /* If fmstate is NULL, we are in EXPLAIN; nothing to do */
1549  if (fmstate == NULL)
1550  return;
1551 
1552  /* If we created a prepared statement, destroy it */
1553  if (fmstate->p_name)
1554  {
1555  char sql[64];
1556  PGresult *res;
1557 
/*
 * p_name is the remote statement created by prepare_foreign_modify(). It is
 * reset to NULL afterwards so the statement is not deallocated twice.
 */
1558  snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name);
1559 
1560  /*
1561  * We don't use a PG_TRY block here, so be careful not to throw error
1562  * without releasing the PGresult.
1563  */
1564  res = PQexec(fmstate->conn, sql);
1565  if (PQresultStatus(res) != PGRES_COMMAND_OK)
1566  pgfdw_report_error(ERROR, res, true, sql);
1567  PQclear(res);
1568  fmstate->p_name = NULL;
1569  }
1570 
1571  /* Release remote connection */
1572  ReleaseConnection(fmstate->conn);
1573  fmstate->conn = NULL;
1574 }
1575 
1576 /*
1577  * postgresIsForeignRelUpdatable
1578  * Determine whether a foreign table supports INSERT, UPDATE and/or
1579  * DELETE.
1580  */
1581 static int
1583 {
1584  bool updatable;
1585  ForeignTable *table;
1586  ForeignServer *server;
1587  ListCell *lc;
1588 
1589  /*
1590  * By default, all postgres_fdw foreign tables are assumed updatable. This
1591  * can be overridden by a per-server setting, which in turn can be
1592  * overridden by a per-table setting.
1593  */
1594  updatable = true;
1595 
1596  table = GetForeignTable(RelationGetRelid(rel));
1597  server = GetForeignServer(table->serverid);
1598 
1599  foreach(lc, server->options)
1600  {
1601  DefElem *def = (DefElem *) lfirst(lc);
1602 
1603  if (strcmp(def->defname, "updatable") == 0)
1604  updatable = defGetBoolean(def);
1605  }
1606  foreach(lc, table->options)
1607  {
1608  DefElem *def = (DefElem *) lfirst(lc);
1609 
1610  if (strcmp(def->defname, "updatable") == 0)
1611  updatable = defGetBoolean(def);
1612  }
1613 
1614  /*
1615  * Currently "updatable" means support for INSERT, UPDATE and DELETE.
1616  */
1617  return updatable ?
1618  (1 << CMD_INSERT) | (1 << CMD_UPDATE) | (1 << CMD_DELETE) : 0;
1619 }
1620 
1621 /*
1622  * postgresExplainForeignScan
1623  * Produce extra output for EXPLAIN of a ForeignScan on a foreign table
1624  */
/*
 * Emits the "Remote SQL" property only under EXPLAIN VERBOSE.
 * NOTE(review): listing line 1626 (the declarator) is missing here.
 */
1625 static void
1627 {
1628  List *fdw_private;
1629  char *sql;
1630 
1631  if (es->verbose)
1632  {
1633  fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private;
1634  sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
1635  ExplainPropertyText("Remote SQL", sql, es);
1636  }
1637 }
1638 
1639 /*
1640  * postgresExplainForeignModify
1641  * Produce extra output for EXPLAIN of a ModifyTable on a foreign table
1642  */
/*
 * Emits the "Remote SQL" property only under EXPLAIN VERBOSE.
 * NOTE(review): listing lines 1644 (the declarator with the first parameter)
 * and 1653 (the list_nth index argument) are missing here.
 */
1643 static void
1645  ResultRelInfo *rinfo,
1646  List *fdw_private,
1647  int subplan_index,
1648  ExplainState *es)
1649 {
1650  if (es->verbose)
1651  {
1652  char *sql = strVal(list_nth(fdw_private,
1654 
1655  ExplainPropertyText("Remote SQL", sql, es);
1656  }
1657 }
1658 
1659 
1660 /*
1661  * estimate_path_cost_size
1662  * Get cost and size estimates for a foreign scan
1663  *
1664  * We assume that all the baserestrictinfo clauses will be applied, plus
1665  * any join clauses listed in join_conds.
1666  */
/*
 * Results are written through p_rows, p_width, p_startup_cost and
 * p_total_cost.
 * NOTE(review): listing line 1668 (the declarator with the first parameter)
 * is missing; the body uses a PlannerInfo *root.
 */
1667 static void
1669  RelOptInfo *baserel,
1670  List *join_conds,
1671  double *p_rows, int *p_width,
1672  Cost *p_startup_cost, Cost *p_total_cost)
1673 {
1674  PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) baserel->fdw_private;
1675  double rows;
1676  double retrieved_rows;
1677  int width;
1678  Cost startup_cost;
1679  Cost total_cost;
1680  Cost run_cost;
1681  Cost cpu_per_tuple;
1682 
1683  /*
1684  * If the table or the server is configured to use remote estimates,
1685  * connect to the foreign server and execute EXPLAIN to estimate the
1686  * number of rows selected by the restriction+join clauses. Otherwise,
1687  * estimate rows using whatever statistics we have locally, in a way
1688  * similar to ordinary tables.
1689  */
1690  if (fpinfo->use_remote_estimate)
1691  {
1692  StringInfoData sql;
1693  List *retrieved_attrs;
1694  PGconn *conn;
1695 
1696  /*
1697  * Construct EXPLAIN query including the desired SELECT, FROM, and
1698  * WHERE clauses. Params and other-relation Vars are replaced by
1699  * dummy values.
1700  */
1701  initStringInfo(&sql);
1702  appendStringInfoString(&sql, "EXPLAIN ");
1703  deparseSelectSql(&sql, root, baserel, fpinfo->attrs_used,
1704  &retrieved_attrs);
1705  if (fpinfo->remote_conds)
1706  appendWhereClause(&sql, root, baserel, fpinfo->remote_conds,
1707  true, NULL);
1708  if (join_conds)
1709  appendWhereClause(&sql, root, baserel, join_conds,
1710  (fpinfo->remote_conds == NIL), NULL);
1711 
1712  /* Get the remote estimate */
1713  conn = GetConnection(fpinfo->server, fpinfo->user, false);
1714  get_remote_estimate(sql.data, conn, &rows, &width,
1715  &startup_cost, &total_cost);
1716  ReleaseConnection(conn);
1717 
/*
 * retrieved_rows is the remote row count before local_conds filtering; the
 * local_conds evaluation cost is charged once per retrieved row.
 */
1718  retrieved_rows = rows;
1719 
1720  /* Factor in the selectivity of the local_conds */
1721  rows = clamp_row_est(rows * fpinfo->local_conds_sel);
1722 
1723  /* Add in the eval cost of the local_conds */
1724  startup_cost += fpinfo->local_conds_cost.startup;
1725  total_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
1726  }
1727  else
1728  {
1729  /*
1730  * We don't support join conditions in this mode (hence, no
1731  * parameterized paths can be made).
1732  */
1733  Assert(join_conds == NIL);
1734 
1735  /* Use rows/width estimates made by set_baserel_size_estimates. */
1736  rows = baserel->rows;
1737  width = baserel->width;
1738 
1739  /*
1740  * Back into an estimate of the number of retrieved rows. Just in
1741  * case this is nuts, clamp to at most baserel->tuples.
1742  */
1743  retrieved_rows = clamp_row_est(rows / fpinfo->local_conds_sel);
1744  retrieved_rows = Min(retrieved_rows, baserel->tuples);
1745 
1746  /*
1747  * Cost as though this were a seqscan, which is pessimistic. We
1748  * effectively imagine the local_conds are being evaluated remotely,
1749  * too.
1750  */
1751  startup_cost = 0;
1752  run_cost = 0;
1753  run_cost += seq_page_cost * baserel->pages;
1754 
1755  startup_cost += baserel->baserestrictcost.startup;
1756  cpu_per_tuple = cpu_tuple_cost + baserel->baserestrictcost.per_tuple;
1757  run_cost += cpu_per_tuple * baserel->tuples;
1758 
1759  total_cost = startup_cost + run_cost;
1760  }
1761 
1762  /*
1763  * Add some additional cost factors to account for connection overhead
1764  * (fdw_startup_cost), transferring data across the network
1765  * (fdw_tuple_cost per retrieved row), and local manipulation of the data
1766  * (cpu_tuple_cost per retrieved row).
1767  */
1768  startup_cost += fpinfo->fdw_startup_cost;
1769  total_cost += fpinfo->fdw_startup_cost;
1770  total_cost += fpinfo->fdw_tuple_cost * retrieved_rows;
1771  total_cost += cpu_tuple_cost * retrieved_rows;
1772 
1773  /* Return results. */
1774  *p_rows = rows;
1775  *p_width = width;
1776  *p_startup_cost = startup_cost;
1777  *p_total_cost = total_cost;
1778 }
1779 
1780 /*
1781  * Estimate costs of executing a SQL statement remotely.
1782  * The given "sql" must be an EXPLAIN command.
1783  */
/*
 * Outputs are written through rows, width, startup_cost and total_cost.
 * Throws ERROR if the remote EXPLAIN fails or its first output line cannot
 * be parsed.
 */
1784 static void
1785 get_remote_estimate(const char *sql, PGconn *conn,
1786  double *rows, int *width,
1787  Cost *startup_cost, Cost *total_cost)
1788 {
/*
 * res is volatile because it is assigned inside PG_TRY and read in PG_CATCH,
 * across the non-local jump used for error recovery.
 */
1789  PGresult *volatile res = NULL;
1790 
1791  /* PGresult must be released before leaving this function. */
1792  PG_TRY();
1793  {
1794  char *line;
1795  char *p;
1796  int n;
1797 
1798  /*
1799  * Execute EXPLAIN remotely.
1800  */
/*
 * 'false' here, unlike the non-PG_TRY callers that pass 'true'; presumably
 * this is because PG_CATCH below clears res -- confirm in
 * pgfdw_report_error.
 */
1801  res = PQexec(conn, sql);
1802  if (PQresultStatus(res) != PGRES_TUPLES_OK)
1803  pgfdw_report_error(ERROR, res, false, sql);
1804 
1805  /*
1806  * Extract cost numbers for topmost plan node. Note we search for a
1807  * left paren from the end of the line to avoid being confused by
1808  * other uses of parentheses.
1809  */
1810  line = PQgetvalue(res, 0, 0);
1811  p = strrchr(line, '(');
1812  if (p == NULL)
1813  elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
1814  n = sscanf(p, "(cost=%lf..%lf rows=%lf width=%d)",
1815  startup_cost, total_cost, rows, width);
1816  if (n != 4)
1817  elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
1818 
1819  PQclear(res);
1820  res = NULL;
1821  }
1822  PG_CATCH();
1823  {
1824  if (res)
1825  PQclear(res);
1826  PG_RE_THROW();
1827  }
1828  PG_END_TRY();
1829 }
1830 
1831 /*
1832  * Detect whether we want to process an EquivalenceClass member.
1833  *
1834  * This is a callback for use by generate_implied_equalities_for_column.
1835  */
/*
 * NOTE(review): listing lines 1837-1838 (the declarator and the leading
 * parameters, including the one that provides em) and 1841 (the declaration
 * of state, presumably arg cast to the callback's state struct) are missing
 * here -- confirm upstream.
 * The first member not yet in state->already_used becomes state->current;
 * after that, only members equal() to it are accepted.
 */
1836 static bool
1839  void *arg)
1840 {
1842  Expr *expr = em->em_expr;
1843 
1844  /*
1845  * If we've identified what we're processing in the current scan, we only
1846  * want to match that expression.
1847  */
1848  if (state->current != NULL)
1849  return equal(expr, state->current);
1850 
1851  /*
1852  * Otherwise, ignore anything we've already processed.
1853  */
1854  if (list_member(state->already_used, expr))
1855  return false;
1856 
1857  /* This is the new target to process. */
1858  state->current = expr;
1859  return true;
1860 }
1861 
1862 /*
1863  * Create cursor for node's query with current parameter values.
1864  */
/*
 * Issues "DECLARE c<cursor_number> CURSOR FOR <query>" with the current
 * parameter values, then resets all batch bookkeeping in fsstate.
 * NOTE(review): listing lines 1866 (the declarator; the body uses a
 * ForeignScanState *node) and 1873 (presumably the declaration of the
 * StringInfoData buf used below) are missing here.
 */
1865 static void
1867 {
1868  PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1869  ExprContext *econtext = node->ss.ps.ps_ExprContext;
1870  int numParams = fsstate->numParams;
1871  const char **values = fsstate->param_values;
1872  PGconn *conn = fsstate->conn;
1874  PGresult *res;
1875 
1876  /*
1877  * Construct array of query parameter values in text format. We do the
1878  * conversions in the short-lived per-tuple context, so as not to cause a
1879  * memory leak over repeated scans.
1880  */
/*
 * values aliases fsstate->param_values and is overwritten on each call.
 * The strings it points to live in the per-tuple context, so they only need
 * to survive until PQexecParams below.
 */
1881  if (numParams > 0)
1882  {
1883  int nestlevel;
1884  MemoryContext oldcontext;
1885  int i;
1886  ListCell *lc;
1887 
1888  oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
1889 
1890  nestlevel = set_transmission_modes();
1891 
1892  i = 0;
1893  foreach(lc, fsstate->param_exprs)
1894  {
1895  ExprState *expr_state = (ExprState *) lfirst(lc);
1896  Datum expr_value;
1897  bool isNull;
1898 
1899  /* Evaluate the parameter expression */
1900  expr_value = ExecEvalExpr(expr_state, econtext, &isNull, NULL);
1901 
1902  /*
1903  * Get string representation of each parameter value by invoking
1904  * type-specific output function, unless the value is null.
1905  */
1906  if (isNull)
1907  values[i] = NULL;
1908  else
1909  values[i] = OutputFunctionCall(&fsstate->param_flinfo[i],
1910  expr_value);
1911  i++;
1912  }
1913 
1914  reset_transmission_modes(nestlevel);
1915 
1916  MemoryContextSwitchTo(oldcontext);
1917  }
1918 
1919  /* Construct the DECLARE CURSOR command */
1920  initStringInfo(&buf);
1921  appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s",
1922  fsstate->cursor_number, fsstate->query);
1923 
1924  /*
1925  * Notice that we pass NULL for paramTypes, thus forcing the remote server
1926  * to infer types for all parameters. Since we explicitly cast every
1927  * parameter (see deparse.c), the "inference" is trivial and will produce
1928  * the desired result. This allows us to avoid assuming that the remote
1929  * server has the same OIDs we do for the parameters' types.
1930  *
1931  * We don't use a PG_TRY block here, so be careful not to throw error
1932  * without releasing the PGresult.
1933  */
1934  res = PQexecParams(conn, buf.data, numParams, NULL, values,
1935  NULL, NULL, 0);
1936  if (PQresultStatus(res) != PGRES_COMMAND_OK)
1937  pgfdw_report_error(ERROR, res, true, fsstate->query);
1938  PQclear(res);
1939 
1940  /* Mark the cursor as created, and show no tuples have been retrieved */
1941  fsstate->cursor_exists = true;
1942  fsstate->tuples = NULL;
1943  fsstate->num_tuples = 0;
1944  fsstate->next_tuple = 0;
1945  fsstate->fetch_ct_2 = 0;
1946  fsstate->eof_reached = false;
1947 
1948  /* Clean up */
1949  pfree(buf.data);
1950 }
1951 
1952 /*
1953  * Fetch some more rows from the node's cursor.
1954  */
/*
 * Replaces the current batch with up to 100 rows from the remote cursor.
 * The previous batch's tuples are freed by the reset of batch_cxt, so no
 * pointer into them may be held across this call.
 * NOTE(review): listing lines 1956 (the declarator; the body uses a
 * ForeignScanState *node) and 1999 (the start of the per-row conversion
 * call) are missing here.
 */
1955 static void
1957 {
1958  PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1959  PGresult *volatile res = NULL;
1960  MemoryContext oldcontext;
1961 
1962  /*
1963  * We'll store the tuples in the batch_cxt. First, flush the previous
1964  * batch.
1965  */
1966  fsstate->tuples = NULL;
1967  MemoryContextReset(fsstate->batch_cxt);
1968  oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt);
1969 
1970  /* PGresult must be released before leaving this function. */
1971  PG_TRY();
1972  {
1973  PGconn *conn = fsstate->conn;
1974  char sql[64];
1975  int fetch_size;
1976  int numrows;
1977  int i;
1978 
1979  /* The fetch size is arbitrary, but shouldn't be enormous. */
1980  fetch_size = 100;
1981 
1982  snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
1983  fetch_size, fsstate->cursor_number);
1984 
1985  res = PQexec(conn, sql);
1986  /* On error, report the original query, not the FETCH. */
1987  if (PQresultStatus(res) != PGRES_TUPLES_OK)
1988  pgfdw_report_error(ERROR, res, false, fsstate->query);
1989 
1990  /* Convert the data into HeapTuples */
1991  numrows = PQntuples(res);
1992  fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
1993  fsstate->num_tuples = numrows;
1994  fsstate->next_tuple = 0;
1995 
1996  for (i = 0; i < numrows; i++)
1997  {
1998  fsstate->tuples[i] =
2000  fsstate->rel,
2001  fsstate->attinmeta,
2002  fsstate->retrieved_attrs,
2003  fsstate->temp_cxt);
2004  }
2005 
/*
 * fetch_ct_2 saturates at 2: postgresReScanForeignScan only needs to know
 * whether zero/one or several batches have been fetched.
 */
2006  /* Update fetch_ct_2 */
2007  if (fsstate->fetch_ct_2 < 2)
2008  fsstate->fetch_ct_2++;
2009 
2010  /* Must be EOF if we didn't get as many tuples as we asked for. */
2011  fsstate->eof_reached = (numrows < fetch_size);
2012 
2013  PQclear(res);
2014  res = NULL;
2015  }
2016  PG_CATCH();
2017  {
2018  if (res)
2019  PQclear(res);
2020  PG_RE_THROW();
2021  }
2022  PG_END_TRY();
2023 
2024  MemoryContextSwitchTo(oldcontext);
2025 }
2026 
2027 /*
2028  * Force assorted GUC parameters to settings that ensure that we'll output
2029  * data values in a form that is unambiguous to the remote server.
2030  *
2031  * This is rather expensive and annoying to do once per row, but there's
2032  * little choice if we want to be sure values are transmitted accurately;
2033  * we can't leave the settings in place between rows for fear of affecting
2034  * user-visible computations.
2035  *
2036  * We use the equivalent of a function SET option to allow the settings to
2037  * persist only until the caller calls reset_transmission_modes(). If an
2038  * error is thrown in between, guc.c will take care of undoing the settings.
2039  *
2040  * The return value is the nestlevel that must be passed to
2041  * reset_transmission_modes() to undo things.
2042  */
/* NOTE(review): listing line 2044 (the declarator) is missing here. */
2043 int
2045 {
2046  int nestlevel = NewGUCNestLevel();
2047 
2048  /*
2049  * The values set here should match what pg_dump does. See also
2050  * configure_remote_session in connection.c.
2051  */
/*
 * NOTE(review): listing lines 2054, 2056, 2058 and 2062 are missing.
 * 2054/2058/2062 are presumably middle arguments of set_config_option;
 * 2056 is presumably an IntervalStyle guard for the intervalstyle call,
 * parallel to the DateStyle and extra_float_digits checks -- confirm
 * upstream.
 */
2052  if (DateStyle != USE_ISO_DATES)
2053  (void) set_config_option("datestyle", "ISO",
2055  GUC_ACTION_SAVE, true, 0);
2057  (void) set_config_option("intervalstyle", "postgres",
2059  GUC_ACTION_SAVE, true, 0);
2060  if (extra_float_digits < 3)
2061  (void) set_config_option("extra_float_digits", "3",
2063  GUC_ACTION_SAVE, true, 0);
2064 
2065  return nestlevel;
2066 }
2067 
2068 /*
2069  * Undo the effects of set_transmission_modes().
2070  */
/*
 * Rolls back GUC changes made at or above nestlevel (the value returned by
 * set_transmission_modes).
 * NOTE(review): listing line 2072 (the declarator) is missing here.
 */
2071 void
2073 {
2074  AtEOXact_GUC(true, nestlevel);
2075 }
2076 
2077 /*
2078  * Utility routine to close a cursor.
2079  */
/*
 * Sends "CLOSE c<cursor_number>" on conn; throws ERROR if the command fails.
 * NOTE(review): listing line 2081 (the declarator; the body uses conn and
 * cursor_number) is missing here.
 */
2080 static void
2082 {
2083  char sql[64];
2084  PGresult *res;
2085 
2086  snprintf(sql, sizeof(sql), "CLOSE c%u", cursor_number);
2087 
2088  /*
2089  * We don't use a PG_TRY block here, so be careful not to throw error
2090  * without releasing the PGresult.
2091  */
2092  res = PQexec(conn, sql);
2093  if (PQresultStatus(res) != PGRES_COMMAND_OK)
2094  pgfdw_report_error(ERROR, res, true, sql);
2095  PQclear(res);
2096 }
2097 
2098 /*
2099  * prepare_foreign_modify
2100  * Establish a prepared statement for execution of INSERT/UPDATE/DELETE
2101  */
/*
 * The statement name is "pgsql_fdw_prep_<n>", with n taken from
 * GetPrepStmtNumber() on fmstate->conn.
 * NOTE(review): listing line 2103 (the declarator; the body uses
 * fmstate) is missing here.
 */
2102 static void
2104 {
2105  char prep_name[NAMEDATALEN];
2106  char *p_name;
2107  PGresult *res;
2108 
2109  /* Construct name we'll use for the prepared statement. */
2110  snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u",
2111  GetPrepStmtNumber(fmstate->conn));
2112  p_name = pstrdup(prep_name);
2113 
2114  /*
2115  * We intentionally do not specify parameter types here, but leave the
2116  * remote server to derive them by default. This avoids possible problems
2117  * with the remote server using different type OIDs than we do. All of
2118  * the prepared statements we use in this module are simple enough that
2119  * the remote server will make the right choices.
2120  *
2121  * We don't use a PG_TRY block here, so be careful not to throw error
2122  * without releasing the PGresult.
2123  */
2124  res = PQprepare(fmstate->conn,
2125  p_name,
2126  fmstate->query,
2127  0,
2128  NULL);
2129 
2130  if (PQresultStatus(res) != PGRES_COMMAND_OK)
2131  pgfdw_report_error(ERROR, res, true, fmstate->query);
2132  PQclear(res);
2133 
2134  /* This action shows that the prepare has been done. */
/*
 * p_name is stored only after PQprepare succeeded. On failure it stays NULL,
 * and the postgresExecForeign* callers will attempt the prepare again.
 */
2135  fmstate->p_name = p_name;
2136 }
2137 
2138 /*
2139  * convert_prep_stmt_params
2140  * Create array of text strings representing parameter values
2141  *
2142  * tupleid is ctid to send, or NULL if none
2143  * slot is slot to get remaining parameters from, or NULL if none
2144  *
2145  * Data is constructed in temp_cxt; caller should reset that after use.
      *
      * Returns an array of fmstate->p_nums strings in parameter order: the
      * ctid first when tupleid is given, then one entry per element of
      * fmstate->target_attrs.  A NULL entry means the value is SQL NULL.
      * Parameter i is converted with the output function fmstate->p_flinfo[i].
2146  */
2147 static const char **
2149  ItemPointer tupleid,
2150  TupleTableSlot *slot)
2151 {
2152  const char **p_values;
2153  int pindex = 0;
2154  MemoryContext oldcontext;
2155 
2156  oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt);
2157 
2158  p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums);
2159 
2160  /* 1st parameter should be ctid, if it's in use */
2161  if (tupleid != NULL)
2162  {
2163  /* don't need set_transmission_modes for TID output */
2164  p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
2165  PointerGetDatum(tupleid));
2166  pindex++;
2167  }
2168 
2169  /* get following parameters from slot */
2170  if (slot != NULL && fmstate->target_attrs != NIL)
2171  {
2172  int nestlevel;
2173  ListCell *lc;
2174 
      /* The GUC overrides apply only while slot columns are converted. */
2175  nestlevel = set_transmission_modes();
2176 
2177  foreach(lc, fmstate->target_attrs)
2178  {
2179  int attnum = lfirst_int(lc);
2180  Datum value;
2181  bool isnull;
2182 
2183  value = slot_getattr(slot, attnum, &isnull);
2184  if (isnull)
2185  p_values[pindex] = NULL;
2186  else
2187  p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
2188  value);
2189  pindex++;
2190  }
2191 
2192  reset_transmission_modes(nestlevel);
2193  }
2194 
      /* Every expected parameter slot must have been filled. */
2195  Assert(pindex == fmstate->p_nums);
2196 
2197  MemoryContextSwitchTo(oldcontext);
2198 
2199  return p_values;
2200 }
2201 
2202 /*
2203  * store_returning_result
2204  * Store the result of a RETURNING clause
2205  *
2206  * On error, be sure to release the PGresult on the way out. Callers do not
2207  * have PG_TRY blocks to ensure this happens.
      *
      * Only row 0 of res is used.  The tuple is built by
      * make_tuple_from_result_row() with fmstate->temp_cxt as scratch space and
      * is handed to the slot, which frees it when the slot is cleared.
      * On success res is NOT freed here; the caller remains responsible for it.
2208  */
2209 static void
2211  TupleTableSlot *slot, PGresult *res)
2212 {
2213  /* PGresult must be released before leaving this function. */
      /* (That applies to the error path only -- see PG_CATCH below.) */
2214  PG_TRY();
2215  {
2216  HeapTuple newtup;
2217 
2218  newtup = make_tuple_from_result_row(res, 0,
2219  fmstate->rel,
2220  fmstate->attinmeta,
2221  fmstate->retrieved_attrs,
2222  fmstate->temp_cxt);
2223  /* tuple will be deleted when it is cleared from the slot */
2224  ExecStoreTuple(newtup, slot, InvalidBuffer, true);
2225  }
2226  PG_CATCH();
2227  {
2228  if (res)
2229  PQclear(res);
2230  PG_RE_THROW();
2231  }
2232  PG_END_TRY();
2233 }
2234 
2235 /*
2236  * postgresAnalyzeForeignTable
2237  * Test whether analyzing this foreign table is supported
      *
      * Always reports support (the only return is "true").  As a side effect,
      * *totalpages is set by running the deparseAnalyzeSizeSql() query on the
      * remote server.  That query must return exactly one row with one column;
      * any other result, or a failed query, raises ERROR.
2238  */
2239 static bool
2241  AcquireSampleRowsFunc *func,
2242  BlockNumber *totalpages)
2243 {
2244  ForeignTable *table;
2245  ForeignServer *server;
2246  UserMapping *user;
2247  PGconn *conn;
2248  StringInfoData sql;
      /* volatile: res is assigned inside PG_TRY and read in PG_CATCH. */
2249  PGresult *volatile res = NULL;
2250 
2251  /* Return the row-analysis function pointer */
      /* NOTE(review): line 2252 (the assignment to *func) is missing from this listing. */
2253 
2254  /*
2255  * Now we have to get the number of pages. It's annoying that the ANALYZE
2256  * API requires us to return that now, because it forces some duplication
2257  * of effort between this routine and postgresAcquireSampleRowsFunc. But
2258  * it's probably not worth redefining that API at this point.
2259  */
2260 
2261  /*
2262  * Get the connection to use. We do the remote access as the table's
2263  * owner, even if the ANALYZE was started by some other user.
2264  */
2265  table = GetForeignTable(RelationGetRelid(relation));
2266  server = GetForeignServer(table->serverid);
2267  user = GetUserMapping(relation->rd_rel->relowner, server->serverid);
2268  conn = GetConnection(server, user, false);
2269 
2270  /*
2271  * Construct command to get page count for relation.
2272  */
2273  initStringInfo(&sql);
2274  deparseAnalyzeSizeSql(&sql, relation);
2275 
2276  /* In what follows, do not risk leaking any PGresults. */
2277  PG_TRY();
2278  {
2279  res = PQexec(conn, sql.data);
2280  if (PQresultStatus(res) != PGRES_TUPLES_OK)
2281  pgfdw_report_error(ERROR, res, false, sql.data);
2282 
2283  if (PQntuples(res) != 1 || PQnfields(res) != 1)
2284  elog(ERROR, "unexpected result from deparseAnalyzeSizeSql query");
      /* The single text value is parsed as a base-10 unsigned number. */
2285  *totalpages = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
2286 
      /* Clear and NULL res so PG_CATCH cannot free it a second time. */
2287  PQclear(res);
2288  res = NULL;
2289  }
2290  PG_CATCH();
2291  {
2292  if (res)
2293  PQclear(res);
2294  PG_RE_THROW();
2295  }
2296  PG_END_TRY();
2297 
2298  ReleaseConnection(conn);
2299 
2300  return true;
2301 }
2302 
2303 /*
2304  * Acquire a random sample of rows from foreign table managed by postgres_fdw.
2305  *
2306  * We fetch the whole table from the remote side and pick out some sample rows.
2307  *
2308  * Selected rows are returned in the caller-allocated array rows[],
2309  * which must have at least targrows entries.
2310  * The actual number of rows selected is returned as the function result.
2311  * We also count the total number of rows in the table and return it into
2312  * *totalrows. Note that *totaldeadrows is always set to 0.
2313  *
2314  * Note that the returned list of rows is not always in order by physical
2315  * position in the table. Therefore, correlation estimates derived later
2316  * may be meaningless, but it's OK because we don't use the estimates
2317  * currently (the planner only pays attention to correlation for indexscans).
      *
      * Rows are read through a remote cursor in batches of 100 and passed one
      * at a time to analyze_row_processor().  elevel is the message level used
      * for the summary reported at the end.
2318  */
2319 static int
2321  HeapTuple *rows, int targrows,
2322  double *totalrows,
2323  double *totaldeadrows)
2324 {
2325  PgFdwAnalyzeState astate;
2326  ForeignTable *table;
2327  ForeignServer *server;
2328  UserMapping *user;
2329  PGconn *conn;
2330  unsigned int cursor_number;
2331  StringInfoData sql;
      /* volatile: res is assigned inside PG_TRY and read in PG_CATCH. */
2332  PGresult *volatile res = NULL;
2333 
2334  /* Initialize workspace state */
2335  astate.rel = relation;
      /*
       * NOTE(review): line 2336 is missing from this listing (presumably it
       * initializes astate.attinmeta, which analyze_row_processor reads).
       */
2337 
2338  astate.rows = rows;
2339  astate.targrows = targrows;
2340  astate.numrows = 0;
2341  astate.samplerows = 0;
2342  astate.rowstoskip = -1; /* -1 means not set yet */
2343  astate.rstate = anl_init_selection_state(targrows);
2344 
2345  /* Remember ANALYZE context, and create a per-tuple temp context */
2346  astate.anl_cxt = CurrentMemoryContext;
      /*
       * NOTE(review): lines 2347 and 2349-2351 of the temp-context creation
       * call (presumably assigning astate.temp_cxt) are missing from this listing.
       */
2348  "postgres_fdw temporary data",
2352 
2353  /*
2354  * Get the connection to use. We do the remote access as the table's
2355  * owner, even if the ANALYZE was started by some other user.
2356  */
2357  table = GetForeignTable(RelationGetRelid(relation));
2358  server = GetForeignServer(table->serverid);
2359  user = GetUserMapping(relation->rd_rel->relowner, server->serverid);
2360  conn = GetConnection(server, user, false);
2361 
2362  /*
2363  * Construct cursor that retrieves whole rows from remote.
2364  */
2365  cursor_number = GetCursorNumber(conn);
2366  initStringInfo(&sql);
2367  appendStringInfo(&sql, "DECLARE c%u CURSOR FOR ", cursor_number);
2368  deparseAnalyzeSql(&sql, relation, &astate.retrieved_attrs);
2369 
2370  /* In what follows, do not risk leaking any PGresults. */
2371  PG_TRY();
2372  {
2373  res = PQexec(conn, sql.data);
2374  if (PQresultStatus(res) != PGRES_COMMAND_OK)
2375  pgfdw_report_error(ERROR, res, false, sql.data);
      /* Clear and NULL res so PG_CATCH cannot free it a second time. */
2376  PQclear(res);
2377  res = NULL;
2378 
2379  /* Retrieve and process rows a batch at a time. */
2380  for (;;)
2381  {
2382  char fetch_sql[64];
2383  int fetch_size;
2384  int numrows;
2385  int i;
2386 
2387  /* Allow users to cancel long query */
      /* NOTE(review): line 2388, presumably the interrupt check, is missing from this listing. */
2389 
2390  /*
2391  * XXX possible future improvement: if rowstoskip is large, we
2392  * could issue a MOVE rather than physically fetching the rows,
2393  * then just adjust rowstoskip and samplerows appropriately.
2394  */
2395 
2396  /* The fetch size is arbitrary, but shouldn't be enormous. */
2397  fetch_size = 100;
2398 
2399  /* Fetch some rows */
2400  snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u",
2401  fetch_size, cursor_number);
2402 
2403  res = PQexec(conn, fetch_sql);
2404  /* On error, report the original query, not the FETCH. */
2405  if (PQresultStatus(res) != PGRES_TUPLES_OK)
2406  pgfdw_report_error(ERROR, res, false, sql.data);
2407 
2408  /* Process whatever we got. */
2409  numrows = PQntuples(res);
2410  for (i = 0; i < numrows; i++)
2411  analyze_row_processor(res, i, &astate);
2412 
2413  PQclear(res);
2414  res = NULL;
2415 
2416  /* Must be EOF if we didn't get all the rows requested. */
2417  if (numrows < fetch_size)
2418  break;
2419  }
2420 
2421  /* Close the cursor, just to be tidy. */
2422  close_cursor(conn, cursor_number);
2423  }
2424  PG_CATCH();
2425  {
2426  if (res)
2427  PQclear(res);
2428  PG_RE_THROW();
2429  }
2430  PG_END_TRY();
2431 
2432  ReleaseConnection(conn);
2433 
2434  /* We assume that we have no dead tuple. */
2435  *totaldeadrows = 0.0;
2436 
2437  /* We've retrieved all living tuples from foreign server. */
2438  *totalrows = astate.samplerows;
2439 
2440  /*
2441  * Emit some interesting relation info
2442  */
2443  ereport(elevel,
2444  (errmsg("\"%s\": table contains %.0f rows, %d rows in sample",
2445  RelationGetRelationName(relation),
2446  astate.samplerows, astate.numrows)));
2447 
2448  return astate.numrows;
2449 }
2450 
2451 /*
2452  * Collect sample rows from the result of query.
2453  * - Use all tuples in sample until target # of samples are collected.
2454  * - Subsequently, replace already-sampled tuples randomly.
      *
      * Processes row number "row" of res and updates astate in place.  Every
      * call increments astate->samplerows.  A kept row is converted with
      * make_tuple_from_result_row() into astate->anl_cxt and stored in
      * astate->rows[pos]; any tuple previously stored there is freed first.
2455  */
2456 static void
2458 {
2459  int targrows = astate->targrows;
2460  int pos; /* array index to store tuple in */
2461  MemoryContext oldcontext;
2462 
2463  /* Always increment sample row counter. */
2464  astate->samplerows += 1;
2465 
2466  /*
2467  * Determine the slot where this sample row should be stored. Set pos to
2468  * negative value to indicate the row should be skipped.
2469  */
2470  if (astate->numrows < targrows)
2471  {
2472  /* First targrows rows are always included into the sample */
2473  pos = astate->numrows++;
2474  }
2475  else
2476  {
2477  /*
2478  * Now we start replacing tuples in the sample until we reach the end
2479  * of the relation. Same algorithm as in acquire_sample_rows in
2480  * analyze.c; see Jeff Vitter's paper.
2481  */
      /* A negative rowstoskip means a new skip count must be drawn. */
2482  if (astate->rowstoskip < 0)
2483  astate->rowstoskip = anl_get_next_S(astate->samplerows, targrows,
2484  &astate->rstate);
2485 
2486  if (astate->rowstoskip <= 0)
2487  {
2488  /* Choose a random reservoir element to replace. */
2489  pos = (int) (targrows * anl_random_fract());
2490  Assert(pos >= 0 && pos < targrows);
2491  heap_freetuple(astate->rows[pos]);
2492  }
2493  else
2494  {
2495  /* Skip this tuple. */
2496  pos = -1;
2497  }
2498 
2499  astate->rowstoskip -= 1;
2500  }
2501 
2502  if (pos >= 0)
2503  {
2504  /*
2505  * Create sample tuple from current result row, and store it in the
2506  * position determined above. The tuple has to be created in anl_cxt.
2507  */
2508  oldcontext = MemoryContextSwitchTo(astate->anl_cxt);
2509 
2510  astate->rows[pos] = make_tuple_from_result_row(res, row,
2511  astate->rel,
2512  astate->attinmeta,
2513  astate->retrieved_attrs,
2514  astate->temp_cxt);
2515 
2516  MemoryContextSwitchTo(oldcontext);
2517  }
2518 }
2519 
2520 /*
2521  * Create a tuple from the specified row of the PGresult.
2522  *
2523  * rel is the local representation of the foreign table, attinmeta is
2524  * conversion data for the rel's tupdesc, and retrieved_attrs is an
2525  * integer list of the table column numbers present in the PGresult.
2526  * temp_context is a working context that can be reset after each tuple.
      *
      * The returned tuple is allocated in the caller's current memory context.
      * Columns that do not appear in retrieved_attrs come back as NULL.  Of
      * the system columns, only ctid is honored; it is copied into t_self.
      * temp_context is reset before returning.  Raises ERROR if the result's
      * column count does not match retrieved_attrs.
2527  */
2528 static HeapTuple
2530  int row,
2531  Relation rel,
2532  AttInMetadata *attinmeta,
2533  List *retrieved_attrs,
2534  MemoryContext temp_context)
2535 {
2536  HeapTuple tuple;
2537  TupleDesc tupdesc = RelationGetDescr(rel);
2538  Datum *values;
2539  bool *nulls;
2540  ItemPointer ctid = NULL;
2541  ConversionLocation errpos;
2542  ErrorContextCallback errcallback;
2543  MemoryContext oldcontext;
2544  ListCell *lc;
2545  int j;
2546 
2547  Assert(row < PQntuples(res));
2548 
2549  /*
2550  * Do the following work in a temp context that we reset after each tuple.
2551  * This cleans up not only the data we have direct access to, but any
2552  * cruft the I/O functions might leak.
2553  */
2554  oldcontext = MemoryContextSwitchTo(temp_context);
2555 
2556  values = (Datum *) palloc0(tupdesc->natts * sizeof(Datum));
2557  nulls = (bool *) palloc(tupdesc->natts * sizeof(bool));
2558  /* Initialize to nulls for any columns not present in result */
2559  memset(nulls, true, tupdesc->natts * sizeof(bool));
2560 
2561  /*
2562  * Set up and install callback to report where conversion error occurs.
2563  */
2564  errpos.rel = rel;
2565  errpos.cur_attno = 0;
2566  errcallback.callback = conversion_error_callback;
2567  errcallback.arg = (void *) &errpos;
2568  errcallback.previous = error_context_stack;
2569  error_context_stack = &errcallback;
2570 
2571  /*
2572  * i indexes columns in the relation, j indexes columns in the PGresult.
2573  */
2574  j = 0;
2575  foreach(lc, retrieved_attrs)
2576  {
2577  int i = lfirst_int(lc);
2578  char *valstr;
2579 
2580  /* fetch next column's textual value */
2581  if (PQgetisnull(res, row, j))
2582  valstr = NULL;
2583  else
2584  valstr = PQgetvalue(res, row, j);
2585 
2586  /* convert value to internal representation */
2587  if (i > 0)
2588  {
2589  /* ordinary column */
2590  Assert(i <= tupdesc->natts);
2591  nulls[i - 1] = (valstr == NULL);
2592  /* Apply the input function even to nulls, to support domains */
      /* cur_attno is set only around the conversion, for the error callback. */
2593  errpos.cur_attno = i;
2594  values[i - 1] = InputFunctionCall(&attinmeta->attinfuncs[i - 1],
2595  valstr,
2596  attinmeta->attioparams[i - 1],
2597  attinmeta->atttypmods[i - 1]);
2598  errpos.cur_attno = 0;
2599  }
2600  else if (i == SelfItemPointerAttributeNumber)
2601  {
2602  /* ctid --- note we ignore any other system column in result */
2603  if (valstr != NULL)
2604  {
2605  Datum datum;
2606 
2607  datum = DirectFunctionCall1(tidin, CStringGetDatum(valstr));
2608  ctid = (ItemPointer) DatumGetPointer(datum);
2609  }
2610  }
2611 
2612  j++;
2613  }
2614 
2615  /* Uninstall error context callback. */
2616  error_context_stack = errcallback.previous;
2617 
2618  /*
2619  * Check we got the expected number of columns. Note: j == 0 and
2620  * PQnfields == 1 is expected, since deparse emits a NULL if no columns.
2621  */
2622  if (j > 0 && j != PQnfields(res))
2623  elog(ERROR, "remote query result does not match the foreign table");
2624 
2625  /*
2626  * Build the result tuple in caller's memory context.
2627  */
2628  MemoryContextSwitchTo(oldcontext);
2629 
2630  tuple = heap_form_tuple(tupdesc, values, nulls);
2631 
      /* ctid points into temp_context, so copy it before the reset below. */
2632  if (ctid)
2633  tuple->t_self = *ctid;
2634 
2635  /* Clean up */
2636  MemoryContextReset(temp_context);
2637 
2638  return tuple;
2639 }
2640 
2641 /*
2642  * Callback function which is called when error occurs during column value
2643  * conversion. Print names of column and relation.
      *
      * arg is a ConversionLocation.  Context is added only when its cur_attno
      * names a valid column of errpos->rel (1..natts); otherwise the callback
      * adds nothing.
2644  */
2645 static void
2647 {
2648  ConversionLocation *errpos = (ConversionLocation *) arg;
2649  TupleDesc tupdesc = RelationGetDescr(errpos->rel);
2650 
2651  if (errpos->cur_attno > 0 && errpos->cur_attno <= tupdesc->natts)
2652  errcontext("column \"%s\" of foreign table \"%s\"",
2653  NameStr(tupdesc->attrs[errpos->cur_attno - 1]->attname),
2654  RelationGetRelationName(errpos->rel));
2655 }