...

This FLIP addresses two currently interconnected problems: the arbitrary structure of the existing iteration model and the inconsistency of the current job termination protocol caused by pending iterative computation.

Loops

The current implementation of loops (or iterations) in Flink's streaming API does not allow for deterministic termination of jobs and depends on user-defined timeouts to enforce termination. This impacts correctness, resulting in incomplete processing and/or inconsistent state when iterations exist in the topology. We propose a new functional yet compositional API (i.e., one that supports nested loops) for expressing asynchronous DataStream loops with proper scoping, together with a distributed coordination algorithm for graph termination that exploits scoping information.

...

  1. The regular stream sources broadcast a TerminationAttempt event to downstream operators.

  2. Three types of operators might receive this event:

    1. Type 1: Head operators

    2. Type 2: Operators with a feedback link (red nodes in Fig. 2)

    3. Type 3: Other operators (grey nodes in Fig. 2)

  3. Each of the operators handles the TerminationAttempt as follows:

    1. Type 3: Add its status (BUSY | IDLE) to the event and broadcast it.

    2. Type 2: If the event is not associated with a StreamScope, the operator associates its outermost non-terminated StreamScope with this event. It then adds its status to the event and broadcasts it.

    3. Type 1: If the operator's StreamScope is not the one in the event, it just adds its status to the event and forwards it downstream. Otherwise, it goes into a local speculation mode in which the following protocol is activated:

  4. The operator will enter an ATTEMPTING state, broadcast a TerminationAttempt downstream and wait to receive this event back via the feedback loop.

  5. Once it gets the TerminationAttempt back, it executes the following steps:

    1. If the operator did not receive any data in between and the TerminationAttempt is collectively marked as IDLE, it will

      1. Mark its StreamScope as complete in this event and flag this context as terminated.

      2. Terminate and forward this event, which will cause all operators to terminate up until reaching another Type 2 operator.

    2. Otherwise, the operator will repeat the protocol from step 4 with an increased attempt id.
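
The head-operator (Type 1) part of the protocol can be read as a small state machine. The following is a minimal, self-contained sketch of that reading, not the proposed implementation: the class `TerminationSketch`, its nested types, and all field and method names are hypothetical and chosen only for illustration. Only `TerminationAttempt`, the `BUSY`/`IDLE` statuses, the `ATTEMPTING` state, and the attempt id come from the text above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration; none of these types are part of Flink.
final class TerminationSketch {
    enum Status { BUSY, IDLE }

    /** Event that collects operator statuses while travelling around a loop. */
    static final class TerminationAttempt {
        final int attemptId;
        final String scope;                        // the StreamScope under test
        final List<Status> statuses = new ArrayList<>();
        boolean scopeComplete = false;

        TerminationAttempt(int attemptId, String scope) {
            this.attemptId = attemptId;
            this.scope = scope;
        }

        /** "Collectively IDLE" is taken to mean that no operator reported BUSY. */
        boolean collectivelyIdle() {
            return !statuses.contains(Status.BUSY);
        }
    }

    /** Loop head (Type 1) running the local speculation protocol. */
    static final class LoopHead {
        enum State { RUNNING, ATTEMPTING, TERMINATED }

        final String scope;
        State state = State.RUNNING;
        int attemptId = 0;
        boolean sawDataSinceAttempt = false;

        LoopHead(String scope) { this.scope = scope; }

        /** Enter ATTEMPTING and emit a fresh attempt with an increased id. */
        TerminationAttempt startAttempt() {
            state = State.ATTEMPTING;
            sawDataSinceAttempt = false;
            attemptId++;
            return new TerminationAttempt(attemptId, scope);
        }

        /** Any record arriving over the feedback edge invalidates the attempt. */
        void onFeedbackRecord() { sawDataSinceAttempt = true; }

        /** Called when the attempt returns via feedback; returns the event to forward. */
        TerminationAttempt onAttemptReturned(TerminationAttempt event) {
            if (!sawDataSinceAttempt && event.collectivelyIdle()) {
                event.scopeComplete = true;        // mark the scope as complete
                state = State.TERMINATED;
                return event;                      // forwarded to terminate downstream
            }
            return startAttempt();                 // retry with an increased attempt id
        }
    }

    public static void main(String[] args) {
        LoopHead head = new LoopHead("loop-1");
        TerminationAttempt first = head.startAttempt();
        first.statuses.add(Status.IDLE);
        head.onFeedbackRecord();                   // a record is still circulating
        TerminationAttempt second = head.onAttemptReturned(first);
        second.statuses.add(Status.IDLE);
        TerminationAttempt done = head.onAttemptReturned(second);
        // prints "TERMINATED after attempt 2"
        System.out.println(head.state + " after attempt " + done.attemptId);
    }
}
```

In this sketch the first attempt fails because a record arrived over the feedback edge while the event was in flight, so the head retries; the second attempt returns with only IDLE statuses and no intervening data, so the scope is marked complete.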

Compatibility, Deprecation, and Migration Plan

...

  • The proposed changes in the PublicEvolving Iteration API are not backward compatible and require the following changes:
    • Users need to discard the iteration timeout in existing Flink applications
    • All logic within iterations needs to be wrapped within a LoopFunction or CoLoopFunction
    • Binary operations are now disallowed across different loop contexts
    • Chaining is now disabled across operators of different StreamScope (context)
  • The old (currently deprecated) API can remain accessible until a future major release (e.g. Flink v2.0). This is up for discussion.
  • Changes will affect third-party integrations of Flink, such as the Storm and Samoa compatibility libraries, which allow arbitrary loops in the graph and do not require nesting.
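
To make the restriction on binary operations concrete, here is a minimal sketch of how scope nesting and the cross-context check could be modelled. It is an assumption-laden illustration rather than the proposed implementation: apart from the name `StreamScope`, every class, field, and method below (`StreamScopeSketch`, `parent`, `level`, `isInnerOf`, `checkBinaryOperation`) is hypothetical, and the check assumes that "different loop contexts" means the two inputs do not share the same scope object.

```java
// Hypothetical illustration; none of these members are part of Flink.
final class StreamScopeSketch {

    /** A scope with a unique id, an optional parent scope, and a nesting level. */
    static final class StreamScope {
        private static int nextId = 0;

        final int id;
        final StreamScope parent;   // null for the outermost (top-level) scope
        final int level;            // 0 for the outermost scope

        StreamScope(StreamScope parent) {
            this.id = nextId++;
            this.parent = parent;
            this.level = (parent == null) ? 0 : parent.level + 1;
        }

        /** True if this scope is nested, directly or transitively, inside other. */
        boolean isInnerOf(StreamScope other) {
            for (StreamScope s = parent; s != null; s = s.parent) {
                if (s == other) {
                    return true;
                }
            }
            return false;
        }
    }

    /** Rejects a binary operation whose two inputs live in different scopes. */
    static void checkBinaryOperation(String opName, StreamScope left, StreamScope right) {
        if (left != right) {
            throw new UnsupportedOperationException(
                opName + " across different loop contexts is not allowed");
        }
    }

    public static void main(String[] args) {
        StreamScope root = new StreamScope(null);
        StreamScope outer = new StreamScope(root);
        StreamScope inner = new StreamScope(outer);

        checkBinaryOperation("union", inner, inner);       // same scope: accepted
        try {
            checkBinaryOperation("connect", outer, inner); // different scopes: rejected
        } catch (UnsupportedOperationException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

A check of this kind would run at graph-construction time, so an invalid union, cogroup, join, or connect fails before the job is submitted.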

Test Plan

All proposed changes need to be tested thoroughly. The following tests are proposed:

  • Add unit tests for API restrictions on binary graph operations across contexts (e.g. union, cogroup, join, connect)
  • Remove old test functionality that depends on timeouts for loop termination
  • Add a unit test that validates the properties of StreamScopes (e.g. unique contexts, scope levels and nesting)
  • Test termination with a complex integration test that contains multiple nested levels and different iterations one after the other

Rejected Alternatives

You can find the prior proposal that was rejected here (Part I). It is more general but requires more RPC communication, which is preferably avoided.