Status

Current state: Draft

Discussion thread: N/A (will be added once KIP is published)

JIRA: N/A (will be added before KIP is published)

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The Connect framework uses an internal REST endpoint to relay task configurations from follower worker nodes to the leader. This endpoint is unique in that it is only meant to be invoked by Connect workers; every other endpoint is documented as part of the public REST API. This in turn leads to a problem when the Connect REST API is secured and authentication is required; every other kind of request will read user-supplied credentials for authentication and use those credentials when forwarding requests to the leader. However, since workers query this internal endpoint entirely on their own, they can't rely on authentication credentials that would be supplied in a user's REST request.

Successful requests to this endpoint allow for arbitrary rewriting of task configurations, which is a significant security vulnerability that could lead to leaking of topic data, writing arbitrary data to topics, and other serious problems. As a result, it is imperative that access to this endpoint be restricted in order for any Kafka Connect cluster to be considered truly secure.

An ideal restriction of this endpoint would guarantee that requests made to it come exclusively from another worker in the same cluster. Although mutual authentication via TLS, for example, may seem like a viable approach, it only accomplishes authentication and not authorization; that is, it verifies that the request comes from a trusted party with a given identity, but it does not distinguish whether that party should be allowed to perform actions on the cluster. If mutual authentication is used for the Connect REST API, then this endpoint is still effectively unsecured, since any user of the public REST API may also access this endpoint.

Public Interfaces

There will be four new configurations added for distributed workers:

  • internal.request.verification
    • Purpose: control whether the internal Connect REST endpoint is restricted
    • Type: boolean
    • Default: true
  • internal.key.rotation.interval.ms
    • Purpose: how often to force a rotation of the internal key used for request validation, or 0 if forced rotation should never occur
    • Type: long
    • Default: 3600000 (one hour)
  • internal.key.signature.algorithm
    • Purpose: the algorithm to use to sign internal requests when sent from a follower worker to the leader
    • Type: string
    • Default: "HmacSHA256"
  • internal.key.verification.algorithms
    • Purpose: a list of supported algorithms for verifying internal requests that are received by the leader from a follower
    • Type: list
    • Default: "HmacSHA256"

Additionally, although not part of the public API, the PUT /connectors/<name>/tasks endpoint will be effectively disabled for public use. Users should never call this endpoint, but because nothing has prevented them from doing so until now, it should be noted that anything that relies on that endpoint will no longer work after these changes are made. The expected impact is low, however: the Connect framework (and the connectors it runs) handles the generation and storage of task configurations, and there is no discernible reason to use that endpoint directly instead of going through the public Connect REST API.

Proposed Changes

A new Connect protocol, sessioned, will be implemented. It will be identical to the incremental cooperative protocol, with the addition of a session-key field in the assignment schema. Follower workers will retain this key for use in request signing, and the leader will retain it for use in request verification. One downside of this approach is that incremental cooperative assignment will be required in order to enable the new security behavior; however, given the lack of any serious complaints about the new rebalancing protocol thus far, this seems preferable to trying to enable the behavior across both assignment styles. In addition, periodically forcing a rebalance in order to rotate keys would incur a heavy performance penalty on a cluster using eager assignment, so the approach is not really practical in that case.

If the internal.request.verification property is set to true, the worker will advertise this new sessioned protocol to the Kafka group coordinator as a supported (and, currently, most preferable) protocol. If that protocol is then agreed on by the cluster during group coordination, a session key will be randomly generated during each rebalance and distributed by the leader to each follower node. This key will be used by followers to sign requests to the internal endpoint, and verified by the leader to ensure that the request came from a current group member. It is imperative that inter-worker communication have some kind of transport layer security; otherwise, this session key will be leaked during rebalance to anyone who can eavesdrop on request traffic.
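As a rough illustration of the key generation step described above, the following sketch creates a fresh random key with the JDK's KeyGenerator. The class and method names (SessionKeySketch, newSessionKey) are hypothetical and not part of this proposal; how the leader actually generates and serializes the key into the assignment is not specified here.

```java
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;

// Hypothetical sketch: not the actual Connect implementation.
class SessionKeySketch {

    // Generates a new random secret key for the given MAC algorithm,
    // using the provider's default key size and source of randomness.
    static SecretKey newSessionKey(String algorithm) throws NoSuchAlgorithmException {
        KeyGenerator generator = KeyGenerator.getInstance(algorithm);
        return generator.generateKey();
    }

    public static void main(String[] args) throws Exception {
        // Assumption: one key is generated per rebalance by the leader.
        SecretKey firstRebalance = newSessionKey("HmacSHA256");
        SecretKey secondRebalance = newSessionKey("HmacSHA256");

        // Keys from separate rebalances are independent random values.
        boolean sameKey = Arrays.equals(firstRebalance.getEncoded(), secondRebalance.getEncoded());
        System.out.println("Keys identical across rebalances: " + sameKey);
    }
}
```

The encoded key bytes are what would need to travel in the session-key field, which is why the transport between workers has to be protected.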

Periodically (with frequency dictated by the internal.key.rotation.interval.ms property), the leader will force a rebalance by requesting to rejoin the group and, in the process, compute a new session key and distribute it to each follower worker. The performance impact of these rebalances should be negligible given that all Connect clusters with this new feature will already support incremental cooperative rebalancing. Every time a rebalance occurs, the next scheduled rebalance for key rotation will be reset; that is, if the rotation interval is one hour, and a rebalance occurs thirty minutes after the most recent key rotation, the next key rotation will be rescheduled for one hour after the rebalance, as opposed to remaining at one hour after the most recent rotation.
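The timer-reset behavior described above can be sketched as follows. The KeyRotationSchedule class and its methods are hypothetical names used only for illustration; the sketch assumes timestamps in milliseconds and treats an interval of 0 as "never force a rotation", matching the description of internal.key.rotation.interval.ms.

```java
// Hypothetical sketch of the rotation timer; not the actual Connect implementation.
class KeyRotationSchedule {
    private final long rotationIntervalMs;
    private long nextRotationMs;

    KeyRotationSchedule(long rotationIntervalMs, long nowMs) {
        this.rotationIntervalMs = rotationIntervalMs;
        this.nextRotationMs = nowMs + rotationIntervalMs;
    }

    // Every rebalance distributes a new key, so the timer restarts from "now".
    void onRebalance(long nowMs) {
        nextRotationMs = nowMs + rotationIntervalMs;
    }

    // True when the leader should force a rebalance to rotate the key.
    boolean rotationDue(long nowMs) {
        return rotationIntervalMs > 0 && nowMs >= nextRotationMs;
    }

    long nextRotationMs() {
        return nextRotationMs;
    }

    public static void main(String[] args) {
        long oneHourMs = 3_600_000L;
        KeyRotationSchedule schedule = new KeyRotationSchedule(oneHourMs, 0L);

        // An unrelated rebalance happens thirty minutes after the last rotation.
        schedule.onRebalance(1_800_000L);

        // The next rotation is now one hour after that rebalance.
        System.out.println(schedule.nextRotationMs()); // prints 5400000
        System.out.println(schedule.rotationDue(3_600_000L)); // prints false
    }
}
```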

The default algorithm used to sign requests will be HmacSHA256; this algorithm is guaranteed to be supported on all implementations of the Java Platform (source). However, users will be able to configure their cluster to use other algorithms if, for example, the default is not suitable for compliance with an existing security standard.

Each signed request will include two headers:

  • X-Connect-Authorization: the signature of the request body
  • X-Connect-Key-Algorithm: the key algorithm used to sign the request
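A minimal sketch of how a follower might compute the value of the two headers, and how the leader might check them, is shown below. It uses the JDK's javax.crypto.Mac. The class and method names are hypothetical, and the Base64 encoding of the signature is an assumption made for this example; the proposal does not state how the signature is encoded in the header.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.MessageDigest;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

// Hypothetical sketch: not the actual Connect implementation.
class RequestSignatureSketch {

    static byte[] computeMac(byte[] key, String algorithm, byte[] body)
            throws GeneralSecurityException {
        Mac mac = Mac.getInstance(algorithm);
        mac.init(new SecretKeySpec(key, algorithm));
        return mac.doFinal(body);
    }

    // Follower side: value for X-Connect-Authorization (Base64 is an assumption).
    static String sign(byte[] key, String algorithm, byte[] body)
            throws GeneralSecurityException {
        return Base64.getEncoder().encodeToString(computeMac(key, algorithm, body));
    }

    // Leader side: reject unsupported algorithms, then compare in constant time.
    static boolean verify(byte[] key, List<String> allowedAlgorithms, String algorithmHeader,
                          String signatureHeader, byte[] body) throws GeneralSecurityException {
        if (algorithmHeader == null || signatureHeader == null
                || !allowedAlgorithms.contains(algorithmHeader)) {
            return false;
        }
        byte[] presented;
        try {
            presented = Base64.getDecoder().decode(signatureHeader);
        } catch (IllegalArgumentException e) {
            return false; // header was not valid Base64
        }
        return MessageDigest.isEqual(computeMac(key, algorithmHeader, body), presented);
    }

    public static void main(String[] args) throws Exception {
        byte[] currentKey = "current-session-key".getBytes(StandardCharsets.UTF_8);
        byte[] expiredKey = "expired-session-key".getBytes(StandardCharsets.UTF_8);
        byte[] body = "[{\"example\":\"task-config\"}]".getBytes(StandardCharsets.UTF_8);
        List<String> allowed = Arrays.asList("HmacSHA256");

        String signature = sign(currentKey, "HmacSHA256", body);
        System.out.println(verify(currentKey, allowed, "HmacSHA256", signature, body)); // true

        // A request signed with an expired key is rejected by the leader.
        String stale = sign(expiredKey, "HmacSHA256", body);
        System.out.println(verify(currentKey, allowed, "HmacSHA256", stale, body)); // false
    }
}
```

Because the signature covers only the request body in this sketch, any change to the body after signing causes verification to fail.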

The leader will only accept requests signed with the most current key. This should not cause any major problems; if a follower attempts to make a request with an expired key (which should be quite rare and only occur if the request is made during an in-progress rebalance), the initial request will fail, but will be subsequently retried after a backoff period. This backoff period should leave sufficient room for the rebalance to complete. One potential downside is that, should this occur, an error-level log message of "Failed to reconfigure connector's tasks, retrying after backoff: " followed by a stack trace will be generated. This can be mitigated by altering the log message or the generated exception to include a note that this may not be an issue if key rotation is enabled, and/or logging an info-level log message after successfully completing task reconfiguration that potentially includes a note that any above error messages related to task reconfiguration may be safely disregarded.

Compatibility, Deprecation, and Migration Plan

All of the proposed configurations here have default values, making them backwards compatible.

The group coordination protocol will be used to ensure that all workers in a cluster support verification of internal requests before this behavior is enabled; therefore, a rolling upgrade of the cluster will be possible. In line with the regression plan for KIP-415: Incremental Cooperative Rebalancing in Kafka Connect, if it is desirable to disable this behavior for some reason, the internal.request.verification configuration can be set to false for one (or more) workers, and request verification will automatically be disabled for the cluster.

If a new signature algorithm should be used, a rolling upgrade will be possible with the following steps (assuming a new algorithm of HmacSHA489):

  1. Add HmacSHA489 to the internal.key.verification.algorithms list for each worker, and restart them one-by-one
  2. Change the internal.key.signature.algorithm property for each worker to HmacSHA489, and restart them one-by-one
  3. (Optional) Remove the old algorithm from the internal.key.verification.algorithms list for each worker, and restart them one-by-one
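The ordering of these steps matters because any worker may be the leader during a rolling restart. One way to reason about it: at every point in the roll, each worker's signing algorithm must appear in every other worker's verification list. The sketch below checks that invariant for mixed clusters at each stage. All class and method names are hypothetical, and the algorithm names are treated as plain strings (no algorithm is instantiated).

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the compatibility reasoning; not Connect code.
class RollingUpgradeSketch {

    static final class WorkerConfig {
        final String signatureAlgorithm;           // internal.key.signature.algorithm
        final List<String> verificationAlgorithms; // internal.key.verification.algorithms

        WorkerConfig(String signatureAlgorithm, String... verificationAlgorithms) {
            this.signatureAlgorithm = signatureAlgorithm;
            this.verificationAlgorithms = Arrays.asList(verificationAlgorithms);
        }
    }

    // True if any worker acting as leader can verify requests from any other worker.
    static boolean compatible(List<WorkerConfig> workers) {
        for (WorkerConfig follower : workers) {
            for (WorkerConfig leader : workers) {
                if (!leader.verificationAlgorithms.contains(follower.signatureAlgorithm)) {
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        WorkerConfig original = new WorkerConfig("HmacSHA256", "HmacSHA256");
        WorkerConfig afterStep1 = new WorkerConfig("HmacSHA256", "HmacSHA256", "HmacSHA489");
        WorkerConfig afterStep2 = new WorkerConfig("HmacSHA489", "HmacSHA256", "HmacSHA489");
        WorkerConfig afterStep3 = new WorkerConfig("HmacSHA489", "HmacSHA489");

        // Mixed clusters in the middle of each rolling restart stay compatible.
        System.out.println(compatible(Arrays.asList(original, afterStep1)));   // true
        System.out.println(compatible(Arrays.asList(afterStep1, afterStep2))); // true
        System.out.println(compatible(Arrays.asList(afterStep2, afterStep3))); // true

        // Skipping step 1 breaks the cluster partway through the roll.
        System.out.println(compatible(Arrays.asList(original, afterStep2)));   // false
    }
}
```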

Rejected Alternatives

Configurable inter-worker headers

Summary: A new worker configuration would be added that would control auth headers used by workers when making requests to the internal endpoint.

Rejected because: The additional complexity of another required configuration would be negative for users; security already isn't simple to implement with Kafka Connect, and requiring just one more thing for them to add should be avoided if possible. Also, the use of static headers isn't guaranteed to cover all potential auth mechanisms, and would require manual rotation by reconfiguring the worker.

Replace endpoint with Kafka topic

Summary: The REST endpoint could be removed entirely and replaced with a Kafka topic. Either an existing internal Connect topic (such as the configs topic) could be used, or a new topic could be added to handle all non-forwarded follower-to-leader communication.

Rejected because: It would require reworking both the Connect group coordination protocol and an internal Connect topic. There would be no clear way to achieve consensus amongst the workers in a cluster on whether to switch to this new behavior without also reworking the group coordination protocol; at that point, we might as well just use the group coordination protocol to distribute keys instead.

Open Questions

  • Will it be necessary to support multiple keys at once, in the event that a follower worker makes a request to the internal endpoint during a rebalance (in which case the follower and leader would be using different keys)? Is this event even possible?
    • The DistributedHerder class appears to retry infinitely when failures are encountered in task reconfiguration. If this happens on a separate thread from (or just doesn't block) the rebalance logic (which would be responsible for updating the key used by the herder) then it's possible this is fine. However, if this happens on the same thread as (and effectively blocks) the rebalance logic, then there will be deadlock as the worker will have to successfully complete the request for task reconfiguration before receiving its new key, and it will have to receive its new key before it can successfully complete the request for task reconfiguration.