...
{
  "name": "file-src-1",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "file": "test.txt",
    "topic": "test-topic",
    "tasks.max": "1"
  },
  "initial_offsets": [
    {
      "partition": {
        // source partition
      },
      "offset": {
        // Desired initial offset
      }
    }
  ]
}
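As an illustration only, the request body above could be assembled and sanity-checked on the client side roughly as follows. This is a minimal sketch, not part of the proposal: the helper name `build_create_request` is hypothetical, and the `filename` / `position` keys are an assumption about how the FileStream source connector names its partition and offset fields. The request above deliberately leaves those two objects as placeholders.

```python
import json
from typing import Any, Dict, List


def build_create_request(
    name: str,
    config: Dict[str, str],
    initial_offsets: List[Dict[str, Dict[str, Any]]],
) -> str:
    """Serialize a create-connector body that carries initial_offsets.

    Hypothetical client-side helper; it only checks the shape of each entry.
    """
    for i, entry in enumerate(initial_offsets):
        # Each entry pairs exactly one source partition with its desired offset.
        if set(entry) != {"partition", "offset"}:
            raise ValueError(
                f"initial_offsets[{i}] must contain exactly 'partition' and 'offset'"
            )
        if not isinstance(entry["partition"], dict) or not isinstance(entry["offset"], dict):
            raise ValueError(
                f"initial_offsets[{i}]: 'partition' and 'offset' must be JSON objects"
            )
    return json.dumps(
        {"name": name, "config": config, "initial_offsets": initial_offsets},
        indent=2,
    )


body = build_create_request(
    "file-src-1",
    {
        "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
        "file": "test.txt",
        "topic": "test-topic",
        "tasks.max": "1",
    },
    # Assumed key names for the FileStream source connector (illustrative values).
    [{"partition": {"filename": "test.txt"}, "offset": {"position": 30}}],
)
print(body)
```

The resulting string would be sent as the JSON body of the create-connector request. Checking the entry shape before sending is a design choice of this sketch, not something the proposal requires of clients.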
As mentioned in the "Altering/resetting offsets (response)" section of KIP-875: First-class offsets support in Kafka Connect, the response message (`initial_offsets_response`) returned after successfully setting the initial_offsets depends on whether the connector has implemented the alterOffsets() method.
If the connector has implemented alterOffsets and everything has succeeded, the HTTP status will be 200 and the create-connector response body will be:
response (definite success)
"name": "ConnectorOne", "config": {...},
If the connector has not implemented alterOffsets but everything else has succeeded, the HTTP status will be 200 and the create-connector response body will be:
response (possible success)
"name": "ConnectorOne", "config": {...},
This method will be supported in both distributed and standalone modes.
...
If the Connect runtime encounters an error in any of these steps, it will clean up (if required) and return an error response that names the step at which the failure occurred.
If step no. 4 fails, the existing offsets of the connector will already have been wiped in step no. 3.
If step no. 5 fails, the initial offsets set in step no. 4 will be wiped as part of the cleanup.
In either failure case, the offsets the connector had before this API was invoked will be unrecoverable.
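The failure semantics of steps 3 to 5 can be pictured with a small in-memory model. This is a toy sketch under stated assumptions, not the Connect runtime's implementation: the offset store is a plain dictionary, `apply_initial_offsets` and `StepFailed` are hypothetical names, and step 5 is modelled as an opaque callable because only its failure behaviour matters here.

```python
from typing import Callable, Dict

Offsets = Dict[str, dict]  # toy offset store: partition key -> offset


class StepFailed(Exception):
    """Carries the number of the step at which the failure occurred."""

    def __init__(self, step: int, cause: Exception):
        super().__init__(f"step {step} failed: {cause}")
        self.step = step


def apply_initial_offsets(
    store: Offsets,
    initial: Offsets,
    set_offsets: Callable[[Offsets, Offsets], None],
    step_five: Callable[[], None],
) -> None:
    # Step 3: wipe the connector's existing offsets. Nothing is backed up,
    # so any later failure leaves the previous offsets unrecoverable.
    store.clear()

    # Step 4: write the requested initial offsets.
    try:
        set_offsets(store, initial)
    except Exception as exc:
        store.clear()  # drop any partial write; the old offsets are already gone
        raise StepFailed(4, exc) from exc

    # Step 5: opaque follow-up step (hypothetical stand-in).
    try:
        step_five()
    except Exception as exc:
        store.clear()  # cleanup: wipe the initial offsets written in step 4
        raise StepFailed(5, exc) from exc


def write_all(store: Offsets, initial: Offsets) -> None:
    store.update(initial)


def failing_step() -> None:
    raise RuntimeError("simulated failure")


store: Offsets = {"test.txt": {"position": 100}}
try:
    apply_initial_offsets(store, {"test.txt": {"position": 30}}, write_all, failing_step)
except StepFailed as err:
    print(err.step, store)
```

In this model a failure at either step leaves the store empty: neither the previous offset (100) nor the requested one (30) survives, which matches the "unrecoverable" outcome described above.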
Minor details
Validate offset before connector creation
...