...

This proposal adds a new option to the existing assignment strategies of the new consumer. It affects only those consumers that run partition cleanup code in their onPartitionsRevoked() callback listeners. With the range or round robin assignor, that callback is the right place for cleanup, because the consumer cannot expect to keep any of its assigned partitions after a rebalance. Such listener code would look like this:

Code Block
class MyOldRebalanceListener {

  void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    for (TopicPartition partition: partitions) {
      commitOffsets(partition);
      cleanupState(partition);
    }
  }

  void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    for (TopicPartition partition: partitions) {
      initializeState(partition);
      initializeOffset(partition);
    }
  }
}

 

One advantage of the sticky assignor is that it generally reduces the number of partitions that are actually de-assigned from a consumer. Consumers can therefore do their cleanup more efficiently. They can still perform partition cleanup in the onPartitionsRevoked() listener, but they can instead record their assignment before and after the rebalance and, once the rebalance completes, clean up only the partitions they actually lost (normally a small number). The code snippet below clarifies this point.

Code Block
class MyNewRebalanceListener {
  Collection<TopicPartition> lastAssignment = Collections.emptyList();

  void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    for (TopicPartition partition: partitions)
      commitOffsets(partition);
  }

  void onPartitionsAssigned(Collection<TopicPartition> assignment) {
    // Clean up only the partitions that were actually lost in the rebalance.
    for (TopicPartition partition: difference(lastAssignment, assignment))
      cleanupState(partition);

    // Initialize state only for the newly gained partitions.
    for (TopicPartition partition: difference(assignment, lastAssignment))
      initializeState(partition);

    for (TopicPartition partition: assignment)
      initializeOffset(partition);

    this.lastAssignment = assignment;
  }
}
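
The listener above calls difference() without defining it. A minimal sketch of such a helper follows, assuming it is meant to return the elements of its first argument that are absent from its second. The class name AssignmentDiff is hypothetical and not part of the proposal, and the method is generic so that the sketch compiles without the Kafka client on the classpath.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper class; the name is an assumption made for this sketch.
final class AssignmentDiff {

    private AssignmentDiff() {
    }

    // Returns the elements of `left` that do not appear in `right`,
    // in the iteration order of `left`. Neither input is modified.
    static <T> List<T> difference(Collection<T> left, Collection<T> right) {
        // Copy `right` into a hash set so each membership check is O(1) on average.
        Set<T> excluded = new HashSet<>(right);
        List<T> result = new ArrayList<>();
        for (T item : left) {
            if (!excluded.contains(item)) {
                result.add(item);
            }
        }
        return result;
    }
}
```

With T bound to TopicPartition, which implements equals() and hashCode(), difference(lastAssignment, assignment) would yield the partitions lost in the rebalance and difference(assignment, lastAssignment) the partitions gained.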

Looking back at Example 2, when consumer C0 is removed a rebalance occurs. With the round robin assignment, the remaining consumers would perform partition cleanup on all of their partitions before the rebalance (a total of 5). With the sticky assignor and the change proposed above, both C1 and C2 preserve all their assigned partitions, so no cleanup is needed.

...