...

It has been a long-standing issue that Flink’s Kafka Sink implementation for exactly-once delivery guarantees comes with two critical pitfalls (please see the Appendix for an illustrated, detailed recap of why these issues exist):

  1. Data loss can occur when Kafka aborts a successfully checkpointed transaction due to timeout. Users were recommended to mitigate this by setting a substantially large value for the transaction.timeout.ms Kafka setting (see the configuration sketch after this list), but ultimately there’s always a chance of data loss if restoring the job takes long enough. Moreover, Kafka brokers can limit the upper bound of the timeout (via the transaction.max.timeout.ms broker config), and users don't always have access to alter this (e.g. if they are writing to a managed Kafka cluster service).

  2. When a KafkaSink is restored from a checkpoint, checkpointed transactions need to be resumed and committed. Here, to create a producer instance that is capable of resuming the previous ongoing transaction, the KafkaSink implementation relies heavily on Java reflection in order to bypass the Kafka transaction protocol, which by definition aborts any previous ongoing transactions when a producer session is recovered. The use of Java reflection makes upgrading Kafka clients in Flink hard and highly error-prone.
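
For context, the mitigation mentioned in pitfall 1 is typically applied through the sink builder. Below is a minimal sketch using the current KafkaSink builder API; the broker address, topic, prefix, and timeout value are illustrative placeholders:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers("broker:9092")                // placeholder address
        .setRecordSerializer(
                KafkaRecordSerializationSchema.builder()
                        .setTopic("output-topic")          // placeholder topic
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
        .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        .setTransactionalIdPrefix("my-app")                // placeholder prefix
        // Raise the transaction timeout so checkpointed transactions survive
        // longer job downtime; still capped by the broker-side
        // transaction.max.timeout.ms config.
        .setProperty("transaction.timeout.ms", "3600000")  // 1 hour
        .build();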

These issues exist because Kafka’s transaction protocol was originally designed with the simple “read-process-write” loop in mind, where each transaction is made by a single consumer-producer process atomically reading from and writing to Kafka. Such transactions are expected to be short-lived, lasting only for the duration of this loop. Therefore, any client downtime results in the transaction being aborted by Kafka after the client recovers, so that the new producer session can start with a clean state. The same goes for any transaction that stays in-doubt for too long: e.g. if the producer process is lost forever, Kafka aborts the transaction on timeout to prevent it from permanently blocking downstream consumers.

These protocol behaviors directly prevent Kafka producers from being safely usable as participants in an externally-coordinated 2PC transaction. As a 2PC participant, a Kafka producer needs to guarantee that once a transaction is pre-committed, it is permanently durable and can stay in-doubt for as long as it takes for the external coordinator to issue the second phase that decides the transaction's fate. In Flink terms, once a KafkaSink subtask flushes its writes to the ongoing transaction and acknowledges the corresponding Flink checkpoint (the pre-commit voting phase), the transaction should remain in-doubt until all KafkaSink subtasks have also acknowledged the checkpoint and the JM (the external transaction coordinator) notifies all subtasks to commit their transactions (the commit phase).

Kafka is attempting to address this with KIP-939: Support Participation in 2PC. In short, with KIP-939 it is now possible to declare that a transactional Kafka producer is participating in an externally-coordinated 2PC transaction. Effectively, this means Kafka acknowledges that an external transaction coordinator exists and has full control in deciding transaction fate; Kafka will no longer proactively abort transactions from the 2PC-participating producer based on timeout, and will also allow resuming previous transactions so that the external coordinator has the final say in whether to commit or abort them. For a more detailed summary of KIP-939, please see KIP-939 Public Interface Summary.
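
As a rough illustration, the happy path of a single externally-coordinated transaction using the producer additions proposed in KIP-939 could look like the sketch below. The API names (initTransactions(boolean), prepareTransaction(), completeTransaction(PreparedTxnState)) and the transaction.two.phase.commit.enable config are taken from the KIP-939 proposal and may still change before the KIP is finalized; broker address, topic, and ids are placeholders:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.PreparedTxnState; // type proposed in KIP-939
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.put("bootstrap.servers", "broker:9092");  // placeholder address
props.put("transactional.id", "my-app-txn-0");  // placeholder id
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// New producer config proposed in KIP-939: declare 2PC participation so the
// broker never unilaterally aborts this producer's transactions on timeout.
props.put("transaction.two.phase.commit.enable", "true");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);

producer.initTransactions(false);  // keepPreparedTxn = false on a fresh start

producer.beginTransaction();
producer.send(new ProducerRecord<>("output-topic", "value"));

// Phase 1 (pre-commit): flush the transaction and leave it in-doubt. The
// returned PreparedTxnState must be durably stored by the external coordinator.
PreparedTxnState preparedState = producer.prepareTransaction();

// ... the external coordinator collects votes from all participants ...

// Phase 2 (commit): complete the in-doubt transaction once the coordinator
// decides; commits if the given state matches the broker-side prepared state.
producer.completeTransaction(preparedState);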

This FLIP proposes to update Flink's KafkaSink connector to integrate with the new public APIs made available by KIP-939 for proper 2PC integration. As the KafkaSink was already implemented on top of Flink's TwoPhaseCommittingSink abstraction, the actual code changes required are minimal. Practically, this would include:

  • Remove FlinkKafkaInternalProducer, which implements the Java reflection accesses on the Kafka client to bypass the Kafka transaction protocol
  • Integrate with the new KIP-939 transaction operation API methods (see the sketch after this list for how Flink's two commit phases map onto them)
  • Implement state schema migration paths (via savepointing and recovering the job) due to a required schema change in the KafkaCommittable
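
For illustration only, here is roughly how Flink's two phases could map onto the new producer calls inside the sink. This is a hypothetical sketch, not the FLIP's actual code; the KafkaCommittable constructor and the getPreparedTxnState() accessor shown here are assumed shapes (see the schema under Public Interfaces below):

// Pre-commit (on checkpoint): the sink writer prepares the ongoing transaction
// and emits the resulting state as a committable for the Flink checkpoint.
PreparedTxnState preparedState = producer.prepareTransaction();
KafkaCommittable committable = new KafkaCommittable(transactionalId, preparedState);

// Commit (on checkpoint-complete notification from the JM): the committer
// finishes the in-doubt transaction using only the checkpointed state.
producer.completeTransaction(committable.getPreparedTxnState());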

To conclude, this improvement brings the following benefits for our users:

  • No more risk of data loss when using the KafkaSink under EXACTLY_ONCE mode

  • A much more robust implementation that does not rely on Java reflection on the Kafka Java client. For Flink devs, this improves code maintainability for the KafkaSink and overall makes it much easier to upgrade the Kafka client version. For our users, this will indirectly resolve many production issues that have been reported with Kafka-writing Flink jobs, such as long startup times and orphaned hanging transactions.

Public Interfaces

This FLIP does not require any changes to the existing public Flink Source / Sink interfaces for connectors. It does affect other user-facing aspects specific to the Kafka Sink connector when used under EXACTLY_ONCE mode, namely: 1) the minimum required Kafka broker version, and 2) the KafkaCommittable state schema.

...

The user’s Kafka cluster must be upgraded to a minimum version that supports KIP-939 in order to use the new KafkaSink version.

...

A checkpointed KafkaCommittable would now contain only:

class KafkaCommittable {
    private final String transactionalId;
    private final PreparedTxnState preparedTxnState;
}

as opposed to the previous schema, which contains (producerId, producerEpoch, TID). We no longer need to persist the producerId and producerEpoch because Kafka's new InitPidRequest protocol version allows resuming the previous ongoing transaction instead of always aborting it, so the KafkaSink no longer needs to bypass the protocol.
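
This also simplifies the restore path: a freshly created producer configured with the same transactional.id can resume and complete the in-doubt transaction, with the broker resolving the producer id and epoch internally. A hypothetical sketch, again assuming the KIP-939 API names and an illustrative getPreparedTxnState() accessor:

// On restore: a new producer session with the same transactional.id keeps the
// in-doubt transaction alive instead of aborting it.
producer.initTransactions(true);  // keepPreparedTxn = true

// The checkpointed PreparedTxnState alone decides the outcome: the broker
// commits if it matches the prepared transaction, and aborts otherwise.
producer.completeTransaction(restoredCommittable.getPreparedTxnState());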

...