THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
public class UnboundedIteration { UnboundedIteration withBody(IterationBody body) {...} UnboundedIteration bindVariable(String name, DataStream<?> input) {...} UnboundedIteration bindConstant(String name, DataStream<?> input) {...} ResultStreams apply() {...} } public class UnboundedIterationDeclarativeBuilderUnboundedIterationDeclaration { public static class Builder { public UnboundedIterationDeclarationBuilderBuilder withFeedback(String name, DataStream<?> feedback) {...} public UnboundedIterationDeclarationBuilderBuilder withOutput(String name, DataStream<?> output) {...} UnboundedIterationDeclaration build() {...} } } public class ResultStreams { public <T> DataStream<T> getStream(String name) {...} } // Example public class UnboundedIterationExample { public static void main(String[] args) { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<Integer> source1 = env.fromElements(1, 2, 3, 4, 5); DataStream<String> source2 = env.fromElements("2", "3", "4"); ResultStreams resultStreams = new UnboundedIteration() .withBody(new IterationBody() { @IterationFunction UnboundedIterationDeclarative iterate( @IterationVariable("first") DataStream<Integer> first @IterationConstant("second") DataStream<> second, ) { DataStream<Integer> feedBack1 = ...; DataStream<String> feedBack2 = ...; DataStream<String> output1 = ...; return new UnboundedIterationDeclarationBuilder() .withFeedback("first", feedBack1) .withFeedback("second", feedBack2) .withOutput("output1", output1) .build(); } }) .bindVariable("first", source1) .bindConstant("second", source2) .apply(); DataStream<String> output = resultStreams.getStream("output1"); } } |
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
public class BoundedIteration { BoundedIteration withBody(BoundedIterationBody body) {...} BoundedIteration bindVariable(String name, DataStream<?> input) {...} BoundedIteration bindConstant(String name, DataStream<?> input) {...} ResultStreams apply() {...} } public class BoundedIterationDeclarationBuilderBoundedIterationDeclaration { public static class Builder { public BoundedIterationDeclarationBuilderBuilder withFeedback(String name, DataStream<?> feedback) { ... } public BoundedIterationDeclarationBuilderBuilder withOutput(String name, DataStream<?> output) { ... } <U> BoundedIterationDeclarationBuilderBuilder until(TerminationCondition<U>TerminationCondition terminationCondition) { ... } BoundedIterationDeclaration build() { ... } } } public class TerminationCondition { @Nullable TerminationCondition(DataStream<?> refStream; ConvergenceCriterion convergenceCriterion; } public interface ConvergenceCriterion { boolean isConverged(Context context); interface Context { int[] getRound(); <T> List<T> getVariableValues(); <T> List<T> getLastRoundVariabeValues(); } } public interface EachRound { Map<String, DataStream<?>> eachRoundexecuteInEachRound(); } public class BoundedIterationContext { ResultStreams forEachRound(EachRound eachRound); } // The Progress Tracking interface for UDF / Operator public interface BoundedIterationProgressListener<T> { default void onRoundStart(int[] round, Context context, Collector<T> collector){} void onRoundEnd(int[] round, Context context, Collector<T> collector); } // Example public class BoundedIterationExample { public static void MyReducer implements FlatMapFunction<Integer, Integer>, BoundedIterationProgressListener<Integer> { private int value = 0; void flatMap(Integer v1, Collector<Integer> v2) {value += v1} void onRoundEnd(int[] round, Context context, Collector<T> collector) { collector.collector(value); value = 0; } } public static void main(String[] args) { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<Integer> source1 = env.fromElements(1, 2, 3, 4, 5); DataStream<String> source2 = env.fromElements("2", "3", "4"); ResultStreams resultStreams = new BoundedIteration() .withBody(new IterationBody() { @IterationFunction BoundedIterationDeclarativeBoundedIterationDeclaration iterate( @IterationVariable("first") DataStream<Integer> first, @IterationConstant("second") DataStream<String> second, BoundedIterationContext context ) { DataStream<Integer> feedBack1 = ...; ResultStreams results = forEachRound(() -> { return Collections.singletonMap("result", feedback1.map(xx)); }); DataStream<String> feedBack2 = results.getStream("result"); DataStream<String> output1 = feedback1.flatMap(new MyReducer()); return new BoundedIterationDeclarationBuilder() .withFeedback("first", feedBack1) .withFeedbackwithOutput("secondoutput1", feedBack2output1) .withOutput("output1", output1) .until(ConvergenceCriteria.fixRound(5until(new TerminationCondition(feedback2, context -> context.getVariableValues().size() == 0)) .build(); } }) .bindInput("first", source1) .bindInput("second", source2) .apply(); DataStream<String> output = resultStreams.getStream("output1"); } } |
...