...

Pre-Commit Phase (on checkpoint)


On pre-commit (i.e. when the checkpoint barrier arrives at a KafkaSink operator subtask), the following operations are performed:

...
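To make the pre-commit step concrete, below is a minimal sketch of the flush-then-hand-off pattern, assuming the writer holds a transactional KafkaProducer. The committable shape (a plain transactional id string) and the class name are simplified placeholders for illustration, not the connector's actual classes.

    import java.util.Collection;
    import java.util.Collections;

    import org.apache.kafka.clients.producer.KafkaProducer;

    class PrecommitSketch {
        private final KafkaProducer<byte[], byte[]> producer;
        private final String transactionalId;

        PrecommitSketch(KafkaProducer<byte[], byte[]> producer, String transactionalId) {
            this.producer = producer;
            this.transactionalId = transactionalId;
        }

        // Called when the checkpoint barrier arrives at the subtask.
        Collection<String> prepareCommit() {
            // 1. Flush all records buffered for the current transaction to Kafka.
            producer.flush();
            // 2. Hand the transaction's identifier to the commit phase as a
            //    committable; the transaction itself stays open (pre-committed)
            //    until the checkpoint-complete notification arrives.
            return Collections.singletonList(transactionalId);
        }
    }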

Commit Phase (on checkpoint complete notification)


The commit phase (i.e. once all KafkaSink subtasks have successfully checkpointed and the JM sends an RPC notification informing them of the completion) remains the same, with no changes required.
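As a hedged illustration of this phase, the sketch below assumes the subtask still holds the producer that pre-committed the transaction. The actual connector instead reconstructs a producer and resumes the transaction via internal machinery, which the vanilla Kafka client does not support.

    import org.apache.kafka.clients.producer.KafkaProducer;

    class CommitSketch {
        private final KafkaProducer<byte[], byte[]> producer;

        CommitSketch(KafkaProducer<byte[], byte[]> producer) {
            this.producer = producer;
        }

        // Invoked when the JM's checkpoint-complete RPC notification arrives.
        void notifyCheckpointComplete() {
            // Finalize the transaction that was flushed at pre-commit time...
            producer.commitTransaction();
            // ...and immediately open a new transaction for records of the
            // next checkpoint interval.
            producer.beginTransaction();
        }
    }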

...

Restore Phase (on operator initialization)


On restore, the KafkaSink fully resolves (either commits or aborts) all dangling transactions possibly started by previous execution attempts, taking into account that those attempts may have run with different parallelism settings. This restore-time resolution logic ensures that the new execution attempt starts from a clean state w.r.t. transaction statuses within Kafka.
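The abort side of this resolution can be illustrated with plain Kafka client calls: initializing a producer with a previous attempt's transactional.id fences that id and aborts any transaction still open under it. The id scheme below (prefix plus subtask index) is a hypothetical simplification of the connector's actual naming, and the commit side is omitted since resuming a recovered transaction requires connector-internal machinery.

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;

    class RestoreSketch {
        void abortDanglingTransactions(
                Properties baseConfig, String prefix, int maxPreviousParallelism) {
            // Probe the ids that any previous execution attempt, at any
            // earlier parallelism, could have used (baseConfig is assumed to
            // already contain bootstrap servers and serializers).
            for (int subtask = 0; subtask < maxPreviousParallelism; subtask++) {
                Properties props = new Properties();
                props.putAll(baseConfig);
                props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, prefix + "-" + subtask);
                try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
                    // Fences this transactional id and aborts any in-flight
                    // transaction left behind by a previous attempt.
                    producer.initTransactions();
                }
            }
        }
    }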

...

  1. Upon receiving the checkpoint barrier, KafkaSink subtasks successfully flush all pending records to their current transactions and acknowledge completion to the JM.
  2. After the JM receives acknowledgements from all sink subtasks, it marks the current checkpoint as completed (in other words, the voting phase of 2PC succeeds and is recorded by the transaction manager).
  3. To trigger the subsequent commit phase of 2PC, the JM notifies all sink subtasks of the checkpoint's completion.
  4. KafkaSink subtasks receive the notification, but by the time each subtask tries to commit its pre-committed transaction, Kafka has already aborted the transaction for exceeding transaction.timeout.ms.
  5. Since Flink has already durably recorded the decision that the global distributed transaction succeeded, it can only restart from the completed checkpoint and try to re-commit the transactions, repeatedly failing to do so (see the sketch below).
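A sketch of steps 4-5, assuming the subtask still holds its producer across the delay: once the broker has expired the transaction, the commit surfaces a fatal error (shown here as ProducerFencedException; the exact exception can vary across client and broker versions), and no retry can succeed.

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.common.errors.ProducerFencedException;

    class ExpiredCommitSketch {
        void tryCommit(KafkaProducer<byte[], byte[]> producer) {
            try {
                // The checkpoint is already recorded as complete on the Flink
                // side, so this commit is the only remaining option.
                producer.commitTransaction();
            } catch (ProducerFencedException e) {
                // The broker aborted the transaction after
                // transaction.timeout.ms elapsed; the pre-committed data is
                // gone, so restarting from the completed checkpoint and
                // re-attempting the commit fails the same way every time.
                throw new IllegalStateException("Cannot commit an expired transaction", e);
            }
        }
    }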


Generally speaking, the scenario described above can occur whenever there is enough delay between a sink subtask pre-committing its transaction and receiving the checkpoint-complete notification that triggers the second phase of 2PC. This can happen simply because of an unfortunate combination of Flink's checkpoint interval / delay and Kafka's transaction timeout, or because the Flink job fails right after a checkpoint completes and remains down for an extended amount of time.
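As a hedged configuration sketch, the snippet below shows the relationship to keep in mind: transaction.timeout.ms should comfortably exceed the checkpoint interval plus the longest downtime the job is expected to survive, while staying within the broker-side transaction.max.timeout.ms cap. Topic name, servers, and values are illustrative only.

    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.base.DeliveryGuarantee;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.flink.connector.kafka.sink.KafkaSink;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    class ConfigSketch {
        KafkaSink<String> build(StreamExecutionEnvironment env) {
            env.enableCheckpointing(60_000L); // checkpoint every 60s

            Properties props = new Properties();
            // Must exceed checkpoint interval + notification delay + expected
            // recovery downtime, and must not exceed the broker's
            // transaction.max.timeout.ms (15 minutes by default).
            props.setProperty("transaction.timeout.ms", "900000"); // 15 minutes

            return KafkaSink.<String>builder()
                    .setBootstrapServers("broker:9092")
                    .setKafkaProducerConfig(props)
                    .setRecordSerializer(
                            KafkaRecordSerializationSchema.builder()
                                    .setTopic("output-topic")
                                    .setValueSerializationSchema(new SimpleStringSchema())
                                    .build())
                    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                    .setTransactionalIdPrefix("my-job")
                    .build();
        }
    }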

...