
The Loops API

A loop can be defined in terms of a LoopFunction, which takes an input DataStream and returns the feedback and output DataStreams. Each loop has its own unique context. This API allows the system to create a separate context per loop and to make operators and tasks aware of their current scope in the graph (i.e., the loop they belong to and any outer loops that enclose it).
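To make the idea of a per-loop context more concrete, here is a minimal sketch of how scope tracking could be modelled. `LoopScope`, `root()`, `nest()`, `depth()` and `path()` are hypothetical names invented for illustration. They are not part of the Flink API or of this proposal. The assumption is that an operator or task created inside a loop holds a reference to its `LoopScope` and can walk the parent links to discover every enclosing loop.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical model of a loop context: each loop gets its own scope object
// that remembers the scope it is nested in (null for a top-level loop).
final class LoopScope {
    // Simple global id counter; illustration only, not thread-safe.
    private static int nextId = 0;

    final int id;
    final LoopScope parent;

    private LoopScope(LoopScope parent) {
        this.id = nextId++;
        this.parent = parent;
    }

    /** Creates the context of a top-level loop. */
    static LoopScope root() {
        return new LoopScope(null);
    }

    /** Creates a fresh context for a loop nested directly inside this one. */
    LoopScope nest() {
        return new LoopScope(this);
    }

    /** Nesting depth: 1 for a top-level loop, 2 for a loop inside it, and so on. */
    int depth() {
        return parent == null ? 1 : parent.depth() + 1;
    }

    /** Ids of all enclosing loops and this loop, outermost first. */
    List<Integer> path() {
        List<Integer> ids = new ArrayList<>();
        for (LoopScope s = this; s != null; s = s.parent) {
            ids.add(s.id);
        }
        Collections.reverse(ids);
        return ids;
    }
}
```

For a loop nested one level deep, `inner.path()` lists the outer loop's id followed by the inner loop's id. A real job graph would need deterministic, thread-safe scope identifiers rather than this global counter.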

 

The LoopFunction, which wraps the logic of an iteration, looks as follows in Java:

  

@Public
public interface LoopFunction<T, R> extends Function, Serializable {
   Tuple2<DataStream<T>, DataStream<R>> loop(DataStream<T> value);
}
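The feedback/output contract of a LoopFunction can be illustrated without Flink through a toy in-memory model. It assumes finite lists stand in for DataStreams and that the loop runs in rounds until nothing is fed back. This sketch is for intuition only. `ListLoopFunction`, `Pair` and `ListIterate` are invented names, and `Pair` plays the role of the `Tuple2` (feedback first, output second). Real streaming iterations do not execute in synchronous rounds.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Tuple2<feedback, output>.
final class Pair<A, B> {
    final A feedback;
    final B output;

    Pair(A feedback, B output) {
        this.feedback = feedback;
        this.output = output;
    }
}

// Hypothetical in-memory analogue of LoopFunction<T, R>: lists replace DataStreams.
interface ListLoopFunction<T, R> {
    Pair<List<T>, List<R>> loop(List<T> value);
}

final class ListIterate {
    // Applies the loop body in rounds: records on the feedback side re-enter
    // the body, records on the output side are collected. Stops once a round
    // produces no feedback.
    static <T, R> List<R> iterate(List<T> input, ListLoopFunction<T, R> body) {
        List<R> out = new ArrayList<>();
        List<T> current = input;
        while (!current.isEmpty()) {
            Pair<List<T>, List<R>> step = body.loop(current);
            out.addAll(step.output);
            current = step.feedback;
        }
        return out;
    }

    // Example body: non-zero even numbers are halved and fed back,
    // everything else leaves the loop as output.
    static Pair<List<Integer>, List<Integer>> halveEvens(List<Integer> in) {
        List<Integer> feedback = new ArrayList<>();
        List<Integer> output = new ArrayList<>();
        for (int x : in) {
            if (x != 0 && x % 2 == 0) {
                feedback.add(x / 2);
            } else {
                output.add(x);
            }
        }
        return new Pair<>(feedback, output);
    }
}
```

With this model, `ListIterate.iterate(List.of(8, 3, 12), ListIterate::halveEvens)` keeps halving the even values until they become odd and returns `[3, 3, 1]`. Input and feedback share the type `T` here, which is exactly the restriction LoopFunction imposes.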

 

For loops whose feedback type differs from the input type, we propose a CoLoopFunction, which instead takes the input and feedback streams connected into a single ConnectedStreams.

 

@Public
public interface CoLoopFunction<T, F, R> extends Function, Serializable {
   Tuple2<DataStream<F>, DataStream<R>> loop(ConnectedStreams<T, F> input);
}
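The CoLoopFunction variant can be modelled in memory in the same way, assuming the two sides of `ConnectedStreams<T, F>` are represented as two separate lists. Every name here (`ListCoLoopFunction`, `Result`, `CoIterate`, `Step`) is hypothetical. The example takes `Integer` input records but feeds back `Step` records that carry extra state (the original value and a step count), which is the situation where the feedback type differs from the input type.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Tuple2<DataStream<F>, DataStream<R>>.
final class Result<F, R> {
    final List<F> feedback;
    final List<R> output;

    Result(List<F> feedback, List<R> output) {
        this.feedback = feedback;
        this.output = output;
    }
}

// Hypothetical in-memory analogue of CoLoopFunction<T, F, R>: the two lists
// play the roles of the two sides of a ConnectedStreams<T, F>.
interface ListCoLoopFunction<T, F, R> {
    Result<F, R> loop(List<T> input, List<F> feedback);
}

final class CoIterate {
    // Fresh input is handed to the body once; afterwards only feedback
    // circulates, until a round feeds nothing back.
    static <T, F, R> List<R> iterateWithFeedback(List<T> input, ListCoLoopFunction<T, F, R> body) {
        List<R> out = new ArrayList<>();
        List<T> fresh = input;
        List<F> feedback = new ArrayList<>();
        while (!fresh.isEmpty() || !feedback.isEmpty()) {
            Result<F, R> step = body.loop(fresh, feedback);
            out.addAll(step.output);
            feedback = step.feedback;
            fresh = new ArrayList<>();
        }
        return out;
    }

    // Feedback record: a different type from the Integer input records.
    static final class Step {
        final long start;
        final long value;
        final int steps;

        Step(long start, long value, int steps) {
            this.start = start;
            this.value = value;
            this.steps = steps;
        }
    }

    // Example body: counts the Collatz steps each positive input needs to
    // reach 1 and emits "start:steps" once it gets there.
    static Result<Step, String> collatz(List<Integer> input, List<Step> feedback) {
        List<Step> active = new ArrayList<>(feedback);
        for (int n : input) {
            active.add(new Step(n, n, 0));
        }
        List<Step> next = new ArrayList<>();
        List<String> done = new ArrayList<>();
        for (Step s : active) {
            if (s.value <= 1) {
                done.add(s.start + ":" + s.steps);
            } else {
                long v = s.value % 2 == 0 ? s.value / 2 : 3 * s.value + 1;
                next.add(new Step(s.start, v, s.steps + 1));
            }
        }
        return new Result<>(next, done);
    }
}
```

Here `CoIterate.iterateWithFeedback(List.of(1, 6), CoIterate::collatz)` returns `["1:0", "6:8"]`: 1 is already done, while 6 needs eight steps (6, 3, 10, 5, 16, 8, 4, 2, 1).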

  

The iterate operation of DataStream will support both flavors of loops through the following methods:

  

public <R> DataStream<R> iterate(LoopFunction<T, R> loopFun)

public <F, R> DataStream<R> iterateWithFeedback(CoLoopFunction<T, F, R> coLoopFun)

 

Loops can be trivially nested as shown below:

  

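DataStream<Integer> loop = map1.iterate(new LoopFunction<Integer, Integer>() {
    @Override
    public Tuple2<DataStream<Integer>, DataStream<Integer>> loop(DataStream<Integer> input) {
        // The nested iterate call gets its own loop context, enclosed by the outer loop's scope
        DataStream<Integer> nestedLoop = input.map(...).iterate(new LoopFunction<Integer, Integer>() {
            @Override
            public Tuple2<DataStream<Integer>, DataStream<Integer>> loop(DataStream<Integer> input2) {
                SplitStream<Integer> tmp = input2.map(...).split(...);
                // (feedback of the inner loop, output of the inner loop)
                return new Tuple2<>(tmp.select(...), tmp.select(...));
            }
        });
        // The inner loop's output serves as both the feedback and the output of the outer loop
        return new Tuple2<>(nestedLoop, nestedLoop);
    }
});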
