...
During the discussion for KIP-980, which proposed the creation of connectors in STOPPED state, there was a suggestion to also allow setting the initial offset for a connector in the connector creation API. The proposal was deemed valid (point no.4) but was deferred to a future KIP. This KIP proposes to implement that change.
This feature will be helpful to users who want to create a connector that starts from a specific offset.
...
{
  "name": "file-src-1",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "file": "test.txt",
    "topic": "test-topic",
    "tasks.max": "1"
  },
  "initial_offsets": [
    {
      "partition": {
        // source partition
      },
      "offset": {
        // Desired initial offset
      }
    }
  ]
}
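As a rough illustration, a client could assemble this request as sketched below. This is a minimal sketch, not part of the proposal: the helper name `build_create_request` and the worker address `http://localhost:8083` are hypothetical, and the `filename`/`position` keys are an assumed partition/offset shape for the file source connector. Only `initial_offsets` itself is the field proposed by this KIP.

```python
import json
import urllib.request


def build_create_request(base_url, name, config, initial_offsets=None):
    """Build (but do not send) a POST /connectors request.

    `initial_offsets` is the field proposed in this KIP; it is left out
    of the body when the caller does not supply it.
    """
    body = {"name": name, "config": config}
    if initial_offsets is not None:
        body["initial_offsets"] = initial_offsets
    return urllib.request.Request(
        url=base_url.rstrip("/") + "/connectors",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_create_request(
    "http://localhost:8083",  # assumed worker address
    "file-src-1",
    {
        "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
        "file": "test.txt",
        "topic": "test-topic",
        "tasks.max": "1",
    },
    # Assumed partition/offset shape for the file source connector.
    initial_offsets=[
        {"partition": {"filename": "test.txt"}, "offset": {"position": 30}}
    ],
)
# Sending is left to the caller, e.g. urllib.request.urlopen(request).
```

Building the request separately from sending it keeps the payload easy to inspect before it reaches a worker.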
As mentioned in KIP-875: First-class offsets support in Kafka Connect (Altering/resetting offsets, response), the response message (`initial_offsets_response`) for successfully setting the initial_offsets will depend on whether the connector has implemented the `alterOffsets()` method.
If the connector has implemented `alterOffsets()` and everything has succeeded, the HTTP status will be 200 and the create-connector response body will be:
response (definite success)
"name": "ConnectorOne", "config": {...},
If the connector has not implemented `alterOffsets()` but everything else has succeeded, the HTTP status will be 200 and the create-connector response body will be:
response (possible success)
"name": "ConnectorOne", "config": {...},
This method will be supported in both distributed and standalone modes.
...
1. Validate connector configs
2. Validate the offset using the same checks performed while altering connector offsets (PATCH /$connector/offsets) as specified in KIP-875 (we will skip the validations that check whether the connector exists or is in the STOPPED state)
3. Wipe all existing offsets for the connector (i.e., we do not want to merge existing offsets and "initial_offsets")
4. Set the offset of the connector to the value of "initial_offsets"
5. Write connector config (with whatever initial state is specified in the request, or the default if none is specified)
If the Connect runtime encounters an error in any of these steps, it will clean up (if required) and return an error response mentioning the step at which the failure was encountered.
If step no. 4 fails, the existing offsets of the connector will already have been wiped in step no. 3.
If step no. 5 fails, the initial offsets set in step no. 4 will be wiped as part of cleanup.
In either failure case, any offsets the connector had before this API was invoked will be unrecoverable.
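The ordering and cleanup behaviour described above can be modelled with a small in-memory sketch. It is not the Connect runtime's implementation: `create_with_initial_offsets`, `StepFailed`, the dict-based stores, and the validator callbacks are all hypothetical names introduced only to make the failure semantics concrete.

```python
class StepFailed(Exception):
    """Carries the number of the step at which creation failed."""

    def __init__(self, step, cause):
        super().__init__(f"step {step} failed: {cause}")
        self.step = step


def create_with_initial_offsets(name, config, initial_offsets,
                                offsets, configs,
                                validate_config, validate_offsets):
    """Model the five steps against in-memory dicts.

    `offsets` and `configs` stand in for the offset and config stores.
    """
    step = 0
    try:
        step = 1
        validate_config(config)
        step = 2
        validate_offsets(initial_offsets)
        step = 3
        offsets.pop(name, None)                # wipe, never merge
        step = 4
        offsets[name] = list(initial_offsets)
        step = 5
        configs[name] = dict(config)
    except Exception as exc:
        if step >= 4:
            # Cleanup: drop any initial offsets written in step 4.
            offsets.pop(name, None)
        raise StepFailed(step, exc) from exc


# Usage with placeholder validators (assumed to accept everything).
offsets = {"file-src-1": [{"partition": {"filename": "test.txt"},
                           "offset": {"position": 10}}]}
configs = {}
create_with_initial_offsets(
    "file-src-1",
    {"file": "test.txt"},
    [{"partition": {"filename": "test.txt"}, "offset": {"position": 30}}],
    offsets, configs,
    validate_config=lambda c: None,
    validate_offsets=lambda o: None,
)
```

In this model a failure in step 1 or 2 leaves earlier offsets untouched, while a failure from step 3 onward leaves the connector with no offsets at all, which matches the unrecoverable case noted above.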
Minor details
Validate offset before connector creation
...