Status

Current state: Accepted

Discussion thread: link

JIRA: KAFKA-2273

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Note
Changes introduced in this KIP could potentially be superseded by changes to the consumer rebalancing protocol.

...


Note
There was a bug in the partition assignment algorithm as described in this KIP that was fixed in KIP-341.

Motivation

In certain circumstances the round robin assignor, which produces better assignments than the range assignor, fails to produce an optimal and balanced assignment of topic partitions to consumers. KIP-49 touches on some of these circumstances. In addition, when a reassignment occurs, none of the existing strategies consider what the topic partition assignments were before the reassignment; they behave as if they were performing a fresh assignment. Preserving the existing assignments could reduce some of the overhead of a reassignment. For example, Kafka consumers retain pre-fetched messages for partitions assigned to them before a reassignment. Preserving the partition assignment could therefore reduce the number of messages that have to be delivered to consumers after a reassignment. Another advantage would be reducing the need to clean up local partition state between rebalances.

...

  • the numbers of topic partitions assigned to consumers differ by at most one; or
  • if a consumer A has 2+ fewer topic partitions assigned to it compared to another consumer B, none of the topic partitions assigned to B can be assigned to A.

When starting a fresh assignment, the Sticky Assignor would distribute the partitions over consumers as evenly as possible. Even though this may sound similar to how the round robin assignor works, the second example below shows that it results in a more balanced assignment.

...

Suppose there are three consumers C0, C1, C2, four topics t0, t1, t2, t3, and each topic has 2 partitions, resulting in partitions t0p0, t0p1, t1p0, t1p1, t2p0, t2p1, t3p0, t3p1. Each consumer is subscribed to all four topics.

 

The assignment with both sticky and round robin assignors results in

...

Not only does the sticky assignor preserve more assignments, it also results in a more balanced assignment.

 

...

Example 3

There are two consumers C0, C1, and two topics t0, t1, with 2 partitions each. Therefore, the partitions are t0p0, t0p1, t1p0, t1p1. Both consumers are subscribed to both topics.

 

The range, round robin, and sticky assignors all result in the following assignment:

Consumer    Assigned Topic Partitions
C0          t0p0, t1p0
C1          t0p1, t1p1

 

Now, if a consumer C2 that is subscribed to both topics joins the group, the range assignor would produce the following (which is not a fair assignment):

Consumer    Assigned Topic Partitions
C0          t0p0, t1p0
C1          t0p1, t1p1
C2          (none)

 

The round robin assignor would produce this (which is not the stickiest assignment):

Consumer    Assigned Topic Partitions
C0          t0p0, t1p1
C1          t0p1
C2          t1p0

 

The sticky assignor would move one of the partitions to the new consumer producing something like this (preserving 3 partition assignments):

Consumer    Assigned Topic Partitions
C0          t0p0, t1p0
C1          t0p1
C2          t1p1
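
To make the "preserving 3 partition assignments" comparison concrete, here is a minimal sketch that counts how many partitions stay with the same consumer across a rebalance. The class and method names are hypothetical and are not part of this KIP; partitions and consumers are represented as plain strings for brevity.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical helper, for illustration only (not part of the KIP or of Kafka).
class StickinessCount {
    // Returns the number of partitions that are assigned to the same consumer
    // both before and after a rebalance.
    static int preserved(Map<String, List<String>> before,
                         Map<String, List<String>> after) {
        int count = 0;
        for (Map.Entry<String, List<String>> entry : after.entrySet()) {
            // A consumer that did not exist before the rebalance preserves nothing.
            List<String> previous =
                before.getOrDefault(entry.getKey(), Collections.emptyList());
            for (String partition : entry.getValue()) {
                if (previous.contains(partition)) {
                    count++;
                }
            }
        }
        return count;
    }
}
```

Applied to Example 3, with the two-consumer assignment as the starting point, this count is 2 for the round robin result and 3 for the sticky result.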

 

The Algorithm

The inputs to the sticky partition assignment algorithm are

  • partitionsPerTopic: topics mapped to number of their partitions
  • subscriptions: mapping of consumer to subscribed topics
  • currentAssignment: preserved assignment of topic partitions to consumers calculated during the previous rebalance

The sticky partition assignment algorithm works by defining and maintaining a number of data structures

  • potential consumers of each topic partition (partition2AllPotentialConsumers)
  • topic partitions each consumer can consume from (consumer2AllPotentialPartitions)
  • the current consumer of each topic partition (currentPartitionConsumer)
  • sorted list of topic partitions (in ascending order) by how many consumers can consume from them (sortedAllPartitions)
  • partitions that don't have a consumer (either because they are just created, or because their consumer no longer exists) and need new assignment (unassignedPartitions)
  • sorted list of consumers (in ascending order) based on how many partitions are currently assigned to them (sortedCurrentSubscriptions)
  • re-assignable partitions, or those with 2 or more potential consumers (reassignablePartitions)
  • partitions that cannot be assigned to another consumer (fixedPartitions)
  • current assignment of (fixed) consumers whose assignment cannot change (fixedAssignments)
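
As an illustration of how the first two structures relate to the inputs, the following sketch derives them from partitionsPerTopic and subscriptions. It is only a sketch under assumptions: the class name is hypothetical, consumers and partitions are plain strings, and partitions are named "<topic>p<index>" to match the examples in this KIP.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, for illustration only (not the KIP's implementation).
class PotentialMaps {
    // Maps each consumer to every partition of every topic it subscribes to.
    static Map<String, List<String>> consumer2AllPotentialPartitions(
            Map<String, Integer> partitionsPerTopic,
            Map<String, List<String>> subscriptions) {
        // Sort consumers by name so the output is deterministic.
        Map<String, List<String>> sorted = new TreeMap<>(subscriptions);
        Map<String, List<String>> result = new TreeMap<>();
        for (Map.Entry<String, List<String>> sub : sorted.entrySet()) {
            List<String> partitions = new ArrayList<>();
            for (String topic : sub.getValue()) {
                // Topics that are subscribed to but do not exist contribute nothing.
                int count = partitionsPerTopic.getOrDefault(topic, 0);
                for (int i = 0; i < count; i++) {
                    partitions.add(topic + "p" + i);
                }
            }
            result.put(sub.getKey(), partitions);
        }
        return result;
    }

    // Maps each partition to every consumer subscribed to its topic.
    static Map<String, List<String>> partition2AllPotentialConsumers(
            Map<String, Integer> partitionsPerTopic,
            Map<String, List<String>> subscriptions) {
        Map<String, List<String>> result = new TreeMap<>();
        // Start with every known partition so partitions without subscribers appear too.
        for (Map.Entry<String, Integer> topic : partitionsPerTopic.entrySet()) {
            for (int i = 0; i < topic.getValue(); i++) {
                result.put(topic.getKey() + "p" + i, new ArrayList<>());
            }
        }
        Map<String, List<String>> byConsumer =
            consumer2AllPotentialPartitions(partitionsPerTopic, subscriptions);
        for (Map.Entry<String, List<String>> entry : byConsumer.entrySet()) {
            for (String partition : entry.getValue()) {
                result.get(partition).add(entry.getKey());
            }
        }
        return result;
    }
}
```

A partition whose list of potential consumers has fewer than two entries would fall under fixedPartitions as described above.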

 

The algorithm includes these main steps

  1. Update currentAssignment by removing consumers or partitions that no longer exist.
  2. Distribute unassignedPartitions among existing consumers: for each unassigned partition, start from the consumer with the fewest assignments, and if that consumer can consume from this partition, assign the partition to it and update the necessary data structures. After this distribution, all partitions that can be assigned to some consumer are, in fact, assigned. There may be some that have no subscriber and remain unassigned (which is fine).
  3. Find consumers whose topic partition assignment cannot change and remove them from currentAssignment (to limit the scope of the problem).
  4. From the partitions that can potentially be reassigned (sortedAllPartitions), start from the one with the fewest possible consumers, and if it can go to another consumer to improve balance, move it to that other consumer. Continue until all partitions have gone through this reassignment or we reach a balance (defined below).
  5. If reassignments occurred in the previous step and we did not reach a better overall balance (for example, we got an equally balanced assignment), revert the assignment in favor of stickiness, since fairness could not be improved.
  6. Re-add fixed assignments to currentAssignment.
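
Step 2 above can be sketched as follows. This is a simplified illustration, not the KIP's implementation: the class name is hypothetical, consumers and partitions are plain strings, and instead of maintaining sortedCurrentSubscriptions it scans the potential consumers of each partition for the one with the fewest assignments, which selects the same kind of target. Ties are broken by the order of the potential-consumer list, which is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical helper, for illustration only.
class UnassignedDistribution {
    // Assigns each unassigned partition to the least-loaded consumer that can
    // consume from it. currentAssignment is updated in place and must hold
    // mutable lists.
    static void distribute(List<String> unassignedPartitions,
                           Map<String, List<String>> partition2AllPotentialConsumers,
                           Map<String, List<String>> currentAssignment) {
        for (String partition : unassignedPartitions) {
            List<String> candidates = partition2AllPotentialConsumers
                .getOrDefault(partition, Collections.emptyList());
            String target = null;
            int targetLoad = Integer.MAX_VALUE;
            for (String consumer : candidates) {
                int load = currentAssignment
                    .getOrDefault(consumer, Collections.emptyList()).size();
                if (load < targetLoad) {
                    target = consumer;
                    targetLoad = load;
                }
            }
            // A partition with no subscriber stays unassigned, which is fine.
            if (target != null) {
                currentAssignment
                    .computeIfAbsent(target, key -> new ArrayList<>())
                    .add(partition);
            }
        }
    }
}
```

Because the load is re-read for every partition, each assignment immediately influences the choice made for the next partition.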

 

We call a consumer partition assignment balanced when

  • The minimum and maximum number of partitions assigned to any consumer differ by at most one; or
  • No topic partition can be moved from its current consumer to another of its potential consumers and improve the overall balance score at the same time.
    The overall balance score of a topic partition assignment is defined as the sum, over all pairs of consumers, of the difference between the numbers of partitions assigned to the two consumers. A perfectly balanced assignment (with all consumers getting the same number of partitions) has a balance score of 0. A lower balance score indicates a more balanced assignment.
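
The balance score defined above can be computed with a few lines of code. The sketch below is an illustration only; the class name is hypothetical, and it assumes that each unordered pair of consumers is counted once and that the difference is taken as an absolute value.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper, for illustration only.
class BalanceScore {
    // Sums |size(a) - size(b)| over every unordered pair of consumers (a, b).
    static int of(Map<String, List<String>> assignment) {
        List<Integer> sizes = new ArrayList<>();
        for (List<String> partitions : assignment.values()) {
            sizes.add(partitions.size());
        }
        int score = 0;
        for (int i = 0; i < sizes.size(); i++) {
            for (int j = i + 1; j < sizes.size(); j++) {
                score += Math.abs(sizes.get(i) - sizes.get(j));
            }
        }
        return score;
    }
}
```

Under these assumptions, an assignment with partition counts 2, 2, 0 scores 4, while one with counts 2, 1, 1 scores 2, so the latter is considered more balanced.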

 

Note that due to the complex nature of the problem and the heuristics (e.g. steps 2 and 4) applied in the algorithm above, we expect to find a good (though not necessarily the best) sticky partition assignment.

 

Alternatives Yet to be Considered

  • The sticky partition assignment algorithm described above, as mentioned earlier in this KIP, favors fairness over stickiness (we may call it the fair yet sticky or stickiest fair strategy). Therefore, some partitions may change their consumer in order to reach a fair assignment. This strategy is presumably more complex to implement due to the complex nature of calculating the most balanced assignment.
    An alternative strategy is to implement it so that stickiness takes precedence and all previously assigned partitions are preserved, while the new consumers or partitions that need to be assigned are handled so as to reach the fairest possible assignment. We may call this alternative the sticky yet fair or fairest sticky strategy. This strategy would be more efficient as it exhibits less complexity and variance. After all, the previous valid assignments remain as they are and only a few new assignments have to be calculated.
    The fair strategy discussed in KIP-49 considers fairness only and does not take into consideration the partition assignments before the rebalance. Fairness only can also be supported as part of this KIP by switching off stickiness and always starting from an empty currentAssignment.

 

Notes

  • KAFKA-2019: This JIRA describes an issue with the round robin assignor of the old consumer. The issue stems from the multiple threads that each (old) consumer may have, and from the fact that all threads of one consumer are assigned partitions before the threads of other consumers. Since the new consumer is single threaded, there is no such problem in its round robin strategy. It simply considers consumers one by one for each partition assignment, and when one consumer is assigned a partition, the next assignment starts by considering the next consumer in the list (not from the start of the list again, nor from the same consumer that was just assigned). This removes the possibility of the issue reported in KAFKA-2019 surfacing in the new consumer's round robin assignor. In the sticky strategy this issue does not exist either, since every assignment starts with the consumer that has the fewest assignments. So there is no scenario in which a consumer is repeatedly assigned partitions as in KAFKA-2019 (unless that consumer is lagging behind other consumers in the number of partitions assigned).

Compatibility, Deprecation, and Migration Plan

This proposal would add a new option to existing assignment strategies of the new consumer. The only impact is to those new consumers that have some partition cleanup code in their onPartitionsRevoked() callback listeners. That cleanup code is rightfully placed in that callback listener because the consumer has no assumption or hope of preserving any of its assigned partitions after a rebalance when it is using range or round robin assignor. The listener code would look like this:

Code Block
class MyOldRebalanceListener {

  void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    for (TopicPartition partition: partitions) {
      commitOffsets(partition);
      cleanupState(partition);
    }
  }

  void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    for (TopicPartition partition: partitions) {
      initializeState(partition);
      initializeOffset(partition);
    }
  }
}

 

One of the advantages of the sticky assignor is that, in general, it reduces the number of partitions that are actually de-assigned from a consumer. Because of that, consumers can now do their cleanup more efficiently. They can still perform the partition cleanup in the onPartitionsRevoked() listener, but a more efficient approach is to make a note of their partitions before and after the rebalance and, once the rebalance completes, clean up only the partitions they actually lost (which is normally not a whole lot). The code snippet below clarifies this point.

Code Block
class MyNewRebalanceListener {
  // Assignment received at the end of the previous rebalance.
  Collection<TopicPartition> lastAssignment = Collections.emptyList();

  void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    for (TopicPartition partition: partitions)
      commitOffsets(partition);
  }

  void onPartitionsAssigned(Collection<TopicPartition> assignment) {
    // Clean up only the partitions that were actually lost in this rebalance.
    for (TopicPartition partition: difference(lastAssignment, assignment))
      cleanupState(partition);

    // Initialize state only for the partitions that are newly gained.
    for (TopicPartition partition: difference(assignment, lastAssignment))
      initializeState(partition);

    for (TopicPartition partition: assignment)
      initializeOffset(partition);

    this.lastAssignment = assignment;
  }
}
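
The listener above relies on a difference helper that this KIP does not define. One possible sketch is shown below, assuming that difference(a, b) means "the elements of a that are not in b". The class name is hypothetical, and the helper is written generically so that it runs without the Kafka client library; with Kafka on the classpath it would be called with TopicPartition elements.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper, for illustration only (not defined by the KIP).
class AssignmentDiff {
    // Returns the elements of a that do not appear in b, in the order of a.
    static <T> List<T> difference(Collection<T> a, Collection<T> b) {
        // Copy b into a set so each membership check is cheap.
        Set<T> excluded = new HashSet<>(b);
        List<T> result = new ArrayList<>();
        for (T item : a) {
            if (!excluded.contains(item)) {
                result.add(item);
            }
        }
        return result;
    }
}
```

With this definition, difference(lastAssignment, assignment) yields the partitions lost in the rebalance and difference(assignment, lastAssignment) yields the partitions gained.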

Looking back at Example 2, when consumer C0 is removed, a rebalance occurs, and with the round robin assignor the remaining consumers would perform partition cleanup on all of the partitions they owned before the rebalance (a total of 5). With the sticky assignor and the change proposed above, since both C1 and C2 preserve all their assigned partitions, there would be no need to do any cleanup.

...