...

The objective of this KIP is to modify the existing "connector restart" method in the Kafka Connect REST API to allow a user to issue one request to restart all or just the failed Connector and Task instances for a named connector. The existing behavior of the method, which restarts just the Connector object, must be retained for backward compatibility. Any changes should therefore use optional query parameters that default to the existing behavior when clients do not supply them, while letting the method more closely match the expectation of many users when they do.

Public Interfaces

REST API

...

The state of each restarting instance will eventually transition to STOPPED when the assigned worker stops that instance, and then to RUNNING once the assigned worker (re)starts that instance. The user can monitor this progress with subsequent calls to the GET /connectors/{connectorName}/status method, though a user polling the REST API may not observe every transition between RESTARTING and RUNNING.
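As a rough illustration of the request and the follow-up status call, the sketch below builds (but does not send) both HTTP requests using the JDK's `java.net.http` API. The query parameter names `includeTasks` and `onlyFailed`, the base URL, and the class and method names are assumptions made for this example, since the method description is elided from this excerpt.

```java
import java.net.URI;
import java.net.http.HttpRequest;

final class RestartRequests {
    // Assumption: a Connect worker reachable on localhost at port 8083.
    static final String BASE_URL = "http://localhost:8083";

    /** Builds the restart request; the parameter names are assumed, not confirmed by this excerpt. */
    static HttpRequest restart(String connectorName, boolean includeTasks, boolean onlyFailed) {
        // Note: a connector name with special characters would need URL encoding first.
        URI uri = URI.create(BASE_URL + "/connectors/" + connectorName + "/restart"
                + "?includeTasks=" + includeTasks + "&onlyFailed=" + onlyFailed);
        return HttpRequest.newBuilder(uri)
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    /** Builds the status request a client could poll to watch the restart progress. */
    static HttpRequest status(String connectorName) {
        URI uri = URI.create(BASE_URL + "/connectors/" + connectorName + "/status");
        return HttpRequest.newBuilder(uri).GET().build();
    }
}
```

A client would pass these requests to `HttpClient.send(...)` and poll the status request until no instance reports RESTARTING. Because polling is periodic, intermediate states may be missed.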

...

A new RESTARTING state will be added to the AbstractStatus.State enumeration, and all corresponding code using this will be changed accordingly. Only the herders will explicitly set the RESTARTING state in the state store and in the metrics; the state of each instance will transition to STOPPED when the instance is stopped, and to RUNNING when the instance is restarted. The details of how this happens differ in the Standalone and Distributed runtimes.

...

  1. Get the current states of the Connector and Task instances and determine which of them the herder will target for restart.

  2. If at least one instance is to be restarted:
    1. Stop and await all targeted instances, which will transition the states to STOPPED.

    2. Set the RESTARTING state for these targeted instances.

    3. Restart all targeted instances, which will transition the states to RUNNING.

  3. Build a ConnectorStateInfo result based upon the original status, with "state=RESTARTING" for all instances that were restarted.
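The target selection in step 1 and the result built in step 3 can be sketched as follows. This is a simplified model, not the herder's code: the `State` enum is a stand-in for AbstractStatus.State, instances are identified by plain strings, the flag names are assumed, and the actual stopping and starting of instances in step 2 is omitted.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class RestartPlanSketch {
    // Simplified stand-in for AbstractStatus.State (hypothetical subset).
    enum State { RUNNING, FAILED, RESTARTING }

    /** Step 1: pick the instances to restart from the current states. */
    static List<String> targets(Map<String, State> current, String connectorId,
                                boolean includeTasks, boolean onlyFailed) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, State> entry : current.entrySet()) {
            boolean isConnector = entry.getKey().equals(connectorId);
            // Tasks are only considered when the caller asked for them.
            if (!isConnector && !includeTasks) continue;
            // With onlyFailed, instances in any other state are skipped.
            if (onlyFailed && entry.getValue() != State.FAILED) continue;
            result.add(entry.getKey());
        }
        return result;
    }

    /** Step 3: report the original states, with RESTARTING for every targeted instance. */
    static Map<String, State> reportedStatus(Map<String, State> original, List<String> targets) {
        Map<String, State> result = new LinkedHashMap<>(original);
        for (String id : targets) {
            result.put(id, State.RESTARTING);
        }
        return result;
    }
}
```

With both flags false, only the Connector instance is selected, which mirrors the existing behavior. With both flags true and one failed task, only that task is selected and reported as RESTARTING.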

...

The "restart request" records will be written to the config topic, which is already where the connector and task config records, task state change records, and session key records are written. This topic also makes sense because all records related to restarts and configuration changes are then totally ordered, and are all processed within the herder's `tick()` method. The "restart request" records will not conflict with any other types of config records, will be compatible with the compacted topic, and will look like:

    key: "restart-connector-<connectorName>"
    value: {"include-tasks": <true|false>, "only-failed": <true|false>}
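To make the record shape concrete, here is a minimal sketch that builds the key and value as plain strings and recognizes a restart key by its prefix. The class and method names are hypothetical, and hand-built JSON is used only to keep the example self-contained; a real implementation would presumably go through the same serialization as the other config topic records.

```java
final class RestartRecordSketch {
    // Key prefix taken from the record format shown above.
    static final String KEY_PREFIX = "restart-connector-";

    /** Builds the record key for a named connector. */
    static String key(String connectorName) {
        return KEY_PREFIX + connectorName;
    }

    /** Builds the record value as a JSON string (hand-assembled for illustration only). */
    static String value(boolean includeTasks, boolean onlyFailed) {
        return "{\"include-tasks\": " + includeTasks + ", \"only-failed\": " + onlyFailed + "}";
    }

    /** Returns the connector name if the key denotes a restart request, otherwise null. */
    static String connectorNameFromKey(String key) {
        return key.startsWith(KEY_PREFIX) ? key.substring(KEY_PREFIX.length()) : null;
    }
}
```

Because the key contains only the connector name, a later restart request for the same connector replaces an earlier one once the topic is compacted.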

...

The proposed API changes are entirely backward compatible. Restarting a named connector with the default query parameters always restarts only the Connector instance, which is the same behavior as in previous releases.

Rejected Alternatives

Use REST API for Worker-to-Worker Communication

When a worker receives a restart request via the REST API, it could determine which Connector and Task instances are to be restarted, and then issue a REST API restart request to each worker to signal the instances that worker should restart. However, this fan-out pattern has several disadvantages. First, although unlikely, it is still possible that the original restart request could time out if the delegated restart requests each take a long time. Second, the implementation would be more complex, because the delegated restart requests to each worker would need to be parallelized. Third, it is less reliable, as network errors, rebalances, and other interruptions might result in only a subset of the targeted instances being restarted, especially when the number of workers is large.

On the other hand, the current approach is more reliable, since once the restart request is written to the config topic it will eventually be consumed by all workers. The current proposal also builds upon and reuses much more of the existing functionality in the worker, making the overall implementation more straightforward. There is also no chance for changing worker assignments to interfere with the restarts, since the current approach performs the restarts during the same herder tick method that reacts to all rebalance changes. Finally, the current approach is more efficient, as some restart requests can be ignored if the worker will subsequently (re)start its assigned instances. For example, if a restart for a connector is requested but one of the workers is itself restarted (or joins the cluster), that worker will start all of its assigned Connector and Task instances as part of startup, making the restart unnecessary.

...

The current proposal makes the restart method asynchronous because making it synchronous has a few disadvantages. First, most of the other REST API methods that deal with stopping or starting connectors are asynchronous, because those operations can potentially be long-running. Second, this is exacerbated with connectors that have large numbers of tasks, or connector implementations that do not stop in an adequate time window. (Strictly speaking the latter is already addressed via other fixes, but even having the restart request be potentially long-running leads to a potentially poor user experience.) Third, by combining this with the config topic based approach we can achieve much stronger guarantees that the restart will be processed.

Persist the Restart Requests in the Status Topic

As mentioned above, the proposal is to write the new "restart records" to the config topic. This is a natural fit, especially because the restart requests are then totally ordered with respect to other configuration-related records in the config topic. Plus, there is precedent for other kinds of "non-config" records in the config topic.

However, it would also be possible to store the restart records in the status topic. Unfortunately, the StatusBackingStore interface does not define a listener mechanism, and adding that would require more effort and be more complex. Plus, it's not clear that persisting the restart requests in the status topic is any better than the config topic.