
Status

Current state: Accepted

Discussion thread: link

JIRA: KAFKA-2273

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast). 

Note
Changes introduced in this KIP could be superseded by changes to the consumer rebalancing protocol.


Note
There was a bug in the partition assignment algorithm as described in this KIP that was fixed in KIP-341.

Motivation

In certain circumstances the round robin assignor, which generally produces better assignments than the range assignor, fails to produce an optimal and balanced assignment of topic partitions to consumers. KIP-49 touches on some of these circumstances. In addition, when a reassignment occurs, none of the existing strategies considers the topic partition assignments that were in place before the reassignment; each behaves as if it were performing a fresh assignment. Preserving the existing assignments could reduce some of the overhead of a reassignment. For example, Kafka consumers retain pre-fetched messages for the partitions assigned to them before a reassignment, so preserving the partition assignment could reduce the number of messages that have to be delivered to consumers again after a reassignment. Another advantage would be a reduced need to clean up local partition state between rebalances.

...

  • the numbers of topic partitions assigned to consumers differ by at most one; or
  • if a consumer A has 2+ fewer topic partitions assigned to it compared to another consumer B, none of the topic partitions assigned to B can be assigned to A (for example, because A is not subscribed to their topics).
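The two conditions above can be sketched as a small check. This is only an illustration, not the assignor's actual code: `BalanceCheckSketch`, `isBalanced`, and `topicOf` are hypothetical names, partitions are plain strings in the `t0p1` style used in the examples, and "can be assigned to A" is assumed to mean "A is subscribed to the partition's topic".

```java
import java.util.List;
import java.util.Map;
import java.util.Set;

final class BalanceCheckSketch {

  // Assumption: partitions are named like "t0p1" (topic name, then 'p', then partition number).
  static String topicOf(String partition) {
    return partition.substring(0, partition.lastIndexOf('p'));
  }

  // Returns true if the assignment is balanced in the sense of the two conditions above.
  // Every consumer in the assignment is assumed to have an entry in subscriptions.
  static boolean isBalanced(Map<String, List<String>> assignment,
                            Map<String, Set<String>> subscriptions) {
    for (String a : assignment.keySet()) {
      for (String b : assignment.keySet()) {
        int gap = assignment.get(b).size() - assignment.get(a).size();
        if (gap < 2) {
          continue; // counts differ by at most one for this pair
        }
        // A has 2+ fewer partitions than B, so A must be unable to take any of B's partitions.
        for (String partition : assignment.get(b)) {
          if (subscriptions.get(a).contains(topicOf(partition))) {
            return false;
          }
        }
      }
    }
    return true;
  }

  public static void main(String[] args) {
    Map<String, List<String>> assignment = Map.of(
        "C0", List.of("t0p0"),
        "C1", List.of("t1p0", "t1p1", "t1p2"));

    // C0 is not subscribed to t1, so the uneven counts are still balanced.
    System.out.println(isBalanced(assignment,
        Map.of("C0", Set.of("t0"), "C1", Set.of("t0", "t1")))); // prints true

    // C0 is subscribed to t1 and could take a partition from C1.
    System.out.println(isBalanced(assignment,
        Map.of("C0", Set.of("t0", "t1"), "C1", Set.of("t0", "t1")))); // prints false
  }
}
```

When no pair of consumers differs by two or more partitions, the inner check never runs, which corresponds to the first condition.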

When starting a fresh assignment, the sticky assignor would distribute the partitions over consumers as evenly as possible. Even though this may sound similar to how the round robin assignor works, the second example below shows that it results in a more balanced assignment.

...

Suppose there are three consumers C0, C1, C2, four topics t0, t1, t2, t3, and each topic has 2 partitions, resulting in partitions t0p0, t0p1, t1p0, t1p1, t2p0, t2p1, t3p0, t3p1. Each consumer is subscribed to all four topics.

 

The assignment with both sticky and round robin assignors results in

...

The round robin assignor would produce this (which is not the stickiest assignment):

Consumer    Assigned Topic Partitions
C0          t0p0, t1p1
C1          t0p1
C2          t1p0

...

This proposal would add a new option to the existing assignment strategies of the new consumer. The only impact is on new consumers that have partition cleanup code in their onPartitionsRevoked() callback listeners. That cleanup code is rightfully placed in that callback because, with the range or round robin assignor, a consumer cannot expect to keep any of its assigned partitions after a rebalance. The listener code would look like this:

Code Block
class MyOldRebalanceListener {

  void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    for (TopicPartition partition: partitions) {
      commitOffsets(partition);
      cleanupState(partition);
    }
  }

  void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    for (TopicPartition partition: partitions) {
      initializeState(partition);
      initializeOffset(partition);
    }
  }
}

 

One of the advantages of the sticky assignor is that, in general, it reduces the number of partitions that are actually de-assigned from a consumer. Because of that, consumers can now do their cleanup more efficiently. They can still perform the partition cleanup in the onPartitionsRevoked() listener, but it is more efficient to note their partitions before and after the rebalance and, once the rebalance completes, clean up only the partitions they actually lost (which is normally a small number). The code snippet below clarifies this point.

Code Block
class MyNewRebalanceListener {
  Collection<TopicPartition> lastAssignment = Collections.emptyList();

  void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    for (TopicPartition partition: partitions)
      commitOffsets(partition);
  }

  void onPartitionsAssigned(Collection<TopicPartition> assignment) {
    // difference(a, b) is assumed to return the partitions in a that are not in b

    // clean up only the partitions that were actually lost in the rebalance
    for (TopicPartition partition: difference(lastAssignment, assignment))
      cleanupState(partition);

    // initialize state only for newly gained partitions
    for (TopicPartition partition: difference(assignment, lastAssignment))
      initializeState(partition);

    for (TopicPartition partition: assignment)
      initializeOffset(partition);

    this.lastAssignment = assignment;
  }
}
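The difference() helper used in the listener above is not defined in this proposal. A minimal sketch of one possible implementation follows, assuming it should return the elements of its first argument that are absent from the second. The class name `AssignmentDiffSketch` is hypothetical, and plain strings stand in for TopicPartition so that the example has no Kafka dependency.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class AssignmentDiffSketch {

  // Returns the elements of first that are not in second, keeping the order of first.
  static <T> Set<T> difference(Collection<T> first, Collection<T> second) {
    Set<T> result = new LinkedHashSet<>(first);
    result.removeAll(new HashSet<>(second));
    return result;
  }

  public static void main(String[] args) {
    // Hypothetical assignments before and after a rebalance.
    List<String> lastAssignment = List.of("t0p0", "t1p1");
    List<String> assignment = List.of("t0p0", "t0p1");

    // Partitions lost in the rebalance: these need cleanup.
    System.out.println(difference(lastAssignment, assignment)); // prints [t1p1]

    // Partitions gained in the rebalance: these need state initialization.
    System.out.println(difference(assignment, lastAssignment)); // prints [t0p1]
  }
}
```

With a sticky assignment both differences are usually small, which is where the savings over cleaning up every revoked partition come from.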

Looking back at Example 2, when consumer C0 is removed a rebalance occurs, and with the round robin assignor the remaining consumers would perform partition cleanup on all of the partitions they held before the rebalance (a total of 5). With the sticky assignor and the change proposed above, since both C1 and C2 preserve all their assigned partitions, there would be no need to do any cleanup.

...