Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: Updated the KIP to keep it aligned with the final implementation.

Table of Contents

This page is meant as a template for writing a KIP. To create a KIP choose Tools->Copy on this page and modify with your content and replace the heading with the next KIP number and a description of your issue. Replace anything in italics with your own description.

Status

Current state: Accepted

...

This generation is not currently exposed to any of partition assignors. It will be exposed as part of the assignor.assignonAssignment(...) call in ConsumerCoordinator.java:

Code Block
languagejava
titleConsumerCoordinator::performAssignmentonJoinComplete
// current code
...
Map<String, Assignment> assignment = assignor.assign(metadata.fetch(), subscriptions)onAssignment(assignment);
...

// revised code
...
Map<String, Assignment> assignment = assignor.assign(metadata.fetch(), subscriptions, latestGeneration().generationId)onAssignment(assignment, generation);
...

Note that when this call is made the group is rebalancing and, therefore, the call to existing generation() method will return null. A new latestGeneration() method will be created in AbstractCoordinator to return the latest generation irrespective of the group status.


As a result of above change to the assignor.assignonAssignment(...) call, a new method is introduced in the interface PartitionAssignor:

Code Block
languagejava
titlePartitionAssignor
default Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                 Map<String, Subscription> subscriptions,
                                                 void onAssignment(Assignment assignment, int generation) {
    return assign(partitionsPerTopic, subscriptionsonAssignment(assignment);
}

The existing assignonAssignment(...) method will remain to support classes that already implement this interface.

It is expected that classes that implement this interface implement at least one of the two assign(...) methods. A default implementation (in the interface class) can be is provided for both methods to allow this, and to keep this methods to keep assignors that do not make use of group generation (e.g. RangeAssignor and RoundRobinAssignor classes) intact.

Note: During the implementation of this KIP there was a back and forth on whether this generation argument should be an optional argument (i.e. Optional<Integer> ). It turned out that the only edge case where the optional argument would be useful is when an old client makes use of the updated sticky assignor. It was decided that this supposedly rare case would not worth the complexity and the noise that comes with the optional type.

Compatibility, Deprecation, and Migration Plan

...