
...

In this section, we discuss a few design choices related to the implementation and usage of the proposed APIs.

Support the Feedback Edges

1) How the feedback edge is supported.

...

2) How the termination of the iteration execution is determined.

We will add a coordinator operator which takes all feedback variable streams (emitted by the iteration body) and the termination criteria stream (if not null) as inputs. The execution of the graph created by the iteration body will terminate when all input streams have been fully consumed OR any of the following conditions is met:

  • The termination criteria stream is not null, and the coordinator operator has not observed any new value from it between two consecutive onEpochWatermarkIncremented invocations.
  • The coordinator operator has not observed any new value from any feedback variable stream between two consecutive onEpochWatermarkIncremented invocations.
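The coordinator's rule can be sketched in plain Java. This is a minimal, single-threaded illustration: the class `TerminationCoordinatorSketch` and its callbacks are hypothetical names, and only `onEpochWatermarkIncremented` is taken from the text above. It assumes the coordinator is told explicitly when a value arrives on each kind of stream and when all of its inputs are fully consumed.

```java
/**
 * Hypothetical sketch of the coordinator's termination rule, not the actual
 * Flink ML implementation. Only onEpochWatermarkIncremented comes from the text;
 * the other method names are illustrative assumptions.
 */
final class TerminationCoordinatorSketch {
    private final boolean hasTerminationCriteria;
    // Whether a new value arrived since the previous onEpochWatermarkIncremented call.
    private boolean sawFeedbackValue = false;
    private boolean sawCriteriaValue = false;
    // The rule compares two consecutive invocations, so the first call only opens the window.
    private boolean hadPreviousInvocation = false;
    private boolean terminated = false;

    TerminationCoordinatorSketch(boolean hasTerminationCriteria) {
        this.hasTerminationCriteria = hasTerminationCriteria;
    }

    /** A record arrived on any feedback variable stream. */
    void onFeedbackValue() {
        sawFeedbackValue = true;
    }

    /** A record arrived on the termination criteria stream. */
    void onCriteriaValue() {
        sawCriteriaValue = true;
    }

    /** All input streams of the coordinator have been fully consumed. */
    void onAllInputsConsumed() {
        terminated = true;
    }

    /** Called when the epoch watermark advances; returns true once the iteration should stop. */
    boolean onEpochWatermarkIncremented(int epochWatermark) {
        if (hadPreviousInvocation && !terminated) {
            boolean criteriaIdle = hasTerminationCriteria && !sawCriteriaValue;
            boolean feedbackIdle = !sawFeedbackValue;
            if (criteriaIdle || feedbackIdle) {
                terminated = true;
            }
        }
        // Open a new observation window for the next pair of invocations.
        hadPreviousInvocation = true;
        sawFeedbackValue = false;
        sawCriteriaValue = false;
        return terminated;
    }

    boolean isTerminated() {
        return terminated;
    }
}
```

With a termination criteria stream, a window in which feedback keeps flowing but no criteria value arrives is enough to stop the iteration.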

...

Having the feedback edges also complicates the termination detection of the job. Since the feedback edges are not visible in the JobGraph, the HEAD operators, as the first operators in the DAG of the iteration body, decide when the whole iteration body can be terminated and initiate the termination process. The termination happens when

  1. All the inputs to the iteration body have finished,
  2. AND one of the following holds:
    1. If users have specified a reference stream, the number of records in each epoch is counted after the epoch is done; if the count is 0, HEAD starts the termination.
    2. OR, if users have not specified a reference stream, no records are still being processed inside the iteration body. This is detected by having a special event travel through the whole iteration body.
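The HEAD-side rule can be read as a small state machine. The sketch below is hypothetical (`HeadTerminationSketch` and its callbacks are not Flink classes) and assumes that the special detection event reports back to HEAD whether any record was processed while it traveled through the iteration body.

```java
/**
 * Hypothetical sketch of how HEAD decides to start termination.
 * The callbacks are illustrative; a real implementation would be driven
 * by the runtime rather than by direct method calls.
 */
final class HeadTerminationSketch {
    private final boolean hasReferenceStream;
    private boolean allInputsFinished = false;
    private boolean terminating = false;

    HeadTerminationSketch(boolean hasReferenceStream) {
        this.hasReferenceStream = hasReferenceStream;
    }

    /** Condition 1: every input to the iteration body has finished. */
    void onAllInputsFinished() {
        allInputsFinished = true;
    }

    /** Reference-stream mode: called once an epoch is done, with the count of reference records in it. */
    void onEpochDone(long referenceRecordCount) {
        if (hasReferenceStream && allInputsFinished && referenceRecordCount == 0) {
            terminating = true;
        }
    }

    /**
     * No-reference-stream mode: called when the special event returns to HEAD.
     * recordsProcessedMeanwhile is assumed to be true if any operator handled a
     * record while the event was traveling through the iteration body.
     */
    void onDetectionEventReturned(boolean recordsProcessedMeanwhile) {
        if (!hasReferenceStream && allInputsFinished && !recordsProcessedMeanwhile) {
            terminating = true;
        }
    }

    boolean shouldTerminate() {
        return terminating;
    }
}
```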




4) The execution mode that is required to execute the iteration body.

...

10) How to support operators (e.g. ReduceOperator) that require bounded inputs in the IterationBody.

TODO: explain it.

To avoid reading more data from the inputs while too much data accumulates inside the iteration, the iteration first processes the feedback data whenever data is available on both sides.
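The feedback-first policy amounts to a simple input-selection rule. The sketch below models the two inputs as in-memory queues; `FeedbackFirstSelector` is a hypothetical name, and the queues stand in for the operator's real network inputs.

```java
import java.util.ArrayDeque;
import java.util.Optional;

/** Hypothetical sketch: prefer feedback records whenever both inputs have data. */
final class FeedbackFirstSelector<T> {
    private final ArrayDeque<T> inputQueue = new ArrayDeque<>();
    private final ArrayDeque<T> feedbackQueue = new ArrayDeque<>();

    void offerInput(T record) {
        inputQueue.addLast(record);
    }

    void offerFeedback(T record) {
        feedbackQueue.addLast(record);
    }

    /** Returns the next record to process, draining feedback before reading new input. */
    Optional<T> next() {
        if (!feedbackQueue.isEmpty()) {
            return Optional.of(feedbackQueue.pollFirst());
        }
        // Empty Optional when neither side has data.
        return Optional.ofNullable(inputQueue.pollFirst());
    }
}
```

Draining feedback first bounds the data in flight: new input is pulled only once the iteration has caught up with its own feedback.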

For termination detection, the iteration continues until

  1. All the inputs are terminated.
  2. And there are no records left inside the iteration subgraph.

Then the iteration terminates.
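One way to illustrate the "no records inside the iteration subgraph" condition is a counter of in-flight records. This is only a single-process sketch under that assumption: `InFlightTracker` is a hypothetical name, and in a distributed job such a count would need coordination across tasks to be read consistently.

```java
/** Hypothetical single-process sketch of the "no records inside the iteration" check. */
final class InFlightTracker {
    private long inFlight = 0;
    private boolean allInputsTerminated = false;

    /** A record entered the iteration subgraph (from an input, an operator output, or feedback). */
    void onRecordEmitted() {
        inFlight++;
    }

    /** A record finished processing; any outputs it produced are counted via onRecordEmitted. */
    void onRecordConsumed() {
        if (inFlight == 0) {
            throw new IllegalStateException("consumed more records than emitted");
        }
        inFlight--;
    }

    void onAllInputsTerminated() {
        allInputsTerminated = true;
    }

    /** The iteration terminates once every input has ended and nothing is left inside. */
    boolean canTerminate() {
        return allInputsTerminated && inFlight == 0;
    }
}
```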

Bounded Iteration

As mentioned in the motivation, the existing DataSet iteration API uses "per-round" semantics: it views the iteration as a repeated execution of the same DAG. Underneath, it automatically merges the inputs and feedbacks, replays the inputs that have no feedbacks, and the operators inside the iteration live for only one round. This can hurt performance for algorithms that could cache such data more efficiently.

To avoid this issue, and similar to the unbounded iteration, we use "per-iteration" semantics by default:

  1. Operators inside the iteration live until the whole iteration is finished.
  2. We do not automatically merge the inputs and feedbacks. Instead, we union the original inputs and the feedbacks so that users can decide how to merge them.
  3. We do not replay the inputs without feedbacks. Users can decide how to cache them more efficiently.
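Under per-iteration semantics, an operator sees the union of the original input and the feedback and chooses its own merge and caching strategy. The sketch below is hypothetical: the `Tagged` wrapper marking a record's origin and the "feedback overwrites, input only initializes" policy are illustrative assumptions, not part of the proposed API.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: a long-lived operator that merges inputs and feedbacks itself. */
final class LatestValueMerger {
    /** A record tagged with whether it came from the original input or the feedback edge. */
    static final class Tagged {
        final boolean fromFeedback;
        final String key;
        final double value;

        Tagged(boolean fromFeedback, String key, double value) {
            this.fromFeedback = fromFeedback;
            this.key = key;
            this.value = value;
        }
    }

    // Kept for the whole iteration, instead of being rebuilt and replayed each round.
    private final Map<String, Double> state = new HashMap<>();

    void process(Tagged r) {
        if (r.fromFeedback) {
            // User-chosen policy: feedback always overwrites the cached value.
            state.put(r.key, r.value);
        } else {
            // User-chosen policy: original input only initializes keys not seen before.
            state.putIfAbsent(r.key, r.value);
        }
    }

    Double valueOf(String key) {
        return state.get(key);
    }
}
```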

Besides, to fit the "per-round" semantics, the previous iteration was synchronous by default: feedback data was cached and not emitted until the current round had fully finished. As a result, it could not support algorithms such as asynchronous regression. To address this, we view synchronous iteration as a special case of asynchronous iteration with additional synchronization, so the iteration is asynchronous by default.

Based on the above assumptions, the API for adding an iteration to a job is nearly the same as for the unbounded iteration. The only difference is that the bounded iteration supports more sophisticated termination conditions: when each round ends, a function is evaluated on the round number or on the records of a specified data stream. If it returns true, the iteration discards all subsequent feedback records, ends all ongoing rounds, and finishes.
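The termination condition can be pictured as a predicate evaluated at each round end. The interface `RoundTerminationCondition` and the factory below are hypothetical; they only illustrate the "evaluate at round end, stop on true" contract, assuming rounds are numbered from 0 and the records of the specified stream for that round are available as a list.

```java
import java.util.List;

/** Hypothetical shape of a bounded-iteration termination condition. */
@FunctionalInterface
interface RoundTerminationCondition<T> {
    /** Evaluated when a round ends; returning true finishes the iteration. */
    boolean shouldTerminate(int round, List<T> recordsOfRound);
}

final class TerminationExamples {
    /** Stop after maxRounds rounds (rounds counted from 0), or as soon as a round yields no records. */
    static <T> RoundTerminationCondition<T> maxRoundsOrEmpty(int maxRounds) {
        return (round, records) -> round + 1 >= maxRounds || records.isEmpty();
    }
}
```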

Since operators now live across multiple rounds and multiple rounds might run concurrently, operators inside the iteration need to know which round the current record belongs to and when a round is fully finished; this is called progress tracking. For example, an operator that computes the sum of the records in each round adds each record to the corresponding partial sum and, when a round finishes, emits the sum for that round. To support progress tracking, UDFs / operators inside the iteration can implement `BoundedIterationProgressListener` to acquire additional information about the progress.
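The per-round sum from the example above might look as follows. The method shape of `BoundedIterationProgressListener` is not specified in this section, so the single `onRoundEnd(int round)` callback, the `processElement` signature, and the explicit `round` argument are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Assumed shape of the progress listener; the proposed interface may differ. */
interface BoundedIterationProgressListener {
    void onRoundEnd(int round);
}

/** Hypothetical operator that sums records per round and emits each sum when its round ends. */
final class PerRoundSum implements BoundedIterationProgressListener {
    // Rounds may overlap, so partial sums are kept per round.
    private final Map<Integer, Long> partialSums = new HashMap<>();
    private final List<String> emitted = new ArrayList<>();

    void processElement(int round, long value) {
        partialSums.merge(round, value, Long::sum);
    }

    @Override
    public void onRoundEnd(int round) {
        // All records of this round have arrived, so its sum is final.
        Long sum = partialSums.remove(round);
        emitted.add("round " + round + ": " + (sum == null ? 0L : sum));
    }

    List<String> emitted() {
        return emitted;
    }
}
```

Because this operator emits only in `onRoundEnd`, it is also an instance of the synchronous operators described next.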

Based on the progress tracking interface, if users want to implement a synchronous method, some operators inside the subgraph need to be synchronous: they emit records only in `onRoundEnd`, namely after all the data of the current round has been received. If every path from the inputs to the feedbacks in the iteration body subgraph contains at least one such operator, the iteration is synchronous.
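Whether an iteration body is synchronous under this rule is a graph property: every path from an input to a feedback must pass through a synchronous operator. Equivalently, after removing the synchronous operators, no feedback operator is reachable from any input. The adjacency-list representation and the `SyncCheck` name below are assumptions for illustration.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical check: is every input-to-feedback path cut by a synchronous operator? */
final class SyncCheck {
    static boolean isSynchronous(
            Map<String, List<String>> edges,
            Set<String> inputs,
            Set<String> feedbacks,
            Set<String> synchronousOps) {
        ArrayDeque<String> queue = new ArrayDeque<>();
        Set<String> visited = new HashSet<>();
        for (String in : inputs) {
            // A synchronous input operator already blocks every path that starts at it.
            if (!synchronousOps.contains(in) && visited.add(in)) {
                queue.add(in);
            }
        }
        // Breadth-first search that never enters a synchronous operator.
        while (!queue.isEmpty()) {
            String node = queue.poll();
            if (feedbacks.contains(node)) {
                return false; // reached a feedback without passing a synchronous operator
            }
            for (String next : edges.getOrDefault(node, List.of())) {
                if (!synchronousOps.contains(next) && visited.add(next)) {
                    queue.add(next);
                }
            }
        }
        return true;
    }
}
```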

For users who still want to use the iteration with "per-round" semantics, a utility `forEachRound()` is provided. With this utility, users can add a subgraph inside the iteration body in which

  1. The operators inside the subgraph live for only one round.
  2. Any referenced input stream without feedback is replayed in each round.
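The per-round behaviour of `forEachRound()` can be simulated in plain Java: a fresh operator instance is created for every round, and the cached input without feedback is replayed to it. This is a hypothetical sketch; `runPerRound`, the `Supplier`-based operator factory, and the list-based data are assumptions, not the proposed API.

```java
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Supplier;

/** Hypothetical simulation of per-round semantics inside a long-running iteration. */
final class PerRoundSimulation {
    /**
     * @param rounds          number of rounds to run
     * @param replayedInput   input without feedback, replayed in every round
     * @param initialState    value carried on the feedback edge before round 0
     * @param operatorFactory creates a fresh operator for each round; the operator
     *                        maps (feedback value, replayed input) to the next feedback value
     */
    static <T, S> S runPerRound(
            int rounds,
            List<T> replayedInput,
            S initialState,
            Supplier<BiFunction<S, List<T>, S>> operatorFactory) {
        S state = initialState;
        for (int round = 0; round < rounds; round++) {
            // New instance per round: nothing inside the operator survives to the next round.
            BiFunction<S, List<T>, S> operator = operatorFactory.get();
            state = operator.apply(state, replayedInput);
        }
        return state;
    }
}
```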

For input streams with feedbacks, we also provide two utility process functions that automatically merge the original inputs and feedbacks, supporting both the existing bulk and delta methods. Users can then implement a per-round iteration with `input.process(bulkCache()).forEachRound(() -> {...})`.
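The two merge utilities follow the classic bulk and delta iteration styles. The sketch below shows one plausible reading of each, assuming bulk means the feedback of a round replaces the whole partial solution and delta means the feedback carries keyed updates to a solution set. `bulkCache()` is named in the text, but its exact behaviour and the helpers `bulkMerge` / `deltaMerge` here are hypothetical illustrations.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketches of the bulk and delta merge strategies. */
final class MergeStrategies {
    /** Bulk: round 0 uses the original input; later rounds use only the previous round's feedback. */
    static <T> List<T> bulkMerge(int round, List<T> originalInput, List<T> previousFeedback) {
        return round == 0 ? originalInput : previousFeedback;
    }

    /** Delta: the solution set is updated by keyed feedback; keys in the delta overwrite, others are kept. */
    static <K, V> Map<K, V> deltaMerge(Map<K, V> solutionSet, Map<K, V> delta) {
        Map<K, V> updated = new HashMap<>(solutionSet);
        updated.putAll(delta);
        return updated;
    }
}
```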

...


Examples

This section shows how commonly used ML algorithms could be implemented with the iteration API.

...