You are viewing an old version of this page. View the current version.

Compare with Current View Page History

Version 1 Next »

This page is meant as a template for writing a FLIP. To create a FLIP choose Tools->Copy on this page and modify with your content and replace the heading with the next FLIP number and a description of your issue. Replace anything in italics with your own description.

Status

Current statePending

Discussion thread: Archived

JIRAs

Released: Not Yet

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The current implementation of loops (or Iterations) on the streaming API of Flink does not allow for a deterministic termination of jobs and depends on user-defined timeouts to enforce termination. This impacts correctness, resulting to incomplete processing or/and inconsistent state when iterations exist in the topology. We propose a new functional, yet compositional API (i.e. nested loops) for expressing asynchronous DataStream loops with proper scoping and a distributed coordination algorithm for graph termination that exploits scoping information.

This is a proposal for a major fix in the DataStream execution and thus it should not be perceived simply as an improvement or a feature. It recommends changes to the PublicEvolving stream iteration API which means that these changes can already be incorporated in any upcoming Flink release if this proposal gets accepted by the community.


Public Interfaces

In the current Iteration API users can open and close iterations (i.e. add feedback edges) in an arbitrary manner. This means that users can nest iterations, yet close them outside outer iterations, since there is no notion of scopes. Apart from being a rather inconvenient programming paradigm, it makes things hard when we want to establish global properties, such as the state of an iterative computation and as a result of the whole dataflow graph, in a decentralized way (Termination and timeout elimination are addressed in part II of the FLIP).

In order to address iterations in a more consistent way that allows for proper scoping, we propose the following API, as a first step towards that direction.


 

The Loops API

 

A loop can be defined in terms of a LoopFunction which takes an input Data Stream and returns the feedback and the output Data Streams. Each loop has its own unique context. This API allows the system to create a separate context per loop and make operators and tasks aware of their current scope in the graph (i.e. their loop and outer loops in which they reside).

 

The LoopFunction which wraps the logic of an iteration, looks as follows in Java:

  

@Public
public interface LoopFunction<T, R> extends Function, Serializable {
   Tuple2<DataStream<T>, DataStream<R>> loop(DataStream<T> value);
}

 

For loops with a different feedback type, we propose the CoLoopFunction that takes a connected stream of the input and the feedback streams instead.

 

@Public
public interface CoLoopFunction<T, F, R> extends Function, Serializable {
   Tuple2<DataStream<F>, DataStream<R>> loop(ConnectedStreams<T, F> input);
}

  

The iterate operation of the DataStream will also support these two flavors of loops as follows:

  

public <R> DataStream<R> iterate(LoopFunction<T, R> loopFun)

 

public <F,R> DataStream<R> iterateWithFeedback

 

(CoLoopFunction<T, F, R> coLoopFun)

 

Loops can be trivially nested as shown below:

  

DataStream loop = map1.iterate(new LoopFunction<Integer, String>() {
 @Override
 public Tuple2<DataStream<Integer>, DataStream<Integer>>

 

  loop(DataStream<Integer> input) {
      input.map(...).iterate(new LoopFunction<Integer, Integer>() {
                   @Override
                   public Tuple2<DataStream<Integer>,DataStream<Integer>>           

 

                    loop(DataStream<Integer> input2) {
                        DataStream<> tmp=input2.map(...).split(...);
                        return new Tuple2<>(tmp.select(...), tmp.select(..));
                   }
               });
               return new Tuple2<>(nestedLoop, nestedLoop);
           }

...
});

 

API Restrictions

All binary operations can and should be restricted on data streams of the same StreamScope (see next section).

Proposed Changes

Introduce StreamScope

Given the proposed way of defining loops, the system can now infer scopes in a structured way. A StreamScope is an immutable data structure, specified per loop context and it basically contains the chain of all of its outer scopes. This is very powerful, not only for distributed termination but also for future additions in the systems such as progress tracking (see TimelyDataflow) for iterative computations and stream supersteps later on. These are the properties of StreamScope :

  • All operators that do not reside within a loop contain the default StreamScope.

  • The two main operations of a StreamScope are nest() and unnest()

  • Every invocation of the DataStream iterate operation generates a new StreamScope, nested on its outer scope.

  • The level of each StreamScope (getLevel()) is the number of its outer scopes. Fig.2 shows the assigned scope levels on each operator in an example graph.

  • StreamScopes can be compared topologically (isInnerOf(..), isOuterOf()).

  • Operators who reside in the exact same loop should have equal StreamScope

  • All outer scopes of a StreamScope can be retrieved

    • (e.g. scope.getContextForLevel(scope.getLevel()-2), or  scope.unest().unest())

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users? 
  • If we are changing behavior how will we phase out the older behavior? 
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Test Plan

Describe in few sentences how the FLIP will be tested. We are mostly interested in system tests (since unit-tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.

  • No labels