Status

Current state:  "Under Discussion" Work-in-progress. Not ready for discussion yet.

Discussion thread: To be added

...

  • Provide a solution for all the iteration use-cases (see the use-case section below for more detail) supported by the existing APIs, without the issues described above.
  • Provide a solution for a few additional use-cases (e.g. bounded streams + async mode + per-round variable update) not supported by the existing APIs.

...

  • Decouple the iteration-related APIs from the Flink core runtime (by moving them to the flink-ml repo) so that we can keep the Flink core runtime as simple and maintainable as possible.

...

Besides supporting the basic iteration primitive (i.e. the feedback stream), Flink also needs APIs to support synchronization between parallel execution of the iteration body. This is needed to execute machine learning algorithms that need to be parallelized and still ensure deterministic execution results.

In the following, we describe the example use-cases and the exact semantics that we aim to support via the proposed APIs.

Terminology

We define a few terms below to make this doc easier to read.

1) Iteration body

An iteration body is a subgraph of a Flink job graph that implements the iteration logic of e.g. a machine learning algorithm. In particular, the iteration body will create feedback variable streams and emit values into those streams. The feedback variable streams will be joined with the user-provided initial variable streams and the resulting streams will be ingested into the iteration body.

2) Feedback streams


Target Use-cases

The target use-cases (i.e. algorithms) can be described w.r.t. the properties described below. In the following, we first describe the definitions of the properties, followed by the combinations of property choices supported by the existing APIs and the proposed APIs, respectively.

Definitions of properties

Different algorithms might have different required properties for the input datasets (bounded or unbounded), synchronization between parallel subtasks (sync or async), and the amount of data processed for every variable update (a batch/subset or the entire dataset). We describe each of these properties below.

1) Algorithms have different needs for whether the input data streams should be bounded or unbounded. We classify those algorithms into online algorithms and offline algorithms as below.

...

Other algorithms only update variables every time the entire data is consumed for one round. Those algorithms fit into the second category. And such an algorithm must be offline because, by this definition, the user-provided dataset must be bounded.

Combinations of property choices supported by the existing APIs

...

Combinations of property choices supported by the proposed APIs

As described above, there are 3 properties, each with 2 choices, so there are 8 combinations of property choices in total. Since the "per-round variable update" cannot be used with "unbounded data streams", only 6 of the 8 combinations are valid.

The APIs proposed in this FLIP support all 6 valid combinations of property choices.
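The counting argument above can be checked with a small enumeration. The following is only an illustrative sketch in plain Java; the class and enum names (PropertyCombinations, Input, Sync, Update) are hypothetical and are not part of the proposed APIs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of the proposed APIs.
final class PropertyCombinations {
    enum Input { BOUNDED, UNBOUNDED }
    enum Sync { SYNC, ASYNC }
    enum Update { PER_BATCH, PER_ROUND }

    // A per-round variable update requires the entire dataset to be consumed
    // in each round, so it cannot be combined with unbounded data streams.
    static boolean isValid(Input input, Sync sync, Update update) {
        return !(input == Input.UNBOUNDED && update == Update.PER_ROUND);
    }

    // Enumerates all 2 x 2 x 2 combinations and keeps only the valid ones.
    static List<String> validCombinations() {
        List<String> result = new ArrayList<>();
        for (Input input : Input.values()) {
            for (Sync sync : Sync.values()) {
                for (Update update : Update.values()) {
                    if (isValid(input, sync, update)) {
                        result.add(input + "/" + sync + "/" + update);
                    }
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        validCombinations().forEach(System.out::println);
    }
}
```

The two rejected combinations are exactly the ones that pair unbounded data streams with per-round variable update, in sync and async mode respectively.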

Summary

The following table summarizes the use-cases supported by the existing APIs and proposed APIs, respectively, with respect to the properties defined above.

Input type             | Synchronization mode | Variable update mode      | Supported by the existing APIs | Supported by the proposed APIs
bounded data streams   | sync mode            | per-batch variable update | No                             | Yes
bounded data streams   | sync mode            | per-round variable update | Yes                            | Yes
bounded data streams   | async mode           | per-batch variable update | No                             | Yes
bounded data streams   | async mode           | per-round variable update | No                             | Yes
unbounded data streams | sync mode            | per-batch variable update | No                             | Yes
unbounded data streams | async mode           | per-batch variable update | Yes                            | Yes


Overview of the Iteration Paradigm



Public Interfaces

We propose to add a few interfaces and helper classes in the flink-ml repository. We first provide a brief description of each interface/class proposed by this FLIP, followed by their API code with detailed Javadoc.

Brief description of the proposed APIs

1) Add the IterationBody interface.

As its name suggests, this interface abstracts the core computation logic that is expected to be computed iteratively. Any algorithm that wants to apply iterative computation with feedback streams should implement this interface.

2) Add the IterationListener interface.

If a UDF (i.e. user-defined function) used inside the IterationBody implements this interface, the callbacks on this interface will be invoked when the corresponding events happen.

This interface allows users to register callbacks that can be used to a) run an algorithm in sync mode; and b) emit final output after the iteration terminates.
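To illustrate how such callbacks can enable sync mode, the sketch below simulates an epoch watermark in plain Java. It is not the proposed runtime: the class EpochWatermarkTracker, its method channelFinishedEpoch and the rule "watermark = minimum over input channels" are assumptions made for this example only. The one detail taken from this FLIP is that the initial epoch watermark is -1.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.IntConsumer;

// Hypothetical simulation for illustration only; not part of the proposed APIs.
final class EpochWatermarkTracker {
    // Per input channel: the largest epoch that the channel has fully delivered.
    private final int[] channelWatermarks;
    private final IntConsumer onIncremented;
    private int epochWatermark = -1; // the initial epoch watermark is -1

    // Assumes numChannels >= 1.
    EpochWatermarkTracker(int numChannels, IntConsumer onIncremented) {
        this.channelWatermarks = new int[numChannels];
        Arrays.fill(this.channelWatermarks, -1);
        this.onIncremented = onIncremented;
    }

    // Called when a channel will no longer deliver records with epoch <= the given epoch.
    void channelFinishedEpoch(int channel, int epoch) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], epoch);
        int min = Arrays.stream(channelWatermarks).min().getAsInt();
        if (min > epochWatermark) {
            epochWatermark = min;
            // Fires only after every channel has finished the epoch, which is
            // what lets a listener act as a synchronization point.
            onIncremented.accept(epochWatermark);
        }
    }

    // Runs a two-channel scenario and returns the watermarks passed to the callback.
    static List<Integer> demo() {
        List<Integer> fired = new ArrayList<>();
        EpochWatermarkTracker tracker = new EpochWatermarkTracker(2, w -> fired.add(w));
        tracker.channelFinishedEpoch(0, 0); // channel 1 is still in epoch 0: no callback
        tracker.channelFinishedEpoch(1, 0); // both channels done with epoch 0: callback(0)
        tracker.channelFinishedEpoch(1, 1); // channel 0 is still in epoch 1: no callback
        tracker.channelFinishedEpoch(0, 1); // callback(1)
        return fired;
    }
}
```

In this simulation the callback never runs while any channel still has records of the current epoch in flight, so a variable update performed inside the callback sees the contributions of all parallel inputs for that epoch.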

3) Add the IterationUtils class.

This class provides APIs to execute an iteration body with user-specified inputs. This class provides three APIs to run an iteration body, each with different input types (e.g. bounded data streams vs. unbounded data streams) and data replay semantics (i.e. whether to replay the user-provided data streams).

The common and primary functionality of the APIs in this class is this: union the feedback variable streams (returned by the iteration body) with the initial variable streams (provided by the user) and use the merged streams as inputs to the iteration body.

4) Add the DataStreamList and IterationBodyResult as helper classes.

DataStreamList is a helper class that serves as a container for a list of data streams with possibly different element types. IterationBodyResult is a helper class that contains the streams returned by IterationBody::process(...).

Code and Java doc of the proposed APIs

The IterationBody API:
public class DataStreamList {
    public DataStreamList(DataStream<?>... dataStreams) {...}

    // Returns the number of data streams in this list.
    public int size() {...}

    // Returns the data stream at the given index in this list.
    @SuppressWarnings("unchecked")
    public <T> DataStream<T> get(int index) {...}
}

@PublicEvolving
public interface IterationBody {
    /**
     * This method creates the graph for the iteration body.
     *
     * See IterationUtils::iterateUnboundedStreams, IterationUtils::iterateBoundedStreamsUntilTermination and
     * IterationUtils::iterateAndReplayBoundedStreamsUntilTermination for how the iteration body can be executed and
     * when execution of the corresponding graph should terminate.
     *
     * Required: the number of feedback variable streams returned by this method must equal the number of variable
     * streams given to this method.
     *
     * @param variableStreams the variable streams.
     * @param dataStreams the data streams.
     * @return an IterationBodyResult.
     */
    IterationBodyResult process(DataStreamList variableStreams, DataStreamList dataStreams);
}


/**
 * A helper class that contains the streams returned by the iteration body.
 */
class IterationBodyResult {
    /**
     * A list of feedback variable streams. These streams will only be used during the iteration execution and will
     * not be returned to the caller of the iteration body. It is assumed that the method which executes the
     * iteration body will feed the records of the feedback variable streams back to the corresponding input variable
     * streams.
     */
    DataStreamList feedbackVariableStreams;

    /**
     * A list of output streams. These streams will be returned to the caller of the methods that execute the
     * iteration body.
     */
    DataStreamList outputStreams;

    /**
     * An optional termination criteria stream. If this stream is not null, it will be used together with the
     * feedback variable streams to determine when the iteration should terminate.
     */
    Optional<DataStream<?>> terminationCriteria;
}


/**
 * The callbacks defined below will be invoked only if the operator instance which implements this interface is used
 * within an iteration body.
 */
@PublicEvolving
public interface IterationListener<T> {
    /**
     * This callback is invoked every time the epoch watermark increments. The initial epoch watermark is -1.
     *
     * The epochWatermark is the maximum integer that meets this requirement: every record that arrives at the operator
     * going forward should have an epoch larger than the epochWatermark. See Java docs in IterationUtils for how epoch
     * is determined for records ingested into the iteration body and for records emitted by operators within the
     * iteration body.
     *
     * If all inputs are bounded, the maximum epoch of all records ingested into this operator is used as the
     * epochWatermark by the last invocation of this callback.
     *
     * @param epochWatermark The incremented epoch watermark.
     * @param context A context that allows emitting side output. The context is only valid during the invocation of
     *                this method.
     * @param collector The collector for returning result values.
     */
    void onEpochWatermarkIncremented(int epochWatermark, Context context, Collector<T> collector);

    /**
     * This callback is invoked after the execution of the iteration body has terminated.
     *
     * See Java doc of methods in IterationUtils for the termination conditions.
     *
     * @param context A context that allows emitting side output. The context is only valid during the invocation of
     *                this method.
     * @param collector The collector for returning result values.
     */
    void onIterationTermination(Context context, Collector<T> collector);

    /**
     * Information available in an invocation of the callbacks defined in the IterationListener.
     */
    interface Context {
        /**
         * Emits a record to the side output identified by the {@link OutputTag}.
         *
         * @param outputTag the {@code OutputTag} that identifies the side output to emit to.
         * @param value The record to emit.
         */
        <X> void output(OutputTag<X> outputTag, X value);
    }
}

/**
 * A helper class to apply {@link IterationBody} to data streams.
 */
@PublicEvolving
public class IterationUtils {
    /**
     * This method can use an iteration body to process records in unbounded data streams.
     *
     * This method invokes the iteration body with the following parameters:
     * 1) The 1st parameter is a list of input variable streams, which are created as the union of the initial variable
     * streams and the corresponding feedback variable streams (returned by the iteration body).
     * 2) The 2nd parameter is the data streams given to this method.
     *
     * The epoch values are determined as described below. See IterationListener for how the epoch values are used.
     * 1) All records in the initial variable streams have epoch=0.
     * 2) All records in the data streams have epoch=MAX_LONG. In this case, records in the data stream won't affect
     * any operator's epoch watermark.
     * 3) For any record emitted by this operator into a non-feedback stream, the epoch of this emitted record = the
     * epoch of the input record that triggers this emission. If this record is emitted by
     * onEpochWatermarkIncremented(), then the epoch of this record = epochWatermark - 1.
     * 4) For any record emitted by this operator into a feedback variable stream, the epoch of the emitted record =
     * min(the epoch of the input record that triggers this emission, MAX_LONG - 1) + 1. If this record is emitted by
     * onEpochWatermarkIncremented(), then the epoch of this record = epochWatermark.
     *
     * The execution of the graph created by the iteration body will not terminate by itself. This is because at least
     * one of its data streams is unbounded.
     *
     * Required:
     * 1) All the init variable streams must be bounded.
     * 2) There is at least one unbounded stream in the data streams list.
     * 3) The parallelism of any stream in the initial variable streams must equal the parallelism of the stream at the
     * same index of the feedback variable streams returned by the IterationBody.
     *
     * @param initVariableStreams The initial variable streams. These streams will be merged with the feedback variable
     *                            streams before being used as the 1st parameter to invoke the iteration body.
     * @param dataStreams The data streams. These streams will be used as the 2nd parameter to invoke the iteration
     *                    body.
     * @param body The computation logic which takes variable/data streams and returns variable/output streams.
     * @return The list of output streams returned by the iteration body.
     */
    static DataStreamList iterateUnboundedStreams(DataStreamList initVariableStreams, DataStreamList dataStreams, IterationBody body) {...}

    /**
     * This method can use an iteration body to process records in some bounded data streams iteratively until a
     * termination criteria is reached (e.g. the given number of rounds is completed or no further variable update is
     * needed). Because this method does not replay records in the data streams, the iteration body needs to cache those
     * records in order to visit those records repeatedly.
     *
     * This method invokes the iteration body with the following parameters:
     * 1) The 1st parameter is a list of input variable streams, which are created as the union of the initial variable
     * streams and the corresponding feedback variable streams (returned by the iteration body).
     * 2) The 2nd parameter is the data streams given to this method.
     *
     * The epoch values are determined as described below. See IterationListener for how the epoch values are used.
     * 1) All records in the initial variable streams have epoch=0.
     * 2) All records in the data streams have epoch=0.
     * 3) For any record emitted by this operator into a non-feedback stream, the epoch of this emitted record = the
     * epoch of the input record that triggers this emission. If this record is emitted by
     * onEpochWatermarkIncremented(), then the epoch of this record = epochWatermark - 1.
     * 4) For any record emitted by this operator into a feedback variable stream, the epoch of the emitted record = the
     * epoch of the input record that triggers this emission + 1. If this record is emitted by
     * onEpochWatermarkIncremented(), then the epoch of this record = epochWatermark.
     *
     * Suppose there is a coordinator operator which takes all feedback variable streams (emitted by the iteration body)
     * and the termination criteria stream (if not null) as inputs. The execution of the graph created by the
     * iteration body will terminate when any of the following conditions is met:
     * 1) The termination criteria stream is not null. And the coordinator operator has not observed any new value from
     * the termination criteria stream between two consecutive onEpochWatermarkIncremented invocations.
     * 2) The coordinator operator has not observed any new value from any feedback variable stream between two
     * consecutive onEpochWatermarkIncremented invocations.
     *
     * Required:
     * 1) All the init variable streams and the data streams must be bounded.
     * 2) The parallelism of any stream in the initial variable streams must equal the parallelism of the stream at the
     * same index of the feedback variable streams returned by the IterationBody.
     *
     * @param initVariableStreams The initial variable streams. These streams will be merged with the feedback variable
     *                            streams before being used as the 1st parameter to invoke the iteration body.
     * @param dataStreams The data streams. These streams will be used as the 2nd parameter to invoke the iteration
     *                    body.
     * @param body The computation logic which takes variable/data streams and returns variable/output streams.
     * @return The list of output streams returned by the iteration body.
     */
    static DataStreamList iterateBoundedStreamsUntilTermination(DataStreamList initVariableStreams, DataStreamList dataStreams, IterationBody body) {...}

    /**
     * This method can use an iteration body to process records in some bounded data streams iteratively until a
     * termination criteria is reached (e.g. the given number of rounds is completed or no further variable update is
     * needed). Because this method replays records in the data streams, the iteration body does not need to cache those
     * records to visit those records repeatedly.
     *
     * This method invokes the iteration body with the following parameters:
     * 1) The 1st parameter is a list of input variable streams, which are created as the union of the initial variable
     * streams and the corresponding feedback variable streams (returned by the iteration body).
     * 2) The 2nd parameter is a list of replayed data streams, which are created by replaying the initial data streams
     * round by round until the iteration terminates. The records in the Nth round will be emitted into the iteration
     * body only if the epoch watermark of the first operator in the iteration body >= N - 1.
     *
     * The epoch values are determined as described below. See IterationListener for how the epoch values are used.
     * 1) All records in the initial variable streams have epoch=0.
     * 2) The records from the initial data streams will be replayed round by round into the iteration body. The records
     * in the first round have epoch=0. And records in the Nth round have epoch = N - 1.
     * 3) For any record emitted by this operator into a non-feedback stream, the epoch of this emitted record = the
     * epoch of the input record that triggers this emission. If this record is emitted by
     * onEpochWatermarkIncremented(), then the epoch of this record = epochWatermark - 1.
     * 4) For any record emitted by this operator into a feedback stream, the epoch of the emitted record = the epoch
     * of the input record that triggers this emission + 1. If this record is emitted by
     * onEpochWatermarkIncremented(), then the epoch of this record = epochWatermark.
     *
     * Suppose there is a coordinator operator which takes all feedback variable streams (emitted by the iteration body)
     * and the termination criteria stream (if not null) as inputs. The execution of the graph created by the
     * iteration body will terminate when any of the following conditions is met:
     * 1) The termination criteria stream is not null. And the coordinator operator has not observed any new value from
     * the termination criteria stream between two consecutive onEpochWatermarkIncremented invocations.
     * 2) The coordinator operator has not observed any new value from any feedback variable stream between two
     * consecutive onEpochWatermarkIncremented invocations.
     *
     * Required:
     * 1) All the init variable streams and the data streams must be bounded.
     * 2) The parallelism of any stream in the initial variable streams must equal the parallelism of the stream at the
     * same index of the feedback variable streams returned by the IterationBody.
     *
     * @param initVariableStreams The initial variable streams. These streams will be merged with the feedback variable
     *                            streams before being used as the 1st parameter to invoke the iteration body.
     * @param initDataStreams The initial data streams. Records from these streams will be repeatedly replayed and used
     *                        as the 2nd parameter to invoke the iteration body.
     * @param body The computation logic which takes variable/data streams and returns variable/output streams.
     * @return The list of output streams returned by the iteration body.
     */
    static DataStreamList iterateAndReplayBoundedStreamsUntilTermination(DataStreamList initVariableStreams, DataStreamList initDataStreams, IterationBody body) {...}

}
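
The epoch rules stated in the Java docs of IterationUtils can be restated as small pure functions. The sketch below is plain Java and only mirrors the arithmetic; the class EpochRules and its method names are hypothetical and do not exist in the proposed APIs.

```java
// Hypothetical restatement of the documented epoch rules; not part of the proposed APIs.
final class EpochRules {
    static final long MAX_LONG = Long.MAX_VALUE;

    // Record emitted into a non-feedback stream while processing an input record:
    // it keeps the epoch of the input record that triggered the emission.
    static long nonFeedbackEpoch(long inputEpoch) {
        return inputEpoch;
    }

    // Record emitted into a feedback variable stream, unbounded case:
    // min(inputEpoch, MAX_LONG - 1) + 1, so data records with epoch MAX_LONG do not overflow.
    static long feedbackEpochUnbounded(long inputEpoch) {
        return Math.min(inputEpoch, MAX_LONG - 1) + 1;
    }

    // Record emitted into a feedback variable stream, bounded case: inputEpoch + 1.
    static long feedbackEpochBounded(long inputEpoch) {
        return inputEpoch + 1;
    }

    // Record emitted from the epoch-watermark callback into a non-feedback stream.
    static long callbackNonFeedbackEpoch(long incrementedWatermark) {
        return incrementedWatermark - 1;
    }

    // Record emitted from the epoch-watermark callback into a feedback variable stream.
    static long callbackFeedbackEpoch(long incrementedWatermark) {
        return incrementedWatermark;
    }

    // Replay case: records replayed in the Nth round (N starts at 1) have epoch N - 1.
    static long replayedDataEpoch(int round) {
        return round - 1L;
    }
}
```

The min(...) in the unbounded case matters because data records there carry epoch MAX_LONG; without the clamp, adding 1 would overflow.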

The Goals of the Iteration Library

The Iteration Body and Round Semantics

At the iteration API level, we need concepts that correspond to Epoch and Batch. We call the processing of one epoch a round: users specify a subgraph as the body of the iteration, which describes how to calculate the update after the iteration body has processed the whole dataset once (namely one epoch). The notion of a round is meaningful only for bounded inputs.

Figure 1. The structure of an iteration body.

To process the inputs for multiple rounds, we need feedback edges that emit the outputs of the previous round back to the inputs of the iteration body, and we union the initial inputs with the feedbacks. For a bounded dataset, the data comes from the initial input edges when processing the first epoch and from the feedback edges when processing the remaining epochs.

There are also inputs to the iteration body that do not have feedbacks. For example, an ML algorithm might have two inputs: the initial model and the training data. The input corresponding to the initial model receives a feedback after each epoch carrying the update to the model, but the training data does not need to be updated.

The iteration body also has one or more output streams. The iteration body might output records in each round, and the records emitted in all the rounds compose the final outputs.

Therefore, an iteration body is composed of

  1. Variable inputs with feedbacks.
  2. Constant inputs without feedbacks.
  3. Outputs.
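
The three parts listed above can be made concrete with a plain Java simulation of rounds. This is only a sketch under stated assumptions and does not use any Flink API: IterationBodySketch, RoundResult, body and iterate are hypothetical names, the variable input is a single double standing in for a model, the constant input is an in-memory list standing in for the training data, and the update rule (move halfway toward the mean of the data) is invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical simulation of an iteration body; not the proposed API. */
final class IterationBodySketch {

    /** Result of one round: the feedback for the variable input and the record emitted as output. */
    static final class RoundResult {
        final double feedback;
        final double output;

        RoundResult(double feedback, double output) {
            this.feedback = feedback;
            this.output = output;
        }
    }

    /** Illustrative body: moves the model halfway toward the mean of the data and emits the remaining gap. */
    static RoundResult body(double model, List<Double> data) {
        double mean = data.stream().mapToDouble(Double::doubleValue).average().orElse(0.0);
        double updated = model + 0.5 * (mean - model);
        return new RoundResult(updated, Math.abs(mean - updated));
    }

    /** Runs the body for a fixed number of rounds and collects the outputs of all rounds. */
    static List<Double> iterate(double initModel, List<Double> data, int rounds) {
        List<Double> outputs = new ArrayList<>();
        double variable = initModel; // the first round reads the initial variable input
        for (int round = 0; round < rounds; round++) {
            RoundResult result = body(variable, data); // the constant input is replayed unchanged
            variable = result.feedback;                // later rounds read from the feedback edge
            outputs.add(result.output);                // outputs of all rounds form the final output
        }
        return outputs;
    }
}
```

For example, `IterationBodySketch.iterate(0.0, Arrays.asList(2.0, 4.0), 3)` runs three rounds over the same two data records and returns one output record per round.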

Synchronization

For bounded datasets, all the algorithms that we are aware of can be converted into epoch-based algorithms, so we only need to support synchronization between epochs, namely between rounds.

How to 

Besides, the previous DataStream and DataSet iteration APIs also have some caveats when used to implement algorithms:

  1. Both iteration APIs lack support for multiple inputs, arbitrary outputs and nested iteration, which are required by scenarios like Metapath (multiple inputs), boosting algorithms (nested iteration) or outputting both the loss and the model (multiple outputs). The new iteration will support these functionalities.
  2. The DataSet iteration lacks asynchronous iteration support, which is required by algorithms like asynchronous linear regression. The new iteration will support both synchronous and asynchronous modes for bounded iteration.
  3. The current DataSet iteration provides "for each round" semantics by default: users only need to specify the computation logic of each round, and the framework executes the subgraph multiple times until convergence. To support these semantics, the DataSet iteration framework merges the initial input with the feedback (bulk style or delta style) and replays the datasets that come from outside of the iteration. This approach is easier to use, but it also limits some possible optimizations.
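
The difference between the bulk style and the delta style of merging feedback, mentioned in the last item, can be sketched as follows. This is a simplified reading rather than the DataSet implementation: FeedbackMergeSketch, mergeBulk and mergeDelta are hypothetical names, and the variable is assumed to be a keyed map held in memory.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical illustration of bulk-style versus delta-style feedback merging. */
final class FeedbackMergeSketch {

    /** Bulk style: the feedback of a round replaces the whole variable. */
    static Map<String, Integer> mergeBulk(Map<String, Integer> current, Map<String, Integer> feedback) {
        return new HashMap<>(feedback);
    }

    /** Delta style: only the keys present in the feedback are updated; all other entries are kept. */
    static Map<String, Integer> mergeDelta(Map<String, Integer> current, Map<String, Integer> delta) {
        Map<String, Integer> merged = new HashMap<>(current);
        merged.putAll(delta);
        return merged;
    }
}
```

With a current variable of {a=1, b=2} and a feedback of {b=5}, the bulk merge yields {b=5}, while the delta merge yields {a=1, b=5}.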

...

and the data streams must be bounded.
     * 2) The parallelism of any stream in the initial variable streams must equal the parallelism of the stream at the
     * same index of the feedback variable streams returned by the IterationBody.
     *
     * @param initVariableStreams The initial variable streams. These streams will be merged with the feedback variable
     *                            streams before being used as the 1st parameter to invoke the iteration body.
     * @param initDataStreams The initial data streams. Records from these streams will be repeatedly replayed and used
     *                        as the 2nd parameter to invoke the iteration body.
     * @param body The computation logic which takes variable/data streams and returns variable/output streams.
     * @return The list of output streams returned by the iteration body.
     */
    static DataStreamList iterateAndReplayBoundedStreamsUntilTermination(DataStreamList initVariableStreams, DataStreamList initDataStreams, IterationBody body) {...}

}








Overall Design

To reduce the development and maintenance overhead, it is preferable to have a unified implementation for the different types of iterations. In fact, the different iteration types share the same requirements for the runtime implementation:

...