...

Fixing this behavior would likely change the solution provided in the JIRA's pull request.

Example: Topic Subscription

Consider the following use case according to current consumer partition assignment semantics:

  1. Consumer c subscribes to topic t with one partition p.
  2. Consumer c consumes some messages from p.
  3. Consumer c unsubscribes from t.
  4. Partition p is immediately removed from c's assigned partitions.
  5. During the next rebalance, the offsets of c's assigned partitions are committed. Because p is no longer among those partitions, c's progress on p is not committed and is lost. This is the problem reported in KAFKA-3664.
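
The steps above can be sketched with a small in-memory model. This is only an illustration under stated assumptions: the class EagerRemovalModel, its methods, and the partition name "t-0" are hypothetical and are not part of the Kafka consumer API. It shows how removing the partition at unsubscribe time leaves nothing to commit at the next rebalance.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory model of the current semantics; not the real KafkaConsumer.
class EagerRemovalModel {
    private final Set<String> assigned = new HashSet<>();         // partitions currently assigned
    private final Map<String, Long> position = new HashMap<>();   // progress made on each partition
    private final Map<String, Long> committed = new HashMap<>();  // offsets saved at rebalance time

    void assign(String partition) {
        assigned.add(partition);
        position.put(partition, 0L);
    }

    void consume(String partition, long count) {
        position.merge(partition, count, Long::sum);
    }

    // Step 4: unsubscribing drops the partition from the assignment right away.
    void unsubscribe(String partition) {
        assigned.remove(partition);
    }

    // Step 5: only partitions that are still assigned have their offsets committed.
    void rebalance() {
        for (String partition : assigned) {
            committed.put(partition, position.get(partition));
        }
    }

    Long committedOffset(String partition) {
        return committed.get(partition);
    }

    public static void main(String[] args) {
        EagerRemovalModel c = new EagerRemovalModel();
        c.assign("t-0");       // step 1
        c.consume("t-0", 5);   // step 2
        c.unsubscribe("t-0");  // steps 3 and 4
        c.rebalance();         // step 5
        System.out.println(c.committedOffset("t-0")); // prints null: the progress on p was lost
    }
}
```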

 

This KIP proposes that this use case be handled according to the following semantics:

  1. Consumer c subscribes to topic t with one partition p.
  2. Consumer c consumes some messages from p.
  3. Consumer c unsubscribes from t.
  4. Partition p is marked for removal during the next rebalance.
  5. During the next rebalance, the offsets of c's assigned partitions (including p) are committed, and then the assignment is updated (resulting in p's removal).
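
The proposed ordering (commit first, then update the assignment) can be sketched the same way. Again this is a hypothetical model rather than the KIP's implementation: DeferredRemovalModel, its pendingRemoval set, and the partition name "t-0" are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory model of the proposed semantics; not the real KafkaConsumer.
class DeferredRemovalModel {
    private final Set<String> assigned = new HashSet<>();         // partitions currently assigned
    private final Set<String> pendingRemoval = new HashSet<>();   // marked for removal at next rebalance
    private final Map<String, Long> position = new HashMap<>();   // progress made on each partition
    private final Map<String, Long> committed = new HashMap<>();  // offsets saved at rebalance time

    void assign(String partition) {
        assigned.add(partition);
        position.put(partition, 0L);
    }

    void consume(String partition, long count) {
        position.merge(partition, count, Long::sum);
    }

    // Step 4: unsubscribing only marks the partition; the assignment is left untouched.
    void unsubscribe(String partition) {
        pendingRemoval.add(partition);
    }

    // Step 5: commit every assigned partition first, then apply the pending removals.
    void rebalance() {
        for (String partition : assigned) {
            committed.put(partition, position.get(partition));
        }
        assigned.removeAll(pendingRemoval);
        pendingRemoval.clear();
    }

    Set<String> assignment() {
        return new HashSet<>(assigned);
    }

    Long committedOffset(String partition) {
        return committed.get(partition);
    }

    public static void main(String[] args) {
        DeferredRemovalModel c = new DeferredRemovalModel();
        c.assign("t-0");       // step 1
        c.consume("t-0", 5);   // step 2
        c.unsubscribe("t-0");  // steps 3 and 4
        System.out.println(c.assignment());           // prints [t-0]: p is still assigned
        c.rebalance();         // step 5
        System.out.println(c.committedOffset("t-0")); // prints 5: the progress on p was committed
        System.out.println(c.assignment());           // prints []: p was removed after the commit
    }
}
```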

 

Existing Semantics

Code Block
consumer.subscribe(Arrays.asList("foo", "bar"));
System.out.println(consumer.assignment()); // prints []

consumer.poll(0);
//   --> onPartitionsRevoked([])
//   --> onPartitionsAssigned([(foo, 0), (bar, 0)])
System.out.println(consumer.assignment()); // prints [(foo, 0), (bar, 0)]

// Re-subscribing without "bar" removes (bar, 0) from the assignment immediately.
consumer.subscribe(Arrays.asList("foo"));
System.out.println(consumer.assignment()); // prints [(foo, 0)]
consumer.poll(0);
//   --> onPartitionsRevoked([(foo, 0)])
//   --> onPartitionsAssigned([(foo, 0)])
System.out.println(consumer.assignment()); // prints [(foo, 0)]

 

Suggested Semantics

Code Block
consumer.subscribe(Arrays.asList("foo", "bar"));
System.out.println(consumer.assignment()); // prints []

consumer.poll(0);
//   --> onPartitionsRevoked([])
//   --> onPartitionsAssigned([(foo, 0), (bar, 0)])
System.out.println(consumer.assignment()); // prints [(foo, 0), (bar, 0)]

// Re-subscribing without "bar" leaves the assignment unchanged until the next rebalance.
consumer.subscribe(Arrays.asList("foo"));
System.out.println(consumer.assignment()); // prints [(foo, 0), (bar, 0)]
consumer.poll(0);
//   --> onPartitionsRevoked([(foo, 0), (bar, 0)])
//   --> onPartitionsAssigned([(foo, 0)])
System.out.println(consumer.assignment()); // prints [(foo, 0)]
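
The difference between the two traces comes down to which partitions are passed to onPartitionsRevoked on the first poll after re-subscribing. The sketch below models only that difference. It assumes one partition (number 0) per topic and a single consumer in the group. RevokedCallbackModel and revokedOnNextPoll are hypothetical names that do not exist in the consumer API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model: one partition (number 0) per topic, a single consumer in the group.
class RevokedCallbackModel {

    // Returns the partitions that would be passed to onPartitionsRevoked on the
    // first poll after the subscription changes from oldTopics to newTopics.
    static List<String> revokedOnNextPoll(List<String> oldTopics,
                                          List<String> newTopics,
                                          boolean deferRemoval) {
        List<String> assignment = new ArrayList<>();
        for (String topic : oldTopics) {
            // Existing semantics drop unsubscribed topics at subscribe time;
            // suggested semantics keep them assigned until the rebalance.
            if (deferRemoval || newTopics.contains(topic)) {
                assignment.add("(" + topic + ", 0)");
            }
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<String> before = Arrays.asList("foo", "bar");
        List<String> after = Arrays.asList("foo");
        System.out.println(revokedOnNextPoll(before, after, false)); // prints [(foo, 0)]
        System.out.println(revokedOnNextPoll(before, after, true));  // prints [(foo, 0), (bar, 0)]
    }
}
```

Under the suggested semantics (bar, 0) reaches the revocation callback, so a listener that commits offsets there still has the chance to save its progress on that partition.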

...