...
```java
public class UnboundedIteration {

    UnboundedIteration withBody(UnboundedIterationBody body) { ... }

    UnboundedIteration bindInput(String name, DataStream<?> input) { ... }

    ResultStreams apply() { ... }
}

public interface UnboundedIterationBody {

    // Marks the method that defines the iteration body.
    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    public @interface IterationFunction {
    }

    // Names the iteration variable that a parameter of the iteration function is bound to.
    @Target(ElementType.PARAMETER)
    @Retention(RetentionPolicy.RUNTIME)
    public @interface IterationVariable {
        String value();
    }
}

public class UnboundedIterationDeclarativeBuilder {

    public UnboundedIterationDeclarativeBuilder withFeedback(String name, DataStream<?> feedback) {
        ...
        return this;
    }

    public UnboundedIterationDeclarativeBuilder withOutput(String name, DataStream<?> output) {
        ...
        return this;
    }

    UnboundedIterationDeclarative build() {
        return ...
    }
}

public class ResultStreams {

    public <T> DataStream<T> getStream(String name) {
        return ...
    }
}

// Example
public class UnboundedIterationExample {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Integer> source1 = env.fromElements(1, 2, 3, 4, 5);
        DataStream<String> source2 = env.fromElements("2", "3", "4");

        ResultStreams resultStreams = new UnboundedIteration()
            .withBody(new UnboundedIterationBody() {

                @IterationFunction
                UnboundedIterationDeclarative iterate(
                    @IterationVariable("first") DataStream<Integer> first,
                    @IterationVariable("second") DataStream<String> second
                ) {
                    DataStream<Integer> feedBack1 = ...;
                    DataStream<String> feedBack2 = ...;
                    DataStream<String> output1 = ...;

                    // Each feedback stream is registered under the name of the variable it feeds.
                    return new UnboundedIterationDeclarativeBuilder()
                        .withFeedback("first", feedBack1)
                        .withFeedback("second", feedBack2)
                        .withOutput("output1", output1)
                        .build();
                }
            })
            .bindInput("first", source1)
            .bindInput("second", source2)
            .apply();

        DataStream<String> output = resultStreams.getStream("output1");
    }
}
```
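Both annotations are declared with `RetentionPolicy.RUNTIME`, which suggests that the iteration function and its variable names are meant to be discovered reflectively. The following is only a minimal sketch of how such a lookup could work, not the proposed implementation. `IterationBodyInspector`, `findIterationFunction`, `variableNames`, and `ExampleBody` are hypothetical names introduced for illustration, the two annotations are redeclared locally, and `Object` stands in for `DataStream` so that the sketch compiles without Flink on the classpath.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the proposed API.
public class IterationBodyInspector {

    // Local copies of the annotations from the proposal, so the sketch is self-contained.
    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    public @interface IterationFunction {
    }

    @Target(ElementType.PARAMETER)
    @Retention(RetentionPolicy.RUNTIME)
    public @interface IterationVariable {
        String value();
    }

    // Returns the single method of bodyClass annotated with @IterationFunction.
    static Method findIterationFunction(Class<?> bodyClass) {
        Method found = null;
        for (Method method : bodyClass.getDeclaredMethods()) {
            if (method.isAnnotationPresent(IterationFunction.class)) {
                if (found != null) {
                    throw new IllegalStateException("More than one @IterationFunction method");
                }
                found = method;
            }
        }
        if (found == null) {
            throw new IllegalStateException("No @IterationFunction method found");
        }
        return found;
    }

    // Returns the variable names of the function's parameters, in declaration order.
    static List<String> variableNames(Method function) {
        List<String> names = new ArrayList<>();
        for (Parameter parameter : function.getParameters()) {
            IterationVariable variable = parameter.getAnnotation(IterationVariable.class);
            if (variable == null) {
                throw new IllegalStateException("Parameter without @IterationVariable");
            }
            names.add(variable.value());
        }
        return names;
    }

    // Hypothetical body; Object stands in for DataStream<?> and the declarative result.
    static class ExampleBody {
        @IterationFunction
        Object iterate(
                @IterationVariable("first") Object first,
                @IterationVariable("second") Object second) {
            return null;
        }
    }

    public static void main(String[] args) {
        Method function = findIterationFunction(ExampleBody.class);
        System.out.println(function.getName() + " " + variableNames(function));
        // prints: iterate [first, second]
    }
}
```

With names resolved this way, a framework could match each `bindInput(name, ...)` and `withFeedback(name, ...)` call to the parameter that carries the same variable name. Whether the actual design performs the matching like this is an assumption.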
The bounded stream iteration API is
Key Implementation
Compatibility, Deprecation, and Migration Plan
...