103 if (ConnectionHash ==
NULL)
107 MemSet(&ctl, 0,
sizeof(ctl));
112 ConnectionHash =
hash_create(
"postgres_fdw connections", 8,
163 elog(
DEBUG3,
"new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
191 const char **keywords;
202 keywords = (
const char **)
palloc(n *
sizeof(
char *));
203 values = (
const char **)
palloc(n *
sizeof(
char *));
207 keywords + n, values + n);
209 keywords + n, values + n);
212 keywords[n] =
"fallback_application_name";
213 values[n] =
"postgres_fdw";
217 keywords[n] =
"client_encoding";
221 keywords[n] = values[n] =
NULL;
234 msglen = strlen(connmessage);
235 if (msglen > 0 && connmessage[msglen - 1] ==
'\n')
236 connmessage[msglen - 1] =
'\0';
238 (
errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
239 errmsg(
"could not connect to server \"%s\"",
251 (
errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
252 errmsg(
"password is required"),
253 errdetail(
"Non-superuser cannot connect if the server does not request a password."),
254 errhint(
"Target server's authentication method must be changed.")));
291 for (i = 0; keywords[
i] !=
NULL; i++)
293 if (strcmp(keywords[i],
"password") == 0 && values[i][0] !=
'\0')
298 (
errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
299 errmsg(
"password is required"),
300 errdetail(
"Non-superusers must provide a password in the user mapping.")));
339 if (remoteversion >= 80400)
341 if (remoteversion >= 90000)
381 elog(
DEBUG3,
"starting remote transaction on connection %p",
385 sql =
"START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
387 sql =
"START TRANSACTION ISOLATION LEVEL REPEATABLE READ";
538 bool clear,
const char *sql)
557 sqlstate = ERRCODE_CONNECTION_FAILURE;
564 if (message_primary ==
NULL)
572 message_hint ?
errhint(
"%s", message_hint) : 0,
573 message_context ?
errcontext(
"%s", message_context) : 0,
574 sql ?
errcontext(
"Remote SQL command: %s", sql) : 0));
616 elog(
DEBUG3,
"closing remote transaction on connection %p",
661 (
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
662 errmsg(
"cannot prepare a transaction that modified remote tables")));
668 elog(
ERROR,
"missed cleaning up connection during pre-commit");
689 if (!
PQcancel(cancel, errbuf,
sizeof(errbuf)))
691 (
errcode(ERRCODE_CONNECTION_FAILURE),
692 errmsg(
"could not send cancel request: %s",
699 res =
PQexec(entry->
conn,
"ABORT TRANSACTION");
703 "ABORT TRANSACTION");
786 elog(
ERROR,
"missed cleaning up remote subtransaction at level %d",
792 snprintf(sql,
sizeof(sql),
"RELEASE SAVEPOINT s%d", curlevel);
814 if (!
PQcancel(cancel, errbuf,
sizeof(errbuf)))
816 (
errcode(ERRCODE_CONNECTION_FAILURE),
817 errmsg(
"could not send cancel request: %s",
825 "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d",
char * PQerrorMessage(const PGconn *conn)
static void configure_remote_session(PGconn *conn)
int errhint(const char *fmt,...)
#define PG_DIAG_MESSAGE_PRIMARY
#define PG_DIAG_MESSAGE_DETAIL
struct ConnCacheEntry ConnCacheEntry
#define MAKE_SQLSTATE(ch1, ch2, ch3, ch4, ch5)
void PQfreeCancel(PGcancel *cancel)
char * pstrdup(const char *in)
int errcode(int sqlerrcode)
void PQfinish(PGconn *conn)
#define MemSet(start, val, len)
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
#define WL_SOCKET_READABLE
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
void ResetLatch(volatile Latch *latch)
int PQserverVersion(const PGconn *conn)
ExecStatusType PQresultStatus(const PGresult *res)
PGconn * PQconnectdbParams(const char *const *keywords, const char *const *values, int expand_dbname)
int errdetail_internal(const char *fmt,...)
void ReleaseConnection(PGconn *conn)
int PQsendQuery(PGconn *conn, const char *query)
void pfree(void *pointer)
static unsigned int prep_stmt_number
int ExtractConnectionOptions(List *defelems, const char **keywords, const char **values)
static void do_sql_command(PGconn *conn, const char *sql)
PGcancel * PQgetCancel(PGconn *conn)
int errdetail(const char *fmt,...)
void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql)
static unsigned int cursor_number
#define ereport(elevel, rest)
int WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, long timeout)
PGTransactionStatusType PQtransactionStatus(const PGconn *conn)
static void pgfdw_xact_callback(XactEvent event, void *arg)
static HTAB * ConnectionHash
ForeignServer * GetForeignServer(Oid serverid)
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
void RegisterSubXactCallback(SubXactCallback callback, void *arg)
int PQconsumeInput(PGconn *conn)
#define PG_DIAG_MESSAGE_HINT
void PQclear(PGresult *res)
static void check_conn_params(const char **keywords, const char **values)
int GetCurrentTransactionNestLevel(void)
PGconn * GetConnection(UserMapping *user, bool will_prep_stmt)
int errmsg_internal(const char *fmt,...)
PGresult * pgfdw_get_result(PGconn *conn, const char *query)
char * PQresultErrorField(const PGresult *res, int fieldcode)
const char * GetDatabaseEncodingName(void)
void RegisterXactCallback(XactCallback callback, void *arg)
int PQisBusy(PGconn *conn)
static bool xact_got_connection
unsigned int GetCursorNumber(PGconn *conn)
static int list_length(const List *l)
void * hash_seq_search(HASH_SEQ_STATUS *status)
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
static Datum values[MAXATTR]
int PQconnectionUsedPassword(const PGconn *conn)
int PQcancel(PGcancel *cancel, char *errbuf, int errbufsize)
int errmsg(const char *fmt,...)
#define IsolationIsSerializable()
PGresult * PQexec(PGconn *conn, const char *query)
#define CHECK_FOR_INTERRUPTS()
unsigned int GetPrepStmtNumber(PGconn *conn)
static void pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, SubTransactionId parentSubid, void *arg)
ConnStatusType PQstatus(const PGconn *conn)
static void begin_remote_xact(ConnCacheEntry *entry)
PGresult * pgfdw_exec_query(PGconn *conn, const char *query)
int PQsocket(const PGconn *conn)
PGresult * PQgetResult(PGconn *conn)
MemoryContext CacheMemoryContext
static PGconn * connect_pg_server(ForeignServer *server, UserMapping *user)