...

Let's consider a use-case with the job graph shown in the figure below, where the user wants to bootstrap job state using records from bounded sources. pipelinedRegionA aggregates records from its two bounded inputs using aggregate/join/cogroup operations, and should emit records only after all its bounded inputs have ended. operatorB can start to process records from input_3 only after having received the results from pipelinedRegionA, and needs to run continuously since input_3 is an unbounded source. After this bootstrap phase is done (i.e. all records from the bounded sources have been processed), the user wants to process records from the unbounded source and emit results continuously in real-time. Additionally, we assume the aggregation computation (e.g. co-group) in pipelinedRegionA can be 10X faster in batch mode than in stream mode (see Benchmark Results below).

Currently, supporting such a use-case requires the STREAMING execution mode, where all operators must be deployed at the start of the job and cannot apply typical "batch-mode" optimizations such as sort-based aggregation. This approach has several drawbacks: 1) the throughput of aggregation operations (e.g. co-group) can be 10X worse in stream mode than in batch mode; 2) it wastes slot resources to deploy operatorB when it cannot do any useful work other than waiting for results from pipelinedRegionA; and 3) operatorB might waste a lot of memory and disk IO buffering (or spilling to local disk) records from the unbounded input until it has received all records from pipelinedRegionA.

In this FLIP, we propose adding public APIs to support running a mixture of batch-mode and stream-mode operators in the same job, so that the stream part of the use-case can be executed after the batch part, with each part using the mode that best fits its needs. This optimizes performance for the above use-case by 1) allowing an operator to declare its attributes so that the JM will only deploy an operator after all its inputs are un-blocked; and 2) allowing a given operator to effectively switch from batch mode to stream mode during execution. We hope these capabilities can further enhance Flink as a stream-batch unified processing engine.



Public Interfaces

1) Add EndOfStreamWindows which is a subclass of WindowAssigner.
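To illustrate the intended semantics, here is a minimal plain-Java sketch (deliberately without any Flink dependency, and not the proposed `WindowAssigner` implementation itself): every record is conceptually assigned to a single window covering the whole stream, which fires exactly once when the bounded input ends, so nothing is emitted while records are still being aggregated. The class and method names below are hypothetical, chosen only for this illustration.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of EndOfStreamWindows semantics: all records belong to
// one window spanning the entire (bounded) stream, and the aggregated result
// becomes visible only after the last record has been consumed.
public class EndOfStreamWindowSketch {
    public static Map<String, Long> countByKeyAtEndOfStream(List<String> boundedInput) {
        Map<String, Long> counts = new HashMap<>();
        for (String key : boundedInput) {
            // Aggregate internally; nothing is emitted downstream yet.
            counts.merge(key, 1L, Long::sum);
        }
        // The single end-of-stream window "fires" here, once, after all input.
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(countByKeyAtEndOfStream(List.of("a", "b", "a")));
    }
}
```

In a real Flink job, the same effect would be achieved by passing the proposed assigner to a windowed co-group/aggregate, letting the runtime emit results only after all bounded inputs have ended.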

...