Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...


We buffer records in operators op X. If src 1 finishes its state will be cleared. Than if after a restore the state of src 2 is assigned to src 1.  Records from partitions originally assigned to src 2 will end up in both op1 and op2. Depending on the processing speed of the two operators, if the op 1 unbuffers records faster it may happen that later records from such partition will overtake earlier records in the pipeline.  We want to make it clear though we do not support such a scenario and the redistribution may happen regardless if we are rescaling or not However, Flink never offered explicit guarantees that in case of recovery, non-keyed ListState will be assigned in the same order to subtasks before and after recovery, especially across multiple operators. As part of this FLIP, we want to document and clarify this.

Based on the above discussion discarding the final states of the finish tasks would only change behavior for a very little fraction of the possible existing jobs whose non-source operators have special logic on initialization or endOfInput, and these jobs could also be modified to keep the current behaviors.  Therefore, we lean towards option 3, which simplify the implementation and leave more room for optimization in the future. 

...