Status

Current state: Accepted

Discussion thread: link

JIRA: KAFKA-2273

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Note
Changes introduced in this KIP could potentially be superseded by changes to the consumer rebalancing protocol.


Note
The partition assignment algorithm as described in this KIP contained a bug, which was fixed in KIP-341.

Motivation

In certain circumstances the round robin assignor, which generally produces better assignments than the range assignor, fails to produce an optimal and balanced assignment of topic partitions to consumers. KIP-49 touches on some of these circumstances. In addition, when a reassignment occurs, none of the existing strategies consider what the topic partition assignments were before the reassignment; they behave as if they were performing a fresh assignment. Preserving existing assignments could reduce some of the overhead of a reassignment. For example, Kafka consumers retain pre-fetched messages for partitions assigned to them before a reassignment, so preserving the partition assignment could reduce the number of messages delivered to consumers after a reassignment. Another advantage would be a reduced need to clean up local partition state between rebalances.

...

  • the numbers of topic partitions assigned to consumers differ by at most one; or
  • if a consumer A has 2+ fewer topic partitions assigned to it compared to another consumer B, none of the topic partitions assigned to B can be assigned to A.
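The balance condition above can be expressed as a small check. This is a minimal sketch, not the assignor's actual implementation. It assumes partitions are plain strings such as "t0p0". It also assumes the set of partitions each consumer may own (derived from its subscriptions) is supplied as a hypothetical `eligible` map. If all counts differ by at most one, no consumer is two or more partitions behind another, so the second condition holds trivially and the first is covered automatically. The example data reuses the range and sticky results from Example 3.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;

public class BalanceCheck {

  /**
   * Returns true if, for every pair of consumers where A owns at least two
   * fewer partitions than B, none of B's partitions could be assigned to A.
   * When all counts differ by at most one, no such pair exists and the
   * method returns true.
   *
   * @param assignment consumer -> partitions currently assigned to it
   * @param eligible   consumer -> partitions it may own (hypothetical input)
   */
  static boolean isBalanced(Map<String, List<String>> assignment,
                            Map<String, Set<String>> eligible) {
    for (Map.Entry<String, List<String>> a : assignment.entrySet()) {
      for (Map.Entry<String, List<String>> b : assignment.entrySet()) {
        if (a.getValue().size() + 2 > b.getValue().size()) {
          continue; // A is not 2+ partitions behind B
        }
        Set<String> canTake = eligible.get(a.getKey());
        for (String partition : b.getValue()) {
          if (canTake.contains(partition)) {
            return false; // a partition of B could move to A
          }
        }
      }
    }
    return true;
  }

  public static void main(String[] args) {
    Set<String> all = Set.of("t0p0", "t0p1", "t1p0", "t1p1");
    Map<String, Set<String>> eligible = Map.of("C0", all, "C1", all, "C2", all);

    // Range result from Example 3 after C2 joins: C2 owns nothing.
    Map<String, List<String>> range = Map.of(
        "C0", List.of("t0p0", "t1p0"),
        "C1", List.of("t0p1", "t1p1"),
        "C2", List.<String>of());
    // Sticky result from Example 3 after C2 joins.
    Map<String, List<String>> sticky = Map.of(
        "C0", List.of("t0p0", "t1p0"),
        "C1", List.of("t0p1"),
        "C2", List.of("t1p1"));

    System.out.println(isBalanced(range, eligible));  // false
    System.out.println(isBalanced(sticky, eligible)); // true
  }
}
```

The pairwise loop is quadratic in the number of consumers. That is acceptable for a check like this, but it is not meant as an efficient assignor.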

When starting a fresh assignment, the sticky assignor distributes the partitions over consumers as evenly as possible. Although this may sound similar to how the round robin assignor works, the second example below shows that it results in a more balanced assignment.

...

Suppose there are three consumers C0, C1, C2, four topics t0, t1, t2, t3, and each topic has 2 partitions, resulting in partitions t0p0, t0p1, t1p0, t1p1, t2p0, t2p1, t3p0, t3p1. Each consumer is subscribed to all four topics.

 

The assignment with both sticky and round robin assignors results in

...

Consumer    Assigned Topic Partitions
C1          t0p0, t1p1
C2          t1p0, t2p0, t2p1, t2p2

...

Consumer    Assigned Topic Partitions
C1          t1p0, t1p1, t0p0
C2          t2p0, t2p1, t2p2

...

Not only does the sticky assignor preserve more assignments, it also produces a more balanced assignment.

 

Example 3

There are two consumers C0, C1, and two topics t0, t1, with 2 partitions each. Therefore, the partitions are t0p0, t0p1, t1p0, t1p1. Both consumers are subscribed to both topics.

 

The range, round robin, and sticky assignors all result in the following assignment:

Consumer    Assigned Topic Partitions
C0          t0p0, t1p0
C1          t0p1, t1p1

 

Now, if a consumer C2, also subscribed to both topics, joins the group, the range assignor would produce the following (which is not a fair assignment):

Consumer    Assigned Topic Partitions
C0          t0p0, t1p0
C1          t0p1, t1p1
C2          (none)
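The empty row for C2 follows from how range assignment divides each topic independently. The consumers are taken in sorted order, and with two partitions per topic and three consumers, the first two consumers get one partition of every topic while the third gets none. The sketch below is a simplified model rather than Kafka's RangeAssignor code. It assumes every consumer subscribes to every topic and builds partition names in this page's "t0p0" style. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class RangeSketch {

  /**
   * Simplified range assignment (all consumers subscribe to all topics).
   * For each topic, consumers in sorted order receive
   * numPartitions / numConsumers partitions, and the first
   * numPartitions % numConsumers consumers receive one extra.
   */
  static Map<String, List<String>> assign(List<String> consumers,
                                          Map<String, Integer> partitionsPerTopic) {
    List<String> sorted = new ArrayList<>(consumers);
    Collections.sort(sorted);
    Map<String, List<String>> result = new LinkedHashMap<>();
    for (String c : sorted) result.put(c, new ArrayList<>());

    // Topics are processed in name order; each one is split on its own.
    for (Map.Entry<String, Integer> topic : new TreeMap<>(partitionsPerTopic).entrySet()) {
      int n = topic.getValue();
      int per = n / sorted.size();
      int extra = n % sorted.size();
      int next = 0; // next partition index of this topic to hand out
      for (int i = 0; i < sorted.size(); i++) {
        int count = per + (i < extra ? 1 : 0);
        for (int j = 0; j < count; j++) {
          result.get(sorted.get(i)).add(topic.getKey() + "p" + next++);
        }
      }
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, List<String>> a =
        assign(List.of("C0", "C1", "C2"), Map.of("t0", 2, "t1", 2));
    System.out.println(a); // {C0=[t0p0, t1p0], C1=[t0p1, t1p1], C2=[]}
  }
}
```

Because every topic is split starting from the first consumer, the remainders pile up on the same consumers. That is why C2 ends up empty even though four partitions are shared by three consumers.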

 

The round robin assignor would produce this (which is not the stickiest assignment):

Consumer    Assigned Topic Partitions
C0          t0p0, t1p1
C1          t0p1
C2          t1p0
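The round robin result can be reproduced by dealing out the partitions one at a time. The partitions are sorted by topic and then by partition number, and the consumers are taken in sorted order. This is a simplified sketch that approximates Kafka's RoundRobinAssignor; it is not the real assignor code. It assumes every consumer subscribes to every topic, so no consumer ever has to be skipped, and it uses this page's "t0p0" naming. The class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class RoundRobinSketch {

  /**
   * Simplified round robin assignment (all consumers subscribe to all topics):
   * partitions sorted by topic name and partition number are handed out
   * to the sorted consumers in turn.
   */
  static Map<String, List<String>> assign(List<String> consumers,
                                          Map<String, Integer> partitionsPerTopic) {
    List<String> sorted = new ArrayList<>(consumers);
    Collections.sort(sorted);
    Map<String, List<String>> result = new LinkedHashMap<>();
    for (String c : sorted) result.put(c, new ArrayList<>());

    int i = 0; // position of the consumer that receives the next partition
    for (Map.Entry<String, Integer> topic : new TreeMap<>(partitionsPerTopic).entrySet()) {
      for (int p = 0; p < topic.getValue(); p++) {
        result.get(sorted.get(i % sorted.size())).add(topic.getKey() + "p" + p);
        i++;
      }
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, List<String>> a =
        assign(List.of("C0", "C1", "C2"), Map.of("t0", 2, "t1", 2));
    System.out.println(a); // {C0=[t0p0, t1p1], C1=[t0p1], C2=[t1p0]}
  }
}
```

The previous assignment is not an input to this function at all. Only t0p0 and t0p1 stay where they were, and that is by coincidence of the dealing order rather than by design.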

 

The sticky assignor would move one of the partitions to the new consumer, producing something like this (preserving 3 partition assignments):

Consumer    Assigned Topic Partitions
C0          t0p0, t1p0
C1          t0p1
C2          t1p1
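Stickiness here means the number of partitions that remain with the same consumer across the rebalance. The sketch below computes it by intersecting each consumer's partitions before and after. The class and method names are hypothetical, and the data are the Example 3 tables. It shows that round robin preserves 2 of the 4 original assignments, while the sticky result preserves 3.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class StickinessCount {

  /** Counts partitions that stay with the same consumer across a rebalance. */
  static int preserved(Map<String, List<String>> before,
                       Map<String, List<String>> after) {
    int count = 0;
    for (Map.Entry<String, List<String>> e : before.entrySet()) {
      Set<String> kept = new HashSet<>(e.getValue());
      // Keep only the partitions this consumer still owns afterwards.
      kept.retainAll(after.getOrDefault(e.getKey(), List.of()));
      count += kept.size();
    }
    return count;
  }

  public static void main(String[] args) {
    Map<String, List<String>> before = Map.of(
        "C0", List.of("t0p0", "t1p0"),
        "C1", List.of("t0p1", "t1p1"));
    Map<String, List<String>> roundRobin = Map.of(
        "C0", List.of("t0p0", "t1p1"),
        "C1", List.of("t0p1"),
        "C2", List.of("t1p0"));
    Map<String, List<String>> sticky = Map.of(
        "C0", List.of("t0p0", "t1p0"),
        "C1", List.of("t0p1"),
        "C2", List.of("t1p1"));

    System.out.println(preserved(before, roundRobin)); // 2
    System.out.println(preserved(before, sticky));     // 3
  }
}
```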

 

The Algorithm

The inputs to the sticky partition assignment algorithm are

...

This proposal would add a new option to the existing assignment strategies of the new consumer. The only impact is on those new consumers that have some partition cleanup code in their onPartitionsRevoked() callback listeners. That cleanup code is rightfully placed in that callback listener because the consumer has no assumption or hope of preserving any of its assigned partitions after a rebalance when it is using the range or round robin assignor. The listener code would look like this:

Code Block
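import java.util.Collection;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

// commitOffsets, cleanupState, initializeState and initializeOffset stand for
// application-specific logic.
abstract class MyOldRebalanceListener implements ConsumerRebalanceListener {

  // With range or round robin, any partition may move, so clean up everything.
  @Override
  public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    for (TopicPartition partition : partitions) {
      commitOffsets(partition);
      cleanupState(partition);
    }
  }

  @Override
  public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    for (TopicPartition partition : partitions) {
      initializeState(partition);
      initializeOffset(partition);
    }
  }

  abstract void commitOffsets(TopicPartition partition);
  abstract void cleanupState(TopicPartition partition);
  abstract void initializeState(TopicPartition partition);
  abstract void initializeOffset(TopicPartition partition);
}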

 

One of the advantages of the sticky assignor is that, in general, it reduces the number of partitions that are actually de-assigned from a consumer. Because of that, consumers can now do their cleanup more efficiently. They can still perform partition cleanup in the onPartitionsRevoked() listener, but it is more efficient to make a note of their partitions before and after the rebalance and then, after the rebalance, clean up only the partitions they actually lost (normally only a few). The code snippet below clarifies this point.

Code Block
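import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

// The four abstract hooks stand for application-specific logic.
abstract class MyNewRebalanceListener implements ConsumerRebalanceListener {
  private Set<TopicPartition> lastAssignment = Collections.emptySet();

  // Only commit here; cleanup waits until the new assignment is known.
  @Override
  public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    for (TopicPartition partition : partitions)
      commitOffsets(partition);
  }

  @Override
  public void onPartitionsAssigned(Collection<TopicPartition> assignment) {
    for (TopicPartition partition : difference(lastAssignment, assignment)) // lost
      cleanupState(partition);
    for (TopicPartition partition : difference(assignment, lastAssignment)) // gained
      initializeState(partition);
    for (TopicPartition partition : assignment)
      initializeOffset(partition);
    this.lastAssignment = new HashSet<>(assignment); // keep a copy, not a live view
  }

  // Partitions in a that are not in b.
  private static Set<TopicPartition> difference(Collection<TopicPartition> a,
                                                Collection<TopicPartition> b) {
    Set<TopicPartition> result = new HashSet<>(a);
    result.removeAll(b);
    return result;
  }

  abstract void commitOffsets(TopicPartition partition);
  abstract void cleanupState(TopicPartition partition);
  abstract void initializeState(TopicPartition partition);
  abstract void initializeOffset(TopicPartition partition);
}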

Looking back at Example 2, when consumer C0 is removed, a rebalance occurs. With the round robin assignor, the remaining consumers would clean up all the partitions they owned before the rebalance (a total of 5). With the sticky assignor and the listener change proposed above, both C1 and C2 keep all their previously assigned partitions, so no cleanup is needed.
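The same saving can be counted on the Example 3 tables, where C2 joins and the sticky assignor runs. The simulation below is hypothetical and does not call Kafka. It models the old listener as cleaning up every partition a consumer owned before the rebalance, as happens when onPartitionsRevoked() receives the full previous assignment. It models the new listener as cleaning up only the set difference between the previous and the new assignment, the same idea as difference() in MyNewRebalanceListener.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class CleanupCount {

  /** Partitions in a that are not in b (same idea as difference() in the listener). */
  static Set<String> difference(List<String> a, List<String> b) {
    Set<String> result = new HashSet<>(a);
    result.removeAll(b);
    return result;
  }

  /** Old listener: each consumer cleans up everything it owned before the rebalance. */
  static int oldListenerCleanups(Map<String, List<String>> before) {
    int total = 0;
    for (List<String> owned : before.values()) total += owned.size();
    return total;
  }

  /** New listener: each consumer cleans up only the partitions it actually lost. */
  static int newListenerCleanups(Map<String, List<String>> before,
                                 Map<String, List<String>> after) {
    int total = 0;
    for (Map.Entry<String, List<String>> e : before.entrySet()) {
      total += difference(e.getValue(), after.getOrDefault(e.getKey(), List.of())).size();
    }
    return total;
  }

  public static void main(String[] args) {
    // Example 3: assignment before C2 joins, and the sticky assignment after.
    Map<String, List<String>> before = Map.of(
        "C0", List.of("t0p0", "t1p0"),
        "C1", List.of("t0p1", "t1p1"));
    Map<String, List<String>> after = Map.of(
        "C0", List.of("t0p0", "t1p0"),
        "C1", List.of("t0p1"),
        "C2", List.of("t1p1"));
    System.out.println(oldListenerCleanups(before));        // 4
    System.out.println(newListenerCleanups(before, after)); // 1
  }
}
```

Only C1's loss of t1p1 triggers a cleanup with the new listener. C2 joins with no previous state, so it has nothing to clean up in either model.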

 

Rejected Alternatives

  • Having the consumer group leader store the calculated topic partition assignment in an internal topic for other consumers to retrieve in case of a leadership change. It was decided that passing the calculated assignments as user data to all consumers after each rebalance is a more viable option.