...

Combinations of property choices supported by the existing APIs

  • The existing DataSet::iterate supports algorithms that process bounded streams + sync mode + per-round variable update.
  • The existing DataStream::iterate has a few bugs that prevent it from being used in production (see FLIP-15). Apart from these bugs, it is intended to support algorithms that process unbounded streams + async mode + per-batch variable update.

Combinations of categories supported by the proposed APIs

...

The APIs proposed in this FLIP support all 6 valid combinations of property choices.

Note that the proposed APIs do not support nested iterations because we have not yet found a concrete use case for them.


Summary

The following table summarizes the use-cases supported by the existing APIs and proposed APIs, respectively, with respect to the properties defined above.

...

In this section, we discuss a few design choices related to the implementation and usage of the proposed APIs.

1) How the feedback edge is supported.

The Flink core runtime can only execute a DAG of operators, that is, a graph without cycles. Thus extra work is needed to support feedback edges, which effectively introduce cycles in the data flow.

Similar to the existing iterative API, this FLIP plans to implement the feedback edge using the following approach:

  • Automatically insert the HEAD and the TAIL operators as the first and the last operators in the iteration body. The TAIL operator 
  • Co-locate the HEAD and the TAIL operators on the same task manager.
  • Have the HEAD and the TAIL operators transmit the records of the feedback edges using an in-memory queue.
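
The HEAD/TAIL approach above can be illustrated with a minimal sketch. All class and method names here (FeedbackChannel, TailOperator, HeadOperator, pollFeedback) are hypothetical and are not part of Flink or of this FLIP. The sketch assumes both operators run in the same JVM, which is what co-location provides, and it uses an unbounded queue purely for simplicity, since the buffer-size policy is still open.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical in-memory channel shared by a co-located HEAD/TAIL pair. */
final class FeedbackChannel<T> {
    // Unbounded only to keep the sketch short; the real buffer policy is undecided.
    private final BlockingQueue<T> queue = new LinkedBlockingQueue<>();

    /** Called by the TAIL operator for each record on a feedback edge. */
    void put(T record) {
        queue.add(record);
    }

    /** Called by the HEAD operator; drains all buffered records in FIFO order. */
    List<T> drain() {
        List<T> records = new ArrayList<>();
        queue.drainTo(records);
        return records;
    }
}

/** Hypothetical TAIL operator: the last operator of the iteration body. */
final class TailOperator<T> {
    private final FeedbackChannel<T> channel;

    TailOperator(FeedbackChannel<T> channel) {
        this.channel = channel;
    }

    /** Forwards a feedback record into the shared channel instead of a network edge. */
    void processElement(T record) {
        channel.put(record);
    }
}

/** Hypothetical HEAD operator: the first operator of the iteration body. */
final class HeadOperator<T> {
    private final FeedbackChannel<T> channel;

    HeadOperator(FeedbackChannel<T> channel) {
        this.channel = channel;
    }

    /** Returns the feedback records to re-inject into the iteration body. */
    List<T> pollFeedback() {
        return channel.drain();
    }
}
```

Because the cycle is closed through the shared queue rather than through a graph edge, the operator graph seen by the runtime remains a DAG.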

TODO: explain the maximum buffer size of the queue between the HEAD and the TAIL operators. Will we give it infinite buffer size or spill data to disk?


2) How the termination of the iteration execution is determined.

We will add a coordinator operator which takes all feedback variable streams (emitted by the iteration body) and the termination criteria stream (if not null) as inputs. The execution of the graph created by the iteration body will terminate when all input streams have been fully consumed AND any of the following conditions is met:

  • The termination criteria stream is not null, and the coordinator operator has not observed any new value from the termination criteria stream between two consecutive onEpochWatermarkIncremented invocations.
  • The coordinator operator has not observed any new value from any feedback variable stream between two consecutive onEpochWatermarkIncremented invocations.
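
The termination rule above can be sketched as a small state machine. This is only an illustration of the stated conditions, not the planned implementation: the class TerminationCoordinator and every method except the name onEpochWatermarkIncremented are hypothetical. The sketch also assumes that the first onEpochWatermarkIncremented invocation cannot trigger termination, because there is no earlier invocation to compare against.

```java
/** Hypothetical coordinator state that tracks the termination conditions. */
final class TerminationCoordinator {
    private final boolean hasTerminationCriteriaStream;
    private boolean inputsFullyConsumed;
    private boolean seenFirstInvocation;
    private boolean sawCriteriaValue;
    private boolean sawFeedbackValue;
    private boolean terminated;

    TerminationCoordinator(boolean hasTerminationCriteriaStream) {
        this.hasTerminationCriteriaStream = hasTerminationCriteriaStream;
    }

    /** Signals that all input streams have been fully consumed. */
    void onInputsFullyConsumed() {
        inputsFullyConsumed = true;
    }

    /** Records a new value on the termination criteria stream. */
    void onTerminationCriteriaValue() {
        sawCriteriaValue = true;
    }

    /** Records a new value on any feedback variable stream. */
    void onFeedbackValue() {
        sawFeedbackValue = true;
    }

    /**
     * Evaluates the conditions for the interval since the previous invocation,
     * then resets the per-interval flags. Returns true once the iteration
     * should terminate.
     */
    boolean onEpochWatermarkIncremented() {
        if (seenFirstInvocation) {
            boolean criteriaQuiet = hasTerminationCriteriaStream && !sawCriteriaValue;
            boolean feedbackQuiet = !sawFeedbackValue;
            if (inputsFullyConsumed && (criteriaQuiet || feedbackQuiet)) {
                terminated = true;
            }
        }
        seenFirstInvocation = true;
        sawCriteriaValue = false;
        sawFeedbackValue = false;
        return terminated;
    }
}
```

Note that a quiet termination criteria stream only counts when that stream exists, whereas a quiet interval on all feedback streams is sufficient on its own.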

TODO: explain how this is implemented.


3) Cyclic Flow Control

TODO: explain it. "feedback-first" or "feedback-first when feedback queue buffers are utilized"?


4) The execution mode that is required to execute the iteration body.

  • If all input streams are bounded, then the iteration body can be executed in either the stream mode or the batch mode.
  • If any input stream is unbounded, then the iteration body must be executed in the stream mode.
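
The two rules above amount to a simple decision function. The following sketch assumes a hypothetical enum IterationExecutionMode and helper ExecutionModeRule, neither of which is a Flink type, and represents each input stream by a single boundedness flag.

```java
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

/** Hypothetical enum for this sketch; it is not Flink's own execution-mode type. */
enum IterationExecutionMode { STREAM, BATCH }

final class ExecutionModeRule {
    private ExecutionModeRule() {}

    /**
     * @param inputBounded one flag per input stream; true means the stream is bounded
     * @return the modes in which the iteration body may be executed
     */
    static Set<IterationExecutionMode> allowedModes(List<Boolean> inputBounded) {
        boolean allBounded = true;
        for (boolean bounded : inputBounded) {
            allBounded &= bounded;
        }
        // Any unbounded input rules out the batch mode.
        return allBounded
                ? EnumSet.of(IterationExecutionMode.STREAM, IterationExecutionMode.BATCH)
                : EnumSet.of(IterationExecutionMode.STREAM);
    }
}
```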


5) The requirements of the edge types and parallelism in the IterationBody.

All edges inside the iteration body are required to have the PIPELINE type. If the user-defined iteration body contains an edge that does not have the PIPELINE type, methods that create the subgraph from the iteration body, such as iterateBoundedStreamsUntilTermination, will throw an exception upon invocation.
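
The edge-type check described above could look roughly like the following sketch. The types EdgeType, BodyEdge and IterationBodyValidator are hypothetical, BLOCKING stands in for any non-PIPELINE type, and the use of IllegalStateException is an assumption, since the text does not name the exception type.

```java
import java.util.List;

/** Hypothetical edge types for this sketch; only PIPELINE is named in the text. */
enum EdgeType { PIPELINE, BLOCKING }

/** Hypothetical description of one edge inside the iteration body. */
final class BodyEdge {
    final String source;
    final String target;
    final EdgeType type;

    BodyEdge(String source, String target, EdgeType type) {
        this.source = source;
        this.target = target;
        this.type = type;
    }
}

final class IterationBodyValidator {
    private IterationBodyValidator() {}

    /** Throws if any edge is not PIPELINE; the exception type is an assumption. */
    static void validate(List<BodyEdge> edges) {
        for (BodyEdge edge : edges) {
            if (edge.type != EdgeType.PIPELINE) {
                throw new IllegalStateException(
                        "Edge " + edge.source + " -> " + edge.target
                                + " has type " + edge.type
                                + "; only PIPELINE is allowed inside the iteration body");
            }
        }
    }
}
```

Running such a check when the subgraph is created, rather than at execution time, lets an invalid iteration body fail before the job is submitted.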


6) Lifetime of the operators inside the iteration body.

...

In comparison, the existing DataSet::iterate(..) would destroy and re-create the iteration body once for each round of iteration, which in general could introduce more runtime overhead than the approach adopted in this FLIP.


7) How an iteration can resume from the most recent completed epoch after failover.

...

For any job that is executed in the stream mode, the job can start from a recent epoch after failover. This is achieved by re-using the existing checkpoint mechanism (only available in the stream mode) and additionally checkpointing the values buffered on the feedback edges.


8) How to implement an iterative algorithm in the sync mode.

...

See the Appendix section for a proof of why the solution described above could achieve the sync-mode execution as defined above.


9) How to run an iterative algorithm without dumping all user-provided data streams to disk.

...

In comparison, the iterateBoundedStreamsUntilTermination(...) method proposed in this FLIP allows users to run an iteration body without incurring this disk performance overhead. Developers are free to optimize performance based on their algorithm and data size, e.g. by caching data in memory in a more compact format.


10) How to support operators (e.g. ReduceOperator) that require bounded inputs in the IterationBody.

...

This FLIP explicitly does not support nested iterations. This is because we have not seen a clear use case that requires nested iterations. We would prefer to introduce additional complexity only when it is required by some reasonable use cases.

In the future if we decide to support nested iterations, we will need extra APIs to run the iteration body, with the semantics that operators inside the iteration body will be re-created for every round of iteration.

...