This page is meant as a template for writing a FLIP. To create a FLIP choose Tools->Copy on this page and modify with your content and replace the heading with the next FLIP number and a description of your issue. Replace anything in italics with your own description.
Status
Current state: Pending
Discussion thread: Archived
JIRAs:
- Loops and Nesting - https://issues.apache.org/jira/browse/FLINK-5089
- Job Termination - https://issues.apache.org/jira/browse/FLINK-2390
Released: Not Yet
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
This FLIP addresses two, currently interconnected problems : The arbitrary structure of the existing iteration model and the inconsistency of job termination due to having iterations.
Loops
The current implementation of loops (or Iterations) on the streaming API of Flink does not allow for a deterministic termination of jobs and depends on user-defined timeouts to enforce termination. This impacts correctness, resulting to incomplete processing or/and inconsistent state when iterations exist in the topology. We propose a new functional, yet compositional API (i.e. nested loops) for expressing asynchronous DataStream loops with proper scoping and a distributed coordination algorithm for graph termination that exploits scoping information.
Termination
Currently, an iteration head and tail are two obscure, interconnected dataflow operators that utilize an in-memory blocking queue. A fixed timeout at the consumer side (head) terminates the head task and subsequently allows for a graceful, yet irregular, shut-down of the dataflow graph in topological operator order.
Problems with Existing Mechanism
An arbitrary loop timeout does not necessarily mean that all computation in the dataflow graph has finished. With the existing mechanism it is possible to end up terminating a job non-deterministically in a non-final state when computation is still pending. This can trivially lead to inconsistent processing and boils down to at-most-once guarantees when loops exist in the dataflow graph.(Note: Setting a very large value by default does not really solve the issue. Operators can take an arbitrary amount of time to execute, i.e. due to checkpointing very large state, communicating with another service etc).
It introduces an unnecessary lower latency cap to the termination process.
It adds execution details (i.e. timeouts) to the user-facing API. The user should not really worry about queue staleness timeouts, especially since this is something that the runtime can instead figure out and execute in speculative manner.
Public Interfaces
In the current Iteration API users can open and close iterations (i.e. add feedback edges) in an arbitrary manner. This means that users can nest iterations, yet close them outside outer iterations, since there is no notion of scopes. Apart from being a rather inconvenient programming paradigm, it makes things hard when we want to establish global properties, such as the state of an iterative computation and as a result of the whole dataflow graph, in a decentralized way (Termination and timeout elimination are addressed in part II of the FLIP).
In order to address iterations in a more consistent way that allows for proper scoping, we propose the following API, as a first step towards that direction.
The Loops API
A loop can be defined in terms of a LoopFunction which takes an input Data Stream and returns the feedback and the output Data Streams. Each loop has its own unique context. This API allows the system to create a separate context per loop and make operators and tasks aware of their current scope in the graph (i.e. their loop and outer loops in which they reside).
The LoopFunction which wraps the logic of an iteration, looks as follows in Java:
@Public
public interface LoopFunction<T, R> extends Function, Serializable {
Tuple2<DataStream<T>, DataStream<R>> loop(DataStream<T> value);
}
For loops with a different feedback type, we propose the CoLoopFunction that takes a connected stream of the input and the feedback streams instead.
@Public
public interface CoLoopFunction<T, F, R> extends Function, Serializable {
Tuple2<DataStream<F>, DataStream<R>> loop(ConnectedStreams<T, F> input);
}
The iterate operation of the DataStream will also support these two flavors of loops as follows:
public <R> DataStream<R> iterate(LoopFunction<T, R> loopFun)
public <F,R> DataStream<R> iterateWithFeedback
(CoLoopFunction<T, F, R> coLoopFun)
Loops can be trivially nested as shown below:
DataStream loop = map1.iterate(new LoopFunction<Integer, String>() {
@Override
public Tuple2<DataStream<Integer>, DataStream<Integer>>
loop(DataStream<Integer> input) {
input.map(...).iterate(new LoopFunction<Integer, Integer>() {
@Override
public Tuple2<DataStream<Integer>,DataStream<Integer>>
loop(DataStream<Integer> input2) {
DataStream<> tmp=input2.map(...).split(...);
return new Tuple2<>(tmp.select(...), tmp.select(..));
}
});
return new Tuple2<>(nestedLoop, nestedLoop);
}
});
API Restrictions
All binary operations can and should be restricted on data streams of the same StreamScope (see next section).Proposed Changes
Given the new Loops API we define two changes that build on it, namely Stream Scopes and a new Termination protocol.
I - Introduce StreamScope, an internal representation of the operator context in the graph
Given the proposed way of defining loops, the system can now infer scopes in a structured way. A StreamScope is an immutable data structure, specified per loop context and it basically contains the chain of all of its outer scopes. This is very powerful, not only for distributed termination but also for future additions in the systems such as progress tracking (see TimelyDataflow) for iterative computations and stream supersteps later on. These are the properties of StreamScope :
All operators that do not reside within a loop contain the default StreamScope.
The two main operations of a StreamScope are nest() and unnest()
Every invocation of the DataStream iterate operation generates a new StreamScope, nested on its outer scope.
The level of each StreamScope (getLevel()) is the number of its outer scopes. Fig.2 shows the assigned scope levels on each operator in an example graph.
StreamScopes can be compared topologically (isInnerOf(..), isOuterOf()).
Operators who reside in the exact same loop should have equal StreamScope
All outer scopes of a StreamScope can be retrieved
(e.g. scope.getContextForLevel(scope.getLevel()-2), or scope.unest().unest())
II - The Job Termination Protocol
For graphs with no loops, the termination happens in topological sequential order, from sources to sinks. Starting with the end-of-stream on all regular sources, each operator terminates in-order after all prior operators have already terminated. This is trivial when we have a DAG, but not likewise when loops exist in the pipeline. Computation might still be pending in the distributed dataflow graph, even when all regular sources are done. There is, however, a practical way to terminate any arbitrary dataflow graph with or without cycles. For a correct termination, the operators can determine in a decentralized manner that there is no pending data/computation in the pipeline. Here is a simple protocol that puts this logic into practice.
Technique Overview
The main idea of the decentralised graph termination algorithm is that when loops are scoped, we can trivially coordinate the termination decision recursively, starting from outermost loop of the graph down to the innermost and then back. The important thing is to get knowledge of the status of all operators within a scope and its inner scopes. Thus, we propose an abortable, distributed passing-the-baton approach to traverse the graph and collect all statuses of the operators in order to determine whether termination is feasible.
Tricky Part: Termination is staged between sections of the dataflow graph that contain at least one loop. To enforce staging, we use a form of alignment. We basically align the batons within and out of loops. The graph in Fig.1 shows a part of the execution of the algorithm from node ‘a’ to node ‘c’. Node ‘c’ will wait (align) to receive the event twice, via: a→b→c and a→b→T→H→b→c.
Fig.1 - Example of Alignment
In our implementation, the baton is represented as a TerminationAttempt event that contains the following properties:
The current StreamScope of the attempt
An attempt identifier (~epoch)
The scope level of the last visited operator (to decide on alignment)
The last completed scope level
The collective status of the operators visited in the current attempt
Fig.2 - A dataflow graph example that contains nested loops
In Fig. 2 we depict an example throughout all phases to use as a reference. The protocol works as follows:
Once all regular stream sources have reached the “end-of-stream” the system enters into a “Attempting” phase, upon which the following abortable protocol becomes active:
The regular stream sources broadcast a TerminationAttempt event to downstream operators
Three types of operators might receive this event
Type 1 : Head Operators
Type 2 : Operators with Feedback link, (Red nodes in Fig. 2)
Type 3 : Other operators(Grey nodes in Fig. 2)
Each of the operators handles the TerminationAttempt as follows:
Type 3: Add status (BUSY | IDLE) to the event and broadcast it.
Type 2: If the event is not associated with a StreamScope, then operator will associate its outermost non-terminated StreamScope to this event. Add operator status to the event and broadcast it.
Type 1: if the operator’s StreamScope is not the one in the event then it will just add its status to the event and forward it downstream. Otherwise, it will go into a local speculation mode on which protocol is activated :
The operator will enter an ATTEMPTING state, broadcast a TerminationAttempt downstream and wait to receive this event back via the feedback loop.
After receiving the TerminationAttempt back it does the following:
If the operator did not receive any data streams in between and the TerminationAttempt is collectively marked as IDLE, it will
Mark its StreamScope as complete in this event and flag this context as terminated
Terminate and forward this event, which will cause all operators to terminate up until reaching another Type 2 operator.
Otherwise, the Operator will repeat protocol from step 2.5.1 with an increased attempt id
Compatibility, Deprecation, and Migration Plan
- What impact (if any) will there be on existing users?
- If we are changing behavior how will we phase out the older behavior?
- If we need special migration tools, describe them here.
- When will we remove the existing behavior?
Test Plan
Describe in few sentences how the FLIP will be tested. We are mostly interested in system tests (since unit-tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?
Rejected Alternatives
You can find the prior proposal that was rejected here (Part I). That is more general but requires more RPC communication which can be preferably avoided.