Status

Current state:   [One of "Under Discussion", "Accepted", "Rejected"]

Discussion thread: here [Change the link from the KIP proposal email archive to your own email thread]

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

While upgrading a Connect cluster we have to replace workers. Today the leader waits for some time before deciding to re-assign the tasks that were running on the replaced worker. We would like a way for a worker to relinquish its tasks and thereby signal to the leader that the tasks can be safely re-assigned.

Proposed Changes

During the discussion for KIP-980, which proposed the creation of connectors in STOPPED state, there was a suggestion to also allow setting the initial offset for a connector in the connector creation API. The proposal was deemed valid (point no.4) but was deferred to a future KIP. This KIP proposes to implement that change.


This feature will be helpful to users who want to create a connector and have it start from a specific offset.

To do this today, a user has to:

  1. first create a connector in STOPPED state and check the response for errors
  2. alter the offsets for said connector and check the response for errors
  3. finally set the connector state to RUNNING.
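
The three steps above can be sketched as the sequence of REST calls a client issues today. This is a minimal sketch rather than part of the proposal: it only builds the requests instead of sending them, the helper name `manual_workflow` is hypothetical, and the `filename`/`position` keys are an assumption about the offset format used by FileStreamSourceConnector.

```python
import json


def manual_workflow(name, config, offsets):
    """Return (method, path, body) triples for today's three-step workflow."""
    return [
        # 1. Create the connector in STOPPED state (initial_state from KIP-980).
        ("POST", "/connectors",
         {"name": name, "config": config, "initial_state": "STOPPED"}),
        # 2. Alter the offsets of the stopped connector (KIP-875).
        ("PATCH", f"/connectors/{name}/offsets", {"offsets": offsets}),
        # 3. Resume the connector so that it transitions to RUNNING.
        ("PUT", f"/connectors/{name}/resume", None),
    ]


config = {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "file": "test.txt",
    "topic": "test-topic",
    "tasks.max": "1",
}
# Assumed offset shape for the file source connector (illustrative values).
offsets = [{"partition": {"filename": "test.txt"}, "offset": {"position": 30}}]

for method, path, body in manual_workflow("file-src-1", config, offsets):
    print(method, path, "" if body is None else json.dumps(body))
```

Each call can fail independently, so a client has to check three responses and decide how to clean up after a partial failure.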


Public Interfaces

"POST /connectors" REST API endpoint

A new optional field "initial_offset" will be added to the request body of the POST /connectors endpoint. The value of this field must adhere to the format specified in KIP-875; otherwise a 400 Bad Request response will be returned. Please note that sink and source connectors have different offset schemas.

If the field is omitted in the request body, the current behaviour of creating connectors will be preserved. An example request body would look like:

CreateConnectorRequest

{
    "name": "file-src-1",
    "config": {
        "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
        "file": "test.txt",
        "topic": "test-topic",
        "tasks.max": "1"
    },
    "initial_offset": [
        {
            "partition": {
                // Connector-defined source partition
            },
            "offset": {
                // Connector-defined source offset
            }
        }
    ]
}
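
To illustrate how the source and sink offset schemas differ, the request body could be assembled on the client side roughly as follows. This is a sketch under stated assumptions: `build_create_request` is a hypothetical helper, its shape check is only a client-side convenience (the runtime's KIP-875 validation remains authoritative), the `filename`/`position` keys assume the file source connector's offset format, and the sink entry follows the `kafka_topic`/`kafka_partition`/`kafka_offset` layout from KIP-875.

```python
import json


def build_create_request(name, config, initial_offset=None):
    """Build a POST /connectors body; omitting initial_offset keeps current behaviour."""
    body = {"name": name, "config": dict(config)}
    if initial_offset is not None:
        for entry in initial_offset:
            # Each entry pairs exactly one partition with one offset.
            if not isinstance(entry, dict) or set(entry) != {"partition", "offset"}:
                raise ValueError(f"malformed offset entry: {entry!r}")
        body["initial_offset"] = list(initial_offset)
    return body


# Source connectors define their own partition and offset keys (assumed here).
source_offset = [
    {"partition": {"filename": "test.txt"}, "offset": {"position": 30}}
]
# Sink connectors use Kafka topic-partitions and consumer offsets.
sink_offset = [
    {"partition": {"kafka_topic": "test-topic", "kafka_partition": 0},
     "offset": {"kafka_offset": 100}}
]

request = build_create_request(
    "file-src-1",
    {"connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
     "file": "test.txt", "topic": "test-topic", "tasks.max": "1"},
    initial_offset=source_offset,
)
print(json.dumps(request, indent=4))
```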


This field will be supported in both distributed and standalone modes.

KIP-980 introduced an option to pass a JSON config file to bin/connect-standalone.sh. The new field will be supported there as well.


Proposed Changes

When the Connect runtime receives a request with initial_offset to create a connector via POST /connectors, it will perform the following steps:

  1. Create the connector in STOPPED state
  2. Validate the offset using the same checks performed while altering connector offsets (PATCH /connectors/{connector}/offsets) as specified in KIP-875
  3. Update the offset of the connector created in step 1 to the value of “initial_offset”
  4. If the “initial_state” property is set in the request, set the state of the connector to that value. If “initial_state” is not specified, set the state of the connector to RUNNING

If the Connect runtime encounters an error in any of these steps, it will delete the connector and return an error response.
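
The create-then-roll-back flow described above can be sketched with an in-memory stand-in for the runtime. Everything here is hypothetical and for illustration only: `FakeRuntime`, its methods, and the simplified offset check are not Connect classes, and the real validation is the one defined by KIP-875.

```python
class FakeRuntime:
    """In-memory stand-in for the Connect runtime (hypothetical)."""

    def __init__(self):
        self.connectors = {}  # name -> {"config", "state", "offsets"}

    def validate_offsets(self, offsets):
        # Simplified placeholder for the KIP-875 alter-offsets checks.
        for entry in offsets:
            if not isinstance(entry, dict) or set(entry) != {"partition", "offset"}:
                raise ValueError(f"invalid offset entry: {entry!r}")

    def create_connector(self, name, config, initial_offset=None,
                         initial_state="RUNNING"):
        if name in self.connectors:
            raise ValueError(f"connector {name!r} already exists")
        if initial_offset is None:
            # Field omitted: current behaviour is preserved.
            self.connectors[name] = {"config": config, "state": initial_state,
                                     "offsets": []}
            return self.connectors[name]
        # Step 1: create the connector in STOPPED state.
        self.connectors[name] = {"config": config, "state": "STOPPED",
                                 "offsets": []}
        try:
            # Step 2: validate the offsets.
            self.validate_offsets(initial_offset)
            # Step 3: write the offsets for the stopped connector.
            self.connectors[name]["offsets"] = list(initial_offset)
            # Step 4: move to the requested state, RUNNING by default.
            self.connectors[name]["state"] = initial_state
        except Exception:
            # Any failure deletes the connector before the error is returned.
            del self.connectors[name]
            raise
        return self.connectors[name]


runtime = FakeRuntime()
created = runtime.create_connector(
    "file-src-1", {"tasks.max": "1"},
    initial_offset=[{"partition": {"filename": "test.txt"},
                     "offset": {"position": 30}}],
)
print(created["state"])
```

The duplicate-name check runs before step 1, so a failed request never deletes a connector that another request created.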


Compatibility, Deprecation, and Migration Plan

Since “initial_offset” is an optional field that defaults to the current behaviour when unspecified, we do not foresee a compatibility issue while downgrading the Connect cluster. This new feature will use the current OffsetStorageWriter. Hence the connector will be indistinguishable from one that started with no initial offset specified and in due course committed an offset to the store.



Test Plan

The following scenarios will be tested using unit, integration, or system tests:


  • Create connector with initial_offset in standalone mode
  • Create connector with initial_state and initial_offset in standalone mode
  • Create connector with initial_offset in distributed mode
  • Create connector with initial_state and initial_offset in distributed mode
  • A failure while creating a connector with initial_offset cleans up the connector and returns an error response
  • The standalone mode CLI accepts initial_offset in the JSON config file


Rejected Alternatives

Restrict initial_offset usage to cases where initial_state == STOPPED

We could restrict initial_offset to cases where the connector is created in STOPPED state. This would remove one point of failure from the operation. However, we feel that it would hurt the usability of the new API and offer no major benefit over the steps a user has to run today, namely creating a connector in STOPPED state, altering the offset, and then setting the state to RUNNING.


Validate initial_offset before creating the connector in STOPPED state

We could have validated the initial_offset before creating the connector and thereby avoided a clean-up when the offset is not valid. However, validating the offset after connector creation helps in the following ways:

  • Multiple concurrent connector creation requests with the same connector name, with and without initial_offset: validating first would have resulted in a race between the request that gets to write the offset first and the request that writes the connector state first
  • Reuse of the existing alter offsets code: this code assumes that the connector already exists and is in STOPPED state