...
```java
package org.apache.flink.ml.iteration;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.iteration.operator.allround.AllRoundOperatorWrapper;

/**
 * A helper class to create iterations. To construct an iteration, users are required to provide
 *
 * <ul>
 *   <li>initVariableStreams: the initial values of the variable data streams, which are updated
 *       in each round.
 *   <li>dataStreams: the other data streams used inside the iteration, which are not updated.
 *   <li>iterationBody: specifies the subgraph that updates the variable streams and produces
 *       the outputs.
 * </ul>
 *
 * <p>The iteration body is invoked with two parameters. The first parameter is a list of input
 * variable streams, which are created as the union of the initial variable streams and the
 * corresponding feedback variable streams (returned by the iteration body). The second parameter
 * is the list of data streams given to this method.
 *
 * <p>During the execution of the iteration body, each record involved in the iteration has an
 * epoch attached, which marks the progress of the iteration. The epoch is computed as follows:
 *
 * <ul>
 *   <li>All records in the initial variable streams and initial data streams have epoch = 0.
 *   <li>For any record emitted by an operator into a non-feedback stream, the epoch of the
 *       emitted record = the epoch of the input record that triggers this emission. If the
 *       record is emitted by onEpochWatermarkIncremented(), then the epoch of the record =
 *       epochWatermark.
 *   <li>For any record emitted by an operator into a feedback variable stream, the epoch of the
 *       emitted record = the epoch of the input record that triggers this emission + 1.
 * </ul>
 *
 * <p>At the end of each epoch, the framework notifies the operators and UDFs that implement
 * {@link IterationListener}.
 *
 * <p>The limitations on constructing the subgraph inside the iteration body are described in
 * {@link IterationBody}.
 *
 * <p>An example of an iteration:
 *
 * <pre>{@code
 * DataStreamList result = Iterations.iterateUnboundedStreams(
 *     DataStreamList.of(first, second),
 *     DataStreamList.of(third),
 *     (variableStreams, dataStreams) -> {
 *         ...
 *         return new IterationBodyResult(
 *             DataStreamList.of(firstFeedback, secondFeedback),
 *             DataStreamList.of(output));
 *     });
 * result.<Integer>get(0).addSink(...);
 * }</pre>
 */
@Experimental
public class Iterations {

    /**
     * This method uses an iteration body to process records in unbounded data streams. The
     * iteration does not terminate if at least one of its inputs is unbounded. Otherwise it
     * terminates after all the inputs have terminated and no more records are iterating.
     *
     * @param initVariableStreams The initial variable streams, which are merged with the feedback
     *     variable streams before being used as the first parameter to invoke the iteration body.
     * @param dataStreams The non-variable streams also referenced in the {@code body}.
     * @param body The computation logic which takes variable/data streams and returns
     *     feedback/output streams.
     * @return The list of output streams returned by the iteration body.
     */
    public static DataStreamList iterateUnboundedStreams(
            DataStreamList initVariableStreams, DataStreamList dataStreams, IterationBody body) {}

    /**
     * This method uses an iteration body to process records in some bounded data streams
     * iteratively until a termination criterion is reached (e.g. the given number of rounds is
     * completed or no further variable update is needed). Because this method does not replay
     * records in the data streams, the iteration body needs to cache those records in order to
     * visit them repeatedly.
     *
     * @param initVariableStreams The initial variable streams, which are merged with the feedback
     *     variable streams before being used as the first parameter to invoke the iteration body.
     * @param dataStreams The non-variable streams also referenced in the {@code body}.
     * @param body The computation logic which takes variable/data streams and returns
     *     feedback/output streams.
     * @return The list of output streams returned by the iteration body.
     */
    public static DataStreamList iterateBoundedStreamsUntilTermination(
            DataStreamList initVariableStreams, DataStreamList dataStreams, IterationBody body) {}

    /**
     * This method uses an iteration body to process records in some bounded data streams
     * iteratively until a termination criterion is reached (e.g. the given number of rounds is
     * completed or no further variable update is needed). Because this method replays records in
     * the data streams, the iteration body does not need to cache those records in order to visit
     * them repeatedly. This corresponds to the bulk iteration in the DataSet API.
     *
     * @param initVariableStreams The initial variable streams, which are merged with the feedback
     *     variable streams before being used as the first parameter to invoke the iteration body.
     * @param dataStreams The non-variable streams also referenced in the {@code body}.
     * @param body The computation logic which takes variable/data streams and returns
     *     feedback/output streams.
     * @return The list of output streams returned by the iteration body.
     */
    public static DataStreamList iterateAndReplayBoundedStreamsUntilTermination(
            DataStreamList initVariableStreams, DataStreamList dataStreams, IterationBody body) {}
}
```
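The epoch rules stated in the class Javadoc above can be illustrated with a small standalone sketch. It does not use any Flink classes: `EpochRulesSketch`, `EpochRecord`, and all of the helper methods are hypothetical names introduced here for illustration only, and the sketch models just the epoch arithmetic, not how the framework actually attaches epochs to records.

```java
import java.util.ArrayList;
import java.util.List;

/** Standalone sketch of the epoch rules; it does not depend on Flink. */
class EpochRulesSketch {

    /** Hypothetical record wrapper: a value plus the epoch attached to it. */
    static final class EpochRecord {
        final int value;
        final int epoch;

        EpochRecord(int value, int epoch) {
            this.value = value;
            this.epoch = epoch;
        }
    }

    /** Records of the initial variable streams and data streams have epoch 0. */
    static EpochRecord initial(int value) {
        return new EpochRecord(value, 0);
    }

    /** A record emitted into a non-feedback stream keeps the epoch of its trigger. */
    static EpochRecord emitToOutput(EpochRecord trigger, int value) {
        return new EpochRecord(value, trigger.epoch);
    }

    /** A record emitted from onEpochWatermarkIncremented() takes the epoch watermark. */
    static EpochRecord emitOnEpochWatermark(int epochWatermark, int value) {
        return new EpochRecord(value, epochWatermark);
    }

    /** A record emitted into a feedback variable stream gets the trigger's epoch + 1. */
    static EpochRecord emitToFeedback(EpochRecord trigger, int value) {
        return new EpochRecord(value, trigger.epoch + 1);
    }

    /** Feeds one variable record back for the given number of rounds; returns the epochs seen. */
    static List<Integer> feedbackEpochs(int rounds) {
        List<Integer> epochs = new ArrayList<>();
        EpochRecord variable = initial(1);
        for (int i = 0; i < rounds; i++) {
            epochs.add(variable.epoch);
            // The doubled value stands in for whatever update the iteration body computes.
            variable = emitToFeedback(variable, variable.value * 2);
        }
        return epochs;
    }

    public static void main(String[] args) {
        EpochRecord start = initial(10);
        System.out.println("output epoch = " + emitToOutput(start, 11).epoch);
        System.out.println("feedback epoch = " + emitToFeedback(start, 12).epoch);
        System.out.println("epochs over 3 rounds = " + feedbackEpochs(3));
    }
}
```

In this sketch a record that goes around the feedback edge three times is seen with epochs 0, 1, and 2, while anything emitted to an output stream in a given round carries the same epoch as the record that triggered it.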
...