PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
connection.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * connection.c
4  * Connection management functions for postgres_fdw
5  *
6  * Portions Copyright (c) 2012-2016, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  * contrib/postgres_fdw/connection.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14 
15 #include "postgres_fdw.h"
16 
17 #include "access/xact.h"
18 #include "mb/pg_wchar.h"
19 #include "miscadmin.h"
20 #include "storage/latch.h"
21 #include "utils/hsearch.h"
22 #include "utils/memutils.h"
23 
24 
25 /*
26  * Connection cache hash table entry
27  *
28  * The lookup key in this hash table is the user mapping OID. We use just one
29  * connection per user mapping ID, which ensures that all the scans use the
30  * same snapshot during a query. Using the user mapping OID rather than
31  * the foreign server OID + user OID avoids creating multiple connections when
32  * the public user mapping applies to all user OIDs.
33  *
34  * The "conn" pointer can be NULL if we don't currently have a live connection.
35  * When we do have a connection, xact_depth tracks the current depth of
36  * transactions and subtransactions open on the remote side. We need to issue
37  * commands at the same nesting depth on the remote as we're executing at
38  * ourselves, so that rolling back a subtransaction will kill the right
39  * queries and not the wrong ones.
40  */
41 typedef Oid ConnCacheKey;
42 
43 typedef struct ConnCacheEntry
44 {
45  ConnCacheKey key; /* hash key (must be first) */
46  PGconn *conn; /* connection to foreign server, or NULL */
47  int xact_depth; /* 0 = no xact open, 1 = main xact open, 2 =
48  * one level of subxact open, etc */
49  bool have_prep_stmt; /* have we prepared any stmts in this xact? */
50  bool have_error; /* have any subxacts aborted in this xact? */
52 
53 /*
54  * Connection cache (initialized on first use)
55  */
57 
58 /* for assigning cursor numbers and prepared statement numbers */
59 static unsigned int cursor_number = 0;
60 static unsigned int prep_stmt_number = 0;
61 
62 /* tracks whether any work is needed in callback functions */
63 static bool xact_got_connection = false;
64 
65 /* prototypes of private functions */
67 static void check_conn_params(const char **keywords, const char **values);
69 static void do_sql_command(PGconn *conn, const char *sql);
70 static void begin_remote_xact(ConnCacheEntry *entry);
71 static void pgfdw_xact_callback(XactEvent event, void *arg);
72 static void pgfdw_subxact_callback(SubXactEvent event,
73  SubTransactionId mySubid,
74  SubTransactionId parentSubid,
75  void *arg);
76 
77 
78 /*
79  * Get a PGconn which can be used to execute queries on the remote PostgreSQL
80  * server with the user's authorization. A new connection is established
81  * if we don't already have a suitable one, and a transaction is opened at
82  * the right subtransaction nesting depth if we didn't do that already.
83  *
84  * will_prep_stmt must be true if caller intends to create any prepared
85  * statements. Since those don't go away automatically at transaction end
86  * (not even on error), we need this flag to cue manual cleanup.
87  *
88  * XXX Note that caching connections theoretically requires a mechanism to
89  * detect change of FDW objects to invalidate already established connections.
90  * We could manage that by watching for invalidation events on the relevant
91  * syscaches. For the moment, though, it's not clear that this would really
92  * be useful and not mere pedantry. We could not flush any active connections
93  * mid-transaction anyway.
94  */
95 PGconn *
96 GetConnection(UserMapping *user, bool will_prep_stmt)
97 {
98  bool found;
99  ConnCacheEntry *entry;
100  ConnCacheKey key;
101 
102  /* First time through, initialize connection cache hashtable */
103  if (ConnectionHash == NULL)
104  {
105  HASHCTL ctl;
106 
107  MemSet(&ctl, 0, sizeof(ctl));
108  ctl.keysize = sizeof(ConnCacheKey);
109  ctl.entrysize = sizeof(ConnCacheEntry);
110  /* allocate ConnectionHash in the cache context */
111  ctl.hcxt = CacheMemoryContext;
112  ConnectionHash = hash_create("postgres_fdw connections", 8,
113  &ctl,
115 
116  /*
117  * Register some callback functions that manage connection cleanup.
118  * This should be done just once in each backend.
119  */
122  }
123 
124  /* Set flag that we did GetConnection during the current transaction */
125  xact_got_connection = true;
126 
127  /* Create hash key for the entry. Assume no pad bytes in key struct */
128  key = user->umid;
129 
130  /*
131  * Find or create cached entry for requested connection.
132  */
133  entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
134  if (!found)
135  {
136  /* initialize new hashtable entry (key is already filled in) */
137  entry->conn = NULL;
138  entry->xact_depth = 0;
139  entry->have_prep_stmt = false;
140  entry->have_error = false;
141  }
142 
143  /*
144  * We don't check the health of cached connection here, because it would
145  * require some overhead. Broken connection will be detected when the
146  * connection is actually used.
147  */
148 
149  /*
150  * If cache entry doesn't have a connection, we have to establish a new
151  * connection. (If connect_pg_server throws an error, the cache entry
152  * will be left in a valid empty state.)
153  */
154  if (entry->conn == NULL)
155  {
156  ForeignServer *server = GetForeignServer(user->serverid);
157 
158  entry->xact_depth = 0; /* just to be sure */
159  entry->have_prep_stmt = false;
160  entry->have_error = false;
161  entry->conn = connect_pg_server(server, user);
162 
163  elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
164  entry->conn, server->servername, user->umid, user->userid);
165  }
166 
167  /*
168  * Start a new transaction or subtransaction if needed.
169  */
170  begin_remote_xact(entry);
171 
172  /* Remember if caller will prepare statements */
173  entry->have_prep_stmt |= will_prep_stmt;
174 
175  return entry->conn;
176 }
177 
178 /*
179  * Connect to remote server using specified server and user mapping properties.
180  */
181 static PGconn *
183 {
184  PGconn *volatile conn = NULL;
185 
186  /*
187  * Use PG_TRY block to ensure closing connection on error.
188  */
189  PG_TRY();
190  {
191  const char **keywords;
192  const char **values;
193  int n;
194 
195  /*
196  * Construct connection params from generic options of ForeignServer
197  * and UserMapping. (Some of them might not be libpq options, in
198  * which case we'll just waste a few array slots.) Add 3 extra slots
199  * for fallback_application_name, client_encoding, end marker.
200  */
201  n = list_length(server->options) + list_length(user->options) + 3;
202  keywords = (const char **) palloc(n * sizeof(char *));
203  values = (const char **) palloc(n * sizeof(char *));
204 
205  n = 0;
206  n += ExtractConnectionOptions(server->options,
207  keywords + n, values + n);
209  keywords + n, values + n);
210 
211  /* Use "postgres_fdw" as fallback_application_name. */
212  keywords[n] = "fallback_application_name";
213  values[n] = "postgres_fdw";
214  n++;
215 
216  /* Set client_encoding so that libpq can convert encoding properly. */
217  keywords[n] = "client_encoding";
218  values[n] = GetDatabaseEncodingName();
219  n++;
220 
221  keywords[n] = values[n] = NULL;
222 
223  /* verify connection parameters and make connection */
224  check_conn_params(keywords, values);
225 
226  conn = PQconnectdbParams(keywords, values, false);
227  if (!conn || PQstatus(conn) != CONNECTION_OK)
228  {
229  char *connmessage;
230  int msglen;
231 
232  /* libpq typically appends a newline, strip that */
233  connmessage = pstrdup(PQerrorMessage(conn));
234  msglen = strlen(connmessage);
235  if (msglen > 0 && connmessage[msglen - 1] == '\n')
236  connmessage[msglen - 1] = '\0';
237  ereport(ERROR,
238  (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
239  errmsg("could not connect to server \"%s\"",
240  server->servername),
241  errdetail_internal("%s", connmessage)));
242  }
243 
244  /*
245  * Check that non-superuser has used password to establish connection;
246  * otherwise, he's piggybacking on the postgres server's user
247  * identity. See also dblink_security_check() in contrib/dblink.
248  */
249  if (!superuser() && !PQconnectionUsedPassword(conn))
250  ereport(ERROR,
251  (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
252  errmsg("password is required"),
253  errdetail("Non-superuser cannot connect if the server does not request a password."),
254  errhint("Target server's authentication method must be changed.")));
255 
256  /* Prepare new session for use */
258 
259  pfree(keywords);
260  pfree(values);
261  }
262  PG_CATCH();
263  {
264  /* Release PGconn data structure if we managed to create one */
265  if (conn)
266  PQfinish(conn);
267  PG_RE_THROW();
268  }
269  PG_END_TRY();
270 
271  return conn;
272 }
273 
274 /*
275  * For non-superusers, insist that the connstr specify a password. This
276  * prevents a password from being picked up from .pgpass, a service file,
277  * the environment, etc. We don't want the postgres user's passwords
278  * to be accessible to non-superusers. (See also dblink_connstr_check in
279  * contrib/dblink.)
280  */
281 static void
282 check_conn_params(const char **keywords, const char **values)
283 {
284  int i;
285 
286  /* no check required if superuser */
287  if (superuser())
288  return;
289 
290  /* ok if params contain a non-empty password */
291  for (i = 0; keywords[i] != NULL; i++)
292  {
293  if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')
294  return;
295  }
296 
297  ereport(ERROR,
298  (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
299  errmsg("password is required"),
300  errdetail("Non-superusers must provide a password in the user mapping.")));
301 }
302 
303 /*
304  * Issue SET commands to make sure remote session is configured properly.
305  *
306  * We do this just once at connection, assuming nothing will change the
307  * values later. Since we'll never send volatile function calls to the
308  * remote, there shouldn't be any way to break this assumption from our end.
309  * It's possible to think of ways to break it at the remote end, eg making
310  * a foreign table point to a view that includes a set_config call ---
311  * but once you admit the possibility of a malicious view definition,
312  * there are any number of ways to break things.
313  */
314 static void
316 {
317  int remoteversion = PQserverVersion(conn);
318 
319  /* Force the search path to contain only pg_catalog (see deparse.c) */
320  do_sql_command(conn, "SET search_path = pg_catalog");
321 
322  /*
323  * Set remote timezone; this is basically just cosmetic, since all
324  * transmitted and returned timestamptzs should specify a zone explicitly
325  * anyway. However it makes the regression test outputs more predictable.
326  *
327  * We don't risk setting remote zone equal to ours, since the remote
328  * server might use a different timezone database. Instead, use UTC
329  * (quoted, because very old servers are picky about case).
330  */
331  do_sql_command(conn, "SET timezone = 'UTC'");
332 
333  /*
334  * Set values needed to ensure unambiguous data output from remote. (This
335  * logic should match what pg_dump does. See also set_transmission_modes
336  * in postgres_fdw.c.)
337  */
338  do_sql_command(conn, "SET datestyle = ISO");
339  if (remoteversion >= 80400)
340  do_sql_command(conn, "SET intervalstyle = postgres");
341  if (remoteversion >= 90000)
342  do_sql_command(conn, "SET extra_float_digits = 3");
343  else
344  do_sql_command(conn, "SET extra_float_digits = 2");
345 }
346 
347 /*
348  * Convenience subroutine to issue a non-data-returning SQL command to remote
349  */
350 static void
351 do_sql_command(PGconn *conn, const char *sql)
352 {
353  PGresult *res;
354 
355  res = PQexec(conn, sql);
356  if (PQresultStatus(res) != PGRES_COMMAND_OK)
357  pgfdw_report_error(ERROR, res, conn, true, sql);
358  PQclear(res);
359 }
360 
361 /*
362  * Start remote transaction or subtransaction, if needed.
363  *
364  * Note that we always use at least REPEATABLE READ in the remote session.
365  * This is so that, if a query initiates multiple scans of the same or
366  * different foreign tables, we will get snapshot-consistent results from
367  * those scans. A disadvantage is that we can't provide sane emulation of
368  * READ COMMITTED behavior --- it would be nice if we had some other way to
369  * control which remote queries share a snapshot.
370  */
371 static void
373 {
374  int curlevel = GetCurrentTransactionNestLevel();
375 
376  /* Start main transaction if we haven't yet */
377  if (entry->xact_depth <= 0)
378  {
379  const char *sql;
380 
381  elog(DEBUG3, "starting remote transaction on connection %p",
382  entry->conn);
383 
385  sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
386  else
387  sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ";
388  do_sql_command(entry->conn, sql);
389  entry->xact_depth = 1;
390  }
391 
392  /*
393  * If we're in a subtransaction, stack up savepoints to match our level.
394  * This ensures we can rollback just the desired effects when a
395  * subtransaction aborts.
396  */
397  while (entry->xact_depth < curlevel)
398  {
399  char sql[64];
400 
401  snprintf(sql, sizeof(sql), "SAVEPOINT s%d", entry->xact_depth + 1);
402  do_sql_command(entry->conn, sql);
403  entry->xact_depth++;
404  }
405 }
406 
407 /*
408  * Release connection reference count created by calling GetConnection.
409  */
410 void
412 {
413  /*
414  * Currently, we don't actually track connection references because all
415  * cleanup is managed on a transaction or subtransaction basis instead. So
416  * there's nothing to do here.
417  */
418 }
419 
420 /*
421  * Assign a "unique" number for a cursor.
422  *
423  * These really only need to be unique per connection within a transaction.
424  * For the moment we ignore the per-connection point and assign them across
425  * all connections in the transaction, but we ask for the connection to be
426  * supplied in case we want to refine that.
427  *
428  * Note that even if wraparound happens in a very long transaction, actual
429  * collisions are highly improbable; just be sure to use %u not %d to print.
430  */
431 unsigned int
433 {
434  return ++cursor_number;
435 }
436 
437 /*
438  * Assign a "unique" number for a prepared statement.
439  *
440  * This works much like GetCursorNumber, except that we never reset the counter
441  * within a session. That's because we can't be 100% sure we've gotten rid
442  * of all prepared statements on all connections, and it's not really worth
443  * increasing the risk of prepared-statement name collisions by resetting.
444  */
445 unsigned int
447 {
448  return ++prep_stmt_number;
449 }
450 
451 /*
452  * Submit a query and wait for the result.
453  *
454  * This function is interruptible by signals.
455  *
456  * Caller is responsible for the error handling on the result.
457  */
458 PGresult *
459 pgfdw_exec_query(PGconn *conn, const char *query)
460 {
461  /*
462  * Submit a query. Since we don't use non-blocking mode, this also can
463  * block. But its risk is relatively small, so we ignore that for now.
464  */
465  if (!PQsendQuery(conn, query))
466  pgfdw_report_error(ERROR, NULL, conn, false, query);
467 
468  /* Wait for the result. */
469  return pgfdw_get_result(conn, query);
470 }
471 
472 /*
473  * Wait for the result from a prior asynchronous execution function call.
474  *
475  * This function offers quick responsiveness by checking for any interruptions.
476  *
477  * This function emulates the PQexec()'s behavior of returning the last result
478  * when there are many.
479  *
480  * Caller is responsible for the error handling on the result.
481  */
482 PGresult *
483 pgfdw_get_result(PGconn *conn, const char *query)
484 {
485  PGresult *last_res = NULL;
486 
487  for (;;)
488  {
489  PGresult *res;
490 
491  while (PQisBusy(conn))
492  {
493  int wc;
494 
495  /* Sleep until there's something to do */
498  PQsocket(conn),
499  -1L);
501 
503 
504  /* Data available in socket */
505  if (wc & WL_SOCKET_READABLE)
506  {
507  if (!PQconsumeInput(conn))
508  pgfdw_report_error(ERROR, NULL, conn, false, query);
509  }
510  }
511 
512  res = PQgetResult(conn);
513  if (res == NULL)
514  break; /* query is complete */
515 
516  PQclear(last_res);
517  last_res = res;
518  }
519 
520  return last_res;
521 }
522 
523 /*
524  * Report an error we got from the remote server.
525  *
526  * elevel: error level to use (typically ERROR, but might be less)
527  * res: PGresult containing the error
528  * conn: connection we did the query on
529  * clear: if true, PQclear the result (otherwise caller will handle it)
530  * sql: NULL, or text of remote command we tried to execute
531  *
532  * Note: callers that choose not to throw ERROR for a remote error are
533  * responsible for making sure that the associated ConnCacheEntry gets
534  * marked with have_error = true.
535  */
536 void
538  bool clear, const char *sql)
539 {
540  /* If requested, PGresult must be released before leaving this function. */
541  PG_TRY();
542  {
543  char *diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
544  char *message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
545  char *message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
546  char *message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
547  char *message_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
548  int sqlstate;
549 
550  if (diag_sqlstate)
551  sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
552  diag_sqlstate[1],
553  diag_sqlstate[2],
554  diag_sqlstate[3],
555  diag_sqlstate[4]);
556  else
557  sqlstate = ERRCODE_CONNECTION_FAILURE;
558 
559  /*
560  * If we don't get a message from the PGresult, try the PGconn. This
561  * is needed because for connection-level failures, PQexec may just
562  * return NULL, not a PGresult at all.
563  */
564  if (message_primary == NULL)
565  message_primary = PQerrorMessage(conn);
566 
567  ereport(elevel,
568  (errcode(sqlstate),
569  message_primary ? errmsg_internal("%s", message_primary) :
570  errmsg("unknown error"),
571  message_detail ? errdetail_internal("%s", message_detail) : 0,
572  message_hint ? errhint("%s", message_hint) : 0,
573  message_context ? errcontext("%s", message_context) : 0,
574  sql ? errcontext("Remote SQL command: %s", sql) : 0));
575  }
576  PG_CATCH();
577  {
578  if (clear)
579  PQclear(res);
580  PG_RE_THROW();
581  }
582  PG_END_TRY();
583  if (clear)
584  PQclear(res);
585 }
586 
587 /*
588  * pgfdw_xact_callback --- cleanup at main-transaction end.
589  */
590 static void
592 {
593  HASH_SEQ_STATUS scan;
594  ConnCacheEntry *entry;
595 
596  /* Quick exit if no connections were touched in this transaction. */
597  if (!xact_got_connection)
598  return;
599 
600  /*
601  * Scan all connection cache entries to find open remote transactions, and
602  * close them.
603  */
604  hash_seq_init(&scan, ConnectionHash);
605  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
606  {
607  PGresult *res;
608 
609  /* Ignore cache entry if no open connection right now */
610  if (entry->conn == NULL)
611  continue;
612 
613  /* If it has an open remote transaction, try to close it */
614  if (entry->xact_depth > 0)
615  {
616  elog(DEBUG3, "closing remote transaction on connection %p",
617  entry->conn);
618 
619  switch (event)
620  {
623  /* Commit all remote transactions during pre-commit */
624  do_sql_command(entry->conn, "COMMIT TRANSACTION");
625 
626  /*
627  * If there were any errors in subtransactions, and we
628  * made prepared statements, do a DEALLOCATE ALL to make
629  * sure we get rid of all prepared statements. This is
630  * annoying and not terribly bulletproof, but it's
631  * probably not worth trying harder.
632  *
633  * DEALLOCATE ALL only exists in 8.3 and later, so this
634  * constrains how old a server postgres_fdw can
635  * communicate with. We intentionally ignore errors in
636  * the DEALLOCATE, so that we can hobble along to some
637  * extent with older servers (leaking prepared statements
638  * as we go; but we don't really support update operations
639  * pre-8.3 anyway).
640  */
641  if (entry->have_prep_stmt && entry->have_error)
642  {
643  res = PQexec(entry->conn, "DEALLOCATE ALL");
644  PQclear(res);
645  }
646  entry->have_prep_stmt = false;
647  entry->have_error = false;
648  break;
650 
651  /*
652  * We disallow remote transactions that modified anything,
653  * since it's not very reasonable to hold them open until
654  * the prepared transaction is committed. For the moment,
655  * throw error unconditionally; later we might allow
656  * read-only cases. Note that the error will cause us to
657  * come right back here with event == XACT_EVENT_ABORT, so
658  * we'll clean up the connection state at that point.
659  */
660  ereport(ERROR,
661  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
662  errmsg("cannot prepare a transaction that modified remote tables")));
663  break;
665  case XACT_EVENT_COMMIT:
666  case XACT_EVENT_PREPARE:
667  /* Pre-commit should have closed the open transaction */
668  elog(ERROR, "missed cleaning up connection during pre-commit");
669  break;
671  case XACT_EVENT_ABORT:
672  /* Assume we might have lost track of prepared statements */
673  entry->have_error = true;
674 
675  /*
676  * If a command has been submitted to the remote server by
677  * using an asynchronous execution function, the command
678  * might not have yet completed. Check to see if a command
679  * is still being processed by the remote server, and if so,
680  * request cancellation of the command.
681  */
682  if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
683  {
684  PGcancel *cancel;
685  char errbuf[256];
686 
687  if ((cancel = PQgetCancel(entry->conn)))
688  {
689  if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
691  (errcode(ERRCODE_CONNECTION_FAILURE),
692  errmsg("could not send cancel request: %s",
693  errbuf)));
694  PQfreeCancel(cancel);
695  }
696  }
697 
698  /* If we're aborting, abort all remote transactions too */
699  res = PQexec(entry->conn, "ABORT TRANSACTION");
700  /* Note: can't throw ERROR, it would be infinite loop */
701  if (PQresultStatus(res) != PGRES_COMMAND_OK)
702  pgfdw_report_error(WARNING, res, entry->conn, true,
703  "ABORT TRANSACTION");
704  else
705  {
706  PQclear(res);
707  /* As above, make sure to clear any prepared stmts */
708  if (entry->have_prep_stmt && entry->have_error)
709  {
710  res = PQexec(entry->conn, "DEALLOCATE ALL");
711  PQclear(res);
712  }
713  entry->have_prep_stmt = false;
714  entry->have_error = false;
715  }
716  break;
717  }
718  }
719 
720  /* Reset state to show we're out of a transaction */
721  entry->xact_depth = 0;
722 
723  /*
724  * If the connection isn't in a good idle state, discard it to
725  * recover. Next GetConnection will open a new connection.
726  */
727  if (PQstatus(entry->conn) != CONNECTION_OK ||
729  {
730  elog(DEBUG3, "discarding connection %p", entry->conn);
731  PQfinish(entry->conn);
732  entry->conn = NULL;
733  }
734  }
735 
736  /*
737  * Regardless of the event type, we can now mark ourselves as out of the
738  * transaction. (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
739  * this saves a useless scan of the hashtable during COMMIT or PREPARE.)
740  */
741  xact_got_connection = false;
742 
743  /* Also reset cursor numbering for next transaction */
744  cursor_number = 0;
745 }
746 
747 /*
748  * pgfdw_subxact_callback --- cleanup at subtransaction end.
749  */
750 static void
752  SubTransactionId parentSubid, void *arg)
753 {
754  HASH_SEQ_STATUS scan;
755  ConnCacheEntry *entry;
756  int curlevel;
757 
758  /* Nothing to do at subxact start, nor after commit. */
759  if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB ||
760  event == SUBXACT_EVENT_ABORT_SUB))
761  return;
762 
763  /* Quick exit if no connections were touched in this transaction. */
764  if (!xact_got_connection)
765  return;
766 
767  /*
768  * Scan all connection cache entries to find open remote subtransactions
769  * of the current level, and close them.
770  */
771  curlevel = GetCurrentTransactionNestLevel();
772  hash_seq_init(&scan, ConnectionHash);
773  while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
774  {
775  PGresult *res;
776  char sql[100];
777 
778  /*
779  * We only care about connections with open remote subtransactions of
780  * the current level.
781  */
782  if (entry->conn == NULL || entry->xact_depth < curlevel)
783  continue;
784 
785  if (entry->xact_depth > curlevel)
786  elog(ERROR, "missed cleaning up remote subtransaction at level %d",
787  entry->xact_depth);
788 
789  if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
790  {
791  /* Commit all remote subtransactions during pre-commit */
792  snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
793  do_sql_command(entry->conn, sql);
794  }
795  else
796  {
797  /* Assume we might have lost track of prepared statements */
798  entry->have_error = true;
799 
800  /*
801  * If a command has been submitted to the remote server by using an
802  * asynchronous execution function, the command might not have yet
803  * completed. Check to see if a command is still being processed by
804  * the remote server, and if so, request cancellation of the
805  * command.
806  */
807  if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
808  {
809  PGcancel *cancel;
810  char errbuf[256];
811 
812  if ((cancel = PQgetCancel(entry->conn)))
813  {
814  if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
816  (errcode(ERRCODE_CONNECTION_FAILURE),
817  errmsg("could not send cancel request: %s",
818  errbuf)));
819  PQfreeCancel(cancel);
820  }
821  }
822 
823  /* Rollback all remote subtransactions during abort */
824  snprintf(sql, sizeof(sql),
825  "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d",
826  curlevel, curlevel);
827  res = PQexec(entry->conn, sql);
828  if (PQresultStatus(res) != PGRES_COMMAND_OK)
829  pgfdw_report_error(WARNING, res, entry->conn, true, sql);
830  else
831  PQclear(res);
832  }
833 
834  /* OK, we're outta that level of subtransaction */
835  entry->xact_depth--;
836  }
837 }
Oid umid
Definition: foreign.h:58
XactEvent
Definition: xact.h:80
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:5444
static void configure_remote_session(PGconn *conn)
Definition: connection.c:315
int errhint(const char *fmt,...)
Definition: elog.c:987
#define PG_DIAG_MESSAGE_PRIMARY
Definition: postgres_ext.h:53
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
MemoryContext hcxt
Definition: hsearch.h:78
#define DEBUG3
Definition: elog.h:23
#define PG_DIAG_MESSAGE_DETAIL
Definition: postgres_ext.h:54
struct ConnCacheEntry ConnCacheEntry
#define MAKE_SQLSTATE(ch1, ch2, ch3, ch4, ch5)
Definition: elog.h:62
void PQfreeCancel(PGcancel *cancel)
Definition: fe-connect.c:3196
ConnCacheKey key
Definition: connection.c:45
char * pstrdup(const char *in)
Definition: mcxt.c:1168
Size entrysize
Definition: hsearch.h:73
int errcode(int sqlerrcode)
Definition: elog.c:575
void PQfinish(PGconn *conn)
Definition: fe-connect.c:3052
bool superuser(void)
Definition: superuser.c:47
#define MemSet(start, val, len)
Definition: c.h:849
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
#define WL_SOCKET_READABLE
Definition: latch.h:109
bool have_prep_stmt
Definition: connection.c:49
uint32 SubTransactionId
Definition: c.h:397
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:887
void ResetLatch(volatile Latch *latch)
Definition: latch.c:459
unsigned int Oid
Definition: postgres_ext.h:31
int PQserverVersion(const PGconn *conn)
Definition: fe-connect.c:5434
#define PG_DIAG_SQLSTATE
Definition: postgres_ext.h:52
Oid userid
Definition: foreign.h:59
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:2579
PGconn * PQconnectdbParams(const char *const *keywords, const char *const *values, int expand_dbname)
Definition: fe-connect.c:461
int errdetail_internal(const char *fmt,...)
Definition: elog.c:900
void ReleaseConnection(PGconn *conn)
Definition: connection.c:411
int PQsendQuery(PGconn *conn, const char *query)
Definition: fe-exec.c:1114
Definition: dynahash.c:193
void pfree(void *pointer)
Definition: mcxt.c:995
static unsigned int prep_stmt_number
Definition: connection.c:60
int ExtractConnectionOptions(List *defelems, const char **keywords, const char **values)
Definition: option.c:296
#define ERROR
Definition: elog.h:43
static void do_sql_command(PGconn *conn, const char *sql)
Definition: connection.c:351
PGconn * conn
Definition: streamutil.c:45
PGcancel * PQgetCancel(PGconn *conn)
Definition: fe-connect.c:3173
int errdetail(const char *fmt,...)
Definition: elog.c:873
List * options
Definition: foreign.h:61
void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql)
Definition: connection.c:537
static unsigned int cursor_number
Definition: connection.c:59
#define ereport(elevel, rest)
Definition: elog.h:122
int WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, long timeout)
Definition: latch.c:318
PGTransactionStatusType PQtransactionStatus(const PGconn *conn)
Definition: fe-connect.c:5399
#define WARNING
Definition: elog.h:40
static int elevel
Definition: vacuumlazy.c:130
#define HASH_BLOBS
Definition: hsearch.h:88
SubXactEvent
Definition: xact.h:94
static void pgfdw_xact_callback(XactEvent event, void *arg)
Definition: connection.c:591
static HTAB * ConnectionHash
Definition: connection.c:56
ForeignServer * GetForeignServer(Oid serverid)
Definition: foreign.c:98
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:301
void RegisterSubXactCallback(SubXactCallback callback, void *arg)
Definition: xact.c:3356
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1614
Size keysize
Definition: hsearch.h:72
#define PG_DIAG_MESSAGE_HINT
Definition: postgres_ext.h:55
void PQclear(PGresult *res)
Definition: fe-exec.c:650
static void check_conn_params(const char **keywords, const char **values)
Definition: connection.c:282
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:758
PGconn * GetConnection(UserMapping *user, bool will_prep_stmt)
Definition: connection.c:96
int errmsg_internal(const char *fmt,...)
Definition: elog.c:827
#define PG_CATCH()
Definition: elog.h:292
PGconn * conn
Definition: connection.c:46
PGresult * pgfdw_get_result(PGconn *conn, const char *query)
Definition: connection.c:483
char * PQresultErrorField(const PGresult *res, int fieldcode)
Definition: fe-exec.c:2641
#define NULL
Definition: c.h:226
const char * GetDatabaseEncodingName(void)
Definition: mbutils.c:1021
void RegisterXactCallback(XactCallback callback, void *arg)
Definition: xact.c:3301
int PQisBusy(PGconn *conn)
Definition: fe-exec.c:1664
static bool xact_got_connection
Definition: connection.c:63
unsigned int GetCursorNumber(PGconn *conn)
Definition: connection.c:432
static int list_length(const List *l)
Definition: pg_list.h:89
Oid serverid
Definition: foreign.h:60
#define PG_RE_THROW()
Definition: elog.h:313
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1355
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1345
static Datum values[MAXATTR]
Definition: bootstrap.c:160
int PQconnectionUsedPassword(const PGconn *conn)
Definition: fe-connect.c:5490
int PQcancel(PGcancel *cancel, char *errbuf, int errbufsize)
Definition: fe-connect.c:3328
static char * user
Definition: pg_regress.c:90
void * palloc(Size size)
Definition: mcxt.c:894
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define IsolationIsSerializable()
Definition: xact.h:44
char * servername
Definition: foreign.h:50
int i
#define errcontext
Definition: elog.h:164
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:1829
void * arg
struct Latch * MyLatch
Definition: globals.c:51
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:97
#define elog
Definition: elog.h:218
unsigned int GetPrepStmtNumber(PGconn *conn)
Definition: connection.c:446
static void pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, SubTransactionId parentSubid, void *arg)
Definition: connection.c:751
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:5391
#define PG_TRY()
Definition: elog.h:283
static void begin_remote_xact(ConnCacheEntry *entry)
Definition: connection.c:372
Oid ConnCacheKey
Definition: connection.c:41
PGresult * pgfdw_exec_query(PGconn *conn, const char *query)
Definition: connection.c:459
List * options
Definition: foreign.h:53
#define WL_LATCH_SET
Definition: latch.h:108
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:5462
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:1685
#define PG_END_TRY()
Definition: elog.h:299
#define PG_DIAG_CONTEXT
Definition: postgres_ext.h:59
MemoryContext CacheMemoryContext
Definition: mcxt.c:46
static PGconn * connect_pg_server(ForeignServer *server, UserMapping *user)
Definition: connection.c:182