Status

Current state: <TODO>

Discussion thread: <TODO>

JIRA: <TODO>

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Iteration is a basic building block for an ML library. It is required for training ML models in both offline and online settings. However, the two settings require different types of iteration:

  1. Offline Training: The training dataset is bounded. The algorithm usually iterates over the dataset for multiple rounds to optimize the model until convergence.
  2. Online Training: The training dataset is unbounded. The algorithm usually computes an update to the model from the current model and the input records, then sends the update back to the operator that holds the model.


Previously, Flink supported bounded iteration with the DataSet API and unbounded iteration with the DataStream API. However, since Flink aims to deprecate the DataSet API and the iteration support in the DataStream API is rather incomplete, we need to implement a new iteration library in the flink-ml repository to support these algorithms.

Overall Design

To reduce the development and maintenance overhead, a unified implementation for the different types of iteration is preferred. In fact, the different iteration types share the same requirements for the runtime implementation:

  1. All iteration types should support multiple inputs and multiple outputs.
  2. All iteration types require some kind of back edge that transfers data back to the iteration head. Since Flink does not support cycles in the scheduler and the network stack, the back edges should not be visible in the StreamGraph and JobGraph.
  3. All iteration types should support the checkpoint mechanism in STREAM execution mode.

Different types of iteration differ in their requirements for progress tracking. Progress tracking is analogous to the watermark outside the iteration: it tracks the “progress” inside the iteration:

  1. For bounded iteration, we could track whether all the records of a specific round have been processed. This is necessary for operators like aggregation inside the iteration: once notified that all the records of the current round have been processed, the operator can output the result of this round. We could also track whether the whole iteration has ended, namely all the inputs are finished and there are no pending records inside the iteration.
  2. For unbounded iteration, there is no concept of global rounds, and the only progress to track is the end of the iteration.


To support progress tracking, we need to introduce additional events inside the iteration, similar to the Watermark used outside the iteration. However, ordinary user operators do not understand these events, so we need to wrap the operators. Different progress tracking requirements lead to different wrappers.
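The wrapping idea can be illustrated with a minimal, dependency-free sketch. None of the names below (RoundEnd, UserSum, Wrapper) are part of Flink or of this proposal; they are assumptions made only for illustration. The wrapper intercepts the progress event and forwards plain records to a user operator that knows nothing about rounds.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: these types are illustrative and do not exist in Flink.
class ProgressTrackingSketch {

    /** Progress event flowing inside the iteration, analogous to a watermark outside it. */
    static final class RoundEnd {
        final int round;

        RoundEnd(int round) {
            this.round = round;
        }
    }

    /** A user operator that only understands plain records. */
    interface UserSum {
        void add(int value);

        /** Returns the current sum and resets it. */
        int drain();
    }

    /** Wrapper that consumes progress events itself and passes records to the user operator. */
    static final class Wrapper {
        private final UserSum inner;
        private final List<String> output = new ArrayList<>();

        Wrapper(UserSum inner) {
            this.inner = inner;
        }

        void process(Object element) {
            if (element instanceof RoundEnd) {
                // All records of this round have been seen: emit the round's result.
                int round = ((RoundEnd) element).round;
                output.add("round " + round + " -> " + inner.drain());
            } else {
                inner.add((Integer) element);
            }
        }

        List<String> output() {
            return output;
        }
    }

    /** Feeds a mixed sequence of records and RoundEnd events through the wrapper. */
    static List<String> run(Object... elements) {
        UserSum sum = new UserSum() {
            private int value;

            public void add(int v) {
                value += v;
            }

            public int drain() {
                int result = value;
                value = 0;
                return result;
            }
        };
        Wrapper wrapper = new Wrapper(sum);
        for (Object element : elements) {
            wrapper.process(element);
        }
        return wrapper.output();
    }

    public static void main(String[] args) {
        System.out.println(run(1, 2, new RoundEnd(0), 3, new RoundEnd(1)));
    }
}
```

For unbounded iteration a wrapper of this kind would only need to react to an end-of-iteration event, which is why the two iteration types can share a framework and differ only in the wrapper.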

On the API side, different types of iteration also have different requirements. For example, unbounded iteration does not expose the concept of rounds, while rounds are explicit in bounded iteration.

Based on the above requirements, we propose to provide different APIs for different kinds of iteration and translate them into the same framework. The framework would implement the basic functionality, such as building the iteration StreamGraph, the runtime structure and checkpointing, and it allows specifying different types of wrapper operators.


Proposed Changes

Iteration API

Similar to FLIP-15, we tend to provide a structural iteration API to make it easier to understand and to avoid illegal graphs. With this approach, users are required to specify an IterationBody that generates the part of the JobGraph inside the iteration. The inputs of the iteration body can be classified into two types:

  1. Variable: the data stream has a corresponding back edge, namely it gets updated during the iteration. The iteration body receives a data stream representing the union of the original input and the feedback data.
  2. Constant: the data stream is only used inside the iteration.

The iteration body should specify the DAG inside the iteration, together with the list of feedback streams and the list of output streams. Each feedback stream is unioned with the corresponding input, and the output streams are returned to the caller.

However, since we do not know the precise number and types of the input streams, it is not easy to define a unified interface for the iteration body without type casting. Thus we propose to use annotations to allow for an arbitrary number of inputs:

The IterationBody API
public interface IterationBody {

	@Target(ElementType.METHOD)
	@Retention(RetentionPolicy.RUNTIME)
	public @interface IterationFunction {}

	@Target(ElementType.PARAMETER)
	@Retention(RetentionPolicy.RUNTIME)
	public @interface IterationVariable {
		String value();
	}

	@Target(ElementType.PARAMETER)
	@Retention(RetentionPolicy.RUNTIME)
	public @interface IterationConstant {
		String value();
	}
}

// Example
new IterationBody() {
	@IterationFunction
	UnboundedIterationDeclaration iterate(
		@IterationVariable("first") DataStream<Integer> first,
		@IterationConstant("second") DataStream<String> second
	) {
		DataStream<Integer> feedBack1 = ...;
		DataStream<String> feedBack2 = ...;
		DataStream<String> output1 = ...;
		return new UnboundedIterationDeclaration.Builder()
			.withFeedback("first", feedBack1)
			.withFeedback("second", feedBack2)
			.withOutput("output1", output1)
			.build();
	}
}
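As an illustration of how a framework could consume such annotations, the following sketch locates the annotated method through reflection and binds its parameters by name. It is an assumption about one possible mechanism, not the proposed implementation: the annotations are redeclared locally, plain String values stand in for DataStream so that the sketch has no Flink dependency, and the invoke and require helpers are hypothetical.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of annotation-driven input binding; not the actual flink-ml code.
class AnnotationBindingSketch {

    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    @interface IterationFunction {}

    @Target(ElementType.PARAMETER)
    @Retention(RetentionPolicy.RUNTIME)
    @interface IterationVariable {
        String value();
    }

    @Target(ElementType.PARAMETER)
    @Retention(RetentionPolicy.RUNTIME)
    @interface IterationConstant {
        String value();
    }

    /** Stand-in iteration body; Strings replace DataStream to keep the sketch self-contained. */
    static class Body {
        @IterationFunction
        String iterate(
                @IterationVariable("first") String first,
                @IterationConstant("second") String second) {
            return first + "|" + second;
        }
    }

    /** Finds the @IterationFunction method and invokes it with the inputs matched by name. */
    static Object invoke(Object body, Map<String, Object> variables, Map<String, Object> constants) {
        for (Method method : body.getClass().getDeclaredMethods()) {
            if (!method.isAnnotationPresent(IterationFunction.class)) {
                continue;
            }
            Parameter[] params = method.getParameters();
            Object[] args = new Object[params.length];
            for (int i = 0; i < params.length; i++) {
                IterationVariable variable = params[i].getAnnotation(IterationVariable.class);
                IterationConstant constant = params[i].getAnnotation(IterationConstant.class);
                if (variable != null) {
                    args[i] = require(variables, variable.value());
                } else if (constant != null) {
                    args[i] = require(constants, constant.value());
                } else {
                    throw new IllegalArgumentException("Unannotated parameter at index " + i);
                }
            }
            try {
                method.setAccessible(true); // the body method may be package-private
                return method.invoke(body, args);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Could not invoke the iteration function", e);
            }
        }
        throw new IllegalArgumentException("No @IterationFunction method found");
    }

    private static Object require(Map<String, Object> bound, String name) {
        if (!bound.containsKey(name)) {
            throw new IllegalArgumentException("Unbound input: " + name);
        }
        return bound.get(name);
    }

    public static void main(String[] args) {
        Map<String, Object> variables = new HashMap<>();
        variables.put("first", "variable-stream");
        Map<String, Object> constants = new HashMap<>();
        constants.put("second", "constant-stream");
        System.out.println(invoke(new Body(), variables, constants));
    }
}
```

Matching by annotation value rather than by parameter position is what lets the body declare any number of inputs without a fixed-arity interface.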


The interface for the unbounded iteration is straightforward:

Unbounded Iteration API
public class UnboundedIteration {

	UnboundedIteration withBody(IterationBody body) {...}

	UnboundedIteration bindVariable(String name, DataStream<?> input) {...}

	UnboundedIteration bindConstant(String name, DataStream<?> input) {...}

	ResultStreams apply() {...}
}

public class UnboundedIterationDeclaration {
	
	public static class Builder {

		public Builder withFeedback(String name, DataStream<?> feedback) {...}

		public Builder withOutput(String name, DataStream<?> output) {...}
	
		UnboundedIterationDeclaration build() {...}
	}
}

public class ResultStreams {

	public <T> DataStream<T> getStream(String name) {...}

}

// Example
public class UnboundedIterationExample {
	public static void main(String[] args) {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		DataStream<Integer> source1 = env.fromElements(1, 2, 3, 4, 5);
		DataStream<String> source2 = env.fromElements("2", "3", "4");

		ResultStreams resultStreams = new UnboundedIteration()
			.withBody(new IterationBody() {
				@IterationFunction
				UnboundedIterationDeclaration iterate(
					@IterationVariable("first") DataStream<Integer> first,
					@IterationConstant("second") DataStream<String> second
				) {
					DataStream<Integer> feedBack1 = ...;
					DataStream<String> feedBack2 = ...;
					DataStream<String> output1 = ...;
					return new UnboundedIterationDeclaration.Builder()
						.withFeedback("first", feedBack1)
						.withFeedback("second", feedBack2)
						.withOutput("output1", output1)
						.build();
				}
			})
			.bindVariable("first", source1)
			.bindConstant("second", source2)
			.apply();

		DataStream<String> output = resultStreams.getStream("output1");
	}
}


The interface for bounded iteration is more complex, since it involves the concept of rounds and allows for active termination conditions.

The existing DataSet iteration API views the iteration as the repeated execution of the same DAG, so under the hood it automatically caches the iteration variables and iteration constants. This causes bad performance for some algorithms, since they could instead cache this data in memory in a form that suits their computation. To avoid this issue, we would support both operators that live across rounds and operators that live for only one round. By default, an operator lives until the whole iteration ends and its state is kept. If users explicitly request it with a forEachRound call, the operator is re-created for each round.

Besides, the current DataSet iteration only supports synchronous iteration. However, synchronous iteration can be viewed as a special case of asynchronous iteration with additional synchronization. Therefore, bounded iteration does not provide a built-in synchronization mechanism. Users can implement the synchronization by emitting the records for the next round only in onRoundEnd(). This avoids explicitly caching the variable dataset and provides additional optimization opportunities for the algorithms.
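The synchronization pattern described above can be sketched without any Flink dependency. In the following simulation, the HalvingOperator class, its methods and the driver loop are hypothetical stand-ins: the operator holds back its updates while a round is in flight and releases them to the feedback queue only when it is told that the round has ended, so no record of round n+1 is processed before round n completes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical simulation of synchronous rounds built on round-end notifications.
class SynchronousRoundSketch {

    /** Buffers updates during a round and releases them only when the round ends. */
    static final class HalvingOperator {
        private final List<Integer> pending = new ArrayList<>();

        void processElement(int value) {
            if (value > 1) {
                pending.add(value / 2); // held back, not emitted yet
            }
        }

        void onRoundEnd(Deque<Integer> feedback) {
            feedback.addAll(pending); // all records for the next round are emitted at once
            pending.clear();
        }
    }

    /** Runs rounds until a round feeds nothing back; returns the records processed per round. */
    static List<Integer> run(List<Integer> input) {
        Deque<Integer> current = new ArrayDeque<>(input);
        HalvingOperator operator = new HalvingOperator();
        List<Integer> recordsPerRound = new ArrayList<>();
        while (!current.isEmpty()) {
            recordsPerRound.add(current.size());
            Deque<Integer> next = new ArrayDeque<>();
            while (!current.isEmpty()) {
                operator.processElement(current.poll());
            }
            operator.onRoundEnd(next);
            current = next;
        }
        return recordsPerRound;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList(8, 3, 1))); // prints [3, 2, 1, 1]
    }
}
```

An operator that emitted directly from processElement would instead behave asynchronously, which is the default the proposal leaves open.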

The API for the bounded iteration is as follows:

Bounded Iteration API
public class BoundedIteration {

	BoundedIteration withBody(BoundedIterationBody body) {...}

	BoundedIteration bindVariable(String name, DataStream<?> input) {...}

	BoundedIteration bindConstant(String name, DataStream<?> input) {...}

	ResultStreams apply() {...}
}

public class BoundedIterationDeclaration {
	
	public static class Builder {

		public Builder withFeedback(String name, DataStream<?> feedback) {...}

		public Builder withOutput(String name, DataStream<?> output) {...}

		<U> Builder until(TerminationCondition terminationCondition) { ... }
	
		BoundedIterationDeclaration build() {...}
	}
}

public class TerminationCondition {
	
	@Nullable DataStream<?> refStream;

	ConvergenceCriterion convergenceCriterion;	

}

public interface ConvergenceCriterion {
  boolean isConverged(Context context);

  interface Context {

     int[] getRound();

     <T> List<T> getStreamRecords();
  }
}

public interface EachRound {
	
	Map<String, DataStream<?>> executeInEachRound();

}

public class BoundedIterationContext {

	ResultStreams forEachRound(EachRound eachRound);
	
}

// The Progress Tracking interface for UDF / Operator
public interface BoundedIterationProgressListener<T> {

	default void onRoundStart(int[] round, Context context, Collector<T> collector){}

	void onRoundEnd(int[] round, Context context, Collector<T> collector);

}

// Example
public class BoundedIterationExample {

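	public static class MyReducer implements FlatMapFunction<Integer, Integer>, BoundedIterationProgressListener<Integer> {

		// Running sum of the records seen in the current round.
		private int value = 0;

		@Override
		public void flatMap(Integer v1, Collector<Integer> v2) { value += v1; }

		// Emit the sum of the finished round, then reset it for the next round.
		@Override
		public void onRoundEnd(int[] round, Context context, Collector<Integer> collector) {
			collector.collect(value);
			value = 0;
		}
	}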

	public static void main(String[] args) {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		DataStream<Integer> source1 = env.fromElements(1, 2, 3, 4, 5);
		DataStream<String> source2 = env.fromElements("2", "3", "4");

		ResultStreams resultStreams = new BoundedIteration()
			.withBody(new IterationBody() {

				@IterationFunction
				BoundedIterationDeclaration iterate(
					@IterationVariable("first") DataStream<Integer> first,
					@IterationConstant("second") DataStream<String> second,
					BoundedIterationContext context
				) {
					DataStream<Integer> feedBack1 = ...;

					ResultStreams results = context.forEachRound(() -> {
						return Collections.singletonMap("result", feedBack1.map(xx));
					});
					DataStream<String> feedBack2 = results.getStream("result");

					DataStream<Integer> output1 = feedBack1.flatMap(new MyReducer());

					return new BoundedIterationDeclaration.Builder()
						.withFeedback("first", feedBack1)
						.withOutput("output1", output1)
						.until(new TerminationCondition(feedBack2, ctx -> ctx.getStreamRecords().size() == 0))
						.build();
				}
			})
			.bindVariable("first", source1)
			.bindConstant("second", source2)
			.apply();

		DataStream<Integer> output = resultStreams.getStream("output1");
	}
}
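To make the role of ConvergenceCriterion more concrete, here is a small self-contained sketch of how a driver might consult it after each round. The interface is copied from the API above; the roundsUntilConverged driver and the way records are handed to the context are assumptions made for illustration only.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of evaluating a convergence criterion once per round.
class ConvergenceSketch {

    /** Same shape as the ConvergenceCriterion interface in the API above. */
    interface ConvergenceCriterion {
        boolean isConverged(Context context);

        interface Context {
            int[] getRound();

            <T> List<T> getStreamRecords();
        }
    }

    /**
     * Asks the criterion after every round, passing the records observed on the
     * reference stream during that round. Returns the first converged round, or -1.
     */
    static int roundsUntilConverged(List<List<Integer>> recordsPerRound, ConvergenceCriterion criterion) {
        for (int round = 0; round < recordsPerRound.size(); round++) {
            final int currentRound = round;
            final List<Integer> records = recordsPerRound.get(round);
            ConvergenceCriterion.Context context = new ConvergenceCriterion.Context() {
                public int[] getRound() {
                    return new int[] {currentRound};
                }

                @SuppressWarnings("unchecked")
                public <T> List<T> getStreamRecords() {
                    return (List<T>) records;
                }
            };
            if (criterion.isConverged(context)) {
                return round;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        List<List<Integer>> rounds = Arrays.asList(
                Arrays.asList(1, 2), Arrays.asList(3), Collections.<Integer>emptyList());
        // Same criterion as in the example above: stop once the reference stream is empty.
        System.out.println(roundsUntilConverged(rounds, ctx -> ctx.getStreamRecords().isEmpty()));
    }
}
```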

Key Implementation

To wrap the operators in the part of the DAG inside the iteration, we would introduce a mock execution environment and build the iteration DAG inside this environment first. Then, when the apply() method is called, we would translate the DAG into the real execution environment with the suitable wrappers. Besides, all the edges inside the iteration should be PIPELINED, so we would also set the edge property when translating.

The runtime physical structure of the iteration is shown in Figure 1, which is similar to the current implementation. The head and tail operators are added by the framework. They would be colocated so that we could implement the feedback edge with a local queue. The head could coordinate with an operator coordinator bound to a virtual operator ID for synchronization, including progress tracking and evaluating the termination condition.
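The two-phase construction can be pictured with a toy model. In the sketch below, the Env class simply records operator names and translate replays them into another environment through a wrapping function; both are hypothetical and far simpler than a real StreamGraph translation, which would also have to carry over edges and their properties.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical toy model of "build in a mock environment, then translate with wrappers".
class MockEnvironmentSketch {

    /** Records operator names instead of building a real graph. */
    static final class Env {
        final List<String> operators = new ArrayList<>();

        void add(String operator) {
            operators.add(operator);
        }
    }

    /** Replays the operators recorded in the draft environment into the real one, wrapping each. */
    static void translate(Env draft, Env real, UnaryOperator<String> wrapper) {
        for (String operator : draft.operators) {
            real.add(wrapper.apply(operator));
        }
    }

    public static void main(String[] args) {
        Env real = new Env();
        real.add("source"); // created outside the iteration, left unwrapped

        Env draft = new Env();
        draft.add("map"); // the iteration body only ever touches the draft
        draft.add("reduce");

        translate(draft, real, operator -> "Wrapped(" + operator + ")");
        System.out.println(real.operators);
    }
}
```

Because the iteration body never sees the real environment, every operator it creates passes through the translation step and can therefore be wrapped.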

Figure 1. The physical runtime structure for the iteration. 
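Why colocation makes the back edge cheap can be seen in a minimal sketch: if head and tail of the same subtask run in one process, they can share an in-memory queue looked up by a common key. The registry, the key format and the Head and Tail classes below are assumptions for illustration, not the proposed runtime classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch of a feedback edge implemented as a process-local queue.
class LocalFeedbackSketch {

    /** Process-wide registry; the head and tail of the same subtask resolve the same queue. */
    static final Map<String, Queue<Integer>> CHANNELS = new ConcurrentHashMap<>();

    static Queue<Integer> channel(String iterationId, int subtaskIndex) {
        return CHANNELS.computeIfAbsent(
                iterationId + "#" + subtaskIndex, key -> new ConcurrentLinkedQueue<>());
    }

    /** Last operator of the iteration body: pushes records onto the back edge. */
    static final class Tail {
        private final Queue<Integer> feedback;

        Tail(String iterationId, int subtaskIndex) {
            this.feedback = channel(iterationId, subtaskIndex);
        }

        void processElement(int value) {
            feedback.add(value);
        }
    }

    /** First operator of the iteration body: reads what the tail has fed back. */
    static final class Head {
        private final Queue<Integer> feedback;

        Head(String iterationId, int subtaskIndex) {
            this.feedback = channel(iterationId, subtaskIndex);
        }

        List<Integer> drainFeedback() {
            List<Integer> drained = new ArrayList<>();
            Integer value;
            while ((value = feedback.poll()) != null) {
                drained.add(value);
            }
            return drained;
        }
    }

    public static void main(String[] args) {
        Tail tail = new Tail("iteration-1", 0);
        Head head = new Head("iteration-1", 0);
        tail.processElement(1);
        tail.processElement(2);
        System.out.println(head.drainFeedback());
    }
}
```

Since the queue never crosses the network stack, the cycle stays invisible to the scheduler, which only sees an acyclic graph from head to tail.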


The operator wrapper needs to simulate the context in which an operator executes. In particular, for operators with a single-round lifecycle in bounded iteration, we would need to isolate the state used for each round and clean up the corresponding state after the round ends.
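One way to picture per-round state isolation is to scope all state by the round number and drop the scope when the round finishes. The following sketch assumes a hypothetical PerRoundWrapper that keeps a keyed sum per round; a real wrapper would have to do the equivalent against the state backend.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of isolating operator state per round and cleaning it up afterwards.
class PerRoundStateSketch {

    static final class PerRoundWrapper {
        // Outer key: round number. Inner map: the keyed state visible within that round only.
        private final Map<Integer, Map<String, Integer>> stateByRound = new HashMap<>();

        void processElement(int round, String key, int value) {
            stateByRound
                    .computeIfAbsent(round, r -> new HashMap<>())
                    .merge(key, value, Integer::sum);
        }

        /** Returns the state of the finished round and removes it so that it cannot leak. */
        Map<String, Integer> onRoundEnd(int round) {
            Map<String, Integer> finished = stateByRound.remove(round);
            return finished == null ? Collections.<String, Integer>emptyMap() : finished;
        }

        int liveRounds() {
            return stateByRound.size();
        }
    }

    public static void main(String[] args) {
        PerRoundWrapper wrapper = new PerRoundWrapper();
        wrapper.processElement(0, "a", 1);
        wrapper.processElement(0, "a", 2);
        wrapper.processElement(1, "a", 5); // a record of the next round arriving early
        System.out.println(wrapper.onRoundEnd(0) + ", live rounds: " + wrapper.liveRounds());
    }
}
```

Keeping the round in the state scope also allows records of different rounds to be in flight at the same time without interfering with each other.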

Implementation Plan

Logically, all the iteration types would support both BATCH and STREAM execution modes. However, according to the algorithms' requirements, we would first implement:

  1. Unbounded iteration + STREAM mode.
  2. Bounded iteration + BATCH mode.

This would also simplify our implementation for operator wrappers with single-round lifecycle.

Compatibility, Deprecation, and Migration Plan

The API is added as a library inside the flink-ml repository, so it does not introduce compatibility problems. However, it differs from the existing iteration API, and the algorithms would need some re-implementation.

Rejected Alternatives

Naiad has proposed a unified model for the watermark mechanism (namely progress tracking outside of the iteration) and the progress tracking inside the iteration. It extends the event time and watermark to a vector (long timestamp, int[] rounds) and implements a vectorized alignment algorithm. Although Naiad provides an elegant model, a direct implementation on Flink would require a large amount of modification to the Flink runtime, which would cause a lot of complexity and maintenance overhead. Thus we choose to implement a simplified version on top of Flink, as a part of the flink-ml library.

For building the iteration DAG, it would be simpler if we could directly refer to the data stream variables outside of the closure of the iteration body. However, since the iteration DAG must first be created in the mock execution environment, we cannot use these variables directly; otherwise we would directly modify the real environment and would not have a chance to add wrappers to the operators.
