...

We propose to make the following API changes to support the iteration paradigm described above. 


1) Add the IterationBody interface.

...

This interface allows users to achieve the following goals:
- Run an algorithm in sync mode, i.e., each subtask waits for the model parameter updates from all other subtasks before reading the aggregated parameters and starting the next epoch of execution. As long as we can find a cut of the iteration body's subgraph such that all the operators in the cut only emit records in onEpochWatermarkIncremented, the algorithm is synchronous. A detailed proof can be found in the appendix.
- Emit the final output after the iteration terminates.
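Since the exact definition of the interface is elided above, the following is a best-effort sketch of its expected shape, reconstructed from how it is used in the examples below:

Code Block
languagejava
// A sketch of the IterationBody interface, reconstructed from its usage in this
// document; the authoritative definition is the one elided above.
@FunctionalInterface
public interface IterationBody {

    // Builds the subgraph of one iteration from the fed-back variable streams and
    // the constant data streams, and declares the feedback and output streams.
    IterationBodyResult process(DataStreamList variableStreams, DataStreamList dataStreams);
}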

...

Code Block
languagejava
linenumberstrue
package org.apache.flink.ml.iteration;

public class DataStreamList {
    public static DataStreamList of(DataStream<?>... streams);

    // Returns the number of data streams in this list.
    public int size() {...}

    // Returns the data stream at the given index in this list.
    public <T> DataStream<T> get(int index) {...}
}
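For illustration, a DataStreamList could be constructed and accessed as follows (illustrative only; `env` stands for a StreamExecutionEnvironment defined elsewhere):

Code Block
languagejava
// Illustrative usage of DataStreamList.
DataStream<Integer> counts = env.fromElements(1, 2, 3);
DataStream<String> names = env.fromElements("a", "b");

DataStreamList list = DataStreamList.of(counts, names);

int n = list.size();                      // returns 2
DataStream<Integer> first = list.get(0);  // typed access by index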

...

We will start with the synchronous training. Synchronous training requires that the updates from all the Train vertex subtasks are merged before the next round of training starts. This can be done by emitting the parameters for the next round only at the end of each round. The code is shown as follows:

Code Block
languagejava
linenumberstrue
public class SynchronousBoundedLinearRegression {
    private static final int N_DIM = 50;
    private static final int N_EPOCH = 5;
    private static final int N_BATCH_PER_EPOCH = 10;
    private static final OutputTag<double[]> FINAL_MODEL_OUTPUT_TAG = new OutputTag<double[]>{};

    public static void main(String[] args) {
        DataStream<double[]> initParameters = loadParameters().setParallelism(1);
        DataStream<Tuple2<double[], Double>> dataset = loadDataSet().setParallelism(1);

        DataStreamList resultStreams = 
            IterationUtils.iterateBoundedStreamsUntilTermination(DataStreamList.of(initParameters), DataStreamList.of(dataset), (variableStreams, dataStreams) -> {
                DataStream<double[]> parameterUpdates = variableStreams.get(0);
                DataStream<Tuple2<double[], Double>> dataset = dataStreams.get(0);

                SingleOutputStreamOperator<double[]> parameters = parameterUpdates.process(new ParametersCacheFunction());
                DataStream<double[]> modelUpdate = parameters.setParallelism(1)
                    .broadcast()
                    .connect(dataset)
                    .coProcess(new TrainFunction())
                    .setParallelism(10);

                return new IterationBodyResult(DataStreamList.of(modelUpdate), DataStreamList.of(parameters.getSideOutput(FINAL_MODEL_OUTPUT_TAG)));
            });

        DataStream<double[]> finalModel = resultStreams.get(0);
        finalModel.print();
    }

    public static class ParametersCacheFunction extends ProcessFunction<double[], double[]>
        implements IterationListener<double[]> {

        private final double[] parameters = new double[N_DIM];

        public void processElement(double[] update, Context ctx, Collector<double[]> output) {
            // Suppose we have a util to add the second array to the first.
            ArrayUtils.addWith(parameters, update);
        }

        public void onEpochWatermarkIncremented(int epochWatermark, Context context, Collector<double[]> collector) {
            if (epochWatermark < N_EPOCH * N_BATCH_PER_EPOCH) {
                collector.collect(parameters);
            }
        }

        public void onIterationEnd(int[] round, Context context) {
            context.output(FINAL_MODEL_OUTPUT_TAG, parameters);
        }
    }

    public static class TrainFunction extends CoProcessFunction<double[], Tuple2<double[], Double>, double[]> implements IterationListener<double[]> {

        private final List<Tuple2<double[], Double>> dataset = new ArrayList<>();
        private double[] firstRoundCachedParameter;

        private Supplier<int[]> recordRoundQuerier;

        public void setCurrentRecordRoundsQuerier(Supplier<int[]> querier) {
            this.recordRoundQuerier = querier;
        }

        public void processElement1(double[] parameter, Context context, Collector<double[]> output) {
            int[] round = recordRoundQuerier.get();
            if (round[0] == 0) {
                // In the first round the dataset is not fully cached yet, so the
                // parameters are cached and used in onEpochWatermarkIncremented.
                firstRoundCachedParameter = parameter;
                return;
            }

            calculateModelUpdate(parameter, output);
        }

        public void processElement2(Tuple2<double[], Double> trainSample, Context context, Collector<double[]> output) {
            dataset.add(trainSample);
        }

        public void onEpochWatermarkIncremented(int epochWatermark, Context context, Collector<double[]> collector) {
            if (epochWatermark == 0) {
                calculateModelUpdate(firstRoundCachedParameter, collector);
                firstRoundCachedParameter = null;
            }
        }

        private void calculateModelUpdate(double[] parameters, Collector<double[]> output) {
            List<Tuple2<double[], Double>> samples = sample(dataset);

            double[] modelUpdate = new double[N_DIM];
            for (Tuple2<double[], Double> record : samples) {
                double diff = (ArrayUtils.muladd(record.f0, parameters) - record.f1);
                ArrayUtils.addWith(modelUpdate, ArrayUtils.multiply(record.f0, diff));
            }

            output.collect(modelUpdate);
        }
    }
}

...

Code Block
languagejava
public static class ParametersCacheFunction extends ProcessFunction<Tuple2<Integer, double[]>, Tuple2<Integer, double[]>>
    implements IterationListener<double[]> {

    private final double[] parameters = new double[N_DIM];

    public void processElement(Tuple2<Integer, double[]> update, Context ctx, Collector<Tuple2<Integer, double[]>> output) {
        // Suppose we have a util to add the second array to the first.
        ArrayUtils.addWith(parameters, update.f1);
        output.collect(new Tuple2<>(update.f0, parameters));
    }

    public void onIterationEnd(int[] round, Context context) {
        context.output(FINAL_MODEL_OUTPUT_TAG, parameters);
    }
}

For the asynchronous training, the iteration body is similar to the synchronous one, except that the Parameters vertex routes the updated parameters back to the originating Train task via partitionCustom instead of broadcasting them:

Code Block
languagejava
public class AsynchronousBoundedLinearRegression {
    
    ...

    DataStreamList resultStreams = 
        IterationUtils.iterateBoundedStreamsUntilTermination(DataStreamList.of(initParameters), DataStreamList.of(dataset), (variableStreams, dataStreams) -> {
            DataStream<Tuple2<Integer, double[]>> parameterUpdates = variableStreams.get(0);
            DataStream<Tuple2<double[], Double>> dataset = dataStreams.get(0);

            SingleOutputStreamOperator<Tuple2<Integer, double[]>> parameters = parameterUpdates.process(new ParametersCacheFunction());
            DataStream<Tuple2<Integer, double[]>> modelUpdate = parameters.setParallelism(1)
                .partitionCustom((key, numPartitions) -> key % numPartitions, update -> update.f0)
                .connect(dataset)
                .coProcess(new TrainFunction())
                .setParallelism(10);

            return new IterationBodyResult(DataStreamList.of(modelUpdate), DataStreamList.of(parameters.getSideOutput(FINAL_MODEL_OUTPUT_TAG)));
        });

    ...
}



Online Algorithms

Suppose we now change the algorithm to an unbounded iteration. Compared to the offline case, the differences are:

  1. The dataset is unbounded, so the Train operator cannot cache all the data in the first round.
  2. The training algorithm might be changed to another one such as FTRL. We keep using SGD in this example since it does not affect how the iteration is used.

We again start with the synchronous case. For online training, the Train vertex usually performs one update after accumulating one mini-batch, to ensure the distribution of the samples is similar to the global statistics. In this example we omit the complex data re-sampling process and simply fetch the next several records as one mini-batch. 

The JobGraph for online training is still the one shown in Figure 1, except that the training dataset becomes unbounded. Similar to the bounded case, synchronous training is expected to proceed as follows:

  1. The Parameters vertex broadcasts the initialized values upon receiving the input values.
  2. Each Train task reads the next mini-batch of records, calculates an update and emits it to the Parameters vertex. It then waits until it has received the updated parameters from the Parameters vertex before processing the next mini-batch.
  3. The Parameters vertex waits until it has received the updates from all the Train tasks before broadcasting the updated parameters. 

Since in the unbounded case there is no concept of round, and we do one update per mini-batch, we can instead use the InputSelectable functionality to implement the algorithm:

Code Block
languagejava
public class SynchronousUnboundedLinearRegression {
    private static final int N_DIM = 50;
    private static final OutputTag<double[]> MODEL_UPDATE_OUTPUT_TAG = new OutputTag<double[]>{};

    public static void main(String[] args) {
        DataStream<double[]> initParameters = loadParameters().setParallelism(1);
        DataStream<Tuple2<double[], Double>> dataset = loadDataSet().setParallelism(1);

        DataStreamList resultStreams = IterationUtils.iterateUnboundedStreams(DataStreamList.of(initParameters), DataStreamList.of(dataset), (variableStreams, dataStreams) -> {
            DataStream<double[]> parameterUpdates = variableStreams.get(0);
            DataStream<Tuple2<double[], Double>> dataset = dataStreams.get(0);

            SingleOutputStreamOperator<double[]> parameters = parameterUpdates.process(new ParametersCacheFunction(10));
            DataStream<double[]> modelUpdate = parameters.setParallelism(1)
                .broadcast()
                .connect(dataset)
                .transform(
                    "operator",
                    BasicTypeInfo.INT_TYPE_INFO,
                    new TrainOperators(50))
                .setParallelism(10);

            return new IterationBodyResult(DataStreamList.of(modelUpdate), DataStreamList.of(parameters.getSideOutput(MODEL_UPDATE_OUTPUT_TAG)));
        });

        DataStream<double[]> finalModel = resultStreams.get(0);
        finalModel.addSink(...);
    }

    public static class ParametersCacheFunction extends ProcessFunction<double[], double[]> {

        private final int numOfTrainTasks;

        private int numOfUpdatesReceived = 0;
        private final double[] parameters = new double[N_DIM];

        public ParametersCacheFunction(int numOfTrainTasks) {
            this.numOfTrainTasks = numOfTrainTasks;
        }

        public void processElement(double[] update, Context ctx, Collector<double[]> output) {
            // Suppose we have a util to add the second array to the first.
            ArrayUtils.addWith(parameters, update);
            numOfUpdatesReceived++;

            if (numOfUpdatesReceived == numOfTrainTasks) {
                output.collect(parameters);
                numOfUpdatesReceived = 0;
            }
        }
    }

    public static class TrainOperators extends AbstractStreamOperator<double[]> implements TwoInputStreamOperator<double[], Tuple2<double[], Double>, double[]>, InputSelectable {

        private final int miniBatchSize;

        private final List<Tuple2<double[], Double>> miniBatch = new ArrayList<>();

        public TrainOperators(int miniBatchSize) {
            this.miniBatchSize = miniBatchSize;
        }

        public void processElement1(double[] parameter, Context context, Collector<double[]> output) {
            calculateModelUpdate(parameter, output);
            miniBatch.clear();
        }

        public void processElement2(Tuple2<double[], Double> trainSample, Context context, Collector<double[]> output) {
            miniBatch.add(trainSample);
        }

        public InputSelection nextSelection() {
            // Keep reading training samples until one mini-batch is accumulated,
            // then read the updated parameters from the Parameters vertex.
            if (miniBatch.size() < miniBatchSize) {
                return InputSelection.SECOND;
            } else {
                return InputSelection.FIRST;
            }
        }

        private void calculateModelUpdate(double[] parameters, Collector<double[]> output) {
            double[] modelUpdate = new double[N_DIM];
            for (Tuple2<double[], Double> record : miniBatch) {
                double diff = (ArrayUtils.muladd(record.f0, parameters) - record.f1);
                ArrayUtils.addWith(modelUpdate, ArrayUtils.multiply(record.f0, diff));
            }

            output.collect(modelUpdate);
        }
    }
}

Also similar to the bounded case, for asynchronous training the Parameters vertex does not wait to receive updates from all the Train tasks. Instead, it directly responds to the task that sent the update:

Code Block
languagejava
public static class ParametersCacheFunction extends ProcessFunction<Tuple2<Integer, double[]>, Tuple2<Integer, double[]>> {

    private final double[] parameters = new double[N_DIM];

    public void processElement(Tuple2<Integer, double[]> update, Context ctx, Collector<Tuple2<Integer, double[]>> output) {
        // Suppose we have a util to add the second array to the first.
        ArrayUtils.addWith(parameters, update.f1);

        if (update.f0 < 0) {
            // Received the initialized parameter values; broadcast to all the downstream tasks.
            for (int i = 0; i < 10; ++i) {
                output.collect(new Tuple2<>(i, parameters));
            }
        } else {
            output.collect(new Tuple2<>(update.f0, parameters));
        }
    }
}

Proposed Changes

In this section, we discuss a few design choices related to the implementation and usage of the proposed APIs.

Support the Feedback Edges

1) How the feedback edge is supported.

The Flink core runtime can only execute a DAG of operators that does not involve cycles. Thus extra work needs to be done to support feedback edges (which effectively introduces cycles in the data flow).

Similar to the existing iterative API, this FLIP plans to implement the feedback edge using the following approach:

  • Automatically insert the HEAD and the TAIL operators as the first and the last operators in the iteration body.
  • Co-locate the HEAD and the TAIL operators on the same task manager.
  • Have the HEAD and the TAIL operators transmit the records of the feedback edges using an in-memory queue data structure. 

Since all the forward edges have limited buffers, the feedback queues must be of unlimited size to avoid deadlocks. To avoid an unlimited memory footprint, the records are spilled to disk once the queue size exceeds a threshold. To reduce such spilling, the HEAD operators read their inputs in a "feedback-first" manner: whenever records are available from both the initial input and the feedback edge, they always process the feedback records first.
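As an illustration only (the actual HEAD operator is internal to the runtime), the feedback-first reading order could be expressed with the InputSelectable mechanism, assuming a hypothetical flag that tracks whether the feedback queue has pending records:

Code Block
languagejava
// A simplified sketch, not the actual implementation: the HEAD operator prefers
// its feedback input whenever the in-memory feedback queue has pending records.
public class FeedbackFirstSelection implements InputSelectable {

    // Hypothetical flag maintained by the HEAD operator while the feedback
    // queue still holds unprocessed records.
    private volatile boolean feedbackPending;

    @Override
    public InputSelection nextSelection() {
        // Drain the feedback records first to keep the queue small and avoid
        // spilling; otherwise read from whichever input has data available.
        return feedbackPending ? InputSelection.SECOND : InputSelection.ALL;
    }
}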

2) How the termination of the iteration execution is determined.

Having feedback edges also complicates the termination detection of the job. Since the feedback edges are not visible in the JobGraph, the HEAD operators, as the first operators in the DAG of the iteration body, decide when the whole iteration body can be terminated and initiate the termination process (see the sketch below). The termination happens when

  1. All the inputs to the iteration body have finished. 
  2. AND 
    1. If users have specified a reference stream, the number of records of the reference stream in each epoch is counted after the epoch is done; if the count is 0, HEAD starts to terminate. 
    2. OR, if users have not specified a reference stream, when no records are still being processed inside the iteration body. This is detected by having a special event travel through the whole iteration body. 
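The decision logic can be summarized with the following sketch (the field names are hypothetical; the actual bookkeeping lives inside the HEAD operator):

Code Block
languagejava
// A sketch of the termination decision, not the actual implementation.
private boolean shouldTerminate() {
    if (!allInputsFinished) {
        return false;
    }
    if (hasReferenceStream) {
        // Terminate once the reference stream emitted no records in the last epoch.
        return recordsOfReferenceStreamInLastEpoch == 0;
    }
    // Otherwise terminate once a special event has traveled through the whole
    // iteration body, confirming that no records are still being processed.
    return noRecordsInFlightConfirmed;
}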

4) The execution mode that is required to execute the iteration body.

  • If all input streams are bounded, then the iteration body can be executed in either the stream mode or the batch mode.
  • If any input stream is unbounded, then the iteration body must be executed in the stream mode.
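For example (illustrative only; `body`, `boundedDataset` and `unboundedDataset` stand for an IterationBody and input streams defined elsewhere):

Code Block
languagejava
// Bounded inputs: the iteration may run in either the stream or the batch mode.
DataStreamList boundedResults = IterationUtils.iterateBoundedStreamsUntilTermination(
    DataStreamList.of(initParameters), DataStreamList.of(boundedDataset), body);

// Unbounded inputs: the iteration must run in the stream mode.
DataStreamList unboundedResults = IterationUtils.iterateUnboundedStreams(
    DataStreamList.of(initParameters), DataStreamList.of(unboundedDataset), body);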

5) The requirements of the edge types and parallelism in the IterationBody.

All edges inside the iteration body are required to have the PIPELINE type. If the user-defined iteration body contains an edge that does not have the PIPELINE type, methods that create the subgraph from the iteration body, such as iterateBoundedStreamsUntilTermination, will throw an exception upon invocation.

6) Lifetime of the operators inside the iteration body.

With the approach proposed in this FLIP, the operators inside the iteration body are only created once and destroyed after the entire iteration terminates.

In comparison, the existing DataSet::iterate(..) would destroy and re-create the iteration body once for each round of iteration, which in general could introduce more runtime overhead than the approach adopted in this FLIP.

7) How an iteration can resume from the most recent completed epoch after failover.

For any job that is executed in the batch mode, the job cannot start from a recent epoch after failover. In other words, if an iterative job fails, it will start from the first epoch of this iteration. Note that the existing DataSet::iterate(...) has the same behavior after job failover.

For any job that is executed in the stream mode, the job can start from a recent epoch after failover. This is achieved by re-using the existing checkpoint mechanism (only available in the stream mode) and additionally checkpointing the values buffered on the feedback edges.

8) How to implement an iterative algorithm in the sync mode.

Definition of sync-mode

An iterative algorithm is run in sync-mode if there exists a global epoch, such that at the time a given operator computes its output for the Nth epoch, this operator has received exactly the following records from its input edges:

  • We define a feedback loop as a circle composed of exactly 1 feedback edge and arbitrary number of non-feedback edges.
  • If an edge is a non-feedback input edge and this edge is part of a feedback loop, then this operator has received all records emitted on this edge for the Nth epoch, without receiving any record for the (N+1)th epoch.
  • If an edge is a feedback input edge and this edge is part of a feedback loop, then this operator has received all records emitted on this edge for the (N-1)th epoch, without receiving any record for the Nth epoch.

...

An iterative algorithm will be run in the sync mode if its IterationBody meets the following requirements:

  • When any operator within the IterationBody receives values from its input edges, this operator does not immediately emit records to its output.
  • Operators inside the IterationBody only compute and emit records to their outputs in the onEpochWatermarkIncremented(...) callback. The emitted records should be computed based on the values received from the input edges up to the invocation of this callback.

See the Appendix section for a proof of why the solution described above could achieve the sync-mode execution as defined above.
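For illustration, the following operator skeleton satisfies both requirements, reusing N_DIM, ArrayUtils and the IterationListener callbacks from the examples above (the class name is illustrative):

Code Block
languagejava
// A minimal sketch of an operator that runs in sync mode: it never emits from
// processElement(...) and emits only from onEpochWatermarkIncremented(...).
public static class EpochAlignedReducer extends ProcessFunction<double[], double[]>
        implements IterationListener<double[]> {

    private final double[] accumulated = new double[N_DIM];

    public void processElement(double[] value, Context ctx, Collector<double[]> output) {
        // Requirement 1: only buffer the received values; never emit here.
        ArrayUtils.addWith(accumulated, value);
    }

    public void onEpochWatermarkIncremented(int epochWatermark, Context context, Collector<double[]> collector) {
        // Requirement 2: emit once per epoch, based on all values received so far.
        collector.collect(accumulated);
    }
}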

9) How to run an iterative algorithm without dumping all user-provided data streams to disk.

As mentioned in the motivation section, the existing DataSet::iterate() always dumps the user-provided data streams to disk so that it can replay them regardless of their size. Since this is the only available API for iterating on bounded data streams, there is no way for algorithm developers to avoid this performance overhead.

In comparison, the iterateBoundedStreamsUntilTermination(...) method proposed in this FLIP allows users to run an iteration body without incurring this disk performance overhead. Developers have the freedom to optimize the performance based on their algorithm and data size, e.g., by caching the data in memory in a more compact format.
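For example, instead of keeping the boxed Tuple2 records, an iteration body could cache the bounded dataset in primitive arrays during the first epoch. The sketch below is illustrative only (the class name is hypothetical, and java.util.Arrays is assumed to be imported):

Code Block
languagejava
// An illustrative sketch of caching the training data in a compact, in-memory
// primitive layout instead of relying on the framework to spill it to disk.
public static class CompactDatasetCache {

    private double[] features = new double[16 * N_DIM];  // flattened feature rows
    private double[] labels = new double[16];
    private int numCached = 0;

    public void add(Tuple2<double[], Double> sample) {
        if (numCached == labels.length) {
            // Grow the primitive buffers geometrically.
            features = Arrays.copyOf(features, features.length * 2);
            labels = Arrays.copyOf(labels, labels.length * 2);
        }
        System.arraycopy(sample.f0, 0, features, numCached * N_DIM, N_DIM);
        labels[numCached] = sample.f1;
        numCached++;
    }
}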

10) How to support operators (e.g. ReduceOperator) that require bounded inputs in the IterationBody.

...

Compatibility, Deprecation, and Migration Plan

...