...

  1. Neither iteration API supports multiple inputs, arbitrary outputs, or nested iteration, which algorithms such as LDA or boosting require. The new iteration would support these features.
  2. The DataSet iteration lacks asynchronous iteration support, which algorithms such as asynchronous linear regression require. The new iteration would support both synchronous and asynchronous modes for the bounded iteration.
  3. The current DataSet iteration provides "for each round" semantics by default: users only specify the computation logic of a single round, and the framework executes the subgraph repeatedly until convergence. To support these semantics, the DataSet iteration framework merges the initial input with the feedback (in bulk style or delta style) and replays the datasets that come from outside the iteration. This approach is easier to use, but it also limits some possible optimizations.
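To make the "for each round" semantics concrete, the following is a minimal plain-Java sketch, not Flink code. It assumes a bulk-style iteration in which the result of a round fully replaces the loop variable, and it replays the outside dataset in every round. The names `ForEachRoundSketch`, `iterate` and `step` are hypothetical and only serve this illustration.

```java
import java.util.Collections;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical illustration only; this is not part of the DataSet API.
final class ForEachRoundSketch {

    /**
     * Runs `step` once per round until the loop variable stops changing
     * or `maxRounds` rounds have been executed.
     */
    static <S, D> S iterate(S initial, List<D> staticData,
                            BiFunction<S, List<D>, S> step, int maxRounds) {
        S current = initial;
        for (int round = 0; round < maxRounds; round++) {
            // The dataset from outside the iteration is replayed in every round.
            S next = step.apply(current, staticData);
            if (next.equals(current)) {
                return next; // converged: the round produced no change
            }
            current = next; // bulk style: feedback replaces the loop variable
        }
        return current;
    }

    public static void main(String[] args) {
        List<Integer> data = List.of(1, 5, 3);
        // Each round moves the state about halfway towards the maximum of the data.
        BiFunction<Integer, List<Integer>, Integer> step =
                (cur, d) -> cur + (Collections.max(d) - cur + 1) / 2;
        System.out.println(iterate(0, data, step, 10)); // prints 5
    }
}
```

The user only writes `step`; the loop, the replay of `data` and the convergence check belong to the framework, which is what makes this style easy to use but hard to optimize per round.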

...

  1. For bounded iteration, we could track whether all the records of a specific round have been processed. This is necessary for operators such as aggregations inside the iteration: once an operator is notified that all the records of the current round have been processed, it can output the result of that round. We could also track whether the whole iteration has ended, namely all the inputs are finished and no records are pending inside the iteration.
  2. For unbounded iteration, there is no concept of global rounds, and the only progress tracking is at the end of the iteration.

To support progress tracking, we need to introduce additional events inside the iteration, similar to the Watermark used outside the iteration. However, ordinary user operators won’t understand these events, so we would need to wrap the operators to allow the framework to process these events on their behalf. Different progress tracking requirements call for different wrappers.
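The wrapping idea can be sketched in plain Java as follows. This is only an illustration under assumptions: the event types (`RecordEvent`, `RoundEndEvent`), the `RoundAwareOperator` contract and the `ProgressTrackingWrapper` class are hypothetical names invented here, not Flink classes. The wrapper consumes the framework events and translates them into calls the user operator understands.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch; none of these types exist in Flink.
final class ProgressWrapperSketch {

    // Events that flow inside the iteration.
    interface IterationEvent {}

    static final class RecordEvent implements IterationEvent {
        final int value;
        RecordEvent(int value) { this.value = value; }
    }

    static final class RoundEndEvent implements IterationEvent {
        final int round;
        RoundEndEvent(int round) { this.round = round; }
    }

    // What the user operator sees: records and round-end callbacks only.
    interface RoundAwareOperator {
        void processRecord(int value, Consumer<Integer> out);
        void onRoundEnd(int round, Consumer<Integer> out);
    }

    // The wrapper understands the events and shields the user operator from them.
    static final class ProgressTrackingWrapper {
        private final RoundAwareOperator userOperator;
        private final Consumer<Integer> out;

        ProgressTrackingWrapper(RoundAwareOperator userOperator, Consumer<Integer> out) {
            this.userOperator = userOperator;
            this.out = out;
        }

        void handle(IterationEvent event) {
            if (event instanceof RecordEvent) {
                userOperator.processRecord(((RecordEvent) event).value, out);
            } else if (event instanceof RoundEndEvent) {
                userOperator.onRoundEnd(((RoundEndEvent) event).round, out);
            }
        }
    }

    // Example user operator: sums the records of each round.
    static final class SumPerRound implements RoundAwareOperator {
        private int sum = 0;

        public void processRecord(int value, Consumer<Integer> out) { sum += value; }

        public void onRoundEnd(int round, Consumer<Integer> out) {
            out.accept(sum); // emit the aggregate once the round is complete
            sum = 0;
        }
    }

    static List<Integer> run(List<IterationEvent> events) {
        List<Integer> results = new ArrayList<>();
        ProgressTrackingWrapper wrapper =
                new ProgressTrackingWrapper(new SumPerRound(), results::add);
        for (IterationEvent event : events) {
            wrapper.handle(event);
        }
        return results;
    }

    public static void main(String[] args) {
        List<IterationEvent> events = List.of(
                new RecordEvent(1), new RecordEvent(2), new RoundEndEvent(0),
                new RecordEvent(5), new RoundEndEvent(1));
        System.out.println(run(events)); // prints [3, 5]
    }
}
```

A wrapper for unbounded iteration would, under the same assumptions, drop the round-end branch and react only to an end-of-iteration event.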

...

Bounded Iteration API (Java)
/**
* The factory for the bounded iteration.
*/
public class BoundedIteration {

	BoundedIteration withBody(BoundedIterationBody body) {...}

	BoundedIteration bindInput(String name, DataStream<?> input) {...}

	ResultStreams apply() {...}
}

public class BoundedIterationDeclaration {
	
	public static class Builder {

		public Builder withFeedback(String name, DataStream<?> feedback) {...}

		public Builder withOutput(String name, DataStream<?> output) {...}

		<U> Builder until(TerminationCondition terminationCondition) { ... }
	
		BoundedIterationDeclaration build() {...}
	}
}

public class TerminationCondition {
	
	@Nullable DataStream<?> refStream;

	ConvergenceCriterion convergenceCriterion;	

}

public interface ConvergenceCriterion {

	boolean isConverged(Context context);

	interface Context {

		int[] getRound();

		<T> List<T> getStreamRecords();
	}
}

public interface EachRound {
	
	Map<String, DataStream<?>> executeInEachRound();

}

public class BoundedIterationUtils {

	static ResultStreams forEachRound(EachRound eachRound) {...}
	
}

// The Progress Tracking interface for UDF / Operator
public interface BoundedIterationProgressListener<T> {

	void onRoundEnd(int[] round, Context context, Collector<T> collector);

	public interface Context {
		
		<X> void output(OutputTag<X> outputTag, X value);
		
		Long timestamp();

		TimerService timerService();
	}

}

// Example
public class BoundedIterationExample {

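	public static class MyReducer implements FlatMapFunction<Integer, Integer>, BoundedIterationProgressListener<Integer> {

		// Running sum of the records seen in the current round.
		private int value = 0;

		@Override
		public void flatMap(Integer v1, Collector<Integer> v2) {
			value += v1;
		}

		@Override
		public void onRoundEnd(int[] round, Context context, Collector<Integer> collector) {
			// Emit the sum of the finished round, then reset for the next round.
			collector.collect(value);
			value = 0;
		}
	}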

	public static void main(String[] args) {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		DataStream<Integer> source1 = env.fromElements(1, 2, 3, 4, 5);
		DataStream<String> source2 = env.fromElements("2", "3", "4");

		ResultStreams resultStreams = new BoundedIteration()
			.withBody(new BoundedIterationBody() {

				@IterationFunction
				BoundedIterationDeclaration iterate(
					@IterationInput("first") DataStream<Integer> first,
					@IterationInput("second") DataStream<String> second,
					BoundedIterationContext context
				) {
					DataStream<Integer> feedback1 = ...;

					ResultStreams results = BoundedIterationUtils.forEachRound(() -> {
						return Collections.singletonMap("result", feedback1.map(xx));
					});
					DataStream<String> feedback2 = results.getStream("result");

					DataStream<Integer> output1 = feedback1.flatMap(new MyReducer());

					return new BoundedIterationDeclaration.Builder()
						.withFeedback("first", feedback1)
						.withOutput("output1", output1)
						.until(new TerminationCondition(feedback2, ctx -> ctx.getStreamRecords().size() == 0))
						.build();
				}
			})
			.bindInput("first", source1)
			.bindInput("second", source2)
			.apply();

		DataStream<Integer> output = resultStreams.getStream("output1");
	}
}
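
As a further illustration of the `ConvergenceCriterion` interface above, the following standalone sketch combines the "no records left" check from the example with an upper bound on the round number. It copies the interface so that it compiles on its own. The helper names `emptyOrLastRound` and `contextOf` are hypothetical, and it is an assumption of this sketch that `getRound()[0]` holds a zero-based counter of the outermost round; the proposal does not spell out the layout of the array.

```java
import java.util.List;

// Hypothetical sketch; only ConvergenceCriterion mirrors the proposed API.
final class ConvergenceSketch {

    // Copied from the API proposal so that the sketch is self-contained.
    interface ConvergenceCriterion {
        boolean isConverged(Context context);

        interface Context {
            int[] getRound();
            <T> List<T> getStreamRecords();
        }
    }

    /**
     * Converged when the referenced stream produced no records in this round,
     * or when the outermost round index has reached `lastRound`.
     * Assumption: getRound()[0] is the zero-based outermost round.
     */
    static ConvergenceCriterion emptyOrLastRound(int lastRound) {
        return context -> context.getStreamRecords().isEmpty()
                || context.getRound()[0] >= lastRound;
    }

    // Test helper that fakes the context the framework would provide.
    static ConvergenceCriterion.Context contextOf(int round, List<?> records) {
        return new ConvergenceCriterion.Context() {
            public int[] getRound() { return new int[] {round}; }

            @SuppressWarnings("unchecked")
            public <T> List<T> getStreamRecords() { return (List<T>) records; }
        };
    }

    public static void main(String[] args) {
        ConvergenceCriterion criterion = emptyOrLastRound(3);
        System.out.println(criterion.isConverged(contextOf(1, List.of("a")))); // prints false
        System.out.println(criterion.isConverged(contextOf(1, List.of())));    // prints true
        System.out.println(criterion.isConverged(contextOf(3, List.of("a")))); // prints true
    }
}
```

Bounding the round number guards against iterations whose reference stream never becomes empty.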

...