...

  1. For bounded iteration, we could track whether all the records of a specific round have been processed. This is necessary for operators like aggregation inside the iteration: once such an operator is notified that all the records of the current round have been processed, it could output the result of this round. We could also track whether the whole iteration has ended, namely all the inputs are finished and there are no pending records inside the iteration.
  2. For unbounded iteration, there is no concept of global rounds, and the only progress tracking is detecting the end of the iteration.

To support progress tracking, we need to introduce additional events inside the iteration, similar to the Watermark used outside the iteration. However, ordinary user operators do not understand these events, so we need to wrap the operators to let the framework process these events. Different progress tracking requirements lead to different wrappers.
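To make the wrapping idea concrete, here is a minimal, framework-free sketch in plain Java (16+). It assumes a hypothetical `RoundEnd` event that the framework injects into the stream, analogous to a watermark, and a hypothetical `RoundAwareFunction` interface for the user logic; none of these names are part of the proposal. The wrapper forwards ordinary records to the user function and intercepts the progress events, which the user code never sees directly.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical elements flowing inside the iteration: user records and a
// framework-injected progress event marking the end of a round.
interface StreamElement {}

record Data(int value) implements StreamElement {}

record RoundEnd(int round) implements StreamElement {}

// Hypothetical user-facing interface: records plus a round-end callback.
interface RoundAwareFunction {
    void processRecord(int value, List<Integer> out);

    void onRoundEnd(int round, List<Integer> out);
}

// Framework-owned wrapper: dispatches records to the user function and
// translates progress events into callbacks.
class ProgressTrackingWrapper {
    private final RoundAwareFunction userFunction;
    private final List<Integer> output = new ArrayList<>();

    ProgressTrackingWrapper(RoundAwareFunction userFunction) {
        this.userFunction = userFunction;
    }

    void processElement(StreamElement element) {
        if (element instanceof Data d) {
            userFunction.processRecord(d.value(), output);
        } else if (element instanceof RoundEnd e) {
            userFunction.onRoundEnd(e.round(), output);
        }
    }

    List<Integer> getOutput() {
        return output;
    }
}

// An aggregation that can only emit once it knows the round is complete.
class RoundSum implements RoundAwareFunction {
    private int sum = 0;

    public void processRecord(int value, List<Integer> out) {
        sum += value;
    }

    public void onRoundEnd(int round, List<Integer> out) {
        out.add(sum);
        sum = 0;
    }
}

class WrapperDemo {
    static List<Integer> run() {
        ProgressTrackingWrapper wrapper = new ProgressTrackingWrapper(new RoundSum());
        List<StreamElement> elements = List.of(
                new Data(1), new Data(2), new RoundEnd(0),
                new Data(3), new Data(4), new RoundEnd(1));
        for (StreamElement element : elements) {
            wrapper.processElement(element);
        }
        return wrapper.getOutput();
    }

    public static void main(String[] args) {
        System.out.println(run()); // [3, 7]
    }
}
```

Because the wrapper owns the event handling, a different kind of iteration only needs a different wrapper, while the user function stays unchanged.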

On the other hand, different types of iteration also have different API requirements. For example, the unbounded iteration does not provide the concept of rounds, while rounds are explicit in the bounded iteration.

The difference in progress tracking also affects the API. For example, for the bounded iteration we could allow users to specify the termination condition based on the number of rounds, which is meaningless for the unbounded iteration.

To make the API easy to use, we propose to provide dedicated APIs for the different types of iteration and to translate all of them onto the same underlying framework. The framework implements the basic functionality, such as building the iteration StreamGraph, the runtime structure and checkpointing, and allows each type of iteration to implement its own wrapper operators for progress tracking.

Proposed Changes

Iteration API

Similar to FLIP-15, we tend to provide a structural iteration API, which makes the iteration easier to understand and avoids illegal graphs. With this method, users are required to specify an IterationBody that generates the part of the JobGraph inside the iteration. The iteration body specifies the DAG inside the iteration, as well as the list of feedback streams and output streams. The feedback streams are unioned with the corresponding inputs, and the output streams are returned to the caller.


As shown in Figure 1, an iteration is composed of:

  1. The inputs from outside the iteration.
  2. An iteration body that specifies the structure inside the iteration:
    1. The subgraph inside the iteration.
    2. Some inputs have corresponding feedbacks that update the underlying data stream. Each feedback is unioned with its corresponding input.
    3. The outputs that leave the iteration. The outputs could be emitted from any data stream inside the iteration.


Figure 1. The structure of an iteration.


Unbounded Iteration


Since we do not know the exact number and types of the input streams in advance, it is not easy to define a unified interface for the iteration body without type casting. Thus we propose to use annotations to allow an arbitrary number of inputs:

```java
// The IterationBody API
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/** The iteration body specifies the subgraph inside the iteration. */
public interface IterationBody {

	/** This annotation marks the method that builds the subgraph. */
	@Target(ElementType.METHOD)
	@Retention(RetentionPolicy.RUNTIME)
	public @interface IterationFunction {}

	/** This annotation marks a parameter of the iteration function as a named input to the subgraph. */
	@Target(ElementType.PARAMETER)
	@Retention(RetentionPolicy.RUNTIME)
	public @interface IterationInput {
		String value();
	}
}

/** An example usage of the iteration body with two inputs. */
new IterationBody() {
	@IterationFunction
	UnboundedIterationDeclaration iterate(
		@IterationInput("first") DataStream<Integer> first,
		@IterationInput("second") DataStream<String> second
	) {
		DataStream<Integer> feedBack1 = ...;
		DataStream<String> output1 = ...;
		return new UnboundedIterationDeclaration.Builder()
			.withFeedback("first", feedBack1)
			.withOutput("output1", output1)
			.build();
	}
};
```
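The annotation-based design lets the framework discover the inputs by reflection instead of relying on a fixed interface signature. The following is a minimal sketch, assuming the framework looks up the method annotated with `@IterationFunction` and passes each bound input according to its `@IterationInput` name. To keep it self-contained, `List` stands in for `DataStream`, the annotations are redeclared at top level, and the `BodyInvoker.invokeBody` helper is hypothetical.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.util.List;
import java.util.Map;

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@interface IterationFunction {}

@Target(ElementType.PARAMETER)
@Retention(RetentionPolicy.RUNTIME)
@interface IterationInput {
    String value();
}

class BodyInvoker {
    // Hypothetical helper: finds the @IterationFunction method of the body and
    // calls it with the inputs bound under the matching @IterationInput names.
    static Object invokeBody(Object body, Map<String, Object> boundInputs) throws Exception {
        for (Method method : body.getClass().getDeclaredMethods()) {
            if (!method.isAnnotationPresent(IterationFunction.class)) {
                continue;
            }
            Parameter[] parameters = method.getParameters();
            Object[] args = new Object[parameters.length];
            for (int i = 0; i < parameters.length; i++) {
                IterationInput input = parameters[i].getAnnotation(IterationInput.class);
                if (input == null || !boundInputs.containsKey(input.value())) {
                    throw new IllegalArgumentException("Unbound parameter: " + parameters[i]);
                }
                args[i] = boundInputs.get(input.value());
            }
            method.setAccessible(true);
            return method.invoke(body, args);
        }
        throw new IllegalArgumentException("No @IterationFunction method found");
    }
}

class InvokerDemo {
    static class MyBody {
        @IterationFunction
        String iterate(@IterationInput("first") List<Integer> first,
                       @IterationInput("second") List<String> second) {
            return first.size() + "/" + second.size();
        }
    }

    static String run() {
        try {
            return (String) BodyInvoker.invokeBody(
                    new MyBody(), Map.of("first", List.of(1, 2, 3), "second", List.of("a")));
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // 3/1
    }
}
```

Matching by name rather than by position is what lets `bindInput("first", ...)` connect an outer stream to the right parameter, whatever the order or number of parameters.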

...

```java
// Unbounded Iteration API

/** Builder for the unbounded iteration. */
public class UnboundedIteration {

	UnboundedIteration withBody(IterationBody body) {...}

	UnboundedIteration bindInput(String name, DataStream<?> input) {...}

	ResultStreams apply() {...}
}

/** The expected return type of the iteration function, which specifies the feedbacks and outputs. */
public class UnboundedIterationDeclaration {

	public static class Builder {

		public Builder withFeedback(String name, DataStream<?> feedback) {...}

		public Builder withOutput(String name, DataStream<?> output) {...}

		UnboundedIterationDeclaration build() {...}
	}
}

/** The map of the output streams of an iteration. */
public class ResultStreams {

	public <T> DataStream<T> getStream(String name) {...}
}

/** An example unbounded iteration. */
public class UnboundedIterationExample {
	public static void main(String[] args) {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		DataStream<Integer> source1 = env.fromElements(1, 2, 3, 4, 5);
		DataStream<String> source2 = env.fromElements("2", "3", "4");

		ResultStreams resultStreams = new UnboundedIteration()
			.withBody(new IterationBody() {
				@IterationFunction
				UnboundedIterationDeclaration iterate(
					@IterationInput("first") DataStream<Integer> first,
					@IterationInput("second") DataStream<String> second
				) {
					DataStream<Integer> feedBack1 = ...;
					DataStream<String> output1 = ...;
					return new UnboundedIterationDeclaration.Builder()
						.withFeedback("first", feedBack1)
						.withOutput("output1", output1)
						.build();
				}
			})
			// Bind the outer streams to the names used in @IterationInput.
			.bindInput("first", source1)
			.bindInput("second", source2)
			.apply();

		DataStream<String> output = resultStreams.getStream("output1");
	}
}
```

The unbounded iteration involves neither the concept of rounds nor active termination conditions, which keeps its interface simpler than that of the bounded iteration. Its runtime behavior is as follows.

To avoid reading more data from the inputs while too much data accumulates inside the iteration, the iteration first processes the feedback data whenever both the feedback and the input data are available.

For termination detection, the iteration continues until both of the following conditions hold:

  1. All the inputs have terminated.
  2. There are no records left inside the iteration subgraph.

Then the iteration terminates.
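The scheduling preference and the termination rule above can be illustrated with a single-threaded simulation. This is a sketch under simplifying assumptions, not the runtime design: a toy body doubles each record, records below a hypothetical threshold of 100 are fed back, the others leave the iteration, and the pending feedback queue stands in for the records inside the iteration subgraph.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

class FeedbackFirstIteration {
    // Runs the toy iteration over the input and returns the emitted outputs.
    // Assumes positive input values, so that every record eventually leaves.
    static List<Integer> run(List<Integer> input) {
        Iterator<Integer> inputs = input.iterator();
        Deque<Integer> feedback = new ArrayDeque<>();
        List<Integer> outputs = new ArrayList<>();

        // Termination: all inputs are finished and no records are pending
        // inside the iteration (here, the feedback queue is empty).
        while (inputs.hasNext() || !feedback.isEmpty()) {
            // Prefer feedback records so that data does not pile up inside
            // the iteration while more input is read.
            int value = !feedback.isEmpty() ? feedback.poll() : inputs.next();
            int doubled = value * 2;
            if (doubled < 100) {
                feedback.add(doubled);
            } else {
                outputs.add(doubled);
            }
        }
        return outputs;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of(30, 7))); // [120, 112]
    }
}
```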

Bounded Iteration

As mentioned in the motivation, the existing DataSet iteration API uses the "per-round" semantics: it views the iteration as a repeated execution of the same DAG, so underneath it automatically caches the iteration variables and iteration constants. It merges the inputs with the feedbacks, replays the inputs that have no feedback, and the operators inside the iteration live for only one round. This might cause poor performance for algorithms that could cache these data in memory in a more efficient way, according to their computation.

To avoid this issue, similar to the unbounded iteration, by default we use the "per-iteration" semantics:

  1. Operators inside the iteration live until the whole iteration is finished, and their state is kept. Only the operators that users explicitly create inside a forEachRound call are re-created for each round.
  2. We do not automatically merge the inputs and feedbacks. Instead, we union the original inputs and the feedbacks so that users could decide how to merge them.
  3. We do not replay the inputs without feedbacks. Users could decide how to cache them more efficiently.
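Under these semantics, an operator that needs a dataset in every round keeps it itself instead of relying on replay. A minimal sketch, assuming a hypothetical long-lived operator whose first input carries constant data (no feedback) and whose second input carries the initial value unioned with the feedback; the update rule (move halfway towards the mean) is purely illustrative.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical operator that lives for the whole iteration, so the constant
// dataset is read once and kept in memory across rounds.
class CachingOperator {
    private final List<Double> cachedPoints = new ArrayList<>();

    // Constant input: records arrive only once and are never replayed.
    void processConstant(double point) {
        cachedPoints.add(point);
    }

    // Variable input: the initial value and the feedback arrive on the same
    // unioned stream; the operator decides how to combine them with the cache.
    double processVariable(double center) {
        double mean = cachedPoints.stream()
                .mapToDouble(Double::doubleValue)
                .average()
                .orElse(center);
        return center + (mean - center) / 2;
    }

    static double run() {
        CachingOperator op = new CachingOperator();
        for (double p : new double[] {2.0, 4.0, 6.0}) {
            op.processConstant(p);
        }
        double center = 0.0;
        for (int round = 0; round < 3; round++) {
            // The output of each round is fed back as the next variable input.
            center = op.processVariable(center);
        }
        return center;
    }

    public static void main(String[] args) {
        System.out.println(run()); // 3.5
    }
}
```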

Besides, to cooperate with the "per-round" semantics, the existing iteration is synchronous by default: before the current round has fully finished, the feedback data is cached and not emitted. Thus it cannot support algorithms such as asynchronous regression. To cope with this issue, we view synchronous iteration as a special case of asynchronous iteration with additional synchronization. Therefore the bounded iteration is asynchronous by default and does not provide an embedded synchronization mechanism. Users could implement the synchronization by emitting the records for the next round only in onRoundEnd(). This avoids explicitly caching the variable dataset and provides additional optimization opportunities for the algorithms.
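The following sketch shows how synchronization can be layered on the asynchronous iteration: the operator buffers the records it would feed back and emits them only when it is notified that the current round has ended. `SyncRoundOperator` and its `processElement`/`onRoundEnd` signatures are simplified, hypothetical stand-ins for a UDF implementing `BoundedIterationProgressListener`, with a `List` in place of the `Collector` and a driver loop in place of the framework.

```java
import java.util.ArrayList;
import java.util.List;

class SyncRoundOperator {
    // Records produced during the current round, withheld until it ends.
    private final List<Integer> pending = new ArrayList<>();

    // Called for every record of the current round. Nothing is emitted here,
    // so no record of the next round can overtake the current one.
    void processElement(int value) {
        pending.add(value + 1);
    }

    // Called once all records of the round have been processed: release the
    // buffered records as the input of the next round.
    void onRoundEnd(int round, List<Integer> out) {
        out.addAll(pending);
        pending.clear();
    }

    // Toy driver standing in for the framework: runs three synchronous rounds.
    static List<Integer> run() {
        SyncRoundOperator op = new SyncRoundOperator();
        List<Integer> feedback = new ArrayList<>(List.of(1, 2));
        for (int round = 0; round < 3; round++) {
            for (int value : feedback) {
                op.processElement(value);
            }
            feedback = new ArrayList<>();
            op.onRoundEnd(round, feedback);
        }
        return feedback;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [4, 5]
    }
}
```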



The API for the bounded iteration is as follows:

```java
// Bounded Iteration API

/** The factory for the bounded iteration. */
public class BoundedIteration {

	BoundedIteration withBody(IterationBody body) {...}

	BoundedIteration bindInput(String name, DataStream<?> input) {...}

	ResultStreams apply() {...}
}

/** The expected return type of the iteration function of a bounded iteration. */
public class BoundedIterationDeclaration {

	public static class Builder {

		public Builder withFeedback(String name, DataStream<?> feedback) {...}

		public Builder withOutput(String name, DataStream<?> output) {...}

		/** Terminates the iteration once the given condition is satisfied. */
		public Builder until(TerminationCondition terminationCondition) {...}

		BoundedIterationDeclaration build() {...}
	}
}

/** Checks whether the iteration has converged, based on the round and the records of refStream. */
public class TerminationCondition {

	@Nullable DataStream<?> refStream;

	Function<Context, Boolean> isConverged;

	TerminationCondition(@Nullable DataStream<?> refStream, Function<Context, Boolean> isConverged) {...}

	interface Context {

		int[] getRound();

		<T> List<T> getStreamRecords();
	}
}

/** The part of the subgraph whose operators are re-created in each round. */
public interface EachRound {

	Map<String, DataStream<?>> executeInEachRound();
}

public class BoundedIterationUtils {

	static ResultStreams forEachRound(EachRound eachRound) {...}
}

/** The progress tracking interface for UDFs and operators. */
public interface BoundedIterationProgressListener<T> {

	void onRoundEnd(int[] round, Context context, Collector<T> collector);

	interface Context {

		<X> void output(OutputTag<X> outputTag, X value);

		Long timestamp();

		TimerService timerService();
	}
}

/** An example bounded iteration. */
public class BoundedIterationExample {

	/** Sums the records of each round and emits the sum when the round ends. */
	public static class MyReducer
			implements FlatMapFunction<Integer, Integer>, BoundedIterationProgressListener<Integer> {

		private int sum = 0;

		@Override
		public void flatMap(Integer value, Collector<Integer> out) {
			sum += value;
		}

		@Override
		public void onRoundEnd(int[] round, Context context, Collector<Integer> collector) {
			collector.collect(sum);
			sum = 0;
		}
	}

	public static void main(String[] args) {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		DataStream<Integer> source1 = env.fromElements(1, 2, 3, 4, 5);
		DataStream<String> source2 = env.fromElements("2", "3", "4");

		ResultStreams resultStreams = new BoundedIteration()
			.withBody(new IterationBody() {

				@IterationFunction
				BoundedIterationDeclaration iterate(
					@IterationInput("first") DataStream<Integer> first,
					@IterationInput("second") DataStream<String> second
				) {
					DataStream<Integer> feedBack1 = ...;

					// Operators created inside forEachRound are re-created for each round.
					ResultStreams results = BoundedIterationUtils.forEachRound(() ->
						Collections.<String, DataStream<?>>singletonMap("result", feedBack1.map(...)));
					DataStream<String> feedBack2 = results.getStream("result");

					DataStream<Integer> output1 = feedBack1.flatMap(new MyReducer());

					return new BoundedIterationDeclaration.Builder()
						.withFeedback("first", feedBack1)
						.withOutput("output1", output1)
						// Terminate once feedBack2 emits no records in a round.
						.until(new TerminationCondition(feedBack2, context -> context.getStreamRecords().isEmpty()))
						.build();
				}
			})
			.bindInput("first", source1)
			.bindInput("second", source2)
			.apply();

		DataStream<Integer> output = resultStreams.getStream("output1");
	}
}
```
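To illustrate how a termination condition could be evaluated, here is a sketch that applies an `isConverged` function to the records the reference stream produced in each round. It is a simplified, hypothetical stand-in for `TerminationCondition`: `getRound()` returns a single `int` instead of the proposed `int[]`, and `terminationRound` is an illustrative driver, not part of the API. It shows both the record-based criterion from the example and a round-based criterion, which only makes sense for bounded iterations.

```java
import java.util.List;
import java.util.function.Function;

// Simplified stand-in for the proposed TerminationCondition.
class SimpleTerminationCondition {
    interface Context {
        int getRound();

        List<Object> getStreamRecords();
    }

    private final Function<Context, Boolean> isConverged;

    SimpleTerminationCondition(Function<Context, Boolean> isConverged) {
        this.isConverged = isConverged;
    }

    // Evaluates the condition with the records of the reference stream for one round.
    boolean shouldTerminate(int round, List<Object> roundRecords) {
        return isConverged.apply(new Context() {
            public int getRound() { return round; }

            public List<Object> getStreamRecords() { return roundRecords; }
        });
    }
}

class TerminationDemo {
    // Returns the first round at which the condition holds, or -1 if it never does.
    static int terminationRound(List<List<Object>> recordsPerRound, SimpleTerminationCondition condition) {
        for (int round = 0; round < recordsPerRound.size(); round++) {
            if (condition.shouldTerminate(round, recordsPerRound.get(round))) {
                return round;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        List<List<Object>> records = List.of(List.of("a", "b"), List.of("c"), List.of());
        // Stop when the reference stream emits nothing in a round.
        SimpleTerminationCondition emptyRound =
                new SimpleTerminationCondition(ctx -> ctx.getStreamRecords().isEmpty());
        // Stop after a fixed number of rounds.
        SimpleTerminationCondition maxRounds =
                new SimpleTerminationCondition(ctx -> ctx.getRound() >= 1);
        System.out.println(terminationRound(records, emptyRound)); // 2
        System.out.println(terminationRound(records, maxRounds)); // 1
    }
}
```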

...