...

Status

Current state: Adopted

Jira: KAFKA-1464

Relates to: KIP-13: Quotas, KIP-74: Add Fetch Response Size Limit in Bytes

Mail Thread: here

Released: 0.10.1.0

Revision History

  • 8th Aug 2016: Initial version

  • 10th Aug 2016: Switched from a delay-based approach, which uses dedicated throttled fetcher threads, to an inclusion-based approach, which puts throttled and unthrottled replicas in the same request/response

  • 25th Sept 2016: Split throttled replica list into two properties. One for leader side. One for follower side.
  • 30th Sept 2016: Split the quota property into two values, one for the leader and one for the follower. This adds consistency with the replicas property changed previously. 

Motivation

Currently, data-intensive admin operations like rebalancing partitions, adding a broker, removing a broker or bootstrapping a new machine create an unbounded load on inter-broker traffic. This affects clients interacting with the cluster while the data movement is in progress.

...

The tool, kafka-reassign-partitions.sh, calculates a mapping of topic -> partition-replica for each replica that is either (a) a move origin or (b) a move destination. The union of these is added to the topic-level config by the script (a sketch of this calculation is given below). A leader throttle is applied to all existing replicas that are moving. A follower throttle is applied to replicas that are being created as part of the reassignment process (i.e. move destinations). There are independent configs for the leader and follower, but this is wrapped in kafka-reassign-partitions.sh, so the admin only needs to be aware of them if altering them directly via kafka-configs.sh.

leader.replication.throttled.replicas = [partId]:[replica], [partId]:[replica]...

follower.replication.throttled.replicas = [partId]:[replica], [partId]:[replica]...

leader.replication.throttled.rate = 1000000

follower.replication.throttled.rate = 1000000

The admin removes the throttle from Zookeeper by running the --verify command after completion of the rebalance.

Alternatively, each property can be set independently using kafka-configs.sh (see below).
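
To make the replica-list calculation concrete, here is a minimal sketch in Java. It is illustrative only; the class, method and map shapes are hypothetical, not the tool's actual code:

import java.util.*;

public class ThrottledReplicas {
    // current/proposed map partition id -> replica broker ids for one topic.
    // Returns the two topic-level config values described above.
    public static Map<String, String> throttledReplicaConfigs(
            Map<Integer, Set<Integer>> current, Map<Integer, Set<Integer>> proposed) {
        List<String> leader = new ArrayList<>();   // move origins: existing replicas of moving partitions
        List<String> follower = new ArrayList<>(); // move destinations: replicas being created
        for (Map.Entry<Integer, Set<Integer>> e : proposed.entrySet()) {
            int partition = e.getKey();
            Set<Integer> before = current.getOrDefault(partition, Collections.emptySet());
            if (before.equals(e.getValue()))
                continue; // partition is not moving
            for (int replica : before)
                leader.add(partition + ":" + replica);
            for (int replica : e.getValue())
                if (!before.contains(replica))
                    follower.add(partition + ":" + replica);
        }
        Map<String, String> configs = new HashMap<>();
        configs.put("leader.replication.throttled.replicas", String.join(",", leader));
        configs.put("follower.replication.throttled.replicas", String.join(",", follower));
        return configs;
    }
}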

Public Interfaces

FetchRequest

A new field is required in the fetch request to bound the total number of bytes within the fetch response. This is covered by KIP-74.

...

  • LeaderReplicationThrottledRate: The rate of throttled replication for transmitted bytes from a broker. 
  • LeaderOverThrottledRate: The difference between the throttled rate calculated by the leader, when the response is formed from the log, and the rate actually sent to the follower.  
  • FollowerReplicationThrottledRate: The rate of throttled replication for transmitted bytes into a broker.
  • PartitionBytesInRate: Equivalent to BytesInPerSec, but at a partition level (i.e. total traffic, both throttled and unthrottled), in B/s. This is required for estimating how long a rebalance will take to complete. See the usability section below.
  • SumReplicaLag: This is the sum of all replica lag values on the broker. This metric is used to monitor progress of a rebalance and is particularly useful for determining if the rebalance has become stuck due to an overly harsh throttle value (as the metric will stop decreasing). A polling sketch is given below.
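
As an illustration of how SumReplicaLag might be consumed, a minimal JMX polling sketch in Java. The MBean ObjectName, attribute name and port below are assumptions; check the broker's JMX tree for the actual registration:

import javax.management.*;
import javax.management.remote.*;

public class RebalanceMonitor {
    public static void main(String[] args) throws Exception {
        // Assumed JMX endpoint; substitute the broker's host and JMX port.
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://broker-host:9999/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection mbs = connector.getMBeanServerConnection();
            // Assumed ObjectName for the metric described above.
            ObjectName lag = new ObjectName("kafka.server:type=ReplicaFetcherManager,name=SumReplicaLag");
            double previous = Double.MAX_VALUE;
            while (true) {
                double current = ((Number) mbs.getAttribute(lag, "Value")).doubleValue();
                if (current >= previous)
                    System.out.println("SumReplicaLag has stopped decreasing; consider raising the throttle");
                previous = current;
                Thread.sleep(30_000); // poll every 30 seconds
            }
        }
    }
}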

Config & Zookeeper

Topic-level dynamic config (these properties cannot be set through the Kafka config file, or on topic creation)

bin/kafka-configs … --alter
--add-config 'leader.replication.throttled.replicas=[partId]:[replica], [partId]:[replica]...'
--entity-type topic
--entity-name topic-name

bin/kafka-configs … --alter
--add-config 'follower.replication.throttled.replicas=[partId]:[replica], [partId]:[replica]...'
--entity-type topic
--entity-name topic-name

Broker-level dynamic config (these properties cannot be set through the Kafka config file), unit=B/s

bin/kafka-configs … --alter
--add-config 'leader.replication.throttled.rate=10000'
--entity-type broker
--entity-name brokerId

bin/kafka-configs … --alter
--add-config 'follower.replication.throttled.rate=10000'
--entity-type broker
--entity-name brokerId

Wildcard support is also provided for setting a throttle to all replicas:

bin/kafka-configs … --alter
--add-config 'leader.replication.throttled.replicas=*'
--entity-type topic

And to set a ubiquitous throttle value to all brokers:

bin/kafka-configs … --alter
--add-config 'leader.replication.throttled.rate=10000'
--entity-type broker

The ‘replication.throttled.rate’ is only applied to those replicas listed in the ‘replication.throttled.replicas’ properties.

Here we add the concept of a dynamic config, applied at the broker level. This is equivalent, in implementation, to the existing entity-type = client configuration, but targets brokers and is available for dynamic change.

NB - whilst it is possible to change throttle configs in this way, there should not be any requirement for admins to use kafka-configs directly when rebalancing partitions, as this is handled internally by kafka-reassign-partitions.sh. The admin would, however, be required to use this interface to throttle a bootstrapping broker. The mechanism for doing this is described in the Q&A below.

These are reflected in zookeeper via a new Zookeeper path: /config/broker/[broker-id]

//Sample configuration for throttled replicas
{
"version":1,
"config": {
"leader.replication.throttled.replicas":"0:0,0:1,0:2,1:0,1:1,1:2"
 }
}
//Sample configuration for throttled replicas
{
"version":1,
"config": {
"follower.replication.throttled.replicas":"0:0,0:1,0:2,1:0,1:1,1:2"
 }
}
//Sample configuration for the throttled rate
{
"version":1,
"config": {
"leader.replication.throttled.rate":"1000000",
"follower.replication.throttled.rate":"1000000"
}
}

...

replication.quota.window.num = The number of samples to retain in memory (default 11)

replication.quota.window.size.seconds = The time span of each sample (default 1)

There is no specific config for the number of throttled replica fetcher threads as the config for the number of replica fetcher threads (num.replica.fetchers) is reused.
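
A minimal sketch of the sampled-rate measurement these two window configs imply (illustrative Java, not Kafka's internal classes): the rate is the byte total across the retained samples divided by the elapsed window time.

import java.util.ArrayDeque;
import java.util.Deque;

class SampledRate {
    private final int windowNum;      // cf. replication.quota.window.num
    private final long windowSizeMs;  // cf. replication.quota.window.size.seconds, in ms
    private final Deque<long[]> samples = new ArrayDeque<>(); // each entry: {windowStartMs, byteCount}

    SampledRate(int windowNum, long windowSizeMs) {
        this.windowNum = windowNum;
        this.windowSizeMs = windowSizeMs;
    }

    // Record bytes against the current sample, rolling to a new sample
    // (and discarding the oldest) once the sample's time span has elapsed.
    void record(long nowMs, long bytes) {
        long[] current = samples.peekLast();
        if (current == null || nowMs - current[0] >= windowSizeMs) {
            samples.addLast(new long[]{nowMs, bytes});
            if (samples.size() > windowNum)
                samples.removeFirst();
        } else {
            current[1] += bytes;
        }
    }

    // The measured rate: total bytes across retained samples / elapsed time.
    double bytesPerSecond(long nowMs) {
        if (samples.isEmpty())
            return 0.0;
        long total = 0;
        for (long[] s : samples)
            total += s[1];
        long elapsedMs = Math.max(nowMs - samples.peekFirst()[0], 1);
        return total * 1000.0 / elapsedMs;
    }
}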

...

Script: kafka-reassign-partitions.sh

This will be altered to add an additional, optional parameter:

kafka-reassign-partitions.sh --execute … --replication-quota 1000

Where the replication-quota is the maximum bandwidth, in B/s, allocated to moving replicas. This parameter is only valid when used in conjunction with --execute. If omitted, no quota will be set.  

In addition, the kafka-reassign-partitions script will include in its output (from the --generate option only) a MoveRatio:

MoveRatio = #partitions-to-be-moved / #total-partition-count

This is simply a number which the admin can use to estimate how long a rebalance will take. See the Q&A section below.  
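
For example, a reassignment that moves 100 partitions in a cluster of 1,000 partitions would report MoveRatio = 100 / 1,000 = 0.1.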


Test Plan

System tests to include:

Given a static two node cluster (p=100, r=2)

When data is moved from one node to the other

Then replicas should move at close to (but no more than) the quota dictated rate.

[repeat with produce requests]

 

Given a three node cluster (p=100, r=2)

When data is moved from one node to the other two nodes

Then replicas should move at close to (but no more than) the quota dictated rate.

[repeat with produce requests]

 

Given a three node cluster (p=100, r=2)

When data is moved from two nodes to a single node

Then replicas should move at close to (but no more than) the quota dictated rate.

[repeat with produce requests]

 

...

Amendments

 

While testing KIP-73, we found an issue described in https://issues.apache.org/jira/browse/KAFKA-4313. Basically, when there are mixed high-volume and low-volume partitions and replication throttling is specified, the ISRs for the low-volume partitions can thrash. KAFKA-4313 fixes this issue by not throttling those replicas in the throttled replica list that are already in sync. Traffic from those in-sync replicas is still counted towards the throttled traffic, though.

Q&A

1. How should an admin set the throttle value when rebalancing partitions?

...

MoveTime = MoveRatio x TotalLogSizePerBroker x #Brokers / (replication-quota - max(BytesInPerSec))

If the quota is set aggressively compared to the inbound rate, it is still possible to get into a situation where the rebalance will never make progress on one or more brokers, specifically as (replication-quota - max(BytesInPerSec)) -> 0. This could happen if replicas moved in such a way that max(BytesInPerSec) increased, if followers became concentrated on a single broker such that their total “keep up” traffic exceeds the throttle value, or simply because of an unexpected increase in load. This should be relatively rare and is easy to deal with. The administrator monitors a new metric, SumReplicaLag. If this stops decreasing before rebalancing completes, then the admin simply increases the throttle value a little to compensate.
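
For illustration only (numbers assumed, not taken from this KIP): with MoveRatio = 0.1, TotalLogSizePerBroker = 1,000,000 MB, 5 brokers, a replication-quota of 50 MB/s and max(BytesInPerSec) = 10 MB/s:

MoveTime = 0.1 x 1,000,000 x 5 / (50 - 10) = 12,500s, or roughly three and a half hours.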

...

replica.fetch.response.max.bytes < QuotaLeader x total-window-size
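
For example, with the default windows (11 samples of 1s each, i.e. a total window of 11s) and a leader quota of 10 MB/s, replica.fetch.response.max.bytes would need to stay below roughly 110 MB.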

...

There appear to be two sensible approaches to this problem: (1) omit partitions from fetch requests (follower) / fetch responses (leader) when they exceed their quota; (2) delay them, as the existing quota mechanism does, using separate fetchers. Both appear to be valid approaches with slightly different design tradeoffs. The former was chosen as the underlying code changes are simpler (based on explorations of each). The details of the latter are discussed here.
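
A minimal sketch of the inclusion-based approach, under assumed types (this is not Kafka's actual request handling): throttled partitions are simply left out of the response while the throttled rate is at or above the quota, while unthrottled partitions always flow.

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

class InclusionBasedThrottle {
    interface RateMeter { double bytesPerSecond(); }

    // Build a response from the requested partition data, omitting throttled
    // partitions while the throttled-replication rate exceeds the quota.
    // Unthrottled partitions are always included, so they are never starved.
    static Map<String, byte[]> buildResponse(Map<String, byte[]> requested,
                                             Set<String> throttledPartitions,
                                             RateMeter throttledRate,
                                             double quotaBytesPerSecond) {
        boolean quotaExhausted = throttledRate.bytesPerSecond() >= quotaBytesPerSecond;
        Map<String, byte[]> response = new LinkedHashMap<>();
        for (Map.Entry<String, byte[]> e : requested.entrySet()) {
            if (quotaExhausted && throttledPartitions.contains(e.getKey()))
                continue; // omitted this round; the follower will request it again
            response.put(e.getKey(), e.getValue());
        }
        return response;
    }
}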

...