Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

As described in the FLIP-131: Consolidate the user-facing Dataflow SDKs/APIs (and deprecate the DataSet API) we intend to deprecate and remove the DataSet API in the future in favor favour of the DataStream API. The switch between DataSet and DataStream Using the DataStream API for bounded applications should not result in degraded performance when running bounded Flink . Ideally, we can stay in the same performance ballpark as equivalent DataSet programs.

The DataStream API relies on StateBackends for performing reduce/aggregate-like operations. In streaming scenarios, there is no single point in time where we can perform the calculations and emit results downstream. The calculations are performed in an incremental way and we need to keep all the intermediate results at hand. If the state does not fit into memory users are forced to use RocksDB, which comes with the penalty of serializing records, often spilling to disk, costly metadata bookkeeping etc. This is not necessarily efficient for the bounded style execution where we have a single point in time after which the results will not change.

...

We performed some benchmarks to compare results of the different execution styles and using different state backends. The program we performed the tests for is  is a version of a WordCount, which in case of the DataStream API is a WordCount in a global window. The job counts 40_000_000 incoming records with 4_000_000 different keys. An interesting observation is that the performance of HeapStateBackend depends heavily on the number of keys. With a high number of keys it suffers a significant penalty and becomes even less performant for that particular case than the sorting approach. The results are as follows:

...

We do want this change to be mostly transparent to the user code. Therefore, we will introduce . We will introduce a sorting step (with potential spilling, reusing the UnilateralSortMerger implementation) before every keyed operator for sorting/grouping inputs by their keys. This will allow us to process records in per-key groups, which will enable us to use a simplified implementation of a StateBackend that is not organized in key groups and only ever keeps values for a single key.

...

We will replace the InternalTimeServiceManager with a customized one which will keep timers only for the current key, and it will fire timers only at the end of each key. The value of InternalTimeService#getCurrentWatermark  will  will be Watermark.MIN_VALUE  when  when processing a key. and will be set to Watermark.MAX_VALUE  when  when firing triggers.

User-visible implications

...

  1. Users chosen StateBackend will not be used in a bounded style execution, instead we will use the SingleKeyStateBackend
  2. Checkpointing will not work in the bounded execution scenario. We do not keep any state that can be checkpointed.
  3. We do process each key separately, this has to be considered for custom implementations of StreamOperators. Some of the custom StreamOperators might not work in the bounded execution.
    1. the Watermark can jump from the MAX_VALUE (at the end of a key) to MIN_VALUE (at the beginning of the next key). This might break implementations of operators that cache the value of a Watermark and assume it can never move back.
    2. Timers will fire first in the key order and then in timestamp order within the key.
    3. Operators can no longer freely switch processing between different keys. Methods such as e.g. setCurrentKey, applyToAllKeys will not work correctly. Switching the current key to a new value will result in pruning the current state and potentially even firing the timers.

...