Status
Current state: Under Discussion
Discussion thread: here
JIRA:
Released:
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
If partition reassignment involves a lot of replicas, then it could put too much overhead on the brokers.
Say you have a replication factor of 4 and you trigger a reassignment which moves all replicas to new brokers. Now 8 replicas are fetching at the same time which means you need to account for 8 times the current producer load plus the catch-up replication. To make matters worse, the replicas won't all become in-sync at the same time; in the worst case, you could have 7 replicas in-sync while one is still catching up. Currently, the old replicas won't be disabled until all new replicas are in-sync. This makes configuring the throttle tricky since ISR traffic is not subject to it.
Rather than trying to bring all 4 new replicas online at the same time, a friendlier approach would be to do it incrementally: bring one replica online, bring it in-sync, then remove one of the old replicas. Repeat until all replicas have been changed. This would reduce the impact of a reassignment and make configuring the throttle easier at the cost of a slower overall reassignment.
Furthermore, since the controller already has good knowledge of the cluster, it makes sense to improve its reassignment feature so that it batches a given reassignment internally. This KIP therefore aims to change the controller to support internal batching rather than adding a new tool.
Public Interfaces
Three configs will be added.
Config name | Type | Default | Valid values | Importance | Dynamic update mode |
---|---|---|---|---|---|
reassignment.parallel.leader.movements | int | Int.MAX | [1,...] | medium | cluster-wide |
reassignment.parallel.partition.count | int | Int.MAX | [1,...] | medium | cluster-wide |
reassignment.parallel.replica.count | int | Int.MAX | [1,...] | medium | cluster-wide |
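The constraints in the table can be expressed as a small validation sketch. This is illustrative only: `resolve_config` and `DEFAULTS` are hypothetical names, and reading "Int.MAX" as the largest signed 32-bit integer is an assumption.

```python
# Hypothetical helper, not part of Kafka. "Int.MAX" is assumed to mean 2**31 - 1.
INT_MAX = 2**31 - 1

DEFAULTS = {
    "reassignment.parallel.leader.movements": INT_MAX,
    "reassignment.parallel.partition.count": INT_MAX,
    "reassignment.parallel.replica.count": INT_MAX,
}


def resolve_config(overrides):
    """Merge overrides into the defaults, rejecting unknown keys and values below 1."""
    resolved = dict(DEFAULTS)
    for key, value in overrides.items():
        if key not in DEFAULTS:
            raise KeyError(f"unknown config: {key}")
        is_int = isinstance(value, int) and not isinstance(value, bool)
        if not is_int or not 1 <= value <= INT_MAX:
            raise ValueError(f"{key} must be an int in [1, {INT_MAX}], got {value!r}")
        resolved[key] = value
    return resolved
```

With no overrides every limit stays at its maximum, which corresponds to the unbatched behaviour described as the default.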
Proposed Changes
As explained above, the goal is to add new replicas incrementally, a batch at a time, to avoid putting too much pressure on the brokers. The only exception is the first step where (if needed) we add as many replicas as are required to fulfil the min.insync.replicas requirement set on the broker. For instance, if there are 4 brokers and min.insync.replicas is set to 3 but there is only 1 in-sync replica, then we immediately add 2 others in one step, so the producers are able to continue.
Furthermore, in the first step we'll elect the new preferred leader (if the reassignment requires it) to take pressure off the current leader.
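The min.insync.replicas exception for the first step can be sketched as follows. The function name and its arguments are hypothetical and only illustrate the arithmetic in the example above (min.insync.replicas of 3, one in-sync replica, so two replicas are added at once).

```python
def initial_top_up(min_insync_replicas, in_sync, candidates):
    """Return the replicas to add immediately so the ISR can reach min.insync.replicas.

    in_sync:    replicas that are currently in sync
    candidates: target replicas, in the order given by the reassignment
    """
    shortfall = max(0, min_insync_replicas - len(in_sync))
    fresh = [r for r in candidates if r not in in_sync]
    return fresh[:shortfall]


# 4 brokers (0..3), min.insync.replicas = 3, only broker 0 is in sync.
print(initial_top_up(3, [0], [1, 2, 3]))  # [1, 2]
```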
The configs aim to control batching at the partition and topic levels. The defaults effectively preserve the current behaviour to remain backward compatible, although as future work it might make sense to lower them based on feedback.
Batches will be created based on the input reassignment, so the order of the input gives some control over how batches are formed. For instance, when reassigning a single partition from (0, 1, 2, 3, 4) to (5, 6, 7, 8, 9), we would form the batches (0, 1) → (5, 6), (2, 3) → (7, 8) and 4 → 9 and would execute the reassignment in these increments. For multiple partitions it would work in a similar fashion, but reassignment.parallel.replica.count would control how many replicas of a given partition can be reassigned concurrently. On top of these we would limit how many leaders can be moved in parallel (reassignment.parallel.leader.movements). That means that after calculating the possible reassignment steps, we disallow those that would push leader movement over the limit and, where possible, add reassignments that involve no leader movement instead. If their place cannot be filled, a batch may contain fewer partitions than reassignment.parallel.partition.count allows.
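A minimal sketch of the two ideas in this paragraph, under stated assumptions: `form_batches` pairs old and new replicas in input order, and `select_steps` picks candidate partition steps in input order while respecting the leader movement limit. Both function names and the `(partition, moves_leader)` tuple shape are hypothetical, and a batch size of 2 is assumed for the single-partition example.

```python
def form_batches(current, target, replica_count):
    """Pair old and new replicas in input order, replica_count at a time."""
    length = max(len(current), len(target))
    return [
        (tuple(current[i:i + replica_count]), tuple(target[i:i + replica_count]))
        for i in range(0, length, replica_count)
    ]


def select_steps(candidates, partition_count, leader_movements):
    """Pick steps in input order, skipping leader moves once the limit is reached.

    candidates: list of (partition, moves_leader) tuples in input order.
    """
    selected, leader_moves = [], 0
    for partition, moves_leader in candidates:
        if len(selected) == partition_count:
            break
        if moves_leader:
            if leader_moves == leader_movements:
                continue  # over the leader limit; try a later candidate instead
            leader_moves += 1
        selected.append(partition)
    return selected


print(form_batches((0, 1, 2, 3, 4), (5, 6, 7, 8, 9), 2))
# [((0, 1), (5, 6)), ((2, 3), (7, 8)), ((4,), (9,))]
```

When every remaining candidate would move a leader, `select_steps` returns fewer partitions than `partition_count`, which mirrors the case where the batch cannot be filled.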
In addition, these values could be changed dynamically to somewhat "throttle" the reassignment. It might be better to throttle certain reassignments in a much more advanced way, but that could exceed the scope of this KIP.
Algorithm
Calculating a Reassignment Step
- To calculate a reassignment step, the final target replica (FTR) set and the current replica (CR) set are always used.
- Calculate the replicas to be dropped (DR):
  - Calculate n = max(reassignment.parallel.replica.count, size(FTR) - size(CR))
  - Filter those replicas from CR which are not in FTR; this is the excess replica (ER) set
  - Take the first reassignment.parallel.replica.count replicas of ER; these will be the set of dropped replicas
- Calculate the new replicas (NR) to be added:
  - Check whether the partition has enough online replicas to fulfil the min.insync.replicas config, so that the producers are able to continue.
  - If the preferred leader is different in FTR and it is not yet reassigned, then add it to NR
  - If size(NR) < min.insync.replicas, then take min(min.insync.replicas, reassignment.parallel.replica.count) - size(NR) replicas from FTR
  - Otherwise take as many replicas as reassignment.parallel.replica.count allows
- Create the target replica (TR) set: CR + NR - DR
- If this is the last step, then order the replicas as specified by FTR. This means that the last step is always equal to FTR
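The step calculation can be sketched roughly as below. This is one possible reading, not the controller's implementation: it ignores the min.insync.replicas top-up and ISR state, it assumes the first increment only adds the new preferred leader, and it caps additions so the replica list does not grow beyond size(FTR) after that. Under those assumptions, and with reassignment.parallel.replica.count = 2, it reproduces the sequence shown in the Example section. `next_step` and `all_steps` are hypothetical names.

```python
def next_step(current, final, replica_count):
    """Compute the next target replica (TR) list from CR (current) and FTR (final)."""
    current, final = list(current), list(final)
    if current == final:
        return final
    # Assumed first increment: bring the new preferred leader online, drop nothing.
    if final[0] not in current:
        return [final[0]] + current
    excess = [r for r in current if r not in final]    # ER
    dropped = excess[:replica_count]                   # DR
    kept = [r for r in current if r not in dropped]
    missing = [r for r in final if r not in current]
    room = max(0, len(final) - len(kept))
    added = missing[:min(replica_count, room)]         # NR
    target = kept + added                              # CR + NR - DR
    # FTR members first, in FTR order; old replicas still present come after.
    return [r for r in final if r in target] + [r for r in target if r not in final]


def all_steps(current, final, replica_count):
    """Iterate next_step until the assignment equals FTR (replica_count >= 1)."""
    steps = [list(current)]
    while steps[-1] != list(final):
        steps.append(next_step(steps[-1], final, replica_count))
    return steps


for step in all_steps((0, 1, 2, 3, 4), (5, 6, 7, 8, 9), 2):
    print(step)
```

Because the ordering rule puts FTR members first in FTR order, the final step is always exactly FTR once no excess replicas remain.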
Performing a Reassignment Step
At a high level, performing a reassignment step is similar to the existing algorithm.
- Calculate the next reassignment step.
- Wait until this step differs from the current assignment and at least one replica is in the ISR.
  - The calculation results in an identical step if it is not able to add new replicas to the TR set, for example because the target brokers are offline.
  - We are not able to reassign a partition unless at least one replica is in-sync.
- Set all new replicas to OnlineReplica when they are in ISR (if this is not the first increment we're in).
- Update CR in ZooKeeper (/kafka/brokers/topics/{topic}) with TR for the given partition.
- Send LeaderAndIsr requests to all replicas in CR+NR.
- Start new replicas in NR by moving them into the NewReplica state.
- Set CR to TR in memory.
- Send a LeaderAndIsr request with a potential new leader (if the current leader is not in TR), a new CR (using TR) and the same ISR to every broker in TR.
- Replicas in DR -> Offline (force those replicas out of the ISR).
- Replicas in DR -> NonExistentReplica (force those replicas to be deleted).
- Update the /admin/reassign_partitions path in ZK to remove this partition if the reassignment is completed.
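The bookkeeping behind these steps can be illustrated with a small sketch. It only models the wait condition and the NR, DR and request-target sets; it makes no ZooKeeper or controller calls, and `plan_step` and the keys of the returned dictionary are hypothetical.

```python
def plan_step(current, target, isr):
    """Return the bookkeeping for one step, or None if the controller should keep waiting.

    current: CR, target: TR for this step, isr: set of in-sync replicas.
    """
    current, target = list(current), list(target)
    if target == current:
        return None  # no new replicas could be added, e.g. target brokers are offline
    if not any(r in isr for r in current):
        return None  # no in-sync replica available
    new = [r for r in target if r not in current]      # NR, moved to NewReplica
    dropped = [r for r in current if r not in target]  # DR, later Offline then NonExistentReplica
    return {
        "new_replicas": new,
        "dropped_replicas": dropped,
        "leader_and_isr_targets": current + new,       # CR + NR
    }


print(plan_step([5, 0, 1, 2, 3, 4], [5, 6, 2, 3, 4], {5, 0, 1, 2, 3, 4}))
```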
Example
The following code block shows how a transition happens from (0, 1, 2, 3, 4) into (5, 6, 7, 8, 9) where the initial leader is 0.

    (0, 1, 2, 3, 4)     // starting assignment
    (5, 0, 1, 2, 3, 4)  // +5, new leader (5) is elected
    (5, 6, 2, 3, 4)     // -[0,1] +[6]
    (5, 6, 7, 8, 4)     // -[2,3] +[7,8]
    (5, 6, 7, 8, 9)     // -[4] +[9], requested order is matched, reassignment finished
Compatibility, Deprecation, and Migration Plan
Since these changes affect neither existing public interfaces nor ZooKeeper, there will be no compatibility issues.
Test Plan
- The existing unit tests will be parameterized so they would run with both modes
- Extra unit tests will be added to cover those cases that are not covered with the existing tests
- Ducktape tests would be parameterized to run with both modes
- If needed, extra Ducktape tests could be added to cover any remaining cases
Future Work
Replication Throttling
It would be useful to put an upper cap on replication bandwidth so users won't overwhelm their cluster. This throttle could be applied overall across all partitions, or perhaps it would make sense to apply it on a per-partition basis and only specify a maximum capacity at the overall level. KIP-73 covers some related work, but it is more general and not tailored specifically to reassignment.
Rejected Alternatives
If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.