...

During the discussion of KIP-980, which proposed the creation of connectors in the STOPPED state, there was a suggestion to also allow setting the initial offset for a connector in the connector creation API. The proposal was deemed valid (point no. 4) but was deferred to a future KIP. This KIP proposes to implement that change.


This feature will be helpful to users who want to create a connector and have it start from a specific offset.

...

{
    "name": "file-src-1",
    "config": {
        "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
        "file": "test.txt",
        "topic": "test-topic",
        "tasks.max": "1"
    },
    "initial_offsets": [
        {
            "partition": {
                // Source partition
            },
            "offset": {
                // Desired initial offset
            }
        }
    ]
}
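As an illustration of the request shape above, here is a minimal Python sketch that assembles the create-connector body. The helper name `build_create_request` is hypothetical, and the `filename`/`position` keys are only the assumed partition and offset shape for the file source connector; other connectors define their own keys.

```python
import json


def build_create_request(name, config, initial_offsets=None):
    """Build the JSON body for a create-connector request.

    initial_offsets is an optional list of (partition, offset) pairs,
    each a dict whose keys are defined by the connector.
    """
    body = {"name": name, "config": dict(config)}
    if initial_offsets:
        # Each entry pairs one source partition with the offset to start from.
        body["initial_offsets"] = [
            {"partition": dict(partition), "offset": dict(offset)}
            for partition, offset in initial_offsets
        ]
    return body


request_body = build_create_request(
    "file-src-1",
    {
        "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
        "file": "test.txt",
        "topic": "test-topic",
        "tasks.max": "1",
    },
    # Assumed partition/offset shape for the file source connector.
    initial_offsets=[({"filename": "test.txt"}, {"position": 30})],
)
print(json.dumps(request_body, indent=2))
```

When no initial offsets are passed, the helper leaves the `initial_offsets` field out entirely, so the body is an ordinary create-connector request.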


As mentioned in KIP-875: First-class offsets support in Kafka Connect (section "Altering/resetting offsets (response)"), the response message (`initial_offsets_response`) for successfully setting the initial_offsets will depend on whether the connector has implemented the alterOffsets() method.

If the connector has implemented alterOffsets and everything has succeeded, the HTTP status will be 200 and the create-connector response body will be:

response (definite success)

{
    "name": "ConnectorOne",
    "config": {...},
    "initial_offsets_response": "The offsets for this connector have been set successfully"
}

If the connector has not implemented alterOffsets but everything else has succeeded, the HTTP status will be 200 and the create-connector response body will be:

response (possible success)

{
    "name": "ConnectorOne",
    "config": {...},
    "initial_offsets_response": "The framework-managed offsets for this connector have been set successfully. However, if this connector manages offsets externally, they will need to be manually set in the system that the connector uses."
}
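The choice between the two messages can be sketched as follows. This is an illustration only, not the runtime's implementation: the function names are hypothetical, and the boolean flag simply stands for "the connector has implemented alterOffsets".

```python
DEFINITE_SUCCESS = "The offsets for this connector have been set successfully"
POSSIBLE_SUCCESS = (
    "The framework-managed offsets for this connector have been set successfully. "
    "However, if this connector manages offsets externally, they will need to be "
    "manually set in the system that the connector uses."
)


def initial_offsets_response(implements_alter_offsets):
    """Pick the message for the initial_offsets_response field."""
    return DEFINITE_SUCCESS if implements_alter_offsets else POSSIBLE_SUCCESS


def create_response(name, config, implements_alter_offsets):
    """Assemble the create-connector response body (HTTP 200 in both cases)."""
    return {
        "name": name,
        "config": dict(config),
        "initial_offsets_response": initial_offsets_response(implements_alter_offsets),
    }


print(create_response("ConnectorOne", {}, False)["initial_offsets_response"])
```

Because the HTTP status is 200 in both cases, a client that needs to tell them apart has to inspect the `initial_offsets_response` text.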

This method will be supported in both distributed and standalone modes.

...

  1. Validate the connector config.
  2. Validate the offsets using the same checks performed when altering connector offsets (PATCH /$connector/offsets) as specified in KIP-875. (Validations that check whether the connector exists or is in the STOPPED state are skipped.)
  3. Submit the connector config with STOPPED state to the config topic.
  4. Wipe all existing offsets for the connector (i.e., existing offsets are not merged with "initial_offsets").
  5. Set the offsets of the connector to the value of "initial_offsets".
  6. Write the connector config with whatever initial state is specified in the request ("initial_state"), or the default (RUNNING) if none is specified.

If the Connect runtime encounters an error in any of these steps, it will clean up (if required) and return an error response that names the step at which the failure occurred.

If setting the initial offsets (step 5) fails, the existing offsets of the connector will already have been wiped in step 4.

If writing the connector config (step 6) fails, the initial offsets set in step 5 will be wiped as part of the cleanup.

In either failure case, the offsets the connector had before this API was invoked will be unrecoverable.
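The wipe, set, and write sequence and its cleanup behavior can be illustrated with a small in-memory simulation. This is a simplified sketch, not the runtime's implementation: the `store` dict, `CreateConnectorError`, and the function names are hypothetical, validation is reduced to shape checks, and the intermediate STOPPED-state submission is not modeled.

```python
class CreateConnectorError(Exception):
    """Reports the stage at which connector creation failed."""

    def __init__(self, stage, cause):
        super().__init__(f"failed during '{stage}': {cause}")
        self.stage = stage


def default_write_config(store, name, config, state):
    store["configs"][name] = {"config": dict(config), "state": state}


def create_with_initial_offsets(store, name, config, initial_offsets,
                                initial_state="RUNNING",
                                write_config=default_write_config):
    # Validation runs before anything is modified.
    if "connector.class" not in config:
        raise CreateConnectorError("validate config", "connector.class is required")
    for entry in initial_offsets:
        if not isinstance(entry.get("partition"), dict) or \
                not isinstance(entry.get("offset"), dict):
            raise CreateConnectorError("validate offsets", f"malformed entry: {entry!r}")

    # Wipe: existing offsets are discarded, never merged with the new ones.
    store["offsets"].pop(name, None)

    # Set the requested initial offsets.
    store["offsets"][name] = [dict(entry) for entry in initial_offsets]

    # Write the config; on failure, clean up the offsets that were just set.
    try:
        write_config(store, name, config, initial_state)
    except Exception as exc:
        store["offsets"].pop(name, None)
        raise CreateConnectorError("write config", exc) from exc


def failing_write(store, name, config, state):
    raise RuntimeError("config topic unavailable")


store = {
    "offsets": {"file-src-1": [{"partition": {"filename": "test.txt"},
                                "offset": {"position": 10}}]},
    "configs": {},
}
try:
    create_with_initial_offsets(
        store, "file-src-1",
        {"connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector"},
        [{"partition": {"filename": "test.txt"}, "offset": {"position": 30}}],
        write_config=failing_write,
    )
except CreateConnectorError as err:
    print(err)
print(store["offsets"])  # the offsets that existed before the call are not restored
```

The simulated failure shows the consequence described above: once the wipe has happened, a later failure removes the new offsets as well, and the earlier ones cannot be brought back.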


Minor details

Validate offset before connector creation

...