THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
public class BoundedIteration { BoundedIteration withBody(BoundedIterationBody body) {...} BoundedIteration bindVariable(String name, DataStream<?> input) {...} BoundedIteration bindConstant(String name, DataStream<?> input) {...} ResultStreams apply() {...} } public class BoundedIterationDeclaration { public static class Builder { public Builder withFeedback(String name, DataStream<?> feedback) {...} public Builder withOutput(String name, DataStream<?> output) {...} <U> Builder until(TerminationCondition terminationCondition) { ... } BoundedIterationDeclaration build() {...} } } public class TerminationCondition { @Nullable DataStream<?> refStream; ConvergenceCriterion convergenceCriterion; } public interface ConvergenceCriterion { boolean isConverged(Context context); interface Context { int[] getRound(); <T> List<T> getStreamRecords(); } } public interface EachRound { Map<String, DataStream<?>> executeInEachRound(); } public class BoundedIterationContext { ResultStreams forEachRound(EachRound eachRound); } // The Progress Tracking interface for UDF / Operator public interface BoundedIterationProgressListener<T> { default void onRoundStart(int[] round, Context context, Collector<T> collector){} void onRoundEnd(int[] round, Context context, Collector<T> collector); } // Example public class BoundedIterationExample { public static void MyReducer implements FlatMapFunction<Integer, Integer>, BoundedIterationProgressListener<Integer> { private int value = 0; void flatMap(Integer v1, Collector<Integer> v2) {value += v1} void onRoundEnd(int[] round, Context context, Collector<T> collector) { collector.collector(value); value = 0; } } public static void main(String[] args) { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<Integer> source1 = env.fromElements(1, 2, 3, 4, 5); DataStream<String> source2 = env.fromElements("2", "3", "4"); ResultStreams resultStreams = new BoundedIteration() .withBody(new IterationBody() { @IterationFunction BoundedIterationDeclaration iterate( @IterationVariable("first") DataStream<Integer> first, @IterationConstant("second") DataStream<String> second, BoundedIterationContext context ) { DataStream<Integer> feedBack1 = ...; ResultStreams results = forEachRound(() -> { return Collections.singletonMap("result", feedback1.map(xx)); }); DataStream<String> feedBack2 = results.getStream("result"); DataStream<String> output1 = feedback1.flatMap(new MyReducer()); return new BoundedIterationDeclarationBuilder() .withFeedback("first", feedBack1) .withOutput("output1", output1) .until(new TerminationCondition(feedback2, context -> context.getStreamRecords().size() == 0)) .build(); } }) .bindInputbindVariable("first", source1) .bindInputbindConstant("second", source2) .apply(); DataStream<String> output = resultStreams.getStream("output1"); } } |
...