...

This performance optimization is a long-term plan. Window operations work on windowed data, which is an infinite sequence of bounded data sets, and the SLA of such jobs is usually the window size. We can therefore use the duration of the window to buffer data, pre-aggregate it, reduce state operations, and improve throughput. In the long term, window operations can work in a continuous batch mode: the source buffers input data until all data of a specific window has arrived (determined by the watermark), then emits that window's data to the downstream operators. All downstream operators are batch operators; they can be scheduled whenever a window's data is ready and do not need the state mechanism. They process the bounded window data and emit the window result in a batch fashion. That means we can reuse the existing batch operator implementations and optimizations. This approach would be very efficient in terms of throughput, because we do not need to access state. However, it is blocked by several runtime building blocks, such as continuous batch mode and the combination of streaming and batch operators.
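To make the idea concrete, the following is a minimal sketch of such a window-buffering source. All names here are hypothetical illustrations, not actual Flink APIs: a source buffers records per tumbling window and hands each window to a stateless, batch-style downstream consumer once the watermark shows the window is complete.

    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.function.BiConsumer;

    // Hypothetical sketch, not a Flink API: buffer records per tumbling window
    // and emit each window as one bounded batch once it is complete.
    public class WindowBufferingSource<T> {
        private final long windowSizeMs;
        private final Map<Long, List<T>> buffers = new HashMap<>(); // window start -> buffered records
        private final BiConsumer<Long, List<T>> downstream;         // invoked once per complete window

        public WindowBufferingSource(Duration windowSize, BiConsumer<Long, List<T>> downstream) {
            this.windowSizeMs = windowSize.toMillis();
            this.downstream = downstream;
        }

        // Buffer a record into the tumbling window its (non-negative) timestamp falls into.
        public void collect(T record, long eventTimeMs) {
            long windowStart = eventTimeMs - (eventTimeMs % windowSizeMs);
            buffers.computeIfAbsent(windowStart, k -> new ArrayList<>()).add(record);
        }

        // On watermark advance, every window whose end <= watermark is complete:
        // hand it to the downstream as bounded data and drop the buffer, so the
        // downstream operators never need keyed state for it.
        public void onWatermark(long watermarkMs) {
            buffers.entrySet().removeIf(entry -> {
                long windowEnd = entry.getKey() + windowSizeMs;
                if (windowEnd <= watermarkMs) {
                    downstream.accept(entry.getKey(), entry.getValue());
                    return true;
                }
                return false;
            });
        }
    }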

Therefore, in the short term, we have to use streaming operators and work in stream mode. The optimization idea is to buffer data in batches as much as possible. Similar to the current mini-batch mechanism for unbounded aggregates, the buffered data is flushed only when (1) the watermark for the window end arrives, (2) the memory reserved for buffering is full, or (3) a checkpoint barrier arrives. We will reserve a pre-defined memory space for buffering (similar to how batch operators allocate memory from the MemoryManager). The buffered data can be raw records or pre-aggregated accumulators, depending on the operator. Users can tune the configured size of the reserved memory. In this way, we can aggregate as much data as possible before accessing state, which improves throughput. This is more efficient than the current mini-batch approach for unbounded aggregates, because we can buffer more data. On the other hand, because we calculate input data incrementally instead of computing the whole window in one shot, much less computation is needed when a window closes. Thus the latency from window close to result emission is shorter, and the cluster CPU load is smoother and steadier.
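The following sketch illustrates this buffering loop with the three flush triggers above. The class name and the count-based stand-in for the reserved memory budget are assumptions for illustration, not Flink's actual operator code:

    import java.util.HashMap;
    import java.util.Map;

    // Sketch of the short-term buffering idea, analogous to mini-batch for
    // unbounded aggregates: pre-aggregate into an in-memory buffer and only
    // touch the state backend when the buffer is flushed.
    public class BufferedWindowAggregator {
        private final int maxBufferedKeys;                         // stand-in for the reserved memory budget
        private final Map<String, Long> buffer = new HashMap<>();  // key -> pre-aggregated accumulator
        private final Map<String, Long> state = new HashMap<>();   // stand-in for the state backend

        public BufferedWindowAggregator(int maxBufferedKeys) {
            this.maxBufferedKeys = maxBufferedKeys;
        }

        // Pre-aggregate in memory; flush only if the reserved buffer is full (trigger 2).
        public void processElement(String key) {
            buffer.merge(key, 1L, Long::sum);
            if (buffer.size() >= maxBufferedKeys) {
                flush();
            }
        }

        // Trigger 1: the watermark passed the window end, so flush and emit final results.
        public void onWatermark(long watermark, long windowEnd) {
            if (watermark >= windowEnd) {
                flush();
                // ...emit per-key results from `state` for this window...
            }
        }

        // Trigger 3: flush before a checkpoint barrier so the buffer need not be snapshotted.
        public void onCheckpointBarrier() {
            flush();
        }

        // Merge buffered pre-aggregates into state with one state access per key.
        private void flush() {
            buffer.forEach((key, partial) -> state.merge(key, partial, Long::sum));
            buffer.clear();
        }
    }

The key property is that state is accessed once per key per flush rather than once per record, which is where the throughput gain over per-record state access comes from.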

For overlapping windows, e.g. HOP and CUMULATE, we can use shared pane state to reduce state size and the number of state operations. For example, for a hopping window with a 1 hour size and a 10 minute slide, we can store state per 10-minute pane; this avoids storing many duplicate records, e.g. for window joins. As another example, for a cumulating window with a 1 day max size and a 1 hour step, the state of window [00:00, 01:00) can be used as the base state for window [00:00, 02:00), which greatly reduces state size and operations.
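A small sketch of the pane-sharing arithmetic for HOP(size = 1 hour, slide = 10 minutes): each record is aggregated once into its 10-minute pane, and each window result merges the size/slide = 6 panes it covers, instead of storing the record in 6 overlapping window states. The helper below is hypothetical and for illustration only:

    import java.time.Duration;

    // Hypothetical illustration of pane sharing, not Flink's implementation.
    public class PaneSharing {
        public static void main(String[] args) {
            long size = Duration.ofHours(1).toMillis();
            long slide = Duration.ofMinutes(10).toMillis();

            // A record at 00:25 is stored once, in pane [00:20, 00:30).
            long eventTime = Duration.ofMinutes(25).toMillis();
            long paneStart = eventTime - (eventTime % slide);
            System.out.println("record stored once in pane starting at " + paneStart);

            // The hopping window [00:00, 01:00) is assembled by merging the
            // size / slide = 6 panes it covers.
            long windowStart = 0L;
            for (long pane = windowStart; pane < windowStart + size; pane += slide) {
                System.out.println("window [0, 1h) merges pane starting at " + pane);
            }
        }
    }

The same idea applies to CUMULATE: each 1-hour step is a pane, and the result for [00:00, N hours) reuses the merged state of [00:00, N-1 hours) as its base.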

...