Status
...
Page properties | |
---|---|
|
...
...
JIRA: <TODO>
This method is easier to use, but it also limits some possible optimizations
...
|
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
[This FLIP proposal is joint work by Yun Gao, Dong Lin and Zhipeng Zhang]
...
Code Block | ||||
---|---|---|---|---|
| ||||
package org.apache.flink.iteration;

/**
 * A helper class to create iterations. To construct an iteration, users are required to provide
 *
 * <ul>
 *   <li>initVariableStreams: the initial values of the variable data streams, which are updated
 *       in each round.
 *   <li>dataStreams: the other data streams used inside the iteration, which are not updated.
 *   <li>iterationBody: specifies the subgraph that updates the variable streams and produces
 *       the outputs.
 * </ul>
 *
 * <p>The iteration body is invoked with two parameters. The first parameter is a list of input
 * variable streams, which are created as the union of the initial variable streams and the
 * corresponding feedback variable streams (returned by the iteration body). The second parameter
 * is the data streams given to this method.
 *
 * <p>During the execution of the iteration body, each record involved in the iteration has an
 * epoch attached, which marks the progress of the iteration. The epoch is computed as follows:
 *
 * <ul>
 *   <li>All records in the initial variable streams and initial data streams have epoch = 0.
 *   <li>For any record emitted by this operator into a non-feedback stream, the epoch of this
 *       emitted record = the epoch of the input record that triggers this emission. If this
 *       record is emitted by onEpochWatermarkIncremented(), then the epoch of this record =
 *       epochWatermark.
 *   <li>For any record emitted by this operator into a feedback variable stream, the epoch of the
 *       emitted record = the epoch of the input record that triggers this emission + 1.
 * </ul>
 *
 * <p>At the end of each epoch, the framework notifies the operators and UDFs that implement
 * {@link IterationListener}.
 *
 * <p>The limitations on constructing the subgraph inside the iteration body are described in
 * {@link IterationBody}.
 *
 * <p>An example of the iteration:
 *
 * <pre>{@code
 * DataStreamList result = Iterations.iterateUnboundedStreams(
 *     DataStreamList.of(first, second),
 *     DataStreamList.of(third),
 *     (variableStreams, dataStreams) -> {
 *         ...
 *         return new IterationBodyResult(
 *             DataStreamList.of(firstFeedback, secondFeedback),
 *             DataStreamList.of(output));
 *     });
 * result.<Integer>get(0).addSink(...);
 * }</pre>
 */
public class Iterations {

    /**
     * This method uses an iteration body to process records in possibly unbounded data streams.
     * The iteration does not terminate if at least one of its inputs is unbounded. Otherwise it
     * terminates after all the inputs have terminated and no more records are iterating.
     *
     * @param initVariableStreams The initial variable streams, which are merged with the feedback
     *     variable streams before being used as the 1st parameter to invoke the iteration body.
     * @param dataStreams The non-variable streams also referred to in the {@code body}.
     * @param body The computation logic which takes variable/data streams and returns
     *     feedback/output streams.
     * @return The list of output streams returned by the iteration body.
     */
    public static DataStreamList iterateUnboundedStreams(
            DataStreamList initVariableStreams, DataStreamList dataStreams, IterationBody body) {
        return null;
    }

    /**
     * This method uses an iteration body to process records in some bounded data streams
     * iteratively until a termination criterion is reached (e.g. the given number of rounds is
     * completed or no further variable update is needed). Because this method does not replay
     * records in the data streams, the iteration body needs to cache those records in order to
     * visit those records repeatedly.
     *
     * @param initVariableStreams The initial variable streams, which are merged with the feedback
     *     variable streams before being used as the 1st parameter to invoke the iteration body.
     * @param dataStreams The non-variable streams also referred to in the {@code body}.
     * @param config The config for the iteration, like whether to re-create the operator on each
     *     round.
     * @param body The computation logic which takes variable/data streams and returns
     *     feedback/output streams.
     * @return The list of output streams returned by the iteration body.
     */
    public static DataStreamList iterateBoundedStreamsUntilTermination(
            DataStreamList initVariableStreams,
            ReplayableDataStreamList dataStreams,
            IterationConfig config,
            IterationBody body) {
        return null;
    }
} |
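The epoch rules in the Javadoc above can be traced with a small stand-alone model. This is only a sketch and not part of the proposed API: `EpochSketch`, `Tagged`, `emitToOutput`, `emitToFeedback` and `run` are hypothetical names, and the "halve the value each round" update is an arbitrary placeholder for an iteration body.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-alone model of the epoch rules; it does not use any Flink classes. */
final class EpochSketch {

    /** A record tagged with the epoch it belongs to. */
    static final class Tagged {
        final double value;
        final int epoch;

        Tagged(double value, int epoch) {
            this.value = value;
            this.epoch = epoch;
        }
    }

    /** Records in the initial variable and data streams start at epoch 0. */
    static Tagged initial(double value) {
        return new Tagged(value, 0);
    }

    /** A record emitted into a non-feedback stream keeps the epoch of its triggering input. */
    static Tagged emitToOutput(Tagged input, double value) {
        return new Tagged(value, input.epoch);
    }

    /** A record emitted into a feedback variable stream gets the triggering input's epoch + 1. */
    static Tagged emitToFeedback(Tagged input, double value) {
        return new Tagged(value, input.epoch + 1);
    }

    /** Feeds a single record back for the given number of rounds, halving it each round. */
    static List<Tagged> run(double start, int rounds) {
        List<Tagged> outputs = new ArrayList<>();
        Tagged current = initial(start);
        for (int i = 0; i < rounds; i++) {
            outputs.add(emitToOutput(current, current.value));
            current = emitToFeedback(current, current.value / 2);
        }
        return outputs;
    }

    public static void main(String[] args) {
        for (Tagged t : run(8.0, 3)) {
            System.out.println("epoch=" + t.epoch + " value=" + t.value);
        }
    }
}
```

Running `main` emits one output record per round, with epochs 0, 1 and 2: each trip through the feedback edge raises the epoch by one, while emission to the output keeps it unchanged.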
5) Introduce the PerRoundSubGraphBuilder forEachRound utility method.
PerRoundSubgraphBuilder forEachRound lets users specify a subgraph that executes in per-round mode, that is, all of its operators are re-created for each round.
Code Block | ||
---|---|---|
| ||
public interface IterationBody {
    ....

    /**
     * Constructs a subgraph inside the iteration body in which all the operators have a lifecycle
     * of {@link org.apache.flink.iteration.IterationConfig.OperatorLifeCycle#PER_ROUND}.
     */
    class PerRound {
        /**
         * @param inputs The inputs of the subgraph.
         * @param perRoundSubBody The computational logic to be executed per round.
         * @return The output of the subgraph.
         */
        public static DataStreamList forEachRound(
                DataStreamList inputs, PerRoundSubBody perRoundSubBody) {
            return null;
        }
    }

    /** The subgraph inside the iteration body that should be executed per round. */
    interface PerRoundSubBody {
        DataStreamList process(DataStreamList input);
    }
} |
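To make the difference between the two operator lifecycles concrete, here is a minimal sketch in plain Java. It does not use Flink: `LifecycleSketch`, `SumOperator`, `runAllRound` and `runPerRound` are hypothetical names. It assumes that "all-round" means one operator instance lives across rounds and "per-round" means a fresh instance is created for each round, as described above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical illustration of all-round versus per-round operator lifecycles. */
final class LifecycleSketch {

    /** A stand-in for a stateful operator: it sums the records it has seen. */
    static final class SumOperator {
        private double sum;

        void processElement(double value) {
            sum += value;
        }

        double result() {
            return sum;
        }
    }

    /** All-round: a single instance lives for the whole iteration, so state carries over. */
    static List<Double> runAllRound(double[][] rounds) {
        SumOperator operator = new SumOperator();
        List<Double> results = new ArrayList<>();
        for (double[] round : rounds) {
            for (double value : round) {
                operator.processElement(value);
            }
            results.add(operator.result());
        }
        return results;
    }

    /** Per-round: the operator is re-created for each round, so every round starts clean. */
    static List<Double> runPerRound(double[][] rounds, Supplier<SumOperator> factory) {
        List<Double> results = new ArrayList<>();
        for (double[] round : rounds) {
            SumOperator operator = factory.get();
            for (double value : round) {
                operator.processElement(value);
            }
            results.add(operator.result());
        }
        return results;
    }

    public static void main(String[] args) {
        double[][] rounds = {{1, 2}, {3, 4}};
        System.out.println(runAllRound(rounds));                   // [3.0, 10.0]
        System.out.println(runPerRound(rounds, SumOperator::new)); // [3.0, 7.0]
    }
}
```

The per-round variant is what makes operators such as a per-round reduce straightforward to write, since no state has to be cleared manually between rounds.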
6) Add the DataStreamList and ReplayableDataStreamList classes.
...
Code Block | ||
---|---|---|
| ||
DataStreamList resultStreams =
    Iterations.iterateBoundedStreamsUntilTermination(
        DataStreamList.of(initParameters),
        ReplayableDataStreamList.notReplay(dataset),
        IterationConfig.newBuilder().setOperatorRoundMode(ALL_ROUND).build(),
        (variableStreams, dataStreams) -> {
            DataStream<double[]> parameterUpdates = variableStreams.get(0);
            DataStream<Tuple2<double[], Double>> dataset = dataStreams.get(0);
            SingleOutputStreamOperator<double[]> parameters = parameterUpdates.process(new ParametersCacheFunction());
            DataStream<double[]> modelUpdate = parameters.setParallelism(1)
                .broadcast()
                .connect(dataset)
                .coProcess(new TrainFunction())
                .setParallelism(10);
            DataStream<double[]> reduced = PerRoundGraphBuilder.forEachRound(DataStreamList.of(modelUpdate), streams -> {
                return streams.<double[]>get(0).windowAll().reduce((x, y) -> ArrayUtils.add(x, y));
            }).<double[]>get(0);
            return new IterationBodyResult(DataStreamList.of(reduced), DataStreamList.of(parameters.getSideOutput(FINAL_MODEL_OUTPUT_TAG)));
}); |
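The per-round reduce in the example combines the model updates of one round into a single array. A minimal sketch of that step in plain Java follows, assuming the `ArrayUtils.add(x, y)` call in the example denotes an element-wise sum of two equally sized arrays; `ReduceSketch`, `add` and `reduceRound` are hypothetical names and not part of the proposal.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-alone version of the per-round reduce over model updates. */
final class ReduceSketch {

    /** Element-wise sum of two arrays of equal length (assumed meaning of ArrayUtils.add). */
    static double[] add(double[] x, double[] y) {
        if (x.length != y.length) {
            throw new IllegalArgumentException("Arrays must have the same length");
        }
        double[] sum = new double[x.length];
        for (int i = 0; i < x.length; i++) {
            sum[i] = x[i] + y[i];
        }
        return sum;
    }

    /** Reduces all updates produced in one round into a single aggregated update. */
    static double[] reduceRound(List<double[]> updates) {
        return updates.stream()
                .reduce(ReduceSketch::add)
                .orElseThrow(() -> new IllegalArgumentException("No updates in this round"));
    }

    public static void main(String[] args) {
        List<double[]> updates =
                Arrays.asList(new double[] {1, 2}, new double[] {3, 4}, new double[] {5, 6});
        System.out.println(Arrays.toString(reduceRound(updates))); // [9.0, 12.0]
    }
}
```

Because the reduce starts from scratch for each list of updates, it mirrors the per-round lifecycle: nothing accumulated in one round leaks into the next.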
...