Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Kafka Connect suffers from a lack of Quality of Service (QoS) features that are typical of multi-tenant systems. We identify two related problems in this space:

First, This is often described as “the noisy neighbor problem”: one bad-behaving Task can cause issues for other Tasks on the same node or in the same cluster, by exhausting shared resources such as CPU and network bandwidth. An easy way to combat this problem in multi-tenant systems is to enforce quotas on individual tenants, but Connect does not support this.

Second, the "firehose" problem: Connect tends to run full-throttle and can overwhelm downstream systems. This is especially problematic with MirrorMaker, which will try to replicate entire Kafka clusters as quickly as possible.

These problems are unfortunate particularly problematic given Connect’s common use-case: a system for connecting disparate systems, including across regions and cloud providers, often using plugins from third-parties. It is impractical to assume that each such external system has some quota mechanism of its own. Moreover, it is impractical to assume that each Connector implementation handles such external limits gracefully.

For example, at Twitter, we have observed Connectors that “retry storm” external systems when they reach some saturation point, e.g. a network quota. This makes a bad situation much worse: as various Connectors approach some collective saturation point, one bad-behaving Connector can push all the others over the limit.

As a secondary concern, we want Connect to be useful as the arbiter of bandwidth between sources/sinks, including between regions and cloud providers. This is a natural use-case for Connect in theory, but Connect does not include the required QoS features.

Finally, we We have found that Kafka’s own quota mechanism is insufficient to solve these problems for us. This is because the limits we are trying to honor are often in external systems -- not Kafka. For example, if we want to limit total throughput to some cloud region, a specific on-prem Kafka broker’s per-topic and per-client quotas are largely irrelevant.

For this KIP, we focus on the simplest limit that commonly affects external systems: throughput in terms of records-per-second. Additionally, we mitigate bad-behaving Tasks with max poll and put (send) rates.

Public Interfaces

Public Interfaces

We propose the following new public interface:

Code Block
/**
 * Throttles ConnectRecords based on configurable rate limits.
 *
 */
public interface RateLimiter<R extends ConnectRecord> extends Closeable, Configurable {

    /**
     * Initialize and start rate-limiting based on the provided Time source.
     *
     */
    void start(Time time);

    /**
     * Account for the given recent record batch when subsequently throttling.
     */
    void accumulate(Collection<R> records);

    /**
     * How long to pause (throttle) in order to achieve the desired throughput.
     *
     */
    long throttleTime();

    /** Configuration specification for this RateLimiter */
    ConfigDef config();

    /** Signal that this instance will no longer will be used. */
    @Override
    void close();
}


Additionally, we We propose to add the following configuration properties and metrics to Kafka Connect.

...

These apply to all Connectors (same as `name`, `tasks.max`, etc).

  • rate.limiters: list of enabled RateLimiters (class names). Defaults to all baked-in RateLimiters: RecordRateLimiter, RecordBatchRateLimiter.
  • task.record.rate.limit: max : Max records-per-second throughput allowed for any individual Task created by for each of this Connector

Source Connector Configuration

  • 's Tasks.
  • record.batchtask.poll.rate.max: Max pollslimit: max record-batches-per-second allowed for any individual Task created by this Connector

Sink Connector Configuration

  • task.put.rate.max: Max puts/sends-per-second allowed for any individual Task created by this Connectorthroughput for each of this Connector's Tasks.

Task Metrics

  • record-rate-avg
  • record-rate-max
  • throttled-ratio: Fraction of time spent throttling for any reason

...

Proposed Changes

We propose to expose and enforce the above new limits when provided. This will involve the Worker tracking record throughput for each Task and introducing artificial pauses as necessary.

Where possible, the Worker will pause only for the minimum amount of time required to avoid overstepping any limit. This means that operating at one or more limits will not have detrimental effects to overall performance of a Connect cluster. In other words, we do not penalize Tasks with long pauses when they hit a threshold, but instead allow them to operate as close to their limits as possible. This is necessary for our goal of using Connect as an “arbiter of bandwidth” between various sources/sinks.

