...

Status

Current state: Adopted

Jira: KAFKA-1464

Relates to: KIP-13: Quotas, KIP-74: Add Fetch Response Size Limit in Bytes

Mail Thread: here

Released: 0.10.1.0

Revision History

...

  • 10th Aug 2016: Switched from a delay-based approach, which uses dedicated throttled fetcher threads, to an inclusion-based approach, which puts throttled and unthrottled replicas in the same request/response

  • 25th Sept 2016: Split throttled replica list into two properties. One for leader side. One for follower side.
  • 30th Sept 2016: Split the quota property into two values, one for the leader and one for the follower. This adds consistency with the replicas property changed previously. 

Motivation

Currently data-intensive admin operations like rebalancing partitions, adding a broker, removing a broker or bootstrapping a new machine create an unbounded load on inter-cluster traffic. This affects clients interacting with the cluster when a data movement occurs.

...

When the leader receives the fetch request it processes the partitions in the defined order, up to the response's size limit. If the inclusion of a partition, listed in the leader's throttled-replicas list, causes the LeaderQuotaRate to be exceeded, that partition is omitted from the response (aka returns 0 bytes). Logically, this is of the form:

if (throttled(partition))
   var includedInFetchResponse: Boolean = quota.recordAndEvaluate(bytesRequestedForPartition)
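As an illustrative sketch only (not the broker's actual code), the check above could be expressed with the same client metrics/quota machinery used by the existing client quotas; the sensor name, metric group and throttled-replica set below are invented for the example:

import java.util.concurrent.TimeUnit
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.metrics.{MetricConfig, Metrics, Quota, QuotaViolationException}
import org.apache.kafka.common.metrics.stats.Rate

object LeaderThrottleSketch {
  private val metrics = new Metrics()
  // leader.replication.throttled.rate in B/s, measured over replication.quota.window.num
  // samples of replication.quota.window.size.seconds each (defaults: 11 x 1s)
  private val quotaConfig = new MetricConfig()
    .quota(Quota.upperBound(1000000))
    .samples(11)
    .timeWindow(1, TimeUnit.SECONDS)
  private val sensor = metrics.sensor("LeaderReplicationThrottledRate", quotaConfig)
  sensor.add(metrics.metricName("byte-rate", "replication"), new Rate())

  // Replicas named in the leader's throttled-replicas list (example data)
  private val throttledReplicas = Set(new TopicPartition("topicA", 0))

  // Record the bytes we would send for a throttled partition; include it only if the
  // quota is not violated, otherwise it contributes 0 bytes to this response.
  def includedInFetchResponse(tp: TopicPartition, bytesRequested: Long): Boolean = {
    if (!throttledReplicas.contains(tp)) true
    else try { sensor.record(bytesRequested.toDouble); true }
    catch { case _: QuotaViolationException => false }
  }
}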

...

The tool, kafka-reassign-partitions.sh, calculates a mapping of topic -> partition-replica for each replica that is either (a) a move origin or (b) a move destination. The union of these is added to the topic-level config by the script. A leader throttle is applied to all existing replicas that are moving. A follower throttle is applied to replicas that are being created as part of the reassignment process (i.e. move destinations). There are independent configs for the leader and follower, but these are wrapped in kafka-reassign-partitions.sh so the admin only needs to be aware of them if they are altered directly via kafka-configs.sh (an illustrative sketch of this mapping follows the property examples below).

leader.replication.throttled.replicas = [partId]:[replica], [partId]:[replica]...

follower.replication.throttled.replicas = [partId]:[replica], [partId]:[replica]...

leader.replication.throttled.rate = 1000000

follower.replication.throttled.rate = 1000000
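To make the mapping above concrete, here is a small illustrative sketch (invented example data, not the tool's actual code) showing how the two lists could be derived from a current and a proposed assignment for a single topic:

object ThrottledReplicaListSketch extends App {
  // partition -> replica list for one topic: partition 0 moves from brokers (1,2) to (2,3)
  val current  = Map(0 -> Seq(1, 2), 1 -> Seq(2, 3))
  val proposed = Map(0 -> Seq(2, 3), 1 -> Seq(2, 3))

  // Leader throttle: all pre-existing replicas of partitions that are moving
  val leaderThrottled = for {
    (p, replicas) <- current.toSeq if current(p).toSet != proposed(p).toSet
    r <- replicas
  } yield s"$p:$r"

  // Follower throttle: replicas that will be newly created by the move (move destinations)
  val followerThrottled = for {
    (p, replicas) <- proposed.toSeq
    r <- replicas if !current(p).contains(r)
  } yield s"$p:$r"

  println("leader.replication.throttled.replicas = " + leaderThrottled.mkString(","))
  // prints: leader.replication.throttled.replicas = 0:1,0:2
  println("follower.replication.throttled.replicas = " + followerThrottled.mkString(","))
  // prints: follower.replication.throttled.replicas = 0:3
}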

The admin removes the throttle from Zookeeper by running the --verify command after completion of the rebalance.

Alternatively, each property can be set independently using kafka-configs.sh (see below).
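For illustration, a typical flow with the wrapper script might look like the following (the ZooKeeper address, JSON file name and throttle value are placeholders; --execute --throttle applies the throttle configs and --verify clears them once the reassignment has completed):

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file reassign.json --execute --throttle 10000000

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file reassign.json --verify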

Public Interfaces

FetchRequest

A new field is required in the fetch request to bound the total number of bytes within the fetch response. This is covered by KIP-74.

...

  • LeaderReplicationThrottledRate: The rate of throttled replication for transmitted bytes from a broker.
  • FollowerReplicationThrottledRate: The rate of throttled replication for transmitted bytes into a broker.
  • PartitionBytesInRate: Equivalent to BytesInPerSec, but at a partition level (i.e. total traffic, both throttled and not throttled), in B/s. This is required for estimating how long a rebalance will take to complete. See usability section below.
  • SumReplicaLag: This is the sum of all replica lag values on the broker. This metric is used to monitor progress of a rebalance and is particularly useful for determining if the rebalance has become stuck due to an overly harsh throttle value (as the metric will stop decreasing).
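As a sketch of how an operator might watch the last of these metrics while a rebalance runs (the JMX port and the exact ObjectName below are assumptions for illustration; check the broker's JMX tree for the real registration):

import javax.management.ObjectName
import javax.management.remote.{JMXConnectorFactory, JMXServiceURL}

object SumReplicaLagProbe extends App {
  val url  = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://broker-host:9999/jmxrmi")
  val conn = JMXConnectorFactory.connect(url).getMBeanServerConnection
  val name = new ObjectName("kafka.server:type=ReplicaManager,name=SumReplicaLag") // assumed name
  while (true) {
    // A value that stops decreasing before the rebalance completes suggests the throttle is too low
    val lag = conn.getAttribute(name, "Value")
    println(s"SumReplicaLag = $lag")
    Thread.sleep(10000)
  }
}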

...

Topic-level dynamic config (these properties cannot be set through the Kafka config file, or on topic creation)

bin/kafka-configs … --alter
--add-config 'leader.replication.throttled.replicas=[partId]:[replica], [partId]:[replica]...'

--entity-type topic
--entity-name topic-name

...

bin/kafka-configs … --alter
--add-config 'follower.replication.throttled.replicas=[partId]:[replica], [partId]:[replica]...'

--entity-type topic
--entity-name topic-name

 

Broker-level dynamic config (these properties cannot be set through the Kafka config file), unit=B/s

bin/kafka-configs … --alter
--add-config 'leader.replication.throttled.rate=10000'
--entity-type broker
--entity-name brokerId

bin/kafka-configs … --alter
--add-config 'follower.replication.throttled.rate=10000'
--entity-type broker
--entity-name brokerId

...

 

Wildcard support is also provided for setting a throttle to all replicas:

bin/kafka-configs … --alter
--add-config 'leader.replication.throttled.replicas=*'
--entity-type topic

And to set a ubiquitous throttle value to all brokers:

bin/kafka-configs … --alter
--add-config 'leader.replication.throttled.rate=10000'
--entity-type broker

The throttled rate (leader/follower.replication.throttled.rate) is only applied to the replicas listed in the corresponding replication.throttled.replicas config.

Here we add the concept of a dynamic config, applied at a broker level. This is equivalent, in implementation, to the existing entity-type = client  configuration, but applied at a broker level and available for dynamic change.

NB - whilst it is possible to change throttle configs in this way, there should not be any requirement for admins to use kafka-configs directly when rebalancing partitions, as this will be handled internally within kafka-reassign-partitions.sh. The admin would be required to use this interface to throttle a bootstrapping broker. The mechanism for doing this is described in the Q&A below.

These are reflected in zookeeper via a new Zookeeper path: /config/broker/[broker-id]

//Sample configuration for throttled replicas
{
"version":1,
"config": {
"leader.replication.throttled.replicas":"0:0,0:1,0:2,1:0,1:1,1:2"
 }
}
//Sample configuration for throttled replicas
{
"version":1,
"config": {
"follower.replication.throttled.replicas":"0:0,0:1,0:2,1:0,1:1,1:2"
 }
}
//Sample configuration for the replication throttled rate
{
"version":1,
"config": {
"leader.replication.throttled.rate":"1000000"
}
}
// Change notification for the replication throttled rate
{
"version":1,
"entity_path": "/config/broker/"
}

In line with the client quota, two configs are provided to control the window used for the LeaderReplicationThrottledRate/FollowerReplicationThrottledRate metrics.

replication.quota.window.num = The number of samples to retain in memory (default 11)

replication.quota.window.size.seconds = The time span of each sample (default 1)
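Purely to illustrate what these two settings control (a toy model, not Kafka's implementation): traffic is grouped into window.num samples of window.size.seconds each, so the throttled rate is effectively an average over roughly window.num x window.size.seconds of recent traffic.

class WindowedRate(windowNum: Int = 11, windowSizeSeconds: Int = 1) {
  private case class Sample(startMs: Long, var bytes: Long)
  private var samples = Vector.empty[Sample]
  private def windowMs = windowSizeSeconds * 1000L

  def record(bytes: Long, nowMs: Long): Unit = {
    samples = samples.filter(nowMs - _.startMs < windowNum * windowMs)   // drop expired samples
    samples.lastOption match {
      case Some(s) if nowMs - s.startMs < windowMs => s.bytes += bytes   // still in the current sample
      case _                                       => samples :+= Sample(nowMs, bytes)
    }
  }

  def bytesPerSec(nowMs: Long): Double = {
    val live = samples.filter(nowMs - _.startMs < windowNum * windowMs)
    if (live.isEmpty) 0.0
    else live.map(_.bytes).sum * 1000.0 / math.max(nowMs - live.head.startMs, windowMs)
  }
}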

...

Script: kafka-reassign-partitions.sh

...

Then replicas should move at close to (but no more than) the quota dictated rate.

 

...

Then replicas should move at close to (but no more than) (the quota dictated rate - the inbound rate).

 

Given a three node cluster (p=100, r=2)

When data is moved from one node to the other two nodes

Then replicas should move at close to (but no more than) the quota dictated rate.

[repeat with produce requests]

...

Then replicas should move at close to (but no more than) the quota dictated rate.

[repeat with produce requests]


 

Amendments

 

While testing KIP-73, we found an issue described in https://issues.apache.org/jira/browse/KAFKA-4313. Basically, when there are mixed high-volume and low-volume partitions and replication throttling is specified, the ISRs for those low-volume partitions could thrash. KAFKA-4313 fixes this issue by not throttling those replicas in the throttled replica list that are already in sync. Traffic from those in-sync replicas is still counted against the throttled traffic, though.
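A minimal sketch of the amended check (names invented; the quota accounting and ISR lookup are stand-ins for the broker's real state):

import org.apache.kafka.common.TopicPartition

object Kafka4313Sketch {
  // Stand-in for recording bytes against the leader/follower throttle sensor
  def recordThrottledBytes(tp: TopicPartition, bytes: Long): Unit = ()

  // Traffic for every replica in the throttled list is still counted against the throttle,
  // but a replica that is already in sync is never held back, which prevents the ISR
  // thrashing described above.
  def includedInFetchResponse(tp: TopicPartition, bytes: Long,
                              throttled: Set[TopicPartition],
                              inSync: Set[TopicPartition],
                              quotaExceeded: Boolean): Boolean = {
    if (!throttled.contains(tp)) true
    else {
      recordThrottledBytes(tp, bytes)
      inSync.contains(tp) || !quotaExceeded
    }
  }
}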

Q&A

1. How should an admin set the throttle value when rebalancing partitions?

...

MoveTime = MoveRatio x TotalLogSizePerBroker x #Brokers / (replication-quota - max(BytesInPerSec))

If the quota is set aggressively, compared to the inbound rate, it is still possible to get into a situation where you will never make progress on one or more brokers. Specifically as (replication-quota - max(BytesInPerSec)) -> 0. This could happen if replicas moved in such a way that max(BytesInPerSec) increased, or followers became concentrated on a single broker such that their total “keep up” traffic exceeds the throttle value, or simply because you have had an unexpected increase in load. This should be relatively rare and is easy to deal with. The administrator monitors the new metric SumReplicaLag. If this stops decreasing before rebalancing completes, the admin simply increases the throttle value a little to compensate.
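As a purely illustrative worked example (all numbers invented): with MoveRatio = 0.1, TotalLogSizePerBroker = 500 GB, 10 brokers, a throttle of 50 MB/s and max(BytesInPerSec) = 20 MB/s:

MoveTime ≈ 0.1 x 500 GB x 10 / (50 MB/s - 20 MB/s) = 500 GB / 30 MB/s ≈ 16,700 s ≈ 4.6 hours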

...

replica.fetch.response.max.bytes < QuotaLeader x total-window-size
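For example (illustrative figures; replica.fetch.response.max.bytes defaults to 10 MB per KIP-74, and the quota window above defaults to 11 x 1s): QuotaLeader x total-window-size = 10 MB/s x 11 s = 110 MB, so the default 10 MB response limit satisfies the constraint comfortably, whereas a response limit above 110 MB would not.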

...

There appear to be two sensible approaches to this problem: (1) omit partitions from fetch requests (follower) / fetch responses (leader) when they exceed their quota, or (2) delay them, as the existing quota mechanism does, using separate fetchers. Both appear to be valid approaches with slightly different design tradeoffs. The former was chosen as the underlying code changes are simpler (based on explorations of each). The details of the latter are discussed here.

We also considered a more pessimistic approach which quotas the follower's fetch request, then applies an adjustment when the response returns. This mechanism has some advantages, most notably that it is conservative, meaning the throttle value will never be exceeded. However, whilst this approach should work, the adjustment process adds some complexity when compared to the optimistic approaches. Thus this proposal was rejected (this is discussed in full here).