...

  1. Lack of deterministic termination detection and checkpoint support for the DataStream iteration.
  2. Lack of support for multiple inputs, arbitrary outputs and nested iteration in both iteration APIs, which is required by algorithms such as LDA or boosting.
  3. Lack of asynchronous iteration support for the DataSet iteration, which is required by algorithms such as asynchronous linear regression.
  4. By default, the current DataSet iteration provides "for each round" semantics: users only specify the computation logic of a single round, and the framework executes the subgraph repeatedly until convergence. To support these semantics, the DataSet iteration framework merges the initial input with the feedback (bulk style or delta style) and replays the datasets that come from outside the iteration. This approach is easy to use, but it also rules out some possible optimizations.

We would also like to address these caveats in the new iteration library.

Overall Design

To reduce development and maintenance overhead, it is preferable to have a unified implementation for the different types of iterations. In fact, the different iteration types share the same runtime requirements:

...

To support progress tracking, we need to introduce additional events inside the iteration, similar to the Watermark used outside the iteration. However, ordinary user operators do not understand these events, so we need to wrap the operators to allow the framework to process them. Different progress-tracking requirements lead to different wrappers.
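To make the wrapping idea concrete, here is a minimal single-threaded sketch in plain Java, not Flink code. All names (`Element`, `RoundAwareFunction`, `RoundTrackingWrapper`) are hypothetical. The wrapper hands ordinary records to the user function, intercepts the round-end event so the user function never sees it as data, gives the function a chance to flush per-round results, and then forwards the event downstream, much as an operator forwards a watermark.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical element inside the iteration: either a data record or a round-end event. */
class Element {
    final boolean isRoundEnd;
    final int value; // record payload, or the round number for a round-end event

    private Element(boolean isRoundEnd, int value) {
        this.isRoundEnd = isRoundEnd;
        this.value = value;
    }

    static Element ofRecord(int v) { return new Element(false, v); }

    static Element ofRoundEnd(int round) { return new Element(true, round); }
}

/** Hypothetical user function: handles records, plus an optional round-end hook. */
interface RoundAwareFunction {
    void processRecord(int value, Consumer<Integer> out);

    default void onRoundEnd(int round, Consumer<Integer> out) {}
}

/** Hypothetical wrapper that hides round-end events from the record path of the UDF. */
class RoundTrackingWrapper {
    private final RoundAwareFunction fn;
    private final Consumer<Element> downstream;

    RoundTrackingWrapper(RoundAwareFunction fn, Consumer<Element> downstream) {
        this.fn = fn;
        this.downstream = downstream;
    }

    void process(Element e) {
        Consumer<Integer> out = v -> downstream.accept(Element.ofRecord(v));
        if (e.isRoundEnd) {
            fn.onRoundEnd(e.value, out); // let the UDF emit its per-round results first
            downstream.accept(e);        // then propagate the event, like a watermark
        } else {
            fn.processRecord(e.value, out);
        }
    }
}
```

In this sketch the event is forwarded only after the user function has flushed, so downstream operators see all results of a round before they see that round's end marker.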

...

Based on the above requirements, we propose to provide different APIs for different kinds of iterations and to translate them into the same framework. The framework would implement the basic functionality, such as building the iteration StreamGraph, the runtime structure and checkpointing, and it would allow specifying different types of wrapper operators.

Proposed Changes

ResultStreams

Iteration API

Similar to FLIP-15, we prefer to provide a structured iteration API, which is easier to understand and avoids illegal graphs. With this approach, users are required to specify an IterationBody that generates the part of the JobGraph inside the iteration. The inputs of the iteration body can be classified into two types:

...

...

The iteration body should specify the DAG inside the iteration, as well as the list of feedback streams and the output streams. Each feedback stream is unioned with its corresponding input, and the output streams are returned to the caller.
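As a rough mental model of "feedback streams are unioned with their inputs", the single-threaded sketch below (hypothetical, not the proposed runtime) places the initial records and the fed-back records in the same queue, so the body cannot tell them apart, while records that take the output edge are handed to the caller. The toy body (halve values above 1, emit values that reach 1) is an assumption chosen only so that the loop terminates.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class FeedbackUnionSketch {
    /**
     * Runs a toy iteration body: each value v > 1 is fed back as v / 2,
     * and every other value leaves the iteration through the output edge.
     */
    static List<Integer> run(List<Integer> initialInput) {
        // The variable input: initial records plus everything fed back later.
        Deque<Integer> variable = new ArrayDeque<>(initialInput);
        List<Integer> output = new ArrayList<>();
        while (!variable.isEmpty()) {
            int v = variable.poll();
            if (v > 1) {
                variable.add(v / 2); // feedback edge: unioned with the input
            } else {
                output.add(v);       // output edge: returned to the caller
            }
        }
        return output;
    }
}
```

In a real unbounded streaming iteration there is no "queue drained" moment; the sketch only stops because its toy body eventually stops producing feedback.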

However, since the number and types of the input streams are not known in advance, it is hard to define a unified interface for the iteration body without type casting. We therefore propose to use annotations to allow an arbitrary number of inputs:

Code Block
languagejava
titleThe IterationBody API
linenumberstrue
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import org.apache.flink.streaming.api.datastream.DataStream;

public interface IterationBody {

	@Target(ElementType.METHOD)
	@Retention(RetentionPolicy.RUNTIME)
	public @interface IterationFunction {}

	// value() is the name used to bind the input and to match its feedback stream.
	@Target(ElementType.PARAMETER)
	@Retention(RetentionPolicy.RUNTIME)
	public @interface IterationVariable {
		String value();
	}

	@Target(ElementType.PARAMETER)
	@Retention(RetentionPolicy.RUNTIME)
	public @interface IterationConstant {
		String value();
	}
}

// Example
IterationBody body = new IterationBody() {
	@IterationFunction
	UnboundedIterationDeclaration iterate(
		@IterationVariable("first") DataStream<Integer> first,
		@IterationConstant("second") DataStream<String> second
	) {
		DataStream<Integer> feedBack1 = ...;
		DataStream<String> output1 = ...;
		// "second" is a constant input, so only "first" receives feedback.
		return new UnboundedIterationDeclaration.Builder()
			.withFeedback("first", feedBack1)
			.withOutput("output1", output1)
			.build();
	}
};
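One way the framework could consume this annotation-based interface is through reflection: find the method marked `@IterationFunction` and read the name and kind of each annotated parameter. The proposal does not say how the lookup is done; the sketch below is an assumption. It redeclares the annotations locally so that it compiles on its own, and `IterationBodyInspector` and its string-valued result are hypothetical.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.LinkedHashMap;
import java.util.Map;

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@interface IterationFunction {}

@Target(ElementType.PARAMETER)
@Retention(RetentionPolicy.RUNTIME)
@interface IterationVariable { String value(); }

@Target(ElementType.PARAMETER)
@Retention(RetentionPolicy.RUNTIME)
@interface IterationConstant { String value(); }

class IterationBodyInspector {
    /** Returns input name -> "variable" or "constant", in parameter order. */
    static Map<String, String> declaredInputs(Object body) {
        for (Method m : body.getClass().getDeclaredMethods()) {
            if (!m.isAnnotationPresent(IterationFunction.class)) {
                continue;
            }
            Map<String, String> inputs = new LinkedHashMap<>();
            // One annotation array per parameter; unannotated parameters are skipped.
            for (Annotation[] annotations : m.getParameterAnnotations()) {
                for (Annotation a : annotations) {
                    if (a instanceof IterationVariable) {
                        inputs.put(((IterationVariable) a).value(), "variable");
                    } else if (a instanceof IterationConstant) {
                        inputs.put(((IterationConstant) a).value(), "constant");
                    }
                }
            }
            return inputs;
        }
        throw new IllegalArgumentException("no @IterationFunction method found");
    }
}
```

Because the annotations use `RetentionPolicy.RUNTIME`, they remain visible to reflection after compilation, which is what makes this style of lookup possible.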

...

Code Block
languagejava
titleUnbounded Iteration API
linenumberstrue
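import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UnboundedIteration {

	UnboundedIteration withBody(IterationBody body) {...}

	UnboundedIteration bindVariable(String name, DataStream<?> input) {...}

	UnboundedIteration bindConstant(String name, DataStream<?> input) {...}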

	ResultStreams apply() {...}
}

public class UnboundedIterationDeclaration {
	
	public static class Builder {

		public Builder withFeedback(String name, DataStream<?> feedback) {...}

		public Builder withOutput(String name, DataStream<?> output) {...}
	
		UnboundedIterationDeclaration build() {...}
	}
}

public class ResultStreams {

	public <T> DataStream<T> getStream(String name) {...}

}

// Example
public class UnboundedIterationExample {
	public static void main(String[] args) {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		DataStream<Integer> source1 = env.fromElements(1, 2, 3, 4, 5);
		DataStream<String> source2 = env.fromElements("2", "3", "4");

		ResultStreams resultStreams = new UnboundedIteration()
			.withBody(new IterationBody() {
				@IterationFunction
				UnboundedIterationDeclaration iterate(
					@IterationVariable("first") DataStream<Integer> first,
					@IterationConstant("second") DataStream<String> second
				) {
					DataStream<Integer> feedBack1 = ...;
					DataStream<String> output1 = ...;
					// "second" is bound as a constant, so only "first" receives feedback.
					return new UnboundedIterationDeclaration.Builder()
						.withFeedback("first", feedBack1)
						.withOutput("output1", output1)
						.build();
				}
			})
			.bindVariable("first", source1)
			.bindConstant("second", source2)
			.apply();

		DataStream<String> output = resultStreams.getStream("output1");
	}
}
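Because inputs are matched purely by name, the framework presumably has to check the `bindVariable` / `bindConstant` calls against the declared parameters before it builds the graph. The sketch below is a hypothetical version of such a check, not part of the proposal. It drops the streams and keeps only names, with the declared inputs given as a name-to-kind map (for instance, one obtained from the parameter annotations).

```java
import java.util.HashMap;
import java.util.Map;

enum InputKind { VARIABLE, CONSTANT }

/** Hypothetical binder mirroring bindVariable / bindConstant, validated against declared inputs. */
class InputBinder {
    private final Map<String, InputKind> declared;
    private final Map<String, InputKind> bound = new HashMap<>();

    InputBinder(Map<String, InputKind> declared) {
        this.declared = declared;
    }

    InputBinder bindVariable(String name) { return bind(name, InputKind.VARIABLE); }

    InputBinder bindConstant(String name) { return bind(name, InputKind.CONSTANT); }

    private InputBinder bind(String name, InputKind kind) {
        InputKind expected = declared.get(name);
        if (expected == null) {
            throw new IllegalArgumentException("unknown input: " + name);
        }
        if (expected != kind) {
            throw new IllegalArgumentException(name + " is declared as " + expected + " but bound as " + kind);
        }
        if (bound.put(name, kind) != null) {
            throw new IllegalArgumentException("input bound twice: " + name);
        }
        return this;
    }

    /** Would run inside a hypothetical apply(): every declared input must be bound. */
    void checkComplete() {
        for (String name : declared.keySet()) {
            if (!bound.containsKey(name)) {
                throw new IllegalStateException("input not bound: " + name);
            }
        }
    }
}
```

Failing fast at binding time gives the user an error that names the offending input, rather than a failure later during graph translation.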

...

The existing DataSet iteration API views the iteration as repeated execution of the same DAG, so under the hood it automatically caches the iteration variables and iteration constants. This can hurt performance for some algorithms, which could instead keep in memory only the data their computation actually needs. To avoid this issue, we would support both operators that live across rounds and operators that live for only one round. By default, an operator lives until the whole iteration ends, and its state is kept. If users explicitly wrap part of the body in a forEachRound call, the operators inside it are re-created for each round.
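The difference between the two lifecycles can be illustrated with a small plain-Java sketch: an operator created once keeps its state across rounds, while an operator obtained from a factory at the start of every round (the behavior described for forEachRound) starts from scratch each time. `CountingOperator` and `runRounds` are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical stateful operator: counts the records it has seen since it was created. */
class CountingOperator {
    private int seen = 0;

    int process(int record) {
        return ++seen;
    }
}

class LifecycleSketch {
    /**
     * Feeds recordsPerRound records in each round and returns the
     * counter value observed at the end of each round.
     */
    static List<Integer> runRounds(int rounds, int recordsPerRound, boolean recreateEachRound) {
        Supplier<CountingOperator> factory = CountingOperator::new;
        CountingOperator op = factory.get(); // default: lives for the whole iteration
        List<Integer> countsAtRoundEnd = new ArrayList<>();
        for (int r = 0; r < rounds; r++) {
            if (recreateEachRound && r > 0) {
                op = factory.get();          // per-round lifecycle: fresh instance, fresh state
            }
            int last = 0;
            for (int i = 0; i < recordsPerRound; i++) {
                last = op.process(i);
            }
            countsAtRoundEnd.add(last);
        }
        return countsAtRoundEnd;
    }
}
```

With three rounds of two records each, the long-lived operator reports counts of 2, 4 and 6, while the per-round operator reports 2 in every round.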

To let existing DataSet iteration users migrate to the new API easily, the new iteration would provide ...


Besides, the current DataSet iteration only supports synchronous iteration. However, synchronous iteration can be viewed as a special case of asynchronous iteration with additional synchronization. Therefore, the bounded iteration does not provide a built-in synchronization mechanism. Users can implement synchronization by emitting the records for the next round only in onRoundEnd(). This avoids explicitly caching the variable dataset and gives algorithms additional room for optimization.
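To see why emitting only in onRoundEnd() is enough to recover synchronous (superstep) semantics, consider the plain-Java simulation below. While a round is in progress, its records are only buffered. The buffer is released when the round ends, so every record of round r + 1 is computed from the complete output of round r. `SynchronizingOperator` and `runSynchronously` are hypothetical, and the per-round logic is passed in as an arbitrary function.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical UDF that buffers records and releases them only when the round ends. */
class SynchronizingOperator {
    private final List<Integer> buffer = new ArrayList<>();
    private final Function<List<Integer>, List<Integer>> roundLogic;

    SynchronizingOperator(Function<List<Integer>, List<Integer>> roundLogic) {
        this.roundLogic = roundLogic;
    }

    /** Called per record: nothing is emitted, so the next round cannot start early. */
    void processElement(int value) {
        buffer.add(value);
    }

    /** Called once all records of the round have arrived: emit the next round's records. */
    List<Integer> onRoundEnd() {
        List<Integer> next = roundLogic.apply(new ArrayList<>(buffer));
        buffer.clear();
        return next;
    }
}

class SyncIterationSketch {
    static List<Integer> runSynchronously(List<Integer> input, int rounds,
                                          Function<List<Integer>, List<Integer>> roundLogic) {
        SynchronizingOperator op = new SynchronizingOperator(roundLogic);
        List<Integer> current = input;
        for (int r = 0; r < rounds; r++) {
            for (int v : current) {
                op.processElement(v);
            }
            current = op.onRoundEnd(); // these records form round r + 1
        }
        return current;
    }
}
```

A round function that depends on the whole round (for example, adding the round's minimum to every value) produces the same result regardless of record arrival order, which is the guarantee a synchronous iteration provides.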

...

Code Block
languagejava
titleBounded Iteration API
linenumberstrue
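import java.util.Collections;
import java.util.List;
import java.util.Map;

import javax.annotation.Nullable;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

/**
* The factory for the bounded iteration.
*/
public class BoundedIteration {

	BoundedIteration withBody(IterationBody body) {...}

	BoundedIteration bindVariable(String name, DataStream<?> input) {...}

	BoundedIteration bindConstant(String name, DataStream<?> input) {...}

	ResultStreams apply() {...}
}

/**
* The declaration returned by a bounded iteration body: its feedback streams, outputs and termination condition.
*/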
public class BoundedIterationDeclaration {
	
	public static class Builder {

		public Builder withFeedback(String name, DataStream<?> feedback) {...}

		public Builder withOutput(String name, DataStream<?> output) {...}

		Builder until(TerminationCondition terminationCondition) { ... }
	
		BoundedIterationDeclaration build() {...}
	}
}

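public class TerminationCondition {

	@Nullable DataStream<?> refStream;

	ConvergenceCriterion convergenceCriterion;

	public TerminationCondition(@Nullable DataStream<?> refStream, ConvergenceCriterion convergenceCriterion) {...}
}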

public interface ConvergenceCriterion {
  boolean isConverged(Context context);

  interface Context {

     int[] getRound();

     <T> List<T> getStreamRecords();
  }
}

public interface EachRound {
	
	Map<String, DataStream<?>> executeInEachRound();

}

public class BoundedIterationContext {

	ResultStreams forEachRound(EachRound eachRound) {...}
	
}

// The Progress Tracking interface for UDF / Operator
public interface BoundedIterationProgressListener<T> {

	default void onRoundStart(int[] round, Context context, Collector<T> collector) {}

	void onRoundEnd(int[] round, Context context, Collector<T> collector);

    public interface Context {
		
		<X> void output(OutputTag<X> outputTag, X value);
		
		Long timestamp();

		TimerService timerService();
	}

}

// Example
public class BoundedIterationExample {

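	public static class MyReducer implements FlatMapFunction<Integer, Integer>, BoundedIterationProgressListener<Integer> {

		// Partial sum of the records received in the current round.
		private int value = 0;

		@Override
		public void flatMap(Integer v1, Collector<Integer> out) {
			value += v1;
		}

		@Override
		public void onRoundEnd(int[] round, Context context, Collector<Integer> collector) {
			// Emit the per-round sum once the round is complete, then reset it.
			collector.collect(value);
			value = 0;
		}
	}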

	public static void main(String[] args) {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		DataStream<Integer> source1 = env.fromElements(1, 2, 3, 4, 5);
		DataStream<String> source2 = env.fromElements("2", "3", "4");

		ResultStreams resultStreams = new BoundedIteration()
			.withBody(new IterationBody() {

				@IterationFunction
				BoundedIterationDeclaration iterate(
					@IterationVariable("first") DataStream<Integer> first,
					@IterationConstant("second") DataStream<String> second,
					BoundedIterationContext context
				) {
					DataStream<Integer> feedBack1 = ...;

					// Operators created inside forEachRound are re-created in every round.
					ResultStreams results = context.forEachRound(() -> {
						return Collections.singletonMap("result", feedBack1.map(xx));
					});
					DataStream<String> feedBack2 = results.getStream("result");

					DataStream<Integer> output1 = feedBack1.flatMap(new MyReducer());

					return new BoundedIterationDeclaration.Builder()
						.withFeedback("first", feedBack1)
						.withOutput("output1", output1)
						// Terminate once feedBack2 emits no records in a round.
						.until(new TerminationCondition(feedBack2, ctx -> ctx.getStreamRecords().isEmpty()))
						.build();
				}
			})
			.bindVariable("first", source1)
			.bindConstant("second", source2)
			.apply();

		DataStream<Integer> output = resultStreams.getStream("output1");
	}
}
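The termination condition in the example above (stop once the reference stream emits no records in a round) can be modeled outside Flink as follows. `RoundDriver` and the per-round step function are hypothetical. `Criterion` mirrors only the shape of the proposed ConvergenceCriterion (the current round plus the records the reference stream produced in that round), simplified to a single `int` round instead of `int[]`. The `maxRounds` cap is an added safeguard and an assumption, not part of the proposal.

```java
import java.util.List;
import java.util.function.Function;

/** Simplified stand-in for the proposed ConvergenceCriterion and its Context. */
interface Criterion {
    boolean isConverged(int round, List<?> refStreamRecords);
}

class RoundDriver {
    /**
     * Runs `step` round by round; `step` maps the round number to the records the
     * reference stream emitted in that round. Returns the number of rounds executed.
     */
    static int runUntil(Function<Integer, List<?>> step, Criterion criterion, int maxRounds) {
        for (int round = 0; round < maxRounds; round++) {
            List<?> refRecords = step.apply(round);
            if (criterion.isConverged(round, refRecords)) {
                return round + 1;
            }
        }
        return maxRounds; // safety cap reached (assumption, not in the proposal)
    }
}
```

Checking the criterion only after a round has fully finished is what makes termination deterministic: the decision depends on the complete set of records of that round, not on timing.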

...

Logically, all iteration types could support both BATCH and STREAM execution modes. However, based on the algorithms' requirements, we would first implement:

  1. Unbounded iteration + STREAM mode.
  2. Bounded iteration + BATCH mode.

This would also simplify our implementation of operator wrappers with a single-round lifecycle. Currently we do not see requirements for Bounded iteration + STREAM mode; if such requirements arise in the future, we would implement this mode, which the current framework could also support.

Compatibility, Deprecation, and Migration Plan

...