...

In order to address these issues and improve the relevant Flink design as much as possible, this FLIP proposes to add a couple of APIs in the flink-ml repository to achieve the following goals:

...

We explain a few terms below to make this doc easier to read.

1) Feedback streams

We consider a stream to be a feedback stream if the computation of records in this stream depends on records in the stream itself. In other words, there is a cycle in the Flink graph that generates this stream.

Note that the Flink core runtime supports only directed acyclic graphs of operators. Thus, in order to support cyclic graphs of operators, some "magic" needs to be done, which we will describe in the rest of this doc.

2) Iteration body

An iteration body is a subgraph of operators that implements the computation logic of e.g. an iterative machine learning algorithm. In particular, the iteration body will output values into some feedback streams and take values from the same feedback streams as part of its inputs.


...

, whose outputs might be fed back as the inputs of this subgraph. Therefore, there is a cycle in the Flink graph if the Flink program has an iteration body.

Note that not all outputs of an iteration body have to be fed back as the inputs of this subgraph.

2) Feedback stream

For a given iteration body, a stream is said to be a feedback stream if it connects an output of this iteration body back to the input of this iteration body.

3) Epoch of records

In the proposed APIs, for any given record generated by the iteration body, we define the epoch of this record to be the number of times the iteration body has been invoked in the computation history of this record. The exact definition of epoch can be found in the Java doc of the IterationUtils class below.

Note that epoch is also a term commonly used in the context of machine learning to indicate the number of passes over the entire training dataset that the machine learning algorithm has completed. We denote this definition as the "classic definition of epoch" below.

Our definition of the term "epoch" is a natural extension of the "classic definition of epoch" to the context of asynchronous machine learning on unbounded streams. We make the following observations regarding their comparison:

  • The classic definition of epoch is well-defined only when the machine learning algorithm processes bounded streams AND all subtasks of the algorithm update the model variables synchronously.
  • Our definition of epoch can also be applied in the cases where the machine learning algorithm processes unbounded streams or subtasks of the algorithm update the model variables asynchronously.
  • In the cases where the classic definition of epoch is well-defined (as described above), if the machine learning algorithm updates the model variables once after making a pass over the training dataset, then these two definitions of epoch are exactly the same.
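The record-level definition can be illustrated with a minimal sketch in plain Java, without any Flink dependency. The `Rec` type, the `invokeBody` helper and the halving logic are hypothetical stand-ins chosen for illustration; the only rule taken from the text is that each invocation of the iteration body in a record's computation history adds one to its epoch.

```java
import java.util.ArrayList;
import java.util.List;

class EpochSketch {
    /** Hypothetical record type: a model value tagged with its epoch. */
    static final class Rec {
        final double value;
        final int epoch;

        Rec(double value, int epoch) {
            this.value = value;
            this.epoch = epoch;
        }
    }

    /** One invocation of a toy iteration body: halves the value and increments the epoch. */
    static Rec invokeBody(Rec in) {
        return new Rec(in.value / 2.0, in.epoch + 1);
    }

    /** Feeds the record back through the body the given number of times and records each result. */
    static List<Rec> run(Rec initial, int invocations) {
        List<Rec> history = new ArrayList<>();
        Rec current = initial;
        for (int i = 0; i < invocations; i++) {
            current = invokeBody(current);
            history.add(current);
        }
        return history;
    }

    public static void main(String[] args) {
        // The initial record starts at epoch 0 and passes through the body three times.
        for (Rec r : run(new Rec(8.0, 0), 3)) {
            System.out.println("epoch=" + r.epoch + " value=" + r.value);
        }
    }
}
```

If each invocation in this sketch corresponds to one full pass over a bounded training dataset, the epoch of the latest record equals the number of completed passes, which is the case in which the two definitions coincide.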

Target Use-cases

The target use-cases (i.e. algorithms) can be described w.r.t. the properties described below. In the following, we first describe the definitions of properties, followed by the combinations of the property choices supported by the existing APIs and the proposed APIs, respectively.

Properties of machine learning algorithms

Different algorithms might have different required properties for the input datasets (bounded or unbounded), the synchronization between parallel subtasks (sync or async), and the amount of data processed for every variable update (a batch/subset or the entire dataset). We describe each of these properties below.

1) Algorithms have different needs for whether the input data streams should be bounded or unbounded. We classify those algorithms into online and offline algorithms as below.

  • For online training algorithms, the training samples will be unbounded streams of data. The corresponding iteration body should ingest these unbounded streams of data, read each value in each stream once, and update the machine learning model repeatedly in near real-time. The iteration will never terminate in this case. The algorithm should be executed as a streaming job.
  • For offline training algorithms, the training samples will be bounded streams of data. The corresponding iteration body should read these bounded data streams for an arbitrary number of rounds and update the machine learning model repeatedly until a termination criterion is met (e.g. a given number of rounds is reached or the model has converged). The algorithm should be executed as a batch job.

2) Algorithms (either online or offline) have different needs for how their parallel subtasks are synchronized.

  • In the sync mode, parallel subtasks, which execute the iteration body, update the model variables in a coordinated manner. There exist global epochs, such that all subtasks read the shared model variables at the beginning of an epoch, calculate variable updates based on the fetched variable values, and write updates of the variable values at the end of this epoch.
  • In the async mode, each parallel subtask, which executes the iteration body, can read/update the shared model variables without waiting for variable updates from other subtasks. For example, a subtask could have updated model variables 10 times when another subtask has updated model variables only 3 times.

The sync mode is useful when an algorithm should be executed in a deterministic way to achieve the best possible accuracy, and the straggler issue (i.e. a subtask that is considerably slower than the others) does not slow down the algorithm execution too much. In comparison, the async mode is useful for algorithms that should be parallelized and executed across many subtasks as fast as possible, without worrying about performance issues caused by stragglers, at the possible cost of reduced accuracy.
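The difference between the two modes can be sketched with a toy simulation in plain Java. Everything here is an illustrative assumption rather than part of the proposed APIs: the shared model is a single number, each subtask pulls it toward its own local target, and the `schedule` array decides which subtask acts next in async mode.

```java
class SyncAsyncSketch {
    static final double LEARNING_RATE = 0.5;

    /** The update a subtask proposes after reading model value w; target is its local optimum. */
    static double update(double w, double target) {
        return -LEARNING_RATE * (w - target);
    }

    /** Sync mode: every subtask reads the same snapshot; all updates are written at the end of the epoch. */
    static double runSync(double w, double[] targets, int epochs) {
        for (int e = 0; e < epochs; e++) {
            double snapshot = w;
            double sum = 0.0;
            for (double target : targets) {
                sum += update(snapshot, target);
            }
            w = snapshot + sum;
        }
        return w;
    }

    /** Async mode: schedule[i] names the subtask acting at step i; it reads the latest value and writes at once. */
    static double runAsync(double w, double[] targets, int[] schedule) {
        for (int subtask : schedule) {
            w += update(w, targets[subtask]);
        }
        return w;
    }

    public static void main(String[] args) {
        double[] targets = {0.0, 10.0};
        System.out.println("sync:  " + runSync(4.0, targets, 1));
        // Subtask 0 updates three times before subtask 1 updates once.
        System.out.println("async: " + runAsync(4.0, targets, new int[] {0, 0, 0, 1}));
    }
}
```

The sync result depends only on the number of epochs, whereas the async result depends on the interleaving given by the schedule, which mirrors the determinism trade-off described above.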

3) An algorithm may have additional requirements in how much data should be consumed each time before a subtask can update variables. There are two categories of choices here:

  • Per-batch variable update: The algorithm wants to update variables every time an arbitrary subset of the user-provided data streams (either bounded or unbounded) is processed.
  • Per-round variable update: The algorithm wants to update variables every time all data of the user-provided bounded data streams is processed.

In the machine learning domain, some algorithms allow users to configure a batch size, and the model is updated every time each subtask processes a batch of data. Those algorithms fit into the first category. Such an algorithm can be either online or offline.

Other algorithms only update variables every time the entire dataset is consumed for one round. Those algorithms fit into the second category. Such an algorithm must be offline because, by this definition, the user-provided dataset must be bounded.
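As a rough illustration of the two categories, the following plain-Java sketch counts how many variable updates happen when a bounded dataset is read for several rounds. The method names are hypothetical, and flushing a trailing partial batch at the end of each round is an assumption made here, not something the text specifies.

```java
class UpdateGranularitySketch {
    /** Per-batch: one update after every batchSize records within a round. */
    static int countPerBatchUpdates(int numRecords, int batchSize, int rounds) {
        int updates = 0;
        for (int round = 0; round < rounds; round++) {
            int buffered = 0;
            for (int i = 0; i < numRecords; i++) {
                buffered++;
                if (buffered == batchSize) {
                    updates++;
                    buffered = 0;
                }
            }
            if (buffered > 0) {
                updates++; // assumption: a trailing partial batch also triggers an update
            }
        }
        return updates;
    }

    /** Per-round: one update only after all records of the bounded dataset were processed. */
    static int countPerRoundUpdates(int numRecords, int rounds) {
        int updates = 0;
        for (int round = 0; round < rounds; round++) {
            for (int i = 0; i < numRecords; i++) {
                // consume record i; no update happens inside the round
            }
            updates++;
        }
        return updates;
    }

    public static void main(String[] args) {
        System.out.println("per-batch updates: " + countPerBatchUpdates(10, 4, 3));
        System.out.println("per-round updates: " + countPerRoundUpdates(10, 3));
    }
}
```

The per-round counter needs the end of the dataset to be reachable, which is why that category only makes sense for bounded inputs.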

Combinations of property choices supported by the existing APIs

The existing DataSet::iterate supports algorithms that process bounded streams + sync mode + per-round variable update.

The existing DataStream::iterate has a few bugs that prevent it from being used in production (see FLIP-15). Apart from that, it is intended to support algorithms that process unbounded streams + async mode + per-batch variable update.

Combinations of categories supported by the proposed APIs

As described above, there are 3 properties, each with 2 choices, so there are 8 combinations of property choices in total. Since "per-round variable update" cannot be used with "unbounded data streams", only 6 of the 8 combinations are valid.

The APIs proposed in this FLIP support all the 6 valid combinations of property choices.
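The count of 6 can be checked by enumerating the combinations. The sketch below is plain Java with hypothetical enum names; the single validity rule it encodes is the one stated above, namely that per-round variable update requires bounded data streams.

```java
import java.util.ArrayList;
import java.util.List;

class PropertyCombinations {
    enum Input { BOUNDED, UNBOUNDED }
    enum Sync { SYNC, ASYNC }
    enum Update { PER_BATCH, PER_ROUND }

    /** Per-round variable update is only meaningful when the input is bounded. */
    static boolean isValid(Input input, Sync sync, Update update) {
        return !(input == Input.UNBOUNDED && update == Update.PER_ROUND);
    }

    /** Enumerates all 2 x 2 x 2 combinations and keeps the valid ones. */
    static List<String> validCombinations() {
        List<String> valid = new ArrayList<>();
        for (Input input : Input.values()) {
            for (Sync sync : Sync.values()) {
                for (Update update : Update.values()) {
                    if (isValid(input, sync, update)) {
                        valid.add(input + "/" + sync + "/" + update);
                    }
                }
            }
        }
        return valid;
    }

    public static void main(String[] args) {
        List<String> valid = validCombinations();
        System.out.println(valid.size() + " valid combinations:");
        valid.forEach(System.out::println);
    }
}
```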

Summary

The following table summarizes the use-cases supported by the existing APIs and proposed APIs, respectively, with respect to the properties defined above.

| Input type | Synchronization mode | Variable update mode | Supported by the existing APIs | Supported by the proposed APIs |
|---|---|---|---|---|
| bounded data streams | sync mode | per-batch variable update | No | Yes |
| bounded data streams | sync mode | per-round variable update | Yes | Yes |
| bounded data streams | async mode | per-batch variable update | No | Yes |
| bounded data streams | async mode | per-round variable update | No | Yes |
| unbounded data streams | sync mode | per-batch variable update | No | Yes |
| unbounded data streams | async mode | per-batch variable update | Yes | Yes |


Overview of the Iteration Paradigm

In the following, we explain the key components of an iterative algorithm that has motivated our choices of the proposed APIs.

An iterative algorithm has the following behavior:

  • The iterative algorithm has an iteration body that is repeatedly invoked until some termination criterion is met (e.g. after a user-specified number of epochs has been reached).
  • In each invocation, the iteration body updates the model parameters based on the user-provided data as well as the most recent model parameters.
  • The iterative algorithm takes as inputs the user-provided data and the initial model parameters.
  • The iterative algorithm could output arbitrary user-defined information, such as the loss after each epoch, or the final model parameters. 

Therefore, the behavior of an iterative algorithm could be characterized with the following iteration paradigm (w.r.t. Flink concepts):

  • An iteration body is a Flink subgraph with the following inputs and outputs
    • Inputs: model-variables (as a list of unbounded data streams) and user-provided-data (as a list of data streams)
    • Outputs: feedback-model-variables (as a list of unbounded data streams) and user-observed-outputs (as a list of data streams)
  • A termination-condition that specifies when the iterative execution of the iteration body should terminate.
  • In order to execute an iteration body, the caller needs to provide the following inputs and can get the following outputs.
    • Inputs: initial-model-variables (as a list of bounded data streams) and user-provided-data (as a list of data streams)
    • Outputs: the user-observed-output emitted by the iteration body.

It is important to note that the model-variables observed by the iteration body are not the same as the initial-model-variables provided by the caller. Instead, model-variables are computed as the union of the feedback-model-variables (emitted by the iteration body) and the initial-model-variables (provided by the caller of the iteration body).

The figure below summarizes the iteration paradigm described above.

[Figure: the iteration paradigm described above]
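The paradigm can also be sketched without Flink, using a queue in place of the model-variables stream. This is only an illustration under stated assumptions: the `body` function, the halfway-to-the-mean update rule and the `maxRounds` termination condition are hypothetical, and in this toy version the body emits the same value both as feedback and as user-observed output.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class IterationParadigmSketch {
    /** Toy iteration body: moves the model variable halfway toward the mean of the data. */
    static double body(double modelVariable, double[] data) {
        double mean = 0.0;
        for (double d : data) {
            mean += d;
        }
        mean /= data.length;
        return modelVariable + 0.5 * (mean - modelVariable);
    }

    /** Runs the body until maxRounds invocations are done and returns the observed outputs. */
    static List<Double> iterate(double initialVariable, double[] data, int maxRounds) {
        List<Double> observedOutputs = new ArrayList<>();
        if (maxRounds <= 0) {
            return observedOutputs;
        }
        // model-variables = union of initial-model-variables and feedback-model-variables
        Deque<Double> modelVariables = new ArrayDeque<>();
        modelVariables.add(initialVariable);
        int round = 0;
        while (!modelVariables.isEmpty()) {
            double updated = body(modelVariables.poll(), data);
            observedOutputs.add(updated); // user-observed-output
            round++;
            if (round < maxRounds) {
                modelVariables.add(updated); // feedback-model-variables
            }
        }
        return observedOutputs;
    }

    public static void main(String[] args) {
        System.out.println(iterate(1.0, new double[] {2.0, 4.0}, 3));
    }
}
```

The caller only supplies the initial variable and the data and only sees the observed outputs; the feedback values never leave the loop.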

Public Interfaces

We propose to add the following APIs based on the iteration paradigm described above.


1) Add the IterationBody interface.

This interface abstracts the core computation logic that is expected to be iteratively computed. Any algorithm that wants to apply iterative computation with feedback streams should implement this interface.

The IterationBody API:

package org.apache.flink.ml.iteration;

@PublicEvolving
public interface IterationBody {
    /**
     * This method creates the graph for the iteration body.
     *
     * See Utils::iterate, Utils::iterateBoundedStreams and Utils::iterateAndReplayBoundedStreams for how the iteration
     * body can be executed and when execution of the corresponding graph should terminate.
     *
     * Required: the number of feedback variable streams returned by this method must equal the number of variable
     * streams given to this method.
     *
     * @param variableStreams the variable streams.
     * @param dataStreams the data streams.
     * @return an IterationBodyResult.
     */
    IterationBodyResult process(DataStreamList variableStreams, DataStreamList dataStreams);
}


2) Add the IterationListener interface.

If a UDF (user-defined function) used inside the IterationBody implements this interface, the callbacks on this interface will be invoked when the corresponding events happen.

This interface allows users to register callbacks that can be used to a) run an algorithm in sync mode; and b) emit final output after the iteration terminates.


3) Add the IterationUtils class.

This class provides three APIs to execute an iteration body with user-specified inputs, each with different input types (e.g. bounded data streams vs. unbounded data streams) and data replay semantics (i.e. whether to replay the user-provided data streams).

The primary functionality shared by the APIs in this class is to union the feedback variable streams (returned by the iteration body) with the initial variable streams (provided by the user) and use the merged streams as inputs to the iteration body.

4) Add the DataStreamList and IterationBodyResult as helper classes.

DataStreamList is a helper class that serves as a container for a list of data streams with possibly different element types. IterationBodyResult is a helper class that contains the streams returned by IterationBody::process(...).

...

package org.apache.flink.ml.iteration;

public class DataStreamList {
    public DataStreamList(DataStream<?>... dataStreams) {...}

    // Returns the number of data streams in this list.
    public int size() {...}

    // Returns the data stream at the given index in this list.
    @SuppressWarnings("unchecked")
    public <T> DataStream<T> get(int index) {...}
}

/**
 * A helper class that contains the streams returned by the iteration body.
 */
class IterationBodyResult {
    /**
     * A list of feedback variable streams. These streams will only be used during the iteration execution and will
     * not be returned to the caller of the iteration body. It is assumed that the method which executes the
     * iteration body will feed the records of the feedback variable streams back to the corresponding input variable
     * streams.
     */
    DataStreamList feedbackVariableStreams;

    /**
     * A list of output streams. These streams will be returned to the caller of the methods that execute the
     * iteration body.
     */
    DataStreamList outputStreams;

    /**
     * An optional termination criteria stream. If this stream is not null, it will be used together with the
     * feedback variable streams to determine when the iteration should terminate.
     */
    Optional<DataStream<?>> terminationCriteria;
}

/**
 * The callbacks defined below will be invoked only if the operator instance which implements this interface is used
 * within an iteration body.
 */
@PublicEvolving
public interface IterationListener<T> {
    /**
     * This callback is invoked every time the epoch watermark increments. The initial epoch watermark is -1.
     *
     * The epochWatermark is the maximum integer that meets this requirement: every record that arrives at the operator
     * going forward should have an epoch larger than the epochWatermark. See Java docs in IterationUtils for how epoch
     * is determined for records ingested into the iteration body and for records emitted by operators within the
     * iteration body.
     *
     * If all inputs are bounded, the maximum epoch of all records ingested into this operator is used as the
     * epochWatermark by the last invocation of this callback.
     *
     * @param epochWatermark The incremented epoch watermark.
     * @param context A context that allows emitting side output. The context is only valid during the invocation of
     *                this method.
     * @param collector The collector for returning result values.
     */
    void onEpochWatermarkIncremented(int epochWatermark, Context context, Collector<T> collector);

    /**
     * This callback is invoked after the execution of the iteration body has terminated.
     *
     * See Java doc of methods in IterationUtils for the termination conditions.
     *
     * @param context A context that allows emitting side output. The context is only valid during the invocation of
     *                this method.
     * @param collector The collector for returning result values.
     */
    void onIterationTermination(Context context, Collector<T> collector);

    /**
     * Information available in an invocation of the callbacks defined in the IterationProgressListener.
     */
    interface Context {
        /**
         * Emits a record to the side output identified by the {@link OutputTag}.
         *
         * @param outputTag the {@code OutputTag} that identifies the side output to emit to.
         * @param value The record to emit.
         */
        <X> void output(OutputTag<X> outputTag, X value);
    }
}
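One way to picture the epoch watermark is the following plain-Java sketch. It is not the proposed implementation: the `Listener` interface is a simplified stand-in for the callback above (without Context and Collector), and the idea that each input reports the highest epoch it has fully delivered, with the watermark taken as the minimum over all inputs, is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class EpochWatermarkSketch {
    /** Hypothetical, simplified stand-in for the onEpochWatermarkIncremented callback. */
    interface Listener {
        void onEpochWatermarkIncremented(int epochWatermark);
    }

    private final int[] finishedEpochPerInput; // highest epoch each input has fully delivered
    private final Listener listener;
    private int epochWatermark = -1; // initial value stated in the Javadoc

    EpochWatermarkSketch(int numInputs, Listener listener) {
        this.finishedEpochPerInput = new int[numInputs];
        Arrays.fill(this.finishedEpochPerInput, -1);
        this.listener = listener;
    }

    /** The given input promises that all its future records have an epoch larger than epoch. */
    void inputFinishedEpoch(int input, int epoch) {
        finishedEpochPerInput[input] = Math.max(finishedEpochPerInput[input], epoch);
        int min = Arrays.stream(finishedEpochPerInput).min().getAsInt();
        if (min > epochWatermark) {
            // Every record arriving from now on has an epoch larger than min.
            epochWatermark = min;
            listener.onEpochWatermarkIncremented(epochWatermark);
        }
    }

    int currentWatermark() {
        return epochWatermark;
    }

    public static void main(String[] args) {
        List<Integer> seen = new ArrayList<>();
        EpochWatermarkSketch tracker = new EpochWatermarkSketch(2, w -> seen.add(w));
        tracker.inputFinishedEpoch(0, 0); // input 1 may still send epoch 0: no callback
        tracker.inputFinishedEpoch(1, 0); // both inputs are past epoch 0: callback with 0
        tracker.inputFinishedEpoch(1, 1); // input 0 is still at epoch 1: no callback
        tracker.inputFinishedEpoch(0, 1); // callback with 1
        System.out.println("callbacks: " + seen);
    }
}
```

In sync mode, a UDF could use such a callback as the point at which all updates of an epoch are known to have arrived before it emits the next model value.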



Code Block
languagejava
titleThe IterationBody API
linenumberstrue
package org.apache.flink.ml.iteration;

/**
 * A helper class to apply {@link IterationBody} to data streams.
 */
@PublicEvolving
public class IterationUtils {
    /**of all records ingested into this operator is used as the
     * epochWatermark by the last invocation of this callback.
     *
     * @param epochWatermark The incremented epoch watermark.
     * @param context A context that allows emitting side output. The context is only valid during the invocation of
     *                this method.
     * @param collector The collector for returning result values.
     */
 This method can voiduse onEpochWatermarkIncremented(int epochWatermark, Context context, Collector<T> collector);

    /**an iteration body to process records in unbounded data streams.
     *
 This callback is invoked after* theThis executionmethod ofinvokes the iteration body with the hasfollowing terminated.parameters:
     *
 1) The 1st parameter *is Seea Javalist docof ofinput methodsvariable instreams, IterationUtilswhich forare thecreated terminationas conditions.
the union of the initial *variable
     * @paramstreams contextand Athe contextcorresponding thatfeedback allowsvariable emittingstreams side(returned output.by Thethe context is only valid during the invocation of
     *      iteration body).
     * 2) The 2nd parameter is the data streams given to this method.
     *
     this* method.
The epoch values are determined *as @paramdescribed collectorbelow. TheSee collectorIterationListener for how returningthe resultepoch values are used.
     */
 1) All records voidin onIterationTermination(Context context, Collector<T> collector);

    /**the initial variable streams has epoch=0.
     * 2) InformationAll availablerecords in an invocation of the callbacksdata definedstreams in the IterationProgressListener.
     */
    interface Context {
 has epoch=MAX_LONG. In this case, records in the data stream won't affect
       /**
 any operator's low watermark.
     * Emits a record to the side output identified3) For any record emitted by thethis {@link OutputTag}.
         *
    operator into a non-feedback stream, the epoch of this emitted record = the
     * @paramepoch outputTagof the {@codeinput OutputTag}record that identifiestriggers thethis sideemission. outputIf tothis emitrecord to.
is emitted by onEpochLWIncremented(), then
     * the @paramepoch valueof Thethis record = incrementedEpochLW to- emit1.
     * 4) For  */
        <X> void output(OutputTag<X> outputTag, X value);
    }
}

/**
 * A helper class to apply {@link IterationBody} to data streams.
 */
@PublicEvolving
public class IterationUtils {
    /**any record emitted by this operator into a feedback variable stream, the epoch of the emitted record =
     * min(the epoch of the input record that triggers this emission, MAX_LONG - 1) + 1. If this record is emitted by
     * This method can use an iteration body to process records in unbounded data streams.onEpochLWIncremented(), then the epoch of this record = incrementedEpochLW.
     *
     *
 The execution of the *graph Thiscreated method invokesby the iteration body withwill thenot followingterminate parameters:
by itself. This is because *at 1)least
 The 1st parameter is a* listone of inputits variabledata streams, which are created as the union of the initial variable is unbounded.
     *
     * Required:
     * streams1) andAll the correspondinginit feedback variable streams (returnedmust by the iteration body)be bounded.
     * 2) The 2nd parameter is There is at least one unbounded stream in the data streams given to this methodlist.
     *
 3) The parallelism of *any Thestream epochin valuesthe areinitial determinedvariable asstreams describedmust below.equal Seethe IterationListener forparallelism howof the epochstream values are used.at the
     * 1)same Allindex records inof the initialfeedback variable streams returned has epoch=0by the IterationBody.
     *
 2) All records in the* data@param streamsinitVariableStreams has epoch=MAX_LONG. In this case, records in the data stream won't affect
     * any operator's low watermark.The initial variable streams. These streams will be merged with the feedback variable
     * 3) For any record emitted by this operator into a non-feedback stream, the epoch of this emitted record = the
     * epoch of the input recordstreams thatbefore triggersbeing thisused emission.as Ifthe this1st recordparameter isto emittedinvoke by onEpochLWIncremented(), thenthe iteration body.
     * @param thedataStreams epochThe ofdata thisstreams. recordThese =streams incrementedEpochLWwill -be 1.
used as the 2nd parameter *to 4)invoke Forthe anyiteration
 record emitted by this operator* into a feedback variable stream, the epoch of the emitted record =
     * min(the epoch ofbody.
 the input record that triggers* this@param emission, MAX_LONG - 1) + 1. If this record is emitted bybody The computation logic which takes variable/data streams and returns variable/output streams.
     * onEpochLWIncremented(), then the epoch of this record = incrementedEpochLW @return The list of output streams returned by the iteration boy.
     */
    static * The execution of the graph created by the iteration body will not terminate by itself. This is because at least
     * one of its data streams is unbounded.DataStreamList iterateUnboundedStreams(DataStreamList initVariableStreams, DataStreamList dataStreams, IterationBody body) {...}

    /**
     * This method can use an iteration body to process records in some bounded data streams iteratively until a
     *
 termination criteria is reached * Required:
     * 1) All the init variable streams must be bounded.(e.g. the given number of rounds is completed or no further variable update is
     * 2needed). ThereBecause isthis atmethod leastdoes onenot unboundedreplay streamrecords in the data streams list.
     * 3) The parallelism of any stream in the initial variable streams must equal the parallelism of the stream at the, the iteration body needs to cache those
     * records in order to visit those records repeatedly.
     *
     * sameThis indexmethod ofinvokes the feedbackiteration variablebody streamswith returnedthe by the IterationBody.following parameters:
     *
 1) The 1st parameter is *a @paramlist initVariableStreamsof The initialinput variable streams., which Theseare streamscreated willas bethe mergedunion withof the feedbackinitial variable
     * streams and the corresponding feedback variable streams (returned by the iteration body).
     * 2) The 2nd parameter is the  data   streams beforegiven beingto usedthis asmethod.
 the 1st parameter to invoke*
 the iteration body.
  * The epoch *values @paramare dataStreamsdetermined Theas datadescribed streamsbelow. TheseSee streamsIterationListener willfor behow usedthe asepoch thevalues 2ndare parameterused.
 to invoke the iteration
 * 1) All records *in the initial variable streams has epoch=0.
     * 2) All records in the data streams has bodyepoch=0.
     * 3) @paramFor bodyany Therecord computationemitted logicby whichthis takesoperator variable/datainto streamsa and returns variable/output streams.
     * @return The list of output streams returned by the iteration boy.
     */
    static DataStreamList iterateUnboundedStreams(DataStreamList initVariableStreams, DataStreamList dataStreams, IterationBody body) {...}

    /**
     * This method can use an iteration body to process records in some bounded data streams iteratively until a
     * termination criterion is reached (e.g. the given number of rounds is completed or no further variable update is
     * needed). Because this method does not replay records in the data streams, the iteration body needs to cache those
     * records in order to visit those records repeatedly.
     *
     * This method invokes the iteration body with the following parameters:
     * 1) The 1st parameter is a list of input variable streams, which are created as the union of the initial variable
     * streams and the corresponding feedback variable streams (returned by the iteration body).
     * 2) The 2nd parameter is the data streams given to this method.
     *
     * The epoch values are determined as described below. See IterationListener for how the epoch values are used.
     * 1) All records in the initial variable streams have epoch=0.
     * 2) All records in the data streams have epoch=0.
     * 3) For any record emitted by this operator into a non-feedback stream, the epoch of this emitted record = the
     * epoch of the input record that triggers this emission. If this record is emitted by onEpochLWIncremented(), then
     * the epoch of this record = incrementedEpochLW - 1.
     * 4) For any record emitted by this operator into a feedback variable stream, the epoch of the emitted record = the
     * epoch of the input record that triggers this emission + 1. If this record is emitted by onEpochLWIncremented(),
     * then the epoch of this record = incrementedEpochLW.
     *
     * Suppose there is a coordinator operator which takes all feedback variable streams (emitted by the iteration body)
     * and the termination criteria stream (if not null) as inputs. The execution of the graph created by the
     * iteration body will terminate when any of the following conditions is met:
     * 1) The termination criteria stream is not null. And the coordinator operator has not observed any new value from
     * the termination criteria stream between two consecutive onEpochLWIncremented invocations.
     * 2) The coordinator operator has not observed any new value from any feedback variable stream between two
     * consecutive onEpochLWIncremented invocations.
     *
     * Required:
     * 1) All the init variable streams and the data streams must be bounded.
     * 2) The parallelism of any stream in the initial variable streams must equal the parallelism of the stream at the
     * same index of the feedback variable streams returned by the IterationBody.
     *
     * @param initVariableStreams The initial variable streams. These streams will be merged with the feedback variable
     *                            streams before being used as the 1st parameter to invoke the iteration body.
     * @param dataStreams The data streams. These streams will be used as the 2nd parameter to invoke the iteration
     *                    body.
     * @param body The computation logic which takes variable/data streams and returns variable/output streams.
     * @return The list of output streams returned by the iteration body.
     */
    static DataStreamList iterateBoundedStreamsUntilTermination(DataStreamList initVariableStreams, DataStreamList dataStreams, IterationBody body) {...}

    /**
     * This method can use an iteration body to process records in some bounded data streams iteratively until a
     * termination criterion is reached (e.g. the given number of rounds is completed or no further variable update is
     * needed). Because this method replays records in the data streams, the iteration body does not need to cache those
     * records to visit those records repeatedly.
     *
     * This method invokes the iteration body with the following parameters:
     * 1) The 1st parameter is a list of input variable streams, which are created as the union of the initial variable
     * streams and the corresponding feedback variable streams (returned by the iteration body).
     * 2) The 2nd parameter is a list of replayed data streams, which are created by replaying the initial data streams
     * round by round until the iteration terminates. The records in the Nth round will be emitted into the iteration
     * body only if the low watermark of the first operator in the iteration body >= N - 1.
     *
     * The epoch values are determined as described below. See IterationListener for how the epoch values are used.
     * 1) All records in the initial variable streams have epoch=0.
     * 2) The records from the initial data streams will be replayed round by round into the iteration body. The records
     * in the first round have epoch=0. And records in the Nth round have epoch = N - 1.
     * 3) For any record emitted by this operator into a non-feedback stream, the epoch of this emitted record = the
     * epoch of the input record that triggers this emission. If this record is emitted by onEpochLWIncremented(), then
     * the epoch of this record = incrementedEpochLW - 1.
     * 4) For any record emitted by this operator into a feedback variable stream, the epoch of the emitted record = the
     * epoch of the input record that triggers this emission + 1. If this record is emitted by onEpochLWIncremented(),
     * then the epoch of this record = incrementedEpochLW.
     *
     * Suppose there is a coordinator operator which takes all feedback variable streams (emitted by the iteration body)
     * and the termination criteria stream (if not null) as inputs. The execution of the graph created by the
     * iteration body will terminate when any of the following conditions is met:
     * 1) The termination criteria stream is not null. And the coordinator operator has not observed any new value from
     * the termination criteria stream between two consecutive onEpochLWIncremented invocations.
     * 2) The coordinator operator has not observed any new value from any feedback variable stream between two
     * consecutive onEpochLWIncremented invocations.
     *
     * Required:
     * 1) All the init variable streams and the data streams must be bounded.
     * 2) The parallelism of any stream in the initial variable streams must equal the parallelism of the stream at the
     * same index of the feedback variable streams returned by the IterationBody.
     *
     * @param initVariableStreams The initial variable streams. These streams will be merged with the feedback variable
     *                            streams before being used as the 1st parameter to invoke the iteration body.
     * @param initDataStreams The initial data streams. Records from these streams will be repeatedly replayed and used
     *                        as the 2nd parameter to invoke the iteration body.
     * @param body The computation logic which takes variable/data streams and returns variable/output streams.
     * @return The list of output streams returned by the iteration body.
     */
    static DataStreamList iterateAndReplayBoundedStreamsUntilTermination(DataStreamList initVariableStreams, DataStreamList initDataStreams, IterationBody body) {...}

}
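
The epoch rules stated in the Javadoc above amount to simple arithmetic on the epoch of the triggering input record. The following is a minimal sketch of those rules only; the class EpochRulesSketch and all of its method names are hypothetical, chosen for illustration, and are not part of the proposed API.

```java
/** Hypothetical helper that restates the epoch rules; it is not part of the proposed API. */
class EpochRulesSketch {

    /** Records in the initial variable streams (and in non-replayed data streams) start at epoch 0. */
    static final int INITIAL_EPOCH = 0;

    /** Replayed data records: the Nth round (1-based) carries epoch N - 1. */
    static int epochOfReplayedRound(int round) {
        if (round < 1) {
            throw new IllegalArgumentException("rounds are 1-based, got " + round);
        }
        return round - 1;
    }

    /** A record emitted into a non-feedback stream keeps the epoch of the triggering input record. */
    static int epochOfOutput(int inputEpoch) {
        return inputEpoch;
    }

    /** A record emitted into a feedback variable stream moves to the next epoch. */
    static int epochOfFeedback(int inputEpoch) {
        return inputEpoch + 1;
    }

    /** A record emitted from onEpochLWIncremented() into a non-feedback stream. */
    static int epochOfOutputOnLWIncremented(int incrementedEpochLW) {
        return incrementedEpochLW - 1;
    }

    /** A record emitted from onEpochLWIncremented() into a feedback variable stream. */
    static int epochOfFeedbackOnLWIncremented(int incrementedEpochLW) {
        return incrementedEpochLW;
    }

    public static void main(String[] args) {
        // Follow one variable record as it is fed back through three rounds.
        int epoch = INITIAL_EPOCH;
        for (int round = 1; round <= 3; round++) {
            System.out.println("round " + round + ": input epoch=" + epoch
                    + ", output epoch=" + epochOfOutput(epoch)
                    + ", feedback epoch=" + epochOfFeedback(epoch));
            epoch = epochOfFeedback(epoch);
        }
    }
}
```

Under these rules, a variable record that has been fed back k times carries epoch k, which matches the epoch of the data records replayed in round k + 1.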


4) Add IterationBodyResult as a helper class.

IterationBodyResult is a helper class which contains the streams returned by IterationBody::process(...).

Code Block (Java): The IterationBodyResult API
package org.apache.flink.ml.iteration;

import org.apache.flink.streaming.api.datastream.DataStream;

import java.util.Optional;

/**
 * A helper class that contains the streams returned by the iteration body.
 */
class IterationBodyResult {
    /**
     * A list of feedback variable streams. These streams will only be used during the iteration execution and will
     * not be returned to the caller of the iteration body. It is assumed that the method which executes the
     * iteration body will feed the records of the feedback variable streams back to the corresponding input variable
     * streams.
     */
    DataStreamList feedbackVariableStreams;

    /**
     * A list of output streams. These streams will be returned to the caller of the methods that execute the
     * iteration body.
     */
    DataStreamList outputStreams;

    /**
     * An optional termination criteria stream. If this stream is not null, it will be used together with the
     * feedback variable streams to determine when the iteration should terminate.
     */
    Optional<DataStream<?>> terminationCriteria;
}
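
The way the termination criteria stream and the feedback variable streams jointly decide termination can be sketched as a small counter-based coordinator. This is only an illustration of the two termination conditions listed in the IterationUtils Javadoc: the class TerminationCoordinatorSketch and its methods are hypothetical, and the sketch assumes that the first onEpochLWIncremented call compares against the records seen since the start of the iteration.

```java
/** Hypothetical coordinator that mirrors the two termination conditions; not part of the proposed API. */
class TerminationCoordinatorSketch {
    // True if the iteration body returned a non-null termination criteria stream.
    private final boolean hasCriteriaStream;
    // Records observed since the previous onEpochLWIncremented invocation.
    private long feedbackRecords;
    private long criteriaRecords;

    TerminationCoordinatorSketch(boolean hasCriteriaStream) {
        this.hasCriteriaStream = hasCriteriaStream;
    }

    /** Called for every record observed on any feedback variable stream. */
    void onFeedbackRecord() {
        feedbackRecords++;
    }

    /** Called for every record observed on the termination criteria stream. */
    void onCriteriaRecord() {
        criteriaRecords++;
    }

    /** Returns true if the iteration should terminate at this epoch boundary. */
    boolean onEpochLWIncremented() {
        // Condition 1: a criteria stream exists but produced nothing in the last epoch.
        boolean criteriaSilent = hasCriteriaStream && criteriaRecords == 0;
        // Condition 2: no feedback variable stream produced anything in the last epoch.
        boolean feedbackSilent = feedbackRecords == 0;
        feedbackRecords = 0;
        criteriaRecords = 0;
        return criteriaSilent || feedbackSilent;
    }

    public static void main(String[] args) {
        TerminationCoordinatorSketch coordinator = new TerminationCoordinatorSketch(true);
        coordinator.onFeedbackRecord();
        coordinator.onCriteriaRecord();
        System.out.println("terminate after epoch 1: " + coordinator.onEpochLWIncremented());
        coordinator.onFeedbackRecord();
        System.out.println("terminate after epoch 2: " + coordinator.onEpochLWIncremented());
    }
}
```

In this sketch either condition alone ends the iteration, so an algorithm can stop early by leaving the termination criteria stream empty for one epoch even while variables are still being fed back.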


5) Add the DataStreamList as a helper class.

DataStreamList is a helper class that serves as a container for a list of data streams with possibly different element types.

Code Block (Java): The DataStreamList API
package org.apache.flink.ml.iteration;

import org.apache.flink.streaming.api.datastream.DataStream;

public class DataStreamList {
    // Returns the number of data streams in this list.
    public int size() {...}

    // Returns the data stream at the given index in this list.
    public <T> DataStream<T> get(int index) {...}

}
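
One plausible way to hold streams with different element types in a single list is to store them with a wildcard type and cast on access. The sketch below illustrates that pattern only; StreamListSketch and StreamStub are hypothetical stand-ins (StreamStub replaces Flink's DataStream so that the example compiles without Flink), and the FLIP does not state how DataStreamList is implemented.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical container mirroring the size()/get(int) shape of DataStreamList. */
class StreamListSketch {
    private final List<StreamStub<?>> streams;

    StreamListSketch(StreamStub<?>... streams) {
        // Copy the varargs array so that later changes by the caller do not leak in.
        this.streams = Collections.unmodifiableList(new ArrayList<>(Arrays.asList(streams)));
    }

    /** Returns the number of streams in this list. */
    int size() {
        return streams.size();
    }

    /** Returns the stream at the given index; the caller is responsible for choosing the right T. */
    @SuppressWarnings("unchecked")
    <T> StreamStub<T> get(int index) {
        return (StreamStub<T>) streams.get(index);
    }

    public static void main(String[] args) {
        StreamListSketch list = new StreamListSketch(
                new StreamStub<Double>("modelWeights"), new StreamStub<String>("trainingLabels"));
        StreamStub<Double> weights = list.get(0);
        StreamStub<String> labels = list.get(1);
        System.out.println(list.size() + " streams: " + weights.name + ", " + labels.name);
    }
}

/** Hypothetical stand-in for Flink's DataStream<T>; it only carries a name. */
class StreamStub<T> {
    final String name;

    StreamStub(String name) {
        this.name = name;
    }
}
```

The cast in get is unchecked, so a mismatch between the requested and the actual element type is not detected at the call site. The convenience of a heterogeneous list comes at the cost of compile-time type safety.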

The iteration body also has one or more output streams. It might output records in each round, and the records emitted across all rounds compose the final outputs.

Therefore, an iteration body is composed of

...

}


Overall Design

To reduce the development and maintenance overhead, it is preferable to have a unified implementation for the different types of iterations. In fact, the different iteration types share the same requirements on the runtime implementation:

...