...
- Resetting the offsets for a connector while iterating rapidly in a development environment (so that the connector does not have to be renamed each time)
- Viewing the in-memory offsets for source connectors on the cluster in order to recover from accidental deletion of the Connect source offsets topic (this is currently possible with the config topic, and can be a life-saver)
- Monitoring the progress of running connectors on a per-source-partition basis
- Skipping records that cause issues with a connector and cannot be addressed using existing error-handling features
Kafka Connect has also evolved over the years to support some use cases that make offset management complex. These include:
- Creating connectors that target a different Kafka cluster than the one that hosts the Kafka Connect cluster's internal topics (possible as of KIP-458: Connector Client Config Override Policy)
- Creating sink connectors with an overridden, user-specified consumer group ID (possible as of KIP-458: Connector Client Config Override Policy)
- Creating source connectors with a custom offsets topic (possible as of KIP-618)
Any support for viewing and manipulating connector offsets added to Kafka Connect should accommodate all of these cases.
Public Interfaces
Endpoints
...
Verb | Path | Summary
---|---|---
GET | /connectors/{connector}/offsets | Retrieve the offsets for a connector; the connector must exist
DELETE | /connectors/{connector}/offsets | Reset the offsets for a connector; the connector must exist, and must be in the STOPPED state (described below)
PUT | /connectors/{connector}/stop | Stop the connector, but do not delete it (described below); the connector must exist
Response format
All responses that return a connector's offsets will use one of these formats, depending on whether the offsets are for a source or a sink connector. Care is taken to keep the two formats symmetrical.
```json
{
  "offsets": [
    {
      "partition": {
        // Connector-defined source partition
      },
      "offset": {
        // Connector-defined source offset
      }
    }
  ]
}
```
...
```json
{
  "offsets": [
    {
      "partition": {
        "topic": // Kafka topic
        "partition": // Kafka partition
      },
      "offset": {
        "offset": // Kafka offset
      }
    }
  ]
}
```
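As an illustration of the symmetry between the two formats, the following Python sketch (not part of the KIP; the function name is invented) distinguishes a sink-style response body from a source-style one by inspecting the partition fields:

```python
# Illustrative sketch only: classify an offsets response body as coming from
# a source or a sink connector. Field names follow the formats shown above;
# classify_offsets is a hypothetical helper, not part of the Connect REST API.

def classify_offsets(body):
    """Return "sink" if every partition is a Kafka topic-partition,
    otherwise "source" (connector-defined partitions)."""
    offsets = body["offsets"]
    if offsets and all(
        {"topic", "partition"} <= entry["partition"].keys()
        for entry in offsets
    ):
        return "sink"
    return "source"

# Example payloads; the source partition/offset fields here are arbitrary,
# since source connectors define their own partition and offset schemas.
source_body = {
    "offsets": [
        {"partition": {"filename": "/tmp/input.txt"}, "offset": {"position": 42}}
    ]
}
sink_body = {
    "offsets": [
        {"partition": {"topic": "orders", "partition": 0}, "offset": {"offset": 100}}
    ]
}
```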
...
Offsets endpoints behavior
The GET /connectors/{connector}/offsets endpoint will be useful for examining the offsets of connectors. It will automatically deduce whether the connector is a source or sink based on the connector's configuration, and then return the offsets for the connector. The request will fail with a 404 response if the connector does not exist on the cluster.
The DELETE /connectors/{connector}/offsets endpoint will be useful for resetting the offsets of stopped connectors. Requests will be rejected if the connector does not exist on the cluster (based on the config topic), if a rebalance is pending, or if the connector is not in the STOPPED state (described below). If successful, the request will be met with a 204 ("no content") response and an empty response body. The request will fail with a 404 response if the connector does not exist on the cluster, and will fail with a 400 response if the connector does exist but is not in the correct state. This endpoint will be idempotent; multiple consecutive requests to reset offsets for the same connector with no new offsets produced in between those requests will all result in a 204 response (if they are successful).
A source offset will only be considered successfully deleted if the Connect worker is able to emit a tombstone to the offsets topic for its partition, and then read to the end of the offsets topic. A request to reset offsets for a source connector will only be considered successful if the worker is able to delete all known offsets for that connector, on both the worker's global offsets topic and (if one is used) the connector's dedicated offsets topic.
If exactly-once source support is enabled on a worker that receives a request to reset offsets for a source connector, it will fence out all previously-running tasks for the connector (if any exist, based on the presence of a task count record in the config topic) before resetting the offsets for the connector. Offsets will be reset transactionally for each topic that they exist in: a single transaction will be used to emit all tombstone records for the connector's dedicated offsets topic (if one is used) and another transaction will be used to emit all tombstone records for the worker's global offsets topic.
Requests to reset sink connector offsets will be satisfied by deleting the consumer group for that sink connector (as opposed to deleting all known offsets for that consumer group).
All offset reset requests will be forwarded to the leader of the cluster.
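The tombstone-based deletion described above can be modeled with a toy in-memory stand-in for a compacted offsets topic. All names in this sketch are hypothetical; it only illustrates the "emit tombstones, then confirm" flow, not Connect's actual implementation:

```python
# Toy model of a compacted topic: the latest value per key wins, and a
# None value (a tombstone) deletes the key once compaction runs.
# CompactedTopic and reset_source_offsets are invented for illustration.

class CompactedTopic:
    def __init__(self):
        self.log = []  # append-only list of (key, value) records

    def emit(self, key, value):
        self.log.append((key, value))

    def compacted_view(self):
        """Latest value per key, with tombstoned keys removed."""
        view = {}
        for key, value in self.log:
            if value is None:
                view.pop(key, None)  # tombstone removes the key
            else:
                view[key] = value
        return view

def reset_source_offsets(topic, connector):
    """Emit a tombstone for every known source partition of the connector,
    mirroring the reset behavior described above."""
    for key in list(topic.compacted_view()):
        if key[0] == connector:  # keys modeled as (connector, partition) pairs
            topic.emit(key, None)
    # A real worker would now read to the end of the offsets topic to confirm
    # that every tombstone was committed before reporting success.
    return all(key[0] != connector for key in topic.compacted_view())
```

When exactly-once source support is enabled, the tombstones for each offsets topic would additionally be emitted inside a single transaction per topic, as described above.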
New target state: STOPPED
Background: target states today
Kafka Connect currently supports two "target states" for a connector: RUNNING (the default) and PAUSED. The target state for a connector can be controlled using the REST API: the PUT /connectors/{connector}/resume endpoint moves it to the RUNNING state, and the PUT /connectors/{connector}/pause endpoint moves it to the PAUSED state.
When a connector is paused, its tasks continue to exist on the cluster, and many of the resources allocated for them remain allocated, including Kafka and Kafka Connect resources such as clients, converters, and transformations, as well as connector-specific resources such as database connections, file descriptors, and memory buffers. This can lead to confusing and sometimes suboptimal behavior when users pause connectors but notice that resources (especially database connections) are still allocated.
In addition, connectors and tasks are allocated across the cluster without regard to their target state during rebalance. This can lead to some skew in resource usage (such as network traffic and CPU utilization) across the cluster; in the worst case, all paused tasks are allocated to one set of workers, and all running tasks are allocated to a disjoint set of workers.
A new STOPPED state
A new target state will be added for connectors: STOPPED. The semantics for a connector that becomes stopped will be:
- The connector config remains present in the config topic of the cluster (if running in distributed mode), unmodified
- The connector config remains visible in the REST API
- All tasks for the connector are shut down completely. If running in distributed mode, an empty set of task configs is published to the config topic for the connector
- If running in distributed mode, as a result of the empty set of task configs being published to the config topic, a rebalance will be triggered, and no tasks for the connector will be allocated across the cluster
- As a further result of that rebalance, any information on the connector provided by the REST API will show it with an empty set of tasks
- The Connector, if running, will be shut down (by invoking Connector::stop and deallocating all Kafka Connect- and Kafka-related resources allocated for it by Kafka Connect)
- The Connector will still appear in the status portion of the REST API, with a state of STOPPED. This will take place even if the connector was in the FAILED state before the request to stop it, or if it failed during shutdown in response to a request to stop
- If running in distributed mode, the Connector will still be assigned to a worker during each rebalance
- The Connector will not be started (by invoking Connector::start), and it will not be able to generate new task configurations (by invoking ConnectorContext::requestTaskReconfiguration)
When a stopped connector is resumed or paused (there is no difference between the two transitions), the semantics will be:
- The Connector is started on the worker that it is assigned to, and allowed to generate a set of task configurations
- In standalone mode, tasks are immediately brought up based on that set of configurations
- In distributed mode, if that set of configurations is non-empty, it is written to the config topic, and a rebalance ensues
- All further behavior is identical to any other cases where a new set of task configurations for a connector is generated (including a connector being reconfigured via the REST API, or a connector automatically generating a new set of configurations based on a monitoring thread that polls the external system for changes)
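The stop and resume semantics above can be summarized with a small, purely illustrative state model. The class and method names here are invented and do not correspond to Connect's internal herder APIs:

```python
# Illustrative model of the STOPPED / resumed transitions described above.
# ConnectorModel is hypothetical; real workers coordinate these steps
# through the config topic and rebalances in distributed mode.

RUNNING, PAUSED, STOPPED = "RUNNING", "PAUSED", "STOPPED"

class ConnectorModel:
    def __init__(self, name, generate_task_configs):
        self.name = name
        self.generate_task_configs = generate_task_configs
        self.target_state = RUNNING
        self.task_configs = generate_task_configs()

    def stop(self):
        # An empty set of task configs is published for the connector;
        # after the ensuing rebalance, no tasks are allocated anywhere.
        self.target_state = STOPPED
        self.task_configs = []

    def resume(self):
        # The Connector is started again and asked for fresh task configs;
        # in distributed mode a non-empty set triggers a rebalance.
        self.target_state = RUNNING
        self.task_configs = self.generate_task_configs()
```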
Compatibility, Deprecation, and Migration Plan
Offsets endpoints
This feature is fully backwards-compatible with existing Kafka Connect releases. Migration should occur automatically whenever a Kafka Connect cluster is upgraded to a version that supports this feature.
STOPPED target state
Diligent readers will note that the addition of a new target state creates problems with cluster downgrades and rolling upgrades. If a connector is stopped, and a worker running an older version of Kafka Connect either joins or currently exists in the cluster, that worker may not know how to handle the new record in the config topic that includes the request to stop the connector.
With some careful work, we can actually gracefully degrade in this scenario instead of confusing the worker running the older version of Kafka Connect.
Instead of publishing a "naive" record to the config topic with contents like this:
Key | Value
---|---
target-state-{connector} | { "state": "STOPPED" }
We can "fool" older workers into treating STOPPED
requests as PAUSE
requests by emitting records with this format:
Key | Value
---|---
target-state-{connector} | { "state": "PAUSED", "state.v2": "STOPPED" }
Older workers will inspect the state field of the record value, see that it is PAUSED, and pause the parts of the connector that they are assigned.
Newer workers will first inspect the state.v2 field and, if it is found, use that as the new target state. If no state.v2 field is found, they will fall back on the state field.
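The fallback logic can be expressed compactly. This sketch assumes the record value has already been deserialized into a dictionary; the function name is hypothetical:

```python
# Resolve the effective target state from a target-state record value.
# Older workers only know the "state" field; newer workers prefer "state.v2"
# when present. effective_target_state is an invented name for illustration.

def effective_target_state(value, worker_understands_v2):
    if worker_understands_v2 and "state.v2" in value:
        return value["state.v2"]
    return value["state"]

# A record written in the backward-compatible format described above:
record = {"state": "PAUSED", "state.v2": "STOPPED"}
```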
Test Plan
Integration and/or system tests will be added for these cases:
Offsets tests:
- Cannot view the offsets of a nonexistent connector
- Can view the offsets of a running sink and source connector via the GET /connectors/{connector}/offsets endpoint, and verify that those offsets reflect an expected level of progress for each connector (i.e., they are greater than or equal to a certain value depending on how the connectors are configured and how long they have been running)
- Cannot reset the offsets of a nonexistent, paused, or running connector
- Can reset the offsets of a stopped sink and source connector
- Follow-up requests to reset offsets for those connectors after their offsets have already been reset once are also successful
- Can view/reset the offsets for a sink connector that uses an overridden consumer group ID
- Can view/reset the offsets for a source connector that uses a custom offsets topic
- Can view/reset the offsets for a sink and source connector that targets a different Kafka cluster than the one used for the internal topics of the Kafka Connect cluster
Stopped state tests:
- Can stop a running connector
- Can stop a paused connector
- Cannot see task configuration or status information in the REST API for a stopped connector
- Can resume a stopped connector
- Can pause a stopped connector
- Stopping a failed connector updates its state to STOPPED in the REST API
- Stopping a connector that fails during shutdown after receiving a stop request updates its state to STOPPED in the REST API
- Can resume a connector after its Connector has failed both before and during shutdown after receiving a stop request
Future work
Overwriting offsets
...