Versions Compared

Comment: Switched from a delay-based approach, which uses dedicated throttled fetcher threads, to an inclusion-based approach, which puts throttled and unthrottled replicas in the same request/response

...

Relates to: KIP-13: Quotas, KIP-74: Add Fetch Response Size Limit in Bytes

Mail Thread: here

Revision History

  • 8th Aug 2016: Initial version

  • 10th Aug 2016: Switched from a delay-based approach, which uses dedicated throttled fetcher threads, to an inclusion-based approach, which puts throttled and unthrottled replicas in the same request/response

Motivation

Currently data intensive admin operations like rebalancing partitions, adding a broker, removing a broker or bootstrapping a new machine create an unbounded load on inter-cluster traffic. This affects clients interacting with the cluster when a data movement occurs.

...

This is best understood diagrammatically:

[Image]

So there are two quota mechanisms, backed by separate metrics: one on the follower, one on the leader. The leader tracks the rate of requested bytes (LeaderQuotaRate). The follower tracks the throttled bytes allocated to fetch responses for throttled replicas (FollowerQuotaRate).

On the follower side, a new set of replica fetcher threads is used specifically for throttled replication. When the throttled-replicas list changes, those replicas are added to the ThrottledReplicaFetcher threads, and removed from the regular ReplicaFetcherThreads. Linearisability is ensured by guaranteeing that no single replica is included in more than one in-flight request, across threads. That’s to say, when replicas move from being throttled to not being throttled, or vice versa, we ensure the shift from one thread to another cannot create a reordering.

When the leader receives a fetch request for replicas in its local throttled-replicas list, it checks the proposed response size against the quota (note if a request arrives where only a subset of the partitions are throttled, the whole fetch request will be throttled).

...

responseSize = min(max(fetchRequestSize, max.message.bytes), logRemainingBytes)

quota.recordAndMaybeThrottle(responseSize)

This relates to KAFKA-3810.
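The response-size rule above can be worked through with a small sketch. This is only an illustration of the formula as written; the function name and the byte values are hypothetical and not taken from the Kafka code base.

```python
def leader_response_size(fetch_request_size, max_message_bytes, log_remaining_bytes):
    """Bytes charged to the leader quota for one fetch response (illustrative only)."""
    # The response is sized to hold at least one full message,
    # but can never be larger than what is left in the log.
    return min(max(fetch_request_size, max_message_bytes), log_remaining_bytes)


# Hypothetical values, in bytes.
print(leader_response_size(10_000_000, 1_000_000, 4_000_000))  # 4000000: the log holds less than requested
print(leader_response_size(500_000, 1_000_000, 4_000_000))     # 1000000: request is smaller than one message
```

The value returned here is what would then be passed to quota.recordAndMaybeThrottle.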

If the quota is exhausted a delay is imposed. This reuses the same process, and code, used by the existing client quota mechanism. That’s to say it imposes a delay of:

...

The follower uses a similar mechanism, tracking the bytes that arrive in fetch responses. If the fetch response causes the follower’s quota to be exhausted, the thread is delayed for an appropriate amount of time before the next fetch request is sent. This, again, is similar to the client quota mechanism, except no delay queue is required as the replica fetcher thread can simply be blocked.

This mechanism is optimistic. That’s to say a large cluster could exceed the quota with the first request; only then will a delay be imposed to bring the rate down to the desired throttle value. We address this concern by limiting the fetch request size for throttled replica fetch requests. This uses a new config: replica.fetch.request.max.bytes. This config must be tuned by the admin to ensure that the initial set of requests (a) does not cause the quota to be violated on the first request and (b) will return a response within the configured window. However, the default value, of 10MB, is small enough to support leader quotas over 1MB/s (see Q&A 5 below).

As the total request size is now bounded, fetch size (i.e. at a partition level) is ignored. Instead the fetch response is populated with bytes, one partition at a time, until it is filled up to replica.fetch.request.max.bytes (or max.message.bytes, whichever is larger). This relates to an associated client-side change tracked in KAFKA-2063.

The Invocation Process

The standard dynamic config mechanism is used to define which replicas will be throttled and to what throughput. This is covered by two separate configs:

  1. A list of replicas that should be throttled. This takes the form [partitionId]-[replicaId],[partitionId]-[replica-id]...

  2. The quota for a broker. For example 10MB/s.

The admin sets the throttle value when they initiate a rebalance:

...

The tool, kafka-reassign-partitions.sh, calculates a mapping of topic -> partition-replica for each replica that is either (a) a move origin or (b) a move destination. The union of these is added to the topic level config by the script.

...

When the tool completes all configs are removed from Zookeeper.  

Public Interfaces

FetchRequest

A new field is added to the fetch request to bound the total number of bytes within it. This field will only be used, initially, by replica fetcher threads, which have throttled partitions. There will be no impact to existing consumers.  

...

Fetch Request (Version: 3) => replica_id max_wait_time min_bytes max_bytes [topics] 
replica_id => INT32
max_wait_time => INT32
min_bytes => INT32
max_bytes => INT32
topics => topic [partitions]
topic => STRING
partitions => partition fetch_offset max_bytes
partition => INT32
fetch_offset => INT64
max_bytes => INT32

The follower makes a request, using the fixed size of replica.fetch.response.max.bytes as per KIP-74. The order of the partitions in the fetch request is randomised to ensure fairness.

When the leader receives the fetch request it processes the partitions in the defined order, up to the response's size limit. If the inclusion of a partition, listed in the leader's throttled-replicas list, causes the LeaderQuotaRate to be exceeded, that partition is omitted from the response (i.e. it returns 0 bytes). Logically, this is of the form:

var bytesAllowedForThrottledPartition = quota.recordAndMaybeAdjust(bytesRequestedForPartition) 

When the follower receives the fetch response, if it includes partitions in its throttled-partitions list, it increments the FollowerQuotaRate:

var includeThrottledPartitionsInNextRequest: Boolean = quota.recordAndEvaluate(previousResponseThrottledBytes) 

If the quota is exceeded, no throttled partitions will be included in the next fetch request emitted by this replica fetcher thread. 
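The leader-side and follower-side checks described above can be sketched as follows. This is a minimal illustration under stated assumptions, not the proposed implementation: WindowQuota is a hypothetical stand-in that counts bytes against a single fixed budget (the real mechanism measures a rate over sliding windows), and the function names are invented for the example.

```python
class WindowQuota:
    """Hypothetical byte budget for one window; not Kafka's quota class."""

    def __init__(self, bytes_per_sec, window_secs):
        self.budget = bytes_per_sec * window_secs  # bytes allowed in the window
        self.recorded = 0

    def would_exceed(self, n_bytes):
        return self.recorded + n_bytes > self.budget

    def record(self, n_bytes):
        self.recorded += n_bytes

    def exceeded(self):
        return self.recorded > self.budget


def build_leader_response(requested, throttled, quota, response_max_bytes):
    """Leader side: fill the response partition by partition, in request order.

    requested is a list of (partition, available_bytes) pairs. A throttled
    partition gets 0 bytes if including it would exceed the leader quota.
    """
    response, remaining = {}, response_max_bytes
    for partition, available in requested:
        n = min(available, remaining)
        if partition in throttled:
            if quota.would_exceed(n):
                response[partition] = 0
                continue
            quota.record(n)
        response[partition] = n
        remaining -= n
    return response


def partitions_for_next_fetch(all_partitions, throttled, quota, last_throttled_bytes):
    """Follower side: record the throttled bytes that arrived, then pick partitions."""
    quota.record(last_throttled_bytes)
    if quota.exceeded():
        return [p for p in all_partitions if p not in throttled]
    return list(all_partitions)


throttled = {"t-0", "t-1"}
leader_quota = WindowQuota(bytes_per_sec=1_000, window_secs=1)
print(build_leader_response([("t-0", 600), ("t-1", 600), ("t-2", 600)],
                            throttled, leader_quota, response_max_bytes=10_000))

follower_quota = WindowQuota(bytes_per_sec=500, window_secs=1)
print(partitions_for_next_fetch(["t-0", "t-1", "t-2"], throttled, follower_quota, 600))
```

In this run the leader serves t-0, returns 0 bytes for t-1 because it would push the throttled total past the budget, and serves the unthrottled t-2 in full. The follower, having received more throttled bytes than its budget allows, requests only t-2 next time.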

This mechanism is optimistic. That’s to say a large cluster could exceed the quota with the first request; only then will throttled partitions be omitted from future requests/responses, to bring the rate down to the desired throttle value. To bound this issue we limit the size of requests to a fixed and configurable bound. This uses a new config, covered in KIP-74: replica.fetch.response.max.bytes. This config must be tuned by the admin to ensure that the initial set of requests (a) does not cause the quota to be violated on the first request and (b) will return a response within the configured window. However, the default value, of 10MB, is small enough to support leader quotas over 1MB/s (see Q&A 5 below). 

The Invocation Process

The standard dynamic config mechanism is used to define which replicas will be throttled and to what throughput. This is covered by two separate configs:

  1. A list of replicas that should be throttled. This takes the form [partitionId]-[replicaId],[partitionId]-[replica-id]...

  2. The quota for a broker. For example 10MB/s.

The admin sets the throttle value when they initiate a rebalance:

kafka-reassign-partitions.sh --execute … --replication-quota 10000

The tool, kafka-reassign-partitions.sh, calculates a mapping of topic -> partition-replica for each replica that is either (a) a move origin or (b) a move destination. The union of these is added to the topic level config by the script.

throttled-replicas = [partId]-[replica], [partId]-[replica]...

When the tool completes all configs are removed from Zookeeper.  
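The calculation of the throttled-replicas value can be sketched as below. This is a hedged illustration rather than the tool's actual logic: the function name and the input shape are hypothetical, and it assumes that for any partition whose replica list changes, every existing replica counts as a possible move origin and every newly added replica counts as a move destination.

```python
def throttled_replicas(current, proposed):
    """Return {topic: "partId-replica,..."} for replicas touched by a reassignment.

    current / proposed: {topic: {partition_id: [broker_id, ...]}}.
    Assumption: origins are all existing replicas of a moving partition,
    destinations are the replicas that appear only in the proposed list.
    """
    result = {}
    for topic, partitions in proposed.items():
        entries = set()
        for partition, new_replicas in partitions.items():
            old_replicas = current.get(topic, {}).get(partition, [])
            if set(old_replicas) == set(new_replicas):
                continue  # nothing moves for this partition
            origins = set(old_replicas)
            destinations = set(new_replicas) - set(old_replicas)
            entries |= {(partition, r) for r in origins | destinations}
        if entries:
            result[topic] = ",".join(f"{p}-{r}" for p, r in sorted(entries))
    return result


current = {"orders": {0: [1, 2], 1: [2, 3]}}
proposed = {"orders": {0: [1, 4], 1: [2, 3]}}
print(throttled_replicas(current, proposed))  # {'orders': '0-1,0-2,0-4'}
```

Partition 1 is untouched by the reassignment, so none of its replicas appear in the list.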

Public Interfaces

FetchRequest

A new field is required in the fetch request to bound the total number of bytes within it. This is covered by KIP-74.

The max_bytes field is set by the follower using the property:

...

This change will require an increase in the protocol version for both internal and external fetch requests. Again, it is related to the client-side change KAFKA-2063.

Metrics  

The metrics reuse the Kafka Metrics (rather than Yammer) to be in line with (and reuse the windowing functionality of) the existing Client Quotas implementation. This is the list of metrics we add as part of this change:

  • LeaderReplicationThrottledRate: The rate of throttled replication for transmitted bytes from a broker. This value is calculated before any delay is imposed.
  • LeaderThrottleTime: The average throttle delay imposed by the quota mechanism on the replication of throttled partitions.
  • LeaderDelayQueue: The number of replication requests queued for future publication. 
  • FollowerReplicationThrottledRate: The rate of throttled replication for transmitted bytes into a broker. This value is calculated before any delay is imposed.
  • FollowerThrottleTime: The average delay imposed by the quota mechanism on the replication of throttled partitions. 
  • PartitionBytesInRate: Equivalent to BytesInPerSec, but at a partition level (i.e. total traffic - throttled and not throttled). This is required for estimating how long a rebalance will take to complete. B/s. See usability section below.
  • SumReplicaLag: This is the sum of all replica lag values on the broker. This metric is used to monitor progress of a rebalance and is particularly useful for determining if the rebalance has become stuck due to an overly harsh throttle value (as the metric will stop decreasing).

...

Intuitively this means the throttle must be greater than the write rate, so there is bandwidth for both Catch-Up and Keep-Up traffic. Conversely, the throttle must be low enough to allow inbound user traffic (which will not be throttled) assuming all leaders have rebalanced. Think of bootstrapping a broker as the most extreme case; here IN/R is the proportion of inbound traffic for these leaders.

5. Do I need to alter the default value for replica.fetch.response.max.bytes?

Generally you should not need to alter the default fetch request size. The critical factor is that the initial set of requests, for throttled replicas, return in the configured window duration. By default the total window is 11 x 1s and replica.fetch.response.max.bytes = 10MB. Intuitively the constraint is:

#brokers x replica.fetch.response.max.bytes / min(QuotaLeader x #brokers, NetworkSpeed) < total-window-size

So, for the majority of use cases, where the sum of leader throttles < NetworkSpeed we can reduce this to:

replica.fetch.response.max.bytes < QuotaLeader x total-window-size

So consider a 5 node cluster, with a leader quota of 1MB/s, and a window of 10s, on a GbE network. The leader's throttle dominates, so the largest permissible replica.fetch.response.max.bytes would be 1MB/s * 10s = 10MB. Note that this calculation is independent of the number of brokers. However if we had a larger cluster, of 500 nodes, the network on the follower could become the bottleneck. Thus we would need to keep replica.fetch.response.max.bytes less than total-window-size * NetworkSpeed / #brokers = 10s * 100MB/s / 500 = 2MB.
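The two worked cases above follow directly from rearranging the constraint for the fetch size. The sketch below simply evaluates that rearrangement; the function name is hypothetical, and rates are expressed in MB/s so the result is in MB.

```python
def max_fetch_response_mb(brokers, leader_quota, network_speed, window_secs):
    """Upper bound on replica.fetch.response.max.bytes (in MB), derived from:

    brokers * max_bytes / min(leader_quota * brokers, network_speed) < window_secs
    """
    bottleneck = min(leader_quota * brokers, network_speed)
    return window_secs * bottleneck / brokers


print(max_fetch_response_mb(5, 1, 100, 10))    # 10.0: the leader quota dominates
print(max_fetch_response_mb(500, 1, 100, 10))  # 2.0: the follower's network dominates
```

While the sum of the leader throttles stays below the network speed, the broker count cancels out and the bound is just the leader quota multiplied by the window.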

...

There appear to be two sensible approaches to this problem: (1) omit partitions from fetch requests (follower) / fetch responses (leader) when they exceed their quota; (2) delay them, as the existing quota mechanism does, using separate fetchers. Both appear to be valid approaches with slightly different design tradeoffs. The former was chosen to be more in line with the existing quota implementation. The details of the latter are discussed here.

We also considered a more pessimistic approach which applies the quota to the follower's fetch request, then applies an adjustment when the response returns. This mechanism has some advantages, most notably that it is conservative, meaning the throttle value will never be exceeded. However, whilst this approach should work, the adjustment process adds some complexity when compared to the optimistic approaches. Thus this proposal was rejected (this is discussed in full here).