...

Supporting two versions of Kafka’s InitPidRequest protocol for live migration

WIP

Appendix:

...

Known Critical Issues with current KafkaSink

Data Loss Scenario

Data loss under exactly-once mode occurs for Flink's KafkaSink under the following scenario:

...

Generally speaking, the scenario described above can occur whenever sink subtasks receive their checkpoint-complete notifications, which trigger the second phase of 2PC, with enough delay. This can happen simply due to an unfortunate combination of Flink's checkpoint interval / delay and the Kafka transaction timeout, or because the Flink job fails right after a checkpoint completes and remains down for an extended amount of time.
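To make the timing condition concrete, below is a minimal sketch of the at-risk check. The class and method names are hypothetical (not part of Flink or Kafka); the only real configuration keys referenced are Kafka's transaction.timeout.ms / transaction.max.timeout.ms:

```java
// Illustrative sketch: a transaction pre-committed during a checkpoint is lost
// if the broker-side transaction timeout elapses before the sink subtask
// processes the checkpoint-complete notification and commits the transaction.
public class TransactionTimeoutRisk {

    /**
     * @param transactionTimeoutMs the producer's transaction.timeout.ms (capped by the
     *                             broker's transaction.max.timeout.ms, 15 min by default)
     * @param notificationDelayMs  worst-case delay until the sink subtask acts on the
     *                             checkpoint-complete notification, including any downtime
     *                             while the job is failed and restarting
     * @return true if the broker may abort the pre-committed transaction first
     */
    static boolean atRiskOfDataLoss(long transactionTimeoutMs, long notificationDelayMs) {
        return notificationDelayMs >= transactionTimeoutMs;
    }

    public static void main(String[] args) {
        // 15 min transaction timeout, job down for 30 min after the checkpoint: at risk.
        System.out.println(atRiskOfDataLoss(15 * 60_000L, 30 * 60_000L)); // true
        // Notification processed within seconds: safe.
        System.out.println(atRiskOfDataLoss(15 * 60_000L, 5_000L));       // false
    }
}
```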

Using Java Reflections to Bypass

...

InitProducerId request on Restore

At any point in time, a Flink job may fail for any reason, causing it to restart from the last completed Flink checkpoint. Within each checkpoint, the KafkaSink stores the TIDs (Kafka's transactional.id) of the Kafka transactions that were pre-committed as part of that checkpoint. On restart, these TIDs are restored and their previously ongoing transactions should be (possibly redundantly) committed.

The main issue is that Kafka, by design of its current transaction protocol, does not allow a recovered producer client instance to resume its previous transaction. Per the protocol, every newly created or recovered producer client is expected to issue an InitProducerId request to initialize itself (e.g. obtain an internal producer id and epoch). When a broker receives this initialization request, it checks whether the producer instance had a previous ongoing transaction and, if so, aborts it.
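The broker-side behavior described above can be simulated in a few lines. Everything below is a hypothetical stand-in for illustration, not Kafka's actual coordinator code: on each InitProducerId request for a transactional.id, the epoch is bumped to fence the old producer instance, and any transaction it left open is aborted.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in simulating the broker's InitProducerId handling.
public class TxnCoordinatorSim {
    static final class ProducerState {
        long producerId;
        short epoch;
        boolean ongoingTransaction;
        ProducerState(long producerId, short epoch) {
            this.producerId = producerId;
            this.epoch = epoch;
        }
    }

    private final Map<String, ProducerState> byTransactionalId = new HashMap<>();
    private long nextProducerId = 0;

    /** Handles InitProducerId: fences the previous instance and aborts its open txn. */
    ProducerState handleInitProducerId(String transactionalId) {
        ProducerState state = byTransactionalId.get(transactionalId);
        if (state == null) {
            state = new ProducerState(nextProducerId++, (short) 0);
            byTransactionalId.put(transactionalId, state);
        } else {
            if (state.ongoingTransaction) {
                // The problematic step for Flink: a transaction that was
                // pre-committed (but not yet committed) by the previous
                // producer instance gets aborted here, losing its data.
                state.ongoingTransaction = false;
            }
            state.epoch++; // fence the previous producer instance
        }
        return state;
    }

    void beginTransaction(String transactionalId) {
        byTransactionalId.get(transactionalId).ongoingTransaction = true;
    }

    public static void main(String[] args) {
        TxnCoordinatorSim broker = new TxnCoordinatorSim();
        broker.handleInitProducerId("flink-sink-0");
        broker.beginTransaction("flink-sink-0");        // pre-committed txn left open
        ProducerState recovered = broker.handleInitProducerId("flink-sink-0");
        System.out.println(recovered.epoch);            // 1 (old instance fenced)
        System.out.println(recovered.ongoingTransaction); // false (txn aborted)
    }
}
```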

To be able to resume pre-committed transactions, Flink's KafkaSink avoids issuing InitProducerId after recovering producer clients by 1) extracting the internal producer id and epoch and storing them alongside the TIDs in Flink checkpoints, 2) restoring these values on restart and injecting them into the client, and 3) altering the client's state so that it is considered to be mid-transaction. All of this is done via Java reflection, as these internals are not publicly accessible.
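The three steps above can be sketched with plain Java reflection against a simplified stand-in producer class. The field and enum names below are hypothetical; the real code manipulates the Kafka client's internal TransactionManager, whose field names are version-dependent:

```java
import java.lang.reflect.Field;

// Hypothetical stand-in producer with private transactional state, used to
// illustrate the extract / inject / alter-state steps done via reflection.
public class ReflectiveResumeDemo {
    enum TxnState { UNINITIALIZED, READY, IN_TRANSACTION }

    static class StandInProducer {
        private long producerId = -1;   // normally assigned via InitProducerId
        private short epoch = -1;
        private TxnState state = TxnState.UNINITIALIZED;
    }

    static Object getField(Object target, String name) throws Exception {
        Field f = target.getClass().getDeclaredField(name);
        f.setAccessible(true); // bypass private access, as the real code must
        return f.get(target);
    }

    static void setField(Object target, String name, Object value) throws Exception {
        Field f = target.getClass().getDeclaredField(name);
        f.setAccessible(true);
        f.set(target, value);
    }

    public static void main(String[] args) throws Exception {
        // 1) Before checkpointing: extract producer id and epoch from the live client.
        StandInProducer live = new StandInProducer();
        setField(live, "producerId", 42L);      // pretend the broker assigned these
        setField(live, "epoch", (short) 7);
        long checkpointedId = (long) getField(live, "producerId");
        short checkpointedEpoch = (short) getField(live, "epoch");

        // 2) On restart: inject the checkpointed values into a fresh client,
        //    skipping the InitProducerId request that would abort the transaction.
        StandInProducer recovered = new StandInProducer();
        setField(recovered, "producerId", checkpointedId);
        setField(recovered, "epoch", checkpointedEpoch);

        // 3) Alter the client state so it believes it is mid-transaction.
        setField(recovered, "state", TxnState.IN_TRANSACTION);

        System.out.println(getField(recovered, "producerId")); // 42
        System.out.println(getField(recovered, "state"));      // IN_TRANSACTION
    }
}
```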

The code that implements all of this can be found in FlinkKafkaInternalProducer: https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java

WIP