Status

...

Page properties


Discussion thread

...

...

Jira: FLINK-2491

...

Release: 1.14



Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

The situation of the second part is equivalent to that of the source operators. However, non-source operators rarely have logic similar to the legacy sources. Instead, the results related to ΔF are usually sent before finishing and do not need to be resent after failover. For example, an operator that performs aggregation and sends the final result on endOfInput only needs to send the remaining aggregation value after failover. Even for operators that do not satisfy this condition, the operators could push the states that cannot be discarded to the OperatorCoordinator instead, like the new Source API does.
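To make this concrete, the following is the kind of operator the example refers to: an aggregation operator that emits its final result on endOfInput via the existing BoundedOneInput interface. This is a minimal sketch with illustrative class and field names, not actual runtime code:

Code Block
languagejava
// Illustrative sketch: an aggregating operator that sends its final result on
// endOfInput. After failover it only needs to resend the remaining aggregation
// value, so its final snapshot could be discarded.
public class SummingOperator extends AbstractStreamOperator<Long>
        implements OneInputStreamOperator<Long, Long>, BoundedOneInput {

    private long sum; // would be kept in checkpointed state in a real operator

    @Override
    public void processElement(StreamRecord<Long> element) {
        sum += element.getValue();
    }

    @Override
    public void endInput() {
        // The final result is only emitted once all input is consumed.
        output.collect(new StreamRecord<>(sum));
    }
}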

Operator state

There are a few scenarios in which operator states are used in combination with an additional implicit contract on data distribution. Those implicit contracts might not hold in case we restore state with partially finished operators. Therefore we would like to discuss how we want to use the operator state in combination with finished tasks.

BroadcastState

In case of broadcast state all operators snapshot their state. The assumption is that all these snapshots are equal and any of them can be used when restoring any given subtask. We want to leverage that property: when restoring with some subtasks finished, we would use any of the non-empty states (taken from any running subtask).
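As an illustration, the selection rule on restore could look like the following. This is a minimal sketch with made-up names, not the actual runtime code:

Code Block
languagejava
// Illustrative: all snapshots of a broadcast state are assumed to be equal, so
// restoring may simply pick any non-empty snapshot taken from a running subtask.
static <T> Optional<T> selectBroadcastStateToRestore(List<T> snapshotsOfRunningSubtasks) {
    return snapshotsOfRunningSubtasks.stream()
            .filter(Objects::nonNull)
            .findFirst();
}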

UnionListState

The UnionListState is more complex. A common usage pattern is to implement a "broadcast" state by storing the state on a single subtask. Afterwards, when initializing subtasks, the single copy of the state is distributed to all subtasks. Another common pattern is to use a UnionListState to create a global view. It is used, for example, to share information about the offsets of the partitions consumed so far. This lets us restart from the given offset if, after a restore, a partition is assigned to a different subtask than originally. Such logic can only be implemented with the merged state of all original subtasks. If some of the subtasks have finished and we keep only the state of the remaining ones in the checkpoint, we can obviously lose important bits of information, or even the entire state in the case of the described broadcast-state implementation. However, the UnionListState is on the way to being deprecated and replaced by the OperatorCoordinator.
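For illustration, the "global view" pattern looks roughly like the sketch below. The class, state name, and types are made up; getUnionListState is the existing OperatorStateStore method:

Code Block
languagejava
// Illustrative sketch of sharing partition offsets through UnionListState.
public class OffsetTrackingSource implements CheckpointedFunction {

    private transient ListState<Tuple2<String, Long>> offsets;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // On restore, every subtask receives the MERGED state of all original
        // subtasks, so a partition can be resumed at the correct offset even if
        // it is now assigned to a different subtask.
        offsets = context.getOperatorStateStore().getUnionListState(
                new ListStateDescriptor<>(
                        "offsets",
                        TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {})));
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Each subtask snapshots only the offsets of the partitions it owns.
    }
}

If the subtasks that owned some of the partitions have already finished and their state is dropped from the checkpoint, the merged view above becomes incomplete, which is exactly the problem described.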

For the time being, if an operator checkpoints UnionListState, we will only allow its tasks to finish all at once: we will decline checkpoints as long as only some, but not all, of its tasks have finished and received notifyCheckpointComplete.

ListState

We want to make the contract of ListState more explicit: redistribution may happen even if there is no rescaling. This might have some sophisticated implications. Imagine a situation where you have the topology:

src 0 --> op 0 --> sink 0

src 1 --> op 1 --> sink 1

src 2 --> op 2 --> sink 2


We buffer records in the operators op X. If src 1 finishes, its state will be cleared. Then, if after a restore the state of src 2 is assigned to src 1, records from partitions originally assigned to src 2 will end up in both op 1 and op 2. Depending on the processing speed of the two operators, if op 1 unbuffers records faster, later records from such a partition may overtake earlier records in the pipeline. However, Flink has never offered an explicit guarantee that, in case of recovery, non-keyed ListState will be assigned to subtasks in the same order before and after recovery, especially across multiple operators. As part of this FLIP, we want to document and clarify this.
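To make the scenario concrete, op X could buffer records roughly as in the following sketch (illustrative names; getListState is the existing, redistribution-prone operator state):

Code Block
languagejava
// Illustrative buffering function: plain ListState may be redistributed on
// restore even without rescaling, which permits the reordering described above.
public class BufferingFunction implements CheckpointedFunction {

    private final List<String> buffer = new ArrayList<>();
    private transient ListState<String> bufferedRecords;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        bufferedRecords = context.getOperatorStateStore().getListState(
                new ListStateDescriptor<>("buffered-records", String.class));
        for (String record : bufferedRecords.get()) {
            buffer.add(record);
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        bufferedRecords.update(buffer);
    }
}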

Based on the above discussion, discarding the final states of the finished tasks would only change the behavior for a very small fraction of the possible existing jobs, namely those whose non-source operators have special logic on initialization or endOfInput, and these jobs could also be modified to keep the current behavior. Therefore, we lean towards option 3, which simplifies the implementation and leaves more room for optimization in the future.

...

This is enough for aligned checkpoints, but unaligned checkpoints introduce additional complexity. Unaligned checkpoint barriers can jump over pending records, while EndOfPartition cannot. If we simply waited for EndOfPartition, the CheckpointCoordinator would not get notified in time and we might incur longer checkpoint periods during the finishing phase. The same holds for aligned checkpoints with timeout. To cope with this issue, the upstream tasks wait for the downstream tasks to process all the pending records before exiting: the upstream tasks emit a special event, namely EndOfData, after all the records, the downstream tasks respond with another special event, and the upstream tasks only exit after all the response events have been received. During this period, unaligned checkpoints and checkpoints with timeout can be taken normally. Afterwards, EndOfPartition can reach the downstream CheckpointAligner quickly since there are no pending records.

...

The detailed life cycle of the source stream tasks and their operators would become:

| Event | Stream Task Status | Operator Status | Final Checkpoint | Stop with Savepoint with Drain | Stop with Savepoint |
| --- | --- | --- | --- | --- | --- |
|  | RUNNING | RUNNING | - | - | - |
| No more records or Received Savepoint Trigger |  |  | - | finish task (cancel the source thread for the legacy source and suspend the mailbox for the new source) | finish task (cancel the source thread for the legacy source and suspend the mailbox for the new source) |
|  |  |  | Advance to MAX_WATERMARK and trigger all the event timers | Advance to MAX_WATERMARK and trigger all the event timers | - |
|  |  |  | Emit MAX_WATERMARK | Emit MAX_WATERMARK | - |
|  | WAITING_FOR_FINAL_CP | FINISHED | Call operator.endInput() & operator.finish() | Call operator.endInput() & operator.finish() | - |
|  |  |  | Emit EndOfData[finished = true] | Emit EndOfData[finished = true] | Emit EndOfData[finished = false] |
|  |  |  | When a checkpoint is triggered, emit the Checkpoint Barrier | Emit the Checkpoint Barrier | Emit the Checkpoint Barrier |
|  |  |  | Wait for the Checkpoint / Savepoint to complete | Wait for the Checkpoint / Savepoint to complete | Wait for the Checkpoint / Savepoint to complete |
| Checkpoint Completed |  |  | Wait for the downstream tasks to acknowledge EndOfData | Wait for the downstream tasks to acknowledge EndOfData | Wait for the downstream tasks to acknowledge EndOfData |
| Checkpoint Completed && EndOfData acknowledged | CLOSED | CLOSED | Call operator.close() | Call operator.close() | Call operator.close() |
|  |  |  | Emit EndOfPartitionEvent | Emit EndOfPartitionEvent | Emit EndOfPartitionEvent |


Similarly, the life cycle of the non-source tasks would become:

| Event | Stream Task Status | Operator Status | Final Checkpoint | Stop with Savepoint with Drain | Stop with Savepoint |
| --- | --- | --- | --- | --- | --- |
|  | RUNNING | RUNNING | - | - | - |
| Aligned on MAX_WATERMARK |  |  | Advance to MAX_WATERMARK and trigger all the event timers | Advance to MAX_WATERMARK and trigger all the event timers | N/A (MAX_WATERMARK is not emitted in this case) |
|  |  |  | Emit MAX_WATERMARK | Emit MAX_WATERMARK | N/A |
| Aligned on EndOfData | WAITING_FOR_FINAL_CP | FINISHED | Call operator.endInput() & operator.finish() | Call operator.endInput() & operator.finish() | - |
|  |  |  | Emit EndOfData[finished = true] | Emit EndOfData[finished = true] | Emit EndOfData[finished = false] |
| Aligned on Checkpoint Barrier |  |  | Emit the Checkpoint Barrier | Emit the Checkpoint Barrier | Emit the Checkpoint Barrier |
|  |  |  | Wait for the Checkpoint / Savepoint to complete | Wait for the Checkpoint / Savepoint to complete | Wait for the Checkpoint / Savepoint to complete |
| Checkpoint Completed |  |  | Wait for the downstream tasks to acknowledge EndOfData | Wait for the downstream tasks to acknowledge EndOfData | Wait for the downstream tasks to acknowledge EndOfData |
|  |  |  | Wait for EndOfPartitionEvent | Wait for EndOfPartitionEvent | Wait for EndOfPartitionEvent |
| Checkpoint completed && EndOfData acknowledged && EndOfPartition received | CLOSED | CLOSED | Call operator.close() | Call operator.close() | Call operator.close() |
|  |  |  | Emit EndOfPartitionEvent | Emit EndOfPartitionEvent | Emit EndOfPartitionEvent |


Info
We need to wait for a checkpoint to complete that started after the finish() method. However, we support concurrent checkpoints. Moreover, there is no guarantee that notifyCheckpointComplete arrives, nor any guarantee on the order in which the notifications arrive. It should be enough, though, to wait for the notification of any checkpoint that started after the finish() method.
We should make sure, though, that later checkpoints do not leave behind lingering resources.
Imagine a scenario where:
1. task/operator received `finish()`
2. checkpoint 42 triggered (not yet completed)
3. checkpoint 43 triggered (not yet completed)
4. checkpoint 44 triggered (not yet completed)
5. notifyCheckpointComplete(43)
Our proposal is to shut down the task immediately after seeing the first `notifyCheckpointComplete(X)`, where X is any checkpoint triggered AFTER `finish()`. This should be fine, as:
a) ideally there should be no new pending transactions opened after checkpoint 42
b) even if the operator/function opens some transactions for checkpoints 43 and 44 (`FlinkKafkaProducer`), those transactions after checkpoint 42 should be empty
After seeing 5. (notifyCheckpointComplete(43)) it should be good enough to:
- commit the transactions from checkpoint 42 (and 43 if they were created; this depends on the user code)
- close the operator, aborting any pending transactions (for checkpoint 44 if they were opened; this depends on the user code)
If checkpoint 44 completes afterwards, it will still be valid. Ideally we would recommend that operators/functions not open any new transactions after seeing `finish()`, but that shouldn't be required.
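The shutdown rule above can be sketched as follows. This is illustrative bookkeeping, not the actual task code; the class and method names are made up:

Code Block
languagejava
// Illustrative: close the task on the first completed checkpoint that was
// triggered after finish(), committing what is known and aborting the rest.
class FinishedTaskTracker {

    private boolean finished;
    private long firstCheckpointAfterFinish = Long.MAX_VALUE;

    void onFinish(long nextCheckpointId) { // e.g. 42 in the scenario above
        finished = true;
        firstCheckpointAfterFinish = nextCheckpointId;
    }

    void notifyCheckpointComplete(long checkpointId) { // e.g. 43
        if (finished && checkpointId >= firstCheckpointAfterFinish) {
            // Commit transactions up to and including this checkpoint (42, 43) ...
            // ... then close the operator, aborting pending transactions (44).
        }
    }
}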

Skip Waiting for the Final Checkpoint If Possible

For the final checkpoint case, it is possible that users in fact do not use 2PC; in that case, once the sources have finished, the following tasks do not need to wait for the final checkpoint. As an optimization, to allow users to specify such logic, we could enhance the StreamOperator and CheckpointListener interfaces with the following changes:

Code Block
languagejava
interface StreamOperator {
    default boolean requiresFinalCheckpoint() {
        return true;
    }
}

interface CheckpointListener {

    default boolean requiresFinalCheckpoint() {
        return true;
    }
}

class AbstractUdfStreamOperator {

    @Override
    public boolean requiresFinalCheckpoint() {
        return userFunction instanceof CheckpointListener &&
            ((CheckpointListener) userFunction).requiresFinalCheckpoint();
    }
}
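
Built on top of the proposed interfaces, a user function that does not rely on two-phase commit could then opt out of the final checkpoint. The sink below is a hypothetical example:

Code Block
languagejava
// Hypothetical user function: it listens to checkpoint completion but has no
// transactional side effects, so the final checkpoint can be skipped.
public class MetricsReportingSink implements SinkFunction<String>, CheckpointListener {

    @Override
    public void invoke(String value, Context context) {
        // Write the record out directly; nothing transactional to commit.
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        // Only reports metrics; no pending transactions.
    }

    @Override
    public boolean requiresFinalCheckpoint() {
        return false;
    }
}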



Rejected Alternatives

CheckpointCoordinator Maintains the Finished Status of Tasks Separately

...