Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Following the experience of FLIP-15, we will provide a structural-style iteration API. The API for unbounded iteration is:


Code Block
language: java
title: Unbounded Iteration API
linenumbers: true
/**
 * Entry point for building an unbounded iteration.
 *
 * <p>The body is supplied with {@link #withBody}. Each annotated parameter of the body's
 * {@code @IterationFunction} method is bound to an input stream by name with
 * {@link #bindVariable} or {@link #bindConstant}. {@link #apply} then builds the iteration.
 */
public class UnboundedIteration {

	/** Sets the body that defines the iteration step. */
	public UnboundedIteration withBody(UnboundedIterationBody body) {...}

	/**
	 * Binds {@code input} to the body parameter annotated with
	 * {@code @IterationVariable(name)}.
	 */
	public UnboundedIteration bindVariable(String name, DataStream<?> input) {...}

	/**
	 * Binds {@code input} to the body parameter annotated with
	 * {@code @IterationConstant(name)}.
	 */
	public UnboundedIteration bindConstant(String name, DataStream<?> input) {...}

	/**
	 * Builds the iteration.
	 *
	 * @return the output streams, retrievable by the names the body declared with
	 *     {@code withOutput}
	 */
	public ResultStreams apply() {...}
}

/**
 * Body of an unbounded iteration.
 *
 * <p>The interface itself declares no methods. An implementation is expected to provide a method
 * annotated with {@link IterationFunction} whose stream parameters carry
 * {@link IterationVariable} or {@link IterationConstant}. The annotation value is the name used
 * when binding the stream on {@code UnboundedIteration}.
 */
public interface UnboundedIterationBody {

	/** Marks the method that defines the iteration step. */
	@Target(ElementType.METHOD)
	@Retention(RetentionPolicy.RUNTIME)
	@interface IterationFunction {}

	/** Marks a parameter that receives the stream bound with {@code bindVariable(value, ...)}. */
	@Target(ElementType.PARAMETER)
	@Retention(RetentionPolicy.RUNTIME)
	@interface IterationVariable {
		/** Name under which the stream is bound. */
		String value();
	}

	/** Marks a parameter that receives the stream bound with {@code bindConstant(value, ...)}. */
	@Target(ElementType.PARAMETER)
	@Retention(RetentionPolicy.RUNTIME)
	@interface IterationConstant {
		/** Name under which the stream is bound. */
		String value();
	}
}

/**
 * Builds the value returned from an unbounded iteration body's {@code @IterationFunction}
 * method: the feedback streams of the iteration variables and the streams exposed as outputs.
 */
public class UnboundedIterationDeclarationBuilder {

	/**
	 * Declares {@code feedback} as the stream fed back into the iteration variable named
	 * {@code name}.
	 */
	public UnboundedIterationDeclarationBuilder withFeedback(String name, DataStream<?> feedback) {...}

	/**
	 * Declares {@code output} as an output of the iteration, retrievable from the
	 * {@code ResultStreams} returned by {@code apply()} under {@code name}.
	 */
	public UnboundedIterationDeclarationBuilder withOutput(String name, DataStream<?> output) {...}

	/** Creates the declaration to return from the {@code @IterationFunction} method. */
	public UnboundedIterationDeclaration build() {...}
}

/** Streams produced by an iteration, looked up by name. */
public class ResultStreams {
	/**
	 * Returns the stream registered under {@code name}.
	 *
	 * @param name the name the stream was declared with
	 * @param <T> element type of the returned stream; chosen by the caller, so it is presumably
	 *     not checked against the declared stream at compile time
	 */
	public <T> DataStream<T> getStream(String name) {...}
}

// Example
public class UnboundedIterationExample {

