...

Cyclic Flow Control (Deadlock and Starvation-Free)

 

Currently, Flink's backpressure mechanism can deadlock. The heart of the problem is that the resources (i.e., the number of network buffers) along the conceptual cyclic backpressure graph are finite.

...

  • All operators that do not reside within a loop contain the default StreamScope.

  • The two main operations of a StreamScope are nest() and unnest().

  • Every invocation of the DataStream iterate operation generates a new StreamScope, nested on its outer scope.

  • The level of each StreamScope (getLevel()) is the number of its outer scopes. Fig.2 shows the assigned scope levels on each operator in an example graph.

  • StreamScopes can be compared topologically (isInnerOf(..), isOuterOf(..)).

  • Operators that reside in the exact same loop should have equal StreamScopes.

  • All outer scopes of a StreamScope can be retrieved.

    • (e.g., scope.getContextForLevel(scope.getLevel()-2), or scope.unnest().unnest())
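The scope semantics above could be sketched as follows. This is a hypothetical illustration of the described API, not Flink's actual implementation; the internal representation (a parent reference per scope) is an assumption.

```java
// Hypothetical sketch of the StreamScope described above. Method names follow
// the bullet points; the parent-pointer representation is an assumption.
final class StreamScope {
    private final StreamScope outer; // null for the default (outermost) scope

    private StreamScope(StreamScope outer) { this.outer = outer; }

    // Default scope for operators that do not reside within a loop.
    static StreamScope defaultScope() { return new StreamScope(null); }

    // Each invocation of iterate() generates a new scope nested in this one.
    StreamScope nest() { return new StreamScope(this); }

    // Leaving a loop returns the enclosing scope.
    StreamScope unnest() { return outer; }

    // The level of a scope is the number of its outer scopes.
    int getLevel() {
        int level = 0;
        for (StreamScope s = outer; s != null; s = s.outer) level++;
        return level;
    }

    // Topological comparison: this scope is inner if `other` encloses it.
    boolean isInnerOf(StreamScope other) {
        for (StreamScope s = outer; s != null; s = s.outer)
            if (s == other) return true;
        return false;
    }

    boolean isOuterOf(StreamScope other) { return other.isInnerOf(this); }

    // Retrieve the enclosing scope at a given level.
    StreamScope getContextForLevel(int level) {
        StreamScope s = this;
        while (s != null && s.getLevel() != level) s = s.outer;
        return s;
    }
}
```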

 

II - Cyclic Flow Control and Physical Implementation of Feedback Channels

Ideally, we want a simple approach to stream cycles that is performant and guarantees deadlock-free and starvation-free execution.

I) Simple design: Currently, Iteration Head and Tail are artificial operators that add complexity to the graph-building and scheduling process. We should therefore replace them with a special feedback channel that a stream entry operator can differentiate. Another big advantage of a special channel is that the fault-tolerance logic for cycles (upstream logging) can be embedded there.

II) Deadlock-free execution: For deadlock-free execution we simply need to make one of the cyclic resources conceptually 'infinite' and thus break the backpressure cycle. The special feedback edge can spill to disk when its local network buffers are fully utilised, absorbing any accumulated feedback traffic so the loop can make progress.
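The policy above can be sketched as a feedback channel whose producer side never blocks. This is an illustrative model, not Flink's implementation: the bounded queue stands in for the finite network buffers, and the unbounded overflow queue stands in for the disk spill file.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Illustrative sketch: a feedback channel that breaks the backpressure cycle
// by overflowing into a conceptually infinite spill store once its bounded
// in-memory buffer (modelling the finite network buffers) is full.
final class FeedbackChannel<T> {
    private final int capacity;                           // finite network buffers
    private final Queue<T> memory = new ArrayDeque<>();
    private final Queue<T> spilled = new ArrayDeque<>();  // stand-in for disk spill

    FeedbackChannel(int capacity) { this.capacity = capacity; }

    // Producer side: always succeeds, so the cycle can never deadlock.
    void put(T record) {
        if (memory.size() < capacity) memory.add(record);
        else spilled.add(record);
    }

    // Consumer side: drain memory first, then refill from the spill store,
    // preserving record order.
    T poll() {
        if (memory.isEmpty() && !spilled.isEmpty())
            memory.add(spilled.poll());
        return memory.poll();
    }

    int spilledCount() { return spilled.size(); }
}
```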

III) Starvation-free execution: Starvation can occur when a flow control mechanism fully prioritises one flow, leaving another data flow behind and causing unsustainable execution skew. Having an extra, specialised feedback channel allows for custom flow control. Two alternative approaches to input prioritisation within loops are under discussion, summarised below:

  1. feedback-first:
    + simple and most predictable
    + discourages spilling
    + lowers latency per iterative operation
    - lowers throughput -> stages loops and discourages pipelining
    - restricts loop logic to staged execution (waiting until all cyclic operations are done and output is flushed out of the loop)
    - can create a form of temporary starvation when the feedback queue is constantly non-empty
  2. feedback-first only when feedback queue buffers are utilised:
    + discourages spilling
    + lower end-to-end latency and higher throughput
    +/- limits temporary starvation to periods when the feedback queue is excessively utilised
    - can increase latency per loop operation due to multiplexing
    - less predictable and more complex conceptually
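The difference between the two alternatives can be sketched as an input-selection rule at the loop's entry operator. The names below (Policy, feedbackPressured) are ours, not Flink's, and the second policy is simplified: in practice it would multiplex both inputs rather than always preferring the regular input when feedback buffers are not under pressure.

```java
// Hypothetical sketch of the two prioritisation policies under discussion.
enum Policy { FEEDBACK_FIRST, FEEDBACK_FIRST_WHEN_PRESSURED }

final class InputSelector {
    // Decide which input the loop entry operator reads next.
    // `feedbackPressured` models "feedback queue buffers are utilised".
    static String next(Policy policy, boolean feedbackHasData, boolean feedbackPressured) {
        switch (policy) {
            case FEEDBACK_FIRST:
                // Always drain the loop before admitting new input: staged
                // execution, predictable, but discourages pipelining.
                return feedbackHasData ? "feedback" : "input";
            case FEEDBACK_FIRST_WHEN_PRESSURED:
                // Prioritise feedback only once its buffers fill up; otherwise
                // keep admitting new input, which pipelines the loop.
                return (feedbackHasData && feedbackPressured) ? "feedback" : "input";
            default:
                throw new IllegalArgumentException("unknown policy");
        }
    }
}
```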

III - The Job Termination Protocol

For graphs with no loops, termination happens in topological order, from sources to sinks. Starting with the end-of-stream on all regular sources, each operator terminates once all of its upstream operators have terminated. This is trivial for a DAG, but not when loops exist in the pipeline: computation might still be pending in the distributed dataflow graph even after all regular sources are done. There is, however, a practical way to terminate any arbitrary dataflow graph, with or without cycles. For a correct termination, the operators must determine in a decentralised manner that there is no pending data or computation in the pipeline. Here is a simple protocol that puts this logic into practice.

...