...

Pre-Commit Phase (on checkpoint)

On pre-commit (i.e. when the checkpoint barrier arrives at a KafkaSink operator subtask), the following operations are performed:

...

  • If the flush() or prepareTransaction() call in step 1 fails, then we strictly fail the job as this is a synchronization point failure in 2PC.

  • Failing to write the snapshot of the buffer in step 3 to Flink’s managed state backend is also a synchronization point failure and will fail the job.

  • For step 4, if a new producer instance cannot be obtained for the next checkpoint’s transaction (either because initialization failed, or because all possible TIDs have been depleted, which is a possibility with FLIP-ZZZ: TID Pooling for KafkaSink), we choose to fail the job by design. Strictly speaking, this is an implementation detail and not a synchronization point failure for 2PC; in theory, obtaining the new producer instance could be delayed or retried.
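The failure handling above can be summarized in a small sketch. The class, enum, and method names are hypothetical and are not part of the KafkaSink code base; only the steps named in the bullets are modeled, since the full step list is elided here.

```java
/** Hypothetical summary of how pre-commit failures are treated; not actual KafkaSink code. */
final class PreCommitFailureSketch {

    /** Pre-commit steps that the bullets above mention explicitly. */
    enum Step {
        FLUSH_OR_PREPARE,      // step 1: flush() or prepareTransaction()
        SNAPSHOT_STATE,        // step 3: write the snapshot to the state backend
        OBTAIN_NEXT_PRODUCER   // step 4: get a producer for the next transaction
    }

    /** True if a failure in this step is a synchronization point failure in 2PC. */
    static boolean isSynchronizationPointFailure(Step step) {
        switch (step) {
            case FLUSH_OR_PREPARE:
            case SNAPSHOT_STATE:
                return true;
            default:
                // Failing to obtain the next producer is an implementation detail.
                return false;
        }
    }

    /** Every listed failure fails the job, whether required by 2PC or chosen by design. */
    static boolean failsJob(Step step) {
        return true;
    }
}
```

The two methods are kept separate to reflect the distinction drawn above: step 4 fails the job by choice, so it is the only step where a retry policy could be introduced without affecting 2PC correctness.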

Commit Phase (on checkpoint complete notification)

The commit phase (i.e. when all KafkaSink subtasks have successfully checkpointed, and the JM sends an RPC notification to inform them of the completion) remains unchanged.

...

Restore Phase (on operator initialization)

On restore, the KafkaSink will fully resolve (either commit or abort) all dangling transactions possibly started by previous execution attempts, taking into account that previous attempts may have run with different parallelism settings. This restore-time resolution logic ensures that the new execution attempt starts from a clean state with respect to transaction status within Kafka.

All dangling open transactions from previous execution attempts fall into one of the following two categories:

  1. TIDs within PrecommittedRange: these TIDs have been successfully pre-committed, and therefore are always persisted within completed Flink checkpoints. When the Flink job fails and restores from a checkpoint, all TIDs read from the checkpoint are within PrecommittedRange and should be committed.

  2. TIDs outside of PrecommittedRange: these TIDs were NOT successfully pre-committed, and are NOT written in the restored Flink checkpoint. All dangling TIDs in this range need to be aborted. To obtain the TIDs, the KafkaSink has to either 1) query Kafka to list all TIDs that are possibly within this range, or 2) iterate through all possible TIDs that can be in this range.
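The two categories above amount to a simple partitioning rule, sketched below. This is an illustration only: the class and method names are hypothetical, TIDs are modeled as plain strings, and the candidate list stands in for whichever of the two discovery options (listing TIDs from Kafka, or iterating over all possible TIDs) is used.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch of restore-time transaction resolution; not actual KafkaSink code. */
final class RestoreResolutionSketch {

    /** Which dangling transactions to commit and which to abort. */
    static final class Plan {
        final List<String> toCommit = new ArrayList<>();
        final List<String> toAbort = new ArrayList<>();
    }

    /**
     * @param checkpointedTids TIDs read from the restored checkpoint (the PrecommittedRange)
     * @param candidateTids    TIDs that may be dangling, obtained by listing or by iteration
     */
    static Plan plan(List<String> checkpointedTids, List<String> candidateTids) {
        Plan plan = new Plan();
        Set<String> precommitted = new LinkedHashSet<>(checkpointedTids);

        // Category 1: every TID found in the checkpoint was pre-committed, so commit it.
        plan.toCommit.addAll(precommitted);

        // Category 2: any other candidate TID was never pre-committed, so abort it.
        for (String tid : candidateTids) {
            if (!precommitted.contains(tid)) {
                plan.toAbort.add(tid);
            }
        }
        return plan;
    }
}
```

For example, with a checkpoint containing only `sink-0-5` and candidates `sink-0-5`, `sink-0-6`, and `sink-1-6` (made-up TIDs), the plan commits `sink-0-5` and aborts the other two.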

...

Appendix: Current State of KafkaSink

Data Loss Scenario

Data loss under exactly-once mode occurs for Flink's KafkaSink under the following scenario:

  1. Upon receiving the checkpoint barrier, the KafkaSink subtasks successfully flush to their current transactions and acknowledge completion to the JM.
  2. After the JM receives acknowledgements from all sink subtasks, it commits the current checkpoint as completed (in other words, the voting phase of 2PC succeeds and is recorded by the transaction manager).
  3. To trigger the subsequent commit phase of 2PC, the JM notifies all sink subtasks of checkpoint completion.
  4. The KafkaSink subtasks receive the notification, but by the time they try to commit their pre-committed transactions, Kafka has already aborted them for exceeding transaction.timeout.ms.
  5. Since Flink has already committed the decision that the global distributed transaction succeeded, the job can only restart from the completed checkpoint and try to re-commit the transactions, only to fail repeatedly.


Generally speaking, the scenario described above can occur whenever sink subtasks receive the checkpoint complete notification, which triggers the second phase of 2PC, late enough for the transaction to time out. This can happen simply because of an unfortunate combination of Flink's checkpoint interval / delay and Kafka's transaction timeout, or because the Flink job fails right after a checkpoint completes and remains down for an extended period.
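The timing condition behind this scenario can be written as a one-line check. This is a simplified model with hypothetical names: it assumes the timeout is measured from the start of the transaction, and it ignores the interval at which the broker scans for timed-out transactions.

```java
/** Hypothetical timing model for the data loss scenario; not actual KafkaSink code. */
final class TransactionTimeoutSketch {

    /**
     * Returns true if the commit attempt arrives after Kafka may already have
     * aborted the transaction for exceeding transaction.timeout.ms.
     *
     * @param txnStartMs           time the transaction was started
     * @param commitAttemptMs      time the subtask tries to commit, i.e. after the
     *                             checkpoint complete notification (or after a restore)
     * @param transactionTimeoutMs the producer's transaction.timeout.ms setting
     */
    static boolean commitArrivesTooLate(long txnStartMs, long commitAttemptMs, long transactionTimeoutMs) {
        return commitAttemptMs - txnStartMs > transactionTimeoutMs;
    }
}
```

Under this model, a transaction with a 15-minute timeout (900,000 ms) that is committed 1 minute after it started is safe, whereas a commit attempted 20 minutes after the start, for example because the job was down in between, arrives too late.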

Using Java Reflections to Bypass InitPidRequest on Restore

...