...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

This KIP grew out of the discussion on the pull request for KAFKA-3664. The issue originally reported in that JIRA is that the offsets of partitions are not committed when a consumer unsubscribes from them. Specifically, when users rely on group management and call consumer.subscribe() or consumer.unsubscribe() to change the subscription, the partitions that are no longer subscribed are removed from the assignment immediately, and their offsets are not committed. The fix in the corresponding pull request calls commitAsync() inside the subscribe() and unsubscribe() methods to trigger an offset commit, but only when auto commit is enabled for the consumer. This solution preserves the current invariants regarding consistency between the assignment and offset commits, and it addresses the main problem in the JIRA: users see duplicate messages when they change subscriptions with auto commit enabled. Users who commit manually will have to call commitSync() before changing their subscription, which seems reasonable.

...

Fixing this behavior would likely change the solution provided in the pull request for KAFKA-3664.

Example: Topic Subscription

Consider the following use case according to current consumer partition assignment semantics:

...

  1. Consumer c subscribes to topic t with one partition p.
  2. Consumer c consumes some messages from p.
  3. Consumer c unsubscribes from t.
  4. Partition p is marked for removal during the next rebalance.
  5. During the next rebalance, the offsets of c's assigned partitions (including p) are committed, and then the assignment is updated (resulting in p's removal).
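The five steps above can be sketched with a small in-memory model: a subscription change only records the new topics, partitions of dropped topics are marked for removal, and the next rebalance commits every assigned partition's position before rebuilding the assignment. This is a hypothetical illustration under simplifying assumptions (static topic metadata, a single consumer, commit-on-rebalance as with auto commit); ToyConsumer and all of its methods are invented for this sketch and are not the KafkaConsumer API.

```java
import java.util.*;

// Hypothetical in-memory model of steps 1-5. ToyConsumer is illustrative only and is NOT the KafkaConsumer API.
// Partitions are named "topic-index", e.g. "t-0".
class ToyConsumer {
    private final Map<String, Integer> partitionsPerTopic;       // assumed static topic metadata
    private Set<String> subscription = new TreeSet<>();
    private final Set<String> assignment = new TreeSet<>();
    private final Map<String, Long> position = new HashMap<>();  // next offset to consume per partition
    final Map<String, Long> committed = new HashMap<>();         // stand-in for the group's committed offsets

    ToyConsumer(Map<String, Integer> partitionsPerTopic) {
        this.partitionsPerTopic = partitionsPerTopic;
    }

    // Steps 1 and 3: a subscription change is only recorded; the assignment is left unchanged.
    void subscribe(Collection<String> topics) { subscription = new TreeSet<>(topics); }

    void unsubscribe() { subscription = new TreeSet<>(); }

    // Step 2: consume n messages from a currently assigned partition.
    void consume(String partition, long n) {
        if (!assignment.contains(partition)) {
            throw new IllegalStateException("not assigned: " + partition);
        }
        position.merge(partition, n, Long::sum);
    }

    Set<String> assignment() { return Collections.unmodifiableSet(assignment); }

    // Step 4: assigned partitions whose topic is no longer subscribed are pending removal.
    Set<String> markedForRemoval() {
        Set<String> marked = new TreeSet<>();
        for (String p : assignment) {
            if (!subscription.contains(topicOf(p))) marked.add(p);
        }
        return marked;
    }

    // Step 5: commit the position of every assigned partition (including those marked for removal),
    // then rebuild the assignment from the current subscription.
    void rebalance() {
        for (String p : assignment) {
            committed.put(p, position.getOrDefault(p, 0L));
        }
        assignment.clear();
        for (String topic : subscription) {
            for (int i = 0; i < partitionsPerTopic.getOrDefault(topic, 0); i++) {
                assignment.add(topic + "-" + i);
            }
        }
        position.keySet().retainAll(assignment);
        for (String p : assignment) {
            // Newly assigned partitions resume from the last committed offset (0 if none).
            position.putIfAbsent(p, committed.getOrDefault(p, 0L));
        }
    }

    private static String topicOf(String partition) {
        return partition.substring(0, partition.lastIndexOf('-'));
    }
}
```

In this model, after consuming from "t-0" and calling unsubscribe(), "t-0" stays in assignment() and appears in markedForRemoval() until the next rebalance(), which commits its position before dropping it.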

Existing Semantics

Code Block
consumer.subscribe(Arrays.asList("foo", "bar"));
System.out.println(consumer.assignment()); // prints []

consumer.poll(0);
  // --> onPartitionsRevoked([])
  // --> onPartitionsAssigned([(foo, 0), (bar, 0)])
System.out.println(consumer.assignment()); // prints [(foo, 0), (bar, 0)]

// (bar, 0) is dropped immediately, so it is never passed to onPartitionsRevoked
consumer.subscribe(Arrays.asList("foo"));
System.out.println(consumer.assignment()); // prints [(foo, 0)]

consumer.poll(0);
  // --> onPartitionsRevoked([(foo, 0)])
  // --> onPartitionsAssigned([(foo, 0)])
System.out.println(consumer.assignment()); // prints [(foo, 0)]

Suggested Semantics

Code Block
consumer.subscribe(Arrays.asList("foo", "bar"));
System.out.println(consumer.assignment()); // prints []

consumer.poll(0);
  // --> onPartitionsRevoked([])
  // --> onPartitionsAssigned([(foo, 0), (bar, 0)])
System.out.println(consumer.assignment()); // prints [(foo, 0), (bar, 0)]

// the assignment is unchanged until the next poll()
consumer.subscribe(Arrays.asList("foo"));
System.out.println(consumer.assignment()); // prints [(foo, 0), (bar, 0)]

consumer.poll(0);
  // --> onPartitionsRevoked([(foo, 0), (bar, 0)])
  // --> onPartitionsAssigned([(foo, 0)])
System.out.println(consumer.assignment()); // prints [(foo, 0)]

 

Example: Regex Subscription

Existing Semantics (Follows Suggested Semantics)

Code Block
consumer.subscribe(Pattern.compile("..."));
System.out.println(consumer.assignment()); // prints []

consumer.poll(0);
  // --> onPartitionsRevoked([])
  // --> onPartitionsAssigned([(foo, 0), (bar, 0)])
System.out.println(consumer.assignment()); // prints [(foo, 0), (bar, 0)]

// the assignment is unchanged until the next poll()
consumer.subscribe(Pattern.compile("f.."));
System.out.println(consumer.assignment()); // prints [(foo, 0), (bar, 0)]

consumer.poll(0);
  // --> onPartitionsRevoked([(foo, 0), (bar, 0)])
  // --> onPartitionsAssigned([(foo, 0)])
System.out.println(consumer.assignment()); // prints [(foo, 0)]

...

The new consumer's implementation of the topic-list subscribe() and unsubscribe() methods is modified so that these calls no longer cause an immediate assignment update (this is already how the regex subscribe() is implemented). The change is primarily about fixing this inconsistent behavior.
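The difference between the existing topic-list behavior and the proposed one can be sketched with a toy model that replays the traces from the examples: with immediate updates, subscribe() shrinks the visible assignment at once, so the dropped partition never reaches onPartitionsRevoked; with deferred updates, the assignment changes only at the next poll(), and the revoked set includes every partition held before the rebalance. AssignmentModel, its immediateUpdate flag, and its callbackLog are hypothetical names for this sketch only; they are not part of the KafkaConsumer or ConsumerRebalanceListener API, and each topic is assumed to have a single partition.

```java
import java.util.*;

// Hypothetical model of the assignment visible to the application. NOT the KafkaConsumer API.
// Partitions are written "topic-index"; each topic is assumed to have one partition (index 0).
class AssignmentModel {
    private final boolean immediateUpdate;  // true = existing topic-list subscribe, false = suggested
    private Set<String> subscription = new TreeSet<>();
    private Set<String> assignment = new TreeSet<>();
    final List<String> callbackLog = new ArrayList<>();  // records simulated rebalance callbacks

    AssignmentModel(boolean immediateUpdate) { this.immediateUpdate = immediateUpdate; }

    void subscribe(Collection<String> topics) {
        subscription = new TreeSet<>(topics);
        if (immediateUpdate) {
            // Existing behavior: partitions of dropped topics vanish right away,
            // so they are never reported as revoked.
            assignment.removeIf(p -> !subscription.contains(topicOf(p)));
        }
        // Suggested behavior: nothing changes until the next poll().
    }

    Set<String> assignment() { return Collections.unmodifiableSet(assignment); }

    // Stand-in for the rebalance triggered by poll(): revoke the whole current assignment,
    // then assign partition 0 of every subscribed topic.
    void poll() {
        callbackLog.add("revoked " + assignment);
        Set<String> next = new TreeSet<>();
        for (String topic : subscription) next.add(topic + "-0");
        assignment = next;
        callbackLog.add("assigned " + assignment);
    }

    private static String topicOf(String partition) {
        return partition.substring(0, partition.lastIndexOf('-'));
    }
}
```

Replaying subscribe(["foo", "bar"]), poll(), subscribe(["foo"]), poll() on both variants yields "revoked [foo-0]" for the immediate variant and "revoked [bar-0, foo-0]" for the deferred one, matching the revoked sets in the Existing and Suggested Semantics traces.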

 

Compatibility, Deprecation, and Migration Plan

  • Impacted users: only users who pass a ConsumerRebalanceListener, call the consumer's topic-list subscribe() method, and expect the callback to observe an already-updated assignment are affected. The number of users who rely on this specific use case is expected to be very small, possibly zero.
  • If we are changing behavior, how will we phase out the older behavior?
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?


Test Plan

Describe in a few sentences how the KIP will be tested. We are mostly interested in system tests (since unit tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?


Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.