...

For a given iteration body, a stream is said to be a feedback stream if it connects an output of this iteration body back to the input of this iteration body.

3) Epoch of records

In the proposed APIs, for any given record generated by the iteration body, we define the epoch of this record to be the number of times the iteration body has been invoked in the computation history of this record. The exact definition of epoch can be found in the Java doc of the IterationUtils class below.

...

  • The classic definition of epoch is well-defined only when the machine learning algorithm processes bounded streams AND all subtasks of the algorithm update the model variables synchronously.
  • Our definition of epoch can also be applied in the cases where the machine learning algorithm processes unbounded streams or subtasks of the algorithm update the model variables asynchronously.
  • In the cases where the classic definition of epoch is well-defined (as described above), if the machine learning algorithm updates the model variables once after making a pass of the training dataset, then these two definitions of epoch are exactly the same.

4) Epoch watermark

For any given operator, we define the epoch watermark as the maximum integer such that every record that arrives at the operator, from any of its input edges, should have an epoch larger than the epoch watermark of this operator.

The epoch watermark is used by the onEpochWatermarkIncremented callback to help implement iteration in sync-mode. It is also used to determine when an iteration should terminate.


Target Use-cases

The target use-cases (i.e. algorithms) can be described w.r.t. the properties described below. In the following, we first define those properties, followed by the combinations of the property choices supported by the existing APIs and the proposed APIs, respectively.

...

The figure below summarizes the iteration paradigm described above. The streams shown in red are the inputs provided by the user to the iteration body and the outputs emitted by the iteration body to the user.

Public Interfaces

We propose to make the following API changes to support the iteration paradigm described above.

...

package org.apache.flink.ml.iteration;

/**
 * A helper class to apply {@link IterationBody} to data streams.
 */
@PublicEvolving
public class IterationUtils {
    /**
     * This method can use an iteration body to process records in unbounded data streams.
     *
     * This method invokes the iteration body with the following parameters:
     * 1) The 1st parameter is a list of input variable streams, which are created as the union of the initial variable
     * streams and the corresponding feedback variable streams (returned by the iteration body).
     * 2) The 2nd parameter is the data streams given to this method.
     *
     * The epoch values are determined as described below. See IterationListener for how the epoch values are used.
     * 1) All records in the initial variable streams have epoch=1.
     * 2) All records in the data streams have epoch=MAX_LONG. In this case, records in the data stream won't affect
     * any operator's epoch watermark.
     * 3) For any record emitted by this operator into a non-feedback stream, the epoch of this emitted record = the
     * epoch of the input record that triggers this emission. If this record is emitted by
     * onEpochWatermarkIncremented(), then the epoch of this record = epochWatermark.
     * 4) For any record emitted by this operator into a feedback variable stream, the epoch of the emitted record =
     * min(the epoch of the input record that triggers this emission, MAX_LONG - 1) + 1. If this record is emitted by
     * onEpochWatermarkIncremented(), then the epoch of this record = epochWatermark + 1.
     *
     * The execution of the graph created by the iteration body will not terminate by itself. This is because at least
     * one of its data streams is unbounded.
     *
     * Required:
     * 1) All the init variable streams must be bounded.
     * 2) There is at least one unbounded stream in the data streams list.
     * 3) The parallelism of any stream in the initial variable streams must equal the parallelism of the stream at the
     * same index of the feedback variable streams returned by the IterationBody.
     *
     * @param initVariableStreams The initial variable streams. These streams will be merged with the feedback variable
     *                            streams before being used as the 1st parameter to invoke the iteration body.
     * @param dataStreams The data streams. These streams will be used as the 2nd parameter to invoke the iteration
     *                    body.
     * @param body The computation logic which takes variable/data streams and returns variable/output streams.
     * @return The list of output streams returned by the iteration body.
     */
    static DataStreamList iterateUnboundedStreams(DataStreamList initVariableStreams, DataStreamList dataStreams, IterationBody body) {...}

    /**
     * This method can use an iteration body to process records in some bounded data streams iteratively until a
     * termination criteria is reached (e.g. the given number of rounds is completed or no further variable update is
     * needed). Because this method does not replay records in the data streams, the iteration body needs to cache those
     * records in order to visit those records repeatedly.
     *
     * This method invokes the iteration body with the following parameters:
     * 1) The 1st parameter is a list of input variable streams, which are created as the union of the initial variable
     * streams and the corresponding feedback variable streams (returned by the iteration body).
     * 2) The 2nd parameter is the data streams given to this method.
     *
     * The epoch values are determined as described below. See IterationListener for how the epoch values are used.
     * 1) All records in the initial variable streams have epoch=1.
     * 2) All records in the data streams have epoch=1.
     * 3) For any record emitted by this operator into a non-feedback stream, the epoch of this emitted record = the
     * epoch of the input record that triggers this emission. If this record is emitted by
     * onEpochWatermarkIncremented(), then the epoch of this record = epochWatermark.
     * 4) For any record emitted by this operator into a feedback variable stream, the epoch of the emitted record = the
     * epoch of the input record that triggers this emission + 1. If this record is emitted by
     * onEpochWatermarkIncremented(), then the epoch of this record = epochWatermark + 1.
     *
     * The execution of the graph created by the iteration body will terminate when all records from the initial data
     * streams and the feedback streams have been processed by the iteration body AND any of the following conditions is
     * met:
     * 1) The termination criteria stream is not null AND no more record will be emitted to this stream.
     * 2) No more record will be emitted to the feedback streams.
     *
     * See FLIP-176 for more details regarding how the runtime determines the iteration termination.
     *
     * Required:
     * 1) All the init variable streams and the data streams must be bounded.
     * 2) The parallelism of any stream in the initial variable streams must equal the parallelism of the stream at the
     * same index of the feedback variable streams returned by the IterationBody.
     *
     * @param initVariableStreams The initial variable streams. These streams will be merged with the feedback variable
     *                            streams before being used as the 1st parameter to invoke the iteration body.
     * @param dataStreams The data streams. These streams will be used as the 2nd parameter to invoke the iteration
     *                    body.
     * @param body The computation logic which takes variable/data streams and returns variable/output streams.
     * @return The list of output streams returned by the iteration body.
     */
    static DataStreamList iterateBoundedStreamsUntilTermination(DataStreamList initVariableStreams, DataStreamList dataStreams, IterationBody body) {...}

    /**
     * This method can use an iteration body to process records in some bounded data streams iteratively until a
     * termination criteria is reached (e.g. the given number of rounds is completed or no further variable update is
     * needed). Because this method replays records in the data streams, the iteration body does not need to cache those
     * records to visit those records repeatedly.
     *
     * This method invokes the iteration body with the following parameters:
     * 1) The 1st parameter is a list of input variable streams, which are created as the union of the initial variable
     * streams and the corresponding feedback variable streams (returned by the iteration body).
     * 2) The 2nd parameter is a list of replayed data streams, which are created by replaying the initial data streams
     * round by round until the iteration terminates. The records in the Nth round will be emitted into the iteration
     * body only if the low watermark of the first operator in the iteration body >= N - 1.
     *
     * The epoch values are determined as described below. See IterationListener for how the epoch values are used.
     * 1) All records in the initial variable streams have epoch=1.
     * 2) The records from the initial data streams will be replayed round by round into the iteration body. The records
     * in the first round have epoch=1. And records in the Nth round have epoch = N.
     * 3) For any record emitted by this operator into a non-feedback stream, the epoch of this emitted record = the
     * epoch of the input record that triggers this emission. If this record is emitted by
     * onEpochWatermarkIncremented(), then the epoch of this record = epochWatermark.
     * 4) For any record emitted by this operator into a feedback stream, the epoch of the emitted record = the epoch
     * of the input record that triggers this emission + 1. If this record is emitted by onEpochWatermarkIncremented(),
     * then the epoch of this record = epochWatermark + 1.
     *
     * The execution of the graph created by the iteration body will terminate when all records from the initial data
     * streams and the feedback streams have been processed by the iteration body AND any of the following conditions is
     * met:
     * 1) The termination criteria stream is not null AND no more record will be emitted to this stream.
     * 2) No more record will be emitted to the feedback streams.
     *
     * See FLIP-176 for more details regarding how the runtime determines the iteration termination.
     *
     * Required:
     * 1) All the init variable streams and the data streams must be bounded.
     * 2) The parallelism of any stream in the initial variable streams must equal the parallelism of the stream at the
     * same index of the feedback variable streams returned by the IterationBody.
     *
     * @param initVariableStreams The initial variable streams. These streams will be merged with the feedback variable
     *                            streams before being used as the 1st parameter to invoke the iteration body.
     * @param initDataStreams The initial data streams. Records from these streams will be repeatedly replayed and used
     *                        as the 2nd parameter to invoke the iteration body.
     * @param body The computation logic which takes variable/data streams and returns variable/output streams.
     * @return The list of output streams returned by the iteration body.
     */
    static DataStreamList iterateAndReplayBoundedStreamsUntilTermination(DataStreamList initVariableStreams, DataStreamList initDataStreams, IterationBody body) {...}
}
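
The epoch rules stated in the Java doc above can be restated as a few pure functions. The following is only an illustrative sketch: the class EpochRulesSketch and its method names are hypothetical and not part of the proposed API, and MAX_LONG is assumed to correspond to Long.MAX_VALUE.

```java
/** Hypothetical helper (not part of the proposed API) that restates the epoch rules from the Java doc. */
final class EpochRulesSketch {
    private EpochRulesSketch() {}

    /** Record emitted into a non-feedback stream: it keeps the epoch of the triggering input record. */
    static long nonFeedbackEpoch(long inputEpoch) {
        return inputEpoch;
    }

    /**
     * Record emitted into a feedback variable stream, following the iterateUnboundedStreams rule:
     * min(inputEpoch, MAX_LONG - 1) + 1. The cap keeps epoch=MAX_LONG records from overflowing.
     * Assumption: MAX_LONG is Long.MAX_VALUE.
     */
    static long feedbackEpoch(long inputEpoch) {
        return Math.min(inputEpoch, Long.MAX_VALUE - 1) + 1;
    }

    /** Record emitted from onEpochWatermarkIncremented() into a non-feedback stream. */
    static long nonFeedbackEpochFromCallback(long epochWatermark) {
        return epochWatermark;
    }

    /** Record emitted from onEpochWatermarkIncremented() into a feedback variable stream. */
    static long feedbackEpochFromCallback(long epochWatermark) {
        return epochWatermark + 1;
    }

    public static void main(String[] args) {
        // A record from an initial variable stream (epoch=1) that is fed back once.
        System.out.println(feedbackEpoch(1L));
        // A record derived from an unbounded data stream (epoch=MAX_LONG) stays at MAX_LONG.
        System.out.println(feedbackEpoch(Long.MAX_VALUE) == Long.MAX_VALUE);
    }
}
```

For the two bounded-stream methods the Java doc gives the feedback rule without the cap (input epoch + 1); the capped form agrees with it for every epoch below MAX_LONG.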

...

2) How the termination of the iteration execution is determined.

As mentioned in the iterateBoundedStreamsUntilTermination and iterateAndReplayBoundedStreamsUntilTermination Java doc, the execution of the graph created by the iteration body will terminate when all records from the initial data streams and the feedback streams have been processed by the iteration body AND any of the following conditions is met:

  • The termination criteria stream is not null AND no more record will be emitted to this stream.
  • No more record will be emitted to the feedback streams.

Flink will use the following solution to determine that no more record will be emitted to the feedback streams:

  • Insert a HEAD operator at the beginning of the iteration body. This operator takes all feedback streams as inputs.
  • At any given time t, the HEAD operator maintains the following statistics:
    • num_records_t: the number of records that this operator has processed up to time t.
    • max_epoch_t: the maximum epoch of the records that this operator has ever received up to time t.
    • epoch_watermark_t: the maximum integer such that every record that arrives at this operator after time t should have an epoch > epoch_watermark_t.
  • Flink will look for two timestamps t1 and t2 that meet all of the following conditions:
    • t1 < t2 AND num_records_t1 == num_records_t2. In other words, this operator has received no records from time t1 to time t2.
    • max_epoch_t1 < epoch_watermark_t2. In other words, all inflight records (buffered in edges or in operators) at the time t1 must have been processed at time t2. This is because the epoch of inflight records at time t1 must be in the range (epoch_watermark_t1, max_epoch_t1 + 1]. And the epoch of inflight records at time t2 must be in the range (epoch_watermark_t2, max_epoch_t2 + 1]. We can guarantee that these two ranges don't overlap as long as max_epoch_t1 < epoch_watermark_t2.
  • The two conditions described above, if satisfied, indicate that no more record will be emitted to the HEAD operator (and thus the feedback streams). Here are the reasons:
    • All inflight records at time t1 have been processed, but no record has been generated since t1. Thus there is no inflight record at time t2.
    • Since the epoch watermark of every operator in the iteration body has incremented at least once, onEpochWatermarkIncremented(...) must have been invoked at least once for every operator, which gives operators the chance to emit buffered records.
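
The two conditions checked by the HEAD operator can be sketched as a comparison of two snapshots of its statistics. This is a minimal illustration, not the runtime implementation: HeadSnapshot and TerminationCheckSketch are hypothetical names, and the explicit time field is an assumption added so that t1 < t2 can be checked.

```java
/** Hypothetical snapshot of the HEAD operator statistics at one point in time. */
final class HeadSnapshot {
    final long time;            // assumed logical timestamp of the snapshot
    final long numRecords;      // num_records_t
    final long maxEpoch;        // max_epoch_t
    final long epochWatermark;  // epoch_watermark_t

    HeadSnapshot(long time, long numRecords, long maxEpoch, long epochWatermark) {
        this.time = time;
        this.numRecords = numRecords;
        this.maxEpoch = maxEpoch;
        this.epochWatermark = epochWatermark;
    }
}

/** Hypothetical check for "no more record will be emitted to the feedback streams". */
final class TerminationCheckSketch {
    private TerminationCheckSketch() {}

    static boolean noMoreFeedbackRecords(HeadSnapshot t1, HeadSnapshot t2) {
        // Condition 1: t1 < t2 and no record was received between the two snapshots.
        boolean noNewRecords = t1.time < t2.time && t1.numRecords == t2.numRecords;
        // Condition 2: every record that was inflight at t1 has been processed by t2,
        // because the ranges (wm_t1, max_t1 + 1] and (wm_t2, max_t2 + 1] cannot overlap.
        boolean inflightDrained = t1.maxEpoch < t2.epochWatermark;
        return noNewRecords && inflightDrained;
    }

    public static void main(String[] args) {
        HeadSnapshot t1 = new HeadSnapshot(1L, 10L, 3L, 2L);
        HeadSnapshot t2 = new HeadSnapshot(2L, 10L, 3L, 4L);
        System.out.println(noMoreFeedbackRecords(t1, t2));
    }
}
```

If the watermark at t2 were only 3, the check would fail: a record with epoch 4 (max_epoch_t1 + 1) could have been inflight at t1 and still be unprocessed at t2.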

And here is how Flink can determine the epoch watermark for any given operator:

  • Flink ML library creates a new control flow message called EndOfEpoch_i. This control flow message is only used within the IterationBody. And it is parameterized with an integer i.
  • The following invariant will be guaranteed for this control flow message: for any given stream e and any integer i, the epoch of every record on the stream e after the EndOfEpoch_i message must be larger than i.
  • The EndOfEpoch_i will be emitted using the following rules throughout the graph of the iteration body:
    • For any user-provided bounded data streams, the EndOfEpoch_i message(s) will be sent immediately after the last message with epoch i.
    • For any non-feedback stream outputted by operator_a inside the iteration body, EndOfEpoch_i will be emitted to this stream once operator_a has received EndOfEpoch_i on all its input edges AND the corresponding callback has been invoked.
    • For any feedback stream outputted by operator_a inside the iteration body, EndOfEpoch_{i+1} will be emitted to this stream once operator_a has received EndOfEpoch_i on all its input edges AND the corresponding callback has been invoked.
  • Any given operator keeps track of the watermark for each of its input edges based on the parameter of the latest EndOfEpoch message from that input edge. The watermark of this operator is the minimum watermark across all its input edges.
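
The per-operator bookkeeping described in the last rule can be sketched as follows. This is an illustration under stated assumptions, not the Flink ML implementation: EpochWatermarkTrackerSketch is a hypothetical name, input edges are identified by index, and every edge watermark is assumed to start at 0 (that is, before any EndOfEpoch message has arrived).

```java
import java.util.Arrays;

/** Hypothetical per-operator tracker that derives the epoch watermark from EndOfEpoch_i messages. */
final class EpochWatermarkTrackerSketch {
    private final long[] edgeWatermarks; // latest EndOfEpoch parameter seen on each input edge
    private long operatorWatermark;      // minimum across all input edges

    EpochWatermarkTrackerSketch(int numInputEdges) {
        if (numInputEdges <= 0) {
            throw new IllegalArgumentException("An operator needs at least one input edge");
        }
        // Assumption: edge watermarks start at 0.
        this.edgeWatermarks = new long[numInputEdges];
        this.operatorWatermark = 0L;
    }

    /**
     * Records EndOfEpoch_i received on the given input edge.
     *
     * @return true if the operator's epoch watermark increased, which is the point at which
     *         onEpochWatermarkIncremented would be invoked.
     */
    boolean onEndOfEpoch(int edgeIndex, long i) {
        edgeWatermarks[edgeIndex] = i;
        long newWatermark = Arrays.stream(edgeWatermarks).min().getAsLong();
        boolean incremented = newWatermark > operatorWatermark;
        operatorWatermark = newWatermark;
        return incremented;
    }

    long getOperatorWatermark() {
        return operatorWatermark;
    }

    public static void main(String[] args) {
        EpochWatermarkTrackerSketch tracker = new EpochWatermarkTrackerSketch(2);
        System.out.println(tracker.onEndOfEpoch(0, 1L)); // edge 1 is still behind
        System.out.println(tracker.onEndOfEpoch(1, 1L)); // both edges have reached epoch 1
    }
}
```

Taking the minimum across edges means a slow input edge holds back the operator's watermark, which is what allows the callback to act as a synchronization point between epochs.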


TODO: discuss how to determine termination based on the termination criteria if there are still more records emitted to the feedback streams.

TODO: explain how this is implemented.


3) Cyclic Flow Control

TODO: explain it. "feedback-first" or "feedback-first when feedback queue buffers are utilized"?

...