...

The target use-cases (i.e. algorithms) can be described w.r.t. the properties described below. In the following, we first define those properties, followed by the combinations of the property choices supported by the existing APIs and the proposed APIs, respectively.

...

  • In the sync mode, parallel subtasks, which execute the iteration body, update the model variables in a coordinated manner. There exist global epochs, such that all subtasks fetch the shared model variable values at the beginning of an epoch, calculate model variable updates based on the fetched variable values, and write updates of the model variable values at the end of this epoch. In other words, all subtasks read and update model variables in global lock steps.
  • In the async mode, each parallel subtask, which executes the iteration body, could read/update the shared model variables without waiting for variable updates from other subtasks. For example, a subtask could have updated model variables 10 times when another subtask has updated model variables only 3 times.
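The difference between the two modes can be sketched with a small single-threaded simulation. This is only an illustration under stated assumptions: the class, the update rule (a quarter of the distance to 100) and the explicit schedule are hypothetical and are not part of the proposed APIs.

```java
import java.util.Arrays;

/** Single-threaded simulation of sync vs. async updates. All names are hypothetical. */
final class SyncVsAsyncSketch {
    /** Update derived from the model value a subtask has read: a quarter of the distance to 100. */
    static int update(int observedModel) {
        return (100 - observedModel) / 4;
    }

    /** Sync mode: all subtasks read the same snapshot; updates are applied together at the end of the epoch. */
    static int runSync(int numSubtasks, int numEpochs) {
        int model = 0;
        for (int epoch = 0; epoch < numEpochs; epoch++) {
            int snapshot = model; // value fetched by every subtask at the beginning of the epoch
            int sumOfUpdates = 0;
            for (int subtask = 0; subtask < numSubtasks; subtask++) {
                sumOfUpdates += update(snapshot);
            }
            model = snapshot + sumOfUpdates; // written at the end of the epoch
        }
        return model;
    }

    /** Async mode: each scheduled subtask reads the latest value and applies its update immediately. */
    static int runAsync(int[] schedule, int[] updateCounts) {
        int model = 0;
        for (int subtask : schedule) {
            model += update(model);
            updateCounts[subtask]++; // subtasks may end up with very different counts
        }
        return model;
    }

    public static void main(String[] args) {
        System.out.println("sync:  " + runSync(2, 2));
        int[] counts = new int[2];
        System.out.println("async: " + runAsync(new int[] {0, 1, 0, 1}, counts));
        System.out.println("async update counts: " + Arrays.toString(counts));
    }
}
```

In the sync variant every subtask in an epoch computes its update from the same snapshot, whereas in the async variant the result depends on the order in which subtasks happen to run.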

...

In the following, we explain the iteration paradigm that has motivated our choices of the proposed APIs.

An iterative algorithm has the following behavior pattern:

  • The iterative algorithm has an iteration body that is repeatedly invoked until some termination criteria is reached (e.g. after a user-specified number of epochs has been reached)
  • In each invocation, the iteration body updates the model parameters based on the user-provided data as well as the most recent model parameters.
  • The iterative algorithm takes as inputs the user-provided data and the initial model parameters.
  • The iterative algorithm could output arbitrary user-defined information, such as the loss after each epoch, or the final model parameters. 

Therefore, the behavior of an iterative algorithm could be characterized with the following iteration paradigm (w.r.t. Flink concepts):

  • An iteration-body is a Flink subgraph with the following inputs and outputs:
    • Inputs: model-variables (as a list of unbounded data streams) and user-provided-data (as a list of data streams)
    • Outputs: feedback-model-variables (as a list of unbounded data streams) and user-observed-outputs (as a list of data streams)
  • A termination-condition that specifies when the iterative execution of the iteration body should terminate.
  • In order to execute an iteration body, a user needs to provide the following inputs, and gets the following outputs.
    • Inputs: initial-model-variables (as a list of bounded data streams) and user-provided-data (as a list of data streams)
    • Outputs: the user-observed-output emitted by the iteration body.

It is important to note that the user typically should not invoke IterationBody::process directly because the model-variables expected by the iteration body are not the same as the initial-model-variables provided by the user. Instead, model-variables are computed as the union of the feedback-model-variables (emitted by the iteration body) and the initial-model-variables (provided by the caller of the iteration body). To relieve the user from creating this union operator, we have added a utility class (see IterationUtils) to run an iteration-body with the user-provided inputs.
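The feedback loop described above can be sketched in plain Java, with lists standing in for data streams. This is a simplified illustration and not the proposed implementation: the class names, the example body and the fixed number of epochs used as termination-condition are all assumptions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

/** Plain-Java sketch of the iteration paradigm; lists stand in for data streams. All names are hypothetical. */
final class IterationParadigmSketch {
    /** What one invocation of the iteration body returns. */
    static final class BodyResult {
        final List<Double> feedbackModelVariables;
        final List<Double> userObservedOutputs;

        BodyResult(List<Double> feedbackModelVariables, List<Double> userObservedOutputs) {
            this.feedbackModelVariables = feedbackModelVariables;
            this.userObservedOutputs = userObservedOutputs;
        }
    }

    /** Example body: moves the single model variable halfway towards the mean of the data. */
    static BodyResult halfwayToMean(List<Double> modelVariables, List<Double> data) {
        double mean = data.stream().mapToDouble(Double::doubleValue).average().orElse(0.0);
        double updated = modelVariables.get(0) + (mean - modelVariables.get(0)) / 2;
        return new BodyResult(Arrays.asList(updated), Arrays.asList(updated));
    }

    /** Runs the body for a fixed number of epochs, which serves as a simple termination-condition. */
    static List<Double> iterate(
            List<Double> initialModelVariables,
            List<Double> userProvidedData,
            BiFunction<List<Double>, List<Double>, BodyResult> body,
            int numEpochs) {
        List<Double> outputs = new ArrayList<>();
        // The first invocation sees the initial-model-variables.
        List<Double> modelVariables = initialModelVariables;
        for (int epoch = 0; epoch < numEpochs; epoch++) {
            BodyResult result = body.apply(modelVariables, userProvidedData);
            outputs.addAll(result.userObservedOutputs);
            // Every later invocation sees the feedback-model-variables instead.
            modelVariables = result.feedbackModelVariables;
        }
        return outputs;
    }

    public static void main(String[] args) {
        List<Double> outputs = iterate(
                Arrays.asList(0.0), Arrays.asList(2.0, 4.0), IterationParadigmSketch::halfwayToMean, 3);
        System.out.println(outputs);
    }
}
```

The caller only supplies the initial model variables and the data, and only receives the user-observed outputs. The feedback values never leave the driver, which mirrors the role of the utility class in the paradigm.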

The figure below summarizes the iteration paradigm described above. The streams in red are inputs provided by the user to the iteration body, as well as outputs emitted by the iteration body to the user.

Public Interfaces

We propose to make the following API changes to support the iteration paradigm described above.

1) Add the IterationBody interface.

This interface corresponds to the iteration-body with the inputs and outputs described in the iteration paradigm. This interface should be implemented by the developer of the algorithm.

Note that the IterationBody also outputs the terminationCriteria which corresponds to the termination-condition described in the iteration paradigm. This allows the algorithm developer to use a stream created inside the IterationBody as the terminationCriteria.

Code Block
languagejava
linenumberstrue
package org.apache.flink.ml.iteration;

@PublicEvolving
public interface IterationBody {
    /**
     * This method creates the graph for the iteration body.
     *
     * See IterationUtils::iterateUnboundedStreams, IterationUtils::iterateBoundedStreamsUntilTermination and
     * IterationUtils::iterateAndReplayBoundedStreamsUntilTermination for how the iteration body can be executed and
     * when execution of the corresponding graph should terminate.
     *
     * Required: the number of feedback variable streams returned by this method must equal the number of variable
     * streams given to this method.
     *
     * @param variableStreams the variable streams.
     * @param dataStreams the data streams.
     * @return an IterationBodyResult.
     */
    IterationBodyResult process(DataStreamList variableStreams, DataStreamList dataStreams);
}


2) Add the IterationBodyResult class.

This is a helper class which contains the objects returned by IterationBody::process(...).

Code Block
languagejava
linenumberstrue
package org.apache.flink.ml.iteration;

/**
 * A helper class that contains the streams returned by the iteration body.
 */
@PublicEvolving
class IterationBodyResult {
    /**
     * A list of feedback variable streams. These streams will only be used during the iteration execution and will
     * not be returned to the caller of the iteration body. It is assumed that the method which executes the
     * iteration body will feed the records of the feedback variable streams back into the corresponding input variable
     * streams.
     */
    DataStreamList feedbackVariableStreams;

    /**
     * A list of output streams. These streams will be returned to the caller of the methods that execute the
     * iteration body.
     */
    DataStreamList outputStreams;

    /**
     * An optional termination criteria stream. If this stream is not null, it will be used together with the
     * feedback variable streams to determine when the iteration should terminate.
     */
    Optional<DataStream<?>> terminationCriteria;
}


3) Add the IterationListener interface.

If a UDF (a.k.a. user-defined function) used inside the IterationBody implements this interface, the callbacks on this interface will be invoked when corresponding events happen.

This interface allows users to achieve the following goals:
- Run an algorithm in sync mode, i.e. each subtask will wait for model parameter updates from all other subtasks before reading the aggregated model parameters and starting the next epoch of execution.
- Emit final output after the iteration terminates.


Code Block
languagejava
linenumberstrue
package org.apache.flink.ml.iteration;

/**
 * The callbacks defined below will be invoked only if the operator instance which implements this interface is used
 * within an iteration body.
 */
@PublicEvolving
public interface IterationListener<T> {
    /**
     * This callback is invoked every time the epoch watermark increments. The initial epoch watermark is -1.
     *
     * The epochWatermark is the maximum integer that meets this requirement: every record that arrives at the operator
     * going forward should have an epoch larger than the epochWatermark. See Java docs in IterationUtils for how epoch
     * is determined for records ingested into the iteration body and for records emitted by operators within the
     * iteration body.
     *
     * If all inputs are bounded, the maximum epoch of all records ingested into this operator is used as the
     * epochWatermark by the last invocation of this callback.
     *
     * @param epochWatermark The incremented epoch watermark.
     * @param context A context that allows emitting side output. The context is only valid during the invocation of
     *                this method.
     * @param collector The collector for returning result values.
     */
    void onEpochWatermarkIncremented(int epochWatermark, Context context, Collector<T> collector);

    /**
     * This callback is invoked after the execution of the iteration body has terminated.
     *
     * See Java doc of methods in IterationUtils for the termination conditions.
     *
     * @param context A context that allows emitting side output. The context is only valid during the invocation of
     *                this method.
     * @param collector The collector for returning result values.
     */
    void onIterationTermination(Context context, Collector<T> collector);

    /**
     * Information available in an invocation of the callbacks defined in the IterationListener.
     */
    interface Context {
        /**
         * Emits a record to the side output identified by the {@link OutputTag}.
         *
         * @param outputTag the {@code OutputTag} that identifies the side output to emit to.
         * @param value The record to emit.
         */
        <X> void output(OutputTag<X> outputTag, X value);
    }
}
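To illustrate how these callbacks could support the sync mode, the following plain-Java sketch drives a simplified listener over bounded input. It is an assumption-laden illustration rather than the proposed interface: SimpleListener is a hypothetical stand-in that omits the Context, uses a List as collector, and adds an onRecord method in place of the operator's normal record processing. The driver also assumes the records arrive sorted by epoch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Sketch of epoch-watermark driven callbacks on bounded input. All names are hypothetical. */
final class EpochWatermarkSketch {
    /** Simplified stand-in for the listener: no Context, and a List acts as the collector. */
    interface SimpleListener<T> {
        void onRecord(int epoch, T value);

        void onEpochWatermarkIncremented(int epochWatermark, List<T> collector);

        void onIterationTermination(List<T> collector);
    }

    /** Emits the sum of each epoch once that epoch is complete, and the grand total at termination. */
    static final class PerEpochSum implements SimpleListener<Integer> {
        private final Map<Integer, Integer> partialSums = new TreeMap<>();
        private int total = 0;

        @Override
        public void onRecord(int epoch, Integer value) {
            partialSums.merge(epoch, value, Integer::sum);
            total += value;
        }

        @Override
        public void onEpochWatermarkIncremented(int epochWatermark, List<Integer> collector) {
            Integer sum = partialSums.remove(epochWatermark);
            if (sum != null) {
                collector.add(sum);
            }
        }

        @Override
        public void onIterationTermination(List<Integer> collector) {
            collector.add(total);
        }
    }

    /** Drives a listener over {epoch, value} pairs that are assumed to be sorted by epoch. */
    static List<Integer> run(int[][] records, SimpleListener<Integer> listener) {
        List<Integer> out = new ArrayList<>();
        int watermark = -1; // the initial epoch watermark
        for (int[] record : records) {
            // With sorted input, a record of epoch e shows that every epoch below e is complete.
            watermark = advance(watermark, record[0] - 1, listener, out);
            listener.onRecord(record[0], record[1]);
        }
        if (records.length > 0) {
            // Bounded input: the last callback uses the maximum epoch of all ingested records.
            watermark = advance(watermark, records[records.length - 1][0], listener, out);
        }
        listener.onIterationTermination(out);
        return out;
    }

    private static int advance(int watermark, int target, SimpleListener<Integer> listener, List<Integer> out) {
        while (watermark < target) {
            watermark++;
            listener.onEpochWatermarkIncremented(watermark, out);
        }
        return watermark;
    }

    public static void main(String[] args) {
        int[][] records = {{0, 1}, {0, 2}, {1, 10}, {2, 5}, {2, 5}};
        System.out.println(run(records, new PerEpochSum()));
    }
}
```

Because the per-epoch result is emitted only from the watermark callback, the listener never emits an aggregate before all records of that epoch have been seen, which is the property the sync mode relies on.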


4) Add the IterationUtils class.

This class provides three APIs to execute an iteration body with the user-provided inputs, each with different input types (e.g. bounded data streams vs. unbounded data streams) and data replay semantics (i.e. whether to replay the user-provided data streams).

Each of these three APIs provides the functionality described in the iteration paradigm: union the feedback variable streams (returned by the iteration body) with the initial variable streams (provided by the user) and use the merged streams as inputs to invoke IterationBody::process(...).


Code Block
languagejava
linenumberstrue
package org.apache.flink.ml.iteration;

/**
 * A helper class to apply {@link IterationBody} to data streams.
 */
@PublicEvolving
public class IterationUtils {
    /**
     * This method can use an iteration body to process records in unbounded data streams.
     *
     * This method invokes the iteration body with the following parameters:
     * 1) The 1st parameter is a list of input variable streams, which are created as the union of the initial variable
     * streams and the corresponding feedback variable streams (returned by the iteration body).
     * 2) The 2nd parameter is the data streams given to this method.
     *
     * The epoch values are determined as described below. See IterationListener for how the epoch values are used.
     * 1) All records in the initial variable streams have epoch=0.
     * 2) All records in the data streams have epoch=MAX_LONG. In this case, records in the data stream won't affect
     * any operator's low watermark.
     * 3) For any record emitted by this operator into a non-feedback stream, the epoch of this emitted record = the
     * epoch of the input record that triggers this emission. If this record is emitted by onEpochLWIncremented(), then
     * the epoch of this record = incrementedEpochLW - 1.
     * 4) For any record emitted by this operator into a feedback variable stream, the epoch of the emitted record =
     * min(the epoch of the input record that triggers this emission, MAX_LONG - 1) + 1. If this record is emitted by
     * onEpochLWIncremented(), then the epoch of this record = incrementedEpochLW.
     *
     * The execution of the graph created by the iteration body will not terminate by itself. This is because at least
     * one of its data streams is unbounded.
     *
     * Required:
     * 1) All the init variable streams must be bounded.
     * 2) There is at least one unbounded stream in the data streams list.
     * 3) The parallelism of any stream in the initial variable streams must equal the parallelism of the stream at the
     * same index of the feedback variable streams returned by the IterationBody.
     *
     * @param initVariableStreams The initial variable streams. These streams will be merged with the feedback variable
     *                            streams before being used as the 1st parameter to invoke the iteration body.
     * @param dataStreams The data streams. These streams will be used as the 2nd parameter to invoke the iteration
     *                    body.
     * @param body The computation logic which takes variable/data streams and returns variable/output streams.
     * @return The list of output streams returned by the iteration body.
     */
    static DataStreamList iterateUnboundedStreams(DataStreamList initVariableStreams, DataStreamList dataStreams, IterationBody body) {...}

    /**
     * This method can use an iteration body to process records in some bounded data streams iteratively until a
     * termination criteria is reached (e.g. the given number of rounds is completed or no further variable update is
     * needed). Because this method does not replay records in the data streams, the iteration body needs to cache those
     * records in order to visit those records repeatedly.
     *
     * This method invokes the iteration body with the following parameters:
     * 1) The 1st parameter is a list of input variable streams, which are created as the union of the initial variable
     * streams and the corresponding feedback variable streams (returned by the iteration body).
     * 2) The 2nd parameter is the data streams given to this method.
     *
     * The epoch values are determined as described below. See IterationListener for how the epoch values are used.
     * 1) All records in the initial variable streams have epoch=0.
     * 2) All records in the data streams have epoch=0.
     * 3) For any record emitted by this operator into a non-feedback stream, the epoch of this emitted record = the
     * epoch of the input record that triggers this emission. If this record is emitted by onEpochLWIncremented(), then
     * the epoch of this record = incrementedEpochLW - 1.
     * 4) For any record emitted by this operator into a feedback variable stream, the epoch of the emitted record = the
     * epoch of the input record that triggers this emission + 1. If this record is emitted by onEpochLWIncremented(),
     * then the epoch of this record = incrementedEpochLW.
     *
     * Suppose there is a coordinator operator which takes all feedback variable streams (emitted by the iteration body)
     * and the termination criteria stream (if not null) as inputs. The execution of the graph created by the
     * iteration body will terminate when all input streams have been fully consumed AND any of the following conditions
     * is met:
     * 1) The termination criteria stream is not null. And the coordinator operator has not observed any new value from
     * the termination criteria stream between two consecutive onEpochLWIncremented invocations.
     * 2) The coordinator operator has not observed any new value from any feedback variable stream between two
     * consecutive onEpochLWIncremented invocations.
     *
     * Required:
     * 1) All the init variable streams and the data streams must be bounded.
     * 2) The parallelism of any stream in the initial variable streams must equal the parallelism of the stream at the
     * same index of the feedback variable streams returned by the IterationBody.
     *
     * @param initVariableStreams The initial variable streams. These streams will be merged with the feedback variable
     *                            streams before being used as the 1st parameter to invoke the iteration body.
     * @param dataStreams The data streams. These streams will be used as the 2nd parameter to invoke the iteration
     *                    body.
     * @param body The computation logic which takes variable/data streams and returns variable/output streams.
     * @return The list of output streams returned by the iteration body.
     */
    static DataStreamList iterateBoundedStreamsUntilTermination(DataStreamList initVariableStreams, DataStreamList dataStreams, IterationBody body) {...}

    /**
     * This method can use an iteration body to process records in some bounded data streams iteratively until a
     * termination criteria is reached (e.g. the given number of rounds is completed or no further variable update is
     * needed). Because this method replays records in the data streams, the iteration body does not need to cache those
     * records to visit those records repeatedly.
     *
     * This method invokes the iteration body with the following parameters:
     * 1) The 1st parameter is a list of input variable streams, which are created as the union of the initial variable
     * streams and the corresponding feedback variable streams (returned by the iteration body).
     * 2) The 2nd parameter is a list of replayed data streams, which are created by replaying the initial data streams
     * round by round until the iteration terminates. The records in the Nth round will be emitted into the iteration
     * body only if the low watermark of the first operator in the iteration body >= N - 1.
     *
     * The epoch values are determined as described below. See IterationListener for how the epoch values are used.
     * 1) All records in the initial variable streams have epoch=0.
     * 2) The records from the initial data streams will be replayed round by round into the iteration body. The records
     * in the first round have epoch=0. And records in the Nth round have epoch = N - 1.
     * 3) For any record emitted by this operator into a non-feedback stream, the epoch of this emitted record = the
     * epoch of the input record that triggers this emission. If this record is emitted by onEpochLWIncremented(), then
     * the epoch of this record = incrementedEpochLW - 1.
     * 4) For any record emitted by this operator into a feedback stream, the epoch of the emitted record = the epoch
     * of the input record that triggers this emission + 1. If this record is emitted by onEpochLWIncremented(), then
     * the epoch of this record = incrementedEpochLW.
     *
     * Suppose there is a coordinator operator which takes all feedback variable streams (emitted by the iteration body)
     * and the termination criteria stream (if not null) as inputs. The execution of the graph created by the
     * iteration body will terminate when all input streams have been fully consumed AND any of the following conditions
     * is met:
     * 1) The termination criteria stream is not null. And the coordinator operator has not observed any new value from
     * the termination criteria stream between two consecutive onEpochLWIncremented invocations.
     * 2) The coordinator operator has not observed any new value from any feedback variable stream between two
     * consecutive onEpochLWIncremented invocations.
     *
     * Required:
     * 1) All the init variable streams and the data streams must be bounded.
     * 2) The parallelism of any stream in the initial variable streams must equal the parallelism of the stream at the
     * same index of the feedback variable streams returned by the IterationBody.
     *
     * @param initVariableStreams The initial variable streams. These streams will be merged with the feedback variable
     *                            streams before being used as the 1st parameter to invoke the iteration body.
     * @param initDataStreams The initial data streams. Records from these streams will be repeatedly replayed and used
     *                        as the 2nd parameter to invoke the iteration body.
     * @param body The computation logic which takes variable/data streams and returns variable/output streams.
     * @return The list of output streams returned by the iteration body.
     */
    static DataStreamList iterateAndReplayBoundedStreamsUntilTermination(DataStreamList initVariableStreams, DataStreamList initDataStreams, IterationBody body) {...}

}


5) Add the DataStreamList class.

DataStreamList is a helper class that contains a list of data streams with possibly different elements types.

Code Block
languagejava
linenumberstrue
package org.apache.flink.ml.iteration;

public class DataStreamList {
    */
// Returns the number of data streams in this list.
     Optional<DataStream<?>> terminationCriteria;
}

5) Add DataStreamList as a helper class.

DataStreamList is a helper class that serves as a container for a list of data streams with possibly different element types.

Code Block
languagejava
linenumberstrue
package org.apache.flink.ml.iteration;

public class DataStreamList {
    // Returns the number of data streams in this list.
    public int size() {...}

    // Returns the data stream at the given index in this list.
    public <T> DataStream<T> get(int index) {...}
}
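
As an illustration only, here is a minimal sketch of how a container of differently typed streams could expose a typed get(...). StreamStub is a hypothetical stand-in for Flink's DataStream so that the example compiles without Flink on the classpath; the constructor and the unchecked cast are assumptions, not the proposed implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for Flink's DataStream<T>; it only holds elements in memory.
class StreamStub<T> {
    final List<T> elements;

    @SafeVarargs
    StreamStub(T... elements) {
        this.elements = Arrays.asList(elements);
    }
}

// Sketch of a container for streams with possibly different element types.
class StreamListSketch {
    private final List<StreamStub<?>> streams;

    StreamListSketch(StreamStub<?>... streams) {
        this.streams = new ArrayList<>(Arrays.asList(streams));
    }

    // Returns the number of streams in this list.
    int size() {
        return streams.size();
    }

    // Returns the stream at the given index. The element type is chosen by the caller and is
    // not checked here, so a wrong choice only fails later, when an element is actually used.
    @SuppressWarnings("unchecked")
    <T> StreamStub<T> get(int index) {
        return (StreamStub<T>) streams.get(index);
    }
}
```

With this shape, a caller that knows the stream at index 0 carries integers can write `StreamStub<Integer> ints = list.get(0);` without an explicit cast. The trade-off is that type safety rests on the caller keeping indices and element types consistent.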

6) Deprecate the existing DataStream::iterate() and the DataStream::iterate(long maxWaitTimeMillis) methods.

We plan to remove both methods after the APIs added in this doc are ready for production use. This change is needed to decouple the iteration-related APIs from the Flink core runtime so that we can keep the Flink core runtime as simple and maintainable as possible.

Proposed Changes

1) Termination of the iteration execution.

See the Javadoc of the APIs in IterationUtils for how each API determines when the iteration terminates.

2) Execution mode.

If all inputs are bounded streams, then the iteration body can be executed in either the stream mode or the batch mode.

If some inputs are unbounded streams, then the iteration body must be executed in the stream mode.

3) Type of edges inside the iteration body.

All edges inside the iteration body are required to have the PIPELINE type.

If the user-defined iteration body contains an edge that does not have the PIPELINE type, methods that create the subgraph from the iteration body, such as iterateBoundedStreamsUntilTermination, will throw an exception upon invocation.
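
The check described above could look roughly like the following sketch. Everything in it is hypothetical: the Edge class, the BLOCKING constant, and the choice of IllegalStateException are assumptions made for illustration, since the text names only the PIPELINE type and does not specify the exception.

```java
import java.util.List;

class EdgeTypeCheckSketch {
    // Hypothetical edge types; only PIPELINE is named in the text above.
    enum EdgeType { PIPELINE, BLOCKING }

    // Hypothetical description of one edge inside the iteration body.
    static final class Edge {
        final String source;
        final String target;
        final EdgeType type;

        Edge(String source, String target, EdgeType type) {
            this.source = source;
            this.target = target;
            this.type = type;
        }
    }

    // Throws if any edge of the iteration body is not of the PIPELINE type.
    static void validate(List<Edge> edges) {
        for (Edge edge : edges) {
            if (edge.type != EdgeType.PIPELINE) {
                throw new IllegalStateException(
                        "Edge " + edge.source + " -> " + edge.target + " has type " + edge.type
                                + "; only PIPELINE edges are allowed inside the iteration body.");
            }
        }
    }
}
```

Failing at graph-construction time, rather than at runtime, lets the user see the problem before the job is submitted.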

4) Implementation of the feedback edge.

The Flink core runtime supports only DAGs of operators. Thus it does not natively support feedback edges, since a feedback edge introduces a cycle into the operator graph.

As in the implementation of the DataSet::iterate() API, the proposed APIs are implemented with the following trick:

  • Automatically insert HEAD and TAIL operators as the first and last operators in the iteration body.
  • Co-locate HEAD and TAIL operators on the same task manager.
  • Have HEAD and TAIL operators transmit the records of the feedback edges using an in-memory queue.
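
The three steps above can be pictured with the following single-threaded sketch. It is plain Java rather than Flink code, and the method name, the integer records, and the fixed round limit are all assumptions for illustration. The point it shows is that the cycle lives in a queue shared by HEAD and TAIL, not in the operator graph, which is also why the two operators need to run in the same process.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.UnaryOperator;

class FeedbackQueueSketch {
    // Runs `body` repeatedly. TAIL writes each result into an in-memory queue and HEAD reads it
    // back, so the chain HEAD -> body -> TAIL itself stays acyclic.
    static List<Integer> iterate(int initialValue, UnaryOperator<Integer> body, int maxRounds) {
        Queue<Integer> feedbackQueue = new ArrayDeque<>(); // shared by HEAD and TAIL
        List<Integer> outputs = new ArrayList<>();
        feedbackQueue.add(initialValue); // the initial variable enters through HEAD

        for (int round = 0; round < maxRounds && !feedbackQueue.isEmpty(); round++) {
            int input = feedbackQueue.poll();  // HEAD: first operator of the iteration body
            int result = body.apply(input);    // user-defined iteration body
            outputs.add(result);
            feedbackQueue.add(result);         // TAIL: last operator, feeds the record back
        }
        return outputs;
    }
}
```

For example, `FeedbackQueueSketch.iterate(1, x -> x * 2, 4)` returns `[2, 4, 8, 16]`: each round consumes the value that the previous round fed back.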

5) Lifetime of the operators inside the iteration body.

The operators inside the iteration body are created only once and are destroyed after the iteration terminates. In contrast, the existing DataSet::iterate(..) re-creates the iteration body (together with all states inside it) once for every round of execution, which in general could introduce more runtime overhead than the approach adopted in this FLIP.

6) Failover

For jobs that are executed in the batch mode, this FLIP does not support failover from the middle of an iteration, i.e. if an iterative job fails, it will restart from the very first epoch of this iteration. Note that the existing DataSet::iterate(...) has the same behavior after job failover.

For jobs that are executed in the stream mode, this FLIP supports failover from the middle of an iteration, i.e. if an iterative job fails, it will be restarted from the latest epoch that was completed before the job failed.

This failover re-uses the existing checkpoint mechanism with the following extra work: the runtime will recognize the records buffered on the feedback edge and include these records in the checkpoint.
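
To make the extra work concrete, here is a minimal plain-Java sketch of a snapshot that covers both the operator state and the records still buffered on the feedback edge. The Snapshot class, the integer state, and the method names are hypothetical and do not reflect Flink's checkpoint interfaces.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;

class FeedbackCheckpointSketch {
    // Hypothetical snapshot: operator state plus the records buffered on the feedback edge.
    static final class Snapshot {
        final int modelState;
        final List<Integer> bufferedFeedback;

        Snapshot(int modelState, List<Integer> bufferedFeedback) {
            this.modelState = modelState;
            this.bufferedFeedback = Collections.unmodifiableList(new ArrayList<>(bufferedFeedback));
        }
    }

    int modelState = 0;
    final Queue<Integer> feedbackQueue = new ArrayDeque<>();

    // Captures the in-flight feedback records together with the state.
    Snapshot checkpoint() {
        return new Snapshot(modelState, new ArrayList<>(feedbackQueue));
    }

    // Restores both the state and the buffered feedback records.
    void restore(Snapshot snapshot) {
        modelState = snapshot.modelState;
        feedbackQueue.clear();
        feedbackQueue.addAll(snapshot.bufferedFeedback);
    }

    // Consumes one buffered feedback record and folds it into the model state.
    void processOne() {
        Integer record = feedbackQueue.poll();
        if (record != null) {
            modelState += record;
        }
    }
}
```

If the snapshot omitted the queue contents, a record that was in flight on the feedback edge at checkpoint time would be lost on recovery, and the restarted iteration would continue from an inconsistent state.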

7) Support for synchronous iteration.

We consider an iteration to be synchronous if there exist global epochs such that every subtask emits records to the feedback variable streams only after it has received the records emitted to the feedback variable streams by all subtasks in the previous epoch.

The proposed API can be used to implement both synchronous and asynchronous iterative algorithms. Here are guidelines for achieving each of these two modes.

Users can do the following to implement an algorithm in the async mode:

  • Emit records to the feedback variable streams in the callback that is invoked when the IterationBody receives new records from the input variable streams.
  • Provide an empty implementation of onEpochWatermarkIncremented(...).

Users can do the following to implement an algorithm in the sync mode:

  • In the callback that is invoked when the IterationBody receives new records from the input variable streams, do not immediately emit records to the feedback variable streams. Just buffer the inputs or update internal states.
  • Emit records to the feedback variable streams in the onEpochWatermarkIncremented(...) callback.
  • Make sure every feedback edge is part of a cycle that does not involve any other feedback edge.
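
The two sets of guidelines above can be contrasted in a small plain-Java sketch. Only the callback name onEpochWatermarkIncremented comes from this document; the EpochListener interface, its parameter lists, the onRecord name, and the averaging logic are assumptions chosen for illustration.

```java
import java.util.function.Consumer;

class SyncModeSketch {
    // Hypothetical listener shape; the real signatures are defined by the proposed APIs.
    interface EpochListener {
        void onRecord(double update, Consumer<Double> feedback);

        void onEpochWatermarkIncremented(int epochWatermark, Consumer<Double> feedback);
    }

    // Sync mode: buffer updates as they arrive and emit one combined value per epoch.
    static final class SyncAverager implements EpochListener {
        private double sum = 0;
        private int count = 0;

        @Override
        public void onRecord(double update, Consumer<Double> feedback) {
            sum += update; // update internal state only, emit nothing yet
            count++;
        }

        @Override
        public void onEpochWatermarkIncremented(int epochWatermark, Consumer<Double> feedback) {
            if (count > 0) {
                feedback.accept(sum / count); // emit once all records of the epoch have arrived
                sum = 0;
                count = 0;
            }
        }
    }

    // Async mode: forward every update immediately and ignore the epoch callback.
    static final class AsyncForwarder implements EpochListener {
        @Override
        public void onRecord(double update, Consumer<Double> feedback) {
            feedback.accept(update);
        }

        @Override
        public void onEpochWatermarkIncremented(int epochWatermark, Consumer<Double> feedback) {
            // intentionally empty
        }
    }
}
```

In the sync variant nothing reaches the feedback stream until the epoch callback fires, so downstream subtasks see exactly one value per epoch. In the async variant each update is visible as soon as it is produced.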

Here is the reason why the iteration will be synchronous:

To be explained. 


8) Replay data for iteration without requiring the runtime to dump the user-provided data streams to disk.

To be explained.


Example Usages

Offline Training with Bounded Iteration

...