
...

For the time being, if an operator checkpoints UnionListState, we will require that all of its tasks finish at once. We will decline checkpoints if not all of the tasks have called finish and received notifyCheckpointComplete.
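A minimal sketch of the decline rule, under one assumed reading: a subtask that has called finish but not yet received notifyCheckpointComplete is still running and can snapshot its UnionListState, so the checkpoint is declined only when some, but not all, subtasks have fully finished (and thus can no longer contribute their state). The class `UnionStateFinishCheck`, the record `SubtaskStatus` and the method `shouldDecline` are hypothetical illustrations, not Flink APIs.

```java
import java.util.List;

// Hypothetical helper illustrating the decline rule; not a Flink API.
public class UnionStateFinishCheck {

    // Hypothetical snapshot of one subtask: has it called finish() and
    // received notifyCheckpointComplete for its final checkpoint?
    record SubtaskStatus(boolean finished, boolean notifiedComplete) {
        boolean fullyFinished() {
            return finished && notifiedComplete;
        }
    }

    // Decline if the operator uses UnionListState and its subtasks are only
    // partially finished (some, but not all, fully finished).
    static boolean shouldDecline(boolean usesUnionListState, List<SubtaskStatus> subtasks) {
        if (!usesUnionListState) {
            return false;
        }
        long done = subtasks.stream().filter(SubtaskStatus::fullyFinished).count();
        return done > 0 && done < subtasks.size();
    }

    public static void main(String[] args) {
        List<SubtaskStatus> partial = List.of(
            new SubtaskStatus(true, true),    // fully finished
            new SubtaskStatus(false, false)); // still running
        System.out.println(shouldDecline(true, partial)); // prints "true"
    }
}
```

Under this reading, an operator with UnionListState can still complete checkpoints while none of its subtasks have finished and once all of them have, which is what "finish all at once" amounts to.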

ListState

We want to make the contract of ListState more explicit: redistribution may happen even when there is no rescaling. This may have subtle implications. Consider the following topology:

src 0 --> op 0 --> sink 0

src 1 --> op 1 --> sink 1

src 2 --> op 2 --> sink 2


Suppose the operators op X buffer records. If src 1 finishes, its state is cleared. If, after a restore, the state of src 2 is then assigned to src 1, records from partitions originally assigned to src 2 will end up in both op 1 and op 2. Depending on the processing speed of the two operators, if op 1 unbuffers records faster, later records from such a partition may overtake earlier records in the pipeline. We want to make it clear, though, that we do not support such a scenario: redistribution may happen regardless of whether we are rescaling or not.
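To see how redistribution without rescaling can move a partition, the sketch below uses a simplified even-split scheme: concatenate all list entries and hand out contiguous, nearly equal chunks to the subtasks. This is an assumed illustration, not Flink's actual repartitioner; the class `EvenSplitRedistribution` and the partition names `p0` to `p3` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified even-split ListState redistribution (hypothetical, not Flink's implementation).
public class EvenSplitRedistribution {

    static List<List<String>> redistribute(List<List<String>> oldStates, int newParallelism) {
        // Concatenate the list entries of all old subtasks.
        List<String> all = new ArrayList<>();
        for (List<String> s : oldStates) {
            all.addAll(s);
        }
        // Give each new subtask a contiguous chunk; the first `remainder` chunks get one extra entry.
        List<List<String>> result = new ArrayList<>();
        int base = all.size() / newParallelism;
        int remainder = all.size() % newParallelism;
        int start = 0;
        for (int i = 0; i < newParallelism; i++) {
            int size = base + (i < remainder ? 1 : 0);
            result.add(new ArrayList<>(all.subList(start, start + size)));
            start += size;
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<String>> before = List.of(
            List.of("p0", "p1"),   // src 0
            List.<String>of(),     // src 1: finished, state cleared
            List.of("p2", "p3"));  // src 2
        // Same parallelism (3), yet p2 moves from src 2 to src 1.
        System.out.println(redistribute(before, 3)); // prints "[[p0, p1], [p2], [p3]]"
    }
}
```

After the restore, new records from p2 flow through op 1, while older p2 records may still sit in op 2's buffer, which is exactly the overtaking scenario described above.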

Based on the above discussion, discarding the final states of finished tasks would only change the behavior of a very small fraction of existing jobs, namely those whose non-source operators have special logic in initialization or endOfInput, and these jobs could be modified to keep their current behavior. Therefore, we lean towards option 3, which simplifies the implementation and leaves more room for future optimization.

...