Discussion thread: -
Vote thread: -
JIRA: -
Release: -

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

As shown in the figure below, a job might pre-process records from a bounded source (inputA) using an operator (operatorA) that only emits results after its input has ended. Then operatorB needs to join the records emitted by operatorA with records from an unbounded source, and emit results in real time with low processing latency.

Currently, supporting the above use case requires all operators to be deployed at the start of the job. This wastes slot resources because operatorB cannot do any useful work until operatorA's input has ended. Even worse, operatorB might use a lot of disk space just to cache and spill the records it receives from the unbounded source to its local disk while it waits for operatorA's output.

In this FLIP, we propose to optimize performance for the above use case by allowing an operator to explicitly specify whether it only emits records after all its inputs have ended. The JobManager (JM) will use this information to optimize job scheduling so that the results emitted by this operator, as well as the results emitted by its upstream operators, all have a blocking partition type. This effectively lets Flink schedule and execute this operator and its upstream operators in batch mode.
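To make the idea concrete, here is a minimal plain-Java sketch of how an operator might declare that it only emits records after all its inputs have ended. The `OperatorAttributes` class, its builder, the `AttributedOperator` interface, and `SumAtEndOperator` are hypothetical names chosen for illustration; they are not taken from this FLIP or from Flink's API.

```java
/** Hypothetical attribute holder that an operator exposes to the JM (illustrative only). */
final class OperatorAttributes {
    // True if the operator emits records only after all of its inputs have ended.
    final boolean isOutputOnEOF;

    private OperatorAttributes(boolean isOutputOnEOF) {
        this.isOutputOnEOF = isOutputOnEOF;
    }

    static Builder builder() {
        return new Builder();
    }

    static final class Builder {
        // Default: the operator may emit records at any time (regular streaming behavior).
        private boolean isOutputOnEOF = false;

        Builder setOutputOnEOF(boolean value) {
            this.isOutputOnEOF = value;
            return this;
        }

        OperatorAttributes build() {
            return new OperatorAttributes(isOutputOnEOF);
        }
    }
}

/** Hypothetical operator interface; operators that declare nothing keep the streaming defaults. */
interface AttributedOperator {
    default OperatorAttributes getOperatorAttributes() {
        return OperatorAttributes.builder().build();
    }
}

/** Example operator: sums its input and emits the total only once the input has ended. */
final class SumAtEndOperator implements AttributedOperator {
    private long sum = 0;

    void processElement(long value) {
        sum += value; // only updates state; nothing is emitted here
    }

    long endInput() {
        return sum; // the single result is produced at end of input
    }

    @Override
    public OperatorAttributes getOperatorAttributes() {
        return OperatorAttributes.builder().setOutputOnEOF(true).build();
    }
}
```

Keeping the default at `false` means existing operators are unaffected unless they opt in, which matches the backward-compatibility goal stated later in this FLIP.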

...

3) Update the JM to use the following operator attributes when compiling the Transformation graph into the JobGraph.

  • If a Transformation has isOutputOnEOF == true:
    • The results of this operator, as well as those of its upstream operators, have a blocking partition type.
    • This operator, as well as its upstream operators, will be executed in batch mode (e.g. checkpointing is disabled while these operators are running).
  • If all Transformations have isOutputOnCheckpoint == false:
    • In the context of FLIP-325, the JM will not trigger an extra flush() before triggering a checkpoint.
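The compilation rules above can be sketched as a small graph pass. This is a simplified model under stated assumptions, not the JM's actual code: `Node`, `Edge`, `PartitionType`, and `GraphCompiler` are hypothetical stand-ins for Transformations and result partitions, and the example graph reuses inputA, operatorA, and operatorB from the use case, with a hypothetical inputB as the unbounded source.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

enum PartitionType { PIPELINED, BLOCKING }

/** Hypothetical stand-in for a Transformation: its id, attributes, and upstream ids. */
record Node(String id, boolean isOutputOnEOF, boolean isOutputOnCheckpoint, List<String> upstream) {}

/** A producer -> consumer edge in the compiled graph. */
record Edge(String from, String to) {}

final class GraphCompiler {
    /** Batch-mode nodes: every isOutputOnEOF node plus all of its transitive upstream nodes. */
    static Set<String> batchNodes(Map<String, Node> nodes) {
        Set<String> batch = new HashSet<>();
        Deque<String> work = new ArrayDeque<>();
        for (Node n : nodes.values()) {
            if (n.isOutputOnEOF()) {
                work.push(n.id());
            }
        }
        while (!work.isEmpty()) {
            String id = work.pop();
            if (batch.add(id)) { // first visit: keep walking upstream
                work.addAll(nodes.get(id).upstream());
            }
        }
        return batch;
    }

    /** Results produced by a batch-mode node are BLOCKING; all other results stay PIPELINED. */
    static Map<Edge, PartitionType> partitionTypes(Map<String, Node> nodes) {
        Set<String> batch = batchNodes(nodes);
        Map<Edge, PartitionType> types = new HashMap<>();
        for (Node n : nodes.values()) {
            for (String up : n.upstream()) {
                types.put(new Edge(up, n.id()),
                        batch.contains(up) ? PartitionType.BLOCKING : PartitionType.PIPELINED);
            }
        }
        return types;
    }

    /** Checkpointing is allowed only while no batch-mode node is running. */
    static boolean checkpointingAllowed(Set<String> runningNodes, Set<String> batch) {
        return runningNodes.stream().noneMatch(batch::contains);
    }

    /** The extra flush() before a checkpoint is needed only if some node has isOutputOnCheckpoint == true. */
    static boolean needsFlushBeforeCheckpoint(Map<String, Node> nodes) {
        return nodes.values().stream().anyMatch(Node::isOutputOnCheckpoint);
    }
}
```

For the use-case graph, inputA and operatorA end up in the batch set, so the inputA -> operatorA and operatorA -> operatorB edges become BLOCKING while inputB -> operatorB stays PIPELINED.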

4) When an operator determines the RecordAttributes it sends to downstream nodes, a blocking input edge with pending records is treated the same as a source with isBacklog=true.
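A minimal sketch of this rule, assuming a hypothetical `InputStatus` view of each input. The rule for combining several inputs (any backlog input makes the operator's output backlog) is an assumption made for illustration; only the treatment of a blocking edge with pending records comes from the text above.

```java
import java.util.List;

/** Hypothetical view of one operator input. */
record InputStatus(boolean isBlocking, boolean hasPendingRecords, boolean upstreamIsBacklog) {}

final class BacklogResolver {
    /** An input counts as backlog if upstream reports isBacklog=true,
     *  or if it is a blocking edge that still has records to consume. */
    static boolean inputIsBacklog(InputStatus in) {
        return in.upstreamIsBacklog() || (in.isBlocking() && in.hasPendingRecords());
    }

    /** Assumed combination rule: report isBacklog=true downstream if any input is backlog. */
    static boolean outputIsBacklog(List<InputStatus> inputs) {
        return inputs.stream().anyMatch(BacklogResolver::inputIsBacklog);
    }
}
```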


5) When DataStream#coGroup is invoked with EndOfStreamWindows as the window assigner, Flink should generate an operator with isOutputOnEOF = true.
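The following plain-Java model sketches why such a coGroup qualifies as isOutputOnEOF = true: with a single window spanning the whole input, it can only buffer records per key and must wait until both inputs have ended before calling the user function. `KeyedCoGroup` and `EndOfStreamCoGroup` are hypothetical names; this is not Flink's DataStream#coGroup implementation.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

/** Hypothetical per-key callback: receives all records of both inputs for one key. */
interface KeyedCoGroup<K, A, B, O> {
    O apply(K key, List<A> left, List<B> right);
}

/** Hypothetical coGroup whose single window covers the whole input: buffer, then emit at EOF. */
final class EndOfStreamCoGroup<K, A, B, O> {
    private final Map<K, List<A>> left = new LinkedHashMap<>();
    private final Map<K, List<B>> right = new LinkedHashMap<>();
    private final Function<A, K> leftKey;
    private final Function<B, K> rightKey;
    private final KeyedCoGroup<K, A, B, O> fn;
    private final boolean[] ended = new boolean[2];
    private boolean emitted = false;

    EndOfStreamCoGroup(Function<A, K> leftKey, Function<B, K> rightKey, KeyedCoGroup<K, A, B, O> fn) {
        this.leftKey = leftKey;
        this.rightKey = rightKey;
        this.fn = fn;
    }

    void processLeft(A record) {
        left.computeIfAbsent(leftKey.apply(record), k -> new ArrayList<>()).add(record);
    }

    void processRight(B record) {
        right.computeIfAbsent(rightKey.apply(record), k -> new ArrayList<>()).add(record);
    }

    /** Marks input 0 (left) or 1 (right) as ended; results are emitted once, after both have ended. */
    List<O> endInput(int input) {
        ended[input] = true;
        if (!(ended[0] && ended[1]) || emitted) {
            return List.of(); // nothing is emitted before EOF on all inputs
        }
        emitted = true;
        Set<K> keys = new LinkedHashSet<>(left.keySet());
        keys.addAll(right.keySet());
        List<O> out = new ArrayList<>();
        for (K key : keys) {
            out.add(fn.apply(key, left.getOrDefault(key, List.of()), right.getOrDefault(key, List.of())));
        }
        return out;
    }
}
```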

...

The changes made in this FLIP are backward compatible. No deprecation or migration plan is needed.


Future Work

It would be useful to add an ExecutionState (e.g. Finishing) to indicate whether a task has reached EOF on all its inputs. This would allow the JM to deploy the task's downstream tasks and possibly apply hybrid shuffle to increase job throughput.
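A rough sketch of the scheduling check such a state could enable, under the assumption that downstream tasks become deployable once every upstream task is FINISHING or FINISHED. `TaskState` is a hypothetical, simplified lifecycle and not Flink's `ExecutionState`; `DeploymentPolicy` is likewise illustrative.

```java
import java.util.List;

/** Hypothetical, simplified task lifecycle; FINISHING is the proposed state, not an existing Flink value. */
enum TaskState { CREATED, DEPLOYING, RUNNING, FINISHING, FINISHED }

final class DeploymentPolicy {
    /** Downstream tasks may be deployed once every upstream task has reached EOF on all its inputs. */
    static boolean canDeployDownstream(List<TaskState> upstreamStates) {
        return upstreamStates.stream()
                .allMatch(s -> s == TaskState.FINISHING || s == TaskState.FINISHED);
    }
}
```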