...

The PATCH /connectors/{connector}/offsets endpoint will be useful for altering the offsets of stopped connectors. Requests will be rejected if the connector does not exist on the cluster (based on the config topic), if a rebalance is pending, if the connector is not in the STOPPED state (described below), or if it does not have an empty set of tasks in the config topic. If successful, the request will be met with a 200 response and the appropriate body. The request will fail with a 404 response if the connector does not exist on the cluster, and will fail with a 400 response if the connector does exist but is not in the correct state.
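The status codes described above can be sketched as a small decision function. This is only an illustration: `ConnectorView` and `alter_offsets_status` are hypothetical names, not part of Kafka Connect, and the pending-rebalance case is left out because the text does not name a status code for it.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ConnectorView:
    """Hypothetical snapshot of what the config topic says about one connector."""
    target_state: str  # e.g. "STARTED", "PAUSED", "STOPPED"
    task_count: int    # number of task configs currently in the config topic


def alter_offsets_status(connector: Optional[ConnectorView]) -> int:
    """Return the HTTP status for a PATCH /connectors/{connector}/offsets request."""
    if connector is None:
        # The connector does not exist on the cluster.
        return 404
    if connector.target_state != "STOPPED" or connector.task_count != 0:
        # The connector exists but is not stopped, or still has tasks.
        return 400
    # The connector is stopped with an empty set of tasks.
    return 200
```

A connector that is stopped but still has task configs in the config topic is treated the same as one that is running: both yield a 400.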

The connector's alterOffsets method (described below) will be invoked before its consumer group offsets or source offsets are modified.

Requests to this endpoint will only augment any existing offsets for the connector; they will not implicitly remove any offsets for the connector that are not included in the request body.
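To illustrate the "augment only" behavior, here is a minimal sketch that models a connector's offsets as a dictionary keyed by partition. The partition and offset shapes (`file`, `position`) and the helper names are assumptions made for the example; real connectors define their own.

```python
import json


def partition_key(partition: dict) -> str:
    """Serialize a partition map into a stable dictionary key (illustrative only)."""
    return json.dumps(partition, sort_keys=True)


def augment_offsets(existing: dict, requested: dict) -> dict:
    """Overlay the requested offsets on the existing ones.

    Partitions that are absent from the request keep their current offsets.
    """
    merged = dict(existing)
    merged.update(requested)
    return merged


existing = {
    partition_key({"file": "a.txt"}): {"position": 10},
    partition_key({"file": "b.txt"}): {"position": 20},
}
requested = {partition_key({"file": "a.txt"}): {"position": 0}}
merged = augment_offsets(existing, requested)
```

After the merge, the offset for `a.txt` is overwritten while the offset for `b.txt`, which the request did not mention, is retained.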

If exactly-once source support is enabled on a worker that receives a request to alter offsets for a source connector, it will fence out all previously-running tasks for the connector (if any exist, based on the presence of a task count record in the config topic) before invoking alterOffsets  and altering the offsets for the connector. Offsets will be altered transactionally for the connector's primary offsets topic (i.e., the one that is written to transactionally by source tasks when the connector is running). The transactional ID used for this operation will be ${groupId}-${connector}, where ${groupId} is the group ID of the Connect cluster and ${connector} is the name of the connector.
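As a small illustration of the transactional ID format, the following sketch builds the ID from the two values named above. The function name and the sample group ID and connector name are hypothetical.

```python
def offsets_transactional_id(group_id: str, connector: str) -> str:
    """Build the transactional ID in the ${groupId}-${connector} format."""
    return f"{group_id}-{connector}"


# Sample values chosen only for illustration.
example_id = offsets_transactional_id("connect-cluster", "my-source")
```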

All offset alter requests will be forwarded to the leader of the cluster.

...

The DELETE /connectors/{connector}/offsets endpoint will be useful for resetting the offsets of stopped connectors. Requests will be rejected if the connector does not exist on the cluster (based on the config topic), if a rebalance is pending, if the connector is not in the STOPPED state (described below), or if it does not have an empty set of tasks in the config topic. If successful, the request will be met with a 200 response and the appropriate body. The request will fail with a 404 response if the connector does not exist on the cluster, and will fail with a 400 response if the connector does exist but is not in the correct state. This endpoint will be idempotent; multiple consecutive requests to reset offsets for the same connector with no new offsets produced in between those requests will all result in a 200 response (if they are successful).
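From a client's point of view, a reset is a plain DELETE with no request body. The sketch below only constructs the request using the Python standard library; the base URL, the connector name, and the helper name are assumptions for illustration.

```python
from urllib.parse import quote
from urllib.request import Request


def reset_offsets_request(base_url: str, connector: str) -> Request:
    """Build (but do not send) a DELETE request for a connector's offsets."""
    # Percent-encode the connector name so it is safe as a single path segment.
    path = f"/connectors/{quote(connector, safe='')}/offsets"
    return Request(base_url.rstrip("/") + path, method="DELETE")


# Hypothetical worker address and connector name.
req = reset_offsets_request("http://localhost:8083", "my-source")
```

Passing `req` to `urllib.request.urlopen` would send it; note that `urlopen` raises `HTTPError` for 4xx responses such as the 404 and 400 cases described above.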

...

If exactly-once source support is enabled on a worker that receives a request to reset offsets for a source connector, it will fence out all previously-running tasks for the connector (if any exist, based on the presence of a task count record in the config topic) before invoking alterOffsets and resetting the offsets for the connector. Offsets will be reset transactionally for the connector's primary offsets topic (i.e., the one that is written to transactionally by source tasks when the connector is running). The transactional ID for this operation will be ${groupId}-${connector}, where ${groupId} is the group ID of the Connect cluster and ${connector} is the name of the connector. If this topic is different from the worker's global offsets topic, the connector's offsets will be removed from that topic as well, but without the use of a transaction.
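The split between a transactional reset of the primary offsets topic and a non-transactional cleanup of the global offsets topic can be sketched as a small planning function. `ResetStep`, `plan_source_reset`, and the topic names are hypothetical and only model the rule stated above.

```python
from typing import List, NamedTuple


class ResetStep(NamedTuple):
    topic: str           # offsets topic to remove the connector's offsets from
    transactional: bool  # whether the removal is done inside a transaction


def plan_source_reset(primary_topic: str, global_topic: str) -> List[ResetStep]:
    """List the topics to clean up when resetting a source connector's offsets."""
    # The primary offsets topic is always reset transactionally.
    steps = [ResetStep(primary_topic, True)]
    if primary_topic != global_topic:
        # A separate global offsets topic is cleaned up without a transaction.
        steps.append(ResetStep(global_topic, False))
    return steps
```

When the connector has no dedicated offsets topic, the primary and global topics are the same and the plan contains a single transactional step.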

Requests to reset sink connector offsets will be satisfied by deleting the consumer group for that sink connector (as opposed to deleting all known offsets for that consumer group). Requests to reset sink connector offsets will fail if there are any active members of the sink connector's consumer group.
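The sink-side rule can be modeled with an in-memory stand-in for the broker's consumer groups. This is a sketch only: the data layout, the exception, and the function name are hypothetical, and treating an already-deleted group as a no-op is an assumption.

```python
class ActiveMembersError(Exception):
    """Raised when the sink connector's consumer group still has active members."""


def reset_sink_offsets(groups: dict, group_id: str) -> None:
    """Reset a sink connector's offsets by deleting its whole consumer group."""
    group = groups.get(group_id)
    if group is None:
        # Assumption: nothing to delete, so the reset is treated as a no-op.
        return
    if group["members"]:
        # The reset fails while any consumer is still part of the group.
        raise ActiveMembersError(group_id)
    # Delete the group itself rather than its individual committed offsets.
    del groups[group_id]
```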

...

This feature is fully backwards-compatible with existing Kafka Connect releases. Migration should occur automatically whenever a Kafka Connect cluster is upgraded to a version that supports this feature.

Additional connector ACLs

If exactly-once source support is enabled on a worker, in order to handle requests to alter or reset offsets, the connector's principal must be able to use a transactional ID of ${groupId}-${connector}, where ${groupId} is the group ID of the Connect cluster and ${connector} is the name of the connector.

STOPPED target state

Diligent readers will note that the addition of a new target state creates problems with cluster downgrades and rolling upgrades. If a connector is stopped, and a worker running an older version of Kafka Connect either joins or currently exists in the cluster, that worker may not know how to handle the new record in the config topic that includes the request to stop the connector.
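The compatibility problem can be pictured with a sketch of an older worker that only recognizes the target states it was built with. The enum and parser below are hypothetical and assume, for illustration, that the older version knows only STARTED and PAUSED.

```python
from enum import Enum
from typing import Optional


class OldTargetState(Enum):
    """Target states assumed to be known to an older worker (illustrative)."""
    STARTED = "STARTED"
    PAUSED = "PAUSED"


def parse_target_state(value: str) -> Optional[OldTargetState]:
    """Parse a target state record the way an older worker might."""
    try:
        return OldTargetState(value)
    except ValueError:
        # The older worker has no representation for this state.
        return None
```

Here `parse_target_state("STOPPED")` returns `None`, which stands in for the older worker not knowing how to handle the new record.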

...