THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Fixing this behavior would likely change the solution provided in the JIRA's pull request.
Example: Topic Subscription
Consider the following use case according to current consumer partition assignment semantics:
Consumerc
subscribes to topict
with one partitionp
.Consumerc
consumes some messages fromp
.Consumerc
unsubscribes fromt
.Partitionp
is immediately removed fromc
's assigned partitions.During the next rebalance, the offset ofc
's assigned partitions is committed, but sincep
is no longer among those partitions,c
's progress onp
will not be committed (and will be lost). Hence KAFKA-3664.
This KIP proposes that this use case is handled according to the following semantics:
Consumerc
subscribes to topict
with one partitionp
.Consumerc
consumes some messages fromp
.Consumerc
unsubscribes fromt
.Partitionp
is marked for removal during the next rebalance.During the next rebalance, the offset ofc
's assigned partitions (includingp
) is committed, and then updated (resulting inp
's removal).
Existing Semantics
Code Block |
---|
consumer.subscribe(Arrays.asList("foo", "bar"))
System.out.println(consumer.assignment()); // prints []
consumer.poll(0)
--> onPartitionsRevoked([])
--> onPartitionsAssigned([(foo, 0), (bar,0)])
System.out.println(consumer.assignment()); // prints [(foo, 0), (bar, 0)]
consumer.subscribe(Arrays.asList("foo"))
System.out.println(consumer.assignment()); // prints [(foo, 0)]
consumer.poll(0)
--> onPartitionsRevoked([(foo, 0)])
--> onPartitionsAssigned([(foo, 0)])
System.out.println(consumer.assignment()); // prints [(foo, 0)] |
Suggested Semantics
Code Block |
---|
consumer.subscribe(Arrays.asList("foo", "bar")) System.out.println(consumer.assignment()); // prints [] consumer.poll(0) --> onPartitionsRevoked([]) --> onPartitionsAssigned([(foo, 0), (bar,0)]) System.out.println(consumer.assignment()); // prints [(foo, 0), (bar, 0)] consumer.subscribe(Arrays.asList("foo")) System.out.println(consumer.assignment()); // prints [(foo, 0), (bar, 0)] consumer.poll(0) --> onPartitionsRevoked([(foo, 0), (bar, 0)]) --> onPartitionsAssigned([(foo, 0)]) System.out.println(consumer.assignment()); // prints [(foo, 0)] |
...