...

Proposed Changes

MirrorSinkTask

There are several key challenges in achieving exactly-once semantics (EOS) across clusters. They are discussed one by one below:

(1) In the MirrorMaker case, there are source and target clusters. The consumer pulls data from the source cluster, where its offsets are typically stored, while the producer sends data to the target cluster. However, a Kafka transaction cannot span clusters out of the box. What modifications are needed?

A: The short answer is that the consumer offsets, which would normally be stored on the source cluster, are instead maintained and committed by the transactional producer and stored on the target cluster.

However, the consumer still has to live on the source cluster in order to pull the data, while its offsets are not stored on the source cluster (or, if they are, they are not accurate). We propose the following idea to position the consumer correctly while its offsets live on the target cluster (the pseudocode is shown below):

  • MirrorSinkTask does not rely on Connect's internal offset tracking or on __consumer_offsets on the source cluster.
  • The offsets are written only by the transactional producer, to the target cluster.
  • Offsets are stored on the target cluster using a "fake" consumer group. "Fake" means that no records are actually consumed by the group; it only stores offsets in the __consumer_offsets topic.
  • All records are written in a transaction.
  • When MirrorSinkTask starts, it loads its initial offsets from __consumer_offsets on the target cluster.

The outcome of the above idea:

  • If the transaction succeeds, the __consumer_offsets topic on the target cluster is updated.
  • If the transaction aborts, all data records are dropped, and the __consumer_offsets topic is not updated.
  • When MirrorSinkTask starts or restarts, it resumes at the last committed offsets, as stored on the target cluster.
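The idea and its outcomes can be illustrated with a small in-memory model. This is a sketch, not Kafka code: `TargetCluster`, `MirrorTransaction`, `startPosition`, and `replicateBatch` are hypothetical names, and plain lists and maps stand in for the target topic and the "fake" group's entries in __consumer_offsets. In a real task the staging would be done by a transactional `KafkaProducer` (`beginTransaction()`, `send()`, `sendOffsetsToTransaction()`, then `commitTransaction()` or `abortTransaction()`). As in Kafka, a stored offset here means the next offset to consume.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// In-memory stand-in for the target cluster: replicated records plus the
// committed offsets of the "fake" consumer group (its __consumer_offsets entries).
class TargetCluster {
    final List<String> records = new ArrayList<>();
    final Map<String, Long> fakeGroupOffsets = new HashMap<>();
}

// Hypothetical transaction: records and offsets are staged together and become
// visible on the target cluster only on commit; abort discards both.
class MirrorTransaction {
    private final TargetCluster target;
    private final List<String> pendingRecords = new ArrayList<>();
    private final Map<String, Long> pendingOffsets = new HashMap<>();

    MirrorTransaction(TargetCluster target) { this.target = target; }

    void send(String record) { pendingRecords.add(record); }

    // Stages the next offset to consume for a source partition.
    void sendOffsets(String partition, long nextOffset) { pendingOffsets.put(partition, nextOffset); }

    void commit() {
        target.records.addAll(pendingRecords);
        target.fakeGroupOffsets.putAll(pendingOffsets);
        clear();
    }

    void abort() { clear(); }

    private void clear() {
        pendingRecords.clear();
        pendingOffsets.clear();
    }
}

class ResumeDemo {
    // On start/restart the consumer is positioned from the fake group on the
    // target cluster; 0 for a never-committed partition is an assumption here.
    static long startPosition(TargetCluster target, String partition) {
        return target.fakeGroupOffsets.getOrDefault(partition, 0L);
    }

    // Reads up to `count` source records from the committed position and writes
    // them, together with the new offset, in a single transaction.
    static void replicateBatch(TargetCluster target, List<String> source, String partition,
                               int count, boolean commit) {
        long pos = startPosition(target, partition);
        long end = Math.min(source.size(), pos + count);
        MirrorTransaction txn = new MirrorTransaction(target);
        for (long i = pos; i < end; i++) {
            txn.send(source.get((int) i));
        }
        txn.sendOffsets(partition, end);
        if (commit) {
            txn.commit();
        } else {
            txn.abort();
        }
    }

    public static void main(String[] args) {
        TargetCluster target = new TargetCluster();
        List<String> source = List.of("r0", "r1", "r2", "r3");
        replicateBatch(target, source, "topic-0", 2, true);  // r0, r1 committed
        replicateBatch(target, source, "topic-0", 2, false); // r2, r3 aborted
        replicateBatch(target, source, "topic-0", 2, true);  // resumes at offset 2
        // prints "[r0, r1, r2, r3] {topic-0=4}"
        System.out.println(target.records + " " + target.fakeGroupOffsets);
    }
}
```

Because the aborted batch leaves both the records and the offset untouched, the next attempt re-reads r2 and r3 from the last committed position, and each record lands on the target exactly once.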

Some items need attention for the above idea to work correctly:

  • If the consumer group already exists on the source cluster, while the "fake" consumer group (with the same group ID) on the target cluster does not exist or its offsets are lower than the high watermark, a one-time job is needed to sync the offsets from the source cluster to the target cluster.
  • Since the "fake" consumer group becomes the single source of truth and is maintained only by MirrorSinkTask, some cases may need special handling, e.g. adding topics to, or deleting topics from, the existing set of topics to be replicated.
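A minimal sketch of the one-time sync, under an assumed rule that the proposal does not spell out: for every partition the source-cluster group has committed, the fake group adopts the source offset if it has no offset for that partition or a lower one. `OffsetSync` and `syncOnce` are hypothetical names, and plain maps from partition name to offset stand in for the admin calls that would actually read and write the two groups.

```java
import java.util.Map;
import java.util.TreeMap;

class OffsetSync {
    // Hypothetical one-time sync: start from the fake group's offsets and, per
    // partition, take the source group's offset when the fake group has none
    // or a lower one. TreeMap keeps the output order deterministic.
    static Map<String, Long> syncOnce(Map<String, Long> sourceGroup, Map<String, Long> fakeGroup) {
        Map<String, Long> result = new TreeMap<>(fakeGroup);
        for (Map.Entry<String, Long> e : sourceGroup.entrySet()) {
            Long current = result.get(e.getKey());
            if (current == null || current < e.getValue()) {
                result.put(e.getKey(), e.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Long> source = Map.of("t-0", 120L, "t-1", 80L, "t-2", 50L);
        Map<String, Long> fake = Map.of("t-0", 100L, "t-1", 90L);
        System.out.println(syncOnce(source, fake)); // {t-0=120, t-1=90, t-2=50}
    }
}
```

Running this only once, before MirrorSinkTask starts committing transactionally, matters: afterwards the fake group is the single source of truth and must not be overwritten from the source cluster.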

(2) The consumer in WorkerSinkTask periodically commits the offsets that it maintains itself. How can this behavior be disabled so that the transactional producer in MirrorSinkTask can control when and what to commit?

A: SinkTask provides a preCommit() method that lets the SinkTask implementation (here, MirrorSinkTask) determine which offsets WorkerSinkTask should commit. In transactional mode, MirrorSinkTask intentionally returns an empty Map from preCommit() (see the code below); when preCommit() returns an empty map, the WorkerSinkTask consumer skips committing the offsets it maintains itself.
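The opt-out can be modeled without Connect on the classpath. In Connect the real hook is `Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata> currentOffsets)` on `org.apache.kafka.connect.sink.SinkTask`; in this sketch, String partition names and Long offsets stand in for those types, and `SketchMirrorSinkTask` / `SketchWorker` are hypothetical, simplified stand-ins for MirrorSinkTask and WorkerSinkTask.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Stand-in for MirrorSinkTask's preCommit(): String/Long replace
// TopicPartition/OffsetAndMetadata from the real Connect API.
class SketchMirrorSinkTask {
    private final boolean transactional;

    SketchMirrorSinkTask(boolean transactional) {
        this.transactional = transactional;
    }

    Map<String, Long> preCommit(Map<String, Long> currentOffsets) {
        // In transactional mode the producer commits offsets inside the
        // transaction, so report nothing for the worker's consumer to commit.
        return transactional ? Collections.<String, Long>emptyMap() : currentOffsets;
    }
}

// Simplified model of WorkerSinkTask's commit step: an empty preCommit()
// result means the task opted out, so the consumer commits nothing.
class SketchWorker {
    final Map<String, Long> consumerCommitted = new HashMap<>();

    void commitOffsets(SketchMirrorSinkTask task, Map<String, Long> currentOffsets) {
        Map<String, Long> toCommit = task.preCommit(currentOffsets);
        if (toCommit.isEmpty()) {
            return; // skip the consumer offset commit entirely
        }
        consumerCommitted.putAll(toCommit);
    }

    public static void main(String[] args) {
        SketchWorker worker = new SketchWorker();
        worker.commitOffsets(new SketchMirrorSinkTask(true), Map.of("topic-0", 42L));
        System.out.println(worker.consumerCommitted); // {}
    }
}
```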


The following pseudocode snippet illustrates the high-level key implementation:

...