...

  1. transaction.two.phase.commit.enable producer client property: this new property has been added for the Producer Java client and should be set to true (default: false) for all producers started by Flink's KafkaSink, to declare that the producers participate in a distributed 2PC transaction coordinated by Flink. Effectively, this tells Kafka to never abort these producers' transactions on timeout, and it enables the transaction operations described below.
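
     As a sketch, the property can be set alongside the usual transactional producer configuration. The bootstrap address and transactional.id below are placeholder values:

     ```java
     import java.util.Properties;

     public class TwoPhaseCommitProducerConfig {

         // Builds client properties for a producer participating in a
         // Flink-coordinated 2PC transaction (placeholder values).
         public static Properties producerProps(String transactionalId) {
             Properties props = new Properties();
             props.put("bootstrap.servers", "localhost:9092");
             props.put("transactional.id", transactionalId);
             props.put("key.serializer",
                     "org.apache.kafka.common.serialization.ByteArraySerializer");
             props.put("value.serializer",
                     "org.apache.kafka.common.serialization.ByteArraySerializer");
             // Opt into distributed 2PC (KIP-939): Kafka will never abort this
             // producer's transactions on timeout; the external coordinator
             // (Flink) decides whether they commit or abort.
             props.put("transaction.two.phase.commit.enable", "true");
             return props;
         }

         public static void main(String[] args) {
             System.out.println(producerProps("flink-kafka-sink-0")
                     .getProperty("transaction.two.phase.commit.enable"));
         }
     }
     ```

     Note that the broker must also permit 2PC for this client (see the ACL discussion later in this document).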

  2. New Producer#initTransactions(boolean keepPreparedTxn) method: an overload of the existing Producer#initTransactions() method. Just like the original, the new method should be called on producer initialization to obtain its internal producer ID and epoch from Kafka. The difference is that while the original always aborts any old transactions issued by previous sessions of the producer, the new variant allows retaining the old transaction so that Flink may choose to either commit or abort it.
  3. New Producer#prepareTransaction() method (returns a PreparedTxnState): calling this prepares / pre-commits the producer’s ongoing transaction. After this call, the producer transitions to a state where no more records can be written to the transaction; only Producer#commitTransaction(), Producer#abortTransaction(), or Producer#completeTransaction(PreparedTxnState) can be called to finalize the transaction.

  4. New Producer#completeTransaction(PreparedTxnState state) method: calling this finalizes a prepared transaction. The producer compares the given PreparedTxnState against the state of its current prepared transaction, and commits the transaction if they match or aborts it otherwise.
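
Taken together, these operations yield the producer lifecycle sketched below. This is a hedged illustration only: it assumes a kafka-clients version with KIP-939 support and a broker with 2PC enabled, and the PreparedTxnState import path is taken from the KIP and may differ in the released API.

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.PreparedTxnState;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TwoPhaseCommitFlow {

    // One transaction cycle of a 2PC-enabled producer. `recovered` is the
    // PreparedTxnState persisted by a previous session (null on a fresh
    // start); in Flink it would come from checkpoint state.
    static void runCycle(KafkaProducer<byte[], byte[]> producer,
                         PreparedTxnState recovered) {
        // keepPreparedTxn = true: retain a transaction prepared by a
        // previous session instead of aborting it.
        producer.initTransactions(true);

        if (recovered != null) {
            // Finish phase 2 of the previous cycle: commits if `recovered`
            // matches the retained prepared transaction, aborts otherwise.
            producer.completeTransaction(recovered);
        }

        producer.beginTransaction();
        producer.send(new ProducerRecord<>("output-topic", new byte[] {1}));

        // Phase 1 (pre-commit): after this, no more records can be added.
        // The returned state must be durably persisted (e.g. in a Flink
        // checkpoint) before proceeding to phase 2.
        PreparedTxnState prepared = producer.prepareTransaction();

        // Phase 2 (once the coordinator decides): finalize the transaction.
        producer.commitTransaction();
    }
}
```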

For (3), Kafka’s new Producer#prepareTransaction() method fits naturally with Flink’s TwoPhaseCommittingSink.PrecommittingSinkWriter abstraction. For (2) and (4), these finally solve the KafkaSink’s long-standing tech debt of having to use Java reflection to bypass the old initTransactions() protocol; the KafkaSink can now use public Kafka APIs to resume previously pre-committed transactions without them getting auto-aborted by Kafka. The following section goes through the new integration in detail.
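
To illustrate the fit with the PrecommittingSinkWriter abstraction, the following is a hypothetical, heavily simplified writer sketch. It assumes Flink's Sink V2 interfaces and a KIP-939-capable kafka-clients; transaction ownership hand-off across checkpoints, serialization of committables, and the committer-side completeTransaction() call are all omitted.

```java
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;

import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.PreparedTxnState;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical writer showing where prepareTransaction() slots into the
// pre-commit phase of Flink's two-phase commit protocol.
class TwoPhaseCommitKafkaWriter
        implements TwoPhaseCommittingSink.PrecommittingSinkWriter<byte[], PreparedTxnState> {

    private final KafkaProducer<byte[], byte[]> producer;
    private final String topic;

    TwoPhaseCommitKafkaWriter(KafkaProducer<byte[], byte[]> producer, String topic) {
        this.producer = producer;
        this.topic = topic;
        producer.initTransactions(true); // keep any prepared transaction
        producer.beginTransaction();
    }

    @Override
    public void write(byte[] element, Context context) {
        producer.send(new ProducerRecord<>(topic, element));
    }

    @Override
    public void flush(boolean endOfInput) {
        producer.flush();
    }

    @Override
    public Collection<PreparedTxnState> prepareCommit() {
        // Phase 1: pre-commit the ongoing transaction and hand the
        // PreparedTxnState to Flink's committer as the committable.
        return Collections.singletonList(producer.prepareTransaction());
    }

    @Override
    public void close() throws IOException {
        producer.close();
    }
}
```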

...

If the Flink community considers it important to still allow users to upgrade to the new KafkaSink in the above scenario, then as a joint FLIP-KIP effort across the Flink and Kafka communities, it may be possible for KIP-939 to relax the ACL constraint so that the Producer#initTransactions(true) operation requires only the WRITE ACL, and not the additional new TWO_PHASE_COMMIT ACL. In other words, producer clients could still resume previous transactions without having 2PC enabled. In this mode, since the new KafkaSink does not participate in 2PC, Kafka may still abort transactions on timeout (potentially causing data loss), but the sink no longer requires Java reflection to bypass Kafka’s transaction protocol and manually inject the producer ID and epoch. Upgrading to the new KafkaSink remains highly desirable in this case, as it works against stable Kafka client APIs and is much less likely to run into issues.

Rejected Alternatives

Supporting two versions of Kafka’s InitPidRequest protocol for live migration

WIP

Appendix: Known Critical Issues with current KafkaSink

...