...

Currently, supporting the above use-case requires all operators to be deployed at the start of the job. This approach wastes slot and memory resources because operatorB cannot do any useful work until operatorA's input has ended. Even worse, operatorB might use a lot of disk space just to cache and spill records received from the unbounded source to its local disk while it waits for operatorA's output.

In this FLIP, we propose to optimize task deployment and resource utilization for the above use-case by allowing an operator to explicitly specify whether it only emits records after all its inputs have ended. JM will leverage this information to optimize job scheduling such that the partition type of the results emitted by this operator will be blocking, which effectively lets Flink schedule and execute this operator in batch mode.
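The scheduling idea in the paragraph above can be illustrated with a small, self-contained model. This is only a sketch: the class, enum, and method names below (OutputOnEofSchedulingSketch, PartitionType, Operator, partitionTypeFor, canDeployConsumer) are hypothetical stand-ins and are not Flink's actual classes or the API defined by this FLIP. It assumes the scheduler makes a simple per-operator decision based on a single boolean attribute.

```java
/** Hypothetical, simplified model of the scheduling decision; not Flink code. */
final class OutputOnEofSchedulingSketch {

    /** How a produced result is exchanged with its consumers. */
    enum PartitionType { PIPELINED, BLOCKING }

    /** Stand-in for an operator that declares whether it only emits at end of input. */
    static final class Operator {
        final String name;
        final boolean outputOnEOF;

        Operator(String name, boolean outputOnEOF) {
            this.name = name;
            this.outputOnEOF = outputOnEOF;
        }
    }

    /** An operator that emits only after its inputs end gets a blocking result. */
    static PartitionType partitionTypeFor(Operator producer) {
        return producer.outputOnEOF ? PartitionType.BLOCKING : PartitionType.PIPELINED;
    }

    /** A consumer of a blocking result is deployed only once the producer has finished. */
    static boolean canDeployConsumer(Operator producer, boolean producerFinished) {
        return partitionTypeFor(producer) == PartitionType.PIPELINED || producerFinished;
    }

    public static void main(String[] args) {
        Operator operatorA = new Operator("operatorA", true);
        // While operatorA is still running, its consumer would not occupy a slot yet.
        System.out.println(canDeployConsumer(operatorA, false));
        // Once operatorA has finished, the consumer can be deployed.
        System.out.println(canDeployConsumer(operatorA, true));
    }
}
```

In this model, operatorB would not hold a slot or spill records from the unbounded source while operatorA is still consuming its input, which is the resource saving the proposal aims for.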

In addition, this FLIP adds EndOfStreamTrigger in GlobalWindows. GlobalWindows with EndOfStreamTrigger can be used with the DataStream API to specify that the window fires only at the end of its inputs. DataStream programs (e.g. coGroup and aggregate) can take advantage of this information to significantly increase throughput and reduce resource utilization.

...

Proposed Changes

1) Add APIs to the Transformation interface to get the corresponding operator attributes.

...

5) When a DataStream API such as DataStream#coGroup is invoked with GlobalWindows and EndOfStreamTrigger as the window assigner and trigger, the operator will have OperatorAttributes with isOutputOnEOF = true, so that it can achieve higher throughput by using the optimizations available in batch mode.

...

  • It would be useful to add an ExecutionState (e.g. Finishing) to specify whether the task has reached EOF for all its inputs. This allows JM to deploy its downstream tasks and possibly apply hybrid shuffle to increase job throughput.
  • Running the operators upstream of the dam operator in batch mode would also increase job throughput, and hybrid shuffle could be used in the batch-mode part of the job to further improve performance when there are sufficient slot resources.
  • Optimizing the implementation of frequently used dam operators, such as aggregate/CoGroup/Join/..., can achieve higher throughput by using the optimizations currently done in batch mode. For example, we can instantiate an internal sorter in the operator, so that it does not have to invoke WindowAssigner#assignWindows or triggerContext#onElement for each input record.
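The last point can be made concrete with a minimal sketch of an operator that buffers its input, sorts it once at end of input, and emits one result per key, with no per-record window assignment or trigger call. The class SortedEndOfInputAggregateSketch and its methods processElement and endInput are hypothetical names chosen for illustration. A real implementation would presumably use an external (spilling) sorter rather than an in-memory list, and summing Long values is just an example aggregate.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of a sort-based aggregate that emits only at end of input. */
final class SortedEndOfInputAggregateSketch {

    // In-memory stand-in for an internal sorter; records are only buffered here.
    private final List<Map.Entry<String, Long>> buffer = new ArrayList<>();

    /** Per-record path: append to the buffer, no window or trigger logic. */
    void processElement(String key, long value) {
        buffer.add(Map.entry(key, Long.valueOf(value)));
    }

    /** End-of-input path: sort by key once, then sum each run of equal keys. */
    Map<String, Long> endInput() {
        buffer.sort(Map.Entry.comparingByKey());
        Map<String, Long> sums = new LinkedHashMap<>();
        String currentKey = null;
        long currentSum = 0;
        for (Map.Entry<String, Long> record : buffer) {
            if (!record.getKey().equals(currentKey)) {
                if (currentKey != null) {
                    sums.put(currentKey, currentSum); // emit the finished key group
                }
                currentKey = record.getKey();
                currentSum = 0;
            }
            currentSum += record.getValue();
        }
        if (currentKey != null) {
            sums.put(currentKey, currentSum); // emit the last key group
        }
        buffer.clear();
        return sums;
    }

    public static void main(String[] args) {
        SortedEndOfInputAggregateSketch op = new SortedEndOfInputAggregateSketch();
        op.processElement("b", 2);
        op.processElement("a", 1);
        op.processElement("b", 3);
        System.out.println(op.endInput());
    }
}
```

Because equal keys are adjacent after sorting, the operator only needs to keep the state of one key at a time while emitting, which is the kind of batch-mode optimization the bullet refers to.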