...

  • The termination criteria stream is not null, and the coordinator operator has not observed any new value from the termination criteria stream between two consecutive onEpochWatermarkIncremented invocations.
  • The coordinator operator has not observed any new value from any feedback variable stream between two consecutive onEpochWatermarkIncremented invocations.
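As a rough illustration of the two conditions above, the following sketch tracks whether any value arrived between two consecutive onEpochWatermarkIncremented invocations. It is not the implementation proposed by this FLIP: the class TerminationTracker and the methods onTerminationCriteriaValue and onFeedbackValue are hypothetical names, and the sketch deliberately reports each condition separately rather than deciding how they combine into a termination decision.

```java
/**
 * Hypothetical sketch, not Flink code: records which streams produced values
 * between two consecutive onEpochWatermarkIncremented invocations.
 */
final class TerminationTracker {
    // False models a null termination criteria stream.
    private final boolean hasTerminationCriteriaStream;
    private boolean sawCriteriaValue = false;
    private boolean sawFeedbackValue = false;

    TerminationTracker(boolean hasTerminationCriteriaStream) {
        this.hasTerminationCriteriaStream = hasTerminationCriteriaStream;
    }

    // Called (by assumption) whenever the coordinator observes a value
    // from the termination criteria stream.
    void onTerminationCriteriaValue() {
        sawCriteriaValue = true;
    }

    // Called (by assumption) whenever the coordinator observes a value
    // from any feedback variable stream.
    void onFeedbackValue() {
        sawFeedbackValue = true;
    }

    /**
     * Returns {criteriaCondition, feedbackCondition} for the interval that
     * just ended, then resets the flags for the next interval. For the first
     * invocation the interval starts at construction time.
     */
    boolean[] onEpochWatermarkIncremented() {
        boolean criteriaCondition = hasTerminationCriteriaStream && !sawCriteriaValue;
        boolean feedbackCondition = !sawFeedbackValue;
        sawCriteriaValue = false;
        sawFeedbackValue = false;
        return new boolean[] {criteriaCondition, feedbackCondition};
    }
}
```

Resetting both flags on every invocation is what restricts each check to the window between two consecutive invocations.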

TODO: explain how this is implemented.


2) The execution mode that is required to execute the iteration body.

  • If all input streams are bounded, then the iteration body can be executed in either the stream mode or the batch mode.
  • If any input stream is unbounded, then the iteration body must be executed in the stream mode.
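The rule above can be written as a small decision function. This is only a sketch to make the rule concrete: ExecutionModeRule, Mode, and allowedModes are hypothetical names that are not part of the proposed API, and each input stream is reduced to a single boolean that says whether it is bounded.

```java
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch, not Flink code: which modes may run the iteration body. */
final class ExecutionModeRule {
    enum Mode { STREAM, BATCH }

    /**
     * @param inputIsBounded one entry per input stream, true if that stream is bounded
     * @return the execution modes permitted by the rule
     */
    static Set<Mode> allowedModes(List<Boolean> inputIsBounded) {
        // Note: an empty list counts as "all bounded" here, which is an
        // assumption of this sketch rather than something the FLIP states.
        boolean allBounded = inputIsBounded.stream().allMatch(Boolean::booleanValue);
        return allBounded
                ? EnumSet.of(Mode.STREAM, Mode.BATCH)
                : EnumSet.of(Mode.STREAM);
    }
}
```

A single unbounded input is enough to rule out the batch mode, while the stream mode is always allowed.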


3) The requirements of the edge types and parallelism in the IterationBody.

TODO: explain it.

The edge type that should be used inside the iteration body.

...

With the approach proposed in this FLIP, the operators inside the iteration body are only created once and destroyed after the entire iteration terminates.

In comparison, the existing DataSet::iterate(...) destroys and re-creates the iteration body once for each round of iteration, which in general can introduce more runtime overhead than the approach adopted in this FLIP.

...

In comparison, the iterateBoundedStreamsUntilTermination(...) method proposed in this FLIP allows users to run an iteration body without incurring this disk performance overhead. Developers are free to optimize performance based on their algorithm and data size, e.g. by caching data in memory in a more compact format.


9) How to support operators (e.g. ReduceOperator) that require bounded inputs in the IterationBody.

TODO: explain it.


Example Usages

In this section we provide example code snippets to demonstrate how the APIs proposed in this FLIP can be used to address the target use-cases described above.

...

Compatibility, Deprecation, and Migration Plan

Deprecation plan

The following APIs will be deprecated and removed in a future Flink release:

  • The entire DataSet class. See FLIP-131 for its motivation and the migration plan. The deprecation of DataSet::iterate(...) proposed by this FLIP is covered by FLIP-131.
  • DataStream::iterate(...) and DataStream::iterate(long).

Compatibility

  • This FLIP introduces backward-incompatible changes by proposing to remove DataStream::iterate(...) in the future.

...

  • We expect that the APIs proposed in this FLIP can address most of the use-cases supported by DataStream::iterate(...) and DataSet::iterate(...). The only use-cases for which we have dropped support are those that require nested iteration on bounded data streams in sync mode. We have made this choice because we are not aware of any reasonable use-cases that require nested iteration. This support can be added if any user provides a good use-case for nested iteration.

Migration plan

  • Users will need to rewrite their application code in order to migrate from the existing iterative APIs to the proposed APIs.

Rejected Alternatives

1) Support use-cases that require nested iterations.

This FLIP explicitly does not support nested iterations. This is because we have not seen any clear use-cases that require nested iterations, and we prefer to add only APIs with known use-cases.

In the future if we decide to support nested iterations, we will need extra APIs to run the iteration body, with the semantics that operators inside the iteration body will be re-created for every round of iteration. We expect that the APIs proposed in this FLIP can support all use-cases supported by the existing iterative APIs.


Appendix

1) In the following, we prove that the proposed solution can be used to implement an iterative algorithm in the sync mode.

...