Status

Current state: Under Discussion

Discussion thread: here

JIRA:

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Kafka Connect suffers from a lack of Quality of Service (QoS) features that are typical of multi-tenant systems. We identify two related problems in this space:

First, the “noisy neighbor” problem: one badly behaved Task can cause issues for other Tasks on the same node or in the same cluster by exhausting shared resources such as CPU and network bandwidth. A common way to combat this problem in multi-tenant systems is to enforce quotas on individual tenants, but Connect does not support this.

Second, the “firehose” problem: Connect tends to run at full throttle and can overwhelm downstream systems. This is especially problematic with MirrorMaker, which tries to replicate entire Kafka clusters as quickly as possible.

These problems are unfortunate given Connect’s common use case: connecting disparate systems, including across regions and cloud providers, often using third-party plugins. It is impractical to assume that each such external system has a quota mechanism of its own. Moreover, it is impractical to assume that each Connector implementation handles such external limits gracefully.

We have found that Kafka’s own quota mechanism is insufficient to solve these problems, because the limits we are trying to honor are often in external systems, not in Kafka. For example, if we want to limit total throughput to some cloud region, the per-topic and per-client quotas of a specific on-prem Kafka broker are largely irrelevant.

Public Interfaces

We propose the following new public interface:

import java.io.Closeable;
import java.util.Collection;

import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.connector.ConnectRecord;

/**
 * Throttles ConnectRecords based on configurable rate limits.
 */
public interface RateLimiter<R extends ConnectRecord<R>> extends Closeable, Configurable {

    /**
     * Initialize and start rate-limiting based on the provided Time source.
     */
    void start(Time time);

    /**
     * Account for the given recent record batch when subsequently throttling.
     */
    void accumulate(Collection<R> records);

    /**
     * How long to pause (throttle) in order to achieve the desired throughput.
     */
    long throttleTime();

    /** Configuration specification for this RateLimiter. */
    ConfigDef config();

    /** Signal that this instance will no longer be used. */
    @Override
    void close();
}
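To illustrate the intended lifecycle, the following hypothetical sketch drives a limiter through `start`, then `accumulate` and `throttleTime` after each batch, then `close`. It is not part of the proposal: so that it compiles without Kafka on the classpath, `SimpleRateLimiter` is a stand-in that replaces `Time` with a millisecond `LongSupplier` and `ConnectRecord` with any type `R`, and it assumes `throttleTime()` returns milliseconds. The call order (process a batch, account for it, then pause) is inferred from the Javadoc above.

```java
import java.util.Collection;
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical stand-in for the proposed RateLimiter, without Kafka types.
interface SimpleRateLimiter<R> extends AutoCloseable {
    void start(LongSupplier clockMs);

    void accumulate(Collection<R> records);

    long throttleTime(); // assumed to be milliseconds

    @Override
    void close();
}

final class ThrottledTaskLoop {
    /** Runs start -> (accumulate, throttleTime)* -> close; returns total milliseconds paused. */
    static <R> long run(SimpleRateLimiter<R> limiter, List<List<R>> batches) {
        long pausedMs = 0;
        limiter.start(System::currentTimeMillis);
        try {
            for (List<R> batch : batches) {
                // A real task would hand the batch to put() or return it from poll() here.
                limiter.accumulate(batch);
                long pauseMs = limiter.throttleTime();
                if (pauseMs > 0) {
                    try {
                        Thread.sleep(pauseMs);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt(); // keep interrupt status, stop early
                        break;
                    }
                    pausedMs += pauseMs;
                }
            }
        } finally {
            limiter.close(); // release the limiter even if processing fails
        }
        return pausedMs;
    }
}
```

Pausing in the task loop, rather than inside `accumulate`, keeps the limiter free of blocking calls, so a worker could in principle observe or interrupt the pause.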


Additionally, we propose to add the following configuration properties and metrics to Kafka Connect.

Connector Configuration

These apply to all Connectors (same as `name`, `tasks.max`, etc.).

Task Metrics

Worker Metrics

Proposed Changes

We propose a new pluggable interface (RateLimiter) that lets operators control throughput on a per-Task basis. Two built-in RateLimiters (RecordRateLimiter, RecordBatchRateLimiter) will provide basic functionality suitable for most use cases.
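One plausible way for a record-count limiter to compute its throttle time is to compare how long the records seen so far should have taken at the configured rate with how long has actually elapsed. The sketch below is hypothetical: `RecordRateSketch` is not the proposed `RecordRateLimiter`; it assumes the limit is expressed in records per second and that throttle times are in milliseconds, and it takes the clock as an explicit argument instead of a Kafka `Time`.

```java
// Hypothetical sketch of the arithmetic a RecordRateLimiter could use; not the KIP's code.
// Assumptions: the limit is records per second, throttle times are milliseconds,
// and the caller passes the current time instead of using Kafka's Time.
final class RecordRateSketch {
    private final double recordsPerSecond;
    private final long startMs;
    private long recordCount = 0;

    RecordRateSketch(double recordsPerSecond, long startMs) {
        if (!(recordsPerSecond > 0)) {
            throw new IllegalArgumentException("rate must be positive");
        }
        this.recordsPerSecond = recordsPerSecond;
        this.startMs = startMs;
    }

    /** Counts records that were just processed (analogous to accumulate()). */
    void accumulate(int batchSize) {
        recordCount += batchSize;
    }

    /**
     * Milliseconds to pause so the average rate since start stays at or below the limit
     * (analogous to throttleTime()), rounded to the nearest millisecond.
     */
    long throttleTime(long nowMs) {
        double neededMs = recordCount * 1000.0 / recordsPerSecond; // time the records "cost"
        double deficitMs = neededMs - (nowMs - startMs);            // cost not yet elapsed
        return deficitMs > 0 ? Math.round(deficitMs) : 0L;
    }

    public static void main(String[] args) {
        RecordRateSketch limiter = new RecordRateSketch(100.0, 0L);
        limiter.accumulate(50);                         // 50 records at 100/s cost 500 ms
        System.out.println(limiter.throttleTime(100L)); // 400
        System.out.println(limiter.throttleTime(600L)); // 0
    }
}
```

Because this uses the cumulative average since start, a Task that was idle for a while may burst until it catches up; a token bucket with a bounded capacity would cap such bursts at the cost of a little more state. With an effectively unlimited rate such as `Double.MAX_VALUE`, the computed pause rounds to zero, so the limiter behaves as a no-op.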

By default, both built-in RateLimiters will be enabled but will be effectively no-ops unless `record.rate.limit` or `record.batch.rate.limit` is configured.
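For illustration, the following builds a connector configuration that sets both proposed limits alongside the standard `name`, `connector.class`, and `tasks.max` properties. The connector class name and all values are hypothetical, and the units (per second, applied per Task) are assumptions; omitting both limit properties would leave the built-in RateLimiters as no-ops.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class RateLimitedConnectorConfig {
    /** Hypothetical connector properties using the limits proposed in this KIP. */
    static Map<String, String> build() {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("name", "example-sink");
        props.put("connector.class", "com.example.ExampleSinkConnector"); // hypothetical class
        props.put("tasks.max", "4");
        // Proposed properties; units assumed to be per second, enforced per Task.
        props.put("record.rate.limit", "1000");
        props.put("record.batch.rate.limit", "10");
        return props;
    }

    public static void main(String[] args) {
        build().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```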

Additional custom RateLimiters can be enabled by implementing the RateLimiter interface and placing the related JARs on the Connect plugin path (as with custom Connectors, SMTs, etc.).

Compatibility, Deprecation, and Migration Plan

Defaults for these limits will be Double.MAX_VALUE, that is, effectively unlimited. Existing configurations will continue to work as they do today unless these limits are set to smaller values.

The proposed rate limiting will not interfere with the existing backoff/retry mechanism for failures, though the two may interact to some degree. For example, in an extreme case, if a SinkTask is limited to one batch per hour, the throttle will supersede a backoff/retry max delay of one minute.
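To make the extreme case concrete, the arithmetic below uses a simple cumulative-average model of batch throttling, which is a hypothetical assumption rather than the specified RecordBatchRateLimiter behavior. It also assumes `record.batch.rate.limit` is expressed in batches per second and that throttle times are in milliseconds. Under those assumptions, one batch per hour makes the limiter ask for a pause of about an hour right after the first batch, far longer than a one-minute retry backoff.

```java
// Hypothetical arithmetic only: assumes batches per second and millisecond throttle times.
final class BatchThrottleVsBackoff {
    /** Milliseconds a batch limiter would pause after `batches` batches, `elapsedMs` after start. */
    static long batchThrottleMs(long batches, double batchesPerSecond, long elapsedMs) {
        double deficitMs = batches * 1000.0 / batchesPerSecond - elapsedMs;
        return deficitMs > 0 ? Math.round(deficitMs) : 0L;
    }

    public static void main(String[] args) {
        double oneBatchPerHour = 1.0 / 3600.0; // batches per second
        long maxRetryBackoffMs = 60_000L;      // a 1-minute backoff/retry max delay
        long throttleMs = batchThrottleMs(1, oneBatchPerHour, 0L);
        System.out.println(throttleMs);                     // 3600000
        System.out.println(throttleMs > maxRetryBackoffMs); // true
    }
}
```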

Rejected Alternatives