Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
language: java
title: Unbounded Iteration API
linenumbers: true
/**
 * Entry point of the unbounded iteration API sketch: an iteration body is attached with
 * {@link #withBody(UnboundedIterationBody)} and the iteration is then materialised with
 * {@link #apply()}.
 *
 * <p>Method bodies are elided ({@code ...}) in this sketch, so only the signatures are
 * documented here.
 */
public class UnboundedIteration {

    /**
     * Attaches the body that describes one round of the iteration.
     *
     * @param body the iteration body
     * @return an {@code UnboundedIteration}; presumably {@code this} so calls can be
     *     chained (implementation elided - TODO confirm)
     */
    UnboundedIteration withBody(UnboundedIterationBody body) {
        ...
	}

	/**
	 * Produces the result streams of the iteration.
	 *
	 * @return a {@link ResultStreams} handle (implementation elided)
	 */
	ResultStreams apply() {
        ...
	}
}

/**
 * Marker interface for the body of an unbounded iteration.
 *
 * <p>It declares no methods of its own; it only hosts the two runtime-retained annotations
 * used to mark up an implementation: {@link IterationFunction} for methods and
 * {@link IterationVariable} for parameters.
 */
public interface UnboundedIterationBody {

	/** Method-level marker annotation, retained at runtime. */
	@Retention(RetentionPolicy.RUNTIME)
	@Target(ElementType.METHOD)
	@interface IterationFunction {}

	/** Parameter-level annotation carrying a mandatory string value, retained at runtime. */
	@Retention(RetentionPolicy.RUNTIME)
	@Target(ElementType.PARAMETER)
	@interface IterationVariable {

		/** The string associated with the annotated parameter; no default is declared. */
		String value();
	}
}

/**
 * Fluent builder that collects named feedback and output streams and produces an
 * {@code UnboundedIterationDeclarative}.
 *
 * <p>Method bodies are partly elided ({@code ...}) in this sketch.
 *
 * <p>NOTE(review): {@code static} is only legal on a nested class; presumably this type is
 * nested in an enclosing class that is not shown here - confirm.
 */
public static class UnboundedIterationDeclarativeBuilder {

	/**
	 * Registers a feedback stream under the given name.
	 *
	 * @param name the name to register the stream under
	 * @param feedback the feedback stream
	 * @return this builder, for chaining
	 */
	public UnboundedIterationDeclarativeBuilder withFeedback(String name, DataStream<?> feedback) {
		...
		return this;
	}

	/**
	 * Registers an output stream under the given name.
	 *
	 * @param name the name to register the stream under
	 * @param output the output stream
	 * @return this builder, for chaining
	 */
	public UnboundedIterationDeclarativeBuilder withOutput(String name, DataStream<?> output) {
		...
		return this;
	}

	/**
	 * Finishes the builder.
	 *
	 * @return the resulting {@code UnboundedIterationDeclarative} (construction elided)
	 */
	UnboundedIterationDeclarative build() {
		return ...
	}
}

/**
 * Handle for looking up streams by name.
 *
 * <p>NOTE(review): {@code static} is only legal on a nested class; presumably this type is
 * nested in an enclosing class that is not shown here - confirm.
 */
public static class ResultStreams {

	/**
	 * Looks up a stream by name.
	 *
	 * @param name the name of the stream to look up
	 * @param <T> element type the caller expects; it is chosen by the caller, and whether
	 *     it is checked anywhere cannot be told from this sketch
	 * @return the stream for {@code name} (lookup elided)
	 */
	public <T> DataStream<T> getStream(String name) {
		return ...
	}
}

/**
 * Usage sketch for the unbounded iteration API: declares an iteration body with one
 * iteration variable, wires a feedback stream and an output stream by name, applies the
 * iteration and looks the output up again by the same name.
 */
public class UnboundedIterationExample {

	/**
	 * Builds the example iteration.
	 *
	 * @param args command-line arguments (unused)
	 */
	public static void main(String[] args) {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// NOTE(review): `source` is created but never handed to the iteration; the API
		// shown in this sketch has no method for passing initial streams - confirm intent.
		DataStream<Integer> source = env.fromElements(1, 2, 3, 4, 5);

		// The entry-point class declared in this sketch is UnboundedIteration
		// (it is the type that owns withBody(...) and apply()).
		ResultStreams resultStreams = new UnboundedIteration()
			.withBody(new UnboundedIterationBody() {

				@IterationFunction
				UnboundedIterationDeclarative iterate(
					@IterationVariable("first") DataStream<?> first
				) {
					// Placeholder: a real body would derive the feedback stream from `first`.
					DataStream<?> feedBack = null;
					return new UnboundedIterationDeclarativeBuilder()
						.withFeedback("first", feedBack)
						.withOutput("output1", feedBack)
						.build();
				}
			}).apply();

		// Retrieved under the same name that was passed to withOutput(...) above.
		DataStream<String> output = resultStreams.getStream("output1");
	}
}

...