...

JIRA: Pause/resume: KAFKA-2370, Restart: KAFKA-3506

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

New APIs

Pause Connector

Method: PUT

Path: /connectors/{connector}/pause

Description: This API asynchronously causes the connector and its tasks to suspend processing. If the connector is already paused, this is a no-op. The paused state is persistent, which means that the connector will stay paused even if the cluster rebalances or is restarted.

Response Codes: 202 (Accepted) on successful pause initiation or if the command is a no-op, 404 if the connector doesn't exist
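
As an illustration of the endpoint above, the following minimal sketch builds (but does not send) a pause request and maps the documented response codes to outcomes. It assumes the method is PUT; the base URL, the connector name, and the helper names `build_pause_request` and `interpret_pause_status` are hypothetical and not part of this proposal.

```python
from urllib.parse import quote
from urllib.request import Request


def build_pause_request(base_url: str, connector: str) -> Request:
    """Build a PUT request for /connectors/{connector}/pause without sending it."""
    # Percent-encode the connector name so it is safe to embed in the path.
    path = f"/connectors/{quote(connector, safe='')}/pause"
    return Request(base_url.rstrip("/") + path, method="PUT")


def interpret_pause_status(status: int) -> str:
    """Map the response codes listed above to a short outcome label."""
    if status == 202:
        return "accepted"   # pause initiated, or already paused (no-op)
    if status == 404:
        return "not found"  # the connector doesn't exist
    return "unexpected"
```

Sending the request (for example with `urllib.request.urlopen`) only confirms that the pause was accepted; it does not confirm that the tasks have stopped.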

Resume Connector

Method: PUT

Path: /connectors/{connector}/resume

...

Response Codes: 202 (Accepted) on successful resume initiation or if the command is a no-op, 404 if the connector doesn't exist

...

Path: /connectors/{connector}/restart

Description: This API synchronously restarts the connector task. This is a no-op if the cluster is rebalancing.

Response Codes: 204 on successful restart initiation, 404 if the connector doesn't exist, and 409 if the task is already being restarted

Query parameter: forward: indicates whether the restart can be forwarded

Restart Task

Method: POST

Path: /connectors/{connector}/tasks/{task}/restart

Description: This API synchronously restarts a specific worker task. This is a no-op if the cluster is rebalancing.

Response Codes: 204 on successful restart initiation, 404 if the connector doesn't exist, and 409 if the task is already being restarted

Query parameter: forward: indicates whether the restart can be forwarded

Proposed Changes

Pause/resume Connector

...

Currently, the configuration records stored in the config topic use simple delimited strings. For example, connector configs are stored using the key "connector-{name}" (where "{name}" indicates the name of the connector). We propose to follow this convention and use the key "connector-state-{name}". The record value will contain a simple object with a "state" property indicating the state of the connector and will be serialized using the internal value converter (the same as we do for config objects themselves).
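
To make the record layout concrete, here is a minimal sketch of how such a key and value could be formed. It assumes the internal value converter produces JSON and that "PAUSED" is a valid state value; neither is specified in this section, and the helper name `state_record` is hypothetical.

```python
import json
from typing import Tuple


def state_record(connector: str, state: str) -> Tuple[str, bytes]:
    """Return the (key, value) pair for a connector's target-state record."""
    # The key follows the same delimited-string convention as "connector-{name}".
    key = f"connector-state-{connector}"
    # Assumption: the internal value converter is JSON-based.
    value = json.dumps({"state": state}).encode("utf-8")
    return key, value
```

Because the record lives in the config topic alongside the connector config, the target state is durable in the same way the config itself is.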

Usage: Since this API is asynchronous, users of this endpoint will need to poll the connector's /status endpoint to verify that the expected state transition has completed. Pausing the connector requires all of the connector's tasks to transition and there may be an observable delay between the individual task transitions. Additionally, if the cluster is rebalancing at the time of the command, the transition won't take effect until after the rebalance has completed.
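
The polling described above could look roughly like the sketch below. It assumes the /status response has a "connector" object and a "tasks" list, each carrying a "state" field; that shape is an assumption here, not something this section defines. The fetch function is injected so that the HTTP client is left to the caller, and the names `all_in_state` and `wait_for_state` are hypothetical.

```python
import time
from typing import Any, Callable, Dict


def all_in_state(status: Dict[str, Any], expected: str) -> bool:
    """True when the connector and every one of its tasks report the expected state."""
    connector_ok = status.get("connector", {}).get("state") == expected
    tasks_ok = all(t.get("state") == expected for t in status.get("tasks", []))
    return connector_ok and tasks_ok


def wait_for_state(
    fetch_status: Callable[[], Dict[str, Any]],
    expected: str,
    timeout_s: float = 30.0,
    interval_s: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Poll until the expected state is reached everywhere, or the timeout expires."""
    deadline = time.monotonic() + timeout_s
    while True:
        if all_in_state(fetch_status(), expected):
            return True
        if time.monotonic() >= deadline:
            return False
        # Tasks transition individually, so intermediate mixed states are expected.
        sleep(interval_s)
```

A generous timeout is advisable, since a rebalance in progress delays the transition until the rebalance has completed.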

Restart Tasks

The restart API addresses the problem of restarting a failed or defunct connector. Task restarts are one-time commands which either complete or fail at the time of the request, so there is no need to persist them indefinitely. If the cluster is already in the process of rebalancing or if another user has already initiated a restart, then we return an error to the user so that the restart can be tried again. If we later change the rebalance behavior to restart tasks selectively (e.g. if we used a sticky partitioning approach), then we can also alter the behavior of this API to restart while the rebalance is in progress.
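
Since a restart can be rejected and retried, a client might wrap the call as in the sketch below. It assumes that the retryable error is reported as a 409 status, which this section states only for the case where a restart is already in progress; the callable `send_restart`, which performs the request and returns the status code, and the helper name `restart_with_retry` are hypothetical.

```python
import time
from typing import Callable


def restart_with_retry(
    send_restart: Callable[[], int],
    attempts: int = 5,
    backoff_s: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Issue a restart, retrying with linear backoff while the response is 409."""
    status = send_restart()
    for attempt in range(1, attempts):
        if status != 409:
            break  # success or a non-retryable error such as 404
        sleep(backoff_s * attempt)
        status = send_restart()
    return status
```

The final status is returned unchanged, so the caller still decides how to treat a 404 or a 409 that persists after the last attempt.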

Since there is no persistence needed, we propose to use the existing HTTP forwarding mechanism to send the restart to the current worker which is hosting the task. This generally requires two hops: one to the leader since it is the only worker which knows the full task assignments, and one to the worker which hosts the task at the moment. When the restart request is received on the worker hosting the task, it responds to the request and begins the restart. Note that there is a risk of creating a request loop since a rebalance might cause the task to be reassigned before a pending request can be handled, but we can break the loop by including a flag in the request to indicate whether the request is from the current leader. If the request is sent by the leader, then the worker receiving it will not bother forwarding the request back to the leader and instead return a 404, which will be handled by the leader.
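
The two-hop routing and the loop-breaking flag can be sketched as a small in-memory simulation. This is only an illustration of the logic described above, not the worker implementation: the `Worker` and `Cluster` classes, the `from_leader` parameter name, and the choice of 204 as the success code are assumptions, and the leader here simply passes the 404 back to the caller since the text does not say how the leader handles it.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Set


@dataclass
class Worker:
    name: str
    hosted: Set[str] = field(default_factory=set)             # tasks running on this worker
    assignments: Dict[str, str] = field(default_factory=dict)  # task -> worker; leader only
    restarted: List[str] = field(default_factory=list)         # record of restarts performed


class Cluster:
    def __init__(self, workers: List[Worker], leader: str) -> None:
        self.workers = {w.name: w for w in workers}
        self.leader = leader
        self.hops = 0  # number of forwarded requests, for illustration

    def handle_restart(self, worker_name: str, task: str, from_leader: bool = False) -> int:
        worker = self.workers[worker_name]
        if task in worker.hosted:
            worker.restarted.append(task)  # the hosting worker performs the restart
            return 204
        if from_leader:
            # The leader's view was stale; do not forward back, which would loop.
            return 404
        if worker_name == self.leader:
            owner = worker.assignments.get(task)
            if owner is None:
                return 404
            return self._forward(owner, task, from_leader=True)
        # Non-leaders do not know the full assignments, so they ask the leader.
        return self._forward(self.leader, task, from_leader=False)

    def _forward(self, target: str, task: str, from_leader: bool) -> int:
        self.hops += 1
        return self.handle_restart(target, task, from_leader)
```

With three workers where "a" is the leader and "c" hosts the task, a request arriving at "b" takes two hops (b to a, a to c). If the task has moved away from "c" in the meantime, "c" answers 404 instead of forwarding again.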

...

Restart Routing: Instead of depending on two hops to route the restart requests, we could distribute the full task assignments to all workers on every rebalance. Then each worker would know exactly where to route the restart request. Unfortunately, this makes the overhead of the rebalance protocol excessive as the number of workers increases (the order of the message size is O(n^2) where n is the number of workers).