
Status

Current state: Under Discussion

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

When a user runs a connector on Apache Kafka Connect, the framework starts one instance of the connector’s Connector implementation class and one or more instances of the connector’s Task implementation class. Any of these instances can experience an error. Generally if the Connector or Task instance throws an exception, the Connect framework will mark that instance as failed and expose it as FAILED through the Connect REST API.

Currently users must use the REST API status method and/or JMX metrics to monitor the health ("status") of each named connector's Connector and Task instances. If any of those instances has failed, the user must issue a separate REST API call to manually restart each failed Connector or Task instance.

...

Sample Connector Status Response With Some Failed Tasks:
GET /connectors/my-connector/status
200 OK
{
    "name": "my-connector",
    "connector": {
        "state": "RUNNING",
        "worker_id": "fakehost1:8083"
    },
    "tasks":
    [
        {
            "id": 0,
            "state": "RUNNING",
            "worker_id": "fakehost2:8083"
        },
        {
            "id": 1,
            "state": "FAILED",
            "worker_id": "fakehost3:8083",
            "trace": "Unable to execute HTTP request: Connect to data.example.com:443 failed: connect timed out"
        },
        {
            "id": 2,
            "state": "FAILED",
            "worker_id": "fakehost1:8083",
            "trace": "Unable to execute HTTP request: Connect to data.example.com:443 failed: connect timed out"
        }
    ]
}

Here we can see that the Connector instance and one of the three Task instances (task 0) have a RUNNING state, but tasks 1 and 2 each have a FAILED state. If a user detects this and wants to restart the two failed tasks, they have to manually request a restart of each one:

    POST /connectors/my-connector/tasks/1/restart

and

    POST /connectors/my-connector/tasks/2/restart

Some custom tooling can be built to extract the numbers of the failed tasks from the connector’s status response, but Kafka Connect does not do any of this out of the box.
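A minimal sketch of what such custom tooling might look like, assuming the status response has already been fetched and parsed as JSON. The function name `failed_task_restart_paths` is hypothetical and not part of Kafka Connect; the sketch only builds the restart paths and leaves issuing the POST requests to the caller.

```python
import json


def failed_task_restart_paths(status):
    """Return one restart path per FAILED task in a parsed status response."""
    name = status["name"]
    return [
        f"/connectors/{name}/tasks/{task['id']}/restart"
        for task in status.get("tasks", [])
        if task.get("state") == "FAILED"
    ]


# Abbreviated copy of the sample status response shown earlier.
sample_status = json.loads("""
{
  "name": "my-connector",
  "connector": {"state": "RUNNING", "worker_id": "fakehost1:8083"},
  "tasks": [
    {"id": 0, "state": "RUNNING", "worker_id": "fakehost2:8083"},
    {"id": 1, "state": "FAILED", "worker_id": "fakehost3:8083"},
    {"id": 2, "state": "FAILED", "worker_id": "fakehost1:8083"}
  ]
}
""")

for path in failed_task_restart_paths(sample_status):
    print("POST", path)
```

Even in this small form, the tooling needs one status call plus one restart call per failed task, which is the overhead this KIP aims to remove.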

How do the restart methods currently work?

There are two restart methods in the Kafka Connect REST API:

  • POST /connectors/{connectorName}/restart – restarts only the Connector instance for the named connector.

  • POST /connectors/{connectorName}/tasks/{taskNum}/restart – restarts only the Task instance for the named connector and specified task number.

Most users expect that the first method restarts everything associated with the named connector. This is a sensible expectation, but it does not match the actual behavior described above: only the Connector instance is restarted.

A user can submit such restart requests to any worker. If the worker is assigned the corresponding Connector or Task instance, the worker will perform the restart. Otherwise, the worker must forward the request to the worker that is assigned that instance. (If the worker receiving the request is not the leader, it actually forwards the request to the leader first, which then either restarts its assigned instance or forwards the request to the worker assigned the instance.)
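The forwarding rules in the preceding paragraph can be modelled in a few lines. This is only an illustrative model of the description above, not code from the Connect framework; the function `route_restart` and the worker names are hypothetical.

```python
def route_restart(receiver, leader, owner):
    """Return the workers a restart request visits, ending at the one that restarts.

    receiver: worker that received the REST request
    leader:   current leader of the Connect cluster
    owner:    worker assigned the Connector or Task instance
    """
    hops = [receiver]
    if receiver == owner:
        # The receiving worker is assigned the instance and restarts it itself.
        return hops
    if receiver != leader:
        # A non-leader that is not assigned the instance forwards to the leader first.
        hops.append(leader)
    if hops[-1] != owner:
        # The leader is not assigned the instance, so it forwards to the owner.
        hops.append(owner)
    return hops


print(route_restart("worker-a", "worker-b", "worker-c"))
print(route_restart("worker-a", "worker-a", "worker-c"))
print(route_restart("worker-a", "worker-b", "worker-a"))
```

In this model a request makes at most two forwarding hops before it reaches the worker that performs the restart.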

As a Connector or Task instance is restarted, changes in its status are written to the internal status topic by the worker to which that instance is assigned. Every worker consumes this topic and therefore knows the current status of all Connector and Task instances for all named connectors. This is why any worker can respond to the GET /connectors/{connectorName}/status or GET /connectors/{connectorName}/tasks/{taskNum}/status methods.

Scope

The objective of this KIP is to modify the existing “connector restart” method in the Kafka Connect REST API to allow a user to issue one request to restart all or just the failed Connector and Task instances for a named connector. The existing behavior of the method, which restarts only the Connector object, must be retained for backward compatibility, but we can add optional query parameters that make the method more closely match the expectation of many users.

Public Interfaces

REST API

Two existing methods of the Connect REST API will be changed: the connector restart method and the connector status method. No other REST API methods will be changed.

Restart Method

The existing method to request a restart of the Connector object for a named connector is as follows:

    POST /connectors/{connectorName}/restart

...

    POST /connectors/{connectorName}/restart?includeTasks=<true|false>&onlyFailed=<true|false>

where the behavior of these parameters is as follows:

  • the "includeTasks" parameter specifies whether to restart the connector instance and task instances ("includeTasks=true") or just the connector instance ("includeTasks=false"), and defaults to "false".

  • the "onlyFailed" parameter specifies whether to restart just the instances with a FAILED status ("onlyFailed=true") or all instances ("onlyFailed=false"), and defaults to "false".

The default value of these new query parameters is such that invocations without them result in the same old behavior of this REST API method: the Connector instance is restarted regardless of its current status, and no Task instances are restarted. The query parameters must be used to restart any other combination of the Connector and/or Task instances for the named connector. This approach satisfies the backward compatibility requirement.
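To make the four parameter combinations concrete, the following sketch computes which instances a request would target, given a snapshot of the connector status. It is an illustration of the semantics described above and not the proposed implementation; the function name `instances_to_restart` and the tuple representation are assumptions.

```python
def instances_to_restart(status, include_tasks=False, only_failed=False):
    """Return ("connector", None) and ("task", id) tuples selected by the parameters."""
    # The Connector instance is always a candidate.
    candidates = [("connector", None, status["connector"]["state"])]
    if include_tasks:
        candidates += [("task", t["id"], t["state"]) for t in status["tasks"]]
    return [
        (kind, task_id)
        for kind, task_id, state in candidates
        if not only_failed or state == "FAILED"
    ]


# Same states as the sample status response: task 0 RUNNING, tasks 1 and 2 FAILED.
status = {
    "name": "my-connector",
    "connector": {"state": "RUNNING"},
    "tasks": [
        {"id": 0, "state": "RUNNING"},
        {"id": 1, "state": "FAILED"},
        {"id": 2, "state": "FAILED"},
    ],
}

for include_tasks in (False, True):
    for only_failed in (False, True):
        selected = instances_to_restart(status, include_tasks, only_failed)
        print(f"includeTasks={include_tasks} onlyFailed={only_failed}: {selected}")
```

With the defaults, only the Connector instance is selected regardless of its state, which matches the existing behavior. With "includeTasks=false" and "onlyFailed=true" nothing is selected here, because the Connector instance is RUNNING.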

The response of this method will be changed slightly, but will still be compatible with the old behavior (where "includeTasks=false" and "onlyFailed=false"). When the new query parameters are used with "includeTasks" and/or "onlyFailed" set to true, a successful response will be 202 ACCEPTED, signaling that the request to restart some subset of the Connector and/or Task instances was accepted and will be performed asynchronously. The possible responses are:

  • 200 OK when the named connector exists and the server has successfully stopped and begun restarting only the Connector object (e.g., "includeTasks=false" and "onlyFailed=false"). No response body will be returned (to align with the existing behavior).

  • 202 ACCEPTED when the named connector exists and the server has successfully and durably recorded the request to stop and begin restarting at least one failed or running Connector and/or Task instance (e.g., "includeTasks=true" or "onlyFailed=true"). A response body will be returned, and it is similar to the GET /connectors/{connectorName}/status response except that the "state" field is set to RESTARTING for all instances that will eventually be restarted.

  • 404 NOT FOUND when the named connector does not exist.

  • 409 CONFLICT when a rebalance is needed, forthcoming, or underway while restarting any of the Connector and/or Task objects; the reason may mention that the Connect cluster’s leader is not known, or that the worker assigned the Connector cannot be found.

  • 500 Internal Server Error when the request times out (takes more than 90 seconds), which means the request could not be durably recorded, perhaps because the worker or cluster is shutting down or because the worker receiving the request has temporarily lost contact with the Kafka cluster.

Note that this method is asynchronous: a 202 ACCEPTED response indicates that the request to restart was accepted, and that all Connector and/or Task objects specified in the request will eventually be restarted. This closely aligns with the other asynchronous REST API methods (e.g., create connector, delete connector, etc.), and is better able to scale to connectors with large numbers of tasks.

Using our example above, we could use this expanded restart method to request the restart of all failed Connector and/or Task instances:

Example Usage of Restart Connector and Tasks:
POST /connectors/my-connector/restart?includeTasks=true&onlyFailed=true
202 ACCEPTED
{
    "name": "my-connector",
    "connector": {
        "state": "RUNNING",
        "worker_id": "fakehost1:8083"
    },
    "tasks":
    [
        {
            "id": 0,
            "state": "RUNNING",
            "worker_id": "fakehost2:8083"
        },
        {
            "id": 1,
            "state": "RESTARTING",
            "worker_id": "fakehost3:8083"
        },
        {
            "id": 2,
            "state": "RESTARTING",
            "worker_id": "fakehost1:8083"
        }
    ]
}

The 202 ACCEPTED response signifies that the request to restart the instances has (at least) been accepted and enqueued for completion. This restart might take some time depending upon the state of the Connect cluster and the number of tasks in the connector, but it will eventually be performed in its entirety.

Also, the response signals that the Connector instance and task 0 will not be restarted, because the request included the "onlyFailed=true" query parameter and both the Connector instance and task 0 were RUNNING when this call was made. However, if either of these instances were to fail before their assigned workers process the restart request, they too would be restarted. Note that if a worker is restarted before it processes the restart request, it may skip its share of the restart request since the worker will restart all of its assigned Connector and Task instances as part of its normal startup process.
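The relationship between the request parameters, the success code, and the response body in the example above can be sketched as follows. This is a hedged illustration based on a status snapshot taken when the request arrives, not the proposed implementation: `restart_response` is a hypothetical name, and dropping the "trace" field for restarting instances is an assumption that mirrors the example response.

```python
import copy


def restart_response(status, include_tasks=False, only_failed=False):
    """Return (http_status, body) for a successful restart request."""
    if not include_tasks and not only_failed:
        # Existing behavior: only the Connector is restarted, with no response body.
        return 200, None

    body = copy.deepcopy(status)  # leave the caller's snapshot untouched
    targets = [body["connector"]]
    if include_tasks:
        targets += body["tasks"]
    for instance in targets:
        if not only_failed or instance["state"] == "FAILED":
            instance["state"] = "RESTARTING"
            instance.pop("trace", None)  # assumption: no trace while restarting
    return 202, body


snapshot = {
    "name": "my-connector",
    "connector": {"state": "RUNNING", "worker_id": "fakehost1:8083"},
    "tasks": [
        {"id": 0, "state": "RUNNING", "worker_id": "fakehost2:8083"},
        {"id": 1, "state": "FAILED", "worker_id": "fakehost3:8083", "trace": "connect timed out"},
        {"id": 2, "state": "FAILED", "worker_id": "fakehost1:8083", "trace": "connect timed out"},
    ],
}

code, body = restart_response(snapshot, include_tasks=True, only_failed=True)
print(code, [task["state"] for task in body["tasks"]])
```

Because the body is computed from a snapshot, it reflects the instances selected at the time of the call; as noted above, an instance that fails afterwards may still be restarted when its worker processes the request.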

The state of each restarting instance will eventually transition to STOPPED when the assigned worker stops that instance, and then to RUNNING once the assigned worker (re)starts that instance. The user can monitor this progress with subsequent calls to the GET /connectors/{connectorName}/status method, though a user polling the REST API may not observe every transition between RESTARTING and RUNNING.
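One way a client might monitor this progress is to poll the status method until no instance reports a transient state. The sketch below is an assumption-laden illustration: `wait_until_settled` and `TRANSIENT_STATES` are hypothetical names, the status fetch is passed in as a callable so that no particular HTTP client is implied, and treating RESTARTING and STOPPED as the transient states follows the description above.

```python
import time

# Assumption: the transient states named in the text above.
TRANSIENT_STATES = {"RESTARTING", "STOPPED"}


def wait_until_settled(fetch_status, timeout_s=120.0, interval_s=2.0,
                       sleep=time.sleep, clock=time.monotonic):
    """Poll fetch_status() until no Connector or Task instance is in a transient state."""
    deadline = clock() + timeout_s
    while True:
        status = fetch_status()
        states = [status["connector"]["state"]]
        states += [task["state"] for task in status["tasks"]]
        if not TRANSIENT_STATES.intersection(states):
            return status
        if clock() >= deadline:
            raise TimeoutError("connector did not settle, last states: %s" % states)
        sleep(interval_s)


# In-memory demonstration; a real caller would issue the GET request instead.
snapshots = iter([
    {"connector": {"state": "RUNNING"}, "tasks": [{"id": 1, "state": "RESTARTING"}]},
    {"connector": {"state": "RUNNING"}, "tasks": [{"id": 1, "state": "STOPPED"}]},
    {"connector": {"state": "RUNNING"}, "tasks": [{"id": 1, "state": "RUNNING"}]},
])
final = wait_until_settled(lambda: next(snapshots), sleep=lambda _seconds: None)
print(final["tasks"][0]["state"])
```

Since a poller may miss intermediate states, the loop checks only whether any instance is still transient and does not rely on seeing each transition.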

Status Method

The existing connector status method will also be changed slightly:

    GET /connectors/{connectorName}/status

The response of this method will be changed to include "RESTARTING" as a possible value for the "state" fields. This aligns with the response of the POST /connectors/{connectorName}/restart?includeTasks=true method.

Metrics

The following existing metrics will be modified:

| MBean Name | Change | New Range of Values |
|---|---|---|
| kafka.connect:type=connector-metrics,name=status,connector=([-.\w]+) | Add "restarting" as a possible value for this metric. | one of: "unassigned", "running", "paused", "failed", "restarting" or "destroyed" |
| kafka.connect:type=connector-task-metrics,name=status,connector=([-.\w]+) | Add "restarting" as a possible value for this metric. | one of: "unassigned", "running", "paused", "failed", "restarting" or "destroyed" |

...