Status
Current state: Accepted
Discussion thread: link
JIRA: KAFKA-2273
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Changes introduced in this KIP could potentially be superseded by changes to the consumer rebalancing protocol.
There was a bug in the partition assignment algorithm as described in this KIP that was fixed in KIP-341.
In certain circumstances the round robin assignor, which generally produces better assignments than the range assignor, fails to produce an optimal and balanced assignment of topic partitions to consumers. KIP-49 touches on some of these circumstances. In addition, when a reassignment occurs, none of the existing strategies consider what the topic partition assignments were before the reassignment; they behave as if they were performing a fresh assignment. Preserving the existing assignments could reduce some of the overhead of a reassignment. For example, Kafka consumers retain pre-fetched messages for partitions assigned to them before a reassignment. Therefore, preserving the partition assignment could reduce the number of messages that have to be delivered to consumers again after a reassignment. Another advantage would be reducing the need to clean up local partition state between rebalances.
This assignment strategy, which is implemented for the new consumer, would add a StickyAssignor class that can be used as org.apache.kafka.clients.consumer.StickyAssignor for the value of the consumer property partition.assignment.strategy. It would not affect the default value of this consumer property.
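As a minimal sketch of how a consumer might opt in, the properties below set partition.assignment.strategy to the class name given above. The broker address and group id are placeholder values chosen for illustration, and the helper class name StickyAssignorConfig is hypothetical.

```java
import java.util.Properties;

class StickyAssignorConfig {
    // Builds consumer properties that select the sticky assignor.
    // "localhost:9092" and "example-group" are placeholders, not values from this KIP.
    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "example-group");
        props.put("partition.assignment.strategy",
                  "org.apache.kafka.clients.consumer.StickyAssignor");
        return props;
    }
}
```

The resulting Properties object would then be passed to the consumer constructor along with the usual deserializer settings.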
Add a Sticky Assignor option to the potential assignment strategies of the new consumer. The Sticky Assignor serves two purposes.
First, it guarantees an assignment that is as balanced as possible, meaning either:
When starting a fresh assignment, the Sticky Assignor would distribute the partitions over consumers as evenly as possible. Even though this may sound similar to how the round robin assignor works, the second example below shows that it can result in a more balanced assignment.
Second, during a reassignment the Sticky Assignor would perform the reassignment in such a way that in the new assignment,
Of course, the first goal above takes precedence over the second one. This means it is possible that a few topic partitions cannot remain assigned to the same consumer and have to switch to another consumer in order to guarantee the most balanced assignment possible.
With the Sticky Assignor, the reassignment is performed by
Upon each partition assignment calculation, the full partition assignments are preserved for each consumer (as user data). The embedded protocol used by consumer groups that opt for the sticky partition assignment would be
ProtocolMetadata => Version Subscription UserData
Version => int16
Subscription => [Topic]
Topic => string
UserData => CurrentAssignments
CurrentAssignments => [Topic [Partition]]
Topic => string
Partition => int32
This is especially helpful when the consumer group leader, which is in charge of performing the partition assignment, dies and the leadership has to be given to some other group member. In such circumstances, the new leader has access to the most recent partition assignment and can easily take over the rebalance that is triggered when the former leader disappears, as well as future rebalances.
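To make the CurrentAssignments layout concrete, here is a rough sketch of encoding and decoding a topic-to-partitions map in the [Topic [Partition]] shape shown above. It assumes the usual Kafka wire conventions (int32 array lengths, int16 length-prefixed UTF-8 strings); the class name CurrentAssignmentsCodec is hypothetical and this is not the assignor's actual serialization code.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class CurrentAssignmentsCodec {
    // Encodes topic -> partitions as: int32 topic count, then for each topic
    // an int16 length-prefixed name, an int32 partition count and int32 partitions.
    static ByteBuffer encode(Map<String, List<Integer>> assignments) {
        int size = 4;
        for (Map.Entry<String, List<Integer>> e : assignments.entrySet()) {
            size += 2 + e.getKey().getBytes(StandardCharsets.UTF_8).length;
            size += 4 + 4 * e.getValue().size();
        }
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.putInt(assignments.size());
        for (Map.Entry<String, List<Integer>> e : assignments.entrySet()) {
            byte[] topic = e.getKey().getBytes(StandardCharsets.UTF_8);
            buf.putShort((short) topic.length);
            buf.put(topic);
            buf.putInt(e.getValue().size());
            for (int partition : e.getValue()) {
                buf.putInt(partition);
            }
        }
        buf.flip(); // make the buffer readable from the start
        return buf;
    }

    // Reads back what encode() wrote, preserving topic order.
    static Map<String, List<Integer>> decode(ByteBuffer buf) {
        Map<String, List<Integer>> assignments = new LinkedHashMap<>();
        int topicCount = buf.getInt();
        for (int i = 0; i < topicCount; i++) {
            byte[] topic = new byte[buf.getShort()];
            buf.get(topic);
            int partitionCount = buf.getInt();
            List<Integer> partitions = new ArrayList<>(partitionCount);
            for (int j = 0; j < partitionCount; j++) {
                partitions.add(buf.getInt());
            }
            assignments.put(new String(topic, StandardCharsets.UTF_8), partitions);
        }
        return assignments;
    }
}
```

A new group leader could decode each member's user data this way to recover the most recent assignment before computing the next one.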
Example 1
Suppose there are three consumers C0, C1, C2, four topics t0, t1, t2, t3, and each topic has 2 partitions, resulting in partitions t0p0, t0p1, t1p0, t1p1, t2p0, t2p1, t3p0, t3p1. Each consumer is subscribed to all four topics.
The assignment with both sticky and round robin assignors results in
Consumer | Assigned Topic Partitions |
---|---|
C0 | t0p0, t1p1, t3p0 |
C1 | t0p1, t2p0, t3p1 |
C2 | t1p0, t2p1 |
Now, let's assume that consumer C1 is removed and a reassignment occurs. The round robin assignor would produce
Consumer | Assigned Topic Partitions |
---|---|
C0 | t0p0, t1p0, t2p0, t3p0 |
C2 | t0p1, t1p1, t2p1, t3p1 |
The sticky assignor, on the other hand, would result in
Consumer | Assigned Topic Partitions |
---|---|
C0 | t0p0, t1p1, t3p0, t2p0 |
C2 | t1p0, t2p1, t0p1, t3p1 |
preserving 5 of the previous assignments (unlike the round robin assignor which preserves only 3).
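The preserved counts quoted above can be checked mechanically. The sketch below counts, for each consumer that survives the rebalance, how many partitions appear in both its old and new assignment. The class and method names are hypothetical, and partitions are represented as plain strings for brevity.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class Stickiness {
    // Number of (consumer, partition) pairs present both before and after.
    static int preserved(Map<String, Set<String>> before, Map<String, Set<String>> after) {
        int count = 0;
        for (Map.Entry<String, Set<String>> e : after.entrySet()) {
            Set<String> old = before.getOrDefault(e.getKey(), Collections.<String>emptySet());
            for (String partition : e.getValue()) {
                if (old.contains(partition)) {
                    count++;
                }
            }
        }
        return count;
    }

    static Set<String> set(String... partitions) {
        return new HashSet<>(Arrays.asList(partitions));
    }

    public static void main(String[] args) {
        Map<String, Set<String>> before = new HashMap<>();
        before.put("C0", set("t0p0", "t1p1", "t3p0"));
        before.put("C1", set("t0p1", "t2p0", "t3p1"));
        before.put("C2", set("t1p0", "t2p1"));

        Map<String, Set<String>> roundRobin = new HashMap<>();
        roundRobin.put("C0", set("t0p0", "t1p0", "t2p0", "t3p0"));
        roundRobin.put("C2", set("t0p1", "t1p1", "t2p1", "t3p1"));

        Map<String, Set<String>> sticky = new HashMap<>();
        sticky.put("C0", set("t0p0", "t1p1", "t3p0", "t2p0"));
        sticky.put("C2", set("t1p0", "t2p1", "t0p1", "t3p1"));

        System.out.println("round robin: " + preserved(before, roundRobin)); // round robin: 3
        System.out.println("sticky: " + preserved(before, sticky));          // sticky: 5
    }
}
```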
Example 2
There are three consumers C0, C1, C2, and three topics t0, t1, t2, with 1, 2, and 3 partitions, respectively. Therefore, the partitions are t0p0, t1p0, t1p1, t2p0, t2p1, t2p2. C0 is subscribed to t0; C1 is subscribed to t0, t1; and C2 is subscribed to t0, t1, t2.
The round robin assignor would result in the following assignment:
Consumer | Assigned Topic Partitions |
---|---|
C0 | t0p0 |
C1 | t1p0 |
C2 | t1p1, t2p0, t2p1, t2p2 |
which is not as balanced as the assignment produced by the sticky assignor:
Consumer | Assigned Topic Partitions |
---|---|
C0 | t0p0 |
C1 | t1p0, t1p1 |
C2 | t2p0, t2p1, t2p2 |
Now, if consumer C0 is removed, these two assignors would produce the following assignments.
Round Robin (preserves 3 partition assignments):
Consumer | Assigned Topic Partitions |
---|---|
C1 | t0p0, t1p1 |
C2 | t1p0, t2p0, t2p1, t2p2 |
Sticky (preserves 5 partition assignments):
Consumer | Assigned Topic Partitions |
---|---|
C1 | t1p0, t1p1, t0p0 |
C2 | t2p0, t2p1, t2p2 |
Not only does the sticky assignor preserve more assignments, it also results in a more balanced assignment.
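The round robin results in this example follow from walking the sorted partitions while cycling through the sorted consumers and skipping any consumer that is not subscribed to the partition's topic. The sketch below is a simplified model of that behaviour written for illustration, not the Kafka implementation; the class name and the string representation of partitions are assumptions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;

class RoundRobinSketch {
    // partitionsPerTopic: topic -> partition count; subscriptions: consumer -> topics.
    static Map<String, List<String>> assign(SortedMap<String, Integer> partitionsPerTopic,
                                            SortedMap<String, Set<String>> subscriptions) {
        List<String> consumers = new ArrayList<>(subscriptions.keySet());
        Map<String, List<String>> result = new TreeMap<>();
        for (String consumer : consumers) {
            result.put(consumer, new ArrayList<>());
        }
        int next = 0; // index of the consumer that is next in the cycle
        for (Map.Entry<String, Integer> topic : partitionsPerTopic.entrySet()) {
            String name = topic.getKey();
            boolean hasSubscriber = false;
            for (Set<String> topics : subscriptions.values()) {
                hasSubscriber |= topics.contains(name);
            }
            if (!hasSubscriber) {
                continue; // nobody can take this topic; avoids cycling forever
            }
            for (int p = 0; p < topic.getValue(); p++) {
                // Skip consumers that are not subscribed to this topic.
                while (!subscriptions.get(consumers.get(next)).contains(name)) {
                    next = (next + 1) % consumers.size();
                }
                result.get(consumers.get(next)).add(name + "p" + p);
                next = (next + 1) % consumers.size();
            }
        }
        return result;
    }

    static Set<String> topics(String... names) {
        return new HashSet<>(Arrays.asList(names));
    }

    public static void main(String[] args) {
        SortedMap<String, Integer> partitions = new TreeMap<>();
        partitions.put("t0", 1);
        partitions.put("t1", 2);
        partitions.put("t2", 3);
        SortedMap<String, Set<String>> subs = new TreeMap<>();
        subs.put("C0", topics("t0"));
        subs.put("C1", topics("t0", "t1"));
        subs.put("C2", topics("t0", "t1", "t2"));
        // {C0=[t0p0], C1=[t1p0], C2=[t1p1, t2p0, t2p1, t2p2]}
        System.out.println(assign(partitions, subs));
        subs.remove("C0");
        // {C1=[t0p0, t1p1], C2=[t1p0, t2p0, t2p1, t2p2]}
        System.out.println(assign(partitions, subs));
    }
}
```

Because the cycle position is shared across topics, C2 ends up absorbing every partition of t2 on top of its share of t1, which is the imbalance the sticky assignor avoids.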
Example 3
There are two consumers C0, C1, and two topics t0, t1, with 2 partitions each. Therefore, the partitions are t0p0, t0p1, t1p0, t1p1. Both consumers are subscribed to both topics.
The range, round robin, and sticky assignors all result in the following assignment:
Consumer | Assigned Topic Partitions |
---|---|
C0 | t0p0, t1p0 |
C1 | t0p1, t1p1 |
Now, if a new consumer C2, subscribed to both topics, joins the group, the range assignor would produce the following (which is not a fair assignment, since C2 receives no partitions):
Consumer | Assigned Topic Partitions |
---|---|
C0 | t0p0, t1p0 |
C1 | t0p1, t1p1 |
C2 |
The round robin assignor would produce this (which is not the stickiest assignment):
Consumer | Assigned Topic Partitions |
---|---|
C0 | t0p0, t1p1 |
C1 | t0p1 |
C2 | t1p0 |
The sticky assignor would move one of the partitions to the new consumer producing something like this (preserving 3 partition assignments):
Consumer | Assigned Topic Partitions |
---|---|
C0 | t0p0, t1p0 |
C1 | t0p1 |
C2 | t1p1 |
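For the special case where every consumer subscribes to every topic, the sticky behaviour in these examples can be approximated with a simple greedy heuristic: keep what each remaining consumer already has, hand orphaned partitions to the least loaded consumer, then move single partitions from the most loaded to the least loaded consumer until their sizes differ by at most one. This is an illustrative simplification written for this page, not the algorithm proposed in this KIP, and all names in it are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

class GreedyStickySketch {
    // Assumes at least one consumer and that all consumers subscribe to all topics.
    static Map<String, List<String>> rebalance(Map<String, List<String>> previous,
                                               Collection<String> consumers,
                                               Collection<String> allPartitions) {
        Map<String, List<String>> result = new TreeMap<>();
        Set<String> unassigned = new TreeSet<>(allPartitions);
        // Step 1: every remaining consumer keeps its still-valid partitions.
        for (String consumer : consumers) {
            List<String> kept = new ArrayList<>();
            for (String p : previous.getOrDefault(consumer, Collections.<String>emptyList())) {
                if (unassigned.remove(p)) {
                    kept.add(p);
                }
            }
            result.put(consumer, kept);
        }
        // Step 2: orphaned or new partitions go to the least loaded consumer.
        for (String p : unassigned) {
            leastLoaded(result).add(p);
        }
        // Step 3: move one partition at a time until sizes differ by at most 1.
        while (true) {
            List<String> max = mostLoaded(result);
            List<String> min = leastLoaded(result);
            if (max.size() - min.size() <= 1) {
                break;
            }
            min.add(max.remove(max.size() - 1));
        }
        return result;
    }

    static List<String> leastLoaded(Map<String, List<String>> assignment) {
        return Collections.min(assignment.values(),
                Comparator.<List<String>>comparingInt(List::size));
    }

    static List<String> mostLoaded(Map<String, List<String>> assignment) {
        return Collections.max(assignment.values(),
                Comparator.<List<String>>comparingInt(List::size));
    }
}
```

Applied to Example 3 with the new consumer C2, this sketch moves exactly one partition to C2 and leaves the other three where they were. Which partition moves depends on tie-breaking, so the result may differ in detail from the table above while preserving the same number of assignments.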
The inputs to the sticky partition assignment algorithm are
The sticky partition assignment algorithm works by defining and maintaining a number of data structures
The algorithm includes these main steps
We call a consumer partition assignment balanced when
Note that, because of the complexity of the problem and the heuristics applied in the algorithm above (e.g. steps 2 and 4), we expect it to find a good, but not necessarily the best possible, sticky partition assignment.
This proposal would add a new option to the existing assignment strategies of the new consumer. The only consumers affected are those that have partition cleanup code in their onPartitionsRevoked() callback listeners. That is the natural place for such code, because a consumer using the range or round robin assignor cannot expect to keep any of its assigned partitions after a rebalance. The listener code would look like this:
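    import java.util.Collection;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.common.TopicPartition;

    // commitOffsets, cleanupState, initializeState and initializeOffset
    // stand for application-specific logic that is not shown here.
    class MyOldRebalanceListener implements ConsumerRebalanceListener {

        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // Every partition is given up, so commit and clean up all of them.
            for (TopicPartition partition : partitions) {
                commitOffsets(partition);
                cleanupState(partition);
            }
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            for (TopicPartition partition : partitions) {
                initializeState(partition);
                initializeOffset(partition);
            }
        }
    }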
One of the advantages of the sticky assignor is that, in general, it reduces the number of partitions that are actually de-assigned from a consumer. As a result, consumers can do their cleanup more efficiently. They can still perform the partition cleanup in the onPartitionsRevoked() listener, but it is more efficient to record their partitions before and after the rebalance and, once the rebalance completes, clean up only the partitions they actually lost (normally a small number). The code snippet below clarifies this point.
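    import java.util.Collection;
    import java.util.Collections;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.common.TopicPartition;

    // difference(a, b) is assumed to return the elements of a that are not in b.
    // The other helper methods stand for application-specific logic.
    class MyNewRebalanceListener implements ConsumerRebalanceListener {
        Collection<TopicPartition> lastAssignment = Collections.emptyList();

        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // Only commit here; cleanup is deferred until the new assignment is known.
            for (TopicPartition partition : partitions)
                commitOffsets(partition);
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> assignment) {
            // Clean up only the partitions that were actually lost.
            for (TopicPartition partition : difference(lastAssignment, assignment))
                cleanupState(partition);

            // Initialize state only for newly gained partitions.
            for (TopicPartition partition : difference(assignment, lastAssignment))
                initializeState(partition);

            for (TopicPartition partition : assignment)
                initializeOffset(partition);

            this.lastAssignment = assignment;
        }
    }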
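The listener above relies on a difference helper that is not defined on this page. One possible shape for it, assuming it should return the elements of the first collection that are absent from the second, is sketched below. It is written generically so that it works for TopicPartition or any other type with proper equals and hashCode; the class name AssignmentDiff is hypothetical.

```java
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Set;

class AssignmentDiff {
    // Returns the elements of a that are not in b, keeping a's iteration order.
    static <T> Set<T> difference(Collection<T> a, Collection<T> b) {
        Set<T> result = new LinkedHashSet<>(a);
        result.removeAll(b);
        return result;
    }
}
```

With this helper, difference(lastAssignment, assignment) yields the partitions the consumer lost and difference(assignment, lastAssignment) yields the ones it gained.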
Looking back at Example 2, when consumer C0 is removed a rebalance occurs. With the round robin assignor, the remaining consumers would clean up all of the partitions they held before the rebalance (a total of 5). With the sticky assignor and the change proposed above, both C1 and C2 keep all of their previously assigned partitions, so no cleanup is needed.