...

Source offset accuracy: The Connect framework periodically writes source task offsets to an internal Kafka topic at a configurable interval, once the source records that they correspond to have been successfully sent to Kafka. This provides at-least-once delivery guarantees for most source connectors, but allows for the possibility of duplicate writes in cases where source records are written to Kafka but the worker is interrupted before it can write the offsets for those records to Kafka.
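The duplicate-write window described above can be illustrated with a small simulation. This is only a toy model under stated assumptions: the class and method names are hypothetical, records are plain integers, and no Kafka APIs are involved. The worker sends some records, is interrupted before any offsets are written, and then restarts from the last committed offset (zero here).

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (hypothetical names, no Kafka APIs): shows how at-least-once
// delivery can produce duplicates when a worker is interrupted after sending
// records but before writing their offsets.
final class AtLeastOnceDuplicateSketch {

    // Returns the contents of the modeled topic after a crash and restart.
    static List<Integer> topicAfterCrashAndRestart(int totalRecords, int sentBeforeCrash) {
        List<Integer> topic = new ArrayList<>();
        int committedOffset = 0; // assumption: no offsets were flushed before the crash

        // First run: records reach the topic, but their offsets are never written.
        for (int i = 0; i < sentBeforeCrash; i++) {
            topic.add(i);
        }

        // After restart: the task resumes from the last committed offset,
        // so everything sent in the first run is sent again.
        for (int i = committedOffset; i < totalRecords; i++) {
            topic.add(i);
        }
        return topic;
    }
}
```

With three source records and a crash after two sends, the modeled topic ends up holding 0, 1, 0, 1, 2: the first two records appear twice.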

...

When exactly.once.source.enabled is set to true for a worker, all source tasks created by that worker will use a transactional producer to write to Kafka. All source records that they provide to the worker will be written to Kafka inside a transaction. Once it comes time for offsets to be committed for a task, offset information for the current batch will be written to the offsets topic, inside the same transaction. Then, the transaction will be committed. This will ensure that source offsets will be committed to Kafka if and only if the source records for that batch were also written to Kafka.
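The "if and only if" property above can be sketched with a toy in-memory model. This is not the Kafka client API and not the proposal's implementation; the class and method names are hypothetical. Records and offsets are staged together and become visible together on commit, or are discarded together on abort.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model (hypothetical, not the Kafka client API): source records
// and their offsets are staged in one transaction and become visible atomically.
final class ToyTransactionalLog {
    private final List<String> committedRecords = new ArrayList<>();
    private final Map<String, Long> committedOffsets = new HashMap<>();
    private final List<String> pendingRecords = new ArrayList<>();
    private final Map<String, Long> pendingOffsets = new HashMap<>();

    // Stage a source record inside the current transaction.
    void sendRecord(String record) {
        pendingRecords.add(record);
    }

    // Stage offset information for the current batch inside the same transaction.
    void sendOffset(String sourcePartition, long offset) {
        pendingOffsets.put(sourcePartition, offset);
    }

    // Commit: staged records and offsets become visible together.
    void commit() {
        committedRecords.addAll(pendingRecords);
        committedOffsets.putAll(pendingOffsets);
        clearPending();
    }

    // Abort (for example, after a worker failure): neither becomes visible.
    void abort() {
        clearPending();
    }

    private void clearPending() {
        pendingRecords.clear();
        pendingOffsets.clear();
    }

    // Snapshot of committed records.
    List<String> records() {
        return new ArrayList<>(committedRecords);
    }

    // Committed offset for a source partition, or null if none was committed.
    Long offsetFor(String sourcePartition) {
        return committedOffsets.get(sourcePartition);
    }
}
```

In this model there is no state in which a record is committed without its offset, or an offset without its records, which is the guarantee the transactional producer is meant to provide.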

The timing for offset commits will remain the same as it is today; they will be triggered periodically on a fixed interval that users can adjust via the offset.flush.interval.ms property. Transaction boundaries and record batches will be defined by these offset commits; expanded support for transaction boundary definition can be added later and is noted in the "future work" section.
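As a rough illustration, the two worker properties named in this section could be assembled as follows. The helper class and the interval value passed to it are hypothetical; only the property names come from the text above.

```java
import java.util.Properties;

// Sketch only: builds the worker properties discussed above. The class name and
// the interval value chosen by the caller are illustrative assumptions.
final class ExactlyOnceWorkerConfigSketch {

    static Properties workerProps(long offsetFlushIntervalMs) {
        Properties props = new Properties();
        // Source tasks created by this worker use a transactional producer.
        props.setProperty("exactly.once.source.enabled", "true");
        // Offset commits, and therefore transaction boundaries, occur on this interval.
        props.setProperty("offset.flush.interval.ms", Long.toString(offsetFlushIntervalMs));
        return props;
    }
}
```

Because each offset commit closes a transaction, this interval also bounds how long records wait before the transaction containing them is committed.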

Once an offset commit is complete, if the connector is configured with a custom offsets topic, the committed offsets will also be written to the worker’s global offsets topic using a non-transactional producer and the worker’s principal. This will be handled on a separate thread from the task’s work and offset commit threads, and should not block or interfere with the task at all. If the worker fails to write these offsets for some reason, it will retry indefinitely, but not fail the task. This will be done in order to facilitate "hard" downgrades and cases where users switch from per-connector offsets topics back to the global offsets topic.

...

After a rebalance, a worker may discover that it has become the leader of the cluster. When this happens, the worker will instantiate a transactional producer whose transactional ID is connect-cluster-${groupId}, where ${groupId} is the group ID of the cluster. If users try to override this by setting the transactional.id worker property, the user-provided value will be ignored and a message will be logged notifying them of this fact. The worker will use this producer for all writes it performs on the config topic. It will begin and commit a transaction for every record it writes. This may seem unusual (why use a transactional producer if transactions aren’t really necessary?), but it ensures that only the most recent leader is capable of producing to the config topic and that zombie leaders (rare though they may be) will not be able to.
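The fencing idea can be sketched with a toy model. It assumes the general Kafka behavior that initializing a transactional producer for a given transactional ID fences earlier producers that used the same ID. The class, its methods, and the integer epoch bookkeeping are hypothetical simplifications, not the broker's actual implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model (hypothetical): each initialization for a transactional ID bumps an
// epoch, and only the holder of the latest epoch may write to the config topic.
final class ToyConfigTopicFencing {
    // Latest epoch handed out per transactional ID.
    private final Map<String, Integer> latestEpoch = new HashMap<>();

    // Transactional ID used by the leader, derived from the cluster's group ID.
    static String leaderTransactionalId(String groupId) {
        return "connect-cluster-" + groupId;
    }

    // Models a newly elected leader initializing its producer: the epoch is
    // bumped, which fences any earlier holder of the same transactional ID.
    int initProducer(String transactionalId) {
        return latestEpoch.merge(transactionalId, 1, Integer::sum);
    }

    // A write is accepted only from the producer holding the latest epoch.
    boolean tryWrite(String transactionalId, int epoch) {
        Integer latest = latestEpoch.get(transactionalId);
        return latest != null && latest == epoch;
    }
}
```

In this model, once a new leader calls initProducer, a zombie leader still holding the old epoch has every tryWrite rejected, while the new leader's writes are accepted.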

...

Operation | Resource Type | Resource Name
Write | TransactionalId | connect-cluster-${groupId}, where ${groupId} is the group ID of the cluster
IdempotentWrite | Cluster | Kafka cluster targeted by the Connect cluster

...