...

  1. New transaction.two.phase.commit.enable producer client property: this property has been added for the Producer Java client and should be set to true (default is false) to declare that the producer is participating in a distributed 2PC transaction. Effectively, Kafka disables timeouts for all transactions issued by this producer.

  2. New Producer#prepareTransaction(): PreparedTxnState method: calling this prepares / pre-commits the producer’s ongoing transaction. After calling this, the producer transitions to a state where no more records can be written to the transaction, and only Producer#commitTransaction(), Producer#abortTransaction(), or Producer#completeTransaction(PreparedTxnState) can be called to finalize the transaction.

  3. New Producer#initTransactions(boolean keepPreparedTxn) method: alongside the existing Producer#initTransactions() method, an additional Producer#initTransactions(boolean) overload has been added. Just like the former, the new method should be called on producer initialization to obtain its internal producer ID and epoch from Kafka. The difference is that while the former always aborts any old transactions issued by previous sessions of the producer, the new variant allows retaining the old transaction so that the external transaction manager may choose to either commit or abort it. A combined usage sketch of these three additions follows this list.
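
To make the three additions above concrete, below is a minimal sketch of how an external transaction coordinator might drive a 2PC-enabled producer. The method and property names come from the list above; the package of PreparedTxnState, the topic and transactional.id values, and the overall flow are illustrative assumptions, not code from this FLIP or KIP-939.

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.PreparedTxnState; // assumed package, per KIP-939
import org.apache.kafka.clients.producer.ProducerRecord;

public class TwoPhaseCommitProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");         // illustrative
        props.put("transactional.id", "example-2pc-producer");    // illustrative
        props.put("transaction.two.phase.commit.enable", "true"); // addition (1): opt in to 2PC
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Addition (3): keepPreparedTxn = true retains a transaction prepared by a
            // previous session instead of aborting it, leaving the commit/abort
            // decision to the external transaction manager.
            producer.initTransactions(true);

            producer.beginTransaction();
            producer.send(new ProducerRecord<>("example-topic", "key", "value"));

            // Addition (2): phase one. After this call, no more records can be written;
            // only commitTransaction(), abortTransaction(), or
            // completeTransaction(PreparedTxnState) may follow.
            PreparedTxnState prepared = producer.prepareTransaction();

            // ... the external transaction coordinator durably records its decision ...

            // Phase two: finalize the prepared transaction.
            producer.completeTransaction(prepared);
        }
    }
}
```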

...

  1. Upgrade their Kafka cluster to a minimum version that supports KIP-939.

  2. If authentication is enabled on the Kafka cluster, make sure it is configured so that the respective users have the TWO_PHASE_COMMIT ACL permission set on the TransactionalId resource (see the example grant after this list).

  3. Take a savepoint of their Flink job, and then stop it.

  4. Upgrade their job to the new KafkaSink version. No application code changes are required; users simply need to upgrade the flink-connector-kafka dependency and recompile the job jar.

  5. Submit the upgraded job jar, configured to restore from the savepoint taken in step 3.
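
As a hedged illustration of step 2, the ACL grant could be performed with the Kafka AdminClient (the equivalent kafka-acls CLI invocation would also work). The AdminClient calls below are existing Kafka APIs; the AclOperation.TWO_PHASE_COMMIT constant name is assumed from KIP-939, and the principal and transactional ID are placeholders.

```java
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;

public class GrantTwoPhaseCommitAclSketch {
    public static void main(String[] args) throws Exception {
        try (Admin admin = Admin.create(
                Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"))) {
            // The TransactionalId resource the Flink job's producers will use.
            ResourcePattern transactionalId = new ResourcePattern(
                    ResourceType.TRANSACTIONAL_ID, "example-flink-app", PatternType.LITERAL);

            // TWO_PHASE_COMMIT is the new operation proposed by KIP-939 (assumed
            // constant name); WRITE is additionally required for transactional writes.
            AccessControlEntry allowTwoPhaseCommit = new AccessControlEntry(
                    "User:flink", "*", AclOperation.TWO_PHASE_COMMIT, AclPermissionType.ALLOW);
            AccessControlEntry allowWrite = new AccessControlEntry(
                    "User:flink", "*", AclOperation.WRITE, AclPermissionType.ALLOW);

            admin.createAcls(List.of(
                    new AclBinding(transactionalId, allowTwoPhaseCommit),
                    new AclBinding(transactionalId, allowWrite)))
                 .all().get();
        }
    }
}
```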

...

So far, when authentication is enabled for the Kafka cluster, KIP-939 assumes that the TWO_PHASE_COMMIT ACL is set up in order for authenticated producer clients to set transaction.two.phase.commit.enable to true as well as use Producer#initTransactions(true) to resume previous transactions. In other words, to use the new KafkaSink, the TWO_PHASE_COMMIT ACL must be set up as mentioned in step 2 of the section above. KIP-939 gates 2PC participation behind a new ACL because it is arguably a dangerous feature when used incorrectly (e.g. without a properly functioning external transaction coordinator, or in the absence of one altogether).

...

If the Flink community thinks it is important to still allow users to upgrade to the new KafkaSink in the above scenario, then, as a joint FLIP-KIP effort across the Flink and Kafka communities, it may be possible to have KIP-939 relax the ACL constraint so that the Producer#initTransactions(true) operation requires only the WRITE ACL and not the extra new TWO_PHASE_COMMIT ACL. In other words, producer clients could still resume previous transactions without having to enable 2PC. Under this mode with the new KafkaSink, since 2PC participation is not enabled, transactions may still be aborted by Kafka based on timeout (and potentially cause data loss), but at least it no longer requires Java reflection to bypass Kafka’s transaction protocol and manually inject the producer ID and epoch. Upgrading to the new KafkaSink is still highly desirable in this case, as the sink would work against stable Kafka client APIs and be much less likely to run into issues.

...