...

  1. We need to bump up Calcite to v1.25+ first.
  2. We will not introduce any new logical nodes, as there are no new relational algebra operations.
  3. We will introduce new physical nodes for window join, window aggregate, and window rank, which work on the existing window_start and window_end columns.
  4. We will introduce a MetadataHandler to infer the window start and window end columns. This metadata will be used to convert logical join/aggregate/rank nodes into physical window join/aggregate/rank nodes.
  5. If the input node of a window join/aggregate/rank is the windowing TVF, we can merge the TVF into the window node, so that window assignment is co-located in the same operator. This improves performance by reusing window state and reducing record shuffling. For example, for a cumulating window, the state of window [00:00, 01:00) can be used as the base state for window [00:00, 02:00).
  6. We will reuse the existing implementation of WindowOperator (i.e. window aggregate operator).
  7. We will support the mini-batch, local-global, and distinct-split optimizations on the window operators for better performance. Currently, these are only supported for non-window aggregates.
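The state-reuse idea in point 5 can be sketched with a toy cumulating aggregation. This is an illustrative simulation only, not Flink code; the function name and shape are hypothetical:

```python
# Illustrative sketch of cumulating-window state reuse (not a Flink API).
# For a cumulating window, the accumulator of window [0, step) is the base
# state for window [0, 2*step): only the records of the new step are added,
# instead of re-aggregating all records from the window start.

def cumulate_sum(records, max_size, step):
    """records: list of (timestamp, value); returns {window_end: sum}."""
    results = {}
    acc = 0            # shared base accumulator, reused across steps
    next_end = step
    for ts, value in sorted(records):
        while ts >= next_end:        # window [0, next_end) is complete
            results[next_end] = acc  # emit from the shared accumulator
            next_end += step
            if next_end > max_size:
                return results
        acc += value                 # extend the base state in place
    while next_end <= max_size:      # emit remaining windows
        results[next_end] = acc
        next_end += step
    return results
```

For example, with minutes as timestamps, `cumulate_sum([(10, 1), (70, 2)], max_size=120, step=60)` produces `{60: 1, 120: 3}`: the sum for window `[0, 60)` is reused as the base for window `[0, 120)`.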

Performance Optimization

Performance optimization is a long-term plan. Window operations work on windowed data, which is an infinite sequence of bounded data sets, and the SLA of such jobs is usually the window size. We can therefore use the window-size duration to buffer data, pre-aggregate it, reduce state operations, and improve throughput. In the long term, window operations could work in a continuous batch mode: the source buffers input data until all data of a specific window has arrived (determined by the watermark), then emits the window's data to the downstream operators. All downstream operators would be batch operators that can be scheduled when each window's data is ready and would not need the state mechanism. This approach would be quite efficient. However, it is blocked by several runtime building blocks, such as continuous batch mode, the combination of streaming and batch operators, etc.

Therefore, in the short term, we have to use streaming operators and work in stream mode. The optimization idea is to buffer batched data as much as possible. Similar to the current mini-batch mechanism, the buffered data is flushed only when (1) the watermark for the window end arrives, (2) the memory reserved for buffering is full, or (3) a checkpoint barrier arrives. We will reserve a pre-defined memory space for buffering (similar to how batch operators allocate memory from the MemoryManager). The buffered data can be records or pre-aggregated accumulators, depending on the operator. Users can tune the size of the reserved memory. In this way, we can aggregate a lot of data before accessing state, which improves throughput.
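The buffering-and-flush behavior above can be sketched as a toy operator. This is an illustrative simulation only, not Flink code; the class name, the use of an entry count in place of reserved memory, and the sum accumulator are all assumptions:

```python
# Illustrative sketch of mini-batch buffering in a window aggregate
# (not a Flink API). Records are pre-aggregated into in-memory accumulators
# keyed by (key, window); the buffer is flushed to "state" only on one of
# the three triggers: watermark, buffer full, or checkpoint barrier.

class MiniBatchWindowAgg:
    def __init__(self, max_buffer_entries):
        self.max_buffer_entries = max_buffer_entries  # stands in for reserved memory
        self.buffer = {}  # (key, window) -> pre-aggregated accumulator
        self.state = {}   # stands in for the operator's keyed state backend

    def process(self, key, window, value):
        # Pre-aggregate in memory; no state access per record.
        self.buffer[(key, window)] = self.buffer.get((key, window), 0) + value
        if len(self.buffer) >= self.max_buffer_entries:
            self.flush()                 # trigger (2): buffer is full

    def on_watermark(self, watermark):
        self.flush()                     # trigger (1): window end reached

    def on_checkpoint_barrier(self):
        self.flush()                     # trigger (3): barrier arrived

    def flush(self):
        # One state access per distinct (key, window) instead of per record.
        for kw, acc in self.buffer.items():
            self.state[kw] = self.state.get(kw, 0) + acc
        self.buffer.clear()
```

The design point is that `process` never touches state directly, so many records that share a `(key, window)` pair collapse into a single state update at flush time.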

For overlapping windows, e.g. HOP and CUMULATE, we can use shared pane state to reduce state size and state operations. For example, for a hopping window with a 1-hour size and a 10-minute slide, we can store state per 10-minute pane, which avoids storing many duplicate records for window join. As another example, for a cumulating window with a 1-day max size and a 1-hour step, the state of window [00:00, 01:00) can be used as the base state for window [00:00, 02:00), which greatly reduces state size and operations.
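The pane-sharing idea for HOP can be sketched as follows. This is an illustrative simulation only, not Flink code; the function name and the sum aggregate are assumptions:

```python
# Illustrative sketch of shared pane state for HOP windows (not a Flink API).
# With a 60-minute window and a 10-minute slide, each record belongs to 6
# overlapping windows; storing one accumulator per 10-minute pane avoids
# keeping 6 copies, and each window result is assembled from 6 panes.

def hop_sums(records, size, slide):
    """records: list of (timestamp, value); returns {(start, end): sum}.
    Assumes size is a multiple of slide; panes are slide-sized."""
    panes = {}
    for ts, value in records:
        pane = (ts // slide) * slide          # one accumulator per pane
        panes[pane] = panes.get(pane, 0) + value

    results = {}
    panes_per_window = size // slide
    for pane, acc in panes.items():
        # Each pane contributes to `panes_per_window` overlapping windows.
        for i in range(panes_per_window):
            start = pane - i * slide
            window = (start, start + size)
            results[window] = results.get(window, 0) + acc
    return results
```

State grows with the number of panes, not with the number of (record, window) pairs, which is exactly the duplication the shared pane state avoids.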


Compatibility, Deprecation, and Migration Plan

...