...

We plan to remove both methods after the APIs added in this doc are ready for production use. This change is needed to decouple the iteration-related APIs from the Flink core runtime so that we can keep the Flink core runtime as simple and maintainable as possible.

Proposed Changes

In this section, we discuss a few design choices related to the implementation and usage of the proposed APIs.

Support the Feedback Edges

1) How the feedback edge is supported.

The Flink core runtime can only execute a DAG of operators that does not involve cycles. Thus extra work needs to be done to support feedback edges (which effectively introduce cycles in the data flow).

Similar to the existing iterative API, this FLIP plans to implement the feedback edge using the following approach:

  • Automatically insert the HEAD and the TAIL operators as the first and the last operators in the iteration body.
  • Co-locate the HEAD and the TAIL operators on the same task manager.
  • Have the HEAD and the TAIL operators transmit the records of the feedback edges using an in-memory queue data structure. 

Since all the forward edges have a limited number of buffers, the feedback queues must have unlimited size to avoid deadlocks. To avoid an unbounded memory footprint, the records are spilled to disk when the queued size exceeds a threshold. To reduce such spilling, the HEAD operators read the data in a "feedback-first" manner, namely they always process the feedback records first if there are records from both the initial input and the feedback edges.
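
To make the "feedback-first" reading order concrete, the following is a minimal sketch (not part of the proposed API, and ignoring the disk-spilling logic for brevity) of how a HEAD-side reader could prioritize the feedback queue over the initial input:

Code Block
languagejava
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;

public class FeedbackFirstReader<T> {
    // Unbounded in-memory queue filled by the co-located TAIL operator.
    private final Queue<T> feedbackQueue = new ArrayDeque<>();
    private final Iterator<T> initialInput;

    public FeedbackFirstReader(List<T> initialInput) {
        this.initialInput = initialInput.iterator();
    }

    /** Called from the TAIL side to feed a record back to the HEAD side. */
    public void feedback(T record) {
        feedbackQueue.add(record);
    }

    /** Returns the next record to process, always draining the feedback queue first. */
    public T next() {
        if (!feedbackQueue.isEmpty()) {
            return feedbackQueue.poll();
        }
        return initialInput.hasNext() ? initialInput.next() : null;
    }
}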

2) How the termination of the iteration execution is determined.

Having the feedback edges also complicates the termination detection of the job. Since the feedback edges are not visible in the JobGraph, the HEAD operators, as the first operators in the DAG of the iteration body, decide when the whole iteration body can be terminated and initiate the termination process. The termination happens when

  1. All the inputs to the iteration body have finished. 
  2. AND 
    1. If users have specified a reference stream, the number of records in each epoch is counted after the epoch is done, and if the count is 0, HEAD starts to terminate. 
    2. OR, if users have not specified a reference stream, when no records are still being processed inside the iteration body. This is detected by having a special event travel through the whole iteration body. 
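
Expressed as a predicate, the decision made by the HEAD operator could look like the sketch below (illustrative only; the class, method and parameter names are not part of the proposed API):

Code Block
languagejava
public class TerminationDecision {

    static boolean shouldTerminate(
            boolean allInputsFinished,
            boolean hasReferenceStream,
            long referenceStreamRecordsInLastEpoch,
            boolean recordsStillInsideIterationBody) {
        if (!allInputsFinished) {
            return false;
        }
        return hasReferenceStream
                ? referenceStreamRecordsInLastEpoch == 0
                : !recordsStillInsideIterationBody;
    }
}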

4) The execution mode that is required to execute the iteration body.

  • If all input streams are bounded, then the iteration body can be executed in either the stream mode or the batch mode.
  • If any input stream is unbounded, then the iteration body must be executed in the stream mode.
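
For example, assuming a job whose iteration body only consumes bounded sources, the user could explicitly select the batch execution mode via the standard DataStream API (this snippet is illustrative and not specific to the proposed iteration APIs):

Code Block
languagejava
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExecutionModeExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // With only bounded inputs either mode works; with any unbounded input,
        // STREAMING is the only valid choice for the iteration body.
        env.setRuntimeExecutionMode(RuntimeExecutionMode.BATCH);
    }
}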

5) The requirements of the edge types and parallelism in the IterationBody.

All edges inside the iteration body are required to have the PIPELINE type. If the user-defined iteration body contains an edge that does not have the PIPELINE type, methods that create the subgraph from the iteration body, such as iterateBoundedStreamsUntilTermination, will throw an exception upon invocation.

6) Lifetime of the operators inside the iteration body.

With the approach proposed in this FLIP, the operators inside the iteration body are only created once and destroyed after the entire iteration terminates.

In comparison, the existing DataSet::iterate(..) would destroy and re-create the iteration body once for each round of iteration, which in general could introduce more runtime overhead than the approach adopted in this FLIP.

7) How an iteration can resume from the most recent completed epoch after failover.

For any job that is executed in the batch mode, the job cannot resume from a recent epoch after failover. In other words, if an iterative job fails, it will start from the first epoch of this iteration. Note that the existing DataSet::iterate(...) has the same behavior after job failover.

For any job that is executed in the stream mode, the job can start from a recent epoch after failover. This is achieved by re-using the existing checkpoint mechanism (only available in the stream mode) and additionally checkpointing the values buffered on the feedback edges.
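
Assuming the snapshot of the feedback-edge buffers is handled inside the iteration library, from the user's perspective it should be enough to enable regular checkpointing in the stream mode, for example:

Code Block
languagejava
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointingExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Periodic checkpoints every 30 seconds; the iteration library would additionally
        // checkpoint the records buffered on the feedback edges.
        env.enableCheckpointing(30_000);
    }
}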

8) How to implement an iterative algorithm in the sync mode.

Definition of sync-mode

An iterative algorithm is run in sync-mode if there exists a global epoch, such that at the time a given operator computes its output for the Nth epoch, this operator has received exactly the following records from its input edges:

  • We define a feedback loop as a cycle composed of exactly one feedback edge and an arbitrary number of non-feedback edges.
  • If an edge is a non-feedback input edge and this edge is part of a feedback loop, then this operator has received all records emitted on this edge for the Nth epoch, without receiving any record for the (N+1)th epoch.
  • If an edge is a feedback input edge and this edge is part of a feedback loop, then this operator has received all records emitted on this edge for the (N-1)th epoch, without receiving any record for the Nth epoch.

Solution to run an iterative algorithm in the sync mode

An iterative algorithm will be run in the sync mode if its IterationBody meets the following requirements:

  • When any operator within the IterationBody receives values from its input edges, this operator does not immediately emit records to its output.
  • Operators inside the IterationBody only compute and emit records to their outputs in the onEpochWatermarkIncremented(...) callback. The emitted records should be computed based on the values received from the input edges up to the invocation of this callback.

See the Appendix section for a proof of why the solution described above achieves the sync-mode execution as defined above.
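
For illustration, the following is a minimal sketch of an operator that follows the two requirements above: it only aggregates its input in processElement(...) and defers all emission to the epoch callback. The IterationListener name and the exact callback signature are assumptions made for this sketch; only the onEpochWatermarkIncremented(...) callback itself is part of the APIs discussed in this FLIP.

Code Block
languagejava
public static class EpochAlignedSum extends ProcessFunction<Long, Long>
        implements IterationListener<Long> {

    private long sum = 0;

    public void processElement(Long value, Context ctx, Collector<Long> out) {
        // Requirement 1: never emit directly while receiving input records; only aggregate them.
        sum += value;
    }

    public void onEpochWatermarkIncremented(int epochWatermark, Context context, Collector<Long> out) {
        // Requirement 2: emit only in this callback, based on everything received so far.
        out.collect(sum);
    }
}
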
9) How to run an iterative algorithm without dumping all user-provided data streams to disk.

As mentioned in the motivation section, the existing DataSet::iterate() always dumps the user-provided data streams to disk so that it can replay the data streams regardless of their size. Since this is the only available API to do iteration on bounded data streams, there is no way for algorithm developers to get rid of this performance overhead.

In comparison, the iterateBoundedStreamsUntilTermination(...) method proposed in this FLIP allows users to run an iteration body without incurring this disk performance overhead. Developers have the freedom to optimize the performance based on their algorithm and data size, e.g. caching the data in memory in a more compact format.

10) How to support operators (e.g. ReduceOperator) that require bounded inputs in the IterationBody.

TODO: explain it.

...

Example Usage

This section shows how commonly used ML algorithms can be implemented with the iteration API.

Offline Training with Bounded Iteration

We would first like to show the usage of the bounded iteration with the linear regression case: the model is Y = XA, and we would like to acquire the best estimation of A with the SGD algorithm. To simplify, we assume the parameters can be held in the memory of one task.

The job graph of the algorithm is shown in Figure 3: in each round, we use the latest parameters to calculate the update to the parameters: ΔA = ∑(Y - XA)X. To achieve this, the Parameters vertex broadcasts the latest parameters to the Train vertex. Each subtask of the Train vertex holds a part of the dataset. Following the spirit of SGD, it samples a small batch of training records and calculates the update with the above equation. The Train vertex then emits ΔA to the Parameters vertex to update the parameters.


Figure 3. The JobGraph for the offline training of the linear regression case.


We will start with synchronous training. Synchronous training requires that the updates from all the Train vertex subtasks are merged before the next round of training. This can be done by emitting the parameters for the next round only at the end of a round. The code is shown as follows:

Code Block
languagejava
linenumberstrue
public class SynchronousBoundedLinearRegression {
    private static final int N_DIM = 50;
    private static final OutputTag<double[]> FINAL_MODEL_OUTPUT_TAG = new OutputTag<double[]>(){};

    public static void main(String[] args) {
        DataStream<double[]> initParameters = loadParameters().setParallelism(1);
        DataStream<Tuple2<double[], Double>> dataset = loadDataSet().setParallelism(1);

        int batch = 5;
        int epochEachBatch = 10;

        ResultStreams resultStreams = new BoundedIteration()
            .withBody(new IterationBody(
                @IterationInput("model") DataStream<double[]> model,
                @IterationInput("dataset") DataStream<Tuple2<double[], Double>> dataset
            ) {
                SingleOutputStreamOperator<double[]> parameters = model.process(new ParametersCacheFunction());
                DataStream<double[]> modelUpdate = parameters.setParallelism(1)
                    .broadcast()
                    .connect(dataset)
                    .coProcess(new TrainFunction())
                    .setParallelism(10);

                return new BoundedIterationDeclarationBuilder()
                    .withFeedback("model", modelUpdate)
                    .withOutput("final_model", parameters.getSideOut(FINAL_MODEL_OUTPUT_TAG))
                    .until(new TerminationCondition(null, context -> context.getRound() >= batch * epochEachBatch))
                    .build();
            })
            .build();
        
        DataStream<double[]> finalModel = resultStreams.get("final_model");
        finalModel.print();
    }

    public static class ParametersCacheFunction extends ProcessFunction<double[], double[]>
        implements BoundedIterationProgressListener<double[]> {  
        
        private final double[] parameters = new double[N_DIM];

        public void processElement(double[] update, Context ctx, Collector<double[]> output) {
            // Suppose we have a util to add the second array to the first.
            ArrayUtils.addWith(parameters, update);
        }

        public void onRoundEnd(int[] round, Context context, Collector<double[]> collector) {
            collector.collect(parameters);
        }

        public void onIterationEnd(int[] round, Context context) {
            context.output(FINAL_MODEL_OUTPUT_TAG, parameters);
        }
    }

    public static class TrainFunction extends CoProcessFunction<double[], Tuple2<double[], Double>, double[]> implements BoundedIterationProgressListener<double[]> {

        private final List<Tuple2<double[], Double>> dataset = new ArrayList<>();
        private double[] firstRoundCachedParameter;

        private Supplier<int[]> recordRoundQuerier;

        public void setCurrentRecordRoundsQuerier(Supplier<int[]> querier) {
            this.recordRoundQuerier = querier;
        } 

        public void processElement1(double[] parameter, Context context, Collector<double[]> output) {
            int[] round = recordRoundQuerier.get();
            if (round[0] == 0) {
                firstRoundCachedParameter = parameter;
                return;
            }

            calculateModelUpdate(parameter, output);
        }

        public void processElement2(Tuple2<double[], Double> trainSample, Context context, Collector<double[]> output) {
            dataset.add(trainSample);
        }

        public void onRoundEnd(int[] round, Context context, Collector<double[]> output) {
            if (round[0] == 0) {
                calculateModelUpdate(firstRoundCachedParameter, output);
                firstRoundCachedParameter = null;
            }
        }

        private void calculateModelUpdate(double[] parameters, Collector<double[]> output) {
            List<Tuple2<double[], Double>> samples = sample(dataset);

            double[] modelUpdate = new double[N_DIM];
            for (Tuple2<double[], Double> record : samples) {
                double diff = (ArrayUtils.muladd(record.f0, parameters) - record.f1);
                ArrayUtils.addWith(modelUpdate, ArrayUtils.multiply(record.f0, diff));
            }

            output.collect(modelUpdate);
        }
    }
}


If instead we want to do asynchronous training, we would need to make the following changes:

  1. The Parameters vertex would not wait until the end of the round to ensure it has received all the updates from the iteration. Instead, it would immediately output the current parameter values once it receives a model update from one Train subtask.
  2. To label the source of the update, we change the input type to Tuple2<Integer, double[]>. The Parameters vertex would only output the new parameter values to the Train task that sent the update.

We omit the change to the graph building code since it is trivial (change the output type and use a customized partitioner). The change to the Parameters vertex is as follows:

Code Block
languagejava
public static class ParametersCacheFunction extends ProcessFunction<Tuple2<Integer, double[]>, Tuple2<Integer, double[]>>
    implements BoundedIterationProgressListener<double[]> {

    private final double[] parameters = new double[N_DIM];

    public void processElement(Tuple2<Integer, double[]> update, Context ctx, Collector<Tuple2<Integer, double[]>> output) {
        // Suppose we have a util to add the second array to the first.
        ArrayUtils.addWith(parameters, update.f1);
        output.collect(new Tuple2<>(update.f0, parameters));
    }

    public void onIterationEnd(int[] round, Context context) {
        context.output(FINAL_MODEL_OUTPUT_TAG, parameters);
    }
}

Online Training with Unbounded Iteration

Suppose we now change the algorithm to an unbounded iteration. Compared to the offline case, the differences are that

  1. The dataset is unbounded, so the Train operator cannot cache all the data in the first round.
  2. The training algorithm might be changed to another one like FTRL. But we keep using SGD in this example since it does not affect showing the usage of the iteration.

We also start with the synchronous case. For online training, the Train vertex usually does one update after accumulating one mini-batch. This is to ensure the distribution of the samples is similar to the global statistics. In this example we omit the complex data re-sampling process and just fetch the next several records as one mini-batch.

The JobGraph for online training is still the one shown in Figure 1, with the training dataset becoming unbounded. Similar to the bounded case, for synchronous training the process would be as follows:

  1. The Parameters vertex broadcasts the initialized values upon receiving the input values.
  2. Each Train task reads the next mini-batch of records, calculates an update and emits it to the Parameters vertex. It then waits until it has received the updated parameters from the Parameters vertex before it proceeds to the next mini-batch.
  3. The Parameters vertex waits until it has received the updates from all the Train tasks before it broadcasts the updated parameters.

Since in the unbounded case there is no concept of round, and we do one update per mini-batch, we could instead use the InputSelectable functionality to implement the algorithm:

Code Block
languagejava
public class SynchronousUnboundedLinearRegression {
    private static final int N_DIM = 50;
    private static final OutputTag<double[]> MODEL_UPDATE_OUTPUT_TAG = new OutputTag<double[]>(){};

    public static void main(String[] args) {
        DataStream<double[]> initParameters = loadParameters().setParallelism(1);
        DataStream<Tuple2<double[], Double>> dataset = loadDataSet().setParallelism(1);

        ResultStreams resultStreams = new UnboundedIteration()
            .withBody(new IterationBody(
                @IterationInput("model") DataStream<double[]> model,
                @IterationInput("dataset") DataStream<Tuple2<double[], Double>> dataset
            ) {
                SingleOutputStreamOperator<double[]> parameters = model.process(new ParametersCacheFunction(10));
                DataStream<double[]> modelUpdate = parameters.setParallelism(1)
                    .broadcast()
                    .connect(dataset)
                    .transform(
                        "operator",
                        BasicTypeInfo.INT_TYPE_INFO,
                        new TrainOperators(50))
                    .setParallelism(10);

                return new UnBoundedIterationDeclarationBuilder()
                    .withFeedback("model", modelUpdate)
                    .withOutput("model_update", parameters.getSideOut(MODEL_UPDATE_OUTPUT_TAG))
                    .build();
            })
            .build();

        DataStream<double[]> finalModel = resultStreams.get("model_update");
        finalModel.addSink(...)
    }

    public static class ParametersCacheFunction extends ProcessFunction<double[], double[]> {  
        
        private final int numOfTrainTasks;

        private int numOfUpdatesReceived = 0;
        private final double[] parameters = new double[N_DIM];

        public ParametersCacheFunction(int numOfTrainTasks) {
            this.numOfTrainTasks = numOfTrainTasks;
        }

        public void processElement(double[] update, Context ctx, Collector<double[]> output) {
            // Suppose we have a util to add the second array to the first.
            ArrayUtils.addWith(parameters, update);
            numOfUpdatesReceived++;

            if (numOfUpdatesReceived == numOfTrainTasks) {
                output.collect(parameters);
                numOfUpdatesReceived = 0;
            }
        }
    }

    public static class TrainOperators extends AbstractStreamOperator<double[]> implements TwoInputStreamOperator<double[], Tuple2<double[], Double>, double[]>, InputSelectable {

        private final int miniBatchSize;

        private final List<Tuple2<double[], Double>> miniBatch = new ArrayList<>();

        public TrainOperators(int miniBatchSize) {
            this.miniBatchSize = miniBatchSize;
        }

        public void processElement1(double[] parameter, Context context, Collector<double[]> output) {
            calculateModelUpdate(parameter, output);
            miniBatch.clear();
        }

        public void processElement2(Tuple2<double[], Double> trainSample, Context context, Collector<double[]> output) {
            miniBatch.add(trainSample);
        }

        public InputSelection nextSelection() {
            if (miniBatch.size() < miniBatchSize) {
                return InputSelection.SECOND;
            } else {
                return InputSelection.FIRST;
            }
        }

        private void calculateModelUpdate(double[] parameters, Collector<double[]> output) {
            double[] modelUpdate = new double[N_DIM];
            for (Tuple2<double[], Double> record : miniBatch) {
                double diff = (ArrayUtils.muladd(record.f0, parameters) - record.f1);
                ArrayUtils.addWith(modelUpdate, ArrayUtils.multiply(record.f0, diff));
            }

            output.collect(modelUpdate);
        }
    }
}

Also similar to the bounded case, for asynchronous training the Parameters vertex would not wait until it has received the updates from all the Train tasks. Instead, it would directly respond to the task that sent the update:

Code Block
languagejava
public static class ParametersCacheFunction extends ProcessFunction<Tuple2<Integer, double[]>, Tuple2<Integer, double[]>> {  
    
    private final double[] parameters = new double[N_DIM];

    public void processElement(Tuple2<Integer, double[]> update, Context ctx, Collector<Tuple2<Integer, double[]>> output) {
        ArrayUtils.addWith(parameters, update.f1);
                
        if (update.f0 < 0) {
            // Received the initialized parameter values, broadcast to all the downstream tasks
            for (int i = 0; i < 10; ++i) {
                output.collect(new Tuple2<>(i, parameters));
            }
        } else {
            output.collect(new Tuple2<>(update.f0, parameters));
        }
    }
}

Implementation Plan

Logically all the iteration types would support both the BATCH and STREAM execution modes. However, according to the algorithms' requirements, we will implement:

  1. Unbounded iteration + STREAM mode.
  2. Bounded iteration + BATCH mode.

Currently we do not see requirements for bounded iteration + STREAM mode. If there are additional requirements in the future, we can implement this mode; it can also be supported with the current framework.

Compatibility, Deprecation, and Migration Plan

The API is added as a library inside the flink-ml repository, thus it does not have compatibility problems. However, it has some differences from the existing iteration APIs, and the algorithms would need some re-implementation.

In the long run, the new iteration library is meant to provide an alternative to the existing iteration functionality, and we plan to deprecate and remove the existing APIs to reduce the complexity of the core Flink code.



Deprecation plan

The following APIs will be deprecated and removed in the future Flink release:

  • The entire DataSet class. See FLIP-131 for its motivation and the migration plan. The deprecation of DataSet::iterate(...) proposed by this FLIP is covered by FLIP-131.
  • The DataStream::iterate(...) and DataStream::iterate(long).

Compatibility

  • This FLIP introduces backward-incompatible changes by proposing to remove DataStream::iterate(...) in the future.
  • We expect the APIs proposed in this FLIP can address most of the use-cases supported by DataStream::iterate(...) and DataSet::iterate(...). The only use-cases we have dropped support for are those that require nested iteration on bounded data streams in sync mode. We have made this choice because we are not aware of any reasonable use-cases that require nested iteration. This support can be added if any user provides a good use-case for it.

Migration plan

  • Users will need to re-write their application code in order to migrate from the existing iterative APIs to the proposed APIs.

Rejected Alternatives

Naiad has proposed a unified model for the watermark mechanism (namely progress tracking outside of the iteration) and the progress tracking inside the iteration. It extends the event time and watermark to be a vector (long timestamp, int[] rounds) and implements a vectorized alignment algorithm. Although Naiad provides an elegant model, a direct implementation on Flink would require a large amount of modification to the Flink runtime, which would cause a lot of complexity and maintenance overhead. Thus we choose to implement a simplified version on top of Flink, as a part of the flink-ml library.

For building the iteration DAG, it would be simpler if we could directly refer to the data stream variables outside of the closure of the iteration body. However, since we need to first create the iteration DAG in a mock execution environment, we cannot use these variables directly; otherwise we would modify the real environment and would not have the chance to add wrappers to the operators.

The existing FLIP does not explicitly support nested iterations. This is because we have not seen a clear use-case that requires nested iteration, and we would prefer to only introduce additional complexity that is required by some reasonable use-case. In the future, if we decide to support nested iterations, we will need extra APIs to run the iteration body, with the semantics that operators inside the iteration body will be re-created for every round of iteration.

Appendix

1) In the following, we prove that the proposed solution can be used to implement an iterative algorithm in the sync mode.

Refer to the "Proposed Changes" section for the definition of sync mode and the description of the solution. In the following, we prove that the solution does work as expected.

Proof

In the following, we will prove that the solution described above could enforce the sync-mode execution. Note that the calculation of the record's epoch and the semantics of onEpochWatermarkIncremented(...) are described in the Java doc of the corresponding APIs.

Lemma-1: For any operator OpB defined in the IterationBody, at the time its Nth invocation of onEpochWatermarkIncremented(...) starts, it is guaranteed that:

  • If an input edge is a non-feedback edge from OpA, then OpA's Nth invocation of onEpochWatermarkIncremented(...) has been completed.
  • If an input edge is a feedback edge from OpA, then OpA's (N-1)th invocation of onEpochWatermarkIncremented(...) has been completed.

Let's prove the lemma-1 by contradiction:

  • At the time the OpB's Nth invocation starts, its epoch watermark has incremented to N, which means OpB will no longer receive any record with epoch <= N.
  • Suppose there is a non-feedback edge from OpA AND OpA's Nth invocation has not been completed. Then when OpA's Nth invocation completes, OpA can generate a record with epoch=N and send it to OpB via this non-feedback edge, which contradicts the guarantee described above.
  • Suppose there is a feedback edge from OpA AND OpA's (N-1)th invocation has not been completed. Then when OpA's (N-1)th invocation completes, OpA can generate a record with epoch=N and send it to OpB via this feedback edge, which contradicts the guarantee described above.

Lemma-2: For any operator OpB defined in the IterationBody, at the time its Nth invocation of onEpochWatermarkIncremented(...) starts, it is guaranteed that:

  • If an edge is a non-feedback input edge from OpA and this edge is part of a feedback loop, then OpA's (N+1)th invocation of onEpochWatermarkIncremented(...) has not started.
  • If an edge is a feedback input edge from OpA and this edge is part of a feedback loop, then OpA's Nth invocation of onEpochWatermarkIncremented(...) has not started.

Let's prove this lemma by contradiction:

  • Suppose there is a non-feedback edge from OpA, this edge is part of a feedback loop, and OpA's (N+1)th invocation has started. Since this non-feedback edge is part of a feedback loop, there is a backward path from OpA to OpB with exactly 1 feedback edge on this path. By applying the lemma-1 recursively for operators on this path, we can tell that OpB's Nth invocation has been completed. This contradicts the assumption that OpB's Nth invocation just started.
  • Suppose there is a feedback edge from OpA, this edge is part of a feedback loop, and OpA's Nth invocation has started. Since this feedback edge is part of feedback loop, there is a backward path from OpA to OpB with no feedback edge on this path. By applying lemma-1 recursively for operators on this path, we can tell that OpB's Nth invocation has been completed. This contradicts the assumption that OpB's Nth invocation just started.

Let's now prove that the sync-mode is achieved:

  • For any operator in the IterationBody, we define its output for the Nth epoch as the output emitted by the Nth invocation of onEpochWatermarkIncremented(). This definition is well-defined because operators only emit records in onEpochWatermarkIncremented().
  • At the time an operator OpB computes its output for the Nth epoch, this operator must have received exactly the following records from its input edges:
    • Suppose an edge is a non-feedback input edge from OpA and this edge is part of a feedback loop. It follows that OpA has emitted records for its Nth epoch (by lemma-1) and has not started to emit records for its (N+1)th epoch (by lemma-2).
    • Suppose an edge is a feedback input edge from OpA and this edge is part of a feedback loop. It follows that OpA has emitted records for its (N-1)th epoch (by lemma-1) and has not started to emit records for its Nth epoch (by lemma-2).