...

Table of Contents

Motivation

It has been a long-standing issue that Flink’s Kafka Sink implementation for exactly-once delivery guarantees comes with two critical pitfalls (please see the Appendix for a detailed illustrated recap of why these issues exist):

...

  • No more risk of data loss when using the KafkaSink under EXACTLY_ONCE mode, and

  • A more robust implementation that does not rely on Java reflection on the Kafka Java client

Public Interfaces

This FLIP does not require any changes to the existing public Flink Source / Sink interfaces for connectors. It does, however, affect other user-facing aspects specific to the Kafka Sink connector when used under EXACTLY_ONCE mode, namely: 1) the minimum required Kafka broker version, and 2) the KafkaCommittable state schema.

Minimum required Kafka Broker Version

The user’s Kafka cluster must be upgraded to a minimum version that supports KIP-YYY in order to use the new KafkaSink version.

KafkaCommittable schema change

A checkpointed KafkaCommittable would now become only:

...

Any user tooling that reads Flink checkpoints to inspect TIDs needs to be updated to recognize the new schema.
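For illustration, a rough sketch of the slimmed-down committable is shown below (class and field names are only illustrative of the actual KafkaCommittable in the Kafka connector code base):

    // Illustrative sketch only: the committable now carries just the transactional ID.
    // The internal producer ID and epoch, previously obtained via reflection, are gone.
    public final class KafkaCommittable {
        private final String transactionalId;

        public KafkaCommittable(String transactionalId) {
            this.transactionalId = transactionalId;
        }

        public String getTransactionalId() {
            return transactionalId;
        }
    }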

KIP-939 Public Interface Summary

KIP-939 adds proper support for 2PC participation with the following public-facing changes. Here we omit details of the underlying transaction protocol changes and only describe the changes to the Producer Java client API, as that is what Flink’s KafkaSink interfaces with.

...

For (2), Kafka’s new Producer#prepareTransaction() method fits naturally with Flink’s TwoPhaseCommittingSink.PrecommittingSinkWriter abstraction. For (3), this finally resolves the KafkaSink’s long-standing tech debt of having to use Java reflection to bypass the old initTransactions() protocol; the KafkaSink can now use Kafka’s public APIs to resume previously pre-committed transactions without them getting auto-aborted by Kafka. The following section goes through the new integration in detail.
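As a rough illustration of how these client-facing additions are used by a 2PC participant such as Flink (a sketch based on the API described in KIP-939; producer below is assumed to be a transactional KafkaProducer with 2PC participation enabled):

    // (2) New method: flush and prepare (pre-commit) the ongoing transaction without
    //     committing it; the commit/abort decision is deferred to the external
    //     transaction coordinator, i.e. Flink.
    producer.prepareTransaction();

    // (3) New overload: keepPreparedTxn = true retains a previously prepared
    //     transaction on initialization instead of auto-aborting it, so that a
    //     restored job can still commit it later.
    producer.initTransactions(true);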

Proposed Changes for Flink’s KafkaSink

Here we’ll go through the interaction changes between Flink’s KafkaSink and Kafka across the following phases of the 2PC integration: 1) the pre-commit phase (i.e. on checkpoint barrier), 2) the commit phase (i.e. on the checkpoint-complete RPC notification), and 3) the restore phase (i.e. on failure and restore from a successful Flink checkpoint).

As you’ll see, the main 2PC interaction flow is not altered, and no changes to the TwoPhaseCommittingSink abstraction are required. The actual changes are quite minimal, as we mainly just call two new methods on Kafka’s producer Java client; for the sake of completeness, this FLIP briefly illustrates the full 2PC integration while highlighting the actual changes.

Pre-Commit Phase (on checkpoint)

On pre-commit (i.e. when the checkpoint barrier arrives at a KafkaSink operator subtask), the following operations are performed:

...

Note that in step 3, we only need to snapshot the TIDs of transactions in Flink checkpoints, as opposed to before, where the TID, internal producer ID, and epoch (the latter two obtained via Java reflection on the producer instance) needed to be snapshotted. This is because on restore, we no longer need to inject the internal producer ID and epoch into resumed producer instances to bypass the initTransactions() method. More on this in the Restore Phase section.
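Putting the steps together, a simplified sketch of the pre-commit path could look as follows (all names on the Flink side are hypothetical; only the producer calls come from KIP-939):

    // Hypothetical sketch of the sink writer’s pre-commit logic (not actual Flink code).
    void onCheckpointBarrier(long checkpointId) {
        // Step 1: flush pending records and pre-commit (prepare) the current transaction.
        currentProducer.flush();
        currentProducer.prepareTransaction();

        // Step 2: buffer the prepared producer, ordered by checkpoint ID.
        preparedProducers.put(checkpointId, currentProducer);

        // Step 3: only the TID goes into the Flink checkpoint; the internal producer ID
        // and epoch no longer need to be extracted via reflection.
        committables.add(new KafkaCommittable(currentTransactionalId));

        // Step 4: obtain a fresh producer (with a new TID) for the next checkpoint's
        // transaction; if this fails, the job is failed by design.
        currentTransactionalId = nextTransactionalId();
        currentProducer = getOrCreateProducer(currentTransactionalId);
        currentProducer.beginTransaction();
    }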

Failure Points

  • If the flush() or prepareTransaction() call in step 1 fails, then we strictly fail the job as this is a synchronization point failure in 2PC.

  • Failing to write the snapshot of the buffer in step 3 to Flink’s managed state backend is also a synchronization point failure and will fail the job.

  • For step 4, if a new producer instance cannot be obtained for the next checkpoint’s transaction (either because the initialization failed, or because all possible TIDs have been depleted, which is a possibility with FLIP-ZZZ: TID Pooling for KafkaSink), we choose to fail the job by design. Strictly speaking, this is just an implementation detail and not a synchronization point failure for 2PC; it would be possible to delay / retry obtaining the new producer instance.

Commit Phase (on checkpoint complete notification)

The commit phase (i.e. when all KafkaSink subtasks have successfully checkpointed, and an RPC notification is sent from the JM to notify them of the completion) remains the same, with no changes required.

When a KafkaSink subtask is notified that checkpoint N was successfully completed, all buffered prepared producers up to checkpoint N (recall that the buffer is ordered by checkpoint ID) will be retrieved and committed. Afterwards, the prepared producers may be removed from the buffer and released (depending on the implementation, this can mean either putting them back into a producer instance pool for reuse by future checkpoints, if we reuse TIDs, or simply closing them).
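A simplified sketch of this commit path is shown below (all names are hypothetical; preparedProducers is assumed to be a NavigableMap keyed by checkpoint ID):

    // Hypothetical sketch of the committer’s logic on checkpoint completion.
    void onCheckpointComplete(long completedCheckpointId) {
        // Retrieve and commit all buffered prepared transactions up to checkpoint N.
        Map<Long, Producer<byte[], byte[]>> upToN =
                preparedProducers.headMap(completedCheckpointId, true);
        for (Iterator<Producer<byte[], byte[]>> it = upToN.values().iterator(); it.hasNext(); ) {
            Producer<byte[], byte[]> prepared = it.next();
            prepared.commitTransaction(); // retried on failure before failing the job
            it.remove();
            release(prepared);            // return to a producer pool for TID reuse, or close
        }
    }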

Failure Points

  • If the commitTransaction() call fails for any transaction, the KafkaSink retries the commit later. Only after a maximum number of retry attempts has been exhausted does the Flink job fail.

Restore Phase (on operator initialization)

On restore, the KafkaSink will fully resolve (either commit or abort) all dangling transactions possibly started by previous execution attempts, taking into account that previous executions may have run with different parallelism settings. This restore-time resolution ensures that the new execution attempt starts from a clean state with respect to transaction status within Kafka.

...

The main notable change here is in case (1), where for each restored TID the KafkaSink needs to create a new producer instance to resume the transaction and commit it. Whereas before we had to create this producer instance and use Java reflection to inject the internal producer ID and epoch in order to bypass the initTransactions() call, we can now simply initialize the producer with initTransactions(true) to retain the previously prepared transaction.
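A hedged sketch of what the restore-time resolution for case (1) now looks like (names other than the producer calls are hypothetical):

    // Hypothetical sketch of restore-time commit of previously pre-committed transactions.
    void commitRestoredTransactions(Collection<String> restoredTids) {
        for (String tid : restoredTids) {
            try (Producer<byte[], byte[]> producer = createProducerFor(tid)) {
                // keepPreparedTxn = true: the prepared transaction is retained instead of
                // being auto-aborted, so it can now be committed; no reflection needed.
                producer.initTransactions(true);
                producer.commitTransaction();
            }
        }
        // Other dangling transactions from previous attempts (never pre-committed, or
        // left over from runs with different parallelism) are aborted as before.
    }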

Compatibility, Deprecation, and Migration Plan

Upgrading to new KafkaSink

To upgrade, Flink jobs using older versions of the KafkaSink will need to do the following:

...

It is strongly recommended to upgrade the KafkaSink version as soon as possible, since the older version inherently poses a risk of data loss.

Relaxing TWO_PHASE_COMMIT ACL requirement for smoother upgrade path

So far, when authentication is enabled for the Kafka cluster, KIP-YYY assumes that the TWO_PHASE_COMMIT ACL is set up in order for authenticated producer clients to set transaction.two.phase.commit.enable to true as well as use Producer#initTransactions(true) to resume previous transactions. In other words, to use the new KafkaSink, the TWO_PHASE_COMMIT ACL must be set up as mentioned in step 2 of the section above. KIP-939 gates 2PC participation behind a new ACL because it is arguably a dangerous feature when used incorrectly (e.g. without a properly functioning external transaction coordinator, or with no coordinator at all).
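For reference, a sketch of the producer-side configuration this implies (the config key follows KIP-939; granting the TWO_PHASE_COMMIT ACL itself happens on the broker side):

    Properties props = new Properties();
    props.put("bootstrap.servers", "broker:9092");      // placeholder address
    props.put("transactional.id", "flink-sink-tid-0");  // placeholder TID
    // Requires the new TWO_PHASE_COMMIT ACL on the cluster (in addition to WRITE)
    // when authorization is enabled.
    props.put("transaction.two.phase.commit.enable", "true");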

...

If the Flink community thinks it is important to still allow users to upgrade to the new KafkaSink in the above scenario, then, as a joint FLIP-KIP effort across the Flink and Kafka communities, it may be possible to have KIP-YYY relax the ACL constraint such that the Producer#initTransactions(true) operation only needs the WRITE ACL to work, and not the extra new TWO_PHASE_COMMIT ACL. In other words, producer clients could still resume previous transactions without having to enable 2PC. Under this mode with the new KafkaSink, since 2PC participation is not enabled, transactions may still be aborted by Kafka based on timeout (and thus potentially cause data loss), but at least the KafkaSink no longer requires Java reflection to bypass Kafka’s transaction protocol and manually inject the producer ID and epoch. Upgrading to the new KafkaSink is still highly desirable in this case, as it will work against stable Kafka client APIs and be much less likely to run into issues.

Rejected Alternatives

Supporting two versions of Kafka’s InitPidRequest protocol for live migration

WIP

Appendix: Current State of Kafka Sink

Data Loss Scenario

WIP

Using Java Reflections to Bypass InitPidRequest on Restore

WIP