/*
 * Maximum payload length: one block (BLCKSZ) minus NAMEDATALEN and a
 * further 128 bytes.
 * NOTE(review): the 128 presumably reserves room for the queue-entry header
 * and other per-entry overhead so a whole entry fits in one page -- confirm
 * against the AsyncQueueEntry definition.
 */
151 #define NOTIFY_PAYLOAD_MAX_LENGTH (BLCKSZ - NAMEDATALEN - 128)
/* Alignment applied to queue entry lengths; simply defers to INTALIGN. */
175 #define QUEUEALIGN(len) INTALIGN(len)
/*
 * Size of an entry whose channel and payload are both empty strings: the
 * bytes preceding the data member plus 2.  Channel and payload are stored
 * back to back in data, each with its terminating NUL, so two empty strings
 * still occupy two bytes.
 */
177 #define AsyncQueueEntryEmptySize (offsetof(AsyncQueueEntry, data) + 2)
/*
 * Field accessors for a queue position.  The argument is a struct value
 * (accessed with '.'), not a pointer.
 */
188 #define QUEUE_POS_PAGE(x) ((x).page)
189 #define QUEUE_POS_OFFSET(x) ((x).offset)
191 #define SET_QUEUE_POS(x,y,z) \
/*
 * True when two queue positions have the same page and the same offset.
 * Each argument may be evaluated twice, so pass expressions without side
 * effects.
 */
197 #define QUEUE_POS_EQUAL(x,y) \
198 ((x).page == (y).page && (x).offset == (y).offset)
/*
 * Select the earlier of two queue positions:
 *   - if x's page precedes y's page (per asyncQueuePagePrecedes), yield x;
 *   - otherwise, if the pages differ, yield y;
 *   - otherwise (same page) yield whichever has the smaller offset
 *     (y when the offsets are equal).
 * Page order is decided by asyncQueuePagePrecedes() rather than a plain '<'.
 * NOTE(review): presumably because page numbers wrap around -- confirm
 * against that function.
 * Arguments are evaluated more than once; do not pass expressions with side
 * effects.
 */
201 #define QUEUE_POS_MIN(x,y) \
202 (asyncQueuePagePrecedes((x).page, (y).page) ? (x) : \
203 (x).page != (y).page ? (y) : \
204 (x).offset < (y).offset ? (x) : (y))
/*
 * Select the later of two queue positions:
 *   - if x's page precedes y's page (per asyncQueuePagePrecedes), yield y;
 *   - otherwise, if the pages differ, yield x;
 *   - otherwise (same page) yield whichever has the larger offset
 *     (y when the offsets are equal).
 * Arguments are evaluated more than once; do not pass expressions with side
 * effects.
 */
207 #define QUEUE_POS_MAX(x,y) \
208 (asyncQueuePagePrecedes((x).page, (y).page) ? (y) : \
209 (x).page != (y).page ? (x) : \
210 (x).offset > (y).offset ? (x) : (y))
/*
 * Shorthand for fields of the control structure that asyncQueueControl
 * points to: the queue head and tail, and the pid, dboid and pos members of
 * backend[i].  No bounds check is made on i.
 * NOTE(review): nothing here takes a lock; callers are presumably expected
 * to hold the appropriate one -- confirm at the call sites.
 */
255 #define QUEUE_HEAD (asyncQueueControl->head)
256 #define QUEUE_TAIL (asyncQueueControl->tail)
257 #define QUEUE_BACKEND_PID(i) (asyncQueueControl->backend[i].pid)
258 #define QUEUE_BACKEND_DBOID(i) (asyncQueueControl->backend[i].dboid)
259 #define QUEUE_BACKEND_POS(i) (asyncQueueControl->backend[i].pos)
/* Pointer-style handle to the file-level AsyncCtlData object. */
266 #define AsyncCtl (&AsyncCtlData)
/* A queue page is exactly one block. */
267 #define QUEUE_PAGESIZE BLCKSZ
/*
 * Interval between "queue is filling up" warnings.
 * NOTE(review): presumably milliseconds, i.e. 5 seconds -- confirm against
 * the time comparison that uses it.
 */