	/** Builds an unbounded iteration with one variable input and one constant input. */
	public static void main(String[] args) {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		DataStream<Integer> source1 = env.fromElements(1, 2, 3, 4, 5);
		DataStream<String> source2 = env.fromElements("2", "3", "4");

		ResultStreams resultStreams = new UnboundedIteration()
			.withBody(new UnboundedIterationBody() {

				@IterationFunction
				UnboundedIterationDeclaration iterate(
					@IterationVariable("first") DataStream<Integer> first,
					@IterationConstant("second") DataStream<String> second
				) {
					DataStream<Integer> feedBack1 = ...;
					DataStream<String> output1 = ...;
					// "second" is bound as a constant, so only the variable "first" is given
					// a feedback stream.
					return new UnboundedIterationDeclarationBuilder()
						.withFeedback("first", feedBack1)
						.withOutput("output1", output1)
						.build();
				}
			})
			.bindVariable("first", source1)
			.bindConstant("second", source2)
			.apply();

		DataStream<String> output = resultStreams.getStream("output1");
	}
}

...

Code Block
language: java
title: Bounded Iteration API
linenumbers: true
/**
 * Entry point for building a bounded iteration, i.e. one that stops at the termination condition
 * its body declares with {@code BoundedIterationDeclarationBuilder#until}.
 */
public class BoundedIteration {

	/** Sets the body that defines one round of the iteration. */
	public BoundedIteration withBody(BoundedIterationBody body) {...}

	/** Binds {@code input} as the iteration variable named {@code name}. */
	public BoundedIteration bindVariable(String name, DataStream<?> input) {...}

	/**
	 * Binds {@code input} to the body parameter annotated with
	 * {@code @IterationConstant(name)}.
	 */
	public BoundedIteration bindConstant(String name, DataStream<?> input) {...}

	/**
	 * Switches the iteration to asynchronous mode.
	 *
	 * <p>NOTE(review): the exact semantics are not specified in this sketch (presumably rounds are
	 * not globally synchronized) — confirm. Returns {@code this} so the call chains like the other
	 * configuration methods. Callers that ignored the former {@code void} result are unaffected.
	 */
	public BoundedIteration enableAsynchronousIteration() {...}

	/**
	 * Builds the iteration.
	 *
	 * @return the output streams, retrievable by the names the body declared with
	 *     {@code withOutput}
	 */
	public ResultStreams apply() {...}
}

/**
 * Body of a bounded iteration.
 *
 * <p>The interface itself declares no methods. An implementation is expected to provide a method
 * annotated with {@link IterationFunction} whose stream parameters carry {@link IterationInput}
 * or {@link IterationConstant}. The annotation value is the name used when binding the stream.
 */
public interface BoundedIterationBody {

	/** Marks the method that defines one round of the iteration. */
	@Target(ElementType.METHOD)
	@Retention(RetentionPolicy.RUNTIME)
	@interface IterationFunction {}

	/** Marks a parameter that receives an iteration input stream bound under {@code value}. */
	@Target(ElementType.PARAMETER)
	@Retention(RetentionPolicy.RUNTIME)
	@interface IterationInput {
		/** Name under which the stream is bound. */
		String value();
	}

	/** Marks a parameter that receives the stream bound with {@code bindConstant(value, ...)}. */
	@Target(ElementType.PARAMETER)
	@Retention(RetentionPolicy.RUNTIME)
	@interface IterationConstant {
		/** Name under which the stream is bound. */
		String value();
	}
}

/**
 * Builds the value returned from a bounded iteration body's {@code @IterationFunction} method:
 * feedback streams, output streams and the termination condition.
 */
public class BoundedIterationDeclarationBuilder {

	/**
	 * Declares {@code feedback} as the stream fed back into the iteration input named
	 * {@code name}.
	 */
	public BoundedIterationDeclarationBuilder withFeedback(String name, DataStream<?> feedback) { ... }

	/**
	 * Declares {@code output} as an output of the iteration, retrievable from the
	 * {@code ResultStreams} returned by {@code apply()} under {@code name}.
	 */
	public BoundedIterationDeclarationBuilder withOutput(String name, DataStream<?> output) { ... }

	/**
	 * Sets the condition that ends the iteration.
	 *
	 * <p>{@code TerminationCondition} is not generic, so this method takes it directly rather
	 * than through a type parameter.
	 */
	public BoundedIterationDeclarationBuilder until(TerminationCondition terminationCondition) { ... }

