Comment: Updated the KIP to keep it aligned with the final implementation.

Status

Current state: Accepted

Discussion thread: here

Vote thread: here

JIRA: KAFKA-7026

...

The only addition is a numeric generation marker that increases with each new assignment. This lets the leader detect the highest generation and, if necessary, ignore a consumer's reported assignment when that assignment belongs to an older generation.
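The detection described above can be sketched as follows. This is a minimal illustration, not the Sticky Assignor's actual code: the class GenerationConflictSketch, the Reported type, and the use of plain integers as partition ids are all assumptions made for brevity. When two consumers claim the same partition, the claim with the higher generation wins and the stale claim is ignored.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Kafka: illustrates generation-based conflict resolution.
final class GenerationConflictSketch {

    // A consumer's self-reported previous assignment (partition ids are plain ints here).
    static final class Reported {
        final List<Integer> partitions;
        final int generation;

        Reported(List<Integer> partitions, int generation) {
            this.partitions = partitions;
            this.generation = generation;
        }
    }

    // Returns partition -> owner, keeping only the claim with the highest generation.
    // Claims with equal generations are resolved by iteration order in this sketch.
    static Map<Integer, String> resolveOwners(Map<String, Reported> reportedByConsumer) {
        Map<Integer, String> owner = new HashMap<>();
        Map<Integer, Integer> ownerGeneration = new HashMap<>();
        for (Map.Entry<String, Reported> entry : reportedByConsumer.entrySet()) {
            Reported reported = entry.getValue();
            for (int partition : reported.partitions) {
                Integer seen = ownerGeneration.get(partition);
                if (seen == null || reported.generation > seen) {
                    owner.put(partition, entry.getKey());
                    ownerGeneration.put(partition, reported.generation);
                }
            }
        }
        return owner;
    }

    public static void main(String[] args) {
        Map<String, Reported> reported = new HashMap<>();
        // c1 reports a stale assignment (generation 5) that overlaps with c2 on partition 1.
        reported.put("c1", new Reported(Arrays.asList(0, 1), 5));
        reported.put("c2", new Reported(Arrays.asList(1, 2), 6));
        System.out.println(resolveOwners(reported));
    }
}
```

In this example partition 1 is claimed by both consumers, and the claim from c2 is kept because it carries the higher generation.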

Generation Marker

There are two methods for implementing this generation marker:

  1. A Sticky Assignor specific generation marker whose scope is limited to the Sticky Assignor (on the client side). With this method, the generation marker of a sticky assignment may occasionally reset to 0 if no consumer in the group reports a previous assignment. That does not jeopardize the logic presented above for resolving the scenarios that could lead to duplicate assignments. The upside is that the solution has limited scope: it is specific to the Sticky Assignor, so other assignors and the AbstractPartitionAssignor class are not impacted.
  2. The consumer group's generation number. The upside of this approach is that the assignment generation marker stays in sync with the consumer group's generation number. Also, once the group's generation number is available in the assignors, other stateful partition assignors could later use it. The downside is the added implementation complexity of exposing the consumer group's generation number to the Sticky Assignor, which could impact all existing assignor classes.

The consumer group generation will be used for the new user data field. Besides addressing the drawbacks of a local generation marker described below in the Rejected Alternatives section, this option also makes it possible for other stateful assignors to use the consumer group generation.

This generation is not currently exposed to any of the partition assignors. It will be exposed as part of the assignor.onAssignment(...) call in ConsumerCoordinator.java:

ConsumerCoordinator::onJoinComplete (Java)
// current code
...
assignor.onAssignment(assignment);
...

// revised code
...
assignor.onAssignment(assignment, generation);
...


As a result of the above change to the assignor.onAssignment(...) call, a new method is introduced in the PartitionAssignor interface:

PartitionAssignor (Java)
default void onAssignment(Assignment assignment, int generation) {
    onAssignment(assignment);
}

The existing onAssignment(...) method will remain to support classes that already implement this interface.

Classes that implement this interface are expected to implement at least one of the two onAssignment(...) methods. A default implementation is provided for the new method so that assignors that do not make use of the group generation (e.g. the RangeAssignor and RoundRobinAssignor classes) remain intact.
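To illustrate how the default method keeps existing assignors intact, here is a minimal sketch. The PartitionAssignor and Assignment types below are simplified stand-ins declared locally so the example compiles on its own; they are not Kafka's real classes, and StatelessAssignor and StatefulAssignor are hypothetical names. A generation-unaware assignor only implements the old callback, while a stateful one overrides the new callback to record the generation.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, self-contained model of the two onAssignment(...) callbacks.
final class OnAssignmentSketch {

    // Stand-in for the real Assignment type; only a partition list is modeled.
    static final class Assignment {
        final List<String> partitions;

        Assignment(List<String> partitions) {
            this.partitions = partitions;
        }
    }

    // Simplified stand-in for the PartitionAssignor interface (callbacks only).
    interface PartitionAssignor {
        void onAssignment(Assignment assignment);

        // New overload: by default it drops the generation and delegates to the old callback.
        default void onAssignment(Assignment assignment, int generation) {
            onAssignment(assignment);
        }
    }

    // Generation-unaware assignor: needs no change, it inherits the default overload.
    static final class StatelessAssignor implements PartitionAssignor {
        List<String> lastPartitions;

        @Override
        public void onAssignment(Assignment assignment) {
            lastPartitions = assignment.partitions;
        }
    }

    // Stateful assignor: overrides the new overload to remember the generation as well.
    static final class StatefulAssignor implements PartitionAssignor {
        List<String> lastPartitions;
        int lastGeneration = -1;

        @Override
        public void onAssignment(Assignment assignment) {
            lastPartitions = assignment.partitions;
        }

        @Override
        public void onAssignment(Assignment assignment, int generation) {
            onAssignment(assignment);
            lastGeneration = generation;
        }
    }

    public static void main(String[] args) {
        Assignment assignment = new Assignment(Arrays.asList("topic-0", "topic-1"));
        StatefulAssignor stateful = new StatefulAssignor();
        stateful.onAssignment(assignment, 7);
        System.out.println(stateful.lastPartitions + " @ generation " + stateful.lastGeneration);
    }
}
```

The caller can always invoke the two-argument form; whether the generation is used is left to each assignor.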

Note: During the implementation of this KIP there was some back and forth on whether this generation argument should be optional (i.e. Optional<Integer>). It turned out that the only edge case where the optional argument would be useful is when an old client makes use of the updated sticky assignor. It was decided that this supposedly rare case would not be worth the complexity and noise that come with the optional type.

Compatibility, Deprecation, and Migration Plan

The new user data protocol can be implemented in a way that supports backward and forward compatibility.

  • Backward compatibility: If the leader uses the new protocol, it first tries to deserialize the reported assignments of consumers using the new protocol. If that does not work, it falls back to the old protocol and assumes a default generation number (e.g. -1) for the assignment.
  • Forward compatibility: If the leader uses the old protocol, it deserializes only the array portion of the reported assignment byte array and ignores the integer that may follow it. Generation markers reported by new clients will all be ignored in this case.
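The two compatibility paths can be sketched with a deliberately simplified wire layout. The layout used here (a 4-byte count, then 4-byte partition ids, then an optional 4-byte generation) is an assumption for illustration only and is not Kafka's actual user data schema; the class UserDataCompatSketch and its method names are hypothetical as well.

```java
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of backward/forward compatible user data parsing.
final class UserDataCompatSketch {
    static final int DEFAULT_GENERATION = -1;

    static final class Decoded {
        final List<Integer> partitions;
        final int generation;

        Decoded(List<Integer> partitions, int generation) {
            this.partitions = partitions;
            this.generation = generation;
        }
    }

    // Writes the assumed layout; a null generation produces old-format bytes.
    static ByteBuffer encode(List<Integer> partitions, Integer generation) {
        int size = 4 + 4 * partitions.size() + (generation == null ? 0 : 4);
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.putInt(partitions.size());
        for (int partition : partitions) {
            buf.putInt(partition);
        }
        if (generation != null) {
            buf.putInt(generation);
        }
        buf.flip();
        return buf;
    }

    // Reads the array portion: a count followed by that many partition ids.
    private static List<Integer> readPartitions(ByteBuffer view) {
        int count = view.getInt();
        List<Integer> partitions = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            partitions.add(view.getInt());
        }
        return partitions;
    }

    // Leader on the new protocol: try the new layout first, then fall back to the old one.
    static Decoded decodeAsNewLeader(ByteBuffer buf) {
        try {
            ByteBuffer view = buf.duplicate();
            List<Integer> partitions = readPartitions(view);
            int generation = view.getInt(); // throws if no generation follows the array
            return new Decoded(partitions, generation);
        } catch (BufferUnderflowException e) {
            return new Decoded(readPartitions(buf.duplicate()), DEFAULT_GENERATION);
        }
    }

    // Leader on the old protocol: read only the array and ignore any trailing bytes.
    static List<Integer> decodeAsOldLeader(ByteBuffer buf) {
        return readPartitions(buf.duplicate());
    }

    public static void main(String[] args) {
        ByteBuffer fromNewClient = encode(Arrays.asList(3, 7), 42);
        ByteBuffer fromOldClient = encode(Arrays.asList(3, 7), null);
        System.out.println(decodeAsNewLeader(fromNewClient).generation);
        System.out.println(decodeAsNewLeader(fromOldClient).generation);
        System.out.println(decodeAsOldLeader(fromNewClient));
    }
}
```

Appending the generation after the array is what lets an old leader skip it without knowing the new layout.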

Rejected Alternatives

...

  • Using a local generation marker within the scope of Sticky Assignor: This option was voted against because 1) it could cause confusion with the consumer group generation; 2) it could reset to 0 from time to time (if no consumer in the group has a previous assignment); 3) there are corner cases where it may not lead to the expected optimum assignment (example