...

Proposed Changes

Iteration API

Similar to FLIP-15, we would tend to provide a structural-style iteration API to make it easier to understand and to avoid illegal graphs. With this method, users are required to specify an IterationBody that generates the part of the JobGraph inside the iteration. The inputs of the iteration body can be classified into two types:

  1. Variable: a data stream that has a corresponding back edge, namely it gets updated during the iteration. The iteration body receives a data stream representing the union of the original input and the feedback data.
  2. Constant: a data stream that is only used inside the iteration.

The iteration body should specify the DAG inside the iteration, together with the list of feedback streams and the list of output streams. The feedback streams would be unioned with the corresponding inputs, and the output streams would be provided to the caller routine.

However, since we do not know the precise number and types of the input streams, it is not easy to define a unified interface for the iteration body without type casting. Thus we propose to use annotations to allow for an arbitrary number of inputs:

Code Block
languagejava
titleThe IterationBody API
linenumberstrue
public interface IterationBody {

	@Target(ElementType.METHOD)
	@Retention(RetentionPolicy.RUNTIME)
	public @interface IterationFunction {}

	@Target(ElementType.PARAMETER)
	@Retention(RetentionPolicy.RUNTIME)
	public @interface IterationVariable {
		String value();
	}

	@Target(ElementType.PARAMETER)
	@Retention(RetentionPolicy.RUNTIME)
	public @interface IterationConstant {
		String value();
	}
}

// Example
new IterationBody() {
	@IterationFunction
	UnboundedIterationDeclaration iterate(
		@IterationVariable("first") DataStream<Integer> first,
		@IterationConstant("second") DataStream<String> second
	) {
		DataStream<Integer> feedBack1 = ...;
		DataStream<String> feedBack2 = ...;
		DataStream<String> output1 = ...;
		return new UnboundedIterationDeclarationBuilder()
			.withFeedback("first", feedBack1)
			.withFeedback("second", feedBack2)
			.withOutput("output1", output1)
			.build();
	}
}
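
One way a framework could resolve the annotated method is through reflection. The following is only a minimal sketch under stated assumptions, not the proposed implementation: the annotations are redeclared at top level so the snippet compiles on its own, plain strings stand in for DataStream inputs, and SampleBody, IterationBodyInvoker and its invoke method are hypothetical names. The sketch locates the method marked with @IterationFunction and fills each parameter from the name-keyed bindings.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.Map;

// Stand-ins for the proposed annotations (assumption: declared top-level for brevity).
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@interface IterationFunction {}

@Target(ElementType.PARAMETER)
@Retention(RetentionPolicy.RUNTIME)
@interface IterationVariable { String value(); }

@Target(ElementType.PARAMETER)
@Retention(RetentionPolicy.RUNTIME)
@interface IterationConstant { String value(); }

// Hypothetical body; strings stand in for DataStream inputs.
class SampleBody {
    @IterationFunction
    String iterate(@IterationVariable("first") String first,
                   @IterationConstant("second") String second) {
        return first + "+" + second;
    }
}

// Hypothetical helper that binds named inputs to the annotated parameters.
class IterationBodyInvoker {

    static Object invoke(Object body, Map<String, ?> variables, Map<String, ?> constants) {
        for (Method method : body.getClass().getDeclaredMethods()) {
            if (!method.isAnnotationPresent(IterationFunction.class)) {
                continue;
            }
            Annotation[][] annotations = method.getParameterAnnotations();
            Object[] args = new Object[annotations.length];
            for (int i = 0; i < args.length; i++) {
                args[i] = resolve(annotations[i], variables, constants);
            }
            try {
                method.setAccessible(true);
                return method.invoke(body, args);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Failed to invoke the iteration function", e);
            }
        }
        throw new IllegalArgumentException("No method annotated with @IterationFunction");
    }

    // Picks the binding map according to the parameter's annotation.
    private static Object resolve(Annotation[] annotations, Map<String, ?> variables, Map<String, ?> constants) {
        for (Annotation annotation : annotations) {
            if (annotation instanceof IterationVariable) {
                return lookup(variables, ((IterationVariable) annotation).value());
            }
            if (annotation instanceof IterationConstant) {
                return lookup(constants, ((IterationConstant) annotation).value());
            }
        }
        throw new IllegalArgumentException("Parameter has neither @IterationVariable nor @IterationConstant");
    }

    private static Object lookup(Map<String, ?> bound, String name) {
        if (!bound.containsKey(name)) {
            throw new IllegalArgumentException("No input bound to name: " + name);
        }
        return bound.get(name);
    }

    public static void main(String[] args) {
        Object result = invoke(new SampleBody(),
                Collections.singletonMap("first", "a"),
                Collections.singletonMap("second", "b"));
        System.out.println(result); // prints a+b
    }
}
```

Binding by name rather than by position is what lets the body declare any number of inputs without a fixed method signature; the cost is that a mismatch between bound names and annotation values only surfaces at graph construction time.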


The interface for the unbounded iteration is straightforward:

Code Block
languagejava
titleUnbounded Iteration API
linenumberstrue
public class UnboundedIteration {

	UnboundedIteration withBody(IterationBody body) {...}

	UnboundedIteration bindVariable(String name, DataStream<?> input) {...}

	UnboundedIteration bindConstant(String name, DataStream<?> input) {...}

	ResultStreams apply() {...}
}

public class UnboundedIterationDeclarationBuilder {

	public UnboundedIterationDeclarationBuilder withFeedback(String name, DataStream<?> feedback) {...}

	public UnboundedIterationDeclarationBuilder withOutput(String name, DataStream<?> output) {...}

	UnboundedIterationDeclaration build() {...}
}

public class ResultStreams {

	public <T> DataStream<T> getStream(String name) {...}

}

// Example
public class UnboundedIterationExample {
	public static void main(String[] args) {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		DataStream<Integer> source1 = env.fromElements(1, 2, 3, 4, 5);
		DataStream<String> source2 = env.fromElements("2", "3", "4");

		ResultStreams resultStreams = new UnboundedIteration()
			.withBody(new IterationBody() {

				@IterationFunction
				UnboundedIterationDeclaration iterate(
					@IterationVariable("first") DataStream<Integer> first,
					@IterationConstant("second") DataStream<String> second
				) {
					DataStream<Integer> feedBack1 = ...;
					DataStream<String> feedBack2 = ...;
					DataStream<String> output1 = ...;
					return new UnboundedIterationDeclarationBuilder()
						.withFeedback("first", feedBack1)
						.withFeedback("second", feedBack2)
						.withOutput("output1", output1)
						.build();
				}
			})
			.bindVariable("first", source1)
			.bindConstant("second", source2)
			.apply();

		DataStream<String> output = resultStreams.getStream("output1");
	}
}
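
Because outputs are registered by name only, getStream(String name) has to hand back a typed stream without any static type information. A minimal sketch of how such a lookup could behave, assuming an unchecked cast that the caller is responsible for; FakeStream and NamedStreams are hypothetical stand-ins for DataStream and ResultStreams, not the proposed classes:

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for DataStream<T>; it only carries a label for demonstration.
class FakeStream<T> {
    final String label;

    FakeStream(String label) {
        this.label = label;
    }
}

// Hypothetical counterpart of ResultStreams: outputs are stored untyped under their names.
class NamedStreams {
    private final Map<String, FakeStream<?>> streams = new HashMap<>();

    NamedStreams put(String name, FakeStream<?> stream) {
        if (streams.putIfAbsent(name, stream) != null) {
            throw new IllegalArgumentException("Duplicate output name: " + name);
        }
        return this;
    }

    // The element type is chosen by the caller; a wrong choice is not detected here.
    @SuppressWarnings("unchecked")
    <T> FakeStream<T> getStream(String name) {
        FakeStream<?> stream = streams.get(name);
        if (stream == null) {
            throw new IllegalArgumentException("Unknown output name: " + name);
        }
        return (FakeStream<T>) stream;
    }
}
```

In this sketch an unknown name fails fast, while a wrong element type would only show up later when records are consumed.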


The interface for the bounded iteration is more complex since it involves the concept of rounds and allows for active termination conditions.

The existing DataSet iteration API views the iteration as the repeated execution of the same DAG, so under the hood it automatically caches the iteration variables and iteration constants. This can hurt performance for algorithms that could cache this data in memory themselves in a way that suits their computation. To avoid this issue, we would support both operators that live across all rounds and operators that live for only one round. By default, an operator lives until the whole iteration ends and its state is kept. If users wrap operators in a forEachRound call, these operators are re-created for each round.

Besides, the current DataSet iteration only supports synchronous iteration. However, synchronous iteration can be viewed as a special case of asynchronous iteration with additional synchronization. Therefore the bounded iteration does not provide an embedded synchronization mechanism. Users can implement the synchronization by emitting the records for the next round only in onRoundEnd(). This avoids explicitly caching the variable dataset and leaves room for additional algorithm-specific optimizations.
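
The synchronization pattern described above can be simulated without any framework. The following is a minimal sketch, assuming a driver that calls the operator once per record and once at each round boundary; HalvedSumPerRound and SynchronousRoundsSketch are hypothetical names, and the halving is an arbitrary computation chosen for illustration. The operator accumulates during a round and emits the input of the next round only in onRoundEnd, so no record of round r+1 exists before round r has finished.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical operator: accumulates within a round, emits only at the round boundary.
class HalvedSumPerRound {
    private int sum = 0;

    void onRecord(int value) {
        sum += value; // nothing is emitted per record
    }

    void onRoundEnd(int round, List<Integer> nextRoundInput) {
        nextRoundInput.add(sum / 2); // the only place where feedback is produced
        sum = 0;
    }
}

class SynchronousRoundsSketch {

    // Drives the operator round by round and returns the value fed back after each round.
    static List<Integer> run(List<Integer> initialInput, int rounds) {
        HalvedSumPerRound operator = new HalvedSumPerRound();
        List<Integer> feedbackPerRound = new ArrayList<>();
        List<Integer> current = initialInput;
        for (int round = 0; round < rounds; round++) {
            for (int value : current) {
                operator.onRecord(value);
            }
            List<Integer> next = new ArrayList<>();
            operator.onRoundEnd(round, next);
            feedbackPerRound.addAll(next);
            current = next; // the feedback becomes the input of the next round
        }
        return feedbackPerRound;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList(1, 2, 3, 4, 5), 3)); // prints [7, 3, 1]
    }
}
```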

The API for the bounded iteration is as follows:

Code Block
languagejava
titleBounded Iteration API
linenumberstrue
public class BoundedIteration {

    BoundedIteration withBody(BoundedIterationBody body) {...}

	BoundedIteration bindVariable(String name, DataStream<?> input) {...}

	BoundedIteration bindConstant(String name, DataStream<?> input) {...}

	void enableAsynchronousIteration() {...}

	ResultStreams apply() {...}
}

public interface BoundedIterationBody {

	@Target(ElementType.METHOD)
	@Retention(RetentionPolicy.RUNTIME)
	public @interface IterationFunction {
	}

	@Target(ElementType.PARAMETER)
	@Retention(RetentionPolicy.RUNTIME)
	public @interface IterationVariable {
		String value();
	}

	@Target(ElementType.PARAMETER)
	@Retention(RetentionPolicy.RUNTIME)
	public @interface IterationConstant {
		String value();
	}
}

public class BoundedIterationDeclarationBuilder {

	public BoundedIterationDeclarationBuilder withFeedback(String name, DataStream<?> feedback) { ... }

	public BoundedIterationDeclarationBuilder withOutput(String name, DataStream<?> output) { ... }

	<U> BoundedIterationDeclarationBuilder until(TerminationCondition<U> terminationCondition) { ... }

	BoundedIterationDeclaration build() { ... }
}

public class TerminationCondition {
  TerminationCondition(
        @Nullable DataStream<?> convergenceCriteriaVariable,
        ConvergenceCriterion convergenceCriteria);
}

public interface ConvergenceCriterion {
  boolean isConverged(Context context);

  interface Context {
     int[] getRound();

     <T> List<T> getVariableValues();

     <T> List<T> getLastRoundVariableValues();
  }
}

public interface EachRound {
	
	Map<String, DataStream<?>> eachRound();

}

public class BoundedIterationContext {

	ResultStreams forEachRound(EachRound eachRound);
	
}

// The Progress Tracking interface for UDF / Operator
public interface BoundedIterationProgressListener<T> {

	default void onRoundStart(int[] round, Context context, Collector<T> collector){}

	void onRoundEnd(int[] round, Context context, Collector<T> collector);

}

// Example
public class BoundedIterationExample {

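	public static class MyReducer implements FlatMapFunction<Integer, Integer>, BoundedIterationProgressListener<Integer> {

		private int value = 0;

		@Override
		public void flatMap(Integer v1, Collector<Integer> v2) {
			// Accumulate within the round; nothing is emitted per record.
			value += v1;
		}

		@Override
		public void onRoundEnd(int[] round, Context context, Collector<Integer> collector) {
			// Emit the per-round sum and reset it for the next round.
			collector.collect(value);
			value = 0;
		}
	}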

	public static void main(String[] args) {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		DataStream<Integer> source1 = env.fromElements(1, 2, 3, 4, 5);
		DataStream<String> source2 = env.fromElements("2", "3", "4");

		ResultStreams resultStreams = new BoundedIteration()
			.withBody(new BoundedIterationBody() {

				@IterationFunction
				BoundedIterationDeclaration iterate(
					@IterationVariable("first") DataStream<Integer> first,
					@IterationConstant("second") DataStream<String> second,
					BoundedIterationContext context
				) {
					DataStream<Integer> feedBack1 = ...;

					// Operators created inside forEachRound are re-created for every round.
					ResultStreams results = context.forEachRound(() -> {
						return Collections.singletonMap("result", feedBack1.map(xx));
					});
					DataStream<String> feedBack2 = results.getStream("result");

					// MyReducer lives across rounds and emits one sum per round.
					DataStream<Integer> output1 = feedBack1.flatMap(new MyReducer());

					return new BoundedIterationDeclarationBuilder()
						.withFeedback("first", feedBack1)
						.withFeedback("second", feedBack2)
						.withOutput("output1", output1)
						.until(ConvergenceCriteria.fixRound(5))
						.build();
				}
			})
			.bindVariable("first", source1)
			.bindConstant("second", source2)
			.apply();

		DataStream<Integer> output = resultStreams.getStream("output1");
	}
}
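
The example calls ConvergenceCriteria.fixRound(5), which the listing does not define. A minimal sketch of what convergence criteria could look like, under several assumptions: ConvergenceCriterionSketch is a simplified stand-in for ConvergenceCriterion with values fixed to Double, the last element of the round array is assumed to be the zero-based index of the innermost round, and fixRound, deltaBelow and SimpleContext are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

// Simplified stand-in for the proposed ConvergenceCriterion (values fixed to Double).
interface ConvergenceCriterionSketch {
    boolean isConverged(Context context);

    interface Context {
        int[] getRound();
        List<Double> getVariableValues();
        List<Double> getLastRoundVariableValues();
    }

    // Hypothetical factory: converged once maxRounds rounds have completed.
    // Assumption: the last array element is the zero-based innermost round.
    static ConvergenceCriterionSketch fixRound(int maxRounds) {
        return context -> {
            int[] round = context.getRound();
            return round[round.length - 1] + 1 >= maxRounds;
        };
    }

    // Hypothetical factory: converged once no value moved by epsilon or more since the last round.
    static ConvergenceCriterionSketch deltaBelow(double epsilon) {
        return context -> {
            List<Double> now = context.getVariableValues();
            List<Double> before = context.getLastRoundVariableValues();
            if (now.size() != before.size()) {
                return false;
            }
            double maxDelta = 0.0;
            for (int i = 0; i < now.size(); i++) {
                maxDelta = Math.max(maxDelta, Math.abs(now.get(i) - before.get(i)));
            }
            return maxDelta < epsilon;
        };
    }
}

// Immutable context used only to exercise the criteria.
class SimpleContext implements ConvergenceCriterionSketch.Context {
    private final int round;
    private final List<Double> values;
    private final List<Double> lastValues;

    SimpleContext(int round, List<Double> values, List<Double> lastValues) {
        this.round = round;
        this.values = values;
        this.lastValues = lastValues;
    }

    @Override
    public int[] getRound() {
        return new int[] {round};
    }

    @Override
    public List<Double> getVariableValues() {
        return values;
    }

    @Override
    public List<Double> getLastRoundVariableValues() {
        return lastValues;
    }
}

class ConvergenceDemo {
    public static void main(String[] args) {
        List<Double> values = Arrays.asList(1.0, 2.0);
        System.out.println(ConvergenceCriterionSketch.fixRound(5)
                .isConverged(new SimpleContext(4, values, values))); // prints true
    }
}
```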

Compared to the existing unbounded 

Key Implementation

To wrap the operators in the part of the DAG inside the iteration, we would introduce a mock execution environment and first build the iteration DAG inside this environment. When the apply() method is called, we would translate the DAG into the real execution environment with the suitable wrappers. Besides, all the edges inside the iteration should be PIPELINED, so we would also set this edge property when translating.

The runtime physical structure of the iteration is shown in Figure 1, which is similar to the current implementation. The head and tail operators are added by the framework. They would be colocated so that the feedback edge can be implemented with a local queue. The head could coordinate with an operator coordinator bound to a virtual operator ID for synchronization, including progress tracking and evaluating the termination condition.


Figure 1. The physical runtime structure for the iteration. 
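
The role of the colocated head and tail can be illustrated with a single-threaded simulation. This is only a sketch under stated assumptions: FeedbackLoopSketch is a hypothetical name, the iteration body is an arbitrary halving step, and an ArrayDeque plays the part of the local queue. The head consumes the union of the original input and the fed-back records, and the tail either puts a result back into the queue or lets it leave the iteration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

class FeedbackLoopSketch {

    // Halves every record; results above the threshold are fed back, the others are emitted.
    static List<Integer> run(List<Integer> input, int threshold) {
        if (threshold < 0) {
            // With a negative threshold a result of 0 would be fed back forever.
            throw new IllegalArgumentException("threshold must be non-negative");
        }
        Queue<Integer> headInput = new ArrayDeque<>(input);
        Queue<Integer> feedbackQueue = new ArrayDeque<>(); // local queue shared by tail and head
        List<Integer> output = new ArrayList<>();

        while (!headInput.isEmpty() || !feedbackQueue.isEmpty()) {
            // Head: union of the original input and the feedback records.
            int record = !headInput.isEmpty() ? headInput.poll() : feedbackQueue.poll();

            // Iteration body (arbitrary for this sketch).
            int result = record / 2;

            // Tail: feed the record back or let it leave the iteration.
            if (result > threshold) {
                feedbackQueue.add(result);
            } else {
                output.add(result);
            }
        }
        return output;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList(1, 8, 20), 2)); // prints [0, 2, 2]
    }
}
```

Since both ends of the queue live in the same task in this model, the feedback edge needs no network transfer, which is the motivation for colocating head and tail.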


The operator wrapper needs to simulate the context in which an operator executes. In particular, for operators with a single-round lifecycle in a bounded iteration, we would need to isolate the state used in each round and clean up the corresponding state after the round ends.
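
A minimal sketch of such per-round isolation, assuming the wrapper namespaces state by round index and drops the namespace when the round ends; PerRoundState is a hypothetical name and a plain in-memory map stands in for the real state backend:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-round state holder: every round gets its own namespace.
class PerRoundState<V> {
    private final Map<Integer, Map<String, V>> stateByRound = new HashMap<>();

    void put(int round, String key, V value) {
        stateByRound.computeIfAbsent(round, r -> new HashMap<>()).put(key, value);
    }

    // Returns null if the round has no state for the key (or was already cleaned up).
    V get(int round, String key) {
        Map<String, V> state = stateByRound.get(round);
        return state == null ? null : state.get(key);
    }

    // Called by the wrapper once the round has ended; releases all state of that round.
    void onRoundEnd(int round) {
        stateByRound.remove(round);
    }

    int liveRounds() {
        return stateByRound.size();
    }
}
```

Keying by round keeps two rounds that are in flight at the same time from seeing each other's state, and the cleanup bounds the memory to the rounds still running.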

Implementation Plan

Logically, all the iteration types would support both the BATCH and the STREAM execution modes. However, according to the algorithms' requirements, we would first implement:

  1. Unbounded iteration + STREAM mode.
  2. Bounded iteration + BATCH mode.

This would also simplify our implementation for operator wrappers with single-round lifecycle.

Compatibility, Deprecation, and Migration Plan

The API is added as a library inside the flink-ml repository, thus it does not have compatibility problems. However, it differs from the existing iteration API, and algorithms built on the existing API would need some re-implementation.

Rejected Alternatives