Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Fixing this behavior would likely change the solution provided in the JIRA's pull request.

Examples below on topic subscription and regex subscription should further clarify the problem.

...

The new consumer's implementation of topics subscribe and unsubscribe interfaces are modified so that they do not cause an immediate assignment update (this is how the regex subscribe interface is implemented). Instead, the assignment remains valid until it has been revoked in the next rebalance. This is mostly about fixing an inconsistent behavior. The examples below shows this inconsistency and how this KIP proposes to resolve it.

Examples

In the following examples assume the cluster contains only two topics foo and bar, each with a single partition.

...

Topic Subscription

Existing Semantics

Code Block
consumer.subscribe(Arrays.asList("foo", "bar"))
System.out.println(consumer.assignment()); // prints []

consumer.poll(0)
  --> onPartitionsRevoked([]) 
  --> onPartitionsAssigned([(foo, 0), (bar,0)])
System.out.println(consumer.assignment()); // prints [(foo, 0), (bar, 0)]
 
consumer.subscribe(Arrays.asList("foo"))
System.out.println(consumer.assignment()); // prints [(foo, 0)]

consumer.poll(0)
  --> onPartitionsRevoked([(foo, 0)]) 
  --> onPartitionsAssigned([(foo, 0)])
System.out.println(consumer.assignment()); // prints [(foo, 0)]

Suggested Semantics

Code Block
consumer.subscribe(Arrays.asList("foo", "bar"))
System.out.println(consumer.assignment()); // prints []

consumer.poll(0)
  --> onPartitionsRevoked([]) 
  --> onPartitionsAssigned([(foo, 0), (bar,0)])
System.out.println(consumer.assignment()); // prints [(foo, 0), (bar, 0)]
 
consumer.subscribe(Arrays.asList("foo"))
System.out.println(consumer.assignment()); // prints [(foo, 0), (bar, 0)] # notice the change

consumer.poll(0)
  --> onPartitionsRevoked([(foo, 0), (bar, 0)]) # notice the change
  --> onPartitionsAssigned([(foo, 0)])
System.out.println(consumer.assignment()); // prints [(foo, 0)] 

...

Regex Subscription

Existing Semantics

Code Block
consumer.subscribe(Pattern.compile("..."))
System.out.println(consumer.assignment()); // prints []

consumer.poll(0)
  --> onPartitionsRevoked([]) 
  --> onPartitionsAssigned([(foo, 0), (bar,0)])
System.out.println(consumer.assignment()); // prints [(foo, 0), (bar, 0)]
 
consumer.subscribe(Pattern.compile("f.."))
System.out.println(consumer.assignment()); // prints [(foo, 0), (bar, 0)]

consumer.poll(0)
  --> onPartitionsRevoked([(foo, 0)]) 
  --> onPartitionsAssigned([(foo, 0)])
System.out.println(consumer.assignment()); // prints [(foo, 0)]

Suggested Semantics

Code Block
consumer.subscribe(Pattern.compile("..."))
System.out.println(consumer.assignment()); // prints []

consumer.poll(0)
  --> onPartitionsRevoked([]) 
  --> onPartitionsAssigned([(foo, 0), (bar,0)])
System.out.println(consumer.assignment()); // prints [(foo, 0), (bar, 0)]
 
consumer.subscribe(Pattern.compile("f.."))
System.out.println(consumer.assignment()); // prints [(foo, 0), (bar, 0)]

consumer.poll(0)
  --> onPartitionsRevoked([(foo, 0), (bar, 0)]) # notice the change
  --> onPartitionsAssigned([(foo, 0)])
System.out.println(consumer.assignment()); // prints [(foo, 0)]

...