268 #define QUEUE_FULL_WARN_INTERVAL 5000
/*
 * Value of SLRU_PAGES_PER_SEGMENT * 0x10000 - 1.
 * NOTE(review): presumably the highest valid page number before page
 * numbers wrap around -- confirm where the queue head is advanced.
 */
287 #define QUEUE_MAX_PAGE (SLRU_PAGES_PER_SEGMENT * 0x10000 - 1)
316 char channel[FLEXIBLE_ARRAY_MEMBER];
546 elog(
ERROR,
"cannot send notifications from a parallel worker");
552 if (!channel || !strlen(channel))
554 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
555 errmsg(
"channel name cannot be empty")));
559 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
560 errmsg(
"channel name too long")));
566 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
567 errmsg(
"payload string too long")));
591 pendingNotifies =
lappend(pendingNotifies, n);
620 strlen(channel) + 1);
622 strcpy(actrec->
channel, channel);
624 pendingActions =
lappend(pendingActions, actrec);
717 char *channel = (
char *)
lfirst(*lcp);
750 if (pendingActions || pendingNotifies)
752 (
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
753 errmsg(
"cannot PREPARE a transaction that has executed LISTEN, UNLISTEN, or NOTIFY")));
776 if (pendingActions ==
NIL && pendingNotifies ==
NIL)
783 foreach(p, pendingActions)
837 while (nextNotify !=
NULL)
855 (
errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
856 errmsg(
"too many notifications in the NOTIFY queue")));
879 if (!pendingActions && !pendingNotifies)
886 foreach(p, pendingActions)
1040 foreach(q, listenChannels)
1042 char *lchan = (
char *)
lfirst(q);
1044 if (strcmp(lchan, channel) == 0)
1071 listenChannels =
NIL;
1130 if (listenChannels !=
NIL)
1135 else if (!signalled)
1168 foreach(p, listenChannels)
1170 char *lchan = (
char *)
lfirst(p);
1172 if (strcmp(lchan, channel) == 0)
1250 bool pageJump =
false;
1256 offset += entryLength;
1283 size_t channellen = strlen(n->
channel);
1284 size_t payloadlen = strlen(n->
payload);
1293 qe->
length = entryLength;
1298 memcpy(qe->
data + channellen + 1, n->
payload, payloadlen + 1);
1346 AsyncCtl->shared->page_dirty[slotno] =
true;
1348 while (nextNotify !=
NULL)
1361 nextNotify =
lnext(nextNotify);
1377 memcpy(
AsyncCtl->shared->page_buffer[slotno] + offset,
1434 occupied = headPage - tailPage;
1463 if (fillDegree < 0.5)
1486 (
errmsg(
"NOTIFY queue is %.0f%% full", fillDegree * 100),
1488 errdetail(
"The server process with PID %d is among those with the oldest transactions.", minPid)
1491 errhint(
"The NOTIFY queue cannot be emptied until that process ends its current transaction.")
1514 bool signalled =
false;
1553 for (i = 0; i < count; i++)
1564 elog(
DEBUG3,
"could not signal backend with PID %d: %m", pid);
1611 upperPendingActions =
lcons(pendingActions, upperPendingActions);
1616 pendingActions =
NIL;
1618 upperPendingNotifies =
lcons(pendingNotifies, upperPendingNotifies);
1623 pendingNotifies =
NIL;
1636 List *parentPendingActions;
1637 List *parentPendingNotifies;
1639 parentPendingActions = (
List *)
linitial(upperPendingActions);
1648 pendingActions =
list_concat(parentPendingActions, pendingActions);
1650 parentPendingNotifies = (
List *)
linitial(upperPendingNotifies);
1659 pendingNotifies =
list_concat(parentPendingNotifies, pendingNotifies);
1680 while (
list_length(upperPendingActions) > my_level - 2)
1682 pendingActions = (
List *)
linitial(upperPendingActions);
1686 while (
list_length(upperPendingNotifies) > my_level - 2)
1688 pendingNotifies = (
List *)
linitial(upperPendingNotifies);
1835 memcpy(page_buffer.buf + curoffset,
1836 AsyncCtl->shared->page_buffer[slotno] + curoffset,
1858 }
while (!reachedStop);
1908 bool reachedStop =
false;
1909 bool reachedEndOfPage;
1947 *current = thisentry;
1954 char *channel = qe->
data;
1959 char *payload = qe->
data + strlen(channel) + 1;
1974 }
while (!reachedEndOfPage);
2044 if (listenChannels ==
NIL)
2097 elog(
INFO,
"NOTIFY for \"%s\" payload \"%s\"", channel, payload);
2107 if (pendingNotifies ==
NIL)
2110 if (payload ==
NULL)
2132 if (strcmp(n->
channel, channel) == 0 &&
2133 strcmp(n->
payload, payload) == 0)
2136 foreach(p, pendingNotifies)
2140 if (strcmp(n->
channel, channel) == 0 &&
2141 strcmp(n->
payload, payload) == 0)
2159 pendingActions =
NIL;
2160 pendingNotifies =
NIL;
struct QueueBackendStatus QueueBackendStatus
char data[NAMEDATALEN+NOTIFY_PAYLOAD_MAX_LENGTH]
int errhint(const char *fmt,...)
static void queue_listen(ListenActionKind action, const char *channel)
static SlruCtlData AsyncCtlData
MemoryContext TopTransactionContext
#define QUEUE_BACKEND_PID(i)
bool SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int segpage, void *data)
TimestampTz GetCurrentTimestamp(void)
void AsyncShmemInit(void)
bool TransactionIdIsInProgress(TransactionId xid)
#define SRF_IS_FIRSTCALL()
char * pstrdup(const char *in)
#define DatabaseRelationId
void CommitTransactionCommand(void)
void SimpleLruTruncate(SlruCtl ctl, int cutoffPage)
#define PG_RETURN_FLOAT8(x)
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
static void Exec_UnlistenAllCommit(void)
void set_ps_display(const char *activity, bool force)
MemoryContext CurTransactionContext
static List * listenChannels
void AtPrepare_Notify(void)
int errcode(int sqlerrcode)
Datum pg_notification_queue_usage(PG_FUNCTION_ARGS)
bool IsTransactionOrTransactionBlock(void)
List * list_concat(List *list1, List *list2)
static double asyncQueueUsage(void)
void Async_Listen(const char *channel)
static void ClearPendingActionsAndNotifies(void)
static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength)
bool TransactionIdDidCommit(TransactionId transactionId)
static void Exec_UnlistenCommit(const char *channel)
#define PG_PROTOCOL_MAJOR(v)
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Size SimpleLruShmemSize(int nslots, int nlsns)
void list_free_deep(List *list)
static bool IsListeningOn(const char *channel)
#define SRF_PERCALL_SETUP()
static bool unlistenExitRegistered
static bool asyncQueueIsFull(void)
static void asyncQueueFillWarning(void)
#define PG_GETARG_TEXT_PP(n)
void LWLockRelease(LWLock *lock)
#define SRF_RETURN_NEXT(_funcctx, _result)
void PreCommit_Notify(void)
#define NUM_ASYNC_BUFFERS
#define QUEUE_POS_OFFSET(x)
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
void pfree(void *pointer)
#define AsyncQueueEntryEmptySize
static List * pendingActions
void PreventCommandDuringRecovery(const char *cmdname)
void ProcessNotifyInterrupt(void)
static bool AsyncExistsPendingNotify(const char *channel, const char *payload)
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
int SimpleLruReadPage(SlruCtl ctl, int pageno, bool write_ok, TransactionId xid)
static void Async_UnlistenOnExit(int code, Datum arg)
void AtSubCommit_Notify(void)
static bool backendHasSentNotifications
TransactionId GetCurrentTransactionId(void)
#define SET_QUEUE_POS(x, y, z)
int errdetail(const char *fmt,...)
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
#define InvalidTransactionId
static ListCell * list_head(const List *l)
void SimpleLruWritePage(SlruCtl ctl, int slotno)
MemoryContext CurrentMemoryContext
#define NOTIFY_PAYLOAD_MAX_LENGTH
static bool asyncQueuePagePrecedes(int p, int q)
static AsyncQueueControl * asyncQueueControl
static void asyncQueueAdvanceTail(void)
static void Exec_ListenCommit(const char *channel)
#define ereport(elevel, rest)
#define IsParallelWorker()
MemoryContext TopMemoryContext
#define QUEUE_FULL_WARN_INTERVAL
List * lappend(List *list, void *datum)
struct AsyncQueueControl AsyncQueueControl
struct QueuePosition QueuePosition
List * list_delete_cell(List *list, ListCell *cell, ListCell *prev)
Size mul_size(Size s1, Size s2)
void AtSubAbort_Notify(void)
static List * upperPendingActions
static void ProcessIncomingNotify(void)
static void asyncQueueUnregister(void)
static bool amRegisteredListener
void AtAbort_Notify(void)
Size add_size(Size s1, Size s2)
int SimpleLruReadPage_ReadOnly(SlruCtl ctl, int pageno, TransactionId xid)
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
#define QUEUE_BACKEND_DBOID(i)
static void asyncQueueReadAllNotifications(void)
void AtSubStart_Notify(void)
int GetCurrentTransactionNestLevel(void)
Datum pg_notify(PG_FUNCTION_ARGS)
List * lcons(void *datum, List *list)
void SetLatch(volatile Latch *latch)
static ListCell * asyncQueueAddEntries(ListCell *nextNotify)
#define Assert(condition)
void ProcessCompletedNotifies(void)
bool SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data)
static void Exec_ListenPreCommit(void)
QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER]
void StartTransactionCommand(void)
MemoryContext multi_call_memory_ctx
struct AsyncQueueEntry AsyncQueueEntry
Size AsyncShmemSize(void)
void Async_UnlistenAll(void)
static int list_length(const List *l)
struct Notification Notification
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
void Async_Notify(const char *channel, const char *payload)
Datum pg_listening_channels(PG_FUNCTION_ARGS)
char * text_to_cstring(const text *t)
#define AccessExclusiveLock
void AtCommit_Notify(void)
void HandleNotifyInterrupt(void)
int errmsg(const char *fmt,...)
char channel[FLEXIBLE_ARRAY_MEMBER]
#define CStringGetTextDatum(s)
static List * upperPendingNotifies
volatile sig_atomic_t notifyInterruptPending
static bool asyncQueueProcessPageEntries(volatile QueuePosition *current, QueuePosition stop, char *page_buffer)
#define SLRU_PAGES_PER_SEGMENT
TimestampTz lastQueueFillWarn
CommandDest whereToSendOutput
#define QUEUE_BACKEND_POS(i)
ProtocolVersion FrontendProtocol
int SimpleLruZeroPage(SlruCtl ctl, int pageno)
#define QUEUE_POS_PAGE(x)
#define QUEUE_POS_EQUAL(x, y)
#define offsetof(type, field)
#define QUEUE_POS_MIN(x, y)
void Async_Unlisten(const char *channel)
List * list_delete_first(List *list)
void NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
#define SRF_RETURN_DONE(_funcctx)
static bool SignalBackends(void)
void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, LWLock *ctllock, const char *subdir, int tranche_id)
#define SRF_FIRSTCALL_INIT()
#define QUEUE_POS_MAX(x, y)
static List * pendingNotifies