THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Following the experience of FLIP-15, we would provide a structural-style iteration API. The API for the unbounded iteration is:
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/** Entry point for declaring an unbounded iteration: set the body, bind the named inputs, then apply. */
public class UnboundedIteration {
    UnboundedIteration withBody(UnboundedIterationBody body) {...}

    UnboundedIteration bindVariable(String name, DataStream<?> input) {...}

    UnboundedIteration bindConstant(String name, DataStream<?> input) {...}

    ResultStreams apply() {...}
}

/**
 * The body of an unbounded iteration. In the example below, the implementation declares one
 * method annotated with {@code @IterationFunction} whose parameters are annotated with the
 * names that were used when binding the input streams.
 */
public interface UnboundedIterationBody {

    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    public @interface IterationFunction {}

    @Target(ElementType.PARAMETER)
    @Retention(RetentionPolicy.RUNTIME)
    public @interface IterationVariable {
        String value();
    }

    @Target(ElementType.PARAMETER)
    @Retention(RetentionPolicy.RUNTIME)
    public @interface IterationConstant {
        String value();
    }
}

/** Builder for the declaration that the {@code @IterationFunction} method returns. */
public class UnboundedIterationDeclarationBuilder {
    public UnboundedIterationDeclarationBuilder withFeedback(String name, DataStream<?> feedback) {...}

    public UnboundedIterationDeclarationBuilder withOutput(String name, DataStream<?> output) {...}

    UnboundedIterationDeclaration build() {...}
}

/** Result of applying an iteration; in the example, streams are looked up by their output name. */
public class ResultStreams {
    public <T> DataStream<T> getStream(String name) {...}
}

// Example
public class UnboundedIterationExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Integer> source1 = env.fromElements(1, 2, 3, 4, 5);
        DataStream<String> source2 = env.fromElements("2", "3", "4");

        ResultStreams resultStreams = new UnboundedIteration()
            .withBody(new UnboundedIterationBody() {
                @IterationFunction
                UnboundedIterationDeclaration iterate(
                        @IterationVariable("first") DataStream<Integer> first,
                        @IterationConstant("second") DataStream<String> second) {
                    DataStream<Integer> feedBack1 = ...;
                    DataStream<String> feedBack2 = ...;
                    DataStream<String> output1 = ...;
                    return new UnboundedIterationDeclarationBuilder()
                        .withFeedback("first", feedBack1)
                        .withFeedback("second", feedBack2)
                        .withOutput("output1", output1)
                        .build();
                }
            })
            .bindVariable("first", source1)
            .bindConstant("second", source2)
            .apply();

        DataStream<String> output = resultStreams.getStream("output1");
    }
}
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/** Entry point for declaring a bounded iteration: set the body, bind the named inputs, then apply. */
public class BoundedIteration {
    BoundedIteration withBody(BoundedIterationBody body) {...}

    BoundedIteration bindVariable(String name, DataStream<?> input) {...}

    BoundedIteration bindConstant(String name, DataStream<?> input) {...}

    void enableAsynchronousIteration() {...}

    ResultStreams apply() {...}
}

/**
 * The body of a bounded iteration. In the example below, the implementation declares one method
 * annotated with {@code @IterationFunction} whose stream parameters are annotated with the names
 * that were used when binding the input streams.
 */
public interface BoundedIterationBody {

    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    public @interface IterationFunction { }

    @Target(ElementType.PARAMETER)
    @Retention(RetentionPolicy.RUNTIME)
    public @interface IterationInput {
        String value();
    }

    @Target(ElementType.PARAMETER)
    @Retention(RetentionPolicy.RUNTIME)
    public @interface IterationConstant {
        String value();
    }
}

/** Builder for the declaration that the {@code @IterationFunction} method returns. */
public class BoundedIterationDeclarationBuilder {
    public BoundedIterationDeclarationBuilder withFeedback(String name, DataStream<?> feedback) { ... }

    public BoundedIterationDeclarationBuilder withOutput(String name, DataStream<?> output) { ... }

    <U> BoundedIterationDeclarationBuilder until(TerminationCondition<U> terminationCondition) { ... }

    BoundedIterationDeclaration build() { ... }
}

public class TerminationCondition {
    TerminationCondition(
        @Nullable DataStream<?> convergenceCriteriaVariable,
        ConvergenceCriterion convergenceCriteria);
}

public interface ConvergenceCriterion {
    boolean isConverged(Context context);

    interface Context {
        int[] getRound();

        <T> List<T> getVariableValues();

        <T> List<T> getLastRoundVariableValues();
    }
}

public interface EachRound {
    Map<String, DataStream<?>> eachRound();
}

public class BoundedIterationContext {
    ResultStreams forEachRound(EachRound eachRound);
}

// The Progress Tracking interface for UDF / Operator
public interface BoundedIterationProgressListener<T> {
    default void onRoundStart(int[] round, Context context, Collector<T> collector) {}

    void onRoundEnd(int[] round, Context context, Collector<T> collector);
}

// Example
public class BoundedIterationExample {

    /** Sums the values received during a round; at the end of the round emits the sum and resets it. */
    public static class MyReducer
            implements FlatMapFunction<Integer, Integer>, BoundedIterationProgressListener<Integer> {

        private int value = 0;

        public void flatMap(Integer v1, Collector<Integer> v2) {
            value += v1;
        }

        public void onRoundEnd(int[] round, Context context, Collector<Integer> collector) {
            collector.collect(value);
            value = 0;
        }
    }

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Integer> source1 = env.fromElements(1, 2, 3, 4, 5);
        DataStream<String> source2 = env.fromElements("2", "3", "4");

        ResultStreams resultStreams = new BoundedIteration()
            .withBody(new BoundedIterationBody() {
                @IterationFunction
                BoundedIterationDeclaration iterate(
                        @IterationInput("first") DataStream<Integer> first,
                        @IterationInput("second") DataStream<String> second,
                        BoundedIterationContext context) {
                    DataStream<Integer> feedBack1 = ...;

                    // forEachRound is declared on BoundedIterationContext, so it is called on the context.
                    ResultStreams results = context.forEachRound(() -> {
                        return Collections.singletonMap("result", feedBack1.map(xx));
                    });
                    DataStream<String> feedBack2 = results.getStream("result");

                    DataStream<String> output1 = feedBack1.flatMap(new MyReducer());

                    return new BoundedIterationDeclarationBuilder()
                        .withFeedback("first", feedBack1)
                        .withFeedback("second", feedBack2)
                        .withOutput("output1", output1)
                        .until(ConvergenceCriteria.fixRound(5))
                        .build();
                }
            })
            .bindVariable("first", source1)
            .bindVariable("second", source2)
            .apply();

        DataStream<String> output = resultStreams.getStream("output1");
    }
}
...