...
Fixing this behavior would likely change the solution provided in the JIRA's pull request.
The examples below on topic subscription and regex subscription should further clarify the problem.
...
The new consumer's implementation of the topic subscribe and unsubscribe interfaces is modified so that they no longer cause an immediate assignment update, which matches how the regex subscribe interface is already implemented. Instead, the assignment remains valid until it is revoked in the next rebalance. This change mainly fixes an inconsistent behavior. The examples below show this inconsistency and how this KIP proposes to resolve it.
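The proposed behavior can be illustrated with a small in-memory model. This is only a sketch under stated assumptions: `DeferredAssignmentSketch` and its methods are hypothetical names, not part of the Kafka client; each topic is assumed to have a single partition; and `poll()` simply stands in for a completed rebalance.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Toy model of the proposed semantics. This is NOT KafkaConsumer:
// all names here are hypothetical and each topic has one partition.
final class DeferredAssignmentSketch {
    private Set<String> subscription = new TreeSet<>();
    private Set<String> assignment = new TreeSet<>();
    private boolean rebalanceNeeded = false;
    // Records the rebalance callbacks in the order they would fire.
    final List<String> callbacks = new ArrayList<>();

    // A subscription change only records the request; the current
    // assignment is left untouched until the next rebalance.
    void subscribe(List<String> topics) {
        subscription = new TreeSet<>(topics);
        rebalanceNeeded = true;
    }

    // poll() stands in for the rebalance: the whole previous assignment
    // is revoked, then the subscribed topics are assigned.
    void poll() {
        if (!rebalanceNeeded) {
            return;
        }
        callbacks.add("revoked " + assignment);
        assignment = new TreeSet<>(subscription);
        callbacks.add("assigned " + assignment);
        rebalanceNeeded = false;
    }

    // Returns a copy so callers cannot mutate the model's state.
    Set<String> assignment() {
        return new TreeSet<>(assignment);
    }

    public static void main(String[] args) {
        DeferredAssignmentSketch consumer = new DeferredAssignmentSketch();
        consumer.subscribe(Arrays.asList("foo", "bar"));
        consumer.poll();
        consumer.subscribe(Arrays.asList("foo"));
        System.out.println(consumer.assignment()); // still both topics
        consumer.poll();
        System.out.println(consumer.assignment()); // only foo remains
        System.out.println(consumer.callbacks);    // revoke/assign history
    }
}
```

In this model the second revocation reports both topics, so a listener would get the chance to clean up state for the topic that is being dropped.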
Examples
In the following examples, assume the cluster contains only two topics, `foo` and `bar`, each with a single partition.
...
Topic Subscription
Existing Semantics
```java
consumer.subscribe(Arrays.asList("foo", "bar"));
System.out.println(consumer.assignment());  // prints []
consumer.poll(0);
// --> onPartitionsRevoked([])
// --> onPartitionsAssigned([(foo, 0), (bar, 0)])
System.out.println(consumer.assignment());  // prints [(foo, 0), (bar, 0)]
consumer.subscribe(Arrays.asList("foo"));
System.out.println(consumer.assignment());  // prints [(foo, 0)]
consumer.poll(0);
// --> onPartitionsRevoked([(foo, 0)])
// --> onPartitionsAssigned([(foo, 0)])
System.out.println(consumer.assignment());  // prints [(foo, 0)]
```
Suggested Semantics
```java
consumer.subscribe(Arrays.asList("foo", "bar"));
System.out.println(consumer.assignment());  // prints []
consumer.poll(0);
// --> onPartitionsRevoked([])
// --> onPartitionsAssigned([(foo, 0), (bar, 0)])
System.out.println(consumer.assignment());  // prints [(foo, 0), (bar, 0)]
consumer.subscribe(Arrays.asList("foo"));
System.out.println(consumer.assignment());  // prints [(foo, 0), (bar, 0)]  <-- notice the change
consumer.poll(0);
// --> onPartitionsRevoked([(foo, 0), (bar, 0)])  <-- notice the change
// --> onPartitionsAssigned([(foo, 0)])
System.out.println(consumer.assignment());  // prints [(foo, 0)]
```
...
Regex Subscription
Existing Semantics
```java
consumer.subscribe(Pattern.compile("..."));
System.out.println(consumer.assignment());  // prints []
consumer.poll(0);
// --> onPartitionsRevoked([])
// --> onPartitionsAssigned([(foo, 0), (bar, 0)])
System.out.println(consumer.assignment());  // prints [(foo, 0), (bar, 0)]
consumer.subscribe(Pattern.compile("f.."));
System.out.println(consumer.assignment());  // prints [(foo, 0), (bar, 0)]
consumer.poll(0);
// --> onPartitionsRevoked([(foo, 0)])
// --> onPartitionsAssigned([(foo, 0)])
System.out.println(consumer.assignment());  // prints [(foo, 0)]
```
Suggested Semantics
```java
consumer.subscribe(Pattern.compile("..."));
System.out.println(consumer.assignment());  // prints []
consumer.poll(0);
// --> onPartitionsRevoked([])
// --> onPartitionsAssigned([(foo, 0), (bar, 0)])
System.out.println(consumer.assignment());  // prints [(foo, 0), (bar, 0)]
consumer.subscribe(Pattern.compile("f.."));
System.out.println(consumer.assignment());  // prints [(foo, 0), (bar, 0)]
consumer.poll(0);
// --> onPartitionsRevoked([(foo, 0), (bar, 0)])  <-- notice the change
// --> onPartitionsAssigned([(foo, 0)])
System.out.println(consumer.assignment());  // prints [(foo, 0)]
```
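For readers less familiar with the two patterns used in the regex examples, the following standalone sketch shows which of the two topics each pattern selects. It assumes a topic is selected only when its full name matches the pattern; `PatternMatchSketch` and `matchingTopics` are hypothetical helper names introduced here for illustration, and no Kafka client is involved.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Illustration only: filters a fixed topic list with java.util.regex.
final class PatternMatchSketch {
    // Returns the topics whose entire name matches the pattern
    // (full-match semantics are an assumption of this sketch).
    static List<String> matchingTopics(Pattern pattern, List<String> topics) {
        return topics.stream()
                .filter(topic -> pattern.matcher(topic).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> clusterTopics = Arrays.asList("foo", "bar");
        // "..." matches any three-character name, so both topics qualify.
        System.out.println(matchingTopics(Pattern.compile("..."), clusterTopics)); // prints [foo, bar]
        // "f.." requires a leading 'f', so only foo qualifies.
        System.out.println(matchingTopics(Pattern.compile("f.."), clusterTopics)); // prints [foo]
    }
}
```

This is why switching the pattern from `...` to `f..` shrinks the subscription from both topics to `foo` alone.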
...