...
Table of Contents
Motivation
It has been a long-standing issue that Flink’s Kafka Sink implementation for exactly-once delivery guarantees comes with two critical pitfalls (please see the Appendix for a detailed, illustrated recap of why these issues exist):
...
No more risk of data loss when using the KafkaSink under EXACTLY_ONCE mode, and
A more robust implementation that does not rely on Java reflection on the Kafka Java client
Public Interfaces
This FLIP does not require any changes to the existing public Flink Source / Sink interfaces for connectors. It does affect other user-facing aspects specific to the Kafka Sink connector when used under EXACTLY_ONCE mode, namely: 1) the minimum required Kafka broker version, and 2) the KafkaCommittable state schema.
Minimum required Kafka Broker Version
The user’s Kafka cluster must be upgraded to a minimum version that supports KIP-YYY in order to use the new KafkaSink version.
KafkaCommittable schema change
A checkpointed KafkaCommittable would now contain only:
...
Any user tooling that reads Flink checkpoints to inspect TIDs needs to be updated to recognize the new schema.
KIP-939 Public Interface Summary
KIP-939 adds proper support for 2PC participation with the following public-facing changes. Here we omit details on the underlying transaction protocol changes and only describe changes to the Producer Java client API, as that is what Flink’s KafkaSink interfaces with.
...
For (2), Kafka’s new Producer#prepareTransaction() method fits naturally with Flink’s TwoPhaseCommittingSink.PrecommittingSinkWriter abstraction. For (3), this finally resolves the KafkaSink’s long-standing tech debt of having to use Java reflection to bypass the old initTransactions() protocol; the KafkaSink can now use Kafka public APIs to resume previously pre-committed transactions without them getting auto-aborted by Kafka. The following section goes through the new integration in detail.
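As a rough illustration, the call sequence that the new API enables can be sketched with a stub that merely records invocations. This is not Kafka client code, and the real methods have richer signatures (for instance, the actual prepareTransaction() also returns state describing the prepared transaction); it only shows the ordering of calls discussed in this FLIP.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: records method invocations to illustrate the happy-path
// call ordering enabled by KIP-939. Not Kafka client code.
class ProducerApiSketch {
    final List<String> calls = new ArrayList<>();

    // New in KIP-939: keepPreparedTxn == true keeps a previously prepared
    // transaction alive instead of aborting it during initialization.
    void initTransactions(boolean keepPreparedTxn) {
        calls.add("initTransactions(" + keepPreparedTxn + ")");
    }
    void beginTransaction() { calls.add("beginTransaction"); }
    void flush()            { calls.add("flush"); }
    // New in KIP-939: pre-commit vote; the prepared transaction survives
    // client restarts until it is explicitly committed or aborted.
    void prepareTransaction() { calls.add("prepareTransaction"); }
    void commitTransaction()  { calls.add("commitTransaction"); }

    /** Happy-path 2PC sequence for a single transaction. */
    static List<String> happyPath() {
        ProducerApiSketch p = new ProducerApiSketch();
        p.initTransactions(false); // fresh start, nothing to resume
        p.beginTransaction();
        p.flush();
        p.prepareTransaction();    // pre-commit (on checkpoint barrier)
        p.commitTransaction();     // commit (on checkpoint complete)
        return p.calls;
    }
}
```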
Proposed Changes for Flink’s KafkaSink
Here we’ll go through the interaction changes between Flink’s KafkaSink and Kafka throughout the following lifecycles of the 2PC integration: 1) pre-commit phase (i.e. on checkpoint barrier), 2) commit phase (i.e. on checkpoint complete RPC notification), and 3) restore phase (i.e. on failure and restoring from successful Flink checkpoint).
As you’ll see, none of the main 2PC interaction flow has been altered, and we also do not require any changes to the TwoPhaseCommittingSink abstraction. The actual changes are quite minimal, as we are mainly just calling two new methods on Kafka’s producer Java client; for the sake of completeness, this FLIP briefly illustrates the full 2PC integration while highlighting the actual changes.
Pre-Commit Phase (on checkpoint)
On pre-commit (i.e. when the checkpoint barrier arrives at a KafkaSink operator subtask), the following operations are performed:
...
Note that in step 3, we only need to snapshot the TID of transactions in Flink checkpoints, as opposed to before, where the TID, internal producer ID, and epoch (the latter two obtained via Java reflection on the producer instance) all needed to be snapshotted. This is because, on restore, we no longer need to inject the internal producer ID and epoch into resumed producer instances to bypass the initTransactions() method. More on this in the Restore Phase section.
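The pre-commit steps above (flush and prepare the current transaction, snapshot only the TIDs keyed by checkpoint ID, then obtain a fresh producer for the next transaction) can be sketched as follows. The producer is stubbed and all names are illustrative, not Flink's actual internals.

```java
import java.util.Map;
import java.util.TreeMap;

// Simplified sketch of the KafkaSink pre-commit phase; not Flink code.
class PreCommitSketch {
    /** Stub transactional producer; only the TID matters here. */
    static class TxnProducer {
        final String tid;
        boolean prepared;
        TxnProducer(String tid) { this.tid = tid; }
        void flush() { /* flush pending records to Kafka */ }
        void prepareTransaction() { prepared = true; } // 2PC pre-commit vote
    }

    // Prepared-but-uncommitted producers, ordered by checkpoint ID.
    final TreeMap<Long, TxnProducer> preparedByCheckpoint = new TreeMap<>();
    TxnProducer current;
    long tidCounter;

    PreCommitSketch() { current = newProducer(); }

    TxnProducer newProducer() { return new TxnProducer("tid-" + tidCounter++); }

    /** Called when the checkpoint barrier arrives at the sink subtask. */
    Map<Long, String> snapshotState(long checkpointId) {
        current.flush();                                 // flush pending records
        current.prepareTransaction();                    // pre-commit the txn
        preparedByCheckpoint.put(checkpointId, current); // buffer by checkpoint ID
        Map<Long, String> snapshot = new TreeMap<>();
        for (Map.Entry<Long, TxnProducer> e : preparedByCheckpoint.entrySet()) {
            snapshot.put(e.getKey(), e.getValue().tid);  // snapshot TIDs only
        }
        current = newProducer();                         // producer for next txn
        return snapshot;
    }
}
```

Note that the snapshot carries nothing but TIDs: no producer ID or epoch, which is exactly what removes the need for reflection on restore.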
Failure Points
If the flush() or prepareTransaction() call in step 1 fails, we strictly fail the job, as this is a synchronization point failure in 2PC. Failing to write the snapshot of the buffer in step 3 to Flink’s managed state backend is also a synchronization point failure and will fail the job.
For step 4, if a new producer instance cannot be obtained for the next checkpoint’s transaction (either the initialization failed, or all possible TIDs have been depleted, which is a possibility with FLIP-ZZZ: TID Pooling for KafkaSink), we choose to fail the job by design. Strictly speaking, this is just an implementation detail and not a synchronization point failure for 2PC; it is possible to delay or retry obtaining the new producer instance.
Commit Phase (on checkpoint complete notification)
The commit phase (i.e. when all KafkaSink subtasks have successfully checkpointed, and an RPC notification is sent from the JM to notify them of the completion) remains the same, without any changes required.
When a KafkaSink subtask is notified that checkpoint N was successfully completed, all buffered prepared producers up to checkpoint N (recall that the buffer is ordered by checkpoint ID) will be retrieved and committed. Afterwards, the prepared producers may be removed from the buffer and released (depending on the implementation, this can mean either returning them to a producer instance pool for reuse by future checkpoints if we reuse TIDs, or simply closing them).
Failure Points
If the commitTransaction() call fails for any transaction, the KafkaSink retries the commit later. Only after attempting up to a maximum number of retries does the Flink job fail.
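A minimal sketch of this bounded-retry behavior; the retry limit, exception type, and helper names are illustrative, not Flink's actual retry mechanism.

```java
// Sketch: retry a failed commit up to a maximum number of attempts, and
// only surface the failure (failing the job) once retries are exhausted.
class CommitRetrySketch {
    interface Commit { void run(); } // throws RuntimeException on failure

    static void commitWithRetries(Commit commit, int maxRetries) {
        RuntimeException last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                commit.run();
                return; // committed successfully
            } catch (RuntimeException e) {
                last = e; // transient failure: retry on a later attempt
            }
        }
        // Retries exhausted: propagate so the Flink job fails.
        throw new RuntimeException(
                "commit failed after " + (maxRetries + 1) + " attempts", last);
    }
}
```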
Restore Phase (on operator initialization)
On restore, the KafkaSink will fully resolve (either commit or abort) all dangling transactions possibly started by previous execution attempts, taking into account that previous executions may have run with varying parallelism settings. This restore-time resolution ensures that the new execution attempt starts from a clean state w.r.t. transaction statuses within Kafka.
...
The main notable change here is in case (1), where for each restored TID the KafkaSink needs to create a new producer instance to resume the transaction and commit it. As opposed to before, where we had to create this producer instance and use Java reflection to inject the internal producer ID and epoch in order to bypass the initTransactions() call, we can now simply initialize the producer using initTransactions(true) to retain the previously prepared transaction.
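Case (1) can be sketched as follows, with a stub standing in for the Kafka producer client; the method and class names are illustrative.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of restore case (1): each TID recovered from the checkpoint gets a
// fresh producer that resumes the prepared transaction via
// initTransactions(true) and then commits it. Not Flink or Kafka code.
class RestoreSketch {
    static class TxnProducer {
        final String tid;
        final List<String> calls = new ArrayList<>();
        TxnProducer(String tid) { this.tid = tid; }
        void initTransactions(boolean keepPreparedTxn) {
            // true (KIP-939): keep the prepared transaction alive instead of
            // letting initialization abort it; no reflection required.
            calls.add("initTransactions(" + keepPreparedTxn + ")");
        }
        void commitTransaction() { calls.add("commitTransaction"); }
    }

    static List<TxnProducer> commitDanglingTransactions(List<String> restoredTids) {
        List<TxnProducer> resumed = new ArrayList<>();
        for (String tid : restoredTids) {
            TxnProducer p = new TxnProducer(tid);
            p.initTransactions(true); // resume the prepared transaction
            p.commitTransaction();    // finish the interrupted commit phase
            resumed.add(p);
        }
        return resumed;
    }
}
```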
Compatibility, Deprecation, and Migration Plan
Upgrading to new KafkaSink
To upgrade, Flink jobs using older versions of the KafkaSink will need to do the following:
...
It is strongly recommended to upgrade to the new KafkaSink version as soon as possible, since older versions inherently pose a risk of data loss.
Relaxing the TWO_PHASE_COMMIT ACL requirement for a smoother upgrade path
So far, when authentication is enabled for the Kafka cluster, KIP-YYY assumes that the TWO_PHASE_COMMIT ACL is set up in order for authenticated producer clients to set transaction.two.phase.commit.enable to true as well as use Producer#initTransactions(true) to resume previous transactions. In other words, to use the new KafkaSink, the TWO_PHASE_COMMIT ACL must be set up as mentioned in step 2 in the section above. KIP-939 gates 2PC participation behind a new ACL because it is arguably a dangerous feature when used incorrectly (e.g. when used without a properly functioning external transaction coordinator, or in the absence of one entirely).
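For reference, the producer-side configuration involved is a single flag (the config key below is the one defined by KIP-939; the transactional ID value is purely illustrative):

```properties
# Illustrative producer config for 2PC participation under KIP-939.
# Requires the TWO_PHASE_COMMIT ACL on the transactional ID when
# authentication is enabled.
transactional.id=my-flink-sink-tid-0
transaction.two.phase.commit.enable=true
```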
...
If the Flink community thinks it is important to still allow users to upgrade to the new KafkaSink in the above scenario, then, as a joint FLIP-KIP across the Flink and Kafka communities, it may be possible to have KIP-YYY relax the ACL constraint such that the Producer#initTransactions(true) operation only requires the WRITE ACL and not the extra new TWO_PHASE_COMMIT ACL. In other words, producer clients could still resume previous transactions without having to enable 2PC. Under this mode with the new KafkaSink, since 2PC participation is not enabled, transactions may still be aborted by Kafka based on timeout (and potentially cause data loss), but at least it no longer requires Java reflection to bypass Kafka’s transaction protocol and manually inject the producer ID and epoch. Upgrading to the new KafkaSink is still highly desirable in this case, as it will be working against stable Kafka client APIs and is much less likely to bump into issues.
Rejected Alternatives
Supporting two versions of Kafka’s InitPidRequest protocol for live migration
WIP
Appendix: Current State of Kafka Sink
Data Loss Scenario
WIP
Using Java Reflection to Bypass InitPidRequest on Restore
WIP