...
Fixing this behavior would likely change the solution provided in the JIRA's pull request.
Public Interfaces
There are no new public interfaces or changes to existing interfaces suggested by this KIP.
Proposed Changes
The new consumer's implementation of topics subscribe and unsubscribe interfaces are modified so that they do not cause an immediate assignment update (this is how the regex subscribe interface is implemented). Instead, the assignment remains valid until it has been revoked in the next rebalance. This is mostly about fixing an inconsistent behavior. The examples below shows this inconsistency and how this KIP proposes to resolve it.
Example: Topic Subscription
Existing Semantics
Code Block |
---|
consumer.subscribe(Arrays.asList("foo", "bar")) System.out.println(consumer.assignment()); // prints [] consumer.poll(0) --> onPartitionsRevoked([]) --> onPartitionsAssigned([(foo, 0), (bar,0)]) System.out.println(consumer.assignment()); // prints [(foo, 0), (bar, 0)] consumer.subscribe(Arrays.asList("foo")) System.out.println(consumer.assignment()); // prints [(foo, 0)] consumer.poll(0) --> onPartitionsRevoked([(foo, 0)]) --> onPartitionsAssigned([(foo, 0)]) System.out.println(consumer.assignment()); // prints [(foo, 0)] |
...
Code Block |
---|
consumer.subscribe(Arrays.asList("foo", "bar"))
System.out.println(consumer.assignment()); // prints []
consumer.poll(0)
--> onPartitionsRevoked([])
--> onPartitionsAssigned([(foo, 0), (bar,0)])
System.out.println(consumer.assignment()); // prints [(foo, 0), (bar, 0)]
consumer.subscribe(Arrays.asList("foo"))
System.out.println(consumer.assignment()); // prints [(foo, 0), (bar, 0)]
consumer.poll(0)
--> onPartitionsRevoked([(foo, 0), (bar, 0)])
--> onPartitionsAssigned([(foo, 0)])
System.out.println(consumer.assignment()); // prints [(foo, 0)] |
Example: Regex Subscription
...
Code Block |
---|
consumer.subscribe(Pattern.compile("...")) System.out.println(consumer.assignment()); // prints [] consumer.poll(0) --> onPartitionsRevoked([]) --> onPartitionsAssigned([(foo, 0), (bar,0)]) System.out.println(consumer.assignment()); // prints [(foo, 0), (bar, 0)] consumer.subscribe(Pattern.compile("f..")) System.out.println(consumer.assignment()); // prints [(foo, 0), (bar, 0)] consumer.poll(0) --> onPartitionsRevoked([(foo, 0)]) --> onPartitionsAssigned([(foo, 0)]) System.out.println(consumer.assignment()); // prints [(foo, 0)] |
Suggested Semantics
Code Block |
---|
consumer.subscribe(Pattern.compile("...")) System.out.println(consumer.assignment()); // prints [] consumer.poll(0) --> onPartitionsRevoked([]) --> onPartitionsAssigned([(foo, 0), (bar,0)]) System.out.println(consumer.assignment()); // prints [(foo, 0), (bar, 0)] consumer.subscribe(Pattern.compile("f..")) System.out.println(consumer.assignment()); // prints [(foo, 0), (bar, 0)] consumer.poll(0) --> onPartitionsRevoked([(foo, 0), (bar, 0)]) --> onPartitionsAssigned([(foo, 0)]) System.out.println(consumer.assignment()); // prints [(foo, 0)] |
Public Interfaces
There are no new public interfaces or changes to existing interfaces suggested by this KIP.
Proposed Changes
The new consumer's implementation of topics subscribe and unsubscribe interfaces are modified so that they do not cause an immediate assignment update (this is how the regex subscribe interface is implemented). This is more about fixing an inconsistent behavior.
...
Compatibility, Deprecation, and Migration Plan
...