Status

Current state: Under Discussion

Discussion thread: here

Vote thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Kafka Connect currently defines a default REST API request timeout of 90 seconds which isn't configurable. If a REST API request takes longer than this timeout value, a 500 Internal Server Error response is returned with the message "Request timed out". In exceptional scenarios, a longer timeout may be required for operations such as connector config validation / connector creation (which internally does a config validation first) to complete successfully.

The POST /connectors and PUT /connectors/{connector}/config endpoints that are used to create or update connectors internally do a connector configuration validation (the details of which vary depending on the connector plugin) before proceeding to create or update the connector. If the configuration validation takes longer than 90 seconds, the connector is still eventually created after the config validation completes, even though a 500 Internal Server Error response has already been returned to the user. This leads to a fairly confusing user experience.

Furthermore, this situation is exacerbated by the potential for config validations occurring twice for a single request. If Kafka Connect is running in distributed mode and the initial request is made to a worker which is not the leader, requests to create or update a connector are forwarded to the Connect worker which is currently the leader of the group. In this case, the config validation occurs both on the initial worker and on the leader (assuming that the first config validation is successful). Since both validations count against the same 90 second timeout, a config validation that takes longer than 45 seconds each time will cause the original create / update connector request to time out.

Slow config validations can occur in certain exceptional scenarios - consider a database connector which has elaborate validation logic involving querying information schema to get a list of tables and views to validate the user's connector configuration. If the database / data warehouse has a very high number of tables and views and is under a heavy load in terms of query volume, such information schema queries can end up taking longer than 90 seconds, which will cause connector config validation / creation REST API calls to time out.

Public Interfaces

This KIP proposes to add a new Kafka Connect worker configuration, rest.api.request.timeout.ms, which will default to the existing REST API request timeout of 90 seconds.

The behavior of the existing POST /connectors and PUT /connectors/{connector}/config endpoints will be modified in cases where the configuration validation exceeds the request timeout. Instead of proceeding with the connector create / update operation (which is the current behavior), we will abort the request.

Proposed Changes

After the configuration validation completes for a request to POST /connectors or PUT /connectors/{connector}/config, a check will be made to verify that the request timeout hasn't already been exceeded. If it has, the connector create / update request will be aborted.
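The check described above can be sketched roughly as follows. This is a minimal illustration and not Connect's actual herder code: the class ValidateThenCheckDeadline, the Outcome enum, the createOrUpdate method and the injected clock are all hypothetical names introduced here, and the real implementation may track the deadline differently.

```java
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;
import java.util.function.Predicate;

// Hypothetical stand-in for the create / update path; not Connect's real classes.
public class ValidateThenCheckDeadline {

    public enum Outcome { WRITTEN, REJECTED_INVALID, ABORTED_TIMEOUT }

    public static Outcome createOrUpdate(Map<String, String> config,
                                         Predicate<Map<String, String>> validator,
                                         Duration requestTimeout,
                                         LongSupplier nanoClock,
                                         Runnable writeToConfigTopic) {
        long start = nanoClock.getAsLong();

        // Config validation may be slow (for example, it may query an external system).
        if (!validator.test(config)) {
            return Outcome.REJECTED_INVALID;
        }

        // After validation completes, check whether the request timeout has
        // already been exceeded; if so, abort instead of creating the connector.
        long elapsedNanos = nanoClock.getAsLong() - start;
        if (elapsedNanos > requestTimeout.toNanos()) {
            return Outcome.ABORTED_TIMEOUT;
        }

        // Only now is the connector config persisted.
        writeToConfigTopic.run();
        return Outcome.WRITTEN;
    }

    public static void main(String[] args) {
        // A fake clock lets the example simulate a 120 second validation instantly.
        AtomicLong fakeClock = new AtomicLong();
        Outcome outcome = createOrUpdate(
                Map.of("name", "demo"),
                cfg -> { fakeClock.addAndGet(Duration.ofSeconds(120).toNanos()); return true; },
                Duration.ofSeconds(90),
                fakeClock::get,
                () -> System.out.println("wrote config"));
        System.out.println(outcome);
    }
}
```

In this sketch the simulated validation takes 120 seconds against a 90 second timeout, so the write is skipped and the outcome is ABORTED_TIMEOUT. Injecting the clock is only a convenience for the example.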

Another change that will be made with this KIP is to avoid the double connector config validation issue in Connect's distributed mode. Workers will directly forward requests to create or update a connector to the leader without performing any config validation first. The only small benefit of the existing approach is that it avoids request forwarding to the leader for requests with invalid connector configs. However, it can be argued that it's cheaper overall to forward the request to the leader at the outset, and allow the leader to do a single config validation before writing to the config topic. Since config validations are done on their own thread and are typically short lived operations, it should not be an issue even with large clusters to allow the leader to do all config validations arising from connector create / update requests (the only situation where we're adding to the leader's load is for requests with invalid configs, since the leader today already has to do a config validation for forwarded requests with valid configs). Note that the PUT /connector-plugins/{pluginName}/config/validate endpoint doesn't do any request forwarding and can be used if frequent validations are taking place (i.e. they can be made on any worker in the cluster to avoid overloading the leader).

The value of the new worker config rest.api.request.timeout.ms will be read in the RestServer class and will be used to configure the request timeout of each of its resources (each resource essentially represents a group of related Connect REST APIs under a common top level path) via ConnectResource::requestTimeout. Note that this doesn't change how long requests actually run in the herder - currently, if a request exceeds the default timeout of 90 seconds, we simply return the 500 response, but the request isn't interrupted or cancelled and is allowed to continue to completion. Furthermore, each connector config validation is done on its own thread via a cached thread pool executor in the herder (create connector calls are done asynchronously by simply writing a record to the Connect cluster's config topic, so config validations are the only relevant operation here).
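The point that a timed-out request is not interrupted or cancelled can be illustrated with plain java.util.concurrent primitives. This is a minimal sketch under stated assumptions: the class TimeoutDoesNotCancel and its run method are hypothetical, a sleep stands in for a slow config validation, and the millisecond values are arbitrary. It does not reproduce Connect's actual REST or herder code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration; the names here are not part of Kafka Connect.
public class TimeoutDoesNotCancel {

    /** Returns {timedOut, workCompletedAnyway}. */
    public static boolean[] run(long workMillis, long timeoutMillis) {
        // A cached thread pool, analogous to the executor mentioned above.
        ExecutorService executor = Executors.newCachedThreadPool();
        CountDownLatch finished = new CountDownLatch(1);
        try {
            CompletableFuture<String> result = CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(workMillis); // stands in for a slow validation
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return "interrupted";
                }
                finished.countDown();
                return "validated";
            }, executor);

            boolean timedOut = false;
            try {
                // The caller waits only up to the request timeout.
                result.get(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                // A REST layer would answer 500 "Request timed out" at this point.
                timedOut = true;
            }

            // Nothing cancelled the task, so it still runs to completion.
            boolean completed = finished.await(workMillis + 5_000, TimeUnit.MILLISECONDS);
            return new boolean[] {timedOut, completed};
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        boolean[] outcome = run(300, 50);
        System.out.println("timedOut=" + outcome[0] + ", completedAnyway=" + outcome[1]);
    }
}
```

With the work taking longer than the wait, the caller observes a timeout while the background task still finishes, which mirrors the behavior described for requests that exceed the 90 second timeout.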

Compatibility, Deprecation, and Migration Plan

The new worker config for REST API request timeouts is fully backward compatible, since it defaults to the existing REST API request timeout of 90 seconds. The other proposed changes shouldn't raise any backward compatibility concerns outside of the unrealistic scenario where users are relying on the current behavior of connector create / update requests proceeding to completion even when config validation causes the request to exceed the timeout value. Note that this would still be possible by manually writing the connector's configuration to the Connect cluster's config topic.

Test Plan

  • Add an integration test to verify that a connector is not created if config validation exceeds the request timeout.
  • Add an integration test to verify that config validation only occurs a single time when requests to create or update a connector are made to a worker which is not the leader.
  • Add unit tests wherever applicable.
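The property that the second integration test is meant to verify can be sketched with a toy model. This is only an illustration under loud assumptions: ForwardWithoutValidating, Worker and validationsForRequestToFollower are hypothetical names, forwarding is modeled as a direct method call rather than a REST request, and the validity check is a placeholder.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical toy model of request forwarding; real workers forward over REST.
public class ForwardWithoutValidating {

    public static final class Worker {
        private final Worker leader;              // null when this worker is the leader
        private final AtomicInteger validations;  // shared counter of validations performed

        public Worker(Worker leader, AtomicInteger validations) {
            this.leader = leader;
            this.validations = validations;
        }

        public boolean createConnector(String config) {
            if (leader != null) {
                // Non-leader: forward at the outset, without validating locally.
                return leader.createConnector(config);
            }
            // Leader: the single config validation happens here.
            validations.incrementAndGet();
            return !config.isEmpty(); // placeholder for real validation
        }
    }

    /** Sends one create request to a non-leader and returns how many validations ran. */
    public static int validationsForRequestToFollower(String config) {
        AtomicInteger validations = new AtomicInteger();
        Worker leader = new Worker(null, validations);
        Worker follower = new Worker(leader, validations);
        follower.createConnector(config);
        return validations.get();
    }

    public static void main(String[] args) {
        System.out.println(validationsForRequestToFollower("name=demo"));
    }
}
```

An actual integration test would run against an embedded Connect cluster and count validations on a real connector plugin; the sketch only shows the invariant being checked.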


Rejected Alternatives

Introduce a new internal endpoint to persist a connector configuration without doing a config validation

Summary: Instead of forwarding all create / update requests to the leader directly, we could do a config validation on the non-leader worker first and if the validations pass forward the request to a new internal-only endpoint on the leader which will just do the write to the config topic without doing a config validation first.

Rejected because: This introduces additional complexity with very little benefit as opposed to simply delegating all config validations from create / update requests to the leader. Furthermore, this could have security implications where the internal endpoint could be abused to bypass config validation (although the internal endpoint could potentially be secured using the mechanism introduced in KIP-507: Securing Internal Connect REST Endpoints).

Allow configuring timeouts for each REST resource

Summary: The Kafka Connect REST server initializes multiple "resources" including the ConnectorsResource (serving APIs with the path /connectors) and the ConnectorPluginsResource (serving APIs with the path /connector-plugins) among others. We could allow configuring the request timeouts for each of these resources individually via separate Connect worker properties.

Rejected because: This would require the introduction of multiple new Kafka Connect worker properties with negligible additional value.

Allow configuring timeouts for ConnectClusterStateImpl

Summary: Currently, ConnectClusterStateImpl is configured in the RestServer and passed to REST extensions via the context object (see here). ConnectClusterStateImpl takes a request timeout parameter for its operations such as list connectors and get connector config (implemented as herder requests). This timeout is set to the minimum of ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS (90 seconds) and DistributedConfig.REBALANCE_TIMEOUT_MS_CONFIG (defaults to 60 seconds). We could use the value of the new worker config proposed in this KIP instead of ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS in the minimum calculation.

Rejected because: The overall behavior would be confusing to end users (they'll need to tweak two configs to increase the overall timeout) and there is seemingly no additional value here (as the herder requests should not take longer than the current configured timeout anyway).

Allow configuring producer zombie fencing admin request timeout

Summary: ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS is also used as the timeout for producer zombie fencings done in the worker for exactly once source tasks (see here). We could allow configuring this as well via the new worker config proposed in this KIP.

Rejected because: Zombie fencing is an internal operation for Kafka Connect and users shouldn't be able to configure it.