Status

Current state: ACCEPT

Discussion thread: <TODO>

JIRA: <TODO>

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Iteration is a basic building block for an ML library. It is required for training ML models in both offline and online settings. However, the two settings require different types of iteration:

  1. Offline Training: The training dataset is bounded. The algorithm usually iterates over the dataset for multiple rounds to optimize the model until convergence.
  2. Online Training: The training dataset is unbounded. The algorithm usually computes an update to the model from the current model and the input records, then sends the update back to the operator that holds the model.


Previously, Flink supported bounded iteration with the DataSet API and unbounded iteration with the DataStream API. However, since Flink aims to deprecate the DataSet API and the iteration support in the DataStream API is rather incomplete, we need to implement a new iteration library in the Flink-ml repository to support these algorithms.

Overall Design

To reduce the development and maintenance overhead, a unified implementation for the different types of iteration is preferred. In fact, the different iteration types share the same requirements for the runtime implementation:

  1. All the iteration types should support multiple inputs and multiple outputs. 
  2. All the iteration types require some kind of back edge that transfers data back to the iteration head. Since Flink does not support cycles in the scheduler and network stack, the back edges should not be visible in the StreamGraph and JobGraph.
  3. All the iteration types should support the checkpoint mechanism in stream execution mode.
  4. All the iteration types should support 
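
The back-edge requirement above can be illustrated without any Flink classes. The following is only a minimal sketch under stated assumptions: the class `FeedbackChannelSketch`, the in-memory queue standing in for the hidden back edge, and the "increment until a threshold" body are all hypothetical and are not part of the proposed framework. It shows records being routed from the iteration tail back to the head, with the iteration ending once the inputs are finished and no records are pending.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// Hypothetical, framework-free illustration of a back edge (not a Flink API).
class FeedbackChannelSketch {

    /** Runs the iteration: values below the threshold are fed back, others are emitted. */
    static List<Integer> iterate(List<Integer> input, int threshold) {
        // Stand-in for the back edge; it is not part of the "graph" seen by the caller.
        Queue<Integer> feedback = new ArrayDeque<>();
        List<Integer> output = new ArrayList<>();

        // The head first consumes the initial input.
        for (int value : input) {
            step(value, threshold, feedback, output);
        }
        // Then it drains the feedback; an empty queue means no pending records remain.
        while (!feedback.isEmpty()) {
            step(feedback.poll(), threshold, feedback, output);
        }
        return output;
    }

    /** Iteration body: emit the value if it reached the threshold, else feed back value + 1. */
    private static void step(int value, int threshold,
                             Queue<Integer> feedback, List<Integer> output) {
        if (value >= threshold) {
            output.add(value);
        } else {
            feedback.add(value + 1);
        }
    }

    public static void main(String[] args) {
        System.out.println(iterate(Arrays.asList(1, 3, 5), 4)); // prints [5, 4, 4]
    }
}
```

In a real job the queue would be replaced by whatever channel the runtime uses between the tail and head operators, and termination would have to be detected across parallel subtasks rather than by checking a single local queue.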


Different types of iteration differ in their requirements for progress tracking. Progress tracking is analogous to the watermark outside the iteration: it tracks the “progress” inside the iteration:

  1. For bounded iteration, we could track whether all the records of a specific round have been processed. This is necessary for operators like aggregation inside the iteration: once such an operator is notified that all the records of the current round have been processed, it can output the result of this round. We could also track whether the whole iteration has ended, namely that all the inputs are finished and no records are pending inside the iteration.
  2. For unbounded iteration, there is no concept of global rounds, and the only progress tracked is the end of the iteration.
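
To make the per-round case concrete, here is a small sketch of an aggregation that emits one result per round. It assumes a hypothetical `Event` type that is either a record or an end-of-round marker; neither the type nor the class `RoundProgressSketch` comes from the proposal, and a real implementation would receive such notifications from the framework rather than from an in-memory list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, framework-free sketch of per-round progress tracking.
class RoundProgressSketch {

    /** An element inside the iteration: either a record or an end-of-round marker. */
    static final class Event {
        final boolean endOfRound;
        final int round;
        final int value; // only meaningful when endOfRound is false

        private Event(boolean endOfRound, int round, int value) {
            this.endOfRound = endOfRound;
            this.round = round;
            this.value = value;
        }

        static Event record(int round, int value) {
            return new Event(false, round, value);
        }

        static Event roundEnd(int round) {
            return new Event(true, round, 0);
        }
    }

    /** Sums the records of each round and emits the sum when the round is reported complete. */
    static List<Integer> sumPerRound(List<Event> events) {
        List<Integer> emitted = new ArrayList<>();
        int sum = 0;
        for (Event event : events) {
            if (event.endOfRound) {
                emitted.add(sum); // all records of this round have been seen
                sum = 0;          // reset the state for the next round
            } else {
                sum += event.value;
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
            Event.record(0, 1), Event.record(0, 2), Event.roundEnd(0),
            Event.record(1, 10), Event.roundEnd(1));
        System.out.println(sumPerRound(events)); // prints [3, 10]
    }
}
```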


To support progress tracking, we need to introduce additional events inside the iteration, similar to the Watermark used outside the iteration. However, ordinary user operators do not understand these events, so we need to wrap the operators. Different progress tracking requirements lead to different wrappers.
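
The wrapping idea can be sketched as follows. This is an illustration only: `WrapperOperatorSketch`, `RoundEndEvent`, and `Wrapper` are hypothetical names, and a plain `java.util.function.Function` stands in for a user operator. The wrapper passes ordinary records to the user function and handles progress events itself, so the user code never sees them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch of an operator wrapper (not the proposed runtime classes).
class WrapperOperatorSketch {

    /** Hypothetical progress event; the wrapped user function never receives it. */
    static final class RoundEndEvent {
        final int round;

        RoundEndEvent(int round) {
            this.round = round;
        }
    }

    /** Applies the user function to records and forwards progress events untouched. */
    static final class Wrapper<I, O> {
        private final Function<I, O> userFunction;
        private final List<Object> downstream = new ArrayList<>();

        Wrapper(Function<I, O> userFunction) {
            this.userFunction = userFunction;
        }

        @SuppressWarnings("unchecked")
        void processElement(Object element) {
            if (element instanceof RoundEndEvent) {
                downstream.add(element); // progress event: bypass the user function
            } else {
                downstream.add(userFunction.apply((I) element));
            }
        }

        List<Object> output() {
            return downstream;
        }
    }

    /** Feeds two records, one event, and another record through a doubling function. */
    static List<Object> demo() {
        Wrapper<Integer, Integer> wrapper = new Wrapper<>(x -> x * 2);
        wrapper.processElement(1);
        wrapper.processElement(2);
        wrapper.processElement(new RoundEndEvent(0));
        wrapper.processElement(3);
        return wrapper.output();
    }

    public static void main(String[] args) {
        System.out.println(demo().size()); // prints 4
    }
}
```

A different wrapper, for example one that ignores round events entirely, could be substituted for the unbounded case without changing the user function.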

On the API side, different types of iteration also have different requirements. For example, the unbounded iteration does not expose the concept of a round, while the round is explicit in the bounded iteration.

Based on the above requirements, we propose to provide different APIs for the different kinds of iteration and to translate them into the same framework. The framework would implement the basic functionality, such as building the iteration StreamGraph, the runtime structure, and checkpointing, and it would allow different types of wrapper operators to be specified.

Proposed Changes

Iteration API

Following the experience of FLIP-15, we would provide a structural-style iteration API. The API for the unbounded iteration is:

Unbounded Iteration API
public class UnboundedIteration {

	UnboundedIteration withBody(UnboundedIterationBody body) {...}

	UnboundedIteration bindVariable(String name, DataStream<?> input) {...}

	UnboundedIteration bindConstant(String name, DataStream<?> input) {...}

	ResultStreams apply() {...}
}

public interface UnboundedIterationBody {

	@Target(ElementType.METHOD)
	@Retention(RetentionPolicy.RUNTIME)
	public @interface IterationFunction {}

	@Target(ElementType.PARAMETER)
	@Retention(RetentionPolicy.RUNTIME)
	public @interface IterationVariable {
		String value();
	}

	@Target(ElementType.PARAMETER)
	@Retention(RetentionPolicy.RUNTIME)
	public @interface IterationConstant {
		String value();
	}
}

public class UnboundedIterationDeclarationBuilder {

	public UnboundedIterationDeclarationBuilder withFeedback(String name, DataStream<?> feedback) {...}

	public UnboundedIterationDeclarationBuilder withOutput(String name, DataStream<?> output) {...}

	UnboundedIterationDeclaration build() {...}
}

public class ResultStreams {

	public <T> DataStream<T> getStream(String name) {...}

}

// Example
public class UnboundedIterationExample {
	public static void main(String[] args) {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		DataStream<Integer> source1 = env.fromElements(1, 2, 3, 4, 5);
		DataStream<String> source2 = env.fromElements("2", "3", "4");

		ResultStreams resultStreams = new UnboundedIteration()
			.withBody(new UnboundedIterationBody() {

				@IterationFunction
				UnboundedIterationDeclaration iterate(
					@IterationVariable("first") DataStream<Integer> first,
					@IterationConstant("second") DataStream<String> second
				) {
					DataStream<Integer> feedBack1 = ...;
					DataStream<String> feedBack2 = ...;
					DataStream<String> output1 = ...;
					return new UnboundedIterationDeclarationBuilder()
						.withFeedback("first", feedBack1)
						.withFeedback("second", feedBack2)
						.withOutput("output1", output1)
						.build();
				}
			})
			.bindVariable("first", source1)
			.bindConstant("second", source2)
			.apply();

		DataStream<String> output = resultStreams.getStream("output1");
	}
}
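
The proposal does not yet describe how the framework would locate the annotated method and match its parameters to the bound streams. One plausible approach is reflection, sketched below under stated assumptions: the annotations are simplified local copies, plain `String` values stand in for `DataStream`, and `AnnotatedBodySketch` and `invokeBody` are hypothetical names rather than part of the proposed API.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: resolving an annotated iteration body via reflection.
class AnnotatedBodySketch {

    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    @interface IterationFunction {}

    @Target(ElementType.PARAMETER)
    @Retention(RetentionPolicy.RUNTIME)
    @interface IterationVariable {
        String value();
    }

    /** Stand-in body; Strings replace DataStream so the sketch runs without Flink. */
    public static class Body {
        @IterationFunction
        public String iterate(
                @IterationVariable("first") String first,
                @IterationVariable("second") String second) {
            return first + "->" + second;
        }
    }

    /** Finds the @IterationFunction method and invokes it with the inputs bound by name. */
    static Object invokeBody(Object body, Map<String, Object> bindings) {
        for (Method method : body.getClass().getMethods()) {
            if (!method.isAnnotationPresent(IterationFunction.class)) {
                continue;
            }
            Parameter[] parameters = method.getParameters();
            Object[] args = new Object[parameters.length];
            for (int i = 0; i < parameters.length; i++) {
                IterationVariable variable = parameters[i].getAnnotation(IterationVariable.class);
                if (variable == null || !bindings.containsKey(variable.value())) {
                    throw new IllegalStateException("Unbound parameter at index " + i);
                }
                args[i] = bindings.get(variable.value());
            }
            try {
                return method.invoke(body, args);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Failed to invoke the iteration body", e);
            }
        }
        throw new IllegalStateException("No @IterationFunction method found");
    }

    public static void main(String[] args) {
        Map<String, Object> bindings = new HashMap<>();
        bindings.put("first", "a");
        bindings.put("second", "b");
        System.out.println(invokeBody(new Body(), bindings)); // prints a->b
    }
}
```

Binding by annotation value rather than by parameter position keeps the body independent of the order in which `bindVariable` and `bindConstant` are called.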


The API for the bounded iteration is:

Bounded Iteration API
public class BoundedIteration {

	BoundedIteration withBody(BoundedIterationBody body) {...}

	BoundedIteration bindVariable(String name, DataStream<?> input) {...}

	BoundedIteration bindConstant(String name, DataStream<?> input) {...}

	void enableAsynchronousIteration() {...}

	ResultStreams apply() {...}
}

public interface BoundedIterationBody {

	@Target(ElementType.METHOD)
	@Retention(RetentionPolicy.RUNTIME)
	public @interface IterationFunction {
	}

	@Target(ElementType.PARAMETER)
	@Retention(RetentionPolicy.RUNTIME)
	public @interface IterationInput {
		String value();
	}

	@Target(ElementType.PARAMETER)
	@Retention(RetentionPolicy.RUNTIME)
	public @interface IterationConstant {
		String value();
	}
}

public class BoundedIterationDeclarationBuilder {

	public BoundedIterationDeclarationBuilder withFeedback(String name, DataStream<?> feedback) { ... }

	public BoundedIterationDeclarationBuilder withOutput(String name, DataStream<?> output) { ... }

	BoundedIterationDeclarationBuilder until(TerminationCondition terminationCondition) { ... }

	BoundedIterationDeclaration build() { ... }
}

public class TerminationCondition {

	TerminationCondition(
		@Nullable DataStream<?> convergenceCriteriaVariable,
		ConvergenceCriterion convergenceCriteria) {...}
}

public interface ConvergenceCriterion {
  boolean isConverged(Context context);

  interface Context {
     int[] getRound();

     <T> List<T> getVariableValues();

     <T> List<T> getLastRoundVariableValues();
  }
}

public interface EachRound {
	
	Map<String, DataStream<?>> eachRound();

}

public class BoundedIterationContext {

	ResultStreams forEachRound(EachRound eachRound);
	
}

// The Progress Tracking interface for UDF / Operator
public interface BoundedIterationProgressListener<T> {

	default void onRoundStart(int[] round, Context context, Collector<T> collector){}

	void onRoundEnd(int[] round, Context context, Collector<T> collector);

}

// Example
public class BoundedIterationExample {

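	public static class MyReducer implements FlatMapFunction<Integer, Integer>, BoundedIterationProgressListener<Integer> {

		// Running sum of the records seen in the current round.
		private int value = 0;

		@Override
		public void flatMap(Integer v1, Collector<Integer> v2) { value += v1; }

		// Emit the sum once the round is complete, then reset it for the next round.
		@Override
		public void onRoundEnd(int[] round, Context context, Collector<Integer> collector) {
			collector.collect(value);
			value = 0;
		}
	}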

	public static void main(String[] args) {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		DataStream<Integer> source1 = env.fromElements(1, 2, 3, 4, 5);
		DataStream<String> source2 = env.fromElements("2", "3", "4");

		ResultStreams resultStreams = new BoundedIteration()
			.withBody(new BoundedIterationBody() {

				@IterationFunction
				BoundedIterationDeclaration iterate(
					@IterationInput("first") DataStream<Integer> first,
					@IterationInput("second") DataStream<String> second,
					BoundedIterationContext context
				) {
					DataStream<Integer> feedBack1 = ...;

					ResultStreams results = context.forEachRound(() -> {
						return Collections.singletonMap("result", feedBack1.map(xx));
					});
					DataStream<String> feedBack2 = results.getStream("result");

					DataStream<Integer> output1 = feedBack1.flatMap(new MyReducer());

					return new BoundedIterationDeclarationBuilder()
						.withFeedback("first", feedBack1)
						.withFeedback("second", feedBack2)
						.withOutput("output1", output1)
						.until(ConvergenceCriteria.fixRound(5))
						.build();
				}
			})
			.bindVariable("first", source1)
			.bindVariable("second", source2)
			.apply();

		DataStream<Integer> output = resultStreams.getStream("output1");
	}
}
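
The listing above leaves the implementation of a convergence criterion open. The sketch below shows two possible criteria against a simplified local copy of the interface. It is an assumption-laden illustration: the context here returns `List<Double>` instead of generic lists, only the first element of the `int[]` round is used, rounds are assumed to be zero-based, and `ConvergenceSketch`, `fixRound`, `maxDeltaBelow`, and `context` are hypothetical helpers rather than proposed API.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of convergence criteria (simplified, not the proposed types).
class ConvergenceSketch {

    /** Simplified local copy of the proposed interface. */
    interface ConvergenceCriterion {
        boolean isConverged(Context context);

        interface Context {
            int[] getRound();

            List<Double> getVariableValues();

            List<Double> getLastRoundVariableValues();
        }
    }

    /** Converges once maxRounds rounds have completed (rounds assumed zero-based). */
    static ConvergenceCriterion fixRound(int maxRounds) {
        return context -> context.getRound()[0] + 1 >= maxRounds;
    }

    /** Converges when no value changed by more than the tolerance since the last round. */
    static ConvergenceCriterion maxDeltaBelow(double tolerance) {
        return context -> {
            List<Double> current = context.getVariableValues();
            List<Double> previous = context.getLastRoundVariableValues();
            if (current.size() != previous.size()) {
                return false; // values cannot be compared element by element
            }
            for (int i = 0; i < current.size(); i++) {
                if (Math.abs(current.get(i) - previous.get(i)) > tolerance) {
                    return false;
                }
            }
            return true;
        };
    }

    /** Builds an immutable context for the given round and values. */
    static ConvergenceCriterion.Context context(
            int round, List<Double> current, List<Double> previous) {
        return new ConvergenceCriterion.Context() {
            @Override
            public int[] getRound() {
                return new int[] {round};
            }

            @Override
            public List<Double> getVariableValues() {
                return current;
            }

            @Override
            public List<Double> getLastRoundVariableValues() {
                return previous;
            }
        };
    }

    public static void main(String[] args) {
        ConvergenceCriterion.Context ctx =
            context(4, Arrays.asList(1.0, 2.0), Arrays.asList(1.0, 2.05));
        System.out.println(fixRound(5).isConverged(ctx));        // prints true
        System.out.println(maxDeltaBelow(0.01).isConverged(ctx)); // prints false
    }
}
```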


Compared to the existing unbounded 

Key Implementation



Compatibility, Deprecation, and Migration Plan

The 


Rejected Alternatives


