...

Relates to: KIP-13: Quotas, KIP-74: Add Fetch Response Size Limit in Bytes

Mail Thread: here

Released: 0.10.1.0

Revision History

  • 10th Aug 2016: Switched from a delay-based approach, which uses dedicated throttled fetcher threads, to an inclusion-based approach, which puts throttled and unthrottled replicas in the same request/response

  • 25th Sept 2016: Split throttled replica list into two properties. One for leader side. One for follower side.
  • 30th Sept 2016: Split the quota property into two values, one for the leader and one for the follower. This adds consistency with the replicas property changed previously. 

Motivation

Currently, data-intensive admin operations like rebalancing partitions, adding a broker, removing a broker or bootstrapping a new machine create an unbounded load on inter-cluster traffic. This affects clients interacting with the cluster whenever data movement occurs.

...

The tool, kafka-reassign-partitions.sh, calculates a mapping of topic -> partition-replica for each replica that is either (a) a move origin or (b) a move destination. The union of these is added to the topic-level config by the script. A leader throttle is applied to all existing replicas that are moving. A follower throttle is applied to replicas that are being created as part of the reassignment process (i.e. move destinations). There are independent configs for the leader and follower, but these are wrapped in kafka-reassign-partitions.sh, so the admin only needs to be aware of them when altering them directly via kafka-configs.sh.

leader.replication.throttled.replicas = [partId]:[replica], [partId]:[replica]...

follower.replication.throttled.replicas = [partId]:[replica], [partId]:[replica]...

leader.replication.throttled.rate = 1000000

follower.replication.throttled.rate = 1000000

The admin removes the throttle from Zookeeper by running the --verify command after completion of the rebalance.

Alternatively each property can be set independently using kafka-configs.sh (see below)
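The leader/follower split described above can be sketched as follows. This is a minimal illustration rather than the script's actual code: the function name `throttled_replica_configs` and the dict-based assignment format are assumptions made for the example, as is the reading that every existing replica of a moving partition receives the leader throttle.

```python
from typing import Dict, List, Tuple


def throttled_replica_configs(
    current: Dict[int, List[int]],
    proposed: Dict[int, List[int]],
) -> Tuple[str, str]:
    """Return (leader_value, follower_value) for one topic.

    `current` and `proposed` map partition id -> list of broker ids.
    Both the function and this input shape are hypothetical.
    """
    leader: List[str] = []
    follower: List[str] = []
    for part in sorted(proposed):
        before = current.get(part, [])
        after = proposed[part]
        if set(before) == set(after):
            continue  # partition is not moving, so it is not throttled
        # Leader side: every existing replica of a moving partition.
        leader += [f"{part}:{b}" for b in before]
        # Follower side: replicas being created (move destinations).
        follower += [f"{part}:{b}" for b in after if b not in before]
    return ",".join(leader), ",".join(follower)


leader_value, follower_value = throttled_replica_configs(
    current={0: [101, 102], 1: [101, 102]},
    proposed={0: [101, 103], 1: [101, 102]},
)
print("leader.replication.throttled.replicas =", leader_value)
print("follower.replication.throttled.replicas =", follower_value)
```

Only partition 0 moves in this example, so partition 1 appears in neither list.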

Public Interfaces

FetchRequest

A new field is required in the fetch request to bound the total number of bytes within the fetch response. This is covered by KIP-74.

...

Topic-level dynamic config (these properties cannot be set through the Kafka config file, or on topic creation)

bin/kafka-configs … --alter
--add-config 'leader.replication.throttled.replicas=[partId]:[replica], [partId]:[replica]...'
--entity-type topic
--entity-name topic-name

bin/kafka-configs … --alter
--add-config 'follower.replication.throttled.replicas=[partId]:[replica], [partId]:[replica]...'
--entity-type topic
--entity-name topic-name

 

Broker-level dynamic config (these properties cannot be set through the Kafka config file), unit=B/s

bin/kafka-configs … --alter
--add-config 'leader.replication.throttled.rate=10000'
--entity-type broker
--entity-name brokerId

bin/kafka-configs … --alter
--add-config 'follower.replication.throttled.rate=10000'
--entity-type broker
--entity-name brokerId

 

Wildcard support is also provided for setting a throttle to all replicas:

bin/kafka-configs … --alter
--add-config 'leader.replication.throttled.replicas=*'
--entity-type topic

And to set a ubiquitous throttle value to all brokers:

bin/kafka-configs … --alter
--add-config 'leader.replication.throttled.rate=10000'
--entity-type broker

The ‘replication-quota’ is only applied to ‘throttled-replicas’.

Here we add the concept of a dynamic config, applied at a broker level. This is equivalent, in implementation, to the existing entity-type = client  configuration, but applied at a broker level and available for dynamic change.

NB - whilst it is possible to change throttle configs in this way, there should not be any requirement for admins to use kafka-configs directly when rebalancing partitions, as this will be handled internally within kafka-reassign-partitions.sh. The admin would be required to use this interface to throttle a bootstrapping broker. The mechanism for doing this is described in the Q&A below.

These are reflected in zookeeper via a new Zookeeper path: /config/broker/[broker-id]

//Sample configuration for throttled replicas
{
"version":1,
"config": {
"leader.replication.throttled.replicas":"0:0,0:1,0:2,1:0,1:1,1:2"
 }
}
//Sample configuration for throttled replicas
{
"version":1,
"config": {
"follower.replication.throttled.replicas":"0:0,0:1,0:2,1:0,1:1,1:2"
 }
}
//Sample configuration for replication-quota
{
"version":1,
"config": {
"replication-quota":"1000000"
}
}
// Change notification for replication-quota
{
"version":1,
"entity_path": "/config/broker/"
}
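As an illustration of how the throttled replica value in these payloads might be consumed, the sketch below parses the `partId:replica` list into pairs and passes the `*` wildcard through. The helper name `parse_throttled_replicas` is hypothetical and is not part of Kafka.

```python
import json
from typing import List, Tuple, Union

WILDCARD = "*"


def parse_throttled_replicas(value: str) -> Union[str, List[Tuple[int, int]]]:
    """Parse 'partId:replica,...' into (partition, replica) pairs.

    Returns the string '*' unchanged when the wildcard is used.
    """
    value = value.strip()
    if value == WILDCARD:
        return WILDCARD
    pairs: List[Tuple[int, int]] = []
    for entry in value.split(","):
        entry = entry.strip()
        if not entry:
            continue  # tolerate stray commas or spaces
        part, replica = entry.split(":")
        pairs.append((int(part), int(replica)))
    return pairs


# A payload shaped like the sample configuration above.
znode = json.loads(
    '{"version":1,"config":{"leader.replication.throttled.replicas":"0:0,0:1,1:2"}}'
)
pairs = parse_throttled_replicas(
    znode["config"]["leader.replication.throttled.replicas"]
)
print(pairs)  # [(0, 0), (0, 1), (1, 2)]
```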

In line with client-quota, two configs are provided to control the window used for ThrottledRateIn/Out.

replication.quota.window.num = The number of samples to retain in memory (default 11)

replication.quota.window.size.seconds = The time span of each sample (default 1)
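With the defaults, the rate is measured over 11 samples of 1 second each. The sketch below shows one simple way such a sampled window could work. It is an assumption-laden simplification (the class `WindowedRate` is hypothetical and always divides by the full window length), not Kafka's actual metric calculation.

```python
from collections import deque


class WindowedRate:
    """Byte rate over the last `num` samples of `size_s` seconds each."""

    def __init__(self, num: int = 11, size_s: float = 1.0):
        self.num = num        # replication.quota.window.num
        self.size_s = size_s  # replication.quota.window.size.seconds
        self.samples = deque()  # (sample_index, bytes) pairs, oldest first

    def record(self, nbytes: int, now_s: float) -> None:
        idx = int(now_s // self.size_s)
        if self.samples and self.samples[-1][0] == idx:
            # Same sample interval: accumulate into the newest sample.
            self.samples[-1] = (idx, self.samples[-1][1] + nbytes)
        else:
            self.samples.append((idx, nbytes))
        self._expire(idx)

    def _expire(self, idx: int) -> None:
        # Drop samples that fall outside the retained window.
        while self.samples and self.samples[0][0] <= idx - self.num:
            self.samples.popleft()

    def rate(self, now_s: float) -> float:
        """Average bytes per second across the full retained window."""
        self._expire(int(now_s // self.size_s))
        total = sum(b for _, b in self.samples)
        return total / (self.num * self.size_s)


r = WindowedRate()
for t in range(11):
    r.record(1000, t + 0.5)  # 1000 bytes in each of 11 one-second samples
print(r.rate(10.9))
```

A larger window smooths bursts at the cost of reacting more slowly when the measured rate crosses the quota.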

Script: kafka-reassign-partitions.sh

This will be altered to add an additional, optional parameter:

kafka-reassign-partitions.sh --execute … --replication-quota 1000

Where the replication-quota is: the maximum bandwidth, in B/s, allocated to moving replicas. This parameter is only valid when used in conjunction with --execute. If omitted, no quota will be set.  

In addition the kafka-reassign-partitions script will include in its output (from --generate option only) a MoveRatio:

MoveRatio = #partitions-to-be-moved / #total-partition-count

This is simply a number which the admin can use to estimate how long a rebalance will take. See the Q&A section below.  
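As a worked example of the ratio, the sketch below computes MoveRatio and a rough duration estimate. The `estimate_move_seconds` helper is an assumption for illustration only: it supposes partitions of roughly equal size and no competing inbound traffic, and it is not the method given in the Q&A.

```python
def move_ratio(partitions_to_move: int, total_partitions: int) -> float:
    """MoveRatio = #partitions-to-be-moved / #total-partition-count."""
    if total_partitions <= 0:
        raise ValueError("total_partitions must be positive")
    return partitions_to_move / total_partitions


def estimate_move_seconds(
    ratio: float, total_bytes: int, quota_bytes_per_s: int
) -> float:
    """Hypothetical rough estimate: bytes to move divided by the quota."""
    if quota_bytes_per_s <= 0:
        raise ValueError("quota_bytes_per_s must be positive")
    return ratio * total_bytes / quota_bytes_per_s


ratio = move_ratio(25, 100)  # a quarter of the partitions move
seconds = estimate_move_seconds(ratio, 400_000_000_000, 10_000_000)
print(ratio, seconds)
```

Under these assumptions, moving a quarter of 400 GB at 10 MB/s takes on the order of 10,000 seconds.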

Test Plan

System tests to include:

Given a static two node cluster (p=100, r=2)

When data is moved from one node to the other

Then replicas should move at the quota dictated rate.

 

Given a two node cluster with incoming produce requests (p=100, r=2)

When data is moved from one node to the other

Then replicas should move at the quota dictated rate minus the inbound rate.

 

Given a three node cluster (p=100, r=2)

When data is moved from one node to the other two nodes

Then replicas should move at the quota dictated rate.

[repeat with produce requests]

 

Given a three node cluster (p=100, r=2)

When data is moved from two nodes to a single node

Then replicas should move at the quota dictated rate.

[repeat with produce requests]

 

Amendments

 

While testing KIP-73, we found an issue described in https://issues.apache.org/jira/browse/KAFKA-4313. When a cluster has a mix of high-volume and low-volume partitions and replication throttling is enabled, the ISRs of the low-volume partitions could thrash. KAFKA-4313 fixes this by not throttling replicas in the throttled replica list that are already in sync. Traffic from those in-sync replicas still counts toward the throttled traffic, though.
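The amended rule can be summarised in a short sketch. The names `ReplicaState`, `should_throttle` and `counts_toward_quota` are hypothetical and only illustrate the decision described above; they are not Kafka's internal API.

```python
from dataclasses import dataclass


@dataclass
class ReplicaState:
    in_throttled_list: bool  # listed in *.replication.throttled.replicas
    in_sync: bool            # currently a member of the ISR


def should_throttle(replica: ReplicaState, quota_exceeded: bool) -> bool:
    """Hold back a replica only if it is listed, out of sync and over quota."""
    return replica.in_throttled_list and not replica.in_sync and quota_exceeded


def counts_toward_quota(replica: ReplicaState) -> bool:
    """Traffic for every listed replica is recorded, in sync or not."""
    return replica.in_throttled_list


catching_up = ReplicaState(in_throttled_list=True, in_sync=False)
already_synced = ReplicaState(in_throttled_list=True, in_sync=True)
print(should_throttle(catching_up, quota_exceeded=True))     # True
print(should_throttle(already_synced, quota_exceeded=True))  # False
print(counts_toward_quota(already_synced))                   # True
```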

Q&A

1. How should an admin set the throttle value when rebalancing partitions?

...