45 #define PARALLEL_KEY_PLANNEDSTMT UINT64CONST(0xE000000000000001)
46 #define PARALLEL_KEY_PARAMS UINT64CONST(0xE000000000000002)
47 #define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xE000000000000003)
48 #define PARALLEL_KEY_TUPLE_QUEUE UINT64CONST(0xE000000000000004)
49 #define PARALLEL_KEY_INSTRUMENTATION UINT64CONST(0xE000000000000005)
51 #define PARALLEL_TUPLE_QUEUE_SIZE 65536
63 #define GetInstrumentationArray(sei) \
64 (AssertVariableIsOfTypeMacro(sei, SharedExecutorInstrumentation *), \
65 (Instrumentation *) (((char *) sei) + sei->instrument_offset))
166 if (planstate ==
NULL)
205 if (planstate ==
NULL)
285 (
Size) PARALLEL_TUPLE_QUEUE_SIZE);
329 int instrumentation_len = 0;
330 int instrument_offset = 0;
351 pstmt_len = strlen(pstmt_data) + 1;
387 instrumentation_len =
390 instrumentation_len =
MAXALIGN(instrumentation_len);
391 instrument_offset = instrumentation_len;
410 memcpy(pstmt_space, pstmt_data, pstmt_len);
443 for (i = 0; i < nworkers * e.
nnodes; ++
i)
465 elog(
ERROR,
"inconsistent count of PlanState nodes");
490 elog(
ERROR,
"plan node %d not found", plan_node_id);
575 int instrument_options)
601 receiver, paramLI, instrument_options);
628 elog(
ERROR,
"plan node %d not found", plan_node_id);
637 Assert(ParallelWorkerNumber < instrumentation->num_workers);
652 if (planstate ==
NULL)
701 int instrument_options = 0;
706 if (instrumentation !=
NULL)
724 if (instrumentation !=
NULL)
void ExecSeqScanInitializeWorker(SeqScanState *node, shm_toc *toc)
void * stringToNode(char *str)
void(* rDestroy)(DestReceiver *self)
WorkerInstrumentation * worker_instrument
struct ExecParallelEstimateContext ExecParallelEstimateContext
int plan_node_id[FLEXIBLE_ARRAY_MEMBER]
Instrumentation * instrument
void ExecParallelFinish(ParallelExecutorInfo *pei)
#define PARALLEL_KEY_TUPLE_QUEUE
void FreeQueryDesc(QueryDesc *qdesc)
static void ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
void InstrAggNode(Instrumentation *dst, Instrumentation *add)
static bool ExecParallelReportInstrumentation(PlanState *planstate, SharedExecutorInstrumentation *instrumentation)
void ExecCustomScanInitializeWorker(CustomScanState *node, shm_toc *toc)
shm_toc_estimator estimator
void ExecutorStart(QueryDesc *queryDesc, int eflags)
Snapshot GetActiveSnapshot(void)
PlannedStmt * es_plannedstmt
void ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 count)
void InstrEndParallelQuery(BufferUsage *result)
static char * ExecSerializePlan(Plan *plan, EState *estate)
ParallelExecutorInfo * ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
QueryDesc * CreateQueryDesc(PlannedStmt *plannedstmt, const char *sourceText, Snapshot snapshot, Snapshot crosscheck_snapshot, DestReceiver *dest, ParamListInfo params, int instrument_options)
static shm_mq_handle ** ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
#define shm_toc_estimate_chunk(e, sz)
Size EstimateParamListSpace(ParamListInfo paramLI)
void * copyObject(const void *from)
void InstrEndLoop(Instrumentation *instr)
SharedExecutorInstrumentation * instrumentation
void ExecutorEnd(QueryDesc *queryDesc)
void WaitForParallelWorkersToFinish(ParallelContext *pcxt)
#define PARALLEL_KEY_INSTRUMENTATION
void DestroyParallelContext(ParallelContext *pcxt)
void pfree(void *pointer)
void ExecSeqScanInitializeDSM(SeqScanState *node, ParallelContext *pcxt)
static bool ExecParallelInitializeDSM(PlanState *node, ExecParallelInitializeDSMContext *d)
BufferUsage * buffer_usage
shm_mq * shm_mq_create(void *address, Size size)
#define PARALLEL_KEY_PLANNEDSTMT
ParamListInfo RestoreParamList(char **start_address)
void * shm_toc_lookup(shm_toc *toc, uint64 key)
static bool ExecParallelRetrieveInstrumentation(PlanState *planstate, SharedExecutorInstrumentation *instrumentation)
struct ExecParallelInitializeDSMContext ExecParallelInitializeDSMContext
void ExecForeignScanInitializeWorker(ForeignScanState *node, shm_toc *toc)
static bool ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc)
#define PARALLEL_TUPLE_QUEUE_SIZE
void InstrAccumParallelQuery(BufferUsage *result)
static bool ExecParallelEstimate(PlanState *node, ExecParallelEstimateContext *e)
void shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
#define PARALLEL_KEY_BUFFER_USAGE
#define IsParallelWorker()
void ExecutorFinish(QueryDesc *queryDesc)
#define PARALLEL_KEY_PARAMS
void InitializeParallelDSM(ParallelContext *pcxt)
void InstrStartParallelQuery(void)
void InstrInit(Instrumentation *instr, int instrument_options)
Instrumentation instrument[FLEXIBLE_ARRAY_MEMBER]
void * palloc0(Size size)
void ExecForeignScanInitializeDSM(ForeignScanState *node, ParallelContext *pcxt)
void ReinitializeParallelDSM(ParallelContext *pcxt)
void ExecParallelReinitialize(ParallelExecutorInfo *pei)
void ExecCustomScanInitializeDSM(CustomScanState *node, ParallelContext *pcxt)
void ExecParallelCleanup(ParallelExecutorInfo *pei)
void SerializeParamList(ParamListInfo paramLI, char **start_address)
#define Assert(condition)
Bitmapset * rewindPlanIDs
void ExecForeignScanEstimate(ForeignScanState *node, ParallelContext *pcxt)
void ExecSeqScanEstimate(SeqScanState *node, ParallelContext *pcxt)
#define shm_toc_estimate_keys(e, cnt)
void * shm_toc_allocate(shm_toc *toc, Size nbytes)
shm_mq_handle * shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
struct Instrumentation Instrumentation
void shm_toc_insert(shm_toc *toc, uint64 key, void *address)
char * nodeToString(const void *obj)
void ExecCustomScanEstimate(CustomScanState *node, ParallelContext *pcxt)
ParamListInfo es_param_list_info
static DestReceiver * ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc)
void shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)
ParallelContext * CreateParallelContext(parallel_worker_main_type entrypoint, int nworkers)
DestReceiver * CreateTupleQueueDestReceiver(shm_mq_handle *handle)
SharedExecutorInstrumentation * instrumentation
bool planstate_tree_walker(PlanState *planstate, bool(*walker)(), void *context)
static QueryDesc * ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver, int instrument_options)
#define offsetof(type, field)
#define GetInstrumentationArray(sei)