Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
titleUnbounded Iteration API
linenumberstrue
public class UnboundedIteration {

    UnboundedIteration withBody(UnboundedIterationBody body) {
        ...
	}

	UnboundedIteration bindInput(String name, DataStream<?> input) {
		...
	}

	ResultStreams apply() {
        ...
	}
}

public interface UnboundedIterationBody {

	@Target(ElementType.METHOD)
	@Retention(RetentionPolicy.RUNTIME)
	public @interface IterationFunction {
	}

	@Target(ElementType.PARAMETER)
	@Retention(RetentionPolicy.RUNTIME)
	public @interface IterationVariable {

		String value();

	}
}

public class UnboundedIterationDeclarativeBuilder {

	public UnboundedIterationDeclarativeBuilder withFeedback(String name, DataStream<?> feedback) {
		...
		return this;
	}

	public UnboundedIterationDeclarativeBuilder withOutput(String name, DataStream<?> output) {
		...
		return this;
	}

	UnboundedIterationDeclarative build() {
		return ...
	}
}

public class ResultStreams {

	public <T> DataStream<T> getStream(String name) {
		return ...
	}
}

// Example
public class UnboundedIterationExample {
	public static void main(String[] args) {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		DataStream<Integer> sourcesource1 = env.fromElements(1, 2, 3, 4, 5);
		DataStream<String> source2 = env.fromElements("2", "3", "4");

		ResultStreams resultStreams = new UnboundedIterationAPI()
			.withBody(new UnboundedIterationBody() {

				@IterationFunction
				UnboundedIterationDeclarative iterate(
					@IterationVariable("first") DataStream<?>DataStream<Integer> first
					@IterationVariable("second") DataStream<> second
				) {
					DataStream<?> feedBack = nullDataStream<Integer> feedBack1 = ...;
					DataStream<String> feedBack2 = ...;
					DataStream<String> output1 = ...;
					return new UnboundedIterationDeclarativeBuilder()
						.withFeedback("first", feedBackfeedBack1)
						.withFeedback("second", feedBack2)
						.withOutput("output1", feedBackoutput1)
						.build();
				}
			})
			.bindInput("first", source1)
			.bindInput("second", source2)
			.apply();

		DataStream<String> output = resultStreams.getStream("output1");
	}
}


The bounded stream iteration API is 



Key Implementation



Compatibility, Deprecation, and Migration Plan

...