

Status

Current state: Under Discussion

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

A new optional field "offsets" will be added to the request body format for the POST /connectors endpoint. Its value must adhere to the format specified in KIP-875; otherwise a 400 Bad Request response will be returned. Note that sink and source connectors use different schemas for offsets.


If the field is omitted in the request body, the current behaviour of creating connectors will be preserved. An example request body would look like:

...

{
    "name": "file-src-1",
    "config": {
        "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
        "file": "test.txt",
        "topic": "test-topic",
        "tasks.max": "1"
    },
    "offsets": [
        {
            "partition": {
                // Connector-defined source partition
            },
            "offset": {
                // Connector-defined desired initial offset
            }
        }
    ]
}
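The shape check behind the 400 Bad Request response can be sketched as follows. This is a minimal illustration that assumes the request body has already been parsed into Java maps and lists, as a JSON library would produce. The class and method names are hypothetical and not part of the Connect runtime. For sink connectors it checks the KIP-875 keys (kafka_topic and kafka_partition in the partition, kafka_offset in the offset). Source connector keys are connector-defined, so for those it only checks that both objects are present. Rejecting a missing or null offset is an assumption of this sketch.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper (not part of the Connect runtime): checks only the shape
// of the "offsets" field. An empty result means the shape is acceptable; a
// message means the request would be rejected with 400 Bad Request.
public class OffsetsShapeCheck {

    public static Optional<String> check(Object offsetsField, boolean isSinkConnector) {
        if (offsetsField == null) {
            // Field omitted: the existing creation behaviour applies.
            return Optional.empty();
        }
        if (!(offsetsField instanceof List)) {
            return Optional.of("\"offsets\" must be a JSON array");
        }
        List<?> entries = (List<?>) offsetsField;
        for (int i = 0; i < entries.size(); i++) {
            Object entry = entries.get(i);
            if (!(entry instanceof Map)) {
                return Optional.of("offsets[" + i + "] must be a JSON object");
            }
            Object partition = ((Map<?, ?>) entry).get("partition");
            Object offset = ((Map<?, ?>) entry).get("offset");
            if (!(partition instanceof Map) || !(offset instanceof Map)) {
                return Optional.of("offsets[" + i + "] needs \"partition\" and \"offset\" objects");
            }
            // Source connectors define their own keys, so only sink connectors
            // get a key-level check against the KIP-875 sink format.
            if (isSinkConnector) {
                Map<?, ?> p = (Map<?, ?>) partition;
                Map<?, ?> o = (Map<?, ?>) offset;
                if (!(p.get("kafka_topic") instanceof String)
                        || !(p.get("kafka_partition") instanceof Number)
                        || !(o.get("kafka_offset") instanceof Number)) {
                    return Optional.of("offsets[" + i + "] is not a valid sink connector offset");
                }
            }
        }
        return Optional.empty();
    }
}
```

The runtime would also run connector-specific validation (the same checks as PATCH offsets). This sketch covers only the structural part that precedes it.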


This will be supported in both distributed and standalone modes.

...

When the Connect runtime receives a POST /connectors request that includes "offsets", it will perform the following steps:

  1. Validate the connector configs.
  2. Validate the offsets using the same checks performed when altering connector offsets (PATCH /connectors/$connector/offsets) as specified in KIP-875. The checks that the connector exists and is in the STOPPED state will be skipped.
  3. Submit the connector config with the STOPPED state to the config topic.
  4. Update the offsets of the connector created in the previous step to the value of “offsets”.
  5. If the “initial_state” property is set in the request, set the state of the connector to that value. If “initial_state” is not specified, set the state of the connector to RUNNING.

If the Connect runtime encounters an error in any of these steps, it will clean up (if required) and return an error response.
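The five steps and the cleanup-on-failure rule can be sketched with an in-memory model. In this sketch, plain maps stand in for the config topic and the offsets topic, and the config check is only a placeholder. The names CreateWithOffsetsSketch, OffsetWriter, and create are hypothetical. The real runtime performs these steps through its herder and backing stores, which are not modelled here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, in-memory model of the creation steps. The maps stand in for
// the config topic and the offsets topic; the real runtime uses its herder
// and backing stores instead.
public class CreateWithOffsetsSketch {

    enum TargetState { STOPPED, PAUSED, RUNNING }

    // Pluggable writer so a failing offset write can be simulated.
    interface OffsetWriter {
        void write(String connector, Map<Map<String, Object>, Map<String, Object>> offsets);
    }

    static final Map<String, TargetState> configTopic = new HashMap<>();
    static final Map<String, Map<Map<String, Object>, Map<String, Object>>> offsetsTopic = new HashMap<>();

    static final OffsetWriter DEFAULT_WRITER =
            (connector, offsets) -> offsetsTopic.put(connector, new HashMap<>(offsets));

    static void create(String name, Map<String, String> config,
                       Map<Map<String, Object>, Map<String, Object>> offsets,
                       TargetState initialState, OffsetWriter writer) {
        // Step 1: validate connector configs (placeholder check only).
        if (!config.containsKey("connector.class")) {
            throw new IllegalArgumentException("Missing connector.class");
        }
        // Step 2: validate offsets, skipping the "connector exists" and
        // "connector is STOPPED" checks used when altering offsets.
        for (Map.Entry<Map<String, Object>, Map<String, Object>> e : offsets.entrySet()) {
            if (e.getKey().isEmpty() || e.getValue() == null) {
                throw new IllegalArgumentException("Invalid offset for partition " + e.getKey());
            }
        }
        boolean submitted = false;
        try {
            // Step 3: submit the config with the STOPPED target state.
            configTopic.put(name, TargetState.STOPPED);
            submitted = true;
            // Step 4: write the initial offsets.
            writer.write(name, offsets);
            // Step 5: apply initial_state, defaulting to RUNNING.
            configTopic.put(name, initialState != null ? initialState : TargetState.RUNNING);
        } catch (RuntimeException ex) {
            // Remove whatever was created before the failure, then report it.
            if (submitted) {
                configTopic.remove(name);
                offsetsTopic.remove(name);
            }
            throw ex;
        }
    }
}
```

Validation runs before anything is written, so a bad config or offset fails without needing any cleanup. Only a failure in steps 3 to 5 triggers cleanup.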


Minor details

Validate offset before connector creation

There is a slight chance of a race condition if two connector creation requests with the same name, one with offsets and one without, are received concurrently. The request without offsets might create the connector while the other one is still validating and writing its offsets. We would still like to validate the offsets and config first, so that there is less chance of needing a cleanup while creating the connector and setting its offsets.

Remnant offsets from connectors with the same name

There could be cases where a previous connector with the same name has left offsets in the offsets topic. The offsets sent with the create connector request will be written after wiping out the previous offsets. This ensures a clean slate for the connector and avoids surprises later.
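The wipe-then-write ordering can be sketched against a toy model of the offsets topic. In this model, writing a null offset stands for a tombstone that removes a partition's entry. Modelling the wipe as tombstones is an assumption of this sketch, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the offsets topic for illustration only. Writing a
// null offset stands for a tombstone and removes the partition's entry.
public class WipeThenWriteSketch {

    // connector name -> (source partition -> offset)
    static final Map<String, Map<Map<String, Object>, Map<String, Object>>> offsetsTopic = new HashMap<>();

    static void write(String connector, Map<String, Object> partition, Map<String, Object> offset) {
        Map<Map<String, Object>, Map<String, Object>> perConnector =
                offsetsTopic.computeIfAbsent(connector, c -> new HashMap<>());
        if (offset == null) {
            perConnector.remove(partition); // tombstone
        } else {
            perConnector.put(partition, offset);
        }
    }

    static void wipeThenWrite(String connector,
                              Map<Map<String, Object>, Map<String, Object>> initialOffsets) {
        Map<Map<String, Object>, Map<String, Object>> existing =
                offsetsTopic.getOrDefault(connector, new HashMap<>());
        // 1. Tombstone every partition left behind by an earlier connector with
        //    the same name (iterate over a copy, since write() mutates the map).
        for (Map<String, Object> partition : new ArrayList<>(existing.keySet())) {
            write(connector, partition, null);
        }
        // 2. Write the offsets supplied in the create request.
        initialOffsets.forEach((partition, offset) -> write(connector, partition, offset));
    }
}
```

Without step 1, a partition that the new request does not mention would keep its stale offset from the earlier connector.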

Compatibility, Deprecation, and Migration Plan

Since “offsets” is an optional field that defaults to the current behaviour when unspecified, we do not foresee a compatibility issue while downgrading the Connect cluster. This new feature will reuse the existing logic for storing offsets. Hence the connector will be indistinguishable from one that started with no initial offsets specified and later committed offsets to the store.

...

  • Create connector with offsets in standalone mode
  • Create connector with initial_state and offsets in standalone mode
  • Create connector with offsets in distributed mode
  • Create connector with initial_state and offsets in distributed mode
  • Failure while creating a connector with offsets cleans up and returns an error response
  • Standalone mode CLI accepts offsets in the JSON config file


Rejected Alternatives

Restrict offsets usage to cases where initial_state == STOPPED

We could restrict the use of offsets to cases where a connector is created in the STOPPED state. This would have removed one point of failure while performing the action. However, we feel that this would hurt the usability of the new API and would offer no major benefit over the steps a user currently has to run: creating a connector in the STOPPED state, altering its offsets, and then setting its state to RUNNING.
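For comparison, the current three-step workflow can be sketched with the JDK's java.net.http client. The request bodies are left as parameters. The first body is assumed to carry "initial_state": "STOPPED" and the second to follow the KIP-875 PATCH format. The class and method names are hypothetical. The sketch only builds the requests; sending them in order and stopping at the first failed response is left out.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.List;

// Hypothetical sketch of the workflow users run today without this KIP:
// create STOPPED, alter offsets, then resume. Requests are built, not sent.
public class CurrentWorkflowSketch {

    static List<HttpRequest> buildRequests(String baseUrl, String name,
                                           String createBody, String offsetsBody) {
        // 1. Create the connector; createBody is assumed to set initial_state to STOPPED.
        HttpRequest create = HttpRequest.newBuilder(URI.create(baseUrl + "/connectors"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(createBody))
                .build();
        // 2. Alter the offsets of the stopped connector (KIP-875).
        HttpRequest alter = HttpRequest.newBuilder(URI.create(baseUrl + "/connectors/" + name + "/offsets"))
                .header("Content-Type", "application/json")
                .method("PATCH", HttpRequest.BodyPublishers.ofString(offsetsBody))
                .build();
        // 3. Resume the connector so that it transitions to RUNNING.
        HttpRequest resume = HttpRequest.newBuilder(URI.create(baseUrl + "/connectors/" + name + "/resume"))
                .PUT(HttpRequest.BodyPublishers.noBody())
                .build();
        return List.of(create, alter, resume);
    }
}
```

Each of these is a separate round trip that can fail independently. This proposal folds all three into a single POST /connectors request.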

Validate offsets before creating the connector in STOPPED state

We could have validated the offsets before creating the connector and thereby avoided a cleanup if the offsets are not valid. However, validating the offsets after connector creation helps in the following ways:

...