...
If exactly-once source support is enabled on a worker that receives a request to reset alter offsets for a source connector, it will fence out all previously-running tasks for the connector (if any exist, based on the presence of a task count record in the config topic) before invoking alterOffsets
and resetting altering the offsets for the connector. This will require forwarding to the leader if the worker is not the leader. Offsets will be reset altered transactionally for each topic that they exist in: a single transaction will be used to emit all records for the connector's dedicated primary offsets topic (if one is used) and another transaction will be used to emit all records for the worker's global offsets topici.e., the one that is written to transactionally by source tasks when the connector is running). The transactional ID used for this operation will be ${groupId}-${connector}
, where ${groupId}
is the group ID of the Connect cluster and ${connector}
is the name of the connector.
Requests to this endpoint will only augment any existing offsets for the connector; they will not implicitly remove any offsets for the connector that are not included in the request body.
...
If exactly-once source support is enabled on a worker that receives a request to reset offsets for a source connector, it will fence out all previously-running tasks for the connector (if any exist, based on the presence of a task count record in the config topic) before invoking alterOffsets
and resetting the offsets for the connector. This will require forwarding to the leader if the worker is not the leader. Offsets will be reset transactionally for each topic that they exist in: a single transaction will be used to emit all tombstone records for the connector's dedicated primary offsets topic (if one is used) and another transaction will be used to emit all tombstone records for the worker's global offsets topici.e., the one that is written to transactionally by source tasks when the connector is running). The transactional ID for this operation will be ${groupId}-${connector}
, where ${groupId}
is the group ID of the Connect cluster and ${connector}
is the name of the connector. If this topic is different from the worker's global offsets topic, the connector's offsets will be removed from that topic as well, but without the use of a transaction.
Requests to reset sink connector offsets will be satisfied by deleting the consumer group for that sink connector (as opposed to deleting all known offsets for that consumer group). Requests to reset sink connector offsets will fail if there are any active members of the sink connector's consumer group.
...
This feature is fully backwards-compatible with existing Kafka Connect releases. Migration should occur automatically whenever a Kafka Connect cluster is upgraded to a version that supports this feature.
Additional connector ACLs
If exactly-once source support is enabled on a worker, in order to handle requests to alter or reset offsets, the connector's principal must be able to use a transactional ID of ${groupId}-${connector}
, where ${groupId}
is the group ID of the Connect cluster and ${connector}
is the name of the connector.
STOPPED target state
Diligent readers will note that the addition of a new target state creates problems with cluster downgrades and rolling upgrades. If a connector is stopped, and a worker running an older version of Kafka Connect either joins or currently exists in the cluster, that worker may not know how to handle the new record in the config topic that includes the request to stop the connector.
...