THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
public class UnboundedIteration { UnboundedIteration withBody(UnboundedIterationBody body) { ... } UnboundedIteration bindInput(String name, DataStream<?> input) { ... } ResultStreams apply() { ... } } public interface UnboundedIterationBody { @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) public @interface IterationFunction { } @Target(ElementType.PARAMETER) @Retention(RetentionPolicy.RUNTIME) public @interface IterationVariableIterationInput { String value(); } } public class UnboundedIterationDeclarativeBuilder { public UnboundedIterationDeclarativeBuilderUnboundedIterationDeclarationBuilder withFeedback(String name, DataStream<?> feedback) { ... return this; } public UnboundedIterationDeclarativeBuilderUnboundedIterationDeclarationBuilder withOutput(String name, DataStream<?> output) { ... return this; } UnboundedIterationDeclarativeUnboundedIterationDeclaration build() { return ... } } public class ResultStreams { public <T> DataStream<T> getStream(String name) { return ...} } } // Example public class UnboundedIterationExample { public static void main(String[] args) { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<Integer> source1 = env.fromElements(1, 2, 3, 4, 5); DataStream<String> source2 = env.fromElements("2", "3", "4"); ResultStreams resultStreams = new UnboundedIterationAPIUnboundedIteration() .withBody(new UnboundedIterationBody() { @IterationFunction UnboundedIterationDeclarative iterate( @IterationVariable@IterationInput("first") DataStream<Integer> first @IterationVariable@IterationInput("second") DataStream<> second ) { DataStream<Integer> feedBack1 = ...; DataStream<String> feedBack2 = ...; DataStream<String> output1 = ...; return new UnboundedIterationDeclarativeBuilderUnboundedIterationDeclarationBuilder() .withFeedback("first", feedBack1) .withFeedback("second", feedBack2) .withOutput("output1", output1) .build(); } }) .bindInput("first", source1) .bindInput("second", source2) .apply(); DataStream<String> output = resultStreams.getStream("output1"); } } |
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
public class BoundedIteration { BoundedIteration withBody(BoundedIterationBody body) {...} BoundedIteration bindInput(String name, DataStream<?> input) {...} void enableAsynchronousIteration() {...} ResultStreams apply() {...} } public interface BoundedIterationBody { @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) public @interface IterationFunction { } @Target(ElementType.PARAMETER) @Retention(RetentionPolicy.RUNTIME) public @interface IterationInput { String value(); } } public class BoundedIterationDeclarationBuilder { public BoundedIterationDeclarationBuilder withFeedback(String name, DataStream<?> feedback) { ... return this; } BoundedIterationpublic BoundedIterationDeclarationBuilder bindInputwithOutput(String name, DataStream<?> input) { ... } ResultStreams apply() { ... } } output) { ... return this; } <U> BoundedIterationDeclarationBuilder until(TerminationCondition<U> terminationCondition); BoundedIterationDeclaration build() { return ... } } class TerminationCondition { TerminationCondition( @Nullable DataStream<?> convergenceCriteriaVariable, ConvergenceCriterion convergenceCriteria); } interface ConvergenceCriterion { boolean isConverged(Context context); interface Context { int[] getRound(); <T> List<T> getVariableValues(); <T> List<T> getLastRoundVariabeValues(); } } public interface EachRound { @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) public @interface IterationFunction {} @Target(ElementType.PARAMETER) @Retention(RetentionPolicy.RUNTIME) public @interface IterationInput { String value(); } } class BoundedIterationUtils { void forEachRound(EachRound eachRound) {...} } // The Progress Tracking interface for UDF / Operator public interface BoundedIterationProgressListener<T> { default void onRoundStart(int[] round, Context context, Collector<T> collector){} void onRoundEnd(int[] round, Context context, Collector<T> collector); } // Example public class BoundedIterationExample { public static void MyReducer implements FlatMapFunction<Integer, Integer>, BoundedIterationProgressListener<Integer> { private int value = 0; void flatMap(Integer v1, Collector<Integer> v2) {value += v1} void onRoundEnd(int[] round, Context context, Collector<T> collector) { collector.collector(value); value = 0; } } public static void main(String[] args) { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<Integer> source1 = env.fromElements(1, 2, 3, 4, 5); DataStream<String> source2 = env.fromElements("2", "3", "4"); ResultStreams resultStreams = new BoundedIteration() .withBody(new BoundedIterationBody() { @IterationFunction BoundedIterationDeclarative iterate( @IterationInput("first") DataStream<Integer> first @IterationInput("second") DataStream<String> second ) { DataStream<Integer> feedBack1 = ...; ResultStreams results = forEachRound(new EachRound() { @EachRound public DataStream<String> eachRound(DataStream<Integer> in) { return ... } }); DataStream<String> feedBack2 = results.getStream("result"); DataStream<String> output1 = feedback1.flatMap(new MyReducer()); return new BoundedIterationDeclarationBuilder() .withFeedback("first", feedBack1) .withFeedback("second", feedBack2) .withOutput("output1", output1) .until(ConvergenceCriteria.fixRound(5)) .build(); } }) .bindInput("first", source1) .bindInput("second", source2) .apply(); DataStream<String> output = resultStreams.getStream("output1"); } } |
Key Implementation
Compatibility, Deprecation, and Migration Plan
...