
Status

Current state: Adopted

Discussion thread: here

Vote thread: here

JIRA: KAFKA-4793

...

The objective of this KIP is to modify the existing “connector restart” method in the Kafka Connect REST API to allow a user to issue one request to restart all or just the failed Connector and Task instances for a named connector. The existing behavior of the method, which restarts only the Connector object, must be retained for backward compatibility, so any changes should use optional query parameters that default to the existing behavior when clients do not supply them. With those optional query parameters, the method can more closely match the expectations of many users.

Public Interfaces

REST API

...

The response of this method will be changed slightly, but will still be compatible with the old behavior (where "includeTasks=false" and "onlyFailed=false"). When the new query parameters are used with "includeTasks" and/or "onlyFailed" set to true, a successful response will be 202 ACCEPTED, signaling that the request to restart some subset of the Connector and/or Task instances was accepted and will be performed asynchronously.

  • 202 ACCEPTED when the named connector exists and the server has successfully and durably recorded the request to stop and begin restarting at least one failed or running Connector object and Task instances (e.g., "includeTasks=true" or "onlyFailed=true"). A response body will be returned, and it is similar to the GET /connectors/{connectorName}/status response except that the "state" field is set to RESTARTING for all instances that will eventually be restarted.

  • 204 NO CONTENT when the named connector exists and the server has successfully stopped and begun restarting only the Connector object (e.g., "includeTasks=false" and "onlyFailed=false"). No response body will be returned (to align with the existing behavior).

  • 404 NOT FOUND when the named connector does not exist.

  • 409 CONFLICT when a rebalance is needed, forthcoming, or underway while restarting any of the Connector and/or Task objects; the reason may mention that the Connect cluster’s leader is not known, or that the worker assigned the Connector cannot be found.

  • 500 Internal Server Error when the request timed out (took more than 90 seconds), which means the request could not be durably recorded, perhaps because the worker or cluster is shutting down or because the worker receiving the request has temporarily lost contact with the Kafka cluster.
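
As a rough illustration of how a client might build the request and interpret the responses listed above, consider the following Python sketch. The path POST /connectors/{connectorName}/restart is taken from the existing Kafka Connect REST API; the base URL, the function names, and the outcome strings are hypothetical and exist only for this example.

```python
from urllib.parse import quote, urlencode


def restart_url(base_url, connector, include_tasks=False, only_failed=False):
    """Build the restart URL; both flags default to the pre-KIP behavior."""
    query = urlencode({
        "includeTasks": str(include_tasks).lower(),
        "onlyFailed": str(only_failed).lower(),
    })
    name = quote(connector, safe="")  # escape characters unsafe in a path segment
    return f"{base_url.rstrip('/')}/connectors/{name}/restart?{query}"


def describe_restart_response(status_code):
    """Map the status codes listed above to a short client-side outcome."""
    outcomes = {
        202: "accepted: restart recorded, body lists instances in RESTARTING",
        204: "no content: only the Connector object was restarted",
        404: "not found: no connector with that name",
        409: "conflict: rebalance needed, forthcoming, or underway",
        500: "error: request timed out before it could be durably recorded",
    }
    return outcomes.get(status_code, f"unexpected status {status_code}")
```

A client would send an empty POST to the URL returned by restart_url and would probably retry later on a 409, since that response indicates a rebalance rather than a permanent failure.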

...

The state of each restarting instance will eventually transition to STOPPED when the assigned worker stops that instance, and then to RUNNING once the assigned worker (re)starts that instance. The user can monitor this progress with subsequent calls to the GET /connectors/{connectorName}/status method, though a user polling the REST API may not observe every transition between RESTARTING and RUNNING.
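
A monitoring client could poll the status method along these lines. This is only a sketch: fetch_status is a caller-supplied callable (hypothetical) that returns the parsed status JSON, the response shape with "connector" and "tasks" entries follows the existing status method, and treating both RESTARTING and STOPPED as "still in progress" is an assumption based on the transitions described above.

```python
import time

# States assumed to mean that a restart has not completed yet.
IN_PROGRESS = {"RESTARTING", "STOPPED"}


def restart_in_progress(status):
    """True while the Connector or any Task reports an in-progress state."""
    states = [status["connector"]["state"]]
    states += [task["state"] for task in status.get("tasks", [])]
    return any(state in IN_PROGRESS for state in states)


def wait_for_restart(fetch_status, timeout_s=60.0, interval_s=1.0, sleep=time.sleep):
    """Poll fetch_status() until the restart settles or the timeout expires.

    Returns the last status seen. Intermediate states may never be observed,
    because an instance can move through them between two polls.
    """
    deadline = time.monotonic() + timeout_s
    status = fetch_status()
    while restart_in_progress(status) and time.monotonic() < deadline:
        sleep(interval_s)
        status = fetch_status()
    return status
```

The caller should still inspect the returned status, since an instance may end up FAILED rather than RUNNING.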

...

A new RESTARTING state will be added to the AbstractStatus.State enumeration, and all corresponding code using this will be changed accordingly. Only the herders will explicitly set the RESTARTING state in the state store and in the metrics; the state of each instance will transition to STOPPED when the instance is stopped, and to RUNNING when the instance is restarted. The details of how this happens differ in the Standalone and Distributed runtimes.

...

  1. Get the current states of the Connector and Task instances and determine which the herder will target for restart.

  2. If at least one instance is to be restarted:
    1. Stop and await all targeted instances, which will transition the states to STOPPED.

    2. Set the RESTARTING state for these targeted instances.

    3. Restart all targeted instances, which will transition the states to RUNNING.

  3. Build a ConnectorStateInfo result based upon the original status, with "state=RESTARTING" for all instances that were restarted.
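
The steps above can be sketched in Python as follows. This is not the herder implementation: the status dictionary shape, the target identifiers ("connector", "task-0", ...), and the stop/start callables are hypothetical, and the selection rule (all instances, or only FAILED ones when onlyFailed is set; tasks only when includeTasks is set) is an assumption about how the two query parameters combine.

```python
def plan_restart(status, include_tasks=False, only_failed=False):
    """Step 1 and step 3: choose targets and build the RESTARTING result.

    Returns (targets, result); the input status is left unmodified.
    """
    def wanted(state):
        return state == "FAILED" if only_failed else True

    result = {
        "name": status["name"],
        "connector": dict(status["connector"]),
        "tasks": [dict(task) for task in status.get("tasks", [])],
    }
    targets = []
    if wanted(result["connector"]["state"]):
        targets.append("connector")
        result["connector"]["state"] = "RESTARTING"
    if include_tasks:
        for task in result["tasks"]:
            if wanted(task["state"]):
                targets.append(f"task-{task['id']}")
                task["state"] = "RESTARTING"
    return targets, result


def restart(status, stop, start, include_tasks=False, only_failed=False):
    """Run steps 1 to 3 with caller-supplied stop/start callables."""
    targets, result = plan_restart(status, include_tasks, only_failed)
    if targets:                      # step 2: at least one instance targeted
        for target in targets:
            stop(target)             # step 2.1: instances become STOPPED
        # step 2.2 would record RESTARTING in the state store here
        for target in targets:
            start(target)            # step 2.3: instances become RUNNING
    return result
```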

...

The 202 ACCEPTED response signifies that the “restart request” has been durably written to the config topic and all the workers in the Connect cluster will (eventually) see the restart request. If the worker reads the restart request as part of worker startup, it can ignore the restart request since the worker will subsequently attempt to start all of its assigned Connector and Task instances, effectively achieving the goal of restarting the instances assigned to that worker. If the worker reads the restart request after worker startup, then the DistributedHerder will enqueue the request to be processed within the herder's main thread loop. During this main thread loop, the herder will dequeue all pending restart requests and, for each request, use the current connector status and the herder’s current assignments to determine which of its Connector and Task instances are to be restarted, and will then stop and restart them. Note that because this is done within the main thread loop, the herder will not concurrently process any assignment changes while it is executing the restart requests.
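
The following toy model illustrates the enqueue-then-drain behavior described above. It is a sketch under stated assumptions, not the DistributedHerder: the class, its method names, and the (connector, task_id) instance identifiers are all hypothetical, and the stop-then-start of an instance is reduced to appending to a list.

```python
from collections import deque


class HerderSketch:
    """Toy model of one worker's herder; not the real DistributedHerder."""

    def __init__(self, assigned):
        # (connector, task_id) pairs; task_id None denotes the Connector itself.
        self.assigned = set(assigned)
        self.pending = deque()   # restart requests awaiting the main loop
        self.started = False     # becomes True once worker startup finishes
        self.restarted = []      # stand-in record of stop + start actions

    def on_restart_record(self, connector, include_tasks, only_failed):
        """Called when a restart request is read from the config topic."""
        if not self.started:
            return  # startup will start every assigned instance anyway
        self.pending.append((connector, include_tasks, only_failed))

    def tick(self, states):
        """One pass of the main loop; states maps instance -> state string."""
        while self.pending:
            connector, include_tasks, only_failed = self.pending.popleft()
            for instance in sorted(self.assigned, key=str):
                name, task_id = instance
                if name != connector:
                    continue
                if task_id is not None and not include_tasks:
                    continue
                if only_failed and states.get(instance) != "FAILED":
                    continue
                self.restarted.append(instance)  # stop, then start
```

Because the queue is drained inside tick, the model never handles an assignment change in the middle of a restart, which mirrors the ordering argument made above.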

The “restart request” is written to the config topic, which is already where the connector and task config records, task state change records, and session key records are written. This topic also makes sense because all records related to restarts and configuration changes are totally ordered and are all processed within the herder's main thread loop. The "restart request" records will not conflict with any other types of config records, will be compatible with the compacted topic, and will look like:

    key: "restart-connector-<connectorName>"
    value: {"include-tasks": <true|false>, "only-failed": <true|false>}
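
For illustration, such a record could be built and parsed as in the Python sketch below. The helper names are hypothetical, JSON is used only as a stand-in for whatever serialization the config topic actually applies, and the field name "only-failed" is an assumption.

```python
import json

RESTART_KEY_PREFIX = "restart-connector-"


def restart_record(connector, include_tasks=False, only_failed=False):
    """Return the (key, value) pair for a restart request."""
    key = RESTART_KEY_PREFIX + connector
    value = json.dumps({"include-tasks": include_tasks, "only-failed": only_failed})
    return key, value


def parse_restart_record(key, value):
    """Return (connector, include_tasks, only_failed), or None for other keys."""
    if not key.startswith(RESTART_KEY_PREFIX):
        return None
    fields = json.loads(value)
    return (
        key[len(RESTART_KEY_PREFIX):],
        bool(fields.get("include-tasks", False)),
        bool(fields.get("only-failed", False)),
    )
```

Because the key contains only the connector name, a later restart request for the same connector supersedes an earlier one once the topic is compacted.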

Compatibility, Deprecation, and Migration Plan

The proposed API changes are entirely backward compatible. Restarting a named connector with the default query parameters results in always restarting only the Connector instance, which is the same behavior as in previous releases.

Rejected Alternatives

Use REST API for Worker-to-Worker Communication

When a worker receives a restart request via the REST API, it could determine which Connector and Task instances are to be restarted, and then issue a REST API restart request to each worker to signal the instances that worker should restart. However, this fan-out pattern has several disadvantages. First, although unlikely, it is still possible that the original restart request could time out if the delegated restart requests each take a long time. Second, the implementation would be more complex, since it would need to parallelize the delegated restart requests to each worker. Third, it is less reliable, as network errors, rebalances, and other interruptions might result in only a subset of the targeted instances being restarted, especially when the number of workers is large.

On the other hand, the current approach is more reliable, since once the restart request is written to the config topic it will eventually be consumed by all workers. The current proposal also builds upon and reuses much more of the existing functionality in the worker, making the overall implementation more straightforward. There is also no chance for changing worker assignments to interfere with the restarts, since the current approach performs the restarts during the same herder thread loop that reacts to all rebalance changes. Finally, the current approach is more efficient, as some restart requests can be ignored if the worker will subsequently (re)start its assigned instances. For example, if a restart for a connector is requested but one of the workers is itself restarted (or joins the cluster), the worker will start all of its assigned Connector and Task instances as part of startup, making the restart unnecessary.

...

The current proposal makes the restart method asynchronous because making it synchronous has a few disadvantages. First, most of the other REST API methods that deal with stopping or starting connectors are asynchronous, because those operations can potentially be long-running. Second, this is exacerbated with connectors that have large numbers of tasks, or connector implementations that do not stop in an adequate time window. (Strictly speaking the latter is already addressed via other fixes, but even a potentially long-running restart request leads to a potentially poor user experience.) Third, by combining this with the config topic based approach we can achieve much stronger guarantees that the restart will be processed.

Persist the Restart Requests in the Status Topic

As mentioned above, the proposal is to write the new "restart records" to the config topic. This is a natural fit, since the restart requests are then totally ordered with respect to other configuration-related records in the config topic. Plus, there is precedent for other kinds of "non-config" records in the config topic.

However, it would also be possible to store the restart records in the status topic. Unfortunately, the StatusBackingStore interface does not define a listener mechanism, and adding that would require more effort and be more complex. Plus, it's not clear that persisting the restart requests in the status topic is any better than the config topic.