Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: Move the examples into the proposed changes section

...

Fixing this behavior would likely change the solution provided in the JIRA's pull request.

 

Public Interfaces

There are no new public interfaces or changes to existing interfaces suggested by this KIP.

 

Proposed Changes

The new consumer's implementation of topics subscribe and unsubscribe interfaces are modified so that they do not cause an immediate assignment update (this is how the regex subscribe interface is implemented). Instead, the assignment remains valid until it has been revoked in the next rebalance. This is mostly about fixing an inconsistent behavior. The examples below shows this inconsistency and how this KIP proposes to resolve it.

Example: Topic Subscription

Existing Semantics

Code Block
consumer.subscribe(Arrays.asList("foo", "bar"))
System.out.println(consumer.assignment()); // prints []

consumer.poll(0)
  --> onPartitionsRevoked([]) 
  --> onPartitionsAssigned([(foo, 0), (bar,0)])
System.out.println(consumer.assignment()); // prints [(foo, 0), (bar, 0)]
 
consumer.subscribe(Arrays.asList("foo"))
System.out.println(consumer.assignment()); // prints [(foo, 0)]

consumer.poll(0)
  --> onPartitionsRevoked([(foo, 0)]) 
  --> onPartitionsAssigned([(foo, 0)])
System.out.println(consumer.assignment()); // prints [(foo, 0)]

...

Code Block
consumer.subscribe(Arrays.asList("foo", "bar"))
System.out.println(consumer.assignment()); // prints []

consumer.poll(0)
  --> onPartitionsRevoked([]) 
  --> onPartitionsAssigned([(foo, 0), (bar,0)])
System.out.println(consumer.assignment()); // prints [(foo, 0), (bar, 0)]
 
consumer.subscribe(Arrays.asList("foo"))
System.out.println(consumer.assignment()); // prints [(foo, 0), (bar, 0)]

consumer.poll(0)
  --> onPartitionsRevoked([(foo, 0), (bar, 0)]) 
  --> onPartitionsAssigned([(foo, 0)])
System.out.println(consumer.assignment()); // prints [(foo, 0)] 

Example: Regex Subscription

...

Code Block
consumer.subscribe(Pattern.compile("..."))
System.out.println(consumer.assignment()); // prints []

consumer.poll(0)
  --> onPartitionsRevoked([]) 
  --> onPartitionsAssigned([(foo, 0), (bar,0)])
System.out.println(consumer.assignment()); // prints [(foo, 0), (bar, 0)]
 
consumer.subscribe(Pattern.compile("f.."))
System.out.println(consumer.assignment()); // prints [(foo, 0), (bar, 0)]

consumer.poll(0)
  --> onPartitionsRevoked([(foo, 0)]) 
  --> onPartitionsAssigned([(foo, 0)])
System.out.println(consumer.assignment()); // prints [(foo, 0)]

Suggested Semantics

Code Block
consumer.subscribe(Pattern.compile("..."))
System.out.println(consumer.assignment()); // prints []

consumer.poll(0)
  --> onPartitionsRevoked([]) 
  --> onPartitionsAssigned([(foo, 0), (bar,0)])
System.out.println(consumer.assignment()); // prints [(foo, 0), (bar, 0)]
 
consumer.subscribe(Pattern.compile("f.."))
System.out.println(consumer.assignment()); // prints [(foo, 0), (bar, 0)]

consumer.poll(0)
  --> onPartitionsRevoked([(foo, 0), (bar, 0)]) 
  --> onPartitionsAssigned([(foo, 0)])
System.out.println(consumer.assignment()); // prints [(foo, 0)]

 

 

Public Interfaces

There are no new public interfaces or changes to existing interfaces suggested by this KIP.

 

Proposed Changes

The new consumer's implementation of topics subscribe and unsubscribe interfaces are modified so that they do not cause an immediate assignment update (this is how the regex subscribe interface is implemented). This is more about fixing an inconsistent behavior.

...

Compatibility, Deprecation, and Migration Plan

...