PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
nodeWindowAgg.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * nodeWindowAgg.c
4  * routines to handle WindowAgg nodes.
5  *
6  * A WindowAgg node evaluates "window functions" across suitable partitions
7  * of the input tuple set. Any one WindowAgg works for just a single window
8  * specification, though it can evaluate multiple window functions sharing
9  * identical window specifications. The input tuples are required to be
10  * delivered in sorted order, with the PARTITION BY columns (if any) as
11  * major sort keys and the ORDER BY columns (if any) as minor sort keys.
12  * (The planner generates a stack of WindowAggs with intervening Sort nodes
13  * as needed, if a query involves more than one window specification.)
14  *
15  * Since window functions can require access to any or all of the rows in
16  * the current partition, we accumulate rows of the partition into a
17  * tuplestore. The window functions are called using the WindowObject API
18  * so that they can access those rows as needed.
19  *
20  * We also support using plain aggregate functions as window functions.
21  * For these, the regular Agg-node environment is emulated for each partition.
22  * As required by the SQL spec, the output represents the value of the
23  * aggregate function over all rows in the current row's window frame.
24  *
25  *
26  * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
27  * Portions Copyright (c) 1994, Regents of the University of California
28  *
29  * IDENTIFICATION
30  * src/backend/executor/nodeWindowAgg.c
31  *
32  *-------------------------------------------------------------------------
33  */
34 #include "postgres.h"
35 
36 #include "access/htup_details.h"
37 #include "catalog/objectaccess.h"
38 #include "catalog/pg_aggregate.h"
39 #include "catalog/pg_proc.h"
40 #include "executor/executor.h"
41 #include "executor/nodeWindowAgg.h"
42 #include "miscadmin.h"
43 #include "nodes/nodeFuncs.h"
44 #include "optimizer/clauses.h"
45 #include "parser/parse_agg.h"
46 #include "parser/parse_coerce.h"
47 #include "utils/acl.h"
48 #include "utils/builtins.h"
49 #include "utils/datum.h"
50 #include "utils/lsyscache.h"
51 #include "utils/memutils.h"
52 #include "utils/syscache.h"
53 #include "windowapi.h"
54 
55 /*
56  * All the window function APIs are called with this object, which is passed
57  * to window functions as fcinfo->context.
58  */
59 typedef struct WindowObjectData
60 {
62  WindowAggState *winstate; /* parent WindowAggState */
63  List *argstates; /* ExprState trees for fn's arguments */
64  void *localmem; /* WinGetPartitionLocalMemory's chunk */
65  int markptr; /* tuplestore mark pointer for this fn */
66  int readptr; /* tuplestore read pointer for this fn */
67  int64 markpos; /* row that markptr is positioned on */
68  int64 seekpos; /* row that readptr is positioned on */
70 
71 /*
72  * We have one WindowStatePerFunc struct for each window function and
73  * window aggregate handled by this node.
74  */
75 typedef struct WindowStatePerFuncData
76 {
77  /* Links to WindowFunc expr and state nodes this working state is for */
80 
81  int numArguments; /* number of arguments */
82 
83  FmgrInfo flinfo; /* fmgr lookup data for window function */
84 
85  Oid winCollation; /* collation derived for window function */
86 
87  /*
88  * We need the len and byval info for the result of each function in order
89  * to know how to copy/delete values.
90  */
93 
94  bool plain_agg; /* is it just a plain aggregate function? */
95  int aggno; /* if so, index of its PerAggData */
96 
97  WindowObject winobj; /* object used in window function API */
99 
100 /*
101  * For plain aggregate window functions, we also have one of these.
102  */
103 typedef struct WindowStatePerAggData
104 {
105  /* Oids of transition functions */
107  Oid invtransfn_oid; /* may be InvalidOid */
108  Oid finalfn_oid; /* may be InvalidOid */
109 
110  /*
111  * fmgr lookup data for transition functions --- only valid when
112  * corresponding oid is not InvalidOid. Note in particular that fn_strict
113  * flags are kept here.
114  */
118 
119  int numFinalArgs; /* number of arguments to pass to finalfn */
120 
121  /*
122  * initial value from pg_aggregate entry
123  */
126 
127  /*
128  * cached value for current frame boundaries
129  */
132 
133  /*
134  * We need the len and byval info for the agg's input, result, and
135  * transition data types in order to know how to copy/delete values.
136  */
139  transtypeLen;
143 
144  int wfuncno; /* index of associated PerFuncData */
145 
146  /* Context holding transition value and possibly other subsidiary data */
147  MemoryContext aggcontext; /* may be private, or winstate->aggcontext */
148 
149  /* Current transition value */
150  Datum transValue; /* current transition value */
152 
153  int64 transValueCount; /* number of currently-aggregated rows */
154 
155  /* Data local to eval_windowaggregates() */
156  bool restart; /* need to restart this agg in this cycle? */
158 
159 static void initialize_windowaggregate(WindowAggState *winstate,
160  WindowStatePerFunc perfuncstate,
161  WindowStatePerAgg peraggstate);
162 static void advance_windowaggregate(WindowAggState *winstate,
163  WindowStatePerFunc perfuncstate,
164  WindowStatePerAgg peraggstate);
165 static bool advance_windowaggregate_base(WindowAggState *winstate,
166  WindowStatePerFunc perfuncstate,
167  WindowStatePerAgg peraggstate);
168 static void finalize_windowaggregate(WindowAggState *winstate,
169  WindowStatePerFunc perfuncstate,
170  WindowStatePerAgg peraggstate,
171  Datum *result, bool *isnull);
172 
173 static void eval_windowaggregates(WindowAggState *winstate);
174 static void eval_windowfunction(WindowAggState *winstate,
175  WindowStatePerFunc perfuncstate,
176  Datum *result, bool *isnull);
177 
178 static void begin_partition(WindowAggState *winstate);
179 static void spool_tuples(WindowAggState *winstate, int64 pos);
180 static void release_partition(WindowAggState *winstate);
181 
182 static bool row_is_in_frame(WindowAggState *winstate, int64 pos,
183  TupleTableSlot *slot);
184 static void update_frameheadpos(WindowObject winobj, TupleTableSlot *slot);
185 static void update_frametailpos(WindowObject winobj, TupleTableSlot *slot);
186 
188  WindowFunc *wfunc,
189  WindowStatePerAgg peraggstate);
190 static Datum GetAggInitVal(Datum textInitVal, Oid transtype);
191 
192 static bool are_peers(WindowAggState *winstate, TupleTableSlot *slot1,
193  TupleTableSlot *slot2);
194 static bool window_gettupleslot(WindowObject winobj, int64 pos,
195  TupleTableSlot *slot);
196 
197 
198 /*
199  * initialize_windowaggregate
200  * parallel to initialize_aggregates in nodeAgg.c
201  */
202 static void
204  WindowStatePerFunc perfuncstate,
205  WindowStatePerAgg peraggstate)
206 {
207  MemoryContext oldContext;
208 
209  /*
210  * If we're using a private aggcontext, we may reset it here. But if the
211  * context is shared, we don't know which other aggregates may still need
212  * it, so we must leave it to the caller to reset at an appropriate time.
213  */
214  if (peraggstate->aggcontext != winstate->aggcontext)
216 
217  if (peraggstate->initValueIsNull)
218  peraggstate->transValue = peraggstate->initValue;
219  else
220  {
221  oldContext = MemoryContextSwitchTo(peraggstate->aggcontext);
222  peraggstate->transValue = datumCopy(peraggstate->initValue,
223  peraggstate->transtypeByVal,
224  peraggstate->transtypeLen);
225  MemoryContextSwitchTo(oldContext);
226  }
227  peraggstate->transValueIsNull = peraggstate->initValueIsNull;
228  peraggstate->transValueCount = 0;
229  peraggstate->resultValue = (Datum) 0;
230  peraggstate->resultValueIsNull = true;
231 }
232 
233 /*
234  * advance_windowaggregate
235  * parallel to advance_aggregates in nodeAgg.c
236  */
237 static void
239  WindowStatePerFunc perfuncstate,
240  WindowStatePerAgg peraggstate)
241 {
242  WindowFuncExprState *wfuncstate = perfuncstate->wfuncstate;
243  int numArguments = perfuncstate->numArguments;
244  FunctionCallInfoData fcinfodata;
245  FunctionCallInfo fcinfo = &fcinfodata;
246  Datum newVal;
247  ListCell *arg;
248  int i;
249  MemoryContext oldContext;
250  ExprContext *econtext = winstate->tmpcontext;
251  ExprState *filter = wfuncstate->aggfilter;
252 
253  oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
254 
255  /* Skip anything FILTERed out */
256  if (filter)
257  {
258  bool isnull;
259  Datum res = ExecEvalExpr(filter, econtext, &isnull, NULL);
260 
261  if (isnull || !DatumGetBool(res))
262  {
263  MemoryContextSwitchTo(oldContext);
264  return;
265  }
266  }
267 
268  /* We start from 1, since the 0th arg will be the transition value */
269  i = 1;
270  foreach(arg, wfuncstate->args)
271  {
272  ExprState *argstate = (ExprState *) lfirst(arg);
273 
274  fcinfo->arg[i] = ExecEvalExpr(argstate, econtext,
275  &fcinfo->argnull[i], NULL);
276  i++;
277  }
278 
279  if (peraggstate->transfn.fn_strict)
280  {
281  /*
282  * For a strict transfn, nothing happens when there's a NULL input; we
283  * just keep the prior transValue. Note transValueCount doesn't
284  * change either.
285  */
286  for (i = 1; i <= numArguments; i++)
287  {
288  if (fcinfo->argnull[i])
289  {
290  MemoryContextSwitchTo(oldContext);
291  return;
292  }
293  }
294 
295  /*
296  * For strict transition functions with initial value NULL we use the
297  * first non-NULL input as the initial state. (We already checked
298  * that the agg's input type is binary-compatible with its transtype,
299  * so straight copy here is OK.)
300  *
301  * We must copy the datum into aggcontext if it is pass-by-ref. We do
302  * not need to pfree the old transValue, since it's NULL.
303  */
304  if (peraggstate->transValueCount == 0 && peraggstate->transValueIsNull)
305  {
306  MemoryContextSwitchTo(peraggstate->aggcontext);
307  peraggstate->transValue = datumCopy(fcinfo->arg[1],
308  peraggstate->transtypeByVal,
309  peraggstate->transtypeLen);
310  peraggstate->transValueIsNull = false;
311  peraggstate->transValueCount = 1;
312  MemoryContextSwitchTo(oldContext);
313  return;
314  }
315 
316  if (peraggstate->transValueIsNull)
317  {
318  /*
319  * Don't call a strict function with NULL inputs. Note it is
320  * possible to get here despite the above tests, if the transfn is
321  * strict *and* returned a NULL on a prior cycle. If that happens
322  * we will propagate the NULL all the way to the end. That can
323  * only happen if there's no inverse transition function, though,
324  * since we disallow transitions back to NULL when there is one.
325  */
326  MemoryContextSwitchTo(oldContext);
327  Assert(!OidIsValid(peraggstate->invtransfn_oid));
328  return;
329  }
330  }
331 
332  /*
333  * OK to call the transition function. Set winstate->curaggcontext while
334  * calling it, for possible use by AggCheckCallContext.
335  */
336  InitFunctionCallInfoData(*fcinfo, &(peraggstate->transfn),
337  numArguments + 1,
338  perfuncstate->winCollation,
339  (void *) winstate, NULL);
340  fcinfo->arg[0] = peraggstate->transValue;
341  fcinfo->argnull[0] = peraggstate->transValueIsNull;
342  winstate->curaggcontext = peraggstate->aggcontext;
343  newVal = FunctionCallInvoke(fcinfo);
344  winstate->curaggcontext = NULL;
345 
346  /*
347  * Moving-aggregate transition functions must not return null, see
348  * advance_windowaggregate_base().
349  */
350  if (fcinfo->isnull && OidIsValid(peraggstate->invtransfn_oid))
351  ereport(ERROR,
352  (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
353  errmsg("moving-aggregate transition function must not return null")));
354 
355  /*
356  * We must track the number of rows included in transValue, since to
357  * remove the last input, advance_windowaggregate_base() musn't call the
358  * inverse transition function, but simply reset transValue back to its
359  * initial value.
360  */
361  peraggstate->transValueCount++;
362 
363  /*
364  * If pass-by-ref datatype, must copy the new value into aggcontext and
365  * pfree the prior transValue. But if transfn returned a pointer to its
366  * first input, we don't need to do anything.
367  */
368  if (!peraggstate->transtypeByVal &&
369  DatumGetPointer(newVal) != DatumGetPointer(peraggstate->transValue))
370  {
371  if (!fcinfo->isnull)
372  {
373  MemoryContextSwitchTo(peraggstate->aggcontext);
374  newVal = datumCopy(newVal,
375  peraggstate->transtypeByVal,
376  peraggstate->transtypeLen);
377  }
378  if (!peraggstate->transValueIsNull)
379  pfree(DatumGetPointer(peraggstate->transValue));
380  }
381 
382  MemoryContextSwitchTo(oldContext);
383  peraggstate->transValue = newVal;
384  peraggstate->transValueIsNull = fcinfo->isnull;
385 }
386 
387 /*
388  * advance_windowaggregate_base
389  * Remove the oldest tuple from an aggregation.
390  *
391  * This is very much like advance_windowaggregate, except that we will call
392  * the inverse transition function (which caller must have checked is
393  * available).
394  *
395  * Returns true if we successfully removed the current row from this
396  * aggregate, false if not (in the latter case, caller is responsible
397  * for cleaning up by restarting the aggregation).
398  */
399 static bool
401  WindowStatePerFunc perfuncstate,
402  WindowStatePerAgg peraggstate)
403 {
404  WindowFuncExprState *wfuncstate = perfuncstate->wfuncstate;
405  int numArguments = perfuncstate->numArguments;
406  FunctionCallInfoData fcinfodata;
407  FunctionCallInfo fcinfo = &fcinfodata;
408  Datum newVal;
409  ListCell *arg;
410  int i;
411  MemoryContext oldContext;
412  ExprContext *econtext = winstate->tmpcontext;
413  ExprState *filter = wfuncstate->aggfilter;
414 
415  oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
416 
417  /* Skip anything FILTERed out */
418  if (filter)
419  {
420  bool isnull;
421  Datum res = ExecEvalExpr(filter, econtext, &isnull, NULL);
422 
423  if (isnull || !DatumGetBool(res))
424  {
425  MemoryContextSwitchTo(oldContext);
426  return true;
427  }
428  }
429 
430  /* We start from 1, since the 0th arg will be the transition value */
431  i = 1;
432  foreach(arg, wfuncstate->args)
433  {
434  ExprState *argstate = (ExprState *) lfirst(arg);
435 
436  fcinfo->arg[i] = ExecEvalExpr(argstate, econtext,
437  &fcinfo->argnull[i], NULL);
438  i++;
439  }
440 
441  if (peraggstate->invtransfn.fn_strict)
442  {
443  /*
444  * For a strict (inv)transfn, nothing happens when there's a NULL
445  * input; we just keep the prior transValue. Note transValueCount
446  * doesn't change either.
447  */
448  for (i = 1; i <= numArguments; i++)
449  {
450  if (fcinfo->argnull[i])
451  {
452  MemoryContextSwitchTo(oldContext);
453  return true;
454  }
455  }
456  }
457 
458  /* There should still be an added but not yet removed value */
459  Assert(peraggstate->transValueCount > 0);
460 
461  /*
462  * In moving-aggregate mode, the state must never be NULL, except possibly
463  * before any rows have been aggregated (which is surely not the case at
464  * this point). This restriction allows us to interpret a NULL result
465  * from the inverse function as meaning "sorry, can't do an inverse
466  * transition in this case". We already checked this in
467  * advance_windowaggregate, but just for safety, check again.
468  */
469  if (peraggstate->transValueIsNull)
470  elog(ERROR, "aggregate transition value is NULL before inverse transition");
471 
472  /*
473  * We mustn't use the inverse transition function to remove the last
474  * input. Doing so would yield a non-NULL state, whereas we should be in
475  * the initial state afterwards which may very well be NULL. So instead,
476  * we simply re-initialize the aggregate in this case.
477  */
478  if (peraggstate->transValueCount == 1)
479  {
480  MemoryContextSwitchTo(oldContext);
482  &winstate->perfunc[peraggstate->wfuncno],
483  peraggstate);
484  return true;
485  }
486 
487  /*
488  * OK to call the inverse transition function. Set
489  * winstate->curaggcontext while calling it, for possible use by
490  * AggCheckCallContext.
491  */
492  InitFunctionCallInfoData(*fcinfo, &(peraggstate->invtransfn),
493  numArguments + 1,
494  perfuncstate->winCollation,
495  (void *) winstate, NULL);
496  fcinfo->arg[0] = peraggstate->transValue;
497  fcinfo->argnull[0] = peraggstate->transValueIsNull;
498  winstate->curaggcontext = peraggstate->aggcontext;
499  newVal = FunctionCallInvoke(fcinfo);
500  winstate->curaggcontext = NULL;
501 
502  /*
503  * If the function returns NULL, report failure, forcing a restart.
504  */
505  if (fcinfo->isnull)
506  {
507  MemoryContextSwitchTo(oldContext);
508  return false;
509  }
510 
511  /* Update number of rows included in transValue */
512  peraggstate->transValueCount--;
513 
514  /*
515  * If pass-by-ref datatype, must copy the new value into aggcontext and
516  * pfree the prior transValue. But if invtransfn returned a pointer to
517  * its first input, we don't need to do anything.
518  *
519  * Note: the checks for null values here will never fire, but it seems
520  * best to have this stanza look just like advance_windowaggregate.
521  */
522  if (!peraggstate->transtypeByVal &&
523  DatumGetPointer(newVal) != DatumGetPointer(peraggstate->transValue))
524  {
525  if (!fcinfo->isnull)
526  {
527  MemoryContextSwitchTo(peraggstate->aggcontext);
528  newVal = datumCopy(newVal,
529  peraggstate->transtypeByVal,
530  peraggstate->transtypeLen);
531  }
532  if (!peraggstate->transValueIsNull)
533  pfree(DatumGetPointer(peraggstate->transValue));
534  }
535 
536  MemoryContextSwitchTo(oldContext);
537  peraggstate->transValue = newVal;
538  peraggstate->transValueIsNull = fcinfo->isnull;
539 
540  return true;
541 }
542 
543 /*
544  * finalize_windowaggregate
545  * parallel to finalize_aggregate in nodeAgg.c
546  */
547 static void
549  WindowStatePerFunc perfuncstate,
550  WindowStatePerAgg peraggstate,
551  Datum *result, bool *isnull)
552 {
553  MemoryContext oldContext;
554 
556 
557  /*
558  * Apply the agg's finalfn if one is provided, else return transValue.
559  */
560  if (OidIsValid(peraggstate->finalfn_oid))
561  {
562  int numFinalArgs = peraggstate->numFinalArgs;
563  FunctionCallInfoData fcinfo;
564  bool anynull;
565  int i;
566 
567  InitFunctionCallInfoData(fcinfo, &(peraggstate->finalfn),
568  numFinalArgs,
569  perfuncstate->winCollation,
570  (void *) winstate, NULL);
571  fcinfo.arg[0] = peraggstate->transValue;
572  fcinfo.argnull[0] = peraggstate->transValueIsNull;
573  anynull = peraggstate->transValueIsNull;
574 
575  /* Fill any remaining argument positions with nulls */
576  for (i = 1; i < numFinalArgs; i++)
577  {
578  fcinfo.arg[i] = (Datum) 0;
579  fcinfo.argnull[i] = true;
580  anynull = true;
581  }
582 
583  if (fcinfo.flinfo->fn_strict && anynull)
584  {
585  /* don't call a strict function with NULL inputs */
586  *result = (Datum) 0;
587  *isnull = true;
588  }
589  else
590  {
591  winstate->curaggcontext = peraggstate->aggcontext;
592  *result = FunctionCallInvoke(&fcinfo);
593  winstate->curaggcontext = NULL;
594  *isnull = fcinfo.isnull;
595  }
596  }
597  else
598  {
599  *result = peraggstate->transValue;
600  *isnull = peraggstate->transValueIsNull;
601  }
602 
603  /*
604  * If result is pass-by-ref, make sure it is in the right context.
605  */
606  if (!peraggstate->resulttypeByVal && !*isnull &&
608  DatumGetPointer(*result)))
609  *result = datumCopy(*result,
610  peraggstate->resulttypeByVal,
611  peraggstate->resulttypeLen);
612  MemoryContextSwitchTo(oldContext);
613 }
614 
615 /*
616  * eval_windowaggregates
617  * evaluate plain aggregates being used as window functions
618  *
619  * This differs from nodeAgg.c in two ways. First, if the window's frame
620  * start position moves, we use the inverse transition function (if it exists)
621  * to remove rows from the transition value. And second, we expect to be
622  * able to call aggregate final functions repeatedly after aggregating more
623  * data onto the same transition value. This is not a behavior required by
624  * nodeAgg.c.
625  */
626 static void
628 {
629  WindowStatePerAgg peraggstate;
630  int wfuncno,
631  numaggs,
632  numaggs_restart,
633  i;
634  int64 aggregatedupto_nonrestarted;
635  MemoryContext oldContext;
636  ExprContext *econtext;
637  WindowObject agg_winobj;
638  TupleTableSlot *agg_row_slot;
639  TupleTableSlot *temp_slot;
640 
641  numaggs = winstate->numaggs;
642  if (numaggs == 0)
643  return; /* nothing to do */
644 
645  /* final output execution is in ps_ExprContext */
646  econtext = winstate->ss.ps.ps_ExprContext;
647  agg_winobj = winstate->agg_winobj;
648  agg_row_slot = winstate->agg_row_slot;
649  temp_slot = winstate->temp_slot_1;
650 
651  /*
652  * Currently, we support only a subset of the SQL-standard window framing
653  * rules.
654  *
655  * If the frame start is UNBOUNDED_PRECEDING, the window frame consists of
656  * a contiguous group of rows extending forward from the start of the
657  * partition, and rows only enter the frame, never exit it, as the current
658  * row advances forward. This makes it possible to use an incremental
659  * strategy for evaluating aggregates: we run the transition function for
660  * each row added to the frame, and run the final function whenever we
661  * need the current aggregate value. This is considerably more efficient
662  * than the naive approach of re-running the entire aggregate calculation
663  * for each current row. It does assume that the final function doesn't
664  * damage the running transition value, but we have the same assumption in
665  * nodeAgg.c too (when it rescans an existing hash table).
666  *
667  * If the frame start does sometimes move, we can still optimize as above
668  * whenever successive rows share the same frame head, but if the frame
669  * head moves beyond the previous head we try to remove those rows using
670  * the aggregate's inverse transition function. This function restores
671  * the aggregate's current state to what it would be if the removed row
672  * had never been aggregated in the first place. Inverse transition
673  * functions may optionally return NULL, indicating that the function was
674  * unable to remove the tuple from aggregation. If this happens, or if
675  * the aggregate doesn't have an inverse transition function at all, we
676  * must perform the aggregation all over again for all tuples within the
677  * new frame boundaries.
678  *
679  * In many common cases, multiple rows share the same frame and hence the
680  * same aggregate value. (In particular, if there's no ORDER BY in a RANGE
681  * window, then all rows are peers and so they all have window frame equal
682  * to the whole partition.) We optimize such cases by calculating the
683  * aggregate value once when we reach the first row of a peer group, and
684  * then returning the saved value for all subsequent rows.
685  *
686  * 'aggregatedupto' keeps track of the first row that has not yet been
687  * accumulated into the aggregate transition values. Whenever we start a
688  * new peer group, we accumulate forward to the end of the peer group.
689  */
690 
691  /*
692  * First, update the frame head position.
693  *
694  * The frame head should never move backwards, and the code below wouldn't
695  * cope if it did, so for safety we complain if it does.
696  */
697  update_frameheadpos(agg_winobj, temp_slot);
698  if (winstate->frameheadpos < winstate->aggregatedbase)
699  elog(ERROR, "window frame head moved backward");
700 
701  /*
702  * If the frame didn't change compared to the previous row, we can re-use
703  * the result values that were previously saved at the bottom of this
704  * function. Since we don't know the current frame's end yet, this is not
705  * possible to check for fully. But if the frame end mode is UNBOUNDED
706  * FOLLOWING or CURRENT ROW, and the current row lies within the previous
707  * row's frame, then the two frames' ends must coincide. Note that on the
708  * first row aggregatedbase == aggregatedupto, meaning this test must
709  * fail, so we don't need to check the "there was no previous row" case
710  * explicitly here.
711  */
712  if (winstate->aggregatedbase == winstate->frameheadpos &&
715  winstate->aggregatedbase <= winstate->currentpos &&
716  winstate->aggregatedupto > winstate->currentpos)
717  {
718  for (i = 0; i < numaggs; i++)
719  {
720  peraggstate = &winstate->peragg[i];
721  wfuncno = peraggstate->wfuncno;
722  econtext->ecxt_aggvalues[wfuncno] = peraggstate->resultValue;
723  econtext->ecxt_aggnulls[wfuncno] = peraggstate->resultValueIsNull;
724  }
725  return;
726  }
727 
728  /*----------
729  * Initialize restart flags.
730  *
731  * We restart the aggregation:
732  * - if we're processing the first row in the partition, or
733  * - if the frame's head moved and we cannot use an inverse
734  * transition function, or
735  * - if the new frame doesn't overlap the old one
736  *
737  * Note that we don't strictly need to restart in the last case, but if
738  * we're going to remove all rows from the aggregation anyway, a restart
739  * surely is faster.
740  *----------
741  */
742  numaggs_restart = 0;
743  for (i = 0; i < numaggs; i++)
744  {
745  peraggstate = &winstate->peragg[i];
746  if (winstate->currentpos == 0 ||
747  (winstate->aggregatedbase != winstate->frameheadpos &&
748  !OidIsValid(peraggstate->invtransfn_oid)) ||
749  winstate->aggregatedupto <= winstate->frameheadpos)
750  {
751  peraggstate->restart = true;
752  numaggs_restart++;
753  }
754  else
755  peraggstate->restart = false;
756  }
757 
758  /*
759  * If we have any possibly-moving aggregates, attempt to advance
760  * aggregatedbase to match the frame's head by removing input rows that
761  * fell off the top of the frame from the aggregations. This can fail,
762  * i.e. advance_windowaggregate_base() can return false, in which case
763  * we'll restart that aggregate below.
764  */
765  while (numaggs_restart < numaggs &&
766  winstate->aggregatedbase < winstate->frameheadpos)
767  {
768  /*
769  * Fetch the next tuple of those being removed. This should never fail
770  * as we should have been here before.
771  */
772  if (!window_gettupleslot(agg_winobj, winstate->aggregatedbase,
773  temp_slot))
774  elog(ERROR, "could not re-fetch previously fetched frame row");
775 
776  /* Set tuple context for evaluation of aggregate arguments */
777  winstate->tmpcontext->ecxt_outertuple = temp_slot;
778 
779  /*
780  * Perform the inverse transition for each aggregate function in the
781  * window, unless it has already been marked as needing a restart.
782  */
783  for (i = 0; i < numaggs; i++)
784  {
785  bool ok;
786 
787  peraggstate = &winstate->peragg[i];
788  if (peraggstate->restart)
789  continue;
790 
791  wfuncno = peraggstate->wfuncno;
792  ok = advance_windowaggregate_base(winstate,
793  &winstate->perfunc[wfuncno],
794  peraggstate);
795  if (!ok)
796  {
797  /* Inverse transition function has failed, must restart */
798  peraggstate->restart = true;
799  numaggs_restart++;
800  }
801  }
802 
803  /* Reset per-input-tuple context after each tuple */
804  ResetExprContext(winstate->tmpcontext);
805 
806  /* And advance the aggregated-row state */
807  winstate->aggregatedbase++;
808  ExecClearTuple(temp_slot);
809  }
810 
811  /*
812  * If we successfully advanced the base rows of all the aggregates,
813  * aggregatedbase now equals frameheadpos; but if we failed for any, we
814  * must forcibly update aggregatedbase.
815  */
816  winstate->aggregatedbase = winstate->frameheadpos;
817 
818  /*
819  * If we created a mark pointer for aggregates, keep it pushed up to frame
820  * head, so that tuplestore can discard unnecessary rows.
821  */
822  if (agg_winobj->markptr >= 0)
823  WinSetMarkPosition(agg_winobj, winstate->frameheadpos);
824 
825  /*
826  * Now restart the aggregates that require it.
827  *
828  * We assume that aggregates using the shared context always restart if
829  * *any* aggregate restarts, and we may thus clean up the shared
830  * aggcontext if that is the case. Private aggcontexts are reset by
831  * initialize_windowaggregate() if their owning aggregate restarts. If we
832  * aren't restarting an aggregate, we need to free any previously saved
833  * result for it, else we'll leak memory.
834  */
835  if (numaggs_restart > 0)
837  for (i = 0; i < numaggs; i++)
838  {
839  peraggstate = &winstate->peragg[i];
840 
841  /* Aggregates using the shared ctx must restart if *any* agg does */
842  Assert(peraggstate->aggcontext != winstate->aggcontext ||
843  numaggs_restart == 0 ||
844  peraggstate->restart);
845 
846  if (peraggstate->restart)
847  {
848  wfuncno = peraggstate->wfuncno;
850  &winstate->perfunc[wfuncno],
851  peraggstate);
852  }
853  else if (!peraggstate->resultValueIsNull)
854  {
855  if (!peraggstate->resulttypeByVal)
856  pfree(DatumGetPointer(peraggstate->resultValue));
857  peraggstate->resultValue = (Datum) 0;
858  peraggstate->resultValueIsNull = true;
859  }
860  }
861 
862  /*
863  * Non-restarted aggregates now contain the rows between aggregatedbase
864  * (i.e., frameheadpos) and aggregatedupto, while restarted aggregates
865  * contain no rows. If there are any restarted aggregates, we must thus
866  * begin aggregating anew at frameheadpos, otherwise we may simply
867  * continue at aggregatedupto. We must remember the old value of
868  * aggregatedupto to know how long to skip advancing non-restarted
869  * aggregates. If we modify aggregatedupto, we must also clear
870  * agg_row_slot, per the loop invariant below.
871  */
872  aggregatedupto_nonrestarted = winstate->aggregatedupto;
873  if (numaggs_restart > 0 &&
874  winstate->aggregatedupto != winstate->frameheadpos)
875  {
876  winstate->aggregatedupto = winstate->frameheadpos;
877  ExecClearTuple(agg_row_slot);
878  }
879 
880  /*
881  * Advance until we reach a row not in frame (or end of partition).
882  *
883  * Note the loop invariant: agg_row_slot is either empty or holds the row
884  * at position aggregatedupto. We advance aggregatedupto after processing
885  * a row.
886  */
887  for (;;)
888  {
889  /* Fetch next row if we didn't already */
890  if (TupIsNull(agg_row_slot))
891  {
892  if (!window_gettupleslot(agg_winobj, winstate->aggregatedupto,
893  agg_row_slot))
894  break; /* must be end of partition */
895  }
896 
897  /* Exit loop (for now) if not in frame */
898  if (!row_is_in_frame(winstate, winstate->aggregatedupto, agg_row_slot))
899  break;
900 
901  /* Set tuple context for evaluation of aggregate arguments */
902  winstate->tmpcontext->ecxt_outertuple = agg_row_slot;
903 
904  /* Accumulate row into the aggregates */
905  for (i = 0; i < numaggs; i++)
906  {
907  peraggstate = &winstate->peragg[i];
908 
909  /* Non-restarted aggs skip until aggregatedupto_nonrestarted */
910  if (!peraggstate->restart &&
911  winstate->aggregatedupto < aggregatedupto_nonrestarted)
912  continue;
913 
914  wfuncno = peraggstate->wfuncno;
915  advance_windowaggregate(winstate,
916  &winstate->perfunc[wfuncno],
917  peraggstate);
918  }
919 
920  /* Reset per-input-tuple context after each tuple */
921  ResetExprContext(winstate->tmpcontext);
922 
923  /* And advance the aggregated-row state */
924  winstate->aggregatedupto++;
925  ExecClearTuple(agg_row_slot);
926  }
927 
928  /* The frame's end is not supposed to move backwards, ever */
929  Assert(aggregatedupto_nonrestarted <= winstate->aggregatedupto);
930 
931  /*
932  * finalize aggregates and fill result/isnull fields.
933  */
934  for (i = 0; i < numaggs; i++)
935  {
936  Datum *result;
937  bool *isnull;
938 
939  peraggstate = &winstate->peragg[i];
940  wfuncno = peraggstate->wfuncno;
941  result = &econtext->ecxt_aggvalues[wfuncno];
942  isnull = &econtext->ecxt_aggnulls[wfuncno];
943  finalize_windowaggregate(winstate,
944  &winstate->perfunc[wfuncno],
945  peraggstate,
946  result, isnull);
947 
948  /*
949  * save the result in case next row shares the same frame.
950  *
951  * XXX in some framing modes, eg ROWS/END_CURRENT_ROW, we can know in
952  * advance that the next row can't possibly share the same frame. Is
953  * it worth detecting that and skipping this code?
954  */
955  if (!peraggstate->resulttypeByVal && !*isnull)
956  {
957  oldContext = MemoryContextSwitchTo(peraggstate->aggcontext);
958  peraggstate->resultValue =
959  datumCopy(*result,
960  peraggstate->resulttypeByVal,
961  peraggstate->resulttypeLen);
962  MemoryContextSwitchTo(oldContext);
963  }
964  else
965  {
966  peraggstate->resultValue = *result;
967  }
968  peraggstate->resultValueIsNull = *isnull;
969  }
970 }
971 
972 /*
973  * eval_windowfunction
974  *
975  * Arguments of window functions are not evaluated here, because a window
976  * function can need random access to arbitrary rows in the partition.
977  * The window function uses the special WinGetFuncArgInPartition and
978  * WinGetFuncArgInFrame functions to evaluate the arguments for the rows
979  * it wants.
980  */
981 static void
983  Datum *result, bool *isnull)
984 {
985  FunctionCallInfoData fcinfo;
986  MemoryContext oldContext;
987 
989 
990  /*
991  * We don't pass any normal arguments to a window function, but we do pass
992  * it the number of arguments, in order to permit window function
993  * implementations to support varying numbers of arguments. The real info
994  * goes through the WindowObject, which is passed via fcinfo->context.
995  */
996  InitFunctionCallInfoData(fcinfo, &(perfuncstate->flinfo),
997  perfuncstate->numArguments,
998  perfuncstate->winCollation,
999  (void *) perfuncstate->winobj, NULL);
1000  /* Just in case, make all the regular argument slots be null */
1001  memset(fcinfo.argnull, true, perfuncstate->numArguments);
1002  /* Window functions don't have a current aggregate context, either */
1003  winstate->curaggcontext = NULL;
1004 
1005  *result = FunctionCallInvoke(&fcinfo);
1006  *isnull = fcinfo.isnull;
1007 
1008  /*
1009  * Make sure pass-by-ref data is allocated in the appropriate context. (We
1010  * need this in case the function returns a pointer into some short-lived
1011  * tuple, as is entirely possible.)
1012  */
1013  if (!perfuncstate->resulttypeByVal && !fcinfo.isnull &&
1015  DatumGetPointer(*result)))
1016  *result = datumCopy(*result,
1017  perfuncstate->resulttypeByVal,
1018  perfuncstate->resulttypeLen);
1019 
1020  MemoryContextSwitchTo(oldContext);
1021 }
1022 
1023 /*
1024  * begin_partition
1025  * Start buffering rows of the next partition.
1026  */
1027 static void
1029 {
1030  PlanState *outerPlan = outerPlanState(winstate);
1031  int numfuncs = winstate->numfuncs;
1032  int i;
1033 
1034  winstate->partition_spooled = false;
1035  winstate->framehead_valid = false;
1036  winstate->frametail_valid = false;
1037  winstate->spooled_rows = 0;
1038  winstate->currentpos = 0;
1039  winstate->frameheadpos = 0;
1040  winstate->frametailpos = -1;
1041  ExecClearTuple(winstate->agg_row_slot);
1042 
1043  /*
1044  * If this is the very first partition, we need to fetch the first input
1045  * row to store in first_part_slot.
1046  */
1047  if (TupIsNull(winstate->first_part_slot))
1048  {
1049  TupleTableSlot *outerslot = ExecProcNode(outerPlan);
1050 
1051  if (!TupIsNull(outerslot))
1052  ExecCopySlot(winstate->first_part_slot, outerslot);
1053  else
1054  {
1055  /* outer plan is empty, so we have nothing to do */
1056  winstate->partition_spooled = true;
1057  winstate->more_partitions = false;
1058  return;
1059  }
1060  }
1061 
1062  /* Create new tuplestore for this partition */
1063  winstate->buffer = tuplestore_begin_heap(false, false, work_mem);
1064 
1065  /*
1066  * Set up read pointers for the tuplestore. The current pointer doesn't
1067  * need BACKWARD capability, but the per-window-function read pointers do,
1068  * and the aggregate pointer does if frame start is movable.
1069  */
1070  winstate->current_ptr = 0; /* read pointer 0 is pre-allocated */
1071 
1072  /* reset default REWIND capability bit for current ptr */
1073  tuplestore_set_eflags(winstate->buffer, 0);
1074 
1075  /* create read pointers for aggregates, if needed */
1076  if (winstate->numaggs > 0)
1077  {
1078  WindowObject agg_winobj = winstate->agg_winobj;
1079  int readptr_flags = 0;
1080 
1081  /* If the frame head is potentially movable ... */
1083  {
1084  /* ... create a mark pointer to track the frame head */
1085  agg_winobj->markptr = tuplestore_alloc_read_pointer(winstate->buffer, 0);
1086  /* and the read pointer will need BACKWARD capability */
1087  readptr_flags |= EXEC_FLAG_BACKWARD;
1088  }
1089 
1090  agg_winobj->readptr = tuplestore_alloc_read_pointer(winstate->buffer,
1091  readptr_flags);
1092  agg_winobj->markpos = -1;
1093  agg_winobj->seekpos = -1;
1094 
1095  /* Also reset the row counters for aggregates */
1096  winstate->aggregatedbase = 0;
1097  winstate->aggregatedupto = 0;
1098  }
1099 
1100  /* create mark and read pointers for each real window function */
1101  for (i = 0; i < numfuncs; i++)
1102  {
1103  WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
1104 
1105  if (!perfuncstate->plain_agg)
1106  {
1107  WindowObject winobj = perfuncstate->winobj;
1108 
1109  winobj->markptr = tuplestore_alloc_read_pointer(winstate->buffer,
1110  0);
1111  winobj->readptr = tuplestore_alloc_read_pointer(winstate->buffer,
1113  winobj->markpos = -1;
1114  winobj->seekpos = -1;
1115  }
1116  }
1117 
1118  /*
1119  * Store the first tuple into the tuplestore (it's always available now;
1120  * we either read it above, or saved it at the end of previous partition)
1121  */
1122  tuplestore_puttupleslot(winstate->buffer, winstate->first_part_slot);
1123  winstate->spooled_rows++;
1124 }
1125 
1126 /*
1127  * Read tuples from the outer node, up to and including position 'pos', and
1128  * store them into the tuplestore. If pos is -1, reads the whole partition.
1129  */
1130 static void
1131 spool_tuples(WindowAggState *winstate, int64 pos)
1132 {
1133  WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
1135  TupleTableSlot *outerslot;
1136  MemoryContext oldcontext;
1137 
1138  if (!winstate->buffer)
1139  return; /* just a safety check */
1140  if (winstate->partition_spooled)
1141  return; /* whole partition done already */
1142 
1143  /*
1144  * If the tuplestore has spilled to disk, alternate reading and writing
1145  * becomes quite expensive due to frequent buffer flushes. It's cheaper
1146  * to force the entire partition to get spooled in one go.
1147  *
1148  * XXX this is a horrid kluge --- it'd be better to fix the performance
1149  * problem inside tuplestore. FIXME
1150  */
1151  if (!tuplestore_in_memory(winstate->buffer))
1152  pos = -1;
1153 
1154  outerPlan = outerPlanState(winstate);
1155 
1156  /* Must be in query context to call outerplan */
1158 
1159  while (winstate->spooled_rows <= pos || pos == -1)
1160  {
1161  outerslot = ExecProcNode(outerPlan);
1162  if (TupIsNull(outerslot))
1163  {
1164  /* reached the end of the last partition */
1165  winstate->partition_spooled = true;
1166  winstate->more_partitions = false;
1167  break;
1168  }
1169 
1170  if (node->partNumCols > 0)
1171  {
1172  /* Check if this tuple still belongs to the current partition */
1173  if (!execTuplesMatch(winstate->first_part_slot,
1174  outerslot,
1175  node->partNumCols, node->partColIdx,
1176  winstate->partEqfunctions,
1177  winstate->tmpcontext->ecxt_per_tuple_memory))
1178  {
1179  /*
1180  * end of partition; copy the tuple for the next cycle.
1181  */
1182  ExecCopySlot(winstate->first_part_slot, outerslot);
1183  winstate->partition_spooled = true;
1184  winstate->more_partitions = true;
1185  break;
1186  }
1187  }
1188 
1189  /* Still in partition, so save it into the tuplestore */
1190  tuplestore_puttupleslot(winstate->buffer, outerslot);
1191  winstate->spooled_rows++;
1192  }
1193 
1194  MemoryContextSwitchTo(oldcontext);
1195 }
1196 
1197 /*
1198  * release_partition
1199  * clear information kept within a partition, including
1200  * tuplestore and aggregate results.
1201  */
1202 static void
1204 {
1205  int i;
1206 
1207  for (i = 0; i < winstate->numfuncs; i++)
1208  {
1209  WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
1210 
1211  /* Release any partition-local state of this window function */
1212  if (perfuncstate->winobj)
1213  perfuncstate->winobj->localmem = NULL;
1214  }
1215 
1216  /*
1217  * Release all partition-local memory (in particular, any partition-local
1218  * state that we might have trashed our pointers to in the above loop, and
1219  * any aggregate temp data). We don't rely on retail pfree because some
1220  * aggregates might have allocated data we don't have direct pointers to.
1221  */
1224  for (i = 0; i < winstate->numaggs; i++)
1225  {
1226  if (winstate->peragg[i].aggcontext != winstate->aggcontext)
1228  }
1229 
1230  if (winstate->buffer)
1231  tuplestore_end(winstate->buffer);
1232  winstate->buffer = NULL;
1233  winstate->partition_spooled = false;
1234 }
1235 
1236 /*
1237  * row_is_in_frame
1238  * Determine whether a row is in the current row's window frame according
1239  * to our window framing rule
1240  *
1241  * The caller must have already determined that the row is in the partition
1242  * and fetched it into a slot. This function just encapsulates the framing
1243  * rules.
1244  */
1245 static bool
1246 row_is_in_frame(WindowAggState *winstate, int64 pos, TupleTableSlot *slot)
1247 {
1248  int frameOptions = winstate->frameOptions;
1249 
1250  Assert(pos >= 0); /* else caller error */
1251 
1252  /* First, check frame starting conditions */
1253  if (frameOptions & FRAMEOPTION_START_CURRENT_ROW)
1254  {
1255  if (frameOptions & FRAMEOPTION_ROWS)
1256  {
1257  /* rows before current row are out of frame */
1258  if (pos < winstate->currentpos)
1259  return false;
1260  }
1261  else if (frameOptions & FRAMEOPTION_RANGE)
1262  {
1263  /* preceding row that is not peer is out of frame */
1264  if (pos < winstate->currentpos &&
1265  !are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
1266  return false;
1267  }
1268  else
1269  Assert(false);
1270  }
1271  else if (frameOptions & FRAMEOPTION_START_VALUE)
1272  {
1273  if (frameOptions & FRAMEOPTION_ROWS)
1274  {
1275  int64 offset = DatumGetInt64(winstate->startOffsetValue);
1276 
1277  /* rows before current row + offset are out of frame */
1278  if (frameOptions & FRAMEOPTION_START_VALUE_PRECEDING)
1279  offset = -offset;
1280 
1281  if (pos < winstate->currentpos + offset)
1282  return false;
1283  }
1284  else if (frameOptions & FRAMEOPTION_RANGE)
1285  {
1286  /* parser should have rejected this */
1287  elog(ERROR, "window frame with value offset is not implemented");
1288  }
1289  else
1290  Assert(false);
1291  }
1292 
1293  /* Okay so far, now check frame ending conditions */
1294  if (frameOptions & FRAMEOPTION_END_CURRENT_ROW)
1295  {
1296  if (frameOptions & FRAMEOPTION_ROWS)
1297  {
1298  /* rows after current row are out of frame */
1299  if (pos > winstate->currentpos)
1300  return false;
1301  }
1302  else if (frameOptions & FRAMEOPTION_RANGE)
1303  {
1304  /* following row that is not peer is out of frame */
1305  if (pos > winstate->currentpos &&
1306  !are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
1307  return false;
1308  }
1309  else
1310  Assert(false);
1311  }
1312  else if (frameOptions & FRAMEOPTION_END_VALUE)
1313  {
1314  if (frameOptions & FRAMEOPTION_ROWS)
1315  {
1316  int64 offset = DatumGetInt64(winstate->endOffsetValue);
1317 
1318  /* rows after current row + offset are out of frame */
1319  if (frameOptions & FRAMEOPTION_END_VALUE_PRECEDING)
1320  offset = -offset;
1321 
1322  if (pos > winstate->currentpos + offset)
1323  return false;
1324  }
1325  else if (frameOptions & FRAMEOPTION_RANGE)
1326  {
1327  /* parser should have rejected this */
1328  elog(ERROR, "window frame with value offset is not implemented");
1329  }
1330  else
1331  Assert(false);
1332  }
1333 
1334  /* If we get here, it's in frame */
1335  return true;
1336 }
1337 
1338 /*
1339  * update_frameheadpos
1340  * make frameheadpos valid for the current row
1341  *
1342  * Uses the winobj's read pointer for any required fetches; hence, if the
1343  * frame mode is one that requires row comparisons, the winobj's mark must
1344  * not be past the currently known frame head. Also uses the specified slot
1345  * for any required fetches.
1346  */
1347 static void
1349 {
1350  WindowAggState *winstate = winobj->winstate;
1351  WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
1352  int frameOptions = winstate->frameOptions;
1353 
1354  if (winstate->framehead_valid)
1355  return; /* already known for current row */
1356 
1357  if (frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING)
1358  {
1359  /* In UNBOUNDED PRECEDING mode, frame head is always row 0 */
1360  winstate->frameheadpos = 0;
1361  winstate->framehead_valid = true;
1362  }
1363  else if (frameOptions & FRAMEOPTION_START_CURRENT_ROW)
1364  {
1365  if (frameOptions & FRAMEOPTION_ROWS)
1366  {
1367  /* In ROWS mode, frame head is the same as current */
1368  winstate->frameheadpos = winstate->currentpos;
1369  winstate->framehead_valid = true;
1370  }
1371  else if (frameOptions & FRAMEOPTION_RANGE)
1372  {
1373  int64 fhprev;
1374 
1375  /* If no ORDER BY, all rows are peers with each other */
1376  if (node->ordNumCols == 0)
1377  {
1378  winstate->frameheadpos = 0;
1379  winstate->framehead_valid = true;
1380  return;
1381  }
1382 
1383  /*
1384  * In RANGE START_CURRENT mode, frame head is the first row that
1385  * is a peer of current row. We search backwards from current,
1386  * which could be a bit inefficient if peer sets are large. Might
1387  * be better to have a separate read pointer that moves forward
1388  * tracking the frame head.
1389  */
1390  fhprev = winstate->currentpos - 1;
1391  for (;;)
1392  {
1393  /* assume the frame head can't go backwards */
1394  if (fhprev < winstate->frameheadpos)
1395  break;
1396  if (!window_gettupleslot(winobj, fhprev, slot))
1397  break; /* start of partition */
1398  if (!are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
1399  break; /* not peer of current row */
1400  fhprev--;
1401  }
1402  winstate->frameheadpos = fhprev + 1;
1403  winstate->framehead_valid = true;
1404  }
1405  else
1406  Assert(false);
1407  }
1408  else if (frameOptions & FRAMEOPTION_START_VALUE)
1409  {
1410  if (frameOptions & FRAMEOPTION_ROWS)
1411  {
1412  /* In ROWS mode, bound is physically n before/after current */
1413  int64 offset = DatumGetInt64(winstate->startOffsetValue);
1414 
1415  if (frameOptions & FRAMEOPTION_START_VALUE_PRECEDING)
1416  offset = -offset;
1417 
1418  winstate->frameheadpos = winstate->currentpos + offset;
1419  /* frame head can't go before first row */
1420  if (winstate->frameheadpos < 0)
1421  winstate->frameheadpos = 0;
1422  else if (winstate->frameheadpos > winstate->currentpos)
1423  {
1424  /* make sure frameheadpos is not past end of partition */
1425  spool_tuples(winstate, winstate->frameheadpos - 1);
1426  if (winstate->frameheadpos > winstate->spooled_rows)
1427  winstate->frameheadpos = winstate->spooled_rows;
1428  }
1429  winstate->framehead_valid = true;
1430  }
1431  else if (frameOptions & FRAMEOPTION_RANGE)
1432  {
1433  /* parser should have rejected this */
1434  elog(ERROR, "window frame with value offset is not implemented");
1435  }
1436  else
1437  Assert(false);
1438  }
1439  else
1440  Assert(false);
1441 }
1442 
1443 /*
1444  * update_frametailpos
1445  * make frametailpos valid for the current row
1446  *
1447  * Uses the winobj's read pointer for any required fetches; hence, if the
1448  * frame mode is one that requires row comparisons, the winobj's mark must
1449  * not be past the currently known frame tail. Also uses the specified slot
1450  * for any required fetches.
1451  */
1452 static void
1454 {
1455  WindowAggState *winstate = winobj->winstate;
1456  WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
1457  int frameOptions = winstate->frameOptions;
1458 
1459  if (winstate->frametail_valid)
1460  return; /* already known for current row */
1461 
1462  if (frameOptions & FRAMEOPTION_END_UNBOUNDED_FOLLOWING)
1463  {
1464  /* In UNBOUNDED FOLLOWING mode, all partition rows are in frame */
1465  spool_tuples(winstate, -1);
1466  winstate->frametailpos = winstate->spooled_rows - 1;
1467  winstate->frametail_valid = true;
1468  }
1469  else if (frameOptions & FRAMEOPTION_END_CURRENT_ROW)
1470  {
1471  if (frameOptions & FRAMEOPTION_ROWS)
1472  {
1473  /* In ROWS mode, exactly the rows up to current are in frame */
1474  winstate->frametailpos = winstate->currentpos;
1475  winstate->frametail_valid = true;
1476  }
1477  else if (frameOptions & FRAMEOPTION_RANGE)
1478  {
1479  int64 ftnext;
1480 
1481  /* If no ORDER BY, all rows are peers with each other */
1482  if (node->ordNumCols == 0)
1483  {
1484  spool_tuples(winstate, -1);
1485  winstate->frametailpos = winstate->spooled_rows - 1;
1486  winstate->frametail_valid = true;
1487  return;
1488  }
1489 
1490  /*
1491  * Else we have to search for the first non-peer of the current
1492  * row. We assume the current value of frametailpos is a lower
1493  * bound on the possible frame tail location, ie, frame tail never
1494  * goes backward, and that currentpos is also a lower bound, ie,
1495  * frame end always >= current row.
1496  */
1497  ftnext = Max(winstate->frametailpos, winstate->currentpos) + 1;
1498  for (;;)
1499  {
1500  if (!window_gettupleslot(winobj, ftnext, slot))
1501  break; /* end of partition */
1502  if (!are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
1503  break; /* not peer of current row */
1504  ftnext++;
1505  }
1506  winstate->frametailpos = ftnext - 1;
1507  winstate->frametail_valid = true;
1508  }
1509  else
1510  Assert(false);
1511  }
1512  else if (frameOptions & FRAMEOPTION_END_VALUE)
1513  {
1514  if (frameOptions & FRAMEOPTION_ROWS)
1515  {
1516  /* In ROWS mode, bound is physically n before/after current */
1517  int64 offset = DatumGetInt64(winstate->endOffsetValue);
1518 
1519  if (frameOptions & FRAMEOPTION_END_VALUE_PRECEDING)
1520  offset = -offset;
1521 
1522  winstate->frametailpos = winstate->currentpos + offset;
1523  /* smallest allowable value of frametailpos is -1 */
1524  if (winstate->frametailpos < 0)
1525  winstate->frametailpos = -1;
1526  else if (winstate->frametailpos > winstate->currentpos)
1527  {
1528  /* make sure frametailpos is not past last row of partition */
1529  spool_tuples(winstate, winstate->frametailpos);
1530  if (winstate->frametailpos >= winstate->spooled_rows)
1531  winstate->frametailpos = winstate->spooled_rows - 1;
1532  }
1533  winstate->frametail_valid = true;
1534  }
1535  else if (frameOptions & FRAMEOPTION_RANGE)
1536  {
1537  /* parser should have rejected this */
1538  elog(ERROR, "window frame with value offset is not implemented");
1539  }
1540  else
1541  Assert(false);
1542  }
1543  else
1544  Assert(false);
1545 }
1546 
1547 
1548 /* -----------------
1549  * ExecWindowAgg
1550  *
1551  * ExecWindowAgg receives tuples from its outer subplan and
1552  * stores them into a tuplestore, then processes window functions.
1553  * This node doesn't reduce nor qualify any row so the number of
1554  * returned rows is exactly the same as its outer subplan's result
1555  * (ignoring the case of SRFs in the targetlist, that is).
1556  * -----------------
1557  */
1560 {
1561  TupleTableSlot *result;
1562  ExprDoneCond isDone;
1563  ExprContext *econtext;
1564  int i;
1565  int numfuncs;
1566 
1567  if (winstate->all_done)
1568  return NULL;
1569 
1570  /*
1571  * Check to see if we're still projecting out tuples from a previous
1572  * output tuple (because there is a function-returning-set in the
1573  * projection expressions). If so, try to project another one.
1574  */
1575  if (winstate->ss.ps.ps_TupFromTlist)
1576  {
1577  TupleTableSlot *result;
1578  ExprDoneCond isDone;
1579 
1580  result = ExecProject(winstate->ss.ps.ps_ProjInfo, &isDone);
1581  if (isDone == ExprMultipleResult)
1582  return result;
1583  /* Done with that source tuple... */
1584  winstate->ss.ps.ps_TupFromTlist = false;
1585  }
1586 
1587  /*
1588  * Compute frame offset values, if any, during first call.
1589  */
1590  if (winstate->all_first)
1591  {
1592  int frameOptions = winstate->frameOptions;
1593  ExprContext *econtext = winstate->ss.ps.ps_ExprContext;
1594  Datum value;
1595  bool isnull;
1596  int16 len;
1597  bool byval;
1598 
1599  if (frameOptions & FRAMEOPTION_START_VALUE)
1600  {
1601  Assert(winstate->startOffset != NULL);
1602  value = ExecEvalExprSwitchContext(winstate->startOffset,
1603  econtext,
1604  &isnull,
1605  NULL);
1606  if (isnull)
1607  ereport(ERROR,
1608  (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
1609  errmsg("frame starting offset must not be null")));
1610  /* copy value into query-lifespan context */
1611  get_typlenbyval(exprType((Node *) winstate->startOffset->expr),
1612  &len, &byval);
1613  winstate->startOffsetValue = datumCopy(value, byval, len);
1614  if (frameOptions & FRAMEOPTION_ROWS)
1615  {
1616  /* value is known to be int8 */
1617  int64 offset = DatumGetInt64(value);
1618 
1619  if (offset < 0)
1620  ereport(ERROR,
1621  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1622  errmsg("frame starting offset must not be negative")));
1623  }
1624  }
1625  if (frameOptions & FRAMEOPTION_END_VALUE)
1626  {
1627  Assert(winstate->endOffset != NULL);
1628  value = ExecEvalExprSwitchContext(winstate->endOffset,
1629  econtext,
1630  &isnull,
1631  NULL);
1632  if (isnull)
1633  ereport(ERROR,
1634  (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
1635  errmsg("frame ending offset must not be null")));
1636  /* copy value into query-lifespan context */
1637  get_typlenbyval(exprType((Node *) winstate->endOffset->expr),
1638  &len, &byval);
1639  winstate->endOffsetValue = datumCopy(value, byval, len);
1640  if (frameOptions & FRAMEOPTION_ROWS)
1641  {
1642  /* value is known to be int8 */
1643  int64 offset = DatumGetInt64(value);
1644 
1645  if (offset < 0)
1646  ereport(ERROR,
1647  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1648  errmsg("frame ending offset must not be negative")));
1649  }
1650  }
1651  winstate->all_first = false;
1652  }
1653 
1654 restart:
1655  if (winstate->buffer == NULL)
1656  {
1657  /* Initialize for first partition and set current row = 0 */
1658  begin_partition(winstate);
1659  /* If there are no input rows, we'll detect that and exit below */
1660  }
1661  else
1662  {
1663  /* Advance current row within partition */
1664  winstate->currentpos++;
1665  /* This might mean that the frame moves, too */
1666  winstate->framehead_valid = false;
1667  winstate->frametail_valid = false;
1668  }
1669 
1670  /*
1671  * Spool all tuples up to and including the current row, if we haven't
1672  * already
1673  */
1674  spool_tuples(winstate, winstate->currentpos);
1675 
1676  /* Move to the next partition if we reached the end of this partition */
1677  if (winstate->partition_spooled &&
1678  winstate->currentpos >= winstate->spooled_rows)
1679  {
1680  release_partition(winstate);
1681 
1682  if (winstate->more_partitions)
1683  {
1684  begin_partition(winstate);
1685  Assert(winstate->spooled_rows > 0);
1686  }
1687  else
1688  {
1689  winstate->all_done = true;
1690  return NULL;
1691  }
1692  }
1693 
1694  /* final output execution is in ps_ExprContext */
1695  econtext = winstate->ss.ps.ps_ExprContext;
1696 
1697  /* Clear the per-output-tuple context for current row */
1698  ResetExprContext(econtext);
1699 
1700  /*
1701  * Read the current row from the tuplestore, and save in ScanTupleSlot.
1702  * (We can't rely on the outerplan's output slot because we may have to
1703  * read beyond the current row. Also, we have to actually copy the row
1704  * out of the tuplestore, since window function evaluation might cause the
1705  * tuplestore to dump its state to disk.)
1706  *
1707  * Current row must be in the tuplestore, since we spooled it above.
1708  */
1709  tuplestore_select_read_pointer(winstate->buffer, winstate->current_ptr);
1710  if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1711  winstate->ss.ss_ScanTupleSlot))
1712  elog(ERROR, "unexpected end of tuplestore");
1713 
1714  /*
1715  * Evaluate true window functions
1716  */
1717  numfuncs = winstate->numfuncs;
1718  for (i = 0; i < numfuncs; i++)
1719  {
1720  WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
1721 
1722  if (perfuncstate->plain_agg)
1723  continue;
1724  eval_windowfunction(winstate, perfuncstate,
1725  &(econtext->ecxt_aggvalues[perfuncstate->wfuncstate->wfuncno]),
1726  &(econtext->ecxt_aggnulls[perfuncstate->wfuncstate->wfuncno]));
1727  }
1728 
1729  /*
1730  * Evaluate aggregates
1731  */
1732  if (winstate->numaggs > 0)
1733  eval_windowaggregates(winstate);
1734 
1735  /*
1736  * Truncate any no-longer-needed rows from the tuplestore.
1737  */
1738  tuplestore_trim(winstate->buffer);
1739 
1740  /*
1741  * Form and return a projection tuple using the windowfunc results and the
1742  * current row. Setting ecxt_outertuple arranges that any Vars will be
1743  * evaluated with respect to that row.
1744  */
1745  econtext->ecxt_outertuple = winstate->ss.ss_ScanTupleSlot;
1746  result = ExecProject(winstate->ss.ps.ps_ProjInfo, &isDone);
1747 
1748  if (isDone == ExprEndResult)
1749  {
1750  /* SRF in tlist returned no rows, so advance to next input tuple */
1751  goto restart;
1752  }
1753 
1754  winstate->ss.ps.ps_TupFromTlist =
1755  (isDone == ExprMultipleResult);
1756  return result;
1757 }
1758 
1759 /* -----------------
1760  * ExecInitWindowAgg
1761  *
1762  * Creates the run-time information for the WindowAgg node produced by the
1763  * planner and initializes its outer subtree
1764  * -----------------
1765  */
1767 ExecInitWindowAgg(WindowAgg *node, EState *estate, int eflags)
1768 {
1769  WindowAggState *winstate;
1770  Plan *outerPlan;
1771  ExprContext *econtext;
1772  ExprContext *tmpcontext;
1773  WindowStatePerFunc perfunc;
1774  WindowStatePerAgg peragg;
1775  int numfuncs,
1776  wfuncno,
1777  numaggs,
1778  aggno;
1779  ListCell *l;
1780 
1781  /* check for unsupported flags */
1782  Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
1783 
1784  /*
1785  * create state structure
1786  */
1787  winstate = makeNode(WindowAggState);
1788  winstate->ss.ps.plan = (Plan *) node;
1789  winstate->ss.ps.state = estate;
1790 
1791  /*
1792  * Create expression contexts. We need two, one for per-input-tuple
1793  * processing and one for per-output-tuple processing. We cheat a little
1794  * by using ExecAssignExprContext() to build both.
1795  */
1796  ExecAssignExprContext(estate, &winstate->ss.ps);
1797  tmpcontext = winstate->ss.ps.ps_ExprContext;
1798  winstate->tmpcontext = tmpcontext;
1799  ExecAssignExprContext(estate, &winstate->ss.ps);
1800 
1801  /* Create long-lived context for storage of partition-local memory etc */
1802  winstate->partcontext =
1804  "WindowAgg_Partition",
1808 
1809  /*
1810  * Create mid-lived context for aggregate trans values etc.
1811  *
1812  * Note that moving aggregates each use their own private context, not
1813  * this one.
1814  */
1815  winstate->aggcontext =
1817  "WindowAgg_Aggregates",
1821 
1822  /*
1823  * tuple table initialization
1824  */
1825  ExecInitScanTupleSlot(estate, &winstate->ss);
1826  ExecInitResultTupleSlot(estate, &winstate->ss.ps);
1827  winstate->first_part_slot = ExecInitExtraTupleSlot(estate);
1828  winstate->agg_row_slot = ExecInitExtraTupleSlot(estate);
1829  winstate->temp_slot_1 = ExecInitExtraTupleSlot(estate);
1830  winstate->temp_slot_2 = ExecInitExtraTupleSlot(estate);
1831 
1832  winstate->ss.ps.targetlist = (List *)
1833  ExecInitExpr((Expr *) node->plan.targetlist,
1834  (PlanState *) winstate);
1835 
1836  /*
1837  * WindowAgg nodes never have quals, since they can only occur at the
1838  * logical top level of a query (ie, after any WHERE or HAVING filters)
1839  */
1840  Assert(node->plan.qual == NIL);
1841  winstate->ss.ps.qual = NIL;
1842 
1843  /*
1844  * initialize child nodes
1845  */
1846  outerPlan = outerPlan(node);
1847  outerPlanState(winstate) = ExecInitNode(outerPlan, estate, eflags);
1848 
1849  /*
1850  * initialize source tuple type (which is also the tuple type that we'll
1851  * store in the tuplestore and use in all our working slots).
1852  */
1853  ExecAssignScanTypeFromOuterPlan(&winstate->ss);
1854 
1863 
1864  /*
1865  * Initialize result tuple type and projection info.
1866  */
1867  ExecAssignResultTypeFromTL(&winstate->ss.ps);
1868  ExecAssignProjectionInfo(&winstate->ss.ps, NULL);
1869 
1870  winstate->ss.ps.ps_TupFromTlist = false;
1871 
1872  /* Set up data for comparing tuples */
1873  if (node->partNumCols > 0)
1875  node->partOperators);
1876  if (node->ordNumCols > 0)
1878  node->ordOperators);
1879 
1880  /*
1881  * WindowAgg nodes use aggvalues and aggnulls as well as Agg nodes.
1882  */
1883  numfuncs = winstate->numfuncs;
1884  numaggs = winstate->numaggs;
1885  econtext = winstate->ss.ps.ps_ExprContext;
1886  econtext->ecxt_aggvalues = (Datum *) palloc0(sizeof(Datum) * numfuncs);
1887  econtext->ecxt_aggnulls = (bool *) palloc0(sizeof(bool) * numfuncs);
1888 
1889  /*
1890  * allocate per-wfunc/per-agg state information.
1891  */
1892  perfunc = (WindowStatePerFunc) palloc0(sizeof(WindowStatePerFuncData) * numfuncs);
1893  peragg = (WindowStatePerAgg) palloc0(sizeof(WindowStatePerAggData) * numaggs);
1894  winstate->perfunc = perfunc;
1895  winstate->peragg = peragg;
1896 
1897  wfuncno = -1;
1898  aggno = -1;
1899  foreach(l, winstate->funcs)
1900  {
1901  WindowFuncExprState *wfuncstate = (WindowFuncExprState *) lfirst(l);
1902  WindowFunc *wfunc = (WindowFunc *) wfuncstate->xprstate.expr;
1903  WindowStatePerFunc perfuncstate;
1904  AclResult aclresult;
1905  int i;
1906 
1907  if (wfunc->winref != node->winref) /* planner screwed up? */
1908  elog(ERROR, "WindowFunc with winref %u assigned to WindowAgg with winref %u",
1909  wfunc->winref, node->winref);
1910 
1911  /* Look for a previous duplicate window function */
1912  for (i = 0; i <= wfuncno; i++)
1913  {
1914  if (equal(wfunc, perfunc[i].wfunc) &&
1915  !contain_volatile_functions((Node *) wfunc))
1916  break;
1917  }
1918  if (i <= wfuncno)
1919  {
1920  /* Found a match to an existing entry, so just mark it */
1921  wfuncstate->wfuncno = i;
1922  continue;
1923  }
1924 
1925  /* Nope, so assign a new PerAgg record */
1926  perfuncstate = &perfunc[++wfuncno];
1927 
1928  /* Mark WindowFunc state node with assigned index in the result array */
1929  wfuncstate->wfuncno = wfuncno;
1930 
1931  /* Check permission to call window function */
1932  aclresult = pg_proc_aclcheck(wfunc->winfnoid, GetUserId(),
1933  ACL_EXECUTE);
1934  if (aclresult != ACLCHECK_OK)
1935  aclcheck_error(aclresult, ACL_KIND_PROC,
1936  get_func_name(wfunc->winfnoid));
1938 
1939  /* Fill in the perfuncstate data */
1940  perfuncstate->wfuncstate = wfuncstate;
1941  perfuncstate->wfunc = wfunc;
1942  perfuncstate->numArguments = list_length(wfuncstate->args);
1943 
1944  fmgr_info_cxt(wfunc->winfnoid, &perfuncstate->flinfo,
1945  econtext->ecxt_per_query_memory);
1946  fmgr_info_set_expr((Node *) wfunc, &perfuncstate->flinfo);
1947 
1948  perfuncstate->winCollation = wfunc->inputcollid;
1949 
1950  get_typlenbyval(wfunc->wintype,
1951  &perfuncstate->resulttypeLen,
1952  &perfuncstate->resulttypeByVal);
1953 
1954  /*
1955  * If it's really just a plain aggregate function, we'll emulate the
1956  * Agg environment for it.
1957  */
1958  perfuncstate->plain_agg = wfunc->winagg;
1959  if (wfunc->winagg)
1960  {
1961  WindowStatePerAgg peraggstate;
1962 
1963  perfuncstate->aggno = ++aggno;
1964  peraggstate = &winstate->peragg[aggno];
1965  initialize_peragg(winstate, wfunc, peraggstate);
1966  peraggstate->wfuncno = wfuncno;
1967  }
1968  else
1969  {
1971 
1972  winobj->winstate = winstate;
1973  winobj->argstates = wfuncstate->args;
1974  winobj->localmem = NULL;
1975  perfuncstate->winobj = winobj;
1976  }
1977  }
1978 
1979  /* Update numfuncs, numaggs to match number of unique functions found */
1980  winstate->numfuncs = wfuncno + 1;
1981  winstate->numaggs = aggno + 1;
1982 
1983  /* Set up WindowObject for aggregates, if needed */
1984  if (winstate->numaggs > 0)
1985  {
1986  WindowObject agg_winobj = makeNode(WindowObjectData);
1987 
1988  agg_winobj->winstate = winstate;
1989  agg_winobj->argstates = NIL;
1990  agg_winobj->localmem = NULL;
1991  /* make sure markptr = -1 to invalidate. It may not get used */
1992  agg_winobj->markptr = -1;
1993  agg_winobj->readptr = -1;
1994  winstate->agg_winobj = agg_winobj;
1995  }
1996 
1997  /* copy frame options to state node for easy access */
1998  winstate->frameOptions = node->frameOptions;
1999 
2000  /* initialize frame bound offset expressions */
2001  winstate->startOffset = ExecInitExpr((Expr *) node->startOffset,
2002  (PlanState *) winstate);
2003  winstate->endOffset = ExecInitExpr((Expr *) node->endOffset,
2004  (PlanState *) winstate);
2005 
2006  winstate->all_first = true;
2007  winstate->partition_spooled = false;
2008  winstate->more_partitions = false;
2009 
2010  return winstate;
2011 }
2012 
2013 /* -----------------
2014  * ExecEndWindowAgg
2015  * -----------------
2016  */
2017 void
2019 {
2021  int i;
2022 
2023  release_partition(node);
2024 
2028  ExecClearTuple(node->temp_slot_1);
2029  ExecClearTuple(node->temp_slot_2);
2030 
2031  /*
2032  * Free both the expr contexts.
2033  */
2034  ExecFreeExprContext(&node->ss.ps);
2035  node->ss.ps.ps_ExprContext = node->tmpcontext;
2036  ExecFreeExprContext(&node->ss.ps);
2037 
2038  for (i = 0; i < node->numaggs; i++)
2039  {
2040  if (node->peragg[i].aggcontext != node->aggcontext)
2042  }
2045 
2046  pfree(node->perfunc);
2047  pfree(node->peragg);
2048 
2049  outerPlan = outerPlanState(node);
2050  ExecEndNode(outerPlan);
2051 }
2052 
2053 /* -----------------
2054  * ExecReScanWindowAgg
2055  * -----------------
2056  */
2057 void
2059 {
2061  ExprContext *econtext = node->ss.ps.ps_ExprContext;
2062 
2063  node->all_done = false;
2064 
2065  node->ss.ps.ps_TupFromTlist = false;
2066  node->all_first = true;
2067 
2068  /* release tuplestore et al */
2069  release_partition(node);
2070 
2071  /* release all temp tuples, but especially first_part_slot */
2075  ExecClearTuple(node->temp_slot_1);
2076  ExecClearTuple(node->temp_slot_2);
2077 
2078  /* Forget current wfunc values */
2079  MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * node->numfuncs);
2080  MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * node->numfuncs);
2081 
2082  /*
2083  * if chgParam of subnode is not null then plan will be re-scanned by
2084  * first ExecProcNode.
2085  */
2086  if (outerPlan->chgParam == NULL)
2087  ExecReScan(outerPlan);
2088 }
2089 
2090 /*
2091  * initialize_peragg
2092  *
2093  * Almost same as in nodeAgg.c, except we don't support DISTINCT currently.
2094  */
2095 static WindowStatePerAggData *
2097  WindowStatePerAgg peraggstate)
2098 {
2099  Oid inputTypes[FUNC_MAX_ARGS];
2100  int numArguments;
2101  HeapTuple aggTuple;
2102  Form_pg_aggregate aggform;
2103  Oid aggtranstype;
2104  AttrNumber initvalAttNo;
2105  AclResult aclresult;
2106  Oid transfn_oid,
2107  invtransfn_oid,
2108  finalfn_oid;
2109  bool finalextra;
2110  Expr *transfnexpr,
2111  *invtransfnexpr,
2112  *finalfnexpr;
2113  Datum textInitVal;
2114  int i;
2115  ListCell *lc;
2116 
2117  numArguments = list_length(wfunc->args);
2118 
2119  i = 0;
2120  foreach(lc, wfunc->args)
2121  {
2122  inputTypes[i++] = exprType((Node *) lfirst(lc));
2123  }
2124 
2125  aggTuple = SearchSysCache1(AGGFNOID, ObjectIdGetDatum(wfunc->winfnoid));
2126  if (!HeapTupleIsValid(aggTuple))
2127  elog(ERROR, "cache lookup failed for aggregate %u",
2128  wfunc->winfnoid);
2129  aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
2130 
2131  /*
2132  * Figure out whether we want to use the moving-aggregate implementation,
2133  * and collect the right set of fields from the pg_attribute entry.
2134  *
2135  * If the frame head can't move, we don't need moving-aggregate code. Even
2136  * if we'd like to use it, don't do so if the aggregate's arguments (and
2137  * FILTER clause if any) contain any calls to volatile functions.
2138  * Otherwise, the difference between restarting and not restarting the
2139  * aggregation would be user-visible.
2140  */
2141  if (OidIsValid(aggform->aggminvtransfn) &&
2143  !contain_volatile_functions((Node *) wfunc))
2144  {
2145  peraggstate->transfn_oid = transfn_oid = aggform->aggmtransfn;
2146  peraggstate->invtransfn_oid = invtransfn_oid = aggform->aggminvtransfn;
2147  peraggstate->finalfn_oid = finalfn_oid = aggform->aggmfinalfn;
2148  finalextra = aggform->aggmfinalextra;
2149  aggtranstype = aggform->aggmtranstype;
2150  initvalAttNo = Anum_pg_aggregate_aggminitval;
2151  }
2152  else
2153  {
2154  peraggstate->transfn_oid = transfn_oid = aggform->aggtransfn;
2155  peraggstate->invtransfn_oid = invtransfn_oid = InvalidOid;
2156  peraggstate->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
2157  finalextra = aggform->aggfinalextra;
2158  aggtranstype = aggform->aggtranstype;
2159  initvalAttNo = Anum_pg_aggregate_agginitval;
2160  }
2161 
2162  /*
2163  * ExecInitWindowAgg already checked permission to call aggregate function
2164  * ... but we still need to check the component functions
2165  */
2166 
2167  /* Check that aggregate owner has permission to call component fns */
2168  {
2169  HeapTuple procTuple;
2170  Oid aggOwner;
2171 
2172  procTuple = SearchSysCache1(PROCOID,
2173  ObjectIdGetDatum(wfunc->winfnoid));
2174  if (!HeapTupleIsValid(procTuple))
2175  elog(ERROR, "cache lookup failed for function %u",
2176  wfunc->winfnoid);
2177  aggOwner = ((Form_pg_proc) GETSTRUCT(procTuple))->proowner;
2178  ReleaseSysCache(procTuple);
2179 
2180  aclresult = pg_proc_aclcheck(transfn_oid, aggOwner,
2181  ACL_EXECUTE);
2182  if (aclresult != ACLCHECK_OK)
2183  aclcheck_error(aclresult, ACL_KIND_PROC,
2184  get_func_name(transfn_oid));
2185  InvokeFunctionExecuteHook(transfn_oid);
2186 
2187  if (OidIsValid(invtransfn_oid))
2188  {
2189  aclresult = pg_proc_aclcheck(invtransfn_oid, aggOwner,
2190  ACL_EXECUTE);
2191  if (aclresult != ACLCHECK_OK)
2192  aclcheck_error(aclresult, ACL_KIND_PROC,
2193  get_func_name(invtransfn_oid));
2194  InvokeFunctionExecuteHook(invtransfn_oid);
2195  }
2196 
2197  if (OidIsValid(finalfn_oid))
2198  {
2199  aclresult = pg_proc_aclcheck(finalfn_oid, aggOwner,
2200  ACL_EXECUTE);
2201  if (aclresult != ACLCHECK_OK)
2202  aclcheck_error(aclresult, ACL_KIND_PROC,
2203  get_func_name(finalfn_oid));
2204  InvokeFunctionExecuteHook(finalfn_oid);
2205  }
2206  }
2207 
2208  /* Detect how many arguments to pass to the finalfn */
2209  if (finalextra)
2210  peraggstate->numFinalArgs = numArguments + 1;
2211  else
2212  peraggstate->numFinalArgs = 1;
2213 
2214  /* resolve actual type of transition state, if polymorphic */
2215  aggtranstype = resolve_aggregate_transtype(wfunc->winfnoid,
2216  aggtranstype,
2217  inputTypes,
2218  numArguments);
2219 
2220  /* build expression trees using actual argument & result types */
2221  build_aggregate_transfn_expr(inputTypes,
2222  numArguments,
2223  0, /* no ordered-set window functions yet */
2224  false, /* no variadic window functions yet */
2225  wfunc->wintype,
2226  wfunc->inputcollid,
2227  transfn_oid,
2228  invtransfn_oid,
2229  &transfnexpr,
2230  &invtransfnexpr);
2231 
2232  /* set up infrastructure for calling the transfn(s) and finalfn */
2233  fmgr_info(transfn_oid, &peraggstate->transfn);
2234  fmgr_info_set_expr((Node *) transfnexpr, &peraggstate->transfn);
2235 
2236  if (OidIsValid(invtransfn_oid))
2237  {
2238  fmgr_info(invtransfn_oid, &peraggstate->invtransfn);
2239  fmgr_info_set_expr((Node *) invtransfnexpr, &peraggstate->invtransfn);
2240  }
2241 
2242  if (OidIsValid(finalfn_oid))
2243  {
2244  build_aggregate_finalfn_expr(inputTypes,
2245  peraggstate->numFinalArgs,
2246  aggtranstype,
2247  wfunc->wintype,
2248  wfunc->inputcollid,
2249  finalfn_oid,
2250  &finalfnexpr);
2251  fmgr_info(finalfn_oid, &peraggstate->finalfn);
2252  fmgr_info_set_expr((Node *) finalfnexpr, &peraggstate->finalfn);
2253  }
2254 
2255  /* get info about relevant datatypes */
2256  get_typlenbyval(wfunc->wintype,
2257  &peraggstate->resulttypeLen,
2258  &peraggstate->resulttypeByVal);
2259  get_typlenbyval(aggtranstype,
2260  &peraggstate->transtypeLen,
2261  &peraggstate->transtypeByVal);
2262 
2263  /*
2264  * initval is potentially null, so don't try to access it as a struct
2265  * field. Must do it the hard way with SysCacheGetAttr.
2266  */
2267  textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple, initvalAttNo,
2268  &peraggstate->initValueIsNull);
2269 
2270  if (peraggstate->initValueIsNull)
2271  peraggstate->initValue = (Datum) 0;
2272  else
2273  peraggstate->initValue = GetAggInitVal(textInitVal,
2274  aggtranstype);
2275 
2276  /*
2277  * If the transfn is strict and the initval is NULL, make sure input type
2278  * and transtype are the same (or at least binary-compatible), so that
2279  * it's OK to use the first input value as the initial transValue. This
2280  * should have been checked at agg definition time, but we must check
2281  * again in case the transfn's strictness property has been changed.
2282  */
2283  if (peraggstate->transfn.fn_strict && peraggstate->initValueIsNull)
2284  {
2285  if (numArguments < 1 ||
2286  !IsBinaryCoercible(inputTypes[0], aggtranstype))
2287  ereport(ERROR,
2288  (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
2289  errmsg("aggregate %u needs to have compatible input type and transition type",
2290  wfunc->winfnoid)));
2291  }
2292 
2293  /*
2294  * Insist that forward and inverse transition functions have the same
2295  * strictness setting. Allowing them to differ would require handling
2296  * more special cases in advance_windowaggregate and
2297  * advance_windowaggregate_base, for no discernible benefit. This should
2298  * have been checked at agg definition time, but we must check again in
2299  * case either function's strictness property has been changed.
2300  */
2301  if (OidIsValid(invtransfn_oid) &&
2302  peraggstate->transfn.fn_strict != peraggstate->invtransfn.fn_strict)
2303  ereport(ERROR,
2304  (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
2305  errmsg("strictness of aggregate's forward and inverse transition functions must match")));
2306 
2307  /*
2308  * Moving aggregates use their own aggcontext.
2309  *
2310  * This is necessary because they might restart at different times, so we
2311  * might never be able to reset the shared context otherwise. We can't
2312  * make it the aggregates' responsibility to clean up after themselves,
2313  * because strict aggregates must be restarted whenever we remove their
2314  * last non-NULL input, which the aggregate won't be aware is happening.
2315  * Also, just pfree()ing the transValue upon restarting wouldn't help,
2316  * since we'd miss any indirectly referenced data. We could, in theory,
2317  * make the memory allocation rules for moving aggregates different than
2318  * they have historically been for plain aggregates, but that seems grotty
2319  * and likely to lead to memory leaks.
2320  */
2321  if (OidIsValid(invtransfn_oid))
2322  peraggstate->aggcontext =
2324  "WindowAgg_AggregatePrivate",
2328  else
2329  peraggstate->aggcontext = winstate->aggcontext;
2330 
2331  ReleaseSysCache(aggTuple);
2332 
2333  return peraggstate;
2334 }
2335 
2336 static Datum
2337 GetAggInitVal(Datum textInitVal, Oid transtype)
2338 {
2339  Oid typinput,
2340  typioparam;
2341  char *strInitVal;
2342  Datum initVal;
2343 
2344  getTypeInputInfo(transtype, &typinput, &typioparam);
2345  strInitVal = TextDatumGetCString(textInitVal);
2346  initVal = OidInputFunctionCall(typinput, strInitVal,
2347  typioparam, -1);
2348  pfree(strInitVal);
2349  return initVal;
2350 }
2351 
2352 /*
2353  * are_peers
2354  * compare two rows to see if they are equal according to the ORDER BY clause
2355  *
2356  * NB: this does not consider the window frame mode.
2357  */
2358 static bool
2360  TupleTableSlot *slot2)
2361 {
2362  WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
2363 
2364  /* If no ORDER BY, all rows are peers with each other */
2365  if (node->ordNumCols == 0)
2366  return true;
2367 
2368  return execTuplesMatch(slot1, slot2,
2369  node->ordNumCols, node->ordColIdx,
2370  winstate->ordEqfunctions,
2371  winstate->tmpcontext->ecxt_per_tuple_memory);
2372 }
2373 
2374 /*
2375  * window_gettupleslot
2376  * Fetch the pos'th tuple of the current partition into the slot,
2377  * using the winobj's read pointer
2378  *
2379  * Returns true if successful, false if no such row
2380  */
2381 static bool
2383 {
2384  WindowAggState *winstate = winobj->winstate;
2385  MemoryContext oldcontext;
2386 
2387  /* Don't allow passing -1 to spool_tuples here */
2388  if (pos < 0)
2389  return false;
2390 
2391  /* If necessary, fetch the tuple into the spool */
2392  spool_tuples(winstate, pos);
2393 
2394  if (pos >= winstate->spooled_rows)
2395  return false;
2396 
2397  if (pos < winobj->markpos)
2398  elog(ERROR, "cannot fetch row before WindowObject's mark position");
2399 
2401 
2402  tuplestore_select_read_pointer(winstate->buffer, winobj->readptr);
2403 
2404  /*
2405  * Advance or rewind until we are within one tuple of the one we want.
2406  */
2407  if (winobj->seekpos < pos - 1)
2408  {
2409  if (!tuplestore_skiptuples(winstate->buffer,
2410  pos - 1 - winobj->seekpos,
2411  true))
2412  elog(ERROR, "unexpected end of tuplestore");
2413  winobj->seekpos = pos - 1;
2414  }
2415  else if (winobj->seekpos > pos + 1)
2416  {
2417  if (!tuplestore_skiptuples(winstate->buffer,
2418  winobj->seekpos - (pos + 1),
2419  false))
2420  elog(ERROR, "unexpected end of tuplestore");
2421  winobj->seekpos = pos + 1;
2422  }
2423  else if (winobj->seekpos == pos)
2424  {
2425  /*
2426  * There's no API to refetch the tuple at the current position. We
2427  * have to move one tuple forward, and then one backward. (We don't
2428  * do it the other way because we might try to fetch the row before
2429  * our mark, which isn't allowed.) XXX this case could stand to be
2430  * optimized.
2431  */
2432  tuplestore_advance(winstate->buffer, true);
2433  winobj->seekpos++;
2434  }
2435 
2436  /*
2437  * Now we should be on the tuple immediately before or after the one we
2438  * want, so just fetch forwards or backwards as appropriate.
2439  */
2440  if (winobj->seekpos > pos)
2441  {
2442  if (!tuplestore_gettupleslot(winstate->buffer, false, true, slot))
2443  elog(ERROR, "unexpected end of tuplestore");
2444  winobj->seekpos--;
2445  }
2446  else
2447  {
2448  if (!tuplestore_gettupleslot(winstate->buffer, true, true, slot))
2449  elog(ERROR, "unexpected end of tuplestore");
2450  winobj->seekpos++;
2451  }
2452 
2453  Assert(winobj->seekpos == pos);
2454 
2455  MemoryContextSwitchTo(oldcontext);
2456 
2457  return true;
2458 }
2459 
2460 
2461 /***********************************************************************
2462  * API exposed to window functions
2463  ***********************************************************************/
2464 
2465 
2466 /*
2467  * WinGetPartitionLocalMemory
2468  * Get working memory that lives till end of partition processing
2469  *
2470  * On first call within a given partition, this allocates and zeroes the
2471  * requested amount of space. Subsequent calls just return the same chunk.
2472  *
2473  * Memory obtained this way is normally used to hold state that should be
2474  * automatically reset for each new partition. If a window function wants
2475  * to hold state across the whole query, fcinfo->fn_extra can be used in the
2476  * usual way for that.
2477  */
2478 void *
2480 {
2481  Assert(WindowObjectIsValid(winobj));
2482  if (winobj->localmem == NULL)
2483  winobj->localmem =
2485  return winobj->localmem;
2486 }
2487 
2488 /*
2489  * WinGetCurrentPosition
2490  * Return the current row's position (counting from 0) within the current
2491  * partition.
2492  */
2493 int64
2495 {
2496  Assert(WindowObjectIsValid(winobj));
2497  return winobj->winstate->currentpos;
2498 }
2499 
2500 /*
2501  * WinGetPartitionRowCount
2502  * Return total number of rows contained in the current partition.
2503  *
2504  * Note: this is a relatively expensive operation because it forces the
2505  * whole partition to be "spooled" into the tuplestore at once. Once
2506  * executed, however, additional calls within the same partition are cheap.
2507  */
2508 int64
2510 {
2511  Assert(WindowObjectIsValid(winobj));
2512  spool_tuples(winobj->winstate, -1);
2513  return winobj->winstate->spooled_rows;
2514 }
2515 
2516 /*
2517  * WinSetMarkPosition
2518  * Set the "mark" position for the window object, which is the oldest row
2519  * number (counting from 0) it is allowed to fetch during all subsequent
2520  * operations within the current partition.
2521  *
2522  * Window functions do not have to call this, but are encouraged to move the
2523  * mark forward when possible to keep the tuplestore size down and prevent
2524  * having to spill rows to disk.
2525  */
2526 void
2527 WinSetMarkPosition(WindowObject winobj, int64 markpos)
2528 {
2529  WindowAggState *winstate;
2530 
2531  Assert(WindowObjectIsValid(winobj));
2532  winstate = winobj->winstate;
2533 
2534  if (markpos < winobj->markpos)
2535  elog(ERROR, "cannot move WindowObject's mark position backward");
2536  tuplestore_select_read_pointer(winstate->buffer, winobj->markptr);
2537  if (markpos > winobj->markpos)
2538  {
2539  tuplestore_skiptuples(winstate->buffer,
2540  markpos - winobj->markpos,
2541  true);
2542  winobj->markpos = markpos;
2543  }
2544  tuplestore_select_read_pointer(winstate->buffer, winobj->readptr);
2545  if (markpos > winobj->seekpos)
2546  {
2547  tuplestore_skiptuples(winstate->buffer,
2548  markpos - winobj->seekpos,
2549  true);
2550  winobj->seekpos = markpos;
2551  }
2552 }
2553 
2554 /*
2555  * WinRowsArePeers
2556  * Compare two rows (specified by absolute position in window) to see
2557  * if they are equal according to the ORDER BY clause.
2558  *
2559  * NB: this does not consider the window frame mode.
2560  */
2561 bool
2562 WinRowsArePeers(WindowObject winobj, int64 pos1, int64 pos2)
2563 {
2564  WindowAggState *winstate;
2565  WindowAgg *node;
2566  TupleTableSlot *slot1;
2567  TupleTableSlot *slot2;
2568  bool res;
2569 
2570  Assert(WindowObjectIsValid(winobj));
2571  winstate = winobj->winstate;
2572  node = (WindowAgg *) winstate->ss.ps.plan;
2573 
2574  /* If no ORDER BY, all rows are peers; don't bother to fetch them */
2575  if (node->ordNumCols == 0)
2576  return true;
2577 
2578  slot1 = winstate->temp_slot_1;
2579  slot2 = winstate->temp_slot_2;
2580 
2581  if (!window_gettupleslot(winobj, pos1, slot1))
2582  elog(ERROR, "specified position is out of window: " INT64_FORMAT,
2583  pos1);
2584  if (!window_gettupleslot(winobj, pos2, slot2))
2585  elog(ERROR, "specified position is out of window: " INT64_FORMAT,
2586  pos2);
2587 
2588  res = are_peers(winstate, slot1, slot2);
2589 
2590  ExecClearTuple(slot1);
2591  ExecClearTuple(slot2);
2592 
2593  return res;
2594 }
2595 
2596 /*
2597  * WinGetFuncArgInPartition
2598  * Evaluate a window function's argument expression on a specified
2599  * row of the partition. The row is identified in lseek(2) style,
2600  * i.e. relative to the current, first, or last row.
2601  *
2602  * argno: argument number to evaluate (counted from 0)
2603  * relpos: signed rowcount offset from the seek position
2604  * seektype: WINDOW_SEEK_CURRENT, WINDOW_SEEK_HEAD, or WINDOW_SEEK_TAIL
2605  * set_mark: If the row is found and set_mark is true, the mark is moved to
2606  * the row as a side-effect.
2607  * isnull: output argument, receives isnull status of result
2608  * isout: output argument, set to indicate whether target row position
2609  * is out of partition (can pass NULL if caller doesn't care about this)
2610  *
2611  * Specifying a nonexistent row is not an error, it just causes a null result
2612  * (plus setting *isout true, if isout isn't NULL).
2613  */
2614 Datum
2616  int relpos, int seektype, bool set_mark,
2617  bool *isnull, bool *isout)
2618 {
2619  WindowAggState *winstate;
2620  ExprContext *econtext;
2621  TupleTableSlot *slot;
2622  bool gottuple;
2623  int64 abs_pos;
2624 
2625  Assert(WindowObjectIsValid(winobj));
2626  winstate = winobj->winstate;
2627  econtext = winstate->ss.ps.ps_ExprContext;
2628  slot = winstate->temp_slot_1;
2629 
2630  switch (seektype)
2631  {
2632  case WINDOW_SEEK_CURRENT:
2633  abs_pos = winstate->currentpos + relpos;
2634  break;
2635  case WINDOW_SEEK_HEAD:
2636  abs_pos = relpos;
2637  break;
2638  case WINDOW_SEEK_TAIL:
2639  spool_tuples(winstate, -1);
2640  abs_pos = winstate->spooled_rows - 1 + relpos;
2641  break;
2642  default:
2643  elog(ERROR, "unrecognized window seek type: %d", seektype);
2644  abs_pos = 0; /* keep compiler quiet */
2645  break;
2646  }
2647 
2648  gottuple = window_gettupleslot(winobj, abs_pos, slot);
2649 
2650  if (!gottuple)
2651  {
2652  if (isout)
2653  *isout = true;
2654  *isnull = true;
2655  return (Datum) 0;
2656  }
2657  else
2658  {
2659  if (isout)
2660  *isout = false;
2661  if (set_mark)
2662  {
2663  int frameOptions = winstate->frameOptions;
2664  int64 mark_pos = abs_pos;
2665 
2666  /*
2667  * In RANGE mode with a moving frame head, we must not let the
2668  * mark advance past frameheadpos, since that row has to be
2669  * fetchable during future update_frameheadpos calls.
2670  *
2671  * XXX it is very ugly to pollute window functions' marks with
2672  * this consideration; it could for instance mask a logic bug that
2673  * lets a window function fetch rows before what it had claimed
2674  * was its mark. Perhaps use a separate mark for frame head
2675  * probes?
2676  */
2677  if ((frameOptions & FRAMEOPTION_RANGE) &&
2678  !(frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING))
2679  {
2680  update_frameheadpos(winobj, winstate->temp_slot_2);
2681  if (mark_pos > winstate->frameheadpos)
2682  mark_pos = winstate->frameheadpos;
2683  }
2684  WinSetMarkPosition(winobj, mark_pos);
2685  }
2686  econtext->ecxt_outertuple = slot;
2687  return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
2688  econtext, isnull, NULL);
2689  }
2690 }
2691 
2692 /*
2693  * WinGetFuncArgInFrame
2694  * Evaluate a window function's argument expression on a specified
2695  * row of the window frame. The row is identified in lseek(2) style,
2696  * i.e. relative to the current, first, or last row.
2697  *
2698  * argno: argument number to evaluate (counted from 0)
2699  * relpos: signed rowcount offset from the seek position
2700  * seektype: WINDOW_SEEK_CURRENT, WINDOW_SEEK_HEAD, or WINDOW_SEEK_TAIL
2701  * set_mark: If the row is found and set_mark is true, the mark is moved to
2702  * the row as a side-effect.
2703  * isnull: output argument, receives isnull status of result
2704  * isout: output argument, set to indicate whether target row position
2705  * is out of frame (can pass NULL if caller doesn't care about this)
2706  *
2707  * Specifying a nonexistent row is not an error, it just causes a null result
2708  * (plus setting *isout true, if isout isn't NULL).
2709  */
2710 Datum
2712  int relpos, int seektype, bool set_mark,
2713  bool *isnull, bool *isout)
2714 {
2715  WindowAggState *winstate;
2716  ExprContext *econtext;
2717  TupleTableSlot *slot;
2718  bool gottuple;
2719  int64 abs_pos;
2720 
2721  Assert(WindowObjectIsValid(winobj));
2722  winstate = winobj->winstate;
2723  econtext = winstate->ss.ps.ps_ExprContext;
2724  slot = winstate->temp_slot_1;
2725 
2726  switch (seektype)
2727  {
2728  case WINDOW_SEEK_CURRENT:
2729  abs_pos = winstate->currentpos + relpos;
2730  break;
2731  case WINDOW_SEEK_HEAD:
2732  update_frameheadpos(winobj, slot);
2733  abs_pos = winstate->frameheadpos + relpos;
2734  break;
2735  case WINDOW_SEEK_TAIL:
2736  update_frametailpos(winobj, slot);
2737  abs_pos = winstate->frametailpos + relpos;
2738  break;
2739  default:
2740  elog(ERROR, "unrecognized window seek type: %d", seektype);
2741  abs_pos = 0; /* keep compiler quiet */
2742  break;
2743  }
2744 
2745  gottuple = window_gettupleslot(winobj, abs_pos, slot);
2746  if (gottuple)
2747  gottuple = row_is_in_frame(winstate, abs_pos, slot);
2748 
2749  if (!gottuple)
2750  {
2751  if (isout)
2752  *isout = true;
2753  *isnull = true;
2754  return (Datum) 0;
2755  }
2756  else
2757  {
2758  if (isout)
2759  *isout = false;
2760  if (set_mark)
2761  {
2762  int frameOptions = winstate->frameOptions;
2763  int64 mark_pos = abs_pos;
2764 
2765  /*
2766  * In RANGE mode with a moving frame head, we must not let the
2767  * mark advance past frameheadpos, since that row has to be
2768  * fetchable during future update_frameheadpos calls.
2769  *
2770  * XXX it is very ugly to pollute window functions' marks with
2771  * this consideration; it could for instance mask a logic bug that
2772  * lets a window function fetch rows before what it had claimed
2773  * was its mark. Perhaps use a separate mark for frame head
2774  * probes?
2775  */
2776  if ((frameOptions & FRAMEOPTION_RANGE) &&
2777  !(frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING))
2778  {
2779  update_frameheadpos(winobj, winstate->temp_slot_2);
2780  if (mark_pos > winstate->frameheadpos)
2781  mark_pos = winstate->frameheadpos;
2782  }
2783  WinSetMarkPosition(winobj, mark_pos);
2784  }
2785  econtext->ecxt_outertuple = slot;
2786  return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
2787  econtext, isnull, NULL);
2788  }
2789 }
2790 
2791 /*
2792  * WinGetFuncArgCurrent
2793  * Evaluate a window function's argument expression on the current row.
2794  *
2795  * argno: argument number to evaluate (counted from 0)
2796  * isnull: output argument, receives isnull status of result
2797  *
2798  * Note: this isn't quite equivalent to WinGetFuncArgInPartition or
2799  * WinGetFuncArgInFrame targeting the current row, because it will succeed
2800  * even if the WindowObject's mark has been set beyond the current row.
2801  * This should generally be used for "ordinary" arguments of a window
2802  * function, such as the offset argument of lead() or lag().
2803  */
2804 Datum
2805 WinGetFuncArgCurrent(WindowObject winobj, int argno, bool *isnull)
2806 {
2807  WindowAggState *winstate;
2808  ExprContext *econtext;
2809 
2810  Assert(WindowObjectIsValid(winobj));
2811  winstate = winobj->winstate;
2812 
2813  econtext = winstate->ss.ps.ps_ExprContext;
2814 
2815  econtext->ecxt_outertuple = winstate->ss.ss_ScanTupleSlot;
2816  return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
2817  econtext, isnull, NULL);
2818 }
signed short int16
Definition: c.h:252
int ordNumCols
Definition: plannodes.h:735
void tuplestore_puttupleslot(Tuplestorestate *state, TupleTableSlot *slot)
Definition: tuplestore.c:693
#define NIL
Definition: pg_list.h:69
Datum WinGetFuncArgCurrent(WindowObject winobj, int argno, bool *isnull)
Definition: fmgr.h:53
List * qual
Definition: plannodes.h:122
bool WinRowsArePeers(WindowObject winobj, int64 pos1, int64 pos2)
MemoryContext curaggcontext
Definition: execnodes.h:1901
static struct @76 value
ExprState * endOffset
Definition: execnodes.h:1895
TupleTableSlot * ExecProcNode(PlanState *node)
Definition: execProcnode.c:374
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:203
void * WinGetPartitionLocalMemory(WindowObject winobj, Size sz)
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate)
Definition: execTuples.c:855
Datum * ecxt_aggvalues
Definition: execnodes.h:138
bool tuplestore_advance(Tuplestorestate *state, bool forward)
Definition: tuplestore.c:1093
struct WindowStatePerAggData * WindowStatePerAgg
Definition: execnodes.h:1866
int64 WinGetPartitionRowCount(WindowObject winobj)
Datum startOffsetValue
Definition: execnodes.h:1896
static void finalize_windowaggregate(WindowAggState *winstate, WindowStatePerFunc perfuncstate, WindowStatePerAgg peraggstate, Datum *result, bool *isnull)
void ExecInitScanTupleSlot(EState *estate, ScanState *scanstate)
Definition: execTuples.c:845
ExprState * aggfilter
Definition: execnodes.h:641
#define GETSTRUCT(TUP)
Definition: htup_details.h:656
ProjectionInfo * ps_ProjInfo
Definition: execnodes.h:1059
#define Anum_pg_aggregate_agginitval
Definition: pg_aggregate.h:113
static bool window_gettupleslot(WindowObject winobj, int64 pos, TupleTableSlot *slot)
List * args
Definition: primnodes.h:344
bool equal(const void *a, const void *b)
Definition: equalfuncs.c:2716
Oid GetUserId(void)
Definition: miscinit.c:282
void tuplestore_trim(Tuplestorestate *state)
Definition: tuplestore.c:1343
ScanState ss
Definition: execnodes.h:1870
void ExecEndNode(PlanState *node)
Definition: execProcnode.c:614
Definition: plannodes.h:96
void tuplestore_set_eflags(Tuplestorestate *state, int eflags)
Definition: tuplestore.c:357
ExprContext * ps_ExprContext
Definition: execnodes.h:1058
MemoryContext ecxt_per_tuple_memory
Definition: execnodes.h:128
#define FRAMEOPTION_START_VALUE
Definition: parsenodes.h:501
AttrNumber * ordColIdx
Definition: plannodes.h:736
void ExecReScan(PlanState *node)
Definition: execAmi.c:73
TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: execTuples.c:442
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
ExprState xprstate
Definition: execnodes.h:639
List * qual
Definition: execnodes.h:1042
Definition: nodes.h:492
bool execTuplesMatch(TupleTableSlot *slot1, TupleTableSlot *slot2, int numCols, AttrNumber *matchColIdx, FmgrInfo *eqfunctions, MemoryContext evalContext)
Definition: execGrouping.c:54
int64 aggregatedupto
Definition: execnodes.h:1891
int errcode(int sqlerrcode)
Definition: elog.c:575
#define MemSet(start, val, len)
Definition: c.h:849
List * targetlist
Definition: execnodes.h:1041
struct WindowStatePerFuncData WindowStatePerFuncData
bool frametail_valid
Definition: execnodes.h:1913
TupleTableSlot * ExecProject(ProjectionInfo *projInfo, ExprDoneCond *isDone)
Definition: execQual.c:5505
TupleTableSlot * ss_ScanTupleSlot
Definition: execnodes.h:1251
static void release_partition(WindowAggState *winstate)
struct WindowStatePerFuncData * WindowStatePerFunc
Definition: execnodes.h:1865
FmgrInfo * partEqfunctions
Definition: execnodes.h:1879
void build_aggregate_finalfn_expr(Oid *agg_input_types, int num_finalfn_inputs, Oid agg_state_type, Oid agg_result_type, Oid agg_input_collation, Oid finalfn_oid, Expr **finalfnexpr)
Definition: parse_agg.c:1994
WindowFuncExprState * wfuncstate
Definition: nodeWindowAgg.c:78
bool contain_volatile_functions(Node *clause)
Definition: clauses.c:1009
EState * state
Definition: execnodes.h:1029
unsigned int Oid
Definition: postgres_ext.h:31
NodeTag
Definition: nodes.h:26
Index winref
Definition: primnodes.h:346
#define FRAMEOPTION_START_UNBOUNDED_PRECEDING
Definition: parsenodes.h:490
TupleTableSlot * temp_slot_1
Definition: execnodes.h:1921
#define OidIsValid(objectId)
Definition: c.h:530
WindowStatePerFunc perfunc
Definition: execnodes.h:1877
void ExecFreeExprContext(PlanState *planstate)
Definition: execUtils.c:696
Oid * ordOperators
Definition: plannodes.h:737
static void spool_tuples(WindowAggState *winstate, int64 pos)
#define ALLOCSET_DEFAULT_MINSIZE
Definition: memutils.h:142
#define SearchSysCache1(cacheId, key1)
Definition: syscache.h:141
void ExecAssignResultTypeFromTL(PlanState *planstate)
Definition: execUtils.c:435
#define WINDOW_SEEK_TAIL
Definition: windowapi.h:34
TupleTableSlot * first_part_slot
Definition: execnodes.h:1916
static bool advance_windowaggregate_base(WindowAggState *winstate, WindowStatePerFunc perfuncstate, WindowStatePerAgg peraggstate)
#define FUNC_MAX_ARGS
ExprContext * tmpcontext
Definition: execnodes.h:1902
PlanState ps
Definition: execnodes.h:1248
struct WindowObjectData * agg_winobj
Definition: execnodes.h:1888
Node * startOffset
Definition: plannodes.h:739
bool tuplestore_in_memory(Tuplestorestate *state)
Definition: tuplestore.c:1438
int64 frameheadpos
Definition: execnodes.h:1885
FmgrInfo * flinfo
Definition: fmgr.h:71
static WindowStatePerAggData * initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc, WindowStatePerAgg peraggstate)
ExprState * ExecInitExpr(Expr *node, PlanState *parent)
Definition: execQual.c:4452
void pfree(void *pointer)
Definition: mcxt.c:995
TupleTableSlot * ExecWindowAgg(WindowAggState *winstate)
#define ObjectIdGetDatum(X)
Definition: postgres.h:515
#define ERROR
Definition: elog.h:43
bool fn_strict
Definition: fmgr.h:58
Expr * expr
Definition: execnodes.h:578
char * get_func_name(Oid funcid)
Definition: lsyscache.c:1380
void ExecInitResultTupleSlot(EState *estate, PlanState *planstate)
Definition: execTuples.c:835
struct WindowObjectData WindowObjectData
void fmgr_info(Oid functionId, FmgrInfo *finfo)
Definition: fmgr.c:160
static bool row_is_in_frame(WindowAggState *winstate, int64 pos, TupleTableSlot *slot)
#define DatumGetInt64(X)
Definition: postgres.h:615
#define EXEC_FLAG_BACKWARD
Definition: executor.h:59
#define outerPlanState(node)
Definition: execnodes.h:1072
TupleTableSlot * temp_slot_2
Definition: execnodes.h:1922
Datum WinGetFuncArgInPartition(WindowObject winobj, int argno, int relpos, int seektype, bool set_mark, bool *isnull, bool *isout)
Datum endOffsetValue
Definition: execnodes.h:1897
void * list_nth(const List *list, int n)
Definition: list.c:410
WindowStatePerAgg peragg
Definition: execnodes.h:1878
#define FunctionCallInvoke(fcinfo)
Definition: fmgr.h:129
ExprState * startOffset
Definition: execnodes.h:1894
void ExecAssignProjectionInfo(PlanState *planstate, TupleDesc inputDesc)
Definition: execUtils.c:668
void aclcheck_error(AclResult aclerr, AclObjectKind objectkind, const char *objectname)
Definition: aclchk.c:3392
int64 aggregatedbase
Definition: execnodes.h:1890
Node * endOffset
Definition: plannodes.h:740
#define FRAMEOPTION_END_CURRENT_ROW
Definition: parsenodes.h:495
#define fmgr_info_set_expr(expr, finfo)
Definition: fmgr.h:96
#define FRAMEOPTION_START_VALUE_PRECEDING
Definition: parsenodes.h:496
#define DatumGetBool(X)
Definition: postgres.h:401
#define FRAMEOPTION_END_UNBOUNDED_FOLLOWING
Definition: parsenodes.h:493
#define TupIsNull(slot)
Definition: tuptable.h:138
int partNumCols
Definition: plannodes.h:732
Oid winfnoid
Definition: primnodes.h:340
bool argnull[FUNC_MAX_ARGS]
Definition: fmgr.h:78
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
static void update_frameheadpos(WindowObject winobj, TupleTableSlot *slot)
Tuplestorestate * buffer
Definition: execnodes.h:1881
MemoryContext aggcontext
Definition: execnodes.h:1900
void fmgr_info_cxt(Oid functionId, FmgrInfo *finfo, MemoryContext mcxt)
Definition: fmgr.c:170
#define FRAMEOPTION_START_CURRENT_ROW
Definition: parsenodes.h:494
void getTypeInputInfo(Oid type, Oid *typInput, Oid *typIOParam)
Definition: lsyscache.c:2567
#define ereport(elevel, rest)
Definition: elog.h:122
Datum datumCopy(Datum value, bool typByVal, int typLen)
Definition: datum.c:128
Bitmapset * chgParam
Definition: execnodes.h:1052
#define InvokeFunctionExecuteHook(objectId)
Definition: objectaccess.h:179
bool IsBinaryCoercible(Oid srctype, Oid targettype)
#define outerPlan(node)
Definition: plannodes.h:151
Datum ExecEvalExprSwitchContext(ExprState *expression, ExprContext *econtext, bool *isNull, ExprDoneCond *isDone)
Definition: execQual.c:4404
int64 WinGetCurrentPosition(WindowObject winobj)
static void eval_windowaggregates(WindowAggState *winstate)
int64 spooled_rows
Definition: execnodes.h:1883
ExprDoneCond
Definition: execnodes.h:159
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:121
#define MemoryContextResetAndDeleteChildren(ctx)
Definition: memutils.h:88
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:316
#define WINDOW_SEEK_HEAD
Definition: windowapi.h:33
bool * ecxt_aggnulls
Definition: execnodes.h:139
#define TextDatumGetCString(d)
Definition: builtins.h:807
#define Anum_pg_aggregate_aggminitval
Definition: pg_aggregate.h:114
WindowAggState * winstate
Definition: nodeWindowAgg.c:62
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: aset.c:436
void * palloc0(Size size)
Definition: mcxt.c:923
AclResult
Definition: acl.h:169
uintptr_t Datum
Definition: postgres.h:374
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:990
void ExecSetSlotDescriptor(TupleTableSlot *slot, TupleDesc tupdesc)
Definition: execTuples.c:250
Datum SysCacheGetAttr(int cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
Definition: syscache.c:1152
int work_mem
Definition: globals.c:112
TupleTableSlot * agg_row_slot
Definition: execnodes.h:1920
static void initialize_windowaggregate(WindowAggState *winstate, WindowStatePerFunc perfuncstate, WindowStatePerAgg peraggstate)
AttrNumber * partColIdx
Definition: plannodes.h:733
FormData_pg_proc * Form_pg_proc
Definition: pg_proc.h:83
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:787
static void update_frametailpos(WindowObject winobj, TupleTableSlot *slot)
Plan * plan
Definition: execnodes.h:1027
static void begin_partition(WindowAggState *winstate)
#define InvalidOid
Definition: postgres_ext.h:36
bool more_partitions
Definition: execnodes.h:1909
Datum arg[FUNC_MAX_ARGS]
Definition: fmgr.h:77
#define WindowObjectIsValid(winobj)
Definition: windowapi.h:41
FmgrInfo * ordEqfunctions
Definition: execnodes.h:1880
#define Max(x, y)
Definition: c.h:792
Oid * partOperators
Definition: plannodes.h:734
TupleTableSlot * ExecCopySlot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
Definition: execTuples.c:798
#define makeNode(_type_)
Definition: nodes.h:540
bool tuplestore_gettupleslot(Tuplestorestate *state, bool forward, bool copy, TupleTableSlot *slot)
Definition: tuplestore.c:1061
void ExecEndWindowAgg(WindowAggState *node)
TupleTableSlot * ecxt_outertuple
Definition: execnodes.h:124
#define WINDOW_SEEK_CURRENT
Definition: windowapi.h:32
#define HeapTupleIsValid(tuple)
Definition: htup.h:77
int64 frametailpos
Definition: execnodes.h:1886
#define NULL
Definition: c.h:226
#define FRAMEOPTION_RANGE
Definition: parsenodes.h:487
#define Assert(condition)
Definition: c.h:667
#define lfirst(lc)
Definition: pg_list.h:106
Index winref
Definition: plannodes.h:731
#define EXEC_FLAG_MARK
Definition: executor.h:60
static void advance_windowaggregate(WindowAggState *winstate, WindowStatePerFunc perfuncstate, WindowStatePerAgg peraggstate)
FormData_pg_aggregate * Form_pg_aggregate
Definition: pg_aggregate.h:87
bool MemoryContextContains(MemoryContext context, void *pointer)
Definition: mcxt.c:615
void build_aggregate_transfn_expr(Oid *agg_input_types, int agg_num_inputs, int agg_num_direct_inputs, bool agg_variadic, Oid agg_state_type, Oid agg_input_collation, Oid transfn_oid, Oid invtransfn_oid, Expr **transfnexpr, Expr **invtransfnexpr)
Definition: parse_agg.c:1857
size_t Size
Definition: c.h:352
void ExecAssignExprContext(EState *estate, PlanState *planstate)
Definition: execUtils.c:413
#define InitFunctionCallInfoData(Fcinfo, Flinfo, Nargs, Collation, Context, Resultinfo)
Definition: fmgr.h:112
Oid exprType(const Node *expr)
Definition: nodeFuncs.c:42
static int list_length(const List *l)
Definition: pg_list.h:89
MemoryContext aggcontext
void tuplestore_end(Tuplestorestate *state)
Definition: tuplestore.c:450
void get_typlenbyval(Oid typid, int16 *typlen, bool *typbyval)
Definition: lsyscache.c:1969
#define FRAMEOPTION_ROWS
Definition: parsenodes.h:488
bool tuplestore_skiptuples(Tuplestorestate *state, int64 ntuples, bool forward)
Definition: tuplestore.c:1118
WindowAggState * ExecInitWindowAgg(WindowAgg *node, EState *estate, int eflags)
Oid inputcollid
Definition: primnodes.h:343
MemoryContext ecxt_per_query_memory
Definition: execnodes.h:127
Datum WinGetFuncArgInFrame(WindowObject winobj, int argno, int relpos, int seektype, bool set_mark, bool *isnull, bool *isout)
#define FRAMEOPTION_END_VALUE_PRECEDING
Definition: parsenodes.h:497
#define INT64_FORMAT
Definition: c.h:312
static Datum GetAggInitVal(Datum textInitVal, Oid transtype)
List * targetlist
Definition: plannodes.h:121
int64 currentpos
Definition: execnodes.h:1884
#define DatumGetPointer(X)
Definition: postgres.h:557
Oid wintype
Definition: primnodes.h:341
int tuplestore_alloc_read_pointer(Tuplestorestate *state, int eflags)
Definition: tuplestore.c:381
static bool are_peers(WindowAggState *winstate, TupleTableSlot *slot1, TupleTableSlot *slot2)
bool partition_spooled
Definition: execnodes.h:1906
void ExecAssignScanTypeFromOuterPlan(ScanState *scanstate)
Definition: execUtils.c:732
int errmsg(const char *fmt,...)
Definition: elog.c:797
bool framehead_valid
Definition: execnodes.h:1911
#define ACL_EXECUTE
Definition: parsenodes.h:70
bool winagg
Definition: primnodes.h:348
AclResult pg_proc_aclcheck(Oid proc_oid, Oid roleid, AclMode mode)
Definition: aclchk.c:4435
#define ALLOCSET_DEFAULT_INITSIZE
Definition: memutils.h:143
int i
Plan plan
Definition: plannodes.h:730
void tuplestore_select_read_pointer(Tuplestorestate *state, int ptr)
Definition: tuplestore.c:470
struct WindowStatePerAggData WindowStatePerAggData
void WinSetMarkPosition(WindowObject winobj, int64 markpos)
void ExecReScanWindowAgg(WindowAggState *node)
void * arg
MemoryContext partcontext
Definition: execnodes.h:1899
#define ALLOCSET_DEFAULT_MAXSIZE
Definition: memutils.h:144
#define elog
Definition: elog.h:218
int frameOptions
Definition: plannodes.h:738
FmgrInfo * execTuplesMatchPrepare(int numCols, Oid *eqOperators)
Definition: execGrouping.c:189
PlanState * ExecInitNode(Plan *node, EState *estate, int eflags)
Definition: execProcnode.c:136
bool ps_TupFromTlist
Definition: execnodes.h:1060
Definition: pg_list.h:45
Datum OidInputFunctionCall(Oid functionId, char *str, Oid typioparam, int32 typmod)
Definition: fmgr.c:2039
int16 AttrNumber
Definition: attnum.h:21
#define FRAMEOPTION_END_VALUE
Definition: parsenodes.h:503
static void eval_windowfunction(WindowAggState *winstate, WindowStatePerFunc perfuncstate, Datum *result, bool *isnull)
#define ResetExprContext(econtext)
Definition: executor.h:316
Oid resolve_aggregate_transtype(Oid aggfuncid, Oid aggtranstype, Oid *inputTypes, int numArguments)
Definition: parse_agg.c:1802
#define ExecEvalExpr(expr, econtext, isNull, isDone)
Definition: executor.h:72