Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Example: Topic Subscription

Consider the following use case according to current consumer partition assignment semantics:

  1. Consumer c subscribes to topic t with one partition p.
  2. Consumer c consumes some messages from p.
  3. Consumer c unsubscribes from t.
  4. Partition p is immediately removed from c's assigned partitions.
  5. During the next rebalance, the offset of c's assigned partitions is committed, but since p is no longer among those partitions, c's progress on p will not be committed (and will be lost). Hence KAFKA-3664.

 

This KIP proposes that this use case is handled according to the following semantics:

  1. Consumer c subscribes to topic t with one partition p.
  2. Consumer c consumes some messages from p.
  3. Consumer c unsubscribes from t.
  4. Partition p is marked for removal during the next rebalance.
  5. During the next rebalance, the offset of c's assigned partitions (including p) is committed, and then updated (resulting in p's removal).

Existing Semantics

Code Block
consumer.subscribe(Arrays.asList("foo", "bar"))
System.out.println(consumer.assignment()); // prints []

consumer.poll(0)
  --> onPartitionsRevoked([]) 
  --> onPartitionsAssigned([(foo, 0), (bar,0)])
System.out.println(consumer.assignment()); // prints [(foo, 0), (bar, 0)]
 
consumer.subscribe(Arrays.asList("foo"))
System.out.println(consumer.assignment()); // prints [(foo, 0)]

consumer.poll(0)
  --> onPartitionsRevoked([(foo, 0)]) 
  --> onPartitionsAssigned([(foo, 0)])
System.out.println(consumer.assignment()); // prints [(foo, 0)]

...

Example: Regex Subscription

Existing Semantics (Already Follows Suggested Semantics)

Code Block
consumer.subscribe(Pattern.compile("..."))
System.out.println(consumer.assignment()); // prints []

consumer.poll(0)
  --> onPartitionsRevoked([]) 
  --> onPartitionsAssigned([(foo, 0), (bar,0)])
System.out.println(consumer.assignment()); // prints [(foo, 0), (bar, 0)]
 
consumer.subscribe(Pattern.compile("f.."))
System.out.println(consumer.assignment()); // prints [(foo, 0), (bar, 0)]

consumer.poll(0)
  --> onPartitionsRevoked([(foo, 0), (bar, 0)]) 
  --> onPartitionsAssigned([(foo, 0)])
System.out.println(consumer.assignment()); // prints [(foo, 0)]

...