45 #define PARALLEL_ERROR_QUEUE_SIZE 16384
48 #define PARALLEL_MAGIC 0x50477c7c
54 #define PARALLEL_KEY_FIXED UINT64CONST(0xFFFFFFFFFFFF0001)
55 #define PARALLEL_KEY_ERROR_QUEUE UINT64CONST(0xFFFFFFFFFFFF0002)
56 #define PARALLEL_KEY_LIBRARY UINT64CONST(0xFFFFFFFFFFFF0003)
57 #define PARALLEL_KEY_GUC UINT64CONST(0xFFFFFFFFFFFF0004)
58 #define PARALLEL_KEY_COMBO_CID UINT64CONST(0xFFFFFFFFFFFF0005)
59 #define PARALLEL_KEY_TRANSACTION_SNAPSHOT UINT64CONST(0xFFFFFFFFFFFF0006)
60 #define PARALLEL_KEY_ACTIVE_SNAPSHOT UINT64CONST(0xFFFFFFFFFFFF0007)
61 #define PARALLEL_KEY_TRANSACTION_STATE UINT64CONST(0xFFFFFFFFFFFF0008)
62 #define PARALLEL_KEY_EXTENSION_TRAMPOLINE UINT64CONST(0xFFFFFFFFFFFF0009)
201 Size library_len = 0;
203 Size combocidlen = 0;
245 "parallel error queue size not buffer-aligned");
313 char *error_queue_space;
374 char *extensionstate;
397 char *error_queue_space;
436 bool any_registrations_failed =
false;
473 memcpy(worker.
bgw_extra, &i,
sizeof(
int));
474 if (!any_registrations_failed &&
493 any_registrations_failed =
true;
520 bool anyone_alive =
false;
587 (
errcode(ERRCODE_ADMIN_SHUTDOWN),
588 errmsg(
"postmaster exited during a parallel transaction")));
688 int save_errno = errno;
745 (
errcode(ERRCODE_INTERNAL_ERROR),
746 errmsg(
"lost connection to parallel worker")));
836 elog(
ERROR,
"unknown message type: %c (%d bytes)",
858 if (pcxt->
subid != mySubId)
892 char *error_queue_space;
930 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
931 errmsg(
"could not map dynamic shared memory segment")));
935 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
936 errmsg(
"invalid magic number in dynamic shared memory segment")));
941 MyFixedParallelState = fps;
950 mq = (
shm_mq *) (error_queue_space +
1090 char *extensionstate;
1092 char *function_name;
1097 library_name = extensionstate;
1098 function_name = extensionstate + strlen(library_name) + 1;
char bgw_extra[BGW_EXTRALEN]
parallel_worker_main_type entrypoint
#define DatumGetUInt32(X)
#define PARALLEL_ERROR_QUEUE_SIZE
shm_toc * shm_toc_create(uint64 magic, void *address, Size nbytes)
Snapshot RestoreSnapshot(char *start_address)
static void HandleParallelMessage(ParallelContext *, int, StringInfo msg)
MemoryContext TopTransactionContext
void SetUserIdAndSecContext(Oid userid, int sec_context)
static void dlist_push_head(dlist_head *head, dlist_node *node)
XLogRecPtr XactLastRecEnd
static void WaitForParallelWorkersToExit(ParallelContext *pcxt)
#define dlist_foreach(iter, lhead)
ResourceOwner CurrentResourceOwner
char * pstrdup(const char *in)
void CommitTransactionCommand(void)
shm_toc_estimator estimator
void EndParallelWorkerTransaction(void)
#define SpinLockInit(lock)
void GetTempNamespaceState(Oid *tempNamespaceId, Oid *tempToastNamespaceId)
void RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc)
dsm_segment * dsm_attach(dsm_handle h)
static void ParallelErrorContext(void *arg)
Oid authenticated_user_id
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Snapshot GetActiveSnapshot(void)
dsm_handle dsm_segment_handle(dsm_segment *seg)
int errcode(int sqlerrcode)
PGPROC * parallel_master_pgproc
Oid temp_toast_namespace_id
BackgroundWorker * MyBgworkerEntry
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
bool BecomeLockGroupMember(PGPROC *leader, int pid)
void PopActiveSnapshot(void)
Size shm_toc_estimate(shm_toc_estimator *e)
void ResetLatch(volatile Latch *latch)
void SerializeTransactionState(Size maxsize, char *start_address)
#define shm_toc_estimate_chunk(e, sz)
#define BGWORKER_SHMEM_ACCESS
struct ErrorContextCallback * previous
Snapshot GetTransactionSnapshot(void)
void InvalidateSystemCaches(void)
parallel_worker_main_type entrypoint
#define ALLOCSET_DEFAULT_MINSIZE
void RestoreComboCIDState(char *comboCIDstate)
#define RESUME_INTERRUPTS()
ErrorContextCallback * error_context_stack
#define StaticAssertStmt(condition, errmessage)
void WaitForParallelWorkersToFinish(ParallelContext *pcxt)
#define SpinLockAcquire(lock)
void DestroyParallelContext(ParallelContext *pcxt)
#define dlist_container(type, membername, ptr)
ParallelWorkerInfo * worker
static void ParallelWorkerMain(Datum main_arg)
void pfree(void *pointer)
bool IsInParallelMode(void)
void SerializeLibraryState(Size maxsize, char *start_address)
BgwHandleStatus WaitForBackgroundWorkerShutdown(BackgroundWorkerHandle *handle)
Oid GetAuthenticatedUserId(void)
#define PARALLEL_KEY_TRANSACTION_SNAPSHOT
void SerializeSnapshot(Snapshot snapshot, char *start_address)
void pq_parse_errornotice(StringInfo msg, ErrorData *edata)
bgworker_main_type bgw_main
shm_mq * shm_mq_create(void *address, Size size)
void ExitParallelMode(void)
#define PARALLEL_KEY_FIXED
ParallelContext * CreateParallelContextForExternalFunction(char *library_name, char *function_name, int nworkers)
void HandleParallelMessages(void)
#define PARALLEL_KEY_ERROR_QUEUE
void * shm_toc_lookup(shm_toc *toc, uint64 key)
void SetTempNamespaceState(Oid tempNamespaceId, Oid tempToastNamespaceId)
#define DSM_CREATE_NULL_IF_MAXSEGMENTS
void PushActiveSnapshot(Snapshot snap)
void GetUserIdAndSecContext(Oid *userid, int *sec_context)
shm_mq_handle * error_mqh
int SetClientEncoding(int encoding)
BackgroundWorkerHandle * bgwhandle
int dynamic_shared_memory_type
Size EstimateGUCStateSpace(void)
#define BGW_NEVER_RESTART
#define shm_toc_initialize_estimator(e)
Size EstimateComboCIDStateSpace(void)
#define UInt32GetDatum(X)
MemoryContext CurrentMemoryContext
void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid)
static void dlist_delete(dlist_node *node)
void LaunchParallelWorkers(ParallelContext *pcxt)
void shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
void BecomeLockGroupLeader(void)
#define ereport(elevel, rest)
MemoryContext TopMemoryContext
void ThrowErrorData(ErrorData *edata)
void initStringInfo(StringInfo str)
#define DLIST_STATIC_INIT(name)
PGFunction load_external_function(char *filename, char *funcname, bool signalNotFound, void **filehandle)
void InitializeParallelDSM(ParallelContext *pcxt)
bool ParallelContextActive(void)
#define SpinLockRelease(lock)
#define dlist_head_element(type, membername, lhead)
Size EstimateSnapshotSpace(Snapshot snap)
Size mul_size(Size s1, Size s2)
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
void * palloc0(Size size)
void RestoreLibraryState(char *start_address)
dsm_segment * dsm_create(Size size, int flags)
shm_toc * shm_toc_attach(uint64 magic, void *address)
int GetDatabaseEncoding(void)
pid_t parallel_master_pid
Size EstimateLibraryStateSpace(void)
void ReinitializeParallelDSM(ParallelContext *pcxt)
void shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
void pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh)
void ParallelWorkerReportLastRecEnd(XLogRecPtr last_xlog_end)
void TerminateBackgroundWorker(BackgroundWorkerHandle *handle)
pqsigfunc pqsignal(int signum, pqsigfunc handler)
volatile bool InterruptPending
void SetLatch(volatile Latch *latch)
BackendId parallel_master_backend_id
void * dsm_segment_address(dsm_segment *seg)
char bgw_name[BGW_MAXLEN]
#define Assert(condition)
BackendId ParallelMasterBackendId
void StartParallelWorkerTransaction(char *tstatespace)
#define BGWORKER_BACKEND_DATABASE_CONNECTION
SubTransactionId GetCurrentSubTransactionId(void)
#define PARALLEL_KEY_EXTENSION_TRAMPOLINE
Size EstimateTransactionStateSpace(void)
void StartTransactionCommand(void)
static bool dlist_is_empty(dlist_head *head)
BgWorkerStartTime bgw_start_time
#define shm_toc_estimate_keys(e, cnt)
bool ParallelMessagePending
bool RegisterDynamicBackgroundWorker(BackgroundWorker *worker, BackgroundWorkerHandle **handle)
void EnterParallelMode(void)
void * shm_toc_allocate(shm_toc *toc, Size nbytes)
shm_mq_handle * shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
int WaitLatch(volatile Latch *latch, int wakeEvents, long timeout)
ErrorContextCallback * error_context_stack
void pq_set_parallel_master(pid_t pid, BackendId backend_id)
#define PARALLEL_KEY_TRANSACTION_STATE
void dsm_detach(dsm_segment *seg)
void(* callback)(void *arg)
void shm_toc_insert(shm_toc *toc, uint64 key, void *address)
int errmsg(const char *fmt,...)
#define IsolationIsSerializable()
static FixedParallelState * MyFixedParallelState
void * MemoryContextAlloc(MemoryContext context, Size size)
#define HOLD_INTERRUPTS()
#define ALLOCSET_DEFAULT_INITSIZE
bool InitializingParallelWorker
void HandleParallelMessageInterrupt(void)
#define ALLOCSET_DEFAULT_MAXSIZE
#define CHECK_FOR_INTERRUPTS()
void shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)
ParallelContext * CreateParallelContext(parallel_worker_main_type entrypoint, int nworkers)
static dlist_head pcxt_list
#define pq_putmessage(msgtype, s, len)
static void static void status(const char *fmt,...) pg_attribute_printf(1
struct FixedParallelState FixedParallelState
shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
void AtEOXact_Parallel(bool isCommit)
#define PARALLEL_KEY_COMBO_CID
void AtEOSubXact_Parallel(bool isCommit, SubTransactionId mySubId)
void SerializeGUCState(Size maxsize, char *start_address)
void appendBinaryStringInfo(StringInfo str, const char *data, int datalen)
#define PARALLEL_KEY_ACTIVE_SNAPSHOT
void SerializeComboCIDState(Size maxsize, char *start_address)
#define PARALLEL_KEY_LIBRARY
void RestoreGUCState(void *gucstate)
void NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
ResourceOwner ResourceOwnerCreate(ResourceOwner parent, const char *name)
void(* parallel_worker_main_type)(dsm_segment *seg, shm_toc *toc)
void BackgroundWorkerUnblockSignals(void)
static void ParallelExtensionTrampoline(dsm_segment *seg, shm_toc *toc)