...

This KIP proposes to add a new query parameter "timeout" to the following Kafka Connect REST API endpoints:

  • PUT /connector-plugins/{pluginName}/config/validate 
  • POST /connectors 
  • PUT /connectors/{connector}/config 

The POST /connectors and PUT /connectors/{connector}/config endpoints internally do a config validation first (and only proceed to connector creation / update if the validation passes), which is why the "timeout" parameter is relevant for these endpoints too.
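As a rough illustration of how a client might address the three endpoints with the proposed parameter, the sketch below only builds the request URIs. The base URL, the connector and plugin names, and the assumption that the value is given in milliseconds (suggested by the comparison against rest.api.max.request.timeout.ms) are illustrative and not taken from the KIP text.

```java
import java.net.URI;

// Hypothetical helper: builds request URIs that carry the proposed "timeout" query parameter.
// Names passed in are assumed to be URL-safe; no escaping is attempted in this sketch.
final class ConnectTimeoutUris {
    // PUT /connector-plugins/{pluginName}/config/validate?timeout=...
    static URI validate(String baseUrl, String pluginName, long timeoutMs) {
        return URI.create(baseUrl + "/connector-plugins/" + pluginName + "/config/validate?timeout=" + timeoutMs);
    }

    // POST /connectors?timeout=...
    static URI create(String baseUrl, long timeoutMs) {
        return URI.create(baseUrl + "/connectors?timeout=" + timeoutMs);
    }

    // PUT /connectors/{connector}/config?timeout=...
    static URI update(String baseUrl, String connector, long timeoutMs) {
        return URI.create(baseUrl + "/connectors/" + connector + "/config?timeout=" + timeoutMs);
    }
}
```

The resulting URI can be handed to any HTTP client; the request body and method are unchanged from the existing endpoints.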

A new Kafka Connect worker configuration, rest.api.max.request.timeout.ms, will be added to configure an upper bound for the timeout query parameter on the above 3 REST API endpoints. The default value for this config will be 600000 (10 minutes) and it will be marked as a low importance config.

Proposed Changes

The request timeout for the aforementioned endpoints will be updated to use the value from the timeout query parameter if specified (else fall back to the current default of 90 seconds). If the value for the "timeout" parameter is invalid (<= 0 or > rest.api.max.request.timeout.ms), a 400 Bad Request response will be returned.
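The resolution rule above can be sketched as a small helper. This is a minimal illustration, not the actual Connect implementation: the class, method and exception names are hypothetical, and the value is assumed to be in milliseconds.

```java
// Hypothetical sketch of resolving the effective request timeout from the query parameter.
final class TimeoutParam {
    // Current default REST request timeout described in the text (90 seconds).
    static final long DEFAULT_TIMEOUT_MS = 90_000L;

    // Stand-in for whatever exception the REST layer would map to a 400 Bad Request.
    static final class InvalidTimeoutException extends RuntimeException {
        InvalidTimeoutException(String message) {
            super(message);
        }
    }

    // requestedMs is null when the "timeout" query parameter is absent.
    // maxTimeoutMs plays the role of rest.api.max.request.timeout.ms.
    static long resolve(Long requestedMs, long maxTimeoutMs) {
        if (requestedMs == null) {
            return DEFAULT_TIMEOUT_MS; // parameter not specified: fall back to the default
        }
        if (requestedMs <= 0 || requestedMs > maxTimeoutMs) {
            throw new InvalidTimeoutException(
                "timeout must be > 0 and <= " + maxTimeoutMs + ", got " + requestedMs);
        }
        return requestedMs;
    }
}
```

With the proposed default upper bound of 600000, a request with timeout=120000 would resolve to 120000, while timeout=0 or timeout=600001 would be rejected.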

Note that a higher/lower configured timeout doesn't change how long requests actually run in the herder - currently, if a request exceeds the default timeout of 90 seconds we simply return a 500 response, but the request isn't interrupted or cancelled and is allowed to continue to completion. Furthermore, each connector config validation is done on its own thread via a cached thread pool executor in the herder (create / update connector calls are done asynchronously by simply writing a record to the Connect cluster's config topic, so config validations are the only relevant operation here).
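The distinction between the caller giving up and the work being cancelled can be illustrated with plain java.util.concurrent primitives. The sketch below is an analogy only: it does not use any Connect classes, and the class and method names are hypothetical.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo: a timed-out wait does not stop the background task.
final class TimeoutDoesNotCancel {
    // Returns {callerTimedOut, workCompletedAnyway}.
    static boolean[] run(long workMs, long timeoutMs) {
        ExecutorService executor = Executors.newCachedThreadPool();
        AtomicBoolean finished = new AtomicBoolean(false);
        try {
            // Stand-in for a slow config validation running on its own thread.
            Future<?> validation = executor.submit(() -> {
                try {
                    Thread.sleep(workMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                finished.set(true);
            });
            boolean timedOut = false;
            try {
                validation.get(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                timedOut = true; // the caller stops waiting; the task itself is not cancelled
            }
            validation.get(); // wait here only so the demo can observe completion
            return new boolean[] {timedOut, finished.get()};
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            executor.shutdown();
        }
    }
}
```

Calling run(300, 50) is expected to report both a timeout for the caller and eventual completion of the work, which mirrors the behavior described for herder requests.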

This KIP also proposes to change the behavior of the POST /connectors and the PUT /connectors/{connector}/config endpoints on request timeouts - currently, even if the connector config validation takes too long and causes a timeout response to be returned to the user, the connector create/update request is still made if the config validation eventually completes successfully. This is confusing to users and is generally a poor user experience because a 500 Internal Server Error response should mean that the request couldn't be fulfilled. This behavior will be changed to be more intuitive - if the config validation takes too long and exceeds the timeout (either configured via the proposed new timeout query parameter or the default 90 seconds), the request will be aborted and the connector won't be created/updated.
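A minimal sketch of the proposed abort-on-timeout behavior follows, assuming a deadline check between validation and the config write. The class, the injected clock, and the list standing in for the config topic are all hypothetical and are not Connect APIs.

```java
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical sketch: skip the config write if validation finished after the deadline.
final class AbortOnTimeout {
    // Returns true if the connector config was written, false if the request was aborted.
    // clockMs is an injectable time source so the behavior can be exercised without real delays.
    static boolean createOrUpdate(String connector, Runnable validation, long timeoutMs,
                                  LongSupplier clockMs, List<String> configTopic) {
        long deadline = clockMs.getAsLong() + timeoutMs;
        validation.run(); // may take arbitrarily long
        if (clockMs.getAsLong() > deadline) {
            // The caller has already received a timeout response, so do not create/update.
            return false;
        }
        configTopic.add(connector); // stand-in for writing a record to the config topic
        return true;
    }
}
```

The point of the check is that a timeout response and the absence of a create/update always go together, whichever timeout value applied to the request.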

Another small improvement will be made to avoid double connector config validations when Connect is running in distributed mode - currently, if a request to POST /connectors or PUT /connectors/{connector}/config is made on a worker that isn't the leader of the group, a config validation is done first, and the request is forwarded to the leader if the config validation is successful (only the leader is allowed to do writes to the config topic, which is what a connector create / update entails). The forwarded request results in another config validation before the write to the config topic can finally be done on the leader. The only benefit of this approach is that it avoids request forwarding to the leader for requests with invalid connector configs. However, it can be argued that it's cheaper overall to forward the request to the leader at the outset, and allow the leader to do a single config validation before writing to the config topic. Since config validations are done on their own thread and are typically short-lived operations, it should not be an issue to allow the leader to do all config validations arising from connector create / update requests (the only situation where we're adding to the leader's load is for requests with invalid configs, since the leader today already has to do a config validation for forwarded requests with valid configs). Note that the PUT /connector-plugins/{pluginName}/config/validate endpoint doesn't do any request forwarding and can be used if frequent validations are taking place (i.e. they can be made on any worker in the cluster to avoid overloading the leader).
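The trade-off described above can be made concrete by counting validations per worker under both flows. This is a toy model with hypothetical names, not Connect code; it only encodes the sequence of steps described in the paragraph.

```java
// Hypothetical model: how many config validations each worker performs for one create/update request.
final class ValidationCount {
    // Returns {validations on the receiving non-leader worker, validations on the leader}.
    // forwardFirst = false models the current flow, true models the proposed flow.
    static int[] count(boolean receivedOnLeader, boolean configValid, boolean forwardFirst) {
        int onFollower = 0;
        int onLeader = 0;
        if (receivedOnLeader) {
            onLeader++; // the leader validates once and writes if valid
        } else if (forwardFirst) {
            onLeader++; // proposed: forward immediately, the leader does the single validation
        } else {
            onFollower++; // current: validate locally first
            if (configValid) {
                onLeader++; // then forward, and the leader validates again
            }
        }
        return new int[] {onFollower, onLeader};
    }
}
```

For a valid config received on a non-leader, the model gives two validations in the current flow and one in the proposed flow; for an invalid config the total stays at one, but it moves from the receiving worker to the leader.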

Compatibility, Deprecation, and Migration Plan

The proposed changes are fully backward compatible since we're just introducing a new optional query parameter to 3 REST API endpoints along with a new worker configuration that has a default value.

Test Plan

A simple integration test will be added to ensure that a validate REST API request for a connector whose config validation takes longer than the default REST API request timeout (90 seconds) doesn't fail if the timeout query parameter is set to a higher value. Unit tests will be added wherever applicable.


Rejected Alternatives

...

Configure the timeout via a worker configuration

Summary: A Kafka Connect worker configuration could be introduced to control the request timeouts.

Rejected because: This doesn't allow for per-request timeout configuration and also requires a worker restart if changes are requested. Configuring the timeout via a request query parameter allows for much more fine-grained control.


Allow configuring timeouts for ConnectClusterStateImpl

Summary: Currently, ConnectClusterStateImpl is configured in the RestServer and passed to REST extensions via the context object (see here). ConnectClusterStateImpl takes a request timeout parameter for its operations such as list connectors and get connector config (implemented as herder requests). This timeout is set to the minimum of ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS (90 seconds) and DistributedConfig.REBALANCE_TIMEOUT_MS_CONFIG (defaults to 60 seconds). We could allow configuring these timeouts too.

Rejected because: The overall behavior would be confusing to end users (they'll need to tweak two configs to increase the overall timeout) and there is seemingly no additional value here (as the herder requests should not take longer than the current configured timeout anyway).

...

Summary: ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS is also used as the timeout for producer zombie fencings done in the worker for exactly once source tasks (see here). We could allow configuring this timeout as well via the new worker config proposed in this KIP.

Rejected because: Zombie fencing is an internal operation for Kafka Connect and users shouldn't be able to configure it.

...