This is in contrast to the existing backoff/retry mechanism for failures. Instead of penalizing Tasks that throw an exception (which, conceivably, could be due to some external quota limit), we limit how frequently poll/put is called, regardless of success or failure. This helps us to operate bad-behaving Connectors without starving other Tasks, and without inflicting “retry storm” on external systems.

a new pluggable interface (RateLimiter) which enables operators to control throughput on a per-Task basis. Two built-in RateLimiters (RecordRateLimiter, RecordBatchRateLimiter) will provide basic functionality suitable for most use-cases.

By default, both built-in RateLimiters will be enabled but effectively no-ops unless `record.rate.limit` or `record.batch.rate.limit` are configured.

Additional custom RateLimiters can be enabled by implementing the RateLimiter interface and including related jars in the Connect plugin path (same as custom Connectors, SMTs, etc)We do not consider these to be "hard" limits – it's possible to poll() more than task.record.rate.max records in a given second. Instead, we use past record rate to predict how much to stall before the next poll()/put(), and thus approach the limit at steady-state.

Compatibility, Deprecation, and Migration Plan

...

The proposed rate limiting will not step on the existing backoff/retry mechanism for failures, though these may interact to some degree. For example (an extreme case), if a SinkTask is limited to one put batch per hour, this will supersede a backoff/retry max delay of 1 minute.

Future Work

These changes will help us operate Connect at scale, but additional work is required to ensure QoS in centralized Connect clusters. Some ideas here:

  • limit throughput in terms of bytes
  • limit other API method call rate (flush, commit, etc)
  • prioritize some tasks over others
  • stall source tasks that return zero records
  • Connector-wide and cluster-wide limits

Rejected Alternatives

  • We could attempt to rate-limit throughput based on bytes instead of record count. This would be more immediately useful for operating Connect, but there are some thorny issues with tracking byterate, since Connect doesn’t have any view into the actual bytes sent to external systems. We anticipate that future work will address this. In the meantime, record rate is a good proxy for overall throughput.
  • We could introduce an SMT to provide rate-limiting without modifying the Worker. This is slightly less user-friendly and limits what we can expose via metrics. For example, we want to ensure that `throttled-ratio` is emitted even if a rate-limiting SMT is not enabled for some Connector. Additionally, any pauses introduced via SMTs would be additive, but a collection of RateLimiters should only sleep by the max throttleTime returned from each RateLimiter.
  • We could enforce limits across an entire Connector, instead of on a per-Task basis. This would require more coordination between Tasks than exists today. Given that tasks.max is already required, we think it is reasonable to expect an operator to multiply per-task limits by tasks.max to arrive at an approximate Connector-wide limit. However, we acknowledge that future work could make this friendlier.
  • We could divide these limits by the number of actual tasks in a Connector in order to approximate Connector-wide limits. For example, a limit of 100 records/second with 20 tasks could yield 5 records/second per task. However, this assumes that workloads are perfectly balanced across tasks, which is often not the case in practice. So instead, we don’t address Connector-wide limits for now, with the expectation that task-level limits are good enough. Again, we acknowledge that future work could improve this.
  • We could leverage the existing Connector and Task state machine to automatically pause Connectors or Tasks that overstep a limit. However, we feel this is way too heavy-handed for throttling at high-frequency.
  • We could throttle at very low frequency, e.g. by pausing a task for a long period of time if it oversteps a limit. This helps mitigate bad-behaving Tasks, but does not help us control the throughput of well-behaving Tasks operating near some external quota.
  • We could just use broker quotas to limit throughput. However, broker quotas are meant to protect throughput to/from a specific broker, not an entire external system. It’s much easier to reason about limiting bytes between Sources and Sinks.
  • We could build backpressure and rate limiting into every Connector implementation we operate. This would work, but we feel all Connectors should benefit from these features out-of-the-box.
  • We could drop the pluggable interface. This was my plan at first, but we foresee the need to implement custom RateLimiters, e.g. to better integrate with external control planes, or to provide more dynamic or adaptive rate limiting.