...

Previously Flink supported bounded iteration with the DataSet API and unbounded iteration with the DataStream API. However, since Flink aims to deprecate the DataSet API and the iteration support in the DataStream API is rather incomplete, we need to implement a new iteration library in the flink-ml repository to support the algorithms.

The Goals

The Types of the Algorithms

In general an ML algorithm updates the model according to the data iteratively until the model converges. According to the granularity of the dataset used to update the model, ML algorithms could in general be classified into two types:

  1. Epoch-based: in each epoch the algorithm goes through the whole training dataset. Epoch-based algorithms must work with a bounded dataset.
  2. Batch-based: in each batch the algorithm samples a subset of all the records and uses the sampled records to update the model. Batch-based algorithms could work with either a bounded or an unbounded dataset.

In a distributed setting the dataset might be partitioned across multiple subtasks; an epoch then means that each subtask goes through all of its assigned records, and a batch means that each subtask samples from its assigned records. When updating the model with this data in a distributed setting, there are two further styles:

  1. Synchronous: in the synchronous pattern the model must wait for the updates from all the subtasks before it can be used to compute the next update in each subtask.
  2. Asynchronous: in the asynchronous pattern the model can directly apply the updates from some subtasks and use the updated values in the following computation immediately.

In general the synchronous pattern yields higher accuracy while the asynchronous pattern converges faster.

Based on the above dimensions, the algorithms could be classified into the following types:

...

SGD-Based Synchronous Offline algorithm

...

And there is another performance issue with the DataSet::iterate(...) API: in order to replay the user-provided data streams multiple times, it requires the runtime to always dump the user-provided data streams to disk. This introduces storage and disk I/O overhead even if the user's algorithm may prefer to cache those values in memory, possibly in a more compact format.

In order to address all the issues described above, and to make Flink ML available for more iteration use-cases in the long run, this FLIP proposes to add a couple of APIs in the flink-ml repository to achieve the following goals:

  • Provide a solution for all the iteration use-cases (see the use-case section below for more detail) supported by the existing APIs, without having the issues described above.
  • Provide a solution for a few use-cases (e.g. bounded streams + async mode + per-round variable update) not supported by the existing APIs.
  • Decouple the iteration-related APIs from the core Flink runtime (by moving them to the flink-ml repo) so that we can keep the Flink core runtime as simple and maintainable as possible.
  • Provide an iteration API that does not enforce the disk I/O overhead described above, so that users can optimize an iterative algorithm for the best possible performance.

Target Use-cases

In general an ML algorithm updates the model according to the data iteratively until the model converges. The target algorithms could be classified w.r.t. three properties: the boundedness of the input datasets, the amount of data consumed for each variable update, and the synchronization policy.

1) Algorithms have different needs for whether the input data streams should be bounded or unbounded. We classify those algorithms into online algorithms and offline algorithms as below.

  • For online training algorithms, the training samples will be unbounded streams of data. The corresponding iteration body should ingest these unbounded streams of data, read each value in each stream once, and update machine learning model repeatedly in near real-time. The iteration will never terminate in this case. The algorithm should be executed as a streaming job.
  • For offline training algorithms, the training samples will be bounded streams of data. The corresponding iteration body should read these bounded data streams for arbitrary number of rounds and update machine learning model repeatedly until a termination criteria is met (e.g. a given number of rounds is reached or the model has converged). The algorithm should be executed as a batch job.

2) An algorithm may have additional requirements in how much data should be consumed each time before a subtask can update variables. There are two categories of choices here:

  • Per-batch variable update: The algorithm wants to update variables every time an arbitrary subset of the user-provided data streams (either bounded or unbounded) is processed.
  • Per-round variable update: The algorithm wants to update variables every time all data of the user-provided bounded data streams is processed.

In the machine learning domain, some algorithms allow users to configure a batch size and the model will be updated every time each subtask processes a batch of data. Those algorithms fit into the first category. And such an algorithm can be either online or offline.

Other algorithms only update variables every time the entire data is consumed for one round. Those algorithms fit into the second category. And such an algorithm must be offline because, by this definition, the user-provided dataset must be bounded.

3) Algorithms (either online or offline) have different needs for how their parallel subtasks coordinate when updating the shared model variables.

  • In the sync mode, parallel subtasks, which execute the iteration body, update the model variables in a coordinated manner. There exist global epochs, such that all subtasks fetch the shared model variable values at the beginning of an epoch, calculate model variable updates based on the fetched variable values, and update the model variable values at the end of this epoch. In other words, all subtasks read and update model variables in global lock steps.
  • In the async mode, each parallel subtask, which executes the iteration body, could read/update the shared model variables without waiting for variable updates from other subtasks. For example, a subtask could have updated model variables 10 times when another subtask has updated model variables only 3 times.

The sync mode is useful when an algorithm should be executed in a deterministic way to achieve the best possible accuracy, and the straggler issue (i.e. there is a subtask which is considerably slower than others) does not slow down the algorithm execution too much. In comparison, the async mode is useful for algorithms which want to be parallelized and executed across many subtasks as fast as possible, without worrying about performance issues caused by stragglers, at the possible cost of reduced accuracy.

Based on the above dimensions, the algorithms could be classified into the following types:

| Type | Bounded / Unbounded | Data Granularity | Synchronization Pattern | Support in the existing APIs | Support in the proposed API | Examples |
| --- | --- | --- | --- | --- | --- | --- |
| Non-SGD-based | Bounded | Epoch | Mostly Synchronous | Yes | Yes | K-Means, Apriori, Decision Tree, Random Walk |
| SGD-Based Synchronous Offline algorithm | Bounded | Batch → Epoch* | Synchronous | Yes | Yes | Linear Regression, Logistic Regression, Deep Learning algorithms |
| SGD-Based Asynchronous Offline algorithm | Bounded | Batch → Epoch* | Asynchronous | No | Yes | Same as the above |
| SGD-Based Synchronous Online algorithm | Unbounded | Batch | Synchronous | No | Yes | Online version of the above algorithms |
| SGD-Based Asynchronous Online algorithm | Unbounded | Batch | Asynchronous | Yes | Yes | Online version of the above algorithms |

*Although SGD-based algorithms are also batch-based, they could be implemented with an epoch-based method if intermediate state is allowed: the subtasks could sample each batch from the cached records starting from the position of the last batch, as sketched below.
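
The snippet below is a minimal sketch of this sampling scheme, assuming each subtask keeps its assigned records in an in-memory list together with a cursor; the class and all names in it are purely illustrative and not part of the proposal.

Code Block
languagejava
import java.util.ArrayList;
import java.util.List;

/**
 * Illustrative only: the subtask keeps a cursor into its cached records and draws the next
 * mini-batch starting from the position where the previous batch ended, so that the sequence
 * of batches amounts to an epoch-based pass over the bounded dataset.
 */
public class MiniBatchCursor<T> {
    private final List<T> cachedRecords;
    private int position; // position of the last batch, kept as intermediate state

    public MiniBatchCursor(List<T> cachedRecords) {
        this.cachedRecords = cachedRecords;
    }

    /** Returns the next mini-batch, wrapping around once the whole epoch has been consumed. */
    public List<T> nextBatch(int batchSize) {
        List<T> batch = new ArrayList<>(batchSize);
        for (int i = 0; i < batchSize; i++) {
            batch.add(cachedRecords.get(position));
            position = (position + 1) % cachedRecords.size();
        }
        return batch;
    }
}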

Based on the above classification and the epoch-based replacement implementation for SGD-based algorithms on bounded datasets, we mainly need to support

  1. The synchronous / asynchronous epoch-based algorithms on the bounded dataset.
  2. The synchronous / asynchronous batch-based algorithms on the unbounded dataset. 

Overview of the Iteration Paradigm

Based on the types of algorithms, we explain the iteration paradigm that has motivated our choices of the proposed APIs.

An iterative algorithm has the following behavior pattern:

  • The iterative algorithm has an iteration body that is repeatedly invoked until some termination criteria is reached (e.g. after a user-specified number of epochs has been reached). An iteration body is a subgraph of operators that implements the computation logic of e.g. an iterative machine learning algorithm, whose outputs might be fed back as the inputs of this subgraph.
  • In each invocation, the iteration body updates the model parameters based on the user-provided data as well as the most recent model parameters.
  • The iterative algorithm takes as inputs the user-provided data and the initial model parameters.
  • The iterative algorithm could output arbitrary user-defined information, such as the loss after each epoch, or the final model parameters. 

Therefore, the behavior of an iterative algorithm could be characterized with the following iteration paradigm (w.r.t. Flink concepts):

  • An iteration-body is a Flink subgraph with the following inputs and outputs:
    • Inputs: model-variables (as a list of data streams) and user-provided-data (as another list of data streams)
    • Outputs: feedback-model-variables (as a list of data streams) and user-observed-outputs (as a list of data streams)
  • A termination-condition that specifies when the iterative execution of the iteration body should terminate.
  • In order to execute an iteration body, a user needs to provide the following inputs and gets the following outputs.
    • Inputs: initial-model-variables (as a list of bounded data streams) and user-provided-data (as a list of data streams)
    • Outputs: the user-observed-output emitted by the iteration body.

It is important to note that users typically should not invoke IterationBody::process directly because the model-variables expected by the iteration body are not the same as the initial-model-variables provided by the user. Instead, the model-variables are computed as the union of the feedback-model-variables (emitted by the iteration body) and the initial-model-variables (provided by the caller of the iteration body). To relieve users from creating this union operator, we have added a utility class (see IterationUtils) to run an iteration-body with the user-provided inputs.

The figure below summarizes the iteration paradigm described above. The streams in the red color are inputs provided by the user to the iteration body, as well as outputs emitted by the iteration body to the user.

(Figure: the iteration paradigm)

Public Interfaces

We propose to make the following API changes to support the iteration paradigm described above.


1) Add the IterationBody interface.

This interface corresponds to the iteration-body with the inputs and outputs described in the iteration paradigm. This interface should be implemented by the developer of the algorithm.

Note that the IterationBody also outputs the termination criteria which corresponds to the termination-condition described in the iteration paradigm. This allows the algorithm developer to use a stream created inside the IterationBody as the terminationCriteria.

Code Block
languagejava
linenumberstrue
package org.apache.flink.ml.iteration;

@PublicEvolving
public interface IterationBody {
    /**
     * This method creates the graph for the iteration body.
     *
     * See Utils::iterate, Utils::iterateBoundedStreams and Utils::iterateAndReplayBoundedStreams for how the iteration
     * body can be executed and when execution of the corresponding graph should terminate.
     *
     * Required: the number of feedback variable streams returned by this method must equal the number of variable
     * streams given to this method.
     *
     * @param variableStreams the variable streams.
     * @param dataStreams the data streams.
     * @return an IterationBodyResult.
     */
    IterationBodyResult process(DataStreamList variableStreams, DataStreamList dataStreams);
}


2) Add IterationBodyResult class.

This is a helper class which contains the objects returned by the IterationBody::process(...).

Code Block
languagejava
linenumberstrue
package org.apache.flink.ml.iteration;

/**
 * A helper class that contains the streams returned by the iteration body.
 */
class IterationBodyResult {
    /**
     * A list of feedback variable streams. These streams will only be used during the iteration execution and will
     * not be returned to the caller of the iteration body. It is assumed that the method which executes the
     * iteration body will feed the records of the feedback variable streams back to the corresponding input variable
     * streams.
     */
    DataStreamList feedbackVariableStreams;

    /**
     * A list of output streams. These streams will be returned to the caller of the methods that execute the
     * iteration body.
     */
    DataStreamList outputStreams;

    /**
     * An optional termination criteria stream. If this stream is not null, it will be used together with the
     * feedback variable streams to determine when the iteration should terminate.
     */
    Optional<DataStream<?>> terminationCriteria;
}
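
For illustration, the snippet below sketches how an algorithm developer might implement IterationBody using IterationBodyResult. It assumes that IterationBodyResult offers a constructor taking the feedback variable streams and the output streams, that DataStreamList offers an of(...) factory method, and that ModelUpdateOperator is a user-written TwoInputStreamOperator; none of these are spelled out above, so treat the snippet as a sketch rather than as part of the proposed API.

Code Block
languagejava
/** A sketch of an iteration body with one variable stream (the model) and one data stream. */
public class MyIterationBody implements IterationBody {

    @Override
    public IterationBodyResult process(DataStreamList variableStreams, DataStreamList dataStreams) {
        DataStream<double[]> model = variableStreams.get(0);
        DataStream<double[]> trainingData = dataStreams.get(0);

        // Compute the model update of this epoch. The operator is expected to implement
        // IterationListener so that it emits the update once per epoch watermark.
        DataStream<double[]> modelUpdate = trainingData
            .connect(model)
            .transform("update", TypeInformation.of(double[].class), new ModelUpdateOperator());

        // Feed the update back as the variable stream of the next epoch, and also expose it
        // to the caller as the user-observed output.
        return new IterationBodyResult(
            DataStreamList.of(modelUpdate),
            DataStreamList.of(modelUpdate));
    }
}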


3) Add the IterationListener interface.

If a UDF (i.e. user-defined function) used inside the IterationBody implements this interface, the callbacks of this interface will be invoked when the corresponding events happen.

This interface allows users to achieve the following goals:
- Run an algorithm in sync mode, i.e. each subtask will wait for model parameter updates from all other subtasks before reading the aggregated model parameters and starting the next epoch of execution.
- Emit final output after the iteration terminates.


Code Block
languagejava
linenumberstrue
package org.apache.flink.ml.iteration;

/**
 * The callbacks defined below will be invoked only if the operator instance which implements this interface is used
 * within an iteration body.
 */
@PublicEvolving
public interface IterationListener<T> {
    /**
     * This callback is invoked every time the epoch watermark of this operator increments. The initial epoch watermark
     * of an operator is 0.
     *
     * The epochWatermark is the maximum integer that meets this requirement: every record that arrives at the operator
     * going forward should have an epoch larger than the epochWatermark. See Java docs in IterationUtils for how epoch
     * is determined for records ingested into the iteration body and for records emitted by operators within the
     * iteration body.
     *
     * If all inputs are bounded, the maximum epoch of all records ingested into this operator is used as the
     * epochWatermark parameter for the last invocation of this callback.
     *
     * @param epochWatermark The incremented epoch watermark.
     * @param context A context that allows emitting side output. The context is only valid during the invocation of
     *                this method.
     * @param collector The collector for returning result values.
     */
    void onEpochWatermarkIncremented(int epochWatermark, Context context, Collector<T> collector);

    /**
     * This callback is invoked after the execution of the iteration body has terminated.
     *
     * See Java doc of methods in IterationUtils for the termination conditions.
     *
     * @param context A context that allows emitting side output. The context is only valid during the invocation of
     *                this method.
     * @param collector The collector for returning result values.
     */
    void onIterationTermination(Context context, Collector<T> collector);

    /**
     * Information available in an invocation of the callbacks defined in the IterationListener.
     */
    interface Context {
        /**
         * Emits a record to the side output identified by the {@link OutputTag}.
         *
         * @param outputTag the {@code OutputTag} that identifies the side output to emit to.
         * @param value The record to emit.
         */
        <X> void output(OutputTag<X> outputTag, X value);
    }
}
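
As an illustration of how these callbacks could be used to run an algorithm in the sync mode (see the Proposed Changes section below), the sketch here shows a reduce-style function that buffers the updates it receives and only emits the per-epoch aggregate inside onEpochWatermarkIncremented(). The class and its names are illustrative and not part of the proposed API.

Code Block
languagejava
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.ml.iteration.IterationListener;
import org.apache.flink.util.Collector;

/**
 * A sketch of a function that aggregates the model updates of one epoch and emits the
 * aggregate only when the epoch watermark increments, as required by the sync mode.
 */
public class EpochSumFunction extends RichFlatMapFunction<double[], double[]>
        implements IterationListener<double[]> {

    private double[] partialSum;

    @Override
    public void flatMap(double[] update, Collector<double[]> out) {
        // Buffer the updates of the current epoch instead of emitting them immediately.
        if (partialSum == null) {
            partialSum = update.clone();
        } else {
            for (int i = 0; i < update.length; i++) {
                partialSum[i] += update[i];
            }
        }
    }

    @Override
    public void onEpochWatermarkIncremented(int epochWatermark, Context context, Collector<double[]> collector) {
        // All records of this epoch have arrived, so the per-epoch aggregate can be emitted once.
        if (partialSum != null) {
            collector.collect(partialSum);
            partialSum = null;
        }
    }

    @Override
    public void onIterationTermination(Context context, Collector<double[]> collector) {
        // Nothing left to emit when the whole iteration terminates in this sketch.
    }
}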


4) Add the IterationUtils class.

This class provides APIs to execute an iteration body with the user-provided inputs. It provides three methods to run an iteration body, each with different input types (e.g. bounded data streams vs. unbounded data streams) and data replay semantics (i.e. whether to replay the user-provided data streams).

Each of these three APIs provides the functionality described in the iteration paradigm: union the feedback variable streams (returned by the iteration body) with the initial variable streams (provided by the user) and use the merged streams as inputs to invoke IterationBody::process(...).


Code Block
languagejava
linenumberstrue
package org.apache.flink.ml.iteration;

/**
 * A helper class to apply {@link IterationBody} to data streams.
 */
@PublicEvolving
public class IterationUtils {
    /**
     * This method can use an iteration body to process records in unbounded data streams.
     *
     * This method invokes the iteration body with the following parameters:
     * 1) The 1st parameter is a list of input variable streams, which are created as the union of the initial variable
     * streams and the corresponding feedback variable streams (returned by the iteration body).
     * 2) The 2nd parameter is the data streams given to this method.
     *
     * The epoch values are determined as described below. See IterationListener for how the epoch values are used.
     * 1) All records in the initial variable streams have epoch=1.
     * 2) All records in the data streams have epoch=MAX_LONG. In this case, records in the data stream won't affect
     * any operator's epoch watermark.
     * 3) For any record emitted by this operator into a non-feedback stream, the epoch of this emitted record = the
     * epoch of the input record that triggers this emission. If this record is emitted by
     * onEpochWatermarkIncremented(), then the epoch of this record = epochWatermark.
     * 4) For any record emitted by this operator into a feedback variable stream, the epoch of the emitted record =
     * min(the epoch of the input record that triggers this emission, MAX_LONG - 1) + 1. If this record is emitted by
     * onEpochWatermarkIncremented(), then the epoch of this record = epochWatermark + 1.
     *
     * The execution of the graph created by the iteration body will not terminate by itself. This is because at least
     * one of its data streams is unbounded.
     *
     * Required:
     * 1) All the init variable streams must be bounded.
     * 2) There is at least one unbounded stream in the data streams list.
     * 3) The parallelism of any stream in the initial variable streams must equal the parallelism of the stream at the
     * same index of the feedback variable streams returned by the IterationBody.
     *
     * @param initVariableStreams The initial variable streams. These streams will be merged with the feedback variable
     *                            streams before being used as the 1st parameter to invoke the iteration body.
     * @param dataStreams The data streams. These streams will be used as the 2nd parameter to invoke the iteration
     *                    body.
     * @param body The computation logic which takes variable/data streams and returns variable/output streams.
     * @return The list of output streams returned by the iteration body.
     */
    static DataStreamList iterateUnboundedStreams(DataStreamList initVariableStreams, DataStreamList dataStreams, IterationBody body) {...}

    /**
     * This method can use an iteration body to process records in some bounded data streams iteratively until a
     * termination criteria is reached (e.g. the given number of rounds is completed or no further variable update is
     * needed). Because this method does not replay records in the data streams, the iteration body needs to cache those
     * records in order to visit those records repeatedly.
     *
     * This method invokes the iteration body with the following parameters:
     * 1) The 1st parameter is a list of input variable streams, which are created as the union of the initial variable
     * streams and the corresponding feedback variable streams (returned by the iteration body).
     * 2) The 2nd parameter is the data streams given to this method.
     *
     * The epoch values are determined as described below. See IterationListener for how the epoch values are used.
     * 1) All records in the initial variable streams have epoch=1.
     * 2) All records in the data streams have epoch=1.
     * 3) For any record emitted by this operator into a non-feedback stream, the epoch of this emitted record = the
     * epoch of the input record that triggers this emission. If this record is emitted by
     * onEpochWatermarkIncremented(), then the epoch of this record = epochWatermark.
     * 4) For any record emitted by this operator into a feedback variable stream, the epoch of the emitted record = the
     * epoch of the input record that triggers this emission + 1. If this record is emitted by
     * onEpochWatermarkIncremented(), then the epoch of this record = epochWatermark + 1.
     *
     * Suppose there is a coordinator operator which takes all feedback variable streams (emitted by the iteration body)
     * and the termination criteria stream (if not null) as inputs. The execution of the graph created by the
     * iteration body will terminate when all input streams have been fully consumed AND any of the following conditions
     * is met:
     * 1) The termination criteria stream is not null. And the coordinator operator has not observed any new value from
     * the termination criteria stream between two consecutive onEpochWatermarkIncremented invocations.
     * 2) The coordinator operator has not observed any new value from any feedback variable stream between two
     * consecutive onEpochWatermarkIncremented invocations.
     *
     * Required:
     * 1) All the init variable streams and the data streams must be bounded.
     * 2) The parallelism of any stream in the initial variable streams must equal the parallelism of the stream at the
     * same index of the feedback variable streams returned by the IterationBody.
     *
     * @param initVariableStreams The initial variable streams. These streams will be merged with the feedback variable
     *                            streams before being used as the 1st parameter to invoke the iteration body.
     * @param dataStreams The data streams. These streams will be used as the 2nd parameter to invoke the iteration
     *                    body.
     * @param body The computation logic which takes variable/data streams and returns variable/output streams.
     * @return The list of output streams returned by the iteration body.
     */
    static DataStreamList iterateBoundedStreamsUntilTermination(DataStreamList initVariableStreams, DataStreamList dataStreams, IterationBody body) {...}

    /**
     * This method can use an iteration body to process records in some bounded data streams iteratively until a
     * termination criteria is reached (e.g. the given number of rounds is completed or no further variable update is
     * needed). Because this method replays records in the data streams, the iteration body does not need to cache those
     * records to visit those records repeatedly.
     *
     * This method invokes the iteration body with the following parameters:
     * 1) The 1st parameter is a list of input variable streams, which are created as the union of the initial variable
     * streams and the corresponding feedback variable streams (returned by the iteration body).
     * 2) The 2nd parameter is a list of replayed data streams, which are created by replaying the initial data streams
     * round by round until the iteration terminates. The records in the Nth round will be emitted into the iteration
     * body only if the low watermark of the first operator in the iteration body >= N - 1.
     *
     * The epoch values are determined as described below. See IterationListener for how the epoch values are used.
     * 1) All records in the initial variable streams have epoch=1.
     * 2) The records from the initial data streams will be replayed round by round into the iteration body. The records
     * in the first round have epoch=1. And records in the Nth round have epoch = N.
     * 3) For any record emitted by this operator into a non-feedback stream, the epoch of this emitted record = the
     * epoch of the input record that triggers this emission. If this record is emitted by
     * onEpochWatermarkIncremented(), then the epoch of this record = epochWatermark.
     * 4) For any record emitted by this operator into a feedback stream, the epoch of the emitted record = the epoch
     * of the input record that triggers this emission + 1. If this record is emitted by onEpochWatermarkIncremented(),
     * then the epoch of this record = epochWatermark + 1.
     *
     * Suppose there is a coordinator operator which takes all feedback variable streams (emitted by the iteration body)
     * and the termination criteria stream (if not null) as inputs. The execution of the graph created by the
     * iteration body will terminate when all input streams have been fully consumed AND any of the following conditions
     * is met:
     * 1) The termination criteria stream is not null. And the coordinator operator has not observed any new value from
     * the termination criteria stream between two consecutive onEpochWatermarkIncremented invocations.
     * 2) The coordinator operator has not observed any new value from any feedback variable stream between two
     * consecutive onEpochWatermarkIncremented invocations.
     *
     * Required:
     * 1) All the init variable streams and the data streams must be bounded.
     * 2) The parallelism of any stream in the initial variable streams must equal the parallelism of the stream at the
     * same index of the feedback variable streams returned by the IterationBody.
     *
     * @param initVariableStreams The initial variable streams. These streams will be merged with the feedback variable
     *                            streams before being used as the 1st parameter to invoke the iteration body.
     * @param initDataStreams The initial data streams. Records from these streams will be repeatedly replayed and used
     *                        as the 2nd parameter to invoke the iteration body.
     * @param body The computation logic which takes variable/data streams and returns variable/output streams.
     * @return The list of output streams returned by the iteration body.
     */
    static DataStreamList iterateAndReplayBoundedStreamsUntilTermination(DataStreamList initVariableStreams, DataStreamList initDataStreams, IterationBody body) {...}
}
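
To illustrate how these methods are meant to be called, the sketch below wires a bounded offline iteration together. It assumes a DataStreamList.of(...) factory method (DataStreamList itself is introduced in the next item) and reuses the hypothetical MyIterationBody sketched earlier; both are illustrative rather than part of the proposal.

Code Block
languagejava
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// The initial model variables and the bounded training data.
DataStream<double[]> initialModel = env.fromElements(new double[]{0.0, 0.0, 0.0});
DataStream<double[]> trainingData = env.fromElements(
        new double[]{1.0, 2.0, 1.0},
        new double[]{2.0, 1.0, 0.0});

// Run the iteration body until the termination criteria is met. The data streams are not
// replayed, so the iteration body has to cache the training data itself.
DataStreamList outputs = IterationUtils.iterateBoundedStreamsUntilTermination(
        DataStreamList.of(initialModel),
        DataStreamList.of(trainingData),
        new MyIterationBody());

// The output streams are whatever the iteration body declared in its IterationBodyResult.
DataStream<double[]> finalModel = outputs.get(0);
finalModel.print();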


5) Add the DataStreamList class.

DataStreamList is a helper class that contains a list of data streams with possibly different element types.

Code Block
languagejava
linenumberstrue
package org.apache.flink.ml.iteration;

public class DataStreamList {
    // Returns the number of data streams in this list.
    public int size() {...}

    // Returns the data stream at the given index in this list.
    public <T> DataStream<T> get(int index) {...}
}


6) Deprecate the existing DataStream::iterate() and the DataStream::iterate(long maxWaitTimeMillis) methods.

We plan to remove both methods after the APIs added in this doc are ready for production use. This change is needed to decouple the iteration-related APIs from the core Flink runtime so that we can keep the Flink core runtime as simple and maintainable as possible.

Proposed Changes

In this section, we discuss a few design choices related to the implementation and usage of the proposed APIs.

1) How the feedback edge is supported.

The Flink core runtime can only execute a DAG of operators that does not involve cycles. Thus extra work needs to be done to support feedback edges (which effectively introduce cycles in the data flow).

Similar to the existing iterative API, this FLIP plans to implement the feedback edge using the following approach:

  • Automatically insert the HEAD and the TAIL operators as the first and the last operators in the iteration body.
  • Co-locate the HEAD and the TAIL operators on the same task manager.
  • Have the HEAD and the TAIL operators transmit the records of the feedback edges using an in-memory queue data structure. 

Since all the forward edges have limited buffers, the feedback queues must have unlimited size to avoid deadlocks. To avoid an unbounded memory footprint, the records would be spilled to disk when the queued size exceeds a threshold. To reduce such spilling, the HEAD operators read the data in a "feedback-first" manner, namely they always process the feedback records first if records are available from both the initial input edges and the feedback edges.

2) How the termination of the iteration execution is determined.

We will add a coordinator operator which takes all feedback variable streams (emitted by the iteration body) and the termination criteria stream (if not null) as inputs. The execution of the graph created by the iteration body will terminate when all input streams have been fully consumed AND any of the following conditions is met:

  • The termination criteria stream is not null. And the coordinator operator has not observed any new value from the termination criteria stream between two consecutive onEpochWatermarkIncremented invocations.
  • The coordinator operator has not observed any new value from any feedback variable stream between two consecutive onEpochWatermarkIncremented invocations.

TODO: explain how this is implemented.



4) The execution mode that is required to execute the iteration body.

  • If all inputs streams are bounded, then the iteration body can be executed in either the stream mode or the batch mode.
  • If any input stream is unbounded, then the iteration body must be executed in the stream mode.


5) The requirements of the edge types and parallelism in the IterationBody.

All edges inside the iteration body are required to have the PIPELINE type. If the user-defined iteration body contains an edge that does not have the PIPELINE type, methods that create the subgraph from the iteration body, such as iterateBoundedStreamsUntilTermination, will throw an exception upon invocation.


6) Lifetime of the operators inside the iteration body.

With the approach proposed in this FLIP, the operators inside the iteration body are only created once and destroyed after the entire iteration terminates.

In comparison, the existing DataSet::iterate(..) would destroy and re-create the iteration body once for each round of iteration, which in general could introduce more runtime overhead than the approach adopted in this FLIP.


7) How an iteration can resume from the most recent completed epoch after failover.

For any job that is executed in the batch mode, the job cannot start from a recent epoch after failover. In other words, if an iterative job fails, it will start from the first epoch of this iteration. Note that the existing DataSet::iterate(...) has the same behavior after job failover.

For any job that is executed in the stream mode, the job can start from a recent epoch after failover. This is achieved by re-using the existing checkpoint mechanism (only available in the stream mode) and additionally checkpointing the values buffered on the feedback edges.


8) How to implement an iterative algorithm in the sync mode.

Definition of sync-mode

An iterative algorithm runs in sync-mode if there exist global epochs, such that at the time a given operator computes its output for the Nth epoch, this operator has received exactly the following records from its input edges:

  • We define a feedback loop as a circle composed of exactly 1 feedback edge and an arbitrary number of non-feedback edges.
  • If an edge is a non-feedback input edge and this edge is part of a feedback loop, then this operator has received all records emitted on this edge for the Nth epoch, without receiving any record for the (N+1)th epoch.
  • If an edge is a feedback input edge and this edge is part of a feedback loop, then this operator has received all records emitted on this edge for the (N-1)th epoch, without receiving any record for the Nth epoch.


Solution to run an iterative algorithm in the sync mode

An iterative algorithm will be run in the sync mode if its IterationBody meets the following requirements:

  • When any operator within the IterationBody receives values from its input edges, this operator does not immediately emit records to its output.
  • Operators inside the IterationBody only compute and emit records to their outputs in the onEpochWatermarkIncremented(...) callback. The emitted records should be computed based on the values received from the input edges up to the invocation of this callback.

See the Appendix section for a proof of why the solution described above could achieve the sync-mode execution as defined above.


9) How to run an iterative algorithm without dumping all user-provided data streams to disk.

As mentioned in the motivation section, the existing DataSet::iterate() always dumps the user-provided data streams to disk so that it can replay the data streams regardless of their size. Since this is the only available API to do iteration on bounded data streams, there is no way for the algorithm developer to get rid of this performance overhead.

In comparison, the iterateBoundedStreamsUntilTermination(...) method proposed in this FLIP allows users to run an iteration body without incurring this disk performance overhead. Developers have the freedom to optimize the performance based on their algorithm and data size, e.g. cache the data in memory in a more compact format.


10) How to support operators (e.g. ReduceOperator) that requires bounded inputs in the IterationBody.

TODO: explain it.


The Goals of the Iteration Library

If we directly copied the current implementation of iteration from the DataSet and DataStream APIs, we would still run into some problems, thus we would like to apply some optimizations to the existing iteration functionality.

The Iteration Body and Round Semantics

At the iteration API level, we need concepts corresponding to Epoch and Batch. We call processing one epoch a round: users specify a subgraph as the body of the iteration to describe how to calculate the update after the iteration body processes the whole dataset once (namely one epoch). Apparently the round is meaningful only for the bounded cases.

...

Figure 1. The structure of an iteration body.

To process the inputs for multiple rounds, we need feedback edges to emit the outputs of the last round back to the inputs of the iteration body, and we union the initial inputs with the feedbacks. For a bounded dataset, when processing the first epoch the data comes from the initial input edges, and when processing the remaining epochs the data comes from the feedback edges.

There are also inputs to the iteration body that do not have feedbacks. For example, an ML algorithm might have two inputs, one is the initialized model and the other is the training data. The input corresponding to the initial model will have a feedback after each epoch about the update to the model, but the training data would not need to be updated. 

The iteration body also has one or more output streams. The iteration body might output records in each round, and the records emitted in all the rounds compose the final outputs.

Therefore, an iteration body is composed of

  1. Variable inputs with feedbacks.
  2. Constant inputs without feedbacks.
  3. Outputs.

For the unbounded case, logically we should have 

Per-Round v.s. All-Rounds Semantics

How could users specify the iteration body? If we first consider the bounded cases, there are two options:

  1. Per-round: Users specify a subgraph, and for each round, the framework would recreate the operators and do the same computation.
  2. All-rounds: Users specify a subgraph, and the operators inside the subgraph would process the epochs of all the rounds. 

The DataSet iteration chooses the per-round semantics. To support this semantics, in addition to re-creating operators for each round, the framework also needs:

  1. For the inputs outside the iteration, 

The benefit of this method is that writing an iteration body is no different from constructing a DAG outside of the iteration.

Synchronization

Since for the bounded dataset all the algorithms, to the best of our knowledge, can be converted into epoch-based algorithms, we only need to support synchronization between epochs, namely between rounds.

How to 

Besides, the previous DataStream and DataSet iteration APIs also have some caveats for supporting algorithm implementations:

  1. Lack of support for multiple inputs, arbitrary outputs and nested iteration in both iteration APIs, which is required by scenarios like Metapath (multiple inputs), boosting algorithms (nested iteration) or when we want to output both the loss and the model (multiple outputs). In the new iteration we would support these functionalities.
  2. Lack of asynchronous iteration support in the DataSet iteration, which is required by algorithms like asynchronous linear regression. In the new iteration we would support both synchronous and asynchronous modes for the bounded iteration.
  3. The current DataSet iteration by default provides a "for each round" semantics, namely users only need to specify the computation logic in each round, and the framework would execute the subgraph multiple times until convergence. To cooperate with this semantics, the DataSet iteration framework would merge the initial input and the feedback (bulk style and delta style), and replay the datasets coming from outside of the iteration. This method is easier to use, but it also limits some possible optimizations.

We would also like to address these caveats in the new iteration library.

Overall Design

To reduce the development and maintenance overhead, it would be preferable to have a unified implementation for different types of iterations. In fact, the different iteration types share the same requirements for the runtime implementation:

  1. All the iteration types should support multiple inputs and multiple outputs. 
  2. All the iteration types require some kind of back edges that transfer the data back to the iteration head. Since Flink does not support cycles in the scheduler and the network stack, the back edges should not be visible in the StreamGraph and JobGraph.
  3. All the iteration types should support the checkpoint mechanism in the stream execution mode.

Different types of iterations differ in their requirements for Progress tracking. Progress tracking is analogous to the watermark outside the iteration and it tracks the “progress” inside the iteration:

  1. For the bounded iteration, we could track whether we have processed all the records for a specific round. This is necessary for operators like aggregations inside the iteration: once notified that all the records of the current round have been processed, such an operator could output the result of this round. We could also track whether the whole iteration has ended, namely all the inputs are finished and there are no pending records inside the iteration.
  2. For the unbounded iteration, there is no concept of global rounds, and the only progress tracking is at the end of the iteration.

The difference of the progress tracking would also affect the API. For example, for bounded iteration, we could allow users to specify the termination condition based on number of rounds, but it is meaningless for the unbounded iteration.

To make the API easy to use, we propose to have dedicated APIs for the different types of iteration, and underneath we will translate them onto the same framework. The framework would implement the basic functionality like iteration StreamGraph building, the runtime structure and checkpointing, and it allows different iterations to implement different types of progress tracking support.

Public Interfaces

As shown in Figure 1, an iteration is composed of 

  1. The inputs from outside of the iteration. 
  2. An iteration body specifying the structure inside the iteration.
    1. The subgraph inside the iteration.
    2. Some inputs have corresponding feedbacks to update the underlying data stream. The feedbacks are unioned with the corresponding inputs: the original inputs are emitted into the iteration body only once, and the feedbacks are also emitted to the same set of operators.
    3. The outputs going out of the iteration. The outputs could be emitted from arbitrary data streams.

Unbounded Iteration

Similar to FLIP-15, we tend to provide a structured iteration API to make it easier to understand. With this method, users are required to specify an IterationBody that generates the part of the JobGraph inside the iteration. The iteration body should specify the DAG inside the iteration, and also the list of feedback streams and the output streams. The feedback streams would be unioned with the corresponding inputs and the output streams would be provided to the caller routine.

However, since we do not know the exact number and types of the input streams, it is not easy to define a unified interface for the iteration body without type casting. Thus we propose to use annotations to allow for an arbitrary number of inputs:

Code Block
languagejava
titleThe IterationBody API
linenumberstrue
/** The iteration body specifies the sub-graph inside the iteration. */
public interface IterationBody {

	/** This annotation marks the method that builds the subgraph. */
	@Target(ElementType.METHOD)
	@Retention(RetentionPolicy.RUNTIME)
	public @interface IterationFunction {}

	/** 
	* This annotation marks a parameter of the iteration function as an input to the subgraph. 
	* The input is the union of the initial inputs bound to the iteration and the corresponding 
	* feedback. 
	*/
	@Target(ElementType.PARAMETER)
	@Retention(RetentionPolicy.RUNTIME)
	public @interface IterationInput {
		String value();
	}
}

/** An example usage for the iteration body with two inputs. */
new IterationBody() {
	@IterationFunction
	UnboundedIterationDeclaration iterate(
		@IterationInput("first") DataStream<Integer> first,
		@IterationInput("second") DataStream<String> second) {

		DataStream<Integer> feedBack1 = ...;
		DataStream<String> output1 = ...;
		return new UnboundedIterationDeclaration.Builder()
			.withFeedback("first", feedBack1)
			.withOutput("output1", output1)
			.build();
	}
}

The interface for the unbounded iteration is straightforward:

Code Block
languagejava
titleUnbounded Iteration API
linenumberstrue
/** Builder for the unbounded iteration. */
public class UnboundedIteration {

	/** Set the body of the iteration. */
    UnboundedIteration withBody(IterationBody body) {...}
	
	/** Bind the initial input with the specific name. */
	UnboundedIteration bindInput(String name, DataStream<?> input) {...}

	/** Generates and adds the subgraph corresponding to the iteration.  */
	ResultStreams build() {...}
}

/** The expected return type of the iteration function, which specifies the feedbacks and outputs. */
public class UnboundedIterationDeclaration {
	
	public static class Builder {

		/** 
		* Specify the feedback corresponding to the specific name. The feedback would
		* be union with the initial input with the same name to provide to the iteration
		* body. 
		*/
		public Builder withFeedback(String name, DataStream<?> feedback) {...}

		/** Specify one output with the specific name. */
		public Builder withOutput(String name, DataStream<?> output) {...}
		
		/** Generate the Declaration. */
		UnboundedIterationDeclaration build() {...}
	}
}

/** The map of the output streams of an iteration.  */
public class ResultStreams {
	
	/** Gets the DataStream with the specific name. */
	public <T> DataStream<T> getStream(String name) {...}

}
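
For illustration, the sketch below shows how the pieces above could be wired together, reusing the two-input iteration body example; the first and second input streams and the myIterationBody instance are assumed to exist already.

Code Block
languagejava
// first and second are pre-existing DataStreams; myIterationBody is e.g. the two-input
// IterationBody shown in the example above.
ResultStreams resultStreams = new UnboundedIteration()
	.withBody(myIterationBody)
	.bindInput("first", first)
	.bindInput("second", second)
	.build();

// The names correspond to the outputs declared via withOutput(...) in the iteration body.
DataStream<String> output1 = resultStreams.getStream("output1");
output1.print();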

To avoid reading more data from the inputs while too much data accumulates inside the iteration, the iteration would first process the feedback data if data from both sides is available.

For termination detection, the iteration would continue until

  1. All the inputs are terminated.
  2. And there are no records inside the iteration subgraph.

Then the iteration terminates.

Bounded Iteration

As mentioned in the motivation, the existing DataSet iteration API uses the "per-round" semantics: it views the iteration as a repeated execution of the same DAG, thus underneath it would automatically merge the inputs and feedbacks, replay the inputs without feedbacks, and the operators inside the iteration live for only one round. This might cause bad performance for some algorithms that could cache this data in a more efficient way.

To avoid this issue, similar to the unbounded iteration, by default we use the "per-iteration" semantics: 

  1. Operators inside the iteration would live till the whole iteration is finished.
  2. We do not automatically merge the inputs and feedbacks. Instead, we union the original inputs and the feedbacks so that users could decide how to merge them.
  3. We do not replay the inputs without feedbacks. Users could decide how to cache them more efficiently.

Besides, to cooperate with the "per-round" semantics, previously the iteration was synchronous by default: before the current round fully finishes, the feedback data is cached and not emitted. Thus it could not support some algorithms like asynchronous regression. To cope with this issue, we view the synchronous iteration as a special case of the asynchronous iteration with additional synchronization. Thus by default the iteration is asynchronous.

Based on the above assumption, the API to add an iteration to a job is nearly the same as for the unbounded iteration. The only difference is that the bounded iteration supports more sophisticated termination conditions: a function is evaluated at the end of each round, based on the round number or the records of a specified data stream. If it returns true, the iteration discards all the following feedback records, ends all the ongoing rounds and finishes.

Since the operators now live across multiple rounds and multiple rounds might be concurrent, the operators inside the iteration need to know the round of the current record and when one round is fully finished, namely progress tracking. For example, an operator that computes the sum of the records in each round would add each record to the corresponding partial sum, and when one round is finished, it would emit the sum for this round. To support progress tracking, UDFs / operators inside the iteration could implement `BoundedIterationProgressListener` to acquire the additional information about the progress.

Based on the progress tracking interface, if users want to implement a synchronous algorithm, some operators inside the subgraph need to be synchronous: they only emit records in `onRoundEnd`, namely after all the data of the current round has been received. If, for the subgraph of the iteration body, every path from the inputs to the feedbacks contains at least one such operator, then the iteration would be synchronous.

For users who still want to use the iteration with the "per-round" semantics, a utility `forEachRound()` is provided. With the utility users could add a subgraph inside the iteration body such that

  1. The operators inside the subgraph would live only for one round.
  2. If an input stream without feedback is referenced, the input stream would be replayed for each round.

For input streams with feedbacks, we also provide two utility process functions that automatically merge the original inputs and feedbacks. Both the existing bulk and delta methods are supported. Then users would be able to implement a per-round iteration with input.process(bulkCache()).forEachRound(() -> {...}).

The API for the bounded iteration is as follows:

Code Block
languagejava
titleBounded Iteration API
linenumberstrue
/** Builder for the bounded iteration. */
public class BoundedIteration {
	
	/** Set the body of the iteration. */
    BoundedIteration withBody(IterationBody body) {...}

	/** Bind the initial input with the specific name. */
	BoundedIteration bindInput(String name, DataStream<?> input) {...}
	
	/** Generates and adds the subgraph corresponding to the iteration.  */
	ResultStreams build() {...}
}

/** The expected return type of the iteration function, which specifies the feedbacks, outputs and termination conditions. */
public class BoundedIterationDeclaration {
	
	public static class Builder {
		
		/** 
		* Specify the feedback corresponding to the specific name. The feedback would
		* be union with the initial input with the same name to provide to the iteration
		* body. 
		*/
		public Builder withFeedback(String name, DataStream<?> feedback) {...}

		/** Specify one output with the specific name. */
		public Builder withOutput(String name, DataStream<?> output) {...}
		
		/** Specify the termination condition of the iteration. */
		<U> Builder until(TerminationCondition terminationCondition) { ... }
		
		/** Generate the Declaration. */
		BoundedIterationDeclaration build() {...}
	}
}

/** 
* The termination condition judges if iteration should stop based on the round number
* or the records of a given data stream in one round. 
* 
* Since there might be asynchronous iteration that multiple rounds are executed in parallel,
* the condition is evaluated at the end of each round. If it is evaluated to true for one
* round, then the iteration would desert the feedback records for the following rounds, but
* the already emitted records would not be withdrawal. 
*/
public class TerminationCondition {

	/**
	* The records of the given DataStream will be collected in each
	* round to be used in judging whether the loop should terminate.
	*/
	@Nullable DataStream<?> refStream;

	/**
	* A user-defined function that is evaluated at the end of each round.
	*/
	Function<Context, Boolean> isConverged;

	interface Context {
		
		/** The round number. */
     	int[] getRound();
		
		/** The list of records of the referred stream. */
     	<T> List<T> getStreamRecords();
  	}	
}

/** The progress tracking interface for UDF / Operator */
public interface BoundedIterationProgressListener<T> {
	
	/** Sets a tool to be used to query the round number of the current record. */
	default void setCurrentRecordRoundsQuerier(Supplier<int[]> querier) {}
	
	/** Notified at the end of each round. */
	void onRoundEnd(int[] round, Context context, Collector<T> collector);
	
	/** Notified at the end of the whole iteration. */
	default void onIterationEnd(int[] rounds, Context context) {}

    public interface Context {
		
		<X> void output(OutputTag<X> outputTag, X value);
		
		Long timestamp();

		TimerService timerService();
	}
}

/** The utility methods to support per-round semantics. */
public class BoundedIterationPerRoundUtils {
	/** The builder that creates the subgraph that executed with the per-round semantics **/	
	public interface EachRound {
	
		Map<String, DataStream<?>> executeInEachRound();

	}
	
	/** Create a subgraph inside the iteration that execute with per-round semantics */
	static ResultStreams forEachRound(EachRound eachRoundBuilder);

	/** A cache to a Datastream. For each round it replaces the Datastream with the records received in this round and outputs all these records. */	
	static <T> ProcessFunction bulkCache(DataStream<T> inputStream);

	/** A cache to a KeyedStream. For each round it updates the KeyedStream with the records received in this round and outputs the updated records. */	
	static <K, T> ProcessFunction deltaCache(KeyedStream<K, T> inputStream);
}
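
For illustration, the sketch below assembles a bounded iteration that feeds the updated model back each round and stops after 100 rounds. The TerminationCondition construction is assumed here (only its fields are listed above), and initialModel and trainingData are assumed to be pre-existing DataStreams; everything else follows the builder APIs of this section.

Code Block
languagejava
ResultStreams resultStreams = new BoundedIteration()
	.withBody(new IterationBody() {
		@IterationFunction
		BoundedIterationDeclaration iterate(
			@IterationInput("model") DataStream<double[]> model,
			@IterationInput("data") DataStream<double[]> data) {

			DataStream<double[]> newModel = ...; // compute the model update of this round
			return new BoundedIterationDeclaration.Builder()
				.withFeedback("model", newModel)
				.withOutput("finalModel", newModel)
				.until(new TerminationCondition(null, context -> context.getRound()[0] >= 100))
				.build();
		}
	})
	.bindInput("model", initialModel)
	.bindInput("data", trainingData)
	.build();

// The output declared via withOutput(...) in the iteration body.
DataStream<double[]> finalModel = resultStreams.getStream("finalModel");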

Key Implementation

The runtime physical structure of the iteration is shown in Figure 1, which is similar to the current implementation. The head and tail operators are added by the framework. They would be colocated so that we could implement the feedback edge with a local queue. The head could coordinate with an operator coordinator bound to a virtual operator ID for synchronization, including progress tracking and termination condition calculation.

...

Figure 1. The physical runtime structure for the iteration. 

To support progress tracking, we would introduce new events inside the iteration body, similar to how watermarks are implemented. However, since the normal operators cannot recognize these events, we would wrap the operators inside the iteration to parse these events.

To wrap the operators for the part of the DAG inside the iteration, when building the stream graph we would introduce a mock execution environment and build the iteration DAG inside this environment first; then, when the apply() method is called, we would translate the DAG into the real execution environment with the suitable wrappers. Besides, all the edges inside the iteration should be PIPELINE, so we would also set the edge property when translating.

...

Examples

This section shows how commonly used ML algorithms could be implemented with the iteration API.

...