150 #define NOTIFY_PAYLOAD_MAX_LENGTH (BLCKSZ - NAMEDATALEN - 128)
174 #define QUEUEALIGN(len) INTALIGN(len)
176 #define AsyncQueueEntryEmptySize (offsetof(AsyncQueueEntry, data) + 2)
187 #define QUEUE_POS_PAGE(x) ((x).page)
188 #define QUEUE_POS_OFFSET(x) ((x).offset)
190 #define SET_QUEUE_POS(x,y,z) \
196 #define QUEUE_POS_EQUAL(x,y) \
197 ((x).page == (y).page && (x).offset == (y).offset)
200 #define QUEUE_POS_MIN(x,y) \
201 (asyncQueuePagePrecedes((x).page, (y).page) ? (x) : \
202 (x).page != (y).page ? (y) : \
203 (x).offset < (y).offset ? (x) : (y))
246 #define QUEUE_HEAD (asyncQueueControl->head)
247 #define QUEUE_TAIL (asyncQueueControl->tail)
248 #define QUEUE_BACKEND_PID(i) (asyncQueueControl->backend[i].pid)
249 #define QUEUE_BACKEND_POS(i) (asyncQueueControl->backend[i].pos)
256 #define AsyncCtl (&AsyncCtlData)
257 #define QUEUE_PAGESIZE BLCKSZ
258 #define QUEUE_FULL_WARN_INTERVAL 5000
277 #define QUEUE_MAX_PAGE (SLRU_PAGES_PER_SEGMENT * 0x10000 - 1)
306 char channel[FLEXIBLE_ARRAY_MEMBER];
472 AsyncCtlLock,
"pg_notify");
541 if (!channel || !strlen(channel))
543 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
544 errmsg(
"channel name cannot be empty")));
548 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
549 errmsg(
"channel name too long")));
555 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
556 errmsg(
"payload string too long")));
580 pendingNotifies =
lappend(pendingNotifies, n);
609 strlen(channel) + 1);
611 strcpy(actrec->
channel, channel);
613 pendingActions =
lappend(pendingActions, actrec);
706 char *channel = (
char *)
lfirst(*lcp);
739 if (pendingActions || pendingNotifies)
741 (
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
742 errmsg(
"cannot PREPARE a transaction that has executed LISTEN, UNLISTEN, or NOTIFY")));
765 if (pendingActions ==
NIL && pendingNotifies ==
NIL)
772 foreach(p, pendingActions)
826 while (nextNotify !=
NULL)
844 (
errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
845 errmsg(
"too many notifications in the NOTIFY queue")));
868 if (!pendingActions && !pendingNotifies)
875 foreach(p, pendingActions)
1000 foreach(q, listenChannels)
1002 char *lchan = (
char *)
lfirst(q);
1004 if (strcmp(lchan, channel) == 0)
1031 listenChannels =
NIL;
1090 if (listenChannels !=
NIL)
1095 else if (!signalled)
1128 foreach(p, listenChannels)
1130 char *lchan = (
char *)
lfirst(p);
1132 if (strcmp(lchan, channel) == 0)
1209 bool pageJump =
false;
1215 offset += entryLength;
1242 size_t channellen = strlen(n->
channel);
1243 size_t payloadlen = strlen(n->
payload);
1252 qe->
length = entryLength;
1257 memcpy(qe->
data + channellen + 1, n->
payload, payloadlen + 1);
1305 AsyncCtl->shared->page_dirty[slotno] =
true;
1307 while (nextNotify !=
NULL)
1320 nextNotify =
lnext(nextNotify);
1336 memcpy(
AsyncCtl->shared->page_buffer[slotno] + offset,
1393 occupied = headPage - tailPage;
1422 if (fillDegree < 0.5)
1445 (
errmsg(
"NOTIFY queue is %.0f%% full", fillDegree * 100),
1447 errdetail(
"The server process with PID %d is among those with the oldest transactions.", minPid)
1450 errhint(
"The NOTIFY queue cannot be emptied until that process ends its current transaction.")
1473 bool signalled =
false;
1512 for (i = 0; i < count; i++)
1523 elog(
DEBUG3,
"could not signal backend with PID %d: %m", pid);
1570 upperPendingActions =
lcons(pendingActions, upperPendingActions);
1575 pendingActions =
NIL;
1577 upperPendingNotifies =
lcons(pendingNotifies, upperPendingNotifies);
1582 pendingNotifies =
NIL;
1595 List *parentPendingActions;
1596 List *parentPendingNotifies;
1598 parentPendingActions = (
List *)
linitial(upperPendingActions);
1607 pendingActions =
list_concat(parentPendingActions, pendingActions);
1609 parentPendingNotifies = (
List *)
linitial(upperPendingNotifies);
1618 pendingNotifies =
list_concat(parentPendingNotifies, pendingNotifies);
1639 while (
list_length(upperPendingActions) > my_level - 2)
1641 pendingActions = (
List *)
linitial(upperPendingActions);
1645 while (
list_length(upperPendingNotifies) > my_level - 2)
1647 pendingNotifies = (
List *)
linitial(upperPendingNotifies);
1794 memcpy(page_buffer.buf + curoffset,
1795 AsyncCtl->shared->page_buffer[slotno] + curoffset,
1817 }
while (!reachedStop);
1867 bool reachedStop =
false;
1868 bool reachedEndOfPage;
1906 *current = thisentry;
1913 char *channel = qe->
data;
1918 char *payload = qe->
data + strlen(channel) + 1;
1933 }
while (!reachedEndOfPage);
2003 if (listenChannels ==
NIL)
2056 elog(
INFO,
"NOTIFY for \"%s\" payload \"%s\"", channel, payload);
2066 if (pendingNotifies ==
NIL)
2069 if (payload ==
NULL)
2091 if (strcmp(n->
channel, channel) == 0 &&
2092 strcmp(n->
payload, payload) == 0)
2095 foreach(p, pendingNotifies)
2099 if (strcmp(n->
channel, channel) == 0 &&
2100 strcmp(n->
payload, payload) == 0)
2118 pendingActions =
NIL;
2119 pendingNotifies =
NIL;
struct QueueBackendStatus QueueBackendStatus
char data[NAMEDATALEN+NOTIFY_PAYLOAD_MAX_LENGTH]
int errhint(const char *fmt,...)
static void queue_listen(ListenActionKind action, const char *channel)
static SlruCtlData AsyncCtlData
MemoryContext TopTransactionContext
#define QUEUE_BACKEND_PID(i)
bool SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int segpage, void *data)
TimestampTz GetCurrentTimestamp(void)
void AsyncShmemInit(void)
bool TransactionIdIsInProgress(TransactionId xid)
#define SRF_IS_FIRSTCALL()
char * pstrdup(const char *in)
#define DatabaseRelationId
void CommitTransactionCommand(void)
void SimpleLruTruncate(SlruCtl ctl, int cutoffPage)
#define PG_RETURN_FLOAT8(x)
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
static void Exec_UnlistenAllCommit(void)
void set_ps_display(const char *activity, bool force)
MemoryContext CurTransactionContext
static List * listenChannels
void AtPrepare_Notify(void)
int errcode(int sqlerrcode)
Datum pg_notification_queue_usage(PG_FUNCTION_ARGS)
bool IsTransactionOrTransactionBlock(void)
List * list_concat(List *list1, List *list2)
static double asyncQueueUsage(void)
void Async_Listen(const char *channel)
static void ClearPendingActionsAndNotifies(void)
static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength)
bool TransactionIdDidCommit(TransactionId transactionId)
static void Exec_UnlistenCommit(const char *channel)
#define PG_PROTOCOL_MAJOR(v)
static void NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, LWLock *ctllock, const char *subdir)
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Size SimpleLruShmemSize(int nslots, int nlsns)
void list_free_deep(List *list)
static bool IsListeningOn(const char *channel)
#define SRF_PERCALL_SETUP()
static bool unlistenExitRegistered
static bool asyncQueueIsFull(void)
static void asyncQueueFillWarning(void)
#define PG_GETARG_TEXT_PP(n)
void LWLockRelease(LWLock *lock)
#define SRF_RETURN_NEXT(_funcctx, _result)
void PreCommit_Notify(void)
#define NUM_ASYNC_BUFFERS
#define QUEUE_POS_OFFSET(x)
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
void pfree(void *pointer)
#define AsyncQueueEntryEmptySize
static List * pendingActions
void PreventCommandDuringRecovery(const char *cmdname)
void ProcessNotifyInterrupt(void)
static bool AsyncExistsPendingNotify(const char *channel, const char *payload)
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
int SimpleLruReadPage(SlruCtl ctl, int pageno, bool write_ok, TransactionId xid)
static void Async_UnlistenOnExit(int code, Datum arg)
void AtSubCommit_Notify(void)
static bool backendHasSentNotifications
TransactionId GetCurrentTransactionId(void)
#define SET_QUEUE_POS(x, y, z)
int errdetail(const char *fmt,...)
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
#define InvalidTransactionId
static ListCell * list_head(const List *l)
void SimpleLruWritePage(SlruCtl ctl, int slotno)
MemoryContext CurrentMemoryContext
#define NOTIFY_PAYLOAD_MAX_LENGTH
static bool asyncQueuePagePrecedes(int p, int q)
static AsyncQueueControl * asyncQueueControl
static void asyncQueueAdvanceTail(void)
static void Exec_ListenCommit(const char *channel)
#define ereport(elevel, rest)
MemoryContext TopMemoryContext
void SetLatch(volatile Latch *latch)
#define QUEUE_FULL_WARN_INTERVAL
List * lappend(List *list, void *datum)
struct AsyncQueueControl AsyncQueueControl
struct QueuePosition QueuePosition
List * list_delete_cell(List *list, ListCell *cell, ListCell *prev)
Size mul_size(Size s1, Size s2)
void AtSubAbort_Notify(void)
static List * upperPendingActions
static void ProcessIncomingNotify(void)
static void asyncQueueUnregister(void)
static bool amRegisteredListener
void AtAbort_Notify(void)
Size add_size(Size s1, Size s2)
int SimpleLruReadPage_ReadOnly(SlruCtl ctl, int pageno, TransactionId xid)
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
static void asyncQueueReadAllNotifications(void)
void AtSubStart_Notify(void)
int GetCurrentTransactionNestLevel(void)
Datum pg_notify(PG_FUNCTION_ARGS)
List * lcons(void *datum, List *list)
static ListCell * asyncQueueAddEntries(ListCell *nextNotify)
#define Assert(condition)
void ProcessCompletedNotifies(void)
bool SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data)
static void Exec_ListenPreCommit(void)
QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER]
void StartTransactionCommand(void)
MemoryContext multi_call_memory_ctx
struct AsyncQueueEntry AsyncQueueEntry
Size AsyncShmemSize(void)
void Async_UnlistenAll(void)
static int list_length(const List *l)
struct Notification Notification
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
void Async_Notify(const char *channel, const char *payload)
Datum pg_listening_channels(PG_FUNCTION_ARGS)
char * text_to_cstring(const text *t)
#define AccessExclusiveLock
void AtCommit_Notify(void)
void HandleNotifyInterrupt(void)
int errmsg(const char *fmt,...)
char channel[FLEXIBLE_ARRAY_MEMBER]
#define CStringGetTextDatum(s)
static List * upperPendingNotifies
volatile sig_atomic_t notifyInterruptPending
static bool asyncQueueProcessPageEntries(volatile QueuePosition *current, QueuePosition stop, char *page_buffer)
#define SLRU_PAGES_PER_SEGMENT
TimestampTz lastQueueFillWarn
CommandDest whereToSendOutput
#define QUEUE_BACKEND_POS(i)
ProtocolVersion FrontendProtocol
int SimpleLruZeroPage(SlruCtl ctl, int pageno)
#define QUEUE_POS_PAGE(x)
#define QUEUE_POS_EQUAL(x, y)
#define offsetof(type, field)
#define QUEUE_POS_MIN(x, y)
void Async_Unlisten(const char *channel)
List * list_delete_first(List *list)
#define SRF_RETURN_DONE(_funcctx)
static bool SignalBackends(void)
#define SRF_FIRSTCALL_INIT()
static List * pendingNotifies