	/** Creates the declaration to return from the {@code @IterationFunction} method. */
	public BoundedIterationDeclaration build() { ... }
}

public class TerminationCondition {
  TerminationCondition(
        @Nullable DataStream<?> convergenceCriteriaVariable,
        ConvergenceCriterion convergenceCriteria);
}

/** Decides whether a bounded iteration has converged. */
public interface ConvergenceCriterion {

	/**
	 * Evaluates convergence.
	 *
	 * @param context information about the current state of the iteration
	 * @return {@code true} if the iteration has converged
	 */
	boolean isConverged(Context context);

	/** Information available to a {@link ConvergenceCriterion}. */
	interface Context {

		/**
		 * Returns the current round.
		 *
		 * <p>NOTE(review): this is an array, presumably one entry per level of nested
		 * iteration — confirm.
		 */
		int[] getRound();

		/** Returns the variable values observed in the current round. */
		<T> List<T> getVariableValues();

		/** Returns the variable values observed in the previous round. */
		<T> List<T> getLastRoundVariableValues();
	}
}

/**
 * Sub-computation passed to {@code BoundedIterationContext#forEachRound}.
 *
 * <p>Marked {@link FunctionalInterface} so it can be supplied as a lambda and the compiler
 * rejects any second abstract method.
 */
@FunctionalInterface
public interface EachRound {

	/**
	 * Builds the streams of the sub-computation.
	 *
	 * @return the produced streams, keyed by the names under which the resulting
	 *     {@code ResultStreams} exposes them
	 */
	Map<String, DataStream<?>> eachRound();
}

public class BoundedIterationContext {

	ResultStreams forEachRound(EachRound eachRound);
	
}

/**
 * Progress-tracking callbacks for a UDF or operator running inside a bounded iteration.
 *
 * <p>NOTE(review): {@code Context} is not defined in this section; presumably a listener-specific
 * context type — confirm.
 *
 * @param <T> type of the records the callbacks may emit through the collector
 */
public interface BoundedIterationProgressListener<T> {

	/** Invoked when a round ends; {@code collector} may be used to emit records. */
	void onRoundEnd(int[] round, Context context, Collector<T> collector);

	/** Invoked when a round starts; does nothing unless overridden. */
	default void onRoundStart(int[] round, Context context, Collector<T> collector) {
	}
}

// Example
public class BoundedIterationExample {

	public static void MyReducer implements FlatMapFunction<Integer, Integer>, BoundedIterationProgressListener<Integer> {
			
		private int value = 0;

		void flatMap(Integer v1, Collector<Integer> v2) {value += v1}

		void onRoundEnd(int[] round, Context context, Collector<T> collector) {
			collector.collector(value);
			value = 0;
		}
	}

	public static void main(String[] args) {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		DataStream<Integer> source1 = env.fromElements(1, 2, 3, 4, 5);
		DataStream<String> source2 = env.fromElements("2", "3", "4");

		ResultStreams resultStreams = new BoundedIteration()
			.withBody(new BoundedIterationBody() {

				@IterationFunction
				BoundedIterationDeclarative iterate(
					@IterationInput@IterationVariable("first") DataStream<Integer> first
					@IterationInput("second") DataStream<String> second,
					BoundedIterationContext context
				) {
					DataStream<Integer> feedBack1 = ...;

					ResultStreams results = forEachRound(() -> {
						return Collections.singletonMap("result", feedback1.map(xx));
					});
					DataStream<String> feedBack2 = results.getStream("result");

					DataStream<String> output1 = feedback1.flatMap(new MyReducer());

					return new BoundedIterationDeclarationBuilder()
						.withFeedback("first", feedBack1)
						.withFeedback("second", feedBack2)
						.withOutput("output1", output1)
						.until(ConvergenceCriteria.fixRound(5))
						.build();
				}
			})
			.bindInput("first", source1)
			.bindInput("second", source2)
			.apply();

		DataStream<String> output = resultStreams.getStream("output1");
	}
}

...