
Page properties

Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".

Discussion thread: pending
Vote thread: pending
JIRA: pending
Release: pending



Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


Table of Contents

Motivation

It has been a long-standing issue that Flink’s Kafka Sink implementation for exactly-once delivery guarantees comes with two critical pitfalls (please see the Appendix for a detailed recap of why these issues exist):

...

  • No more risk of data loss when using the KafkaSink under EXACTLY_ONCE mode

  • A much more robust implementation that does not rely on Java reflection on the Kafka Java client. For Flink devs, this improves code maintainability of the KafkaSink and makes it much easier to upgrade the Kafka client version. For our users, this will indirectly resolve many production issues that have been reported for Kafka-writing Flink jobs, such as long startup times and orphaned hanging transactions.

KIP-939 Public Interface Summary

KIP-939 adds proper support for 2PC participation with the following public-facing changes. Here we omit details on the underlying transaction protocol changes and only describe changes to the Producer Java client API, as that is what Flink’s KafkaSink interfaces with.

  1. New transaction.two.phase.commit.enable producer client property: this property should be set to true (default is false) for all producers started by Flink's KafkaSink to declare that the producers are participating in a distributed 2PC transaction coordinated by Flink. Effectively, this tells Kafka to never abort these producers' transactions on timeout, and it enables the transaction operations described below.

  2. New Producer#prepareTransaction() method: calling this prepares / pre-commits the producer’s ongoing transaction and returns a PreparedTxnState. After calling it, the producer transitions to a state in which no more records can be written to the transaction, and only Producer#commitTransaction(), Producer#abortTransaction(), or Producer#completeTransaction(PreparedTxnState) can be called to finalize the transaction.

  3. New Producer#initTransactions(boolean keepPreparedTxn) method: alongside the existing Producer#initTransactions() method, an additional overload variant has been added. Just like the former, the new method should be called on producer initialization to obtain the internal producer ID and epoch from Kafka. The difference is that while the former always aborts any old transactions issued by previous sessions of the producer, the new variant allows retaining the old transaction so that Flink, as the external transaction coordinator, may choose to either commit or abort it.

For (2), Kafka’s new Producer#prepareTransaction() method fits naturally with Flink’s TwoPhaseCommittingSink.PrecommittingSinkWriter abstraction. For (3), this finally resolves the KafkaSink’s long-standing tech debt of having to use Java reflection to bypass the old initTransactions() protocol; the KafkaSink can now use Kafka public APIs to resume previously pre-committed transactions without them getting auto-aborted by Kafka. The sections below go through the new integration in detail.
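To make the producer lifecycle concrete, below is a minimal sketch of how a 2PC participant would drive the KIP-939 producer API described above. The initTransactions(boolean), prepareTransaction(), and PreparedTxnState names follow this FLIP's description of the KIP; the exact signatures and packages in a released Kafka client may differ, and the transactional.id value is only an example.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TwoPhaseCommitProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("transactional.id", "myPrefix-0-42"); // example {userPrefix}-{subtaskId}-{checkpointId}
        // Declare 2PC participation: Kafka will not abort this producer's transactions on timeout.
        props.put("transaction.two.phase.commit.enable", "true");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Fresh producer: nothing to resume, so do not keep any prepared transaction.
            producer.initTransactions(false);

            producer.beginTransaction();
            producer.send(new ProducerRecord<>("my-topic", "key", "value"));

            // Pre-commit: flush and prepare; the returned PreparedTxnState is what Flink
            // would persist in its checkpoint (and could later pass to completeTransaction()).
            producer.flush();
            var preparedTxnState = producer.prepareTransaction();

            // Commit phase: once the external coordinator (Flink) has durably logged its decision.
            producer.commitTransaction();
        }
    }
}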

Public Interface Changes

This FLIP does not require any changes to the existing public Flink Source / Sink interfaces for connectors. It does, however, affect other user-facing aspects specific to the Kafka sink connector when used under EXACTLY_ONCE mode, namely: 1) the minimum required Kafka broker version, and 2) the KafkaCommittable state schema.

Minimum required Kafka Broker Version

The user’s Kafka cluster must be upgraded to a minimum version that supports KIP-939 in order to use the new KafkaSink version.

KafkaCommittable schema change

A checkpointed KafkaCommittable now contains only:

class KafkaCommittable {
    String transactionalId;
    PreparedTxnState preparedTxnState;
}

as opposed to the previous schema, which contains (producerId, producerEpoch, TID). We no longer need to persist producerId and producerEpoch because Kafka’s new InitPidRequest protocol version allows resuming the previous ongoing transaction instead of always aborting it, so the KafkaSink no longer needs to bypass the protocol by checkpointing these values only to inject them into the Kafka client at restore time.

The new PreparedTxnState object encapsulates metadata returned by the Kafka client upon pre-committing / preparing the transaction.

Any user tooling that reads Flink checkpoints to, for example, inspect TIDs needs to be updated to recognize the new schema.
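For illustration only, a versioned committable serializer along the following lines could accommodate the schema change. The class, the version constant, and the assumption that a PreparedTxnState can be round-tripped through a byte[] are all hypothetical; Flink's actual KafkaCommittable serializer may look different.

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical sketch: only the transactional id and the opaque prepared-transaction
// state are written, versioned so that old-format committables can still be recognized.
class KafkaCommittableSerializerSketch {
    static final int VERSION = 2; // bumped from the old (producerId, producerEpoch, TID) schema

    byte[] serialize(String transactionalId, byte[] serializedPreparedTxnState) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(VERSION);
            out.writeUTF(transactionalId);
            out.writeInt(serializedPreparedTxnState.length);
            out.write(serializedPreparedTxnState);
        }
        return bytes.toByteArray();
    }
}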


Proposed Changes for Flink’s KafkaSink

Here we’ll go through the interaction changes between Flink’s KafkaSink and Kafka throughout the following lifecycles of the 2PC integration: 1) the pre-commit phase (i.e. on checkpoint barrier), 2) the commit phase (i.e. on the checkpoint-complete RPC notification), and 3) the restore phase (i.e. on failure and restore from a successful Flink checkpoint).

As you’ll see, none of the main 2PC interaction flow is altered, and no changes to the TwoPhaseCommittingSink abstraction are required. The actual changes are quite minimal, as we are mainly just calling two new methods on Kafka's producer Java client, but for the sake of completeness this FLIP briefly illustrates the full 2PC integration while highlighting the actual changes.

Pre-Commit Phase (on checkpoint)


On pre-commit (i.e. when the checkpoint barrier arrives at a KafkaSink operator subtask), the following operations are performed:

  1. The current producer is flushed and its ongoing transaction is prepared (pre-committed) using the new Producer#prepareTransaction() method.

  2. The producer instance holding the prepared transaction, as well as the TID of the prepared transaction, is collected in a buffer ordered by the checkpoint ID associated with the pre-commit. The buffer may contain other prepared producers/TIDs of earlier pre-committed transactions (i.e. of previous checkpoints) that have not been committed yet.

  3. The buffer is snapshotted to obtain all pre-committed TIDs that are awaiting commit. This snapshot is written to the Flink-managed state backend for checkpoint persistence.

  4. Finally, the subtask obtains a new producer instance (with a different TID) for the next checkpoint’s transaction.

The above steps are done atomically w.r.t. the checkpoint barrier and don’t necessarily need to be done in this exact order (except for steps 1 and 2).

Note that in step 3, we only need to snapshot the TID of each transaction in Flink checkpoints, as opposed to before, where the TID, internal producer ID, and epoch (the latter two obtained via Java reflection on the producer instance) all needed to be snapshotted. This is because on restore we no longer need to inject the internal producer ID and epoch into resumed producer instances to bypass the initTransactions() method. More on this in the Restore Phase section.
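A simplified sketch of this pre-commit path is shown below; the step numbers mirror the list above. The Producer2pc wrapper, the PendingTransaction holder, and obtainProducerForNextCheckpoint() are illustrative stand-ins, not the actual KafkaSink implementation.

import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of the pre-commit steps performed on the checkpoint barrier.
class PreCommitSketch {
    interface Producer2pc {
        void flush();
        byte[] prepareTransaction();
        void beginTransaction();
        String transactionalId();
    }
    record PendingTransaction(long checkpointId, String transactionalId, Producer2pc producer, byte[] preparedTxnState) {}

    private Producer2pc currentProducer;
    private final List<PendingTransaction> pendingByCheckpoint = new ArrayList<>();

    PreCommitSketch(Producer2pc initialProducer) {
        this.currentProducer = initialProducer;
    }

    List<PendingTransaction> onCheckpointBarrier(long checkpointId) {
        // (1) Flush outstanding records and prepare (pre-commit) the ongoing transaction.
        currentProducer.flush();
        byte[] preparedTxnState = currentProducer.prepareTransaction();

        // (2) Buffer the prepared producer and its TID, ordered by checkpoint id.
        pendingByCheckpoint.add(new PendingTransaction(
                checkpointId, currentProducer.transactionalId(), currentProducer, preparedTxnState));

        // (3) Snapshot the buffered TIDs / prepared states; Flink persists this in the checkpoint.
        List<PendingTransaction> snapshot = new ArrayList<>(pendingByCheckpoint);

        // (4) Switch to a fresh producer (new TID, already initialized) for the next checkpoint's transaction.
        currentProducer = obtainProducerForNextCheckpoint(checkpointId + 1);
        currentProducer.beginTransaction();

        return snapshot;
    }

    private Producer2pc obtainProducerForNextCheckpoint(long checkpointId) {
        throw new UnsupportedOperationException("sketch: create/initialize a producer with a new TID");
    }
}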

Failure Points

  • If the flush() or prepareTransaction() call in step 1 fails, then we strictly fail the job as this is a synchronization point failure in 2PC.

  • Failing to write the snapshot of the buffer in step 3 to Flink’s managed state backend is also a synchronization point failure and will fail the job.

  • For step 4, if a new producer instance cannot be obtained for the next checkpoint’s transaction (either because initialization failed, or because all possible TIDs have been depleted, which is a possibility with FLIP-ZZZ: TID Pooling for KafkaSink), we choose to fail the job by design. Strictly speaking, this is just an implementation detail and not a synchronization point failure for 2PC; theoretically, it is possible to delay / retry obtaining the new producer instance.

Commit Phase (on checkpoint complete notification)


The commit phase (i.e. when all KafkaSink subtasks have successfully checkpointed, and an RPC notification is sent from the JM to notify them of the completion) remains the same, without any changes required.

When a KafkaSink subtask is notified that checkpoint N was successfully completed, all buffered prepared producers up to checkpoint N (recall that the buffer is ordered by checkpoint ID) are retrieved and committed. Afterwards, the prepared producers may be removed from the buffer and released (depending on the implementation, this can mean either returning them to a producer instance pool for reuse by future checkpoints, if TIDs are reused, or simply closing them).

Failure Points

  • If the commitTransaction() call fails for any transaction, the KafkaSink retries the commit later; only after a maximum number of retry attempts does the Flink job fail (see the sketch below).
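A self-contained sketch of the commit-side handling, including the bounded retry before failing the job. The retry bookkeeping, the retry bound, and all names are illustrative; the real committer defers retries to later commit attempts rather than looping in place.

import java.util.ArrayList;
import java.util.List;

// Illustrative commit-phase sketch: on notification that checkpoint N completed, commit all
// buffered prepared transactions up to N; retry failures and only fail the job after a bound.
class CommitPhaseSketch {
    interface PreparedProducer {
        long checkpointId();
        void commitTransaction();
        void close();
    }

    private static final int MAX_COMMIT_RETRIES = 10; // illustrative bound
    private final List<PreparedProducer> buffered = new ArrayList<>();

    void addPrepared(PreparedProducer producer) {
        buffered.add(producer);
    }

    void notifyCheckpointComplete(long completedCheckpointId) {
        List<PreparedProducer> remaining = new ArrayList<>();
        for (PreparedProducer producer : buffered) {
            if (producer.checkpointId() > completedCheckpointId) {
                remaining.add(producer); // belongs to a later, not-yet-completed checkpoint
                continue;
            }
            if (!commitWithRetries(producer)) {
                throw new IllegalStateException("Commit failed after " + MAX_COMMIT_RETRIES + " attempts");
            }
            producer.close(); // or return it to a producer pool if TIDs are reused
        }
        buffered.clear();
        buffered.addAll(remaining);
    }

    private boolean commitWithRetries(PreparedProducer producer) {
        for (int attempt = 1; attempt <= MAX_COMMIT_RETRIES; attempt++) {
            try {
                producer.commitTransaction();
                return true;
            } catch (RuntimeException e) {
                // transient failure: a real implementation would back off / retry on a later attempt
            }
        }
        return false;
    }
}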

Restore Phase (on operator initialization)


On restore, the KafkaSink fully resolves (either commits or aborts) all dangling transactions possibly started by previous execution attempts, taking into account that previous executions may have run with different parallelism settings. This restore-time resolution ensures that the new execution attempt starts from a clean state w.r.t. transaction status within Kafka.

All dangling transactions from previous execution attempts can be categorized as the following:

  1. TIDs within PrecommittedRange: these TIDs have been successfully pre-committed, and therefore are always persisted within completed Flink checkpoints. When the Flink job fails and restores from a checkpoint, all TIDs read from the checkpoint are within PrecommittedRange and should be committed.

  2. TIDs outside of PrecommittedRange: these TIDs were NOT successfully pre-committed and are NOT written in the restored Flink checkpoint. All dangling TIDs in this range need to be aborted. To obtain these TIDs, the KafkaSink has to either 1) query Kafka to list all TIDs that are possibly within this range, or 2) iterate through all possible TIDs that can be in this range.

Determining the PrecommittedRange depends on the specific algorithm the KafkaSink uses to construct TIDs. For example, as of v3.0.0 of the KafkaSink, TIDs are constructed as {userPrefix}-{subtaskId}-{checkpointId} and are therefore strictly increasing as the job runs. This means that for a given restored checkpoint ID N, the PrecommittedRange of TIDs to commit is simply all TIDs with a {checkpointId} portion up to N, while all other TIDs with a {checkpointId} portion of N+1 or greater should be aborted. For other TID construction algorithms, e.g. FLIP-ZZZ: TID Pooling for KafkaSink, calculating the PrecommittedRange would be different. For the purpose of this FLIP, the specific TID construction algorithm is an orthogonal implementation detail; it is sufficient to assume that there is a deterministic PrecommittedRange of possible TIDs that need to be committed, while all TIDs outside of that range should be aborted.
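As a concrete illustration of the checkpoint-id-based scheme above, a PrecommittedRange check could look like the following; the parsing helper assumes the current {userPrefix}-{subtaskId}-{checkpointId} format and is not the actual implementation.

// Illustrative check for whether a TID of the form {userPrefix}-{subtaskId}-{checkpointId}
// falls within the PrecommittedRange for a restored checkpoint id N.
class PrecommittedRangeSketch {
    static boolean isInPrecommittedRange(String transactionalId, long restoredCheckpointId) {
        // TIDs are strictly increasing in their checkpointId portion, so everything up to and
        // including the restored checkpoint id was pre-committed; everything above it was not.
        long checkpointIdOfTid = parseCheckpointId(transactionalId);
        return checkpointIdOfTid <= restoredCheckpointId;
    }

    private static long parseCheckpointId(String transactionalId) {
        // Assumes the checkpoint id is the last dash-separated component of the TID.
        int lastDash = transactionalId.lastIndexOf('-');
        return Long.parseLong(transactionalId.substring(lastDash + 1));
    }
}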

The main notable change here is in case (1): for each restored TID, the KafkaSink needs to create a new producer instance to resume the transaction and commit it. As opposed to before, where we had to create this producer instance and use Java reflection to inject the internal producer ID and epoch in order to bypass the initTransactions() call, we can now simply initialize the producer with initTransactions(true) to retain the previously prepared transaction.
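Putting the two cases together, the restore-time resolution could be sketched as below. Producer creation and TID discovery are abstracted away; initTransactions(boolean) is the KIP-939 overload described earlier, and all other names are illustrative.

import java.util.Collection;

// Illustrative restore-time resolution: commit every TID restored from the checkpoint
// (within PrecommittedRange) and abort every other dangling TID from previous attempts.
class RestorePhaseSketch {
    interface ResumableProducer {
        void initTransactions(boolean keepPreparedTxn); // KIP-939 overload
        void commitTransaction();
        void close();
    }

    void resolveDanglingTransactions(
            Collection<String> tidsFromCheckpoint, Collection<String> tidsOutsidePrecommittedRange) {
        // Case (1): pre-committed TIDs restored from the checkpoint -> resume and commit.
        for (String tid : tidsFromCheckpoint) {
            ResumableProducer producer = createProducer(tid);
            producer.initTransactions(true); // keep the prepared transaction instead of aborting it
            producer.commitTransaction();
            producer.close();
        }
        // Case (2): TIDs outside PrecommittedRange -> abort so the new attempt starts clean.
        for (String tid : tidsOutsidePrecommittedRange) {
            ResumableProducer producer = createProducer(tid);
            producer.initTransactions(false); // the plain initialization path aborts any old transaction
            producer.close();
        }
    }

    private ResumableProducer createProducer(String transactionalId) {
        throw new UnsupportedOperationException("sketch: create a producer configured with this TID");
    }
}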

Compatibility, Deprecation, and Migration Plan

Upgrading to new KafkaSink

To upgrade, Flink jobs using older versions of the KafkaSink will need to do the following:

  1. Upgrade their Kafka cluster to a minimum version that supports KIP-939.

  2. If authentication is enabled on the Kafka cluster, make sure that it is configured so that respective users have the TWO_PHASE_COMMIT ACL permissions set on the TransactionalId resource.

  3. Take a savepoint of their Flink job, and then stop it.

  4. Upgrade their job application code to use the new KafkaSink version. No code changes are required from the user; they simply need to upgrade the flink-connector-kafka dependency and recompile the job jar.

  5. Submit the upgraded job jar, configured to restore from the savepoint taken in step 3.

Note that if step 5 is done while the Kafka cluster upgrade is still being rolled out, the job will fail whenever a transaction request reaches a Kafka broker that has not been rolled yet. It is recommended to upgrade the Flink job only once the Kafka cluster upgrade has fully completed.

It is strongly recommended to upgrade to the new KafkaSink version as soon as possible, since the current implementation inherently poses a risk of data loss.

Relaxing TWO_PHASE_COMMIT ACL requirement for smoother upgrade path

So far, when authentication is enabled for the Kafka cluster, KIP-939 assumes that the TWO_PHASE_COMMIT ACL is set up in order for authenticated producer clients to set transaction.two.phase.commit.enable to true as well as use Producer#initTransactions(true) to resume previous transactions. In other words, to use the new KafkaSink, the TWO_PHASE_COMMIT ACL must be set up as mentioned in step 2 of the section above. KIP-939 gates 2PC participation behind a new ACL because it is arguably a dangerous feature when used incorrectly (e.g. without a properly functioning external transaction coordinator, or without one at all).

However, it is entirely possible that a Flink user does not have access to the Kafka cluster admin to set up the TWO_PHASE_COMMIT ACL. For example, a Flink user may be using a cloud service provider for their Kafka deployment, where ACL setup may be out of their control. In this case, users are hard-blocked from upgrading to the new KafkaSink.

If the Flink community thinks it is important to still allow users to upgrade to the new KafkaSink in the above scenario, then, as a joint FLIP-KIP effort across the Flink and Kafka communities, it may be possible to have KIP-939 relax the ACL constraint so that the Producer#initTransactions(true) operation only requires the WRITE ACL and not the new TWO_PHASE_COMMIT ACL. In other words, producer clients could still resume previous transactions without having 2PC enabled. Under this mode, since 2PC participation is not enabled, transactions may still be aborted by Kafka on timeout (and potentially cause data loss), but the new KafkaSink at least no longer requires Java reflection to bypass Kafka’s transaction protocol and manually inject the producer ID and epoch. Upgrading to the new KafkaSink is still highly desirable in this case, as it works against stable Kafka client APIs and is much less likely to run into issues.

Rejected Alternatives

Supporting two versions of Kafka’s InitPidRequest protocol for live migration

WIP



Appendix: Known Critical Issues with the current KafkaSink

Data Loss Scenario

Data loss under exactly-once mode can occur for Flink's KafkaSink in the following scenario:

  1. Upon receiving the checkpoint barrier, the KafkaSink subtasks successfully flush to their current transactions and acknowledge completion to the JM.
  2. After the JM receives acknowledgements from all sink subtasks, it commits the current checkpoint as completed (in other words, the voting phase of 2PC succeeds and is recorded by the transaction manager).
  3. To trigger the following commit phase of 2PC, the JM notifies all sink subtasks of the checkpoint completion.
  4. A KafkaSink subtask receives the notification, but by the time it tries to commit its pre-committed transaction, Kafka has already aborted the transaction because transaction.timeout.ms was exceeded.
  5. Since the decision that the global distributed transaction succeeded was already committed by Flink, it can only restart from the completed checkpoint and try to re-commit the transactions, repeatedly failing to do so.


Generally speaking, the scenario above can occur whenever there is enough delay before sink subtasks receive the checkpoint-complete notification that triggers the second phase of 2PC. This can happen simply due to an unfortunate combination of Flink's checkpoint interval / delay and Kafka's transaction timeout, or if the Flink job happens to fail right after a checkpoint completes and remains down for an extended amount of time.
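To make the timing condition explicit, here is a trivial illustration; the numbers are examples only and not defaults.

// Illustrative condition for the scenario above: if the delay between the pre-commit
// (checkpoint barrier) and the commit notification exceeds the Kafka transaction timeout,
// the broker aborts the prepared transaction and the completed checkpoint can no longer
// be honored, i.e. data loss.
class DataLossConditionSketch {
    public static void main(String[] args) {
        long transactionTimeoutMs = 15L * 60 * 1000;               // example producer transaction.timeout.ms
        long delayUntilCommitNotificationMs = 2L * 60 * 60 * 1000; // e.g. job down for 2 hours after checkpointing

        boolean atRiskOfDataLoss = delayUntilCommitNotificationMs > transactionTimeoutMs;
        System.out.println("Data loss possible: " + atRiskOfDataLoss);
    }
}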

Using Java Reflection to Bypass the InitProducerId Request on Restore

At any point in time, a Flink job may fail for any reason, causing it to restart from the last completed Flink checkpoint. Within those checkpoints, the KafkaSink stores the TIDs (Kafka's transactional.id) of Kafka transactions that were pre-committed as part of the checkpoint. On restart, these TIDs are restored and their previously ongoing transactions should be (possibly redundantly) committed.

The main issue is that Kafka, by definition of its current transaction protocol, does not allow a recovered producer client instance to resume its previous transaction. By protocol, all newly created or recovered producer clients are expected to issue an InitProducerId request to initialize the producer (e.g. obtain its internal producer ID and epoch). When brokers receive this initialization request, they check whether the producer instance had a previous ongoing transaction and, if so, abort it.

To be able to resume pre-committed transactions, Flink's KafkaSink avoids issuing InitProducerId after recovering producer clients by 1) extracting the internal producer ID and epoch and storing them alongside the TIDs in Flink checkpoints, 2) restoring these values and injecting them into the client on restart, and 3) altering the client state so that it is considered to be mid-transaction. All of this is done via Java reflection, as none of these internals are publicly accessible.

The code that implements all of this can be found in FlinkKafkaInternalProducer: https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java
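For illustration, the reflective state injection amounts to roughly the sketch below. The field names, helper methods, and state transition are simplified and partly hypothetical; the exact internals vary across Kafka client versions, so refer to the linked FlinkKafkaInternalProducer for the real code.

import java.lang.reflect.Field;
import org.apache.kafka.clients.producer.KafkaProducer;

// Hypothetical sketch of the reflection-based workaround used today: instead of calling
// initTransactions() (which would abort the pre-committed transaction), the restored
// producerId/epoch from the Flink checkpoint are injected into the producer's internal
// transaction manager, and its state is forced into "in transaction".
class ReflectiveResumeSketch {
    static void resumeTransaction(KafkaProducer<?, ?> producer, long producerId, short epoch) throws Exception {
        Object transactionManager = getField(producer, "transactionManager");
        // Overwrite the internal producer id and epoch with the checkpointed values.
        setField(transactionManager, "producerIdAndEpoch", createProducerIdAndEpoch(producerId, epoch));
        // Force the internal state machine into a state where commitTransaction() is allowed.
        transitionToInTransaction(transactionManager);
    }

    private static Object getField(Object target, String name) throws Exception {
        Field field = target.getClass().getDeclaredField(name);
        field.setAccessible(true);
        return field.get(target);
    }

    private static void setField(Object target, String name, Object value) throws Exception {
        Field field = target.getClass().getDeclaredField(name);
        field.setAccessible(true);
        field.set(target, value);
    }

    // Placeholders: the real code constructs Kafka-internal objects and invokes internal
    // methods reflectively; those details are omitted here.
    private static Object createProducerIdAndEpoch(long producerId, short epoch) {
        throw new UnsupportedOperationException("sketch");
    }

    private static void transitionToInTransaction(Object transactionManager) {
        throw new UnsupportedOperationException("sketch");
    }
}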