
Status

Current state: ACCEPT

Discussion thread: <TODO>

JIRA: <TODO>

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Iteration is a basic building block for an ML library. It is required for training ML models in both offline and online cases. However, different cases might require different types of iteration:

  1. Offline Training: The training dataset is bounded. The algorithm usually iterates over the dataset for multiple rounds to optimize the model until convergence.
  2. Online Training: The training dataset is unbounded. The algorithm usually calculates an update to the model from the current model and the input records, then sends the update back to the operator that holds the model.
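
The two training styles above can be contrasted with a minimal sketch. It is plain Java rather than Flink code, and the class name `TrainingStyles`, the scalar "model" and the squared-error update rule are all assumptions chosen for illustration; they are not part of this proposal.

```java
import java.util.List;

public class TrainingStyles {

    /**
     * Offline style (illustrative): sweep the whole bounded dataset once per round
     * and stop when the model update becomes smaller than the tolerance.
     */
    static double trainOffline(List<Double> data, double learningRate, double tolerance, int maxRounds) {
        double model = 0.0;
        for (int round = 0; round < maxRounds; round++) {
            double gradient = 0.0;
            for (double x : data) {
                gradient += model - x; // gradient of 0.5 * (model - x)^2
            }
            gradient /= data.size();
            double update = -learningRate * gradient;
            model += update;
            if (Math.abs(update) < tolerance) {
                break; // converged
            }
        }
        return model;
    }

    /**
     * Online style (illustrative): there are no rounds. Each incoming record produces
     * one update computed from the current model, and the updated model is returned
     * to whoever holds it.
     */
    static double applyOnlineUpdate(double model, double record, double learningRate) {
        double update = -learningRate * (model - record);
        return model + update;
    }
}
```

In the offline case the loop over rounds is the iteration, while in the online case the iteration is the path that carries each update back to the model holder.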


Previously, Flink supported bounded iteration with the DataSet API and unbounded iteration with the DataStream API. However, since Flink aims to deprecate the DataSet API and the iteration in the DataStream API is rather incomplete, we need to implement a new iteration library in the Flink-ml repository to support these algorithms.

Overall Design

To reduce the development and maintenance overhead, a unified implementation for the different types of iteration is preferred. In fact, the different iteration types share the same requirements for the runtime implementation:

  1. All the iteration types should support multiple inputs and multiple outputs.
  2. All the iteration types require some kind of back edge that transfers data back to the iteration head. Since Flink does not support cycles in the scheduler and network stack, the back edges should not be visible in the StreamGraph and JobGraph.
  3. All the iteration types should support the checkpoint mechanism in stream execution mode.
  4. All the iteration types should support 
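
The back-edge requirement in the list above can be pictured with a small sketch: the tail of the iteration writes into a side channel that the head reads, so the dataflow graph itself stays acyclic. This is a single-threaded plain Java simulation; `FeedbackChannel`, `run` and `body` are hypothetical names, and nothing here reflects how Flink would actually implement the channel.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class FeedbackChannelSketch {

    /** Stands in for the hidden back edge: the tail enqueues, the head dequeues. */
    static final class FeedbackChannel<T> {
        private final Queue<T> queue = new ArrayDeque<>();

        void send(T record) {
            queue.add(record);
        }

        T poll() {
            return queue.poll();
        }
    }

    /**
     * Doubles every record until it reaches the threshold, then emits it.
     * Assumes strictly positive inputs, otherwise the loop would not terminate.
     */
    static List<Integer> run(List<Integer> inputs, int threshold) {
        FeedbackChannel<Integer> backEdge = new FeedbackChannel<>();
        List<Integer> outputs = new ArrayList<>();
        // Head: first forward the initial inputs into the body.
        for (int input : inputs) {
            body(input, threshold, backEdge, outputs);
        }
        // Head: then forward whatever the tail fed back, until nothing is pending.
        Integer record;
        while ((record = backEdge.poll()) != null) {
            body(record, threshold, backEdge, outputs);
        }
        return outputs;
    }

    /** Iteration body plus tail: finished records are emitted, the rest are fed back. */
    static void body(int value, int threshold, FeedbackChannel<Integer> backEdge, List<Integer> outputs) {
        int doubled = value * 2;
        if (doubled >= threshold) {
            outputs.add(doubled);
        } else {
            backEdge.send(doubled);
        }
    }
}
```

The only cycle is through the `FeedbackChannel` object, which is the property the requirement asks for: a graph builder that only sees `run` calling `body` never sees a loop.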


Different types of iteration differ in their requirements for progress tracking. Progress tracking is analogous to the watermark outside the iteration and tracks the “progress” inside the iteration:

  1. For bounded iteration, we could track whether all the records of a specific round have been processed. This is necessary for operators like aggregation inside the iteration: once such an operator is notified that all the records of the current round have been processed, it can output the result of this round. We could also track whether the whole iteration has ended, namely that all the inputs are finished and there are no pending records inside the iteration.
  2. For unbounded iteration, there is no concept of global rounds, and the only progress to track is the end of the iteration.
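
The aggregation case for bounded iteration can be sketched as follows. The operator buffers partial results per round and only emits a round's result when it is told that the round is complete. `RoundSumOperator`, `onRecord` and `onRoundEnd` are hypothetical names used for illustration, not an interface defined by this proposal.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RoundAggregationSketch {

    /** Sums values per round and emits a round's sum only once that round is known to be complete. */
    static final class RoundSumOperator {
        private final Map<Integer, Long> partialSums = new HashMap<>();
        private final List<Long> emitted = new ArrayList<>();

        /** Called for every record; records of different rounds may interleave. */
        void onRecord(int round, long value) {
            partialSums.merge(round, value, Long::sum);
        }

        /** Called when progress tracking reports that all records of the round were processed. */
        void onRoundEnd(int round) {
            Long sum = partialSums.remove(round);
            emitted.add(sum == null ? 0L : sum);
        }

        List<Long> emitted() {
            return emitted;
        }
    }
}
```

Without the `onRoundEnd` notification the operator could never know when a partial sum is final, which is why bounded iteration needs per-round progress tracking while unbounded iteration does not.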


To support progress tracking, we need to introduce additional events inside the iteration, similar to the Watermark used outside the iteration. However, ordinary user operators do not understand these events, so we need to wrap the operators. Different progress tracking requirements lead to different wrappers.
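
A minimal sketch of the wrapping idea, under the assumption that records and progress events travel through the iteration as a common element type: the wrapper unwraps records for the user function and handles progress events itself. The types `Element`, `DataRecord`, `RoundEnd` and `Wrapper` are hypothetical and only illustrate the shape of such a wrapper.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class WrapperOperatorSketch {

    /** Anything that flows inside the iteration: either a user record or a progress event. */
    interface Element {}

    static final class DataRecord<T> implements Element {
        final T value;

        DataRecord(T value) {
            this.value = value;
        }
    }

    /** Illustrative progress event marking the end of a round. */
    static final class RoundEnd implements Element {
        final int round;

        RoundEnd(int round) {
            this.round = round;
        }
    }

    /** Wraps an ordinary user function so that it never sees progress events. */
    static final class Wrapper<I, O> {
        private final Function<I, O> userFunction;
        private final List<Element> downstream = new ArrayList<>();

        Wrapper(Function<I, O> userFunction) {
            this.userFunction = userFunction;
        }

        @SuppressWarnings("unchecked")
        void process(Element element) {
            if (element instanceof DataRecord) {
                // Records are unwrapped, passed to the user function and re-wrapped.
                I input = ((DataRecord<I>) element).value;
                downstream.add(new DataRecord<>(userFunction.apply(input)));
            } else {
                // Progress events bypass the user function and are forwarded unchanged.
                downstream.add(element);
            }
        }

        List<Element> downstream() {
            return downstream;
        }
    }
}
```

A wrapper for bounded iteration would react to round-end events (for example by notifying the wrapped operator), whereas a wrapper for unbounded iteration would only need to deal with the end of the iteration.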

On the API side, different types of iteration also have different requirements. For example, the unbounded iteration does not expose the concept of a round, while rounds are explicit in the bounded iteration.

Based on the above requirements, we propose to provide different APIs for different kinds of iteration and to translate them into the same framework. The framework would implement the basic functionality, such as building the iteration StreamGraph, the runtime structure and checkpointing, and would allow different types of wrapper operators to be specified.

Proposed Changes

Iteration API

Following the experience of FLIP-15, we would provide a structural-style iteration API. The API for the unbounded iteration is:

Unbounded Iteration API
public class UnboundedIteration {

	UnboundedIteration withBody(UnboundedIterationBody body) {
		...
	}

	ResultStreams apply() {
		...
	}
}

public interface UnboundedIterationBody {

	@Target(ElementType.METHOD)
	@Retention(RetentionPolicy.RUNTIME)
	public @interface IterationFunction {
	}

	@Target(ElementType.PARAMETER)
	@Retention(RetentionPolicy.RUNTIME)
	public @interface IterationVariable {

		String value();

	}
}

public static class UnboundedIterationDeclarativeBuilder {

	public UnboundedIterationDeclarativeBuilder withFeedback(String name, DataStream<?> feedback) {
		...
		return this;
	}

	public UnboundedIterationDeclarativeBuilder withOutput(String name, DataStream<?> output) {
		...
		return this;
	}

	UnboundedIterationDeclarative build() {
		return ...
	}
}

public static class ResultStreams {

	public <T> DataStream<T> getStream(String name) {
		return ...
	}
}

public class UnboundedIterationExample {
	public static void main(String[] args) {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		DataStream<Integer> source = env.fromElements(1, 2, 3, 4, 5);
		ResultStreams resultStreams = new UnboundedIteration()
			.withBody(new UnboundedIterationBody() {

				@IterationFunction
				UnboundedIterationDeclarative iterate(
					@IterationVariable("first") DataStream<?> first
				) {
					DataStream<?> feedBack = null;
					return new UnboundedIterationDeclarativeBuilder()
						.withFeedback("first", feedBack)
						.withOutput("output1", feedBack)
						.build();
				}
			}).apply();

		DataStream<String> output = resultStreams.getStream("output1");
	}
}
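
Because both annotations are declared with `RetentionPolicy.RUNTIME`, a framework could discover the iteration function and its named variables through reflection. The following is a minimal sketch of that lookup, not the proposed implementation: the annotations are redeclared locally so the example is self-contained, `Object` stands in for `DataStream<?>`, and `IterationBodyInspector`, `ExampleBody` and `variableNames` are hypothetical names.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.util.ArrayList;
import java.util.List;

public class IterationBodyInspector {

    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    @interface IterationFunction {}

    @Target(ElementType.PARAMETER)
    @Retention(RetentionPolicy.RUNTIME)
    @interface IterationVariable {
        String value();
    }

    /** Hypothetical body; Object stands in for DataStream<?> to avoid a Flink dependency. */
    static class ExampleBody {
        @IterationFunction
        Object iterate(
                @IterationVariable("model") Object model,
                @IterationVariable("gradients") Object gradients) {
            return null;
        }
    }

    /** Returns the variable names of the first @IterationFunction method, in parameter order. */
    static List<String> variableNames(Class<?> bodyClass) {
        for (Method method : bodyClass.getDeclaredMethods()) {
            if (!method.isAnnotationPresent(IterationFunction.class)) {
                continue;
            }
            List<String> names = new ArrayList<>();
            for (Parameter parameter : method.getParameters()) {
                IterationVariable variable = parameter.getAnnotation(IterationVariable.class);
                if (variable == null) {
                    throw new IllegalStateException("Unnamed parameter in " + method.getName());
                }
                names.add(variable.value());
            }
            return names;
        }
        throw new IllegalStateException("No @IterationFunction method on " + bodyClass.getName());
    }
}
```

With the names in hand, a framework could match each parameter to the feedback stream registered under the same name via `withFeedback`, which is presumably why the example feeds back under the name "first".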




Key Implementation



Compatibility, Deprecation, and Migration Plan


Rejected Alternatives
