...
- No more risk of data loss when using the KafkaSink under EXACTLY_ONCE mode.
- A much more robust implementation that does not rely on Java reflection on the Kafka Java client. For Flink devs, this improves code maintainability of the KafkaSink and makes it much easier to upgrade the Kafka client version. For users, this indirectly resolves many production issues that have been reported with Kafka-writing Flink jobs, such as long startup times and orphaned hanging transactions.
KIP-939 Public Interface Summary
KIP-939 adds proper support for 2PC participation with the following public-facing changes. Here we omit details of the underlying transaction protocol changes and only describe changes to the Producer Java client API, as that is what Flink’s KafkaSink interfaces with.
- transaction.two.phase.commit.enable producer client property: this property has been added for the Producer Java client. It should be set to true (default is false) for all producers started by Flink's KafkaSink, to declare that the producers are participating in a distributed 2PC transaction coordinated by Flink. Effectively, this tells Kafka to never abort these producers' transactions by timeout, and it enables the transaction operations below.
- New Producer#initTransactions(boolean keepPreparedTxn) method: alongside the existing Producer#initTransactions() method, an additional Producer#initTransactions(boolean) overload variant has been added. Just like the former, the new method should be called on producer initialization to obtain its internal producer ID and epoch from Kafka. The difference is that while the former always aborts any old transactions issued by previous sessions of the producer, the new variant allows retaining the old transaction so that Flink may choose to either commit or abort it.
- New Producer#prepareTransaction() method (returning a PreparedTxnState): calling this prepares / pre-commits the producer’s ongoing transaction. After calling this, the producer changes to a state where no more records can be written to the transaction, and only Producer#commitTransaction(), Producer#abortTransaction(), or Producer#completeTransaction(PreparedTxnState) can be called to finalize the transaction.
- New Producer#completeTransaction(PreparedTxnState state) method: compares the given state against the producer's current prepared transaction state; if they match, the transaction is committed, otherwise it is aborted.
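Taken together, these pieces form the producer-side 2PC lifecycle. The sketch below shows only the client configuration and the intended call sequence (as comments), since the new calls require a KIP-939-capable broker; the property keys come from KIP-939, while the bootstrap address and transactional ID are arbitrary example values:

```java
import java.util.Properties;

public class TwoPhaseCommitProducerConfig {
    public static Properties producerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // example address
        props.setProperty("transactional.id", "flink-sink-txn-0"); // example TID
        // KIP-939: opt in to 2PC participation; Kafka will not abort this
        // producer's transactions by timeout.
        props.setProperty("transaction.two.phase.commit.enable", "true");
        return props;
    }

    public static void main(String[] args) {
        Properties props = producerProps();
        System.out.println(props.getProperty("transaction.two.phase.commit.enable"));
        // Intended call sequence against a KIP-939-capable broker:
        //   producer.initTransactions(true);       // keep any prepared txn from a previous session
        //   producer.beginTransaction();
        //   ... send records ...
        //   PreparedTxnState state = producer.prepareTransaction(); // pre-commit
        //   // after the external coordinator (Flink) decides:
        //   producer.completeTransaction(state);   // commit if state matches, else abort
    }
}
```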
Public Interface Changes
This FLIP does not require any changes to the existing public Flink Source / Sink interfaces for connectors. It does affect other user-facing aspects specific to the Kafka Sink connector when used under EXACTLY_ONCE mode, namely: 1) the minimum required Kafka broker version, and 2) the KafkaCommittable state schema.
Minimum required Kafka Broker Version
The user’s Kafka cluster must be upgraded to a minimum version that supports KIP-939 in order to use the new KafkaSink version.
KafkaCommittable schema change
A checkpointed KafkaCommittable would now become only:

class KafkaCommittable {
    String transactionalId;
    PreparedTxnState preparedTxnState;
}

as opposed to the previous schema, which contains (producerId, producerEpoch, TID). We no longer need to persist producerId and producerEpoch because Kafka’s new InitPidRequest protocol version allows resuming the previous ongoing transaction instead of always aborting it, so the KafkaSink no longer needs to bypass the protocol by checkpointing these values only to inject them into the Kafka client at restore time.
The new PreparedTxnState object encapsulates metadata returned by the Kafka client upon pre-committing / preparing the transaction.
Any user tooling that reads Flink checkpoints to, for example, inspect TIDs needs to be updated to recognize the new schema.
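For tooling authors, the shape of the old and new committables can be illustrated with a minimal sketch. These records are simplified, illustrative stand-ins, not Flink's actual classes, and PreparedTxnState is reduced to a placeholder:

```java
// Old schema: producer ID and epoch were checkpointed so the KafkaSink
// could re-inject them into the Kafka client on restore.
record OldKafkaCommittable(long producerId, short producerEpoch, String transactionalId) {}

// Placeholder for the metadata Kafka returns from prepareTransaction().
record PreparedTxnState(long producerId, short epoch) {}

// New schema: only the transactional ID and the prepared-transaction state.
record NewKafkaCommittable(String transactionalId, PreparedTxnState preparedTxnState) {}

public class SchemaSketch {
    public static void main(String[] args) {
        var oldCommittable = new OldKafkaCommittable(42L, (short) 7, "flink-sink-txn-0");
        var newCommittable = new NewKafkaCommittable("flink-sink-txn-0",
                new PreparedTxnState(42L, (short) 7));
        System.out.println(oldCommittable);
        System.out.println(newCommittable);
    }
}
```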
...
For (2), Kafka’s new Producer#prepareTransaction() method fits naturally with Flink’s TwoPhaseCommittingSink.PrecommittingSinkWriter abstraction. For (3), this finally resolves the KafkaSink’s long-standing tech debt of having to use Java reflection to bypass the old initTransactions() protocol; the KafkaSink can now use Kafka's public APIs to resume previously pre-committed transactions without them being auto-aborted by Kafka. The following section goes through the new integration in detail.
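To show how prepareTransaction() lines up with the pre-commit phase, here is a simplified, self-contained sketch. The types below are stand-ins for Flink's Sink API and the Kafka producer, not the real classes; the producer ID, epoch, and transactional ID are arbitrary example values:

```java
import java.util.Collection;
import java.util.List;

// Simplified stand-in for Kafka's prepared-transaction metadata (KIP-939).
record PreparedTxnState(long producerId, short epoch) {}

// Simplified stand-in for the committable persisted in Flink checkpoints.
record KafkaCommittable(String transactionalId, PreparedTxnState preparedTxnState) {}

// Minimal stand-in for the parts of the Kafka producer used here.
class StubTransactionalProducer {
    private final String transactionalId;
    StubTransactionalProducer(String transactionalId) { this.transactionalId = transactionalId; }
    String transactionalId() { return transactionalId; }
    // In the real client this pre-commits the ongoing transaction on the broker.
    PreparedTxnState prepareTransaction() { return new PreparedTxnState(42L, (short) 0); }
}

// Sketch of the pre-commit step of a PrecommittingSinkWriter-style writer:
// prepare the ongoing transaction and emit a committable for the checkpoint.
class SketchKafkaWriter {
    private final StubTransactionalProducer producer;
    SketchKafkaWriter(StubTransactionalProducer producer) { this.producer = producer; }

    Collection<KafkaCommittable> prepareCommit() {
        PreparedTxnState state = producer.prepareTransaction();
        return List.of(new KafkaCommittable(producer.transactionalId(), state));
    }
}

public class PrecommitSketch {
    public static void main(String[] args) {
        var writer = new SketchKafkaWriter(new StubTransactionalProducer("flink-sink-txn-0"));
        System.out.println(writer.prepareCommit());
    }
}
```

The committables returned from the pre-commit step are what Flink checkpoints and later hands to the committer, which would finalize each transaction via completeTransaction().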
Proposed Changes for Flink’s KafkaSink
...
Appendix: Current State of KafkaSink
Data Loss Scenario
WIP
Using Java Reflection to Bypass InitPidRequest on Restore
...