
Status

Current state: "Under Discussion"

Discussion thread: To be added

JIRA: To be added

Released: To be decided

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In order to execute a machine learning algorithm using Flink as the underlying runtime, Flink needs to support the iteration primitive, such that some outputs of a Flink job subgraph can be fed back to the inputs of the iteration body, and this loop continues until some termination criterion is met.
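The feedback-loop idea can be sketched in a few lines of plain Python (this is only a conceptual illustration, not the proposed Flink API; the `body` and `terminate` functions are illustrative assumptions):

```python
# Feedback-loop primitive: the iteration body's output is fed back as its
# next input until a termination criterion is reached.
def iterate(body, initial, terminate):
    state = initial
    while not terminate(state):
        state = body(state)  # feed the output back to the input
    return state

# Example: Newton's method converging x -> (x + 2/x) / 2 toward sqrt(2)
result = iterate(
    body=lambda x: (x + 2.0 / x) / 2.0,
    initial=1.0,
    terminate=lambda x: abs(x * x - 2.0) < 1e-9,
)
```

In the Flink setting, `state` corresponds to the feedback stream and `terminate` to the termination criterion evaluated on it.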

Flink currently provides DataSet::iterate(...) and DataStream::iterate(...) to support the iteration primitive described above. However, neither API can be used to support iteration on both bounded and unbounded data streams, for the following reasons:

  • DataSet::iterate(...) only supports iteration on bounded data streams. Moreover, as described in FLIP-131, the DataSet API will be deprecated in favor of the Table API/SQL.
  • DataStream::iterate(...) has a few design issues that prevent it from being used reliably in production jobs. Many of these issues, such as the possibility of deadlock, are described in FLIP-15.

In order to address the issues described above and support iteration on both bounded and unbounded data streams, this FLIP proposes to add a couple of APIs to the flink-ml repository. Note that we have chosen to put the iteration API (and its implementation) in the flink-ml repository instead of the DataStream class in the Flink core repository, because we believe it is important to keep the Flink core runtime as simple and maintainable as possible.

Besides supporting the basic iteration primitive (i.e. the feedback stream), Flink also needs APIs to support synchronization between the parallel executions of the iteration body. This is needed to execute machine learning algorithms that must be parallelized while still ensuring deterministic execution results.

In the following, we describe the example use-cases and the exact semantics that we aim to support via the proposed APIs.

Target use-cases

We expect to support online training and offline training use-cases. These use-cases have the following properties:

1) In online training, the training samples will be unbounded streams of data. The iteration body should ingest these unbounded streams of data, read each value in each stream once, and update the machine learning model repeatedly in near real-time. The iteration never terminates in this case. The job should be run as a streaming job.

2) In offline training, the training samples will be bounded streams of data. The iteration body should read these bounded data streams for an arbitrary number of rounds and update the machine learning model repeatedly until a termination criterion is met (e.g. a given number of rounds is reached or the model has converged). The job should be run as a batch job.
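A minimal, framework-free sketch of the offline-training pattern above (plain Python, not the proposed API; the dataset, learning rate, and convergence threshold are illustrative assumptions):

```python
# Offline training: re-read a bounded dataset each round and update the
# model until a termination criterion is met (convergence or max rounds).
def train_offline(samples, max_rounds=200, tol=1e-6, lr=0.1):
    model = 0.0  # a single scalar parameter, for illustration
    for round_idx in range(max_rounds):
        # One round: read the whole bounded stream once.
        grad = sum(model - y for y in samples) / len(samples)
        new_model = model - lr * grad
        if abs(new_model - model) < tol:  # convergence criterion
            return new_model, round_idx + 1
        model = new_model
    return model, max_rounds  # round-count criterion

model, rounds = train_offline([1.0, 2.0, 3.0])
# model approaches the mean of the samples (2.0)
```

The key property mirrored here is that the bounded input is re-read every round, which is exactly what DataSet::iterate(...) supports but DataStream::iterate(...) does not.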


For both online training and offline training scenarios, there exist use-cases that run the iteration body in either sync mode or async mode:

1) In the sync mode, the parallel subtasks that execute the iteration body update the model variables in a coordinated manner. There exist global epochs, such that all subtasks read the shared model variables at the beginning of an epoch, calculate variable updates based on the fetched values, and emit those updates at the end of the epoch.

The sync mode is useful when users want to execute an algorithm in a deterministic way to achieve the best possible accuracy, and the straggler (i.e. a subtask which is considerably slower than the others) is not an important concern.
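The epoch-level coordination described above can be simulated with threads and a barrier (an illustrative sketch only, not the proposed API; the per-epoch update value is a placeholder for a computed gradient):

```python
import threading

# Sync mode: every subtask reads the shared model at the start of an epoch,
# and a barrier ensures all updates land before the next epoch begins.
NUM_SUBTASKS = 4
EPOCHS = 3
model = {"w": 0.0}
lock = threading.Lock()
barrier = threading.Barrier(NUM_SUBTASKS)

def subtask(subtask_id):
    for _ in range(EPOCHS):
        snapshot = model["w"]     # read shared variables at epoch start
        update = 1.0              # placeholder for an update computed from snapshot
        barrier.wait()            # every subtask has read its snapshot
        with lock:
            model["w"] += update  # emit the update at epoch end
        barrier.wait()            # every update is applied before the next epoch

threads = [threading.Thread(target=subtask, args=(i,)) for i in range(NUM_SUBTASKS)]
for t in threads:
    t.start()
for t in threads:
    t.join()
# Deterministic result: 4 subtasks x 3 epochs x 1.0 per update = 12.0
```

Because every subtask waits at the barrier, the final model value is the same on every run, which is the determinism property the sync mode provides.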

2) In the async mode, each parallel subtask that executes the iteration body can read and update the shared model variables without waiting for variable updates from other subtasks. For example, one subtask may have updated the model variables 10 times while another subtask has updated them only 3 times.

The async mode is useful for users who want to execute an algorithm across many subtasks as fast as possible, in the presence of stragglers, at the possible cost of reduced accuracy.
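For contrast with the sync sketch, the async mode can be simulated by letting each subtask update the shared model at its own pace, with no barrier (again an illustrative sketch, not the proposed API; the per-subtask iteration counts and delays are arbitrary assumptions):

```python
import threading
import time

# Async mode: subtasks read/update the shared model without coordinating,
# so a fast subtask applies many more updates than a slow one.
model = {"w": 0.0, "updates_by": [0, 0]}
lock = threading.Lock()

def subtask(subtask_id, iterations, delay):
    for _ in range(iterations):
        time.sleep(delay)  # simulate uneven compute speed (a straggler)
        with lock:         # lock protects the shared dict, but there is
            model["w"] += 1.0  # no epoch barrier between subtasks
            model["updates_by"][subtask_id] += 1

fast = threading.Thread(target=subtask, args=(0, 10, 0.001))
slow = threading.Thread(target=subtask, args=(1, 3, 0.01))
fast.start(); slow.start()
fast.join(); slow.join()
# e.g. the fast subtask applied 10 updates while the straggler applied only 3
```

This mirrors the example in the text: progress is not synchronized across subtasks, so throughput is higher but the interleaving of reads and updates is nondeterministic.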

