Status

Current state: Draft

Discussion thread: https://lists.apache.org/thread.html/r9d1c89b871792655cd14ff585980bb0ace639d85d9200e239cc0e1cd%40%3Cdev.kafka.apache.org%3E

Voting thread: https://lists.apache.org/thread.html/rbfe08bfb15e14db14c54d1ca5c86bfcd17dc952084ad0a4dec8255b6%40%3Cdev.kafka.apache.org%3E

...

  • As mentioned above, Kafka Connect source connectors and tasks do not provide EOS by nature, mostly because source task offsets are committed asynchronously and periodically; in the MirrorMaker case, that "offset" is the consumer offset. This is one blocker to enabling EOS without a lot of changes in WorkerSourceTask.
  • MirrorSourceTask explicitly avoids using subscribe() and instead handles rebalances and new topic-partitions explicitly, whereas MirrorSinkTask, which extends SinkTask, is initiated by WorkerSinkTask. WorkerSinkTask uses consumer.subscribe(), so rebalance handling and auto-detection of new topics/partitions come out of the box. When the consumer or its rebalance handling is upgraded in WorkerSinkTask as part of Kafka Connect, MirrorSinkTask will benefit transparently.
  • Since MirrorSinkTask is a new implementation, we can fully control how a producer is created (transactional vs. non-transactional) and handle various exception cases (especially in transactional mode) purely in the new implementation, rather than changing the existing producer in WorkerSourceTask.
  • The HDFS Sink Connector has already achieved EOS, so we can correctly implement MirrorSinkTask based on the methods of SinkTask by referring to the best practices from the HDFS Sink Connector.

...

A: The short answer is that consumer group offsets are managed and committed by the transactional producer, and are stored on the target cluster instead.

However, the consumer still has to live on the source cluster in order to pull the data, while the “source-of-truth” offsets are no longer stored in the source cluster. We propose the following idea to rewind the consumer correctly when a data mirroring task restarts or rebalances while the “source-of-truth” consumer offsets are stored in the target cluster (pseudocode is shown below):

  • Consumer offsets are stored on the target cluster using a “fake” consumer group, which can be created programmatically as long as we know the name of the consumer group. “Fake” means that no actual records are consumed by the group; only offsets are stored in the __consumer_offsets topic. However, the __consumer_offsets topic on the target cluster (managed by the “fake” consumer group) holds the “source of truth” offsets.
  • With the “fake” consumer group on the target cluster, MirrorSinkTask does not rely on Connect's internal offsets tracking or on __consumer_offsets on the source cluster.
  • The consumer offsets are only written to the target cluster by the producer involved in the transaction.
  • All records are written in a transaction, as if in a single cluster.
  • When MirrorSinkTask starts or rebalances, it loads initial offsets from __consumer_offsets on the target cluster.

The outcome of the above idea:

  • if the transaction succeeds, the __consumer_offsets topic on the target cluster is updated by following the current protocol of the Exactly-Once framework.
  • if the transaction aborts, all data records are dropped, and the __consumer_offsets topic on the target cluster is not updated.
  • when MirrorSinkTask starts/restarts, it resumes at the last committed offsets, as stored in the target cluster.
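
The three outcomes above can be illustrated with a toy in-memory model. This is only a sketch under stated assumptions: MirrorEosModel, TargetCluster and mirrorBatch are hypothetical names invented for illustration, none of them are Kafka or MirrorMaker classes, and the "transaction" is simulated by applying records and offsets in a single step. In a real task this role would presumably be played by a transactional KafkaProducer on the target cluster.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of the proposal; every type here is hypothetical, not a Kafka class. */
public class MirrorEosModel {

    /** Stands in for the target cluster: mirrored records plus the "fake" group's offsets. */
    static class TargetCluster {
        final List<String> records = new ArrayList<>();
        // Key: source topic-partition; value: next source offset to read.
        final Map<String, Long> committedOffsets = new HashMap<>();

        /** Models a committed transaction: records and offsets become visible together. */
        synchronized void commit(List<String> txRecords, Map<String, Long> txOffsets) {
            records.addAll(txRecords);
            committedOffsets.putAll(txOffsets);
        }
    }

    /**
     * Mirrors one batch from a source partition and returns the offset the task
     * would resume from. If failBeforeCommit is true, the transaction is aborted.
     */
    static long mirrorBatch(TargetCluster target, String partition,
                            List<String> sourceLog, int batchSize,
                            boolean failBeforeCommit) {
        // On start/restart, the position is loaded from the target cluster only.
        long start = target.committedOffsets.getOrDefault(partition, 0L);
        int end = (int) Math.min(sourceLog.size(), start + batchSize);

        // Buffer the records and the new offset inside the simulated transaction.
        List<String> txRecords = new ArrayList<>(sourceLog.subList((int) start, end));
        Map<String, Long> txOffsets = new HashMap<>();
        txOffsets.put(partition, (long) end);

        if (failBeforeCommit) {
            // Abort: buffered records and offsets are dropped, target is untouched.
            return start;
        }
        target.commit(txRecords, txOffsets);
        return end;
    }

    public static void main(String[] args) {
        List<String> source = Arrays.asList("a", "b", "c", "d");
        TargetCluster target = new TargetCluster();

        mirrorBatch(target, "topic-0", source, 2, false); // commits a, b and offset 2
        mirrorBatch(target, "topic-0", source, 2, true);  // aborted, nothing changes
        mirrorBatch(target, "topic-0", source, 2, false); // resumes at 2, commits c, d

        System.out.println(target.records + " " + target.committedOffsets);
        // prints: [a, b, c, d] {topic-0=4}
    }
}
```

Because the aborted batch leaves both the records and the offsets unchanged, the retry re-reads the same source range and the target ends up with each record exactly once in this model.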

...