
Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".

Discussion thread: pending
Vote thread: pending
JIRA: pending
Release: pending


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


Motivation

It has been a long-standing issue that Flink’s Kafka Sink implementation for exactly-once delivery guarantees comes with two critical pitfalls (please see the Appendix for an illustrated detail recap of why these issues exist):

  1. Data loss can occur when Kafka aborts a successfully checkpointed transaction due to timeout. Users were recommended to mitigate this by setting a substantially large transaction.timeout.ms Kafka setting, but ultimately there’s always the chance of data loss if it takes long enough to restore the job.

  2. When a KafkaSink is restored from a checkpoint, checkpointed transactions need to be resumed and committed. Here, to create a producer instance that resumes the previous transaction, the KafkaSink implementation relies heavily on Java reflection in order to bypass the Kafka transaction protocol, which by definition aborts any previous ongoing transactions when a producer session is recovered. The use of Java reflection makes upgrading Kafka clients hard and highly error-prone.

These issues exist because Kafka’s transaction protocol was originally designed with the simple “read-process-write” loop in mind, where each transaction is made by a single consumer-producer process atomically reading from and writing to Kafka, and is expected to be short-lived, lasting only for the duration of this loop. Therefore, any client downtime would result in the transaction being aborted by Kafka after the client is recovered so that the new producer session can start with a clean state. The same goes for any transactions that are in-doubt for too long, e.g. if the producer process is lost forever, Kafka aborts the transaction on timeout to prevent it from permanently blocking downstream consumers.

These protocol behaviors directly prevent Kafka producers from being safely usable as a participant in an externally-coordinated 2PC transaction. As a 2PC participant, a Kafka producer needs to be able to guarantee that once a transaction is pre-committed, it is permanently durable and can stay in-doubt as long as it takes for the external coordinator to issue a commit phase that decides the transaction fate. In Flink terms, once a KafkaSink subtask flushes its writes to the ongoing transaction and acknowledges the corresponding Flink checkpoint (the pre-commit phase), the transaction should remain in-doubt until all KafkaSink subtasks also acknowledge the checkpoint and then JM (the external transaction coordinator) notifies all subtasks to commit their transactions (the commit phase).

Kafka is attempting to address this with KIP-939: Support Participation in 2PC. In short, with KIP-939 it is now possible to declare that a transactional Kafka producer is participating in an externally-coordinated 2PC transaction. Effectively, this means Kafka acknowledges that an external transaction coordinator exists and has full control in deciding transaction fate; Kafka will no longer proactively abort transactions from the 2PC-participating producer based on timeout, and will also allow resuming previous transactions so that the external coordinator has the final say in whether to commit or abort them. For a more detailed summary of KIP-939, please see KIP-939 Public Interface Summary.

To conclude, having a new KafkaSink that works with KIP-939 for proper 2PC integration brings the following benefits for our users:

  • No more risk of data loss when using the KafkaSink under EXACTLY_ONCE mode, and

  • A more robust implementation that does not rely on Java reflection on the Kafka Java client

Public Interfaces

This FLIP does not require any changes to the existing public Flink Source / Sink interfaces for connectors. It does affect other user-facing aspects specific to the Kafka Sink connector when used under EXACTLY_ONCE mode, namely: 1) Minimum required Kafka broker version, 2) KafkaCommittable state schema.

Minimum required Kafka Broker Version

The user’s Kafka cluster must be upgraded to a minimum version that supports KIP-939 in order to use the new KafkaSink version.

KafkaCommittable schema change

A checkpointed KafkaCommittable would now become only:

class KafkaCommittable {
    private final String transactionalId;
}

as opposed to the previous schema which contains (producerId, producerEpoch, TID). We no longer need to persist producerId and producerEpoch because Kafka’s new InitPidRequest protocol version allows resuming the previous ongoing transaction instead of always aborting it, so the KafkaSink no longer needs to bypass the protocol.

Any user tooling that reads Flink checkpoints to inspect TIDs needs to be updated to recognize the new schema.

KIP-939 Public Interface Summary

KIP-939 now adds proper support for 2PC participation with the following public-facing changes. Here we omit details on the underlying transaction protocol changes, and only describe changes on the Producer Java client API as that is what Flink’s KafkaSink interfaces with.

  1. transaction.two.phase.commit.enable producer client property: this property has been added for the Producer Java client, which should be set to true (default is false) to declare that the producer is participating in a distributed 2PC transaction. Effectively, Kafka will disable timeouts for all transactions issued by this producer.

  2. New Producer#prepareTransaction() method: calling this prepares / pre-commits the producer’s ongoing transaction. After calling this, the producer changes to a state where no more records can be written to the transaction, and only either Producer#commitTransaction() or Producer#abortTransaction() can be called to finalize the transaction.

  3. New Producer#initTransactions(boolean keepPreparedTxn) method: alongside the existing Producer#initTransactions() method, an additional Producer#initTransactions(boolean) overload variant has been added. Just like the former, the new method should be called on producer initialization to obtain its internal producer ID and epoch from Kafka. The difference is that while the former always aborts any old transactions issued by previous sessions of the producer, the latter variant allows choosing to retain the old transaction so that the external transaction manager may choose to either commit or abort it.

For (2), Kafka’s new Producer#prepareTransaction() method fits naturally with Flink’s TwoPhaseCommittingSink.PrecommittingSinkWriter abstraction. For (3), this finally solves the KafkaSink’s long-standing tech debt of having to use Java reflection to bypass the old initTransactions() protocol; the KafkaSink can now use Kafka public APIs to resume previous successfully pre-committed transactions without them getting auto-aborted by Kafka. The following section will go through the new integration in detail.
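As a minimal sketch of item (1), the client properties for a 2PC-participating producer could be assembled as below. Only the transaction.two.phase.commit.enable key comes from the summary above; the class name, method name, and example values are hypothetical, and bootstrap.servers and transactional.id are the standard Kafka producer settings.

```java
import java.util.Properties;

final class TwoPhaseCommitProducerConfig {
    // Property name as described in the KIP-939 summary above.
    static final String TWO_PHASE_COMMIT_ENABLE = "transaction.two.phase.commit.enable";

    /** Builds client properties for a transactional producer that declares 2PC participation. */
    static Properties forTransactionalId(String bootstrapServers, String transactionalId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("transactional.id", transactionalId);
        // Declares 2PC participation; the default is false.
        props.setProperty(TWO_PHASE_COMMIT_ENABLE, "true");
        return props;
    }
}
```

The resulting Properties object would then be passed to the producer constructor, as with any other producer configuration.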

Proposed Changes for Flink’s KafkaSink

Here we’ll go through the interaction changes between Flink’s KafkaSink and Kafka throughout the following lifecycles of the 2PC integration: 1) pre-commit phase (i.e. on checkpoint barrier), 2) commit phase (i.e. on checkpoint complete RPC notification), and 3) restore phase (i.e. on failure and restoring from successful Flink checkpoint).

As you’ll see, none of the main 2PC interaction flow has been altered and we also do not require any changes to the TwoPhaseCommittingSink abstraction. The actual changes are quite minimal as we're mainly just calling 2 new methods on Kafka's producer Java client, but for the sake of completeness, this FLIP will briefly illustrate the full 2PC integration while highlighting the actual changes.

Pre-Commit Phase (on checkpoint)

On pre-commit (i.e. when the checkpoint barrier arrives at a KafkaSink operator subtask), the following operations are performed:

  1. Current producer is flushed and the current transaction is prepared using the new Producer#prepareTransaction() method.

  2. The producer instance holding the prepared transaction, as well as the TID of the prepared transaction, is collected in a buffer ordered by the checkpoint ID associated with the pre-commit. The buffer may contain other prepared producers/TIDs of earlier pre-committed transactions (i.e. of previous checkpoints) that have not been committed yet.

  3. The buffer is snapshotted to obtain all pre-committed TIDs that are awaiting to be committed. This snapshot is written to Flink managed state backend for checkpoint persistence.

  4. Finally, the subtask obtains a new producer instance (with a different TID) for the next checkpoint’s transaction.

The above steps are done atomically w.r.t. the checkpoint barrier and don’t necessarily need to be done in this exact order (except for steps 1 and 2).

Note that in step 3, we only need to snapshot the TID of transactions in Flink checkpoints, as opposed to before where the TID, internal producer ID, and epoch (latter two obtained via Java reflection on the producer instance) needed to be snapshotted. This is because on restore, we no longer need to inject the internal producer ID and epoch on resumed producer instances to bypass the initTransactions() method. More on this in the On Restore section.
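Steps 1 to 3 of the pre-commit phase can be sketched roughly as follows. This is an illustration under stated assumptions, not the actual KafkaSink code: PreparableProducer is a hypothetical stand-in for the subset of the producer API used here, and PreparedTransactionBuffer is a hypothetical name for the buffer described above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical stand-in for the subset of the KIP-939 producer API used on pre-commit. */
interface PreparableProducer {
    String transactionalId();
    void flush();
    void prepareTransaction();
}

final class PreparedTransactionBuffer {
    // Prepared producers keyed by the checkpoint ID of their pre-commit, lowest first.
    private final TreeMap<Long, PreparableProducer> prepared = new TreeMap<>();

    /** Flushes and prepares the current producer, buffers it, and returns the TIDs to snapshot. */
    List<String> preCommit(long checkpointId, PreparableProducer current) {
        // Step 1: an exception from either call propagates and is expected to fail the job.
        current.flush();
        current.prepareTransaction();
        // Step 2: buffer the prepared producer under its checkpoint ID.
        prepared.put(checkpointId, current);
        // Step 3: only the TIDs of all not-yet-committed transactions are snapshotted.
        List<String> snapshot = new ArrayList<>();
        for (PreparableProducer producer : prepared.values()) {
            snapshot.add(producer.transactionalId());
        }
        return snapshot;
    }
}
```

The returned list contains the TIDs of earlier checkpoints that are still awaiting commit as well as the one just prepared, which matches the snapshot content described in step 3.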

Failure Points

  • If the flush() or prepareTransaction() call in step 1 fails, then we strictly fail the job as this is a synchronization point failure in 2PC.

  • Failing to write the snapshot of the buffer in step 3 to Flink’s managed state backend is also a synchronization point failure and will fail the job.

  • For step 4, if a new producer instance cannot be obtained for the next checkpoint’s transaction (either the initialization failed, or all possible TIDs have been depleted which is a possibility with FLIP-ZZZ: TID Pooling for KafkaSink), we choose to fail the job by design. Strictly speaking, this is just an implementation detail and not a synchronization point failure for 2PC; it is possible to delay / retry obtaining the new producer instance.

Commit Phase (on checkpoint complete notification)

The commit phase (i.e. when all KafkaSink subtasks have successfully checkpointed, and an RPC notification is sent from the JM to notify them of the completion) remains the same without any changes required.

When a KafkaSink subtask is notified that checkpoint N was successfully completed, all buffered prepared producers up to checkpoint N (recall that the buffer is ordered by checkpoint ID) will be retrieved and committed. Afterwards, the prepared producers may be removed from the buffer and released (depending on the implementation, this can mean either putting them back into a producer instance pool for reuse by future checkpoints if we reuse TIDs, or simply closing them).

Failure Points

  • If commitTransaction() call fails for any transaction, the KafkaSink retries the commit later. After attempting up to a maximum number of retries, only then does the Flink job fail.
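The commit phase and its retry behavior can be sketched as below. PreparedProducer and CheckpointCommitter are hypothetical names, and the sketch simplifies two things: it retries immediately rather than later, and it closes producers instead of returning them to a pool.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical stand-in for a producer holding a prepared transaction. */
interface PreparedProducer {
    void commitTransaction();
    void close();
}

final class CheckpointCommitter {
    // Prepared producers keyed by checkpoint ID, lowest first.
    private final TreeMap<Long, PreparedProducer> prepared = new TreeMap<>();
    private final int maxAttempts;

    CheckpointCommitter(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    void add(long checkpointId, PreparedProducer producer) {
        prepared.put(checkpointId, producer);
    }

    int pending() {
        return prepared.size();
    }

    /** Commits every buffered transaction whose checkpoint ID is at most completedCheckpointId. */
    void notifyCheckpointComplete(long completedCheckpointId) {
        Iterator<Map.Entry<Long, PreparedProducer>> it =
                prepared.headMap(completedCheckpointId, true).entrySet().iterator();
        while (it.hasNext()) {
            PreparedProducer producer = it.next().getValue();
            commitWithRetries(producer);
            producer.close(); // simplification: a pooling implementation would recycle it instead
            it.remove();      // removes the entry from the underlying buffer
        }
    }

    private void commitWithRetries(PreparedProducer producer) {
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                producer.commitTransaction();
                return;
            } catch (RuntimeException e) {
                last = e; // remember the failure and try again
            }
        }
        // Only after exhausting all attempts does the failure surface and fail the job.
        throw new IllegalStateException("commit failed after " + maxAttempts + " attempts", last);
    }
}
```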

Restore Phase (on operator initialization)

On restore, the KafkaSink will fully resolve (either commit or abort) all dangling transactions possibly started by previous execution attempts, taking into account that previous executions may have been executed with varying parallelism settings. This restore-time resolution ensures that the new execution attempt starts from a clean state w.r.t. transaction status within Kafka.

All dangling transactions from previous execution attempts can be categorized as the following:

  1. TIDs within PrecommittedRange: these TIDs have been successfully pre-committed, and therefore are always persisted within completed Flink checkpoints. When the Flink job fails and restores from a checkpoint, all TIDs read from the checkpoint are within PrecommittedRange and should be committed.

  2. TIDs outside of PrecommittedRange: these TIDs were NOT successfully pre-committed, and are NOT written in the restored Flink checkpoint. All dangling TIDs in this range need to be aborted. To obtain the TIDs, the KafkaSink has to either 1) query Kafka to list all TIDs that are possibly within this range, or 2) iterate through all possible TIDs that can be in this range.

Determining the PrecommittedRange depends on the specific algorithm the KafkaSink uses to construct TIDs. For example, as of v3.0.0 of the KafkaSink, TIDs are constructed as {userPrefix}-{subtaskId}-{checkpointId} and are therefore always strictly increasing as the job runs. This means that for a given restored checkpoint ID N, the PrecommittedRange of TIDs to commit is simply all TIDs with a {checkpointId} portion up to N, while all other TIDs with a {checkpointId} portion of N+1 or greater should be aborted. For other TID construction algorithms, e.g. FLIP-ZZZ: TID Pooling for KafkaSink, calculating the PrecommittedRange would be different. For the purpose of this FLIP, the specific TID construction algorithm is an orthogonal implementation detail; it is sufficient to assume that there is a deterministic PrecommittedRange of possible TIDs that need to be committed, while all TIDs outside of that range should be aborted.
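For the {userPrefix}-{subtaskId}-{checkpointId} scheme mentioned above, the range check could look roughly like this. TidRangeClassifier and its methods are hypothetical helpers for illustration; the sketch assumes the checkpoint ID is always the last dash-separated segment, so a user prefix containing dashes is still handled.

```java
final class TidRangeClassifier {
    /** Extracts the checkpoint ID from a TID of the form {userPrefix}-{subtaskId}-{checkpointId}. */
    static long checkpointIdOf(String transactionalId) {
        int lastDash = transactionalId.lastIndexOf('-');
        if (lastDash < 0) {
            throw new IllegalArgumentException("Unexpected TID format: " + transactionalId);
        }
        return Long.parseLong(transactionalId.substring(lastDash + 1));
    }

    /** True if the TID is within the PrecommittedRange for restored checkpoint N (commit it). */
    static boolean isPrecommitted(String transactionalId, long restoredCheckpointId) {
        return checkpointIdOf(transactionalId) <= restoredCheckpointId;
    }
}
```

A TID for which isPrecommitted returns false falls outside the range and would be aborted on restore.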

The main notable change here is in case (1), where for each restored TID the KafkaSink needs to create a new producer instance to resume the transaction and commit it. As opposed to before where we had to create this producer instance and use Java reflection to inject the internal producer ID and epoch in order to bypass the initTransactions() call, we can now simply initialize the producer using initTransactions(true) to retain the previous prepared transaction.
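Case (1) can be sketched as follows. ResumableProducer is a hypothetical interface that mirrors only the producer methods named in this FLIP, and the factory that creates a producer for a given TID is likewise an assumption; the real KafkaSink wiring will differ.

```java
import java.util.List;
import java.util.function.Function;

/** Hypothetical stand-in mirroring the producer methods used on restore. */
interface ResumableProducer extends AutoCloseable {
    void initTransactions(boolean keepPreparedTxn);
    void commitTransaction();
    @Override
    void close();
}

final class RestoreTimeResolver {
    /** Resumes and commits the prepared transaction of every TID read from the restored checkpoint. */
    static void commitRestored(
            List<String> restoredTids, Function<String, ResumableProducer> producerFactory) {
        for (String tid : restoredTids) {
            try (ResumableProducer producer = producerFactory.apply(tid)) {
                // true retains the previously prepared transaction instead of aborting it.
                producer.initTransactions(true);
                producer.commitTransaction();
            }
        }
    }
}
```

No producer ID or epoch is passed in anywhere: the TID alone is enough to resume the transaction, which is why the checkpointed state can shrink to the TID.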

Compatibility, Deprecation, and Migration Plan

Upgrading to new KafkaSink

To upgrade, Flink jobs using older versions of the KafkaSink will need to do the following:

  1. Upgrade their Kafka cluster version to a minimum version that supports KIP-939.

  2. If authentication is enabled on the Kafka cluster, make sure that it is configured so that respective users have the TWO_PHASE_COMMIT ACL permissions set on the TransactionalId resource.

  3. Take a savepoint of their Flink job, and then stop it.

  4. Upgrade their job application code to use the new KafkaSink version. No code changes are required from the user; they simply need to upgrade the flink-connector-kafka dependency and recompile the job jar.

  5. Submit the upgraded job jar, configured to restore from the savepoint taken in step 3.

Note that if step 5 is done while the Kafka cluster upgrade is still being rolled out, the job will fail whenever a transaction request reaches a Kafka broker that has not been upgraded yet. It is recommended to only upgrade the Flink job once the Kafka cluster upgrade has been fully completed.

Upgrading the KafkaSink version as soon as possible is strongly recommended, since the older KafkaSink implementation inherently poses a risk of data loss.

Relaxing TWO_PHASE_COMMIT ACL requirement for smoother upgrade path

So far, when authentication is enabled for the Kafka cluster, KIP-939 assumes that the TWO_PHASE_COMMIT ACL is set up in order for authenticated producer clients to set transaction.two.phase.commit.enable to true as well as use Producer#initTransactions(true) to resume previous transactions. In other words, to use the new KafkaSink, the TWO_PHASE_COMMIT ACL must be set up as mentioned in step 2 in the section above. KIP-939 gates 2PC participation behind a new ACL because it is arguably a dangerous feature when used incorrectly (e.g. when used without a properly functioning external transaction coordinator, or even the lack of one).

However, it is entirely possible that a Flink user may not have access to the Kafka cluster admin to set up the TWO_PHASE_COMMIT ACL. For example, a Flink user may be using a cloud service provider for their Kafka deployment, with which ACL setup may be out of their control. In this case, users are hard-blocked from upgrading to the new KafkaSink.

If the Flink community thinks that it is important to still allow users to upgrade to the new KafkaSink in the above scenario, as a joint FLIP-KIP across the Flink and Kafka community, it may be possible to have KIP-939 relax the ACL constraint such that the Producer#initTransactions(true) operation only needs WRITE ACL to work and not the extra new TWO_PHASE_COMMIT ACL. In other words, producer clients can still resume previous transactions without having to enable 2PC. Under this mode with the new KafkaSink, since 2PC participation is not enabled, transactions may still be aborted by Kafka based on timeout (and potentially cause data loss), but at least it no longer requires Java reflection to bypass Kafka’s transaction protocol and manually inject the producer ID and epoch. Upgrading to the new KafkaSink is still highly desirable in this case as it will be working against stable Kafka client APIs and much less likely to bump into issues.

Rejected Alternatives

Supporting two versions of Kafka’s InitPidRequest protocol for live migration

WIP

Appendix: Current State of Kafka Sink

Data Loss Scenario

WIP

Using Java Reflections to Bypass InitPidRequest on Restore

WIP
