...

  • No more risk of data loss when using the KafkaSink under EXACTLY_ONCE mode

  • A much more robust implementation that does not rely on Java reflection against the Kafka Java client. For Flink devs, this improves code maintainability of the KafkaSink and makes it much easier to upgrade the Kafka client version. For users, this indirectly resolves many production issues that have been reported with Kafka-writing Flink jobs, such as long startup times and orphaned hanging transactions.

KIP-939 Public Interface Summary

KIP-939 adds proper support for 2PC participation with the following public-facing changes. Here we omit details on the underlying transaction protocol changes, and only describe changes on the Producer Java client API as that is what Flink’s KafkaSink interfaces with.

  1. transaction.two.phase.commit.enable producer client property: this property has been added for the Producer Java client, and should be set to true (default is false) for all producers started by Flink's KafkaSink to declare that the producers are participating in a distributed 2PC transaction coordinated by Flink. Effectively, this tells Kafka to never abort these producers' transactions by timeout, and it enables the new transaction operations described below.

  2. New Producer#initTransactions(boolean keepPreparedTxn) method: alongside the existing Producer#initTransactions() method, an additional Producer#initTransactions(boolean) overload has been added. Just like the former, the new method should be called on producer initialization to obtain its internal producer ID and epoch from Kafka. The difference is that while the former always aborts any old transactions issued by previous sessions of the producer, the new variant allows retaining the old transaction so that Flink may choose to either commit or abort it.
  3. New Producer#prepareTransaction()::PreparedTxnState method: calling this prepares / pre-commits the producer’s ongoing transaction. After calling this, the producer changes to a state where no more records can be written to the transaction, and only either Producer#commitTransaction(), Producer#abortTransaction(), or Producer#completeTransaction(PreparedTxnState) can be called to finalize the transaction.

  4. New Producer#completeTransaction(PreparedTxnState state) method: calling this finalizes a previously prepared transaction. The producer compares the given PreparedTxnState against the state of the transaction it currently holds, and commits the transaction if they match or aborts it otherwise.

For (2), this finally solves the KafkaSink’s long-standing tech debt of having to use Java reflection to bypass the old initTransactions() protocol; the KafkaSink can now use public Kafka APIs to resume previously pre-committed transactions without them getting auto-aborted by Kafka. For (3), Kafka’s new Producer#prepareTransaction() method fits naturally with Flink’s TwoPhaseCommittingSink.PrecommittingSinkWriter abstraction. A short usage sketch of these new calls is shown below; the later sections go through the actual KafkaSink integration in detail.
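Below is a minimal, illustrative sketch of how a 2PC participant such as the KafkaSink would drive the new producer calls end to end. It assumes the KIP-939 API exactly as summarized above (the transaction.two.phase.commit.enable property, initTransactions(boolean), prepareTransaction(), and completeTransaction(PreparedTxnState)); the bootstrap servers, topic, transactional ID, and the package of PreparedTxnState are placeholders / assumptions, not the final released client API.

// Illustrative sketch only; the KIP-939 producer methods used here follow the summary above.
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.PreparedTxnState;   // assumed package for the KIP-939 class
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TwoPhaseCommitProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");          // placeholder
        props.put("transactional.id", "sink-subtask-0-txn");       // placeholder TID
        props.put("transaction.two.phase.commit.enable", "true");  // (1) declare 2PC participation; Kafka will not time out these transactions
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        Producer<String, String> producer = new KafkaProducer<>(props);

        // (2) keepPreparedTxn = true: retain a transaction prepared by a previous producer
        // session instead of aborting it, so the external coordinator (Flink) decides its fate.
        producer.initTransactions(true);

        producer.beginTransaction();
        producer.send(new ProducerRecord<>("my-topic", "key", "value"));

        // (3) pre-commit: after this call, no more records can be added to the transaction.
        PreparedTxnState preparedState = producer.prepareTransaction();
        // ... this PreparedTxnState is what would be persisted in the Flink checkpoint ...

        // (4) second phase: finalize the prepared transaction (commitTransaction() or
        // abortTransaction() could be called instead).
        producer.completeTransaction(preparedState);
        producer.close();
    }
}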

Public Interface Changes

This FLIP does not require any changes to the existing public Flink Source / Sink interfaces for connectors. It does affect other user-facing aspects specific to the Kafka Sink connector when used under EXACTLY_ONCE mode, namely: 1) Minimum required Kafka broker version, 2) KafkaCommittable state schema.

Minimum required Kafka Broker Version

The user’s Kafka cluster must be upgraded to a minimum version that supports KIP-939 in order to use the new KafkaSink version.

KafkaCommittable schema change

A checkpointed KafkaCommittable would now become only:

class KafkaCommittable {
    String transactionalId;
    PreparedTxnState preparedTxnState;
}

as opposed to the previous schema, which contains (producerId, producerEpoch, TID). We no longer need to persist producerId and producerEpoch because Kafka’s new InitPidRequest protocol version allows resuming the previous ongoing transaction instead of always aborting it, so the KafkaSink no longer needs to bypass the protocol by checkpointing these values only to inject them into the Kafka client at restore time.
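For comparison, the previous committable shape described above looked roughly like the following (field names and types are illustrative, not the exact KafkaSink source):

// Pre-KIP-939 committable: producerId and producerEpoch had to be checkpointed
// so they could be injected back into the Kafka client on restore via reflection.
class KafkaCommittable {
    long producerId;
    short producerEpoch;
    String transactionalId;
}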

The new PreparedTxnState object encapsulates metadata returned by the Kafka client upon pre-committing / preparing the transaction.

Any user tooling that reads Flink checkpoints to, for example, inspect TIDs needs to be updated to recognize the new schema.
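To illustrate where the new committable would come from at runtime, below is a rough, hypothetical sketch of a PrecommittingSinkWriter whose pre-commit phase prepares the ongoing Kafka transaction and emits the resulting state as a KafkaCommittable. This is not the actual KafkaSink implementation: the writer/producer wiring is simplified, a KafkaCommittable constructor and the KIP-939 producer methods are assumed, and starting the next transaction after a checkpoint is omitted.

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.apache.kafka.clients.producer.PreparedTxnState;   // assumed package for the KIP-939 class
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical sketch, not the actual KafkaSink writer.
class SketchKafkaWriter implements TwoPhaseCommittingSink.PrecommittingSinkWriter<String, KafkaCommittable> {

    private final Producer<byte[], byte[]> producer;
    private final String transactionalId;

    SketchKafkaWriter(Producer<byte[], byte[]> producer, String transactionalId) {
        this.producer = producer;
        this.transactionalId = transactionalId;
        producer.initTransactions(true);   // keep any transaction prepared by a previous session
        producer.beginTransaction();
    }

    @Override
    public void write(String element, Context context) {
        producer.send(new ProducerRecord<>("my-topic", element.getBytes()));   // placeholder topic
    }

    @Override
    public void flush(boolean endOfInput) {
        producer.flush();
    }

    @Override
    public Collection<KafkaCommittable> prepareCommit() throws IOException {
        // Pre-commit the ongoing transaction; the returned PreparedTxnState goes into the
        // committable and is persisted as part of the Flink checkpoint.
        PreparedTxnState state = producer.prepareTransaction();
        return Collections.singletonList(new KafkaCommittable(transactionalId, state));
    }

    @Override
    public void close() {
        producer.close();
    }
}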

...

Proposed Changes for Flink’s KafkaSink

...

Appendix: Current State of KafkaSink

Data Loss Scenario

WIP

Using Java Reflections to Bypass InitPidRequest on Restore

...