Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

The timing for offset commits will remain the same as it is today; they will be triggered periodically on a fixed interval that users can adjust via the offset.flush.interval.ms property. Transaction boundaries and record batches will be defined by these offset commits; expanded support for transaction boundary definition can be added later and is noted in the "future work" section.

Once an offset commit is complete, if the connector is configured with a custom offsets topic, the committed offsets will also be written to the worker’s global offsets topic using a non-transactional producer and the worker’s principal. This will be handled on a separate thread from the task’s work and offset commit threads, and should not block or interfere with the task at all. If the worker fails to write these offsets for some reason, it will retry indefinitely, but not fail the task. This will be done in order to facilitate "hard" downgrades and cases where users switch from per-connector offsets topics back to the global offsets topic.

...

When a rebalance is triggered, before (re-)joining rejoining the cluster group, all workers will preemptively stop all tasks of all source connectors for which task configurations are present in the config topic after the latest task count record. This step is not necessary to preserve exactly-once delivery guarantees, but should provide a reasonable opportunity for tasks to shut down gracefully before being forcibly fenced out.

...

  1. For each to-be-assigned source connector, if there is a set of task configurations present in the config topic after the most recent task count record for that connector:
    1. If there is an existing task count record for the connector in the config topic, and either the task count in that record is above 1 or the new number of tasks for the connector is above 1*:
      1. Fence out all producers that may still be active for prior task instances of this connector by instantiating an admin client using the connector principal and invoking Admin::fenceProducers (a new API described below; see Admin API to Fence out Transactional Producers). The transactional IDs will be the IDs that each task of the connector would have used, assuming as many tasks were active as the most recent task count record for the connector.** 
    2. Write the new task count record for all of the fenced-out connectors to the config topic
  2. Read to the end of the config topic to verify that all task count records have been written successfully, and use the new latest offset of the config topic as the offset in the assignment given out to members of the cluster.

...