...

package org.apache.flink.ml.iteration;

/**
 * A helper class that contains the streams returned by the iteration body.
 */
class IterationBodyResult {
    /**
     * A list of feedback variable streams. These streams will only be used during the iteration execution and will
     * not be returned to the caller of the iteration body. It is assumed that the method which executes the
     * iteration body will feed the records of the feedback variable streams back to the corresponding input variable
     * streams.
     */
    DataStreamList feedbackVariableStreams;

    /**
     * A list of output streams. These streams will be returned to the caller of the methods that execute the
     * iteration body.
     */
    DataStreamList outputStreams;

    /**
     * An optional termination criteria stream. If this stream is not null, it will be used together with the
     * feedback variable streams to determine when the iteration should terminate.
     */
    @Nullable DataStream<?> terminationCriteria;
}
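
To make the role of the three fields concrete, here is a minimal plain-Java sketch of how a driver might decide whether to stop after one round. It is not the Flink implementation: `RoundResultSketch` and `shouldTerminate` are hypothetical names, lists of records stand in for the streams, and the stopping rule (stop when no record was fed back, or when a non-null termination criteria stream produced nothing in the round) is an assumption made for illustration.

```java
import java.util.List;

/** Hypothetical plain-Java stand-in for one round of an IterationBodyResult. */
final class RoundResultSketch {
    // Records emitted into the feedback variable streams during this round.
    final List<Double> feedbackRecords;
    // Records emitted into the output streams during this round.
    final List<Double> outputRecords;
    // Records emitted into the termination criteria stream; null if no such stream exists.
    final List<Integer> terminationCriteriaRecords;

    RoundResultSketch(
            List<Double> feedbackRecords,
            List<Double> outputRecords,
            List<Integer> terminationCriteriaRecords) {
        this.feedbackRecords = feedbackRecords;
        this.outputRecords = outputRecords;
        this.terminationCriteriaRecords = terminationCriteriaRecords;
    }

    /**
     * Assumed rule: the iteration stops when nothing was fed back in this round, or when a
     * termination criteria stream exists and produced no record in this round.
     */
    boolean shouldTerminate() {
        if (feedbackRecords.isEmpty()) {
            return true;
        }
        return terminationCriteriaRecords != null && terminationCriteriaRecords.isEmpty();
    }
}
```

Leaving the termination criteria as null means only the feedback records decide when to stop, which mirrors the `@Nullable` field above.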


3) Add the IterationListener interface.

...

package org.apache.flink.ml.iteration;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.iteration.operator.allround.AllRoundOperatorWrapper;

/**
 * A helper class to create iterations. To construct an iteration, users must provide:
 *
 * <ul>
 *   <li>initVariableStreams: the initial values of the variable data streams, which are updated
 *       in each round.
 *   <li>dataStreams: the other data streams used inside the iteration, which are not updated.
 *   <li>iterationBody: specifies the subgraph that updates the variable streams and produces the
 *       outputs.
 * </ul>
 *
 * <p>The iteration body is invoked with two parameters. The first parameter is a list of input
 * variable streams, which are created as the union of the initial variable streams and the
 * corresponding feedback variable streams (returned by the iteration body). The second parameter
 * is the data streams given to this method.
 *
 * <p>During the execution of the iteration body, each record involved in the iteration has an
 * epoch attached, which marks the progress of the iteration. The epoch is computed as follows:
 *
 * <ul>
 *   <li>All records in the initial variable streams and initial data streams have epoch = 0.
 *   <li>For any record emitted by this operator into a non-feedback stream, the epoch of this
 *       emitted record = the epoch of the input record that triggers this emission. If this record
 *       is emitted by onEpochWatermarkIncremented(), then the epoch of this record =
 *       epochWatermark.
 *   <li>For any record emitted by this operator into a feedback variable stream, the epoch of the
 *       emitted record = min(the epoch of the input record that triggers this emission,
 *       MAX_LONG - 1) + 1.
 * </ul>
 *
 * <p>The framework notifies operators and UDFs that implement {@link IterationListener} at the
 * end of each epoch.
 *
 * <p>For the limitations on constructing the subgraph inside the iteration body, see {@link
 * IterationBody}.
 *
 * <p>An example of an iteration:
 *
 * <pre>{@code
 * DataStreamList result = Iterations.iterateUnboundedStreams(
 *  DataStreamList.of(first, second),
 *  DataStreamList.of(third),
 *  (variableStreams, dataStreams) -> {
 *      ...
 *      return new IterationBodyResult(
 *          DataStreamList.of(firstFeedback, secondFeedback),
 *          DataStreamList.of(output));
 *  });
 *  result.<Integer>get(0).addSink(...);
 * }</pre>
 */
@Experimental
public class Iterations {

    /**
     * This method uses an iteration body to process records in unbounded data streams. The
     * iteration does not terminate if at least one of its inputs is unbounded. Otherwise, it
     * terminates after all the inputs have terminated and no more records are iterating.
     *
     * @param initVariableStreams The initial variable streams, which are merged with the feedback
     *     variable streams before being used as the 1st parameter to invoke the iteration body.
     * @param dataStreams The non-variable streams also referenced in the {@code body}.
     * @param body The computation logic which takes variable/data streams and returns
     *     feedback/output streams.
     * @return The list of output streams returned by the iteration body.
     */
    public static DataStreamList iterateUnboundedStreams(
            DataStreamList initVariableStreams, DataStreamList dataStreams, IterationBody body) {...}

    /**
     * This method uses an iteration body to process records in some bounded data streams
     * iteratively until a termination criterion is reached (e.g. the given number of rounds is
     * completed or no further variable update is needed). Because this method does not replay
     * records in the data streams, the iteration body needs to cache those records in order to
     * visit them repeatedly.
     *
     * @param initVariableStreams The initial variable streams, which are merged with the feedback
     *     variable streams before being used as the 1st parameter to invoke the iteration body.
     * @param dataStreams The non-variable streams also referenced in the {@code body}.
     * @param body The computation logic which takes variable/data streams and returns
     *     feedback/output streams.
     * @return The list of output streams returned by the iteration body.
     */
    public static DataStreamList iterateBoundedStreamsUntilTermination(
            DataStreamList initVariableStreams, DataStreamList dataStreams, IterationBody body) {...}

    /**
     * This method uses an iteration body to process records in some bounded data streams
     * iteratively until a termination criterion is reached (e.g. the given number of rounds is
     * completed or no further variable update is needed). Because this method replays records in
     * the data streams, the iteration body does not need to cache those records to visit them
     * repeatedly. This corresponds to the bulk iteration in the DataSet API.
     *
     * @param initVariableStreams The initial variable streams, which are merged with the feedback
     *     variable streams before being used as the 1st parameter to invoke the iteration body.
     * @param dataStreams The non-variable streams also referenced in the {@code body}.
     * @param body The computation logic which takes variable/data streams and returns
     *     feedback/output streams.
     * @return The list of output streams returned by the iteration body.
     */
    public static DataStreamList bulkIterateAndReplayBoundedStreamsUntilTermination(
            DataStreamList initVariableStreams, DataStreamList dataStreams, IterationBody body) {...}
}
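
The epoch rules in the class Javadoc can be traced with a small plain-Java simulation. This is only a sketch under stated assumptions: `EpochSketch`, `Tagged`, and the halving "iteration body" are hypothetical and do not use any Flink API, and `MAX_LONG` is read as `Long.MAX_VALUE`. Initial records start at epoch 0, records sent to the output keep the epoch of the input that triggered them, and records fed back get that epoch plus one, capped so the value cannot overflow.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical simulation of the epoch rules; lists of records replace Flink streams. */
final class EpochSketch {
    /** A record value tagged with the epoch it carries. */
    static final class Tagged {
        final long value;
        final long epoch;

        Tagged(long value, long epoch) {
            this.value = value;
            this.epoch = epoch;
        }
    }

    /** Non-feedback emission: the emitted record inherits the epoch of the triggering input. */
    static long outputEpoch(long inputEpoch) {
        return inputEpoch;
    }

    /** Feedback emission: min(input epoch, MAX_LONG - 1) + 1, so the epoch never overflows. */
    static long feedbackEpoch(long inputEpoch) {
        return Math.min(inputEpoch, Long.MAX_VALUE - 1) + 1;
    }

    /**
     * Toy iteration body: halve each value (integer division). A non-zero result is fed back;
     * otherwise the input value is emitted as output. Stops when no record is iterating.
     */
    static List<Tagged> iterate(List<Long> initialVariables) {
        List<Tagged> pending = new ArrayList<>();
        for (long v : initialVariables) {
            pending.add(new Tagged(v, 0L)); // initial records have epoch 0
        }
        List<Tagged> outputs = new ArrayList<>();
        while (!pending.isEmpty()) {
            List<Tagged> feedback = new ArrayList<>();
            for (Tagged in : pending) {
                long next = in.value / 2;
                if (next > 0) {
                    feedback.add(new Tagged(next, feedbackEpoch(in.epoch)));
                } else {
                    outputs.add(new Tagged(in.value, outputEpoch(in.epoch)));
                }
            }
            pending = feedback; // the feedback becomes the next round's variable input
        }
        return outputs;
    }
}
```

With initial values 8 and 1, the record 1 leaves immediately with epoch 0, while 8 is fed back three times (4, 2, 1) and leaves with epoch 3.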


5) Add the DataStreamList class.

...