Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Status

Current state:  "Under Discussion" Committed

Discussion thread: here

JIRA: here KAFKA-4033

Released: <Kafka Version>0.10.1.0

This KIP was co-authored with Jason Gustafson and Ewen Cheslack-Postava.

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

The new consumer's implementation of topics subscribe and unsubscribe interfaces are modified so that they do . In the case of topic subscribe, it does not cause an immediate assignment update (this is how the regex subscribe interface is implemented). Instead; instead, the assignment remains valid until it has been revoked in the next rebalance. In the case of unsubscribe, consumed offsets are committed immediately before clearing the assignment. This is mostly about fixing an inconsistent behavior. The examples below shows show this inconsistency and how this KIP proposes to resolve it.

...

Code Block
consumer.subscribe(Arrays.asList("foo", "bar"))
System.out.println(consumer.assignment()); // prints []

consumer.poll(0)
  --> onPartitionsRevoked([]) 
  --> onPartitionsAssigned([(foo, 0), (bar,0)])
System.out.println(consumer.assignment()); // prints [(foo, 0), (bar, 0)]
 
consumer.subscribe(Arrays.asList("foo"))
System.out.println(consumer.assignment()); // prints [(foo, 0), (bar, 0)] # notice the change

consumer.poll(0)
  --> onPartitionsRevoked([(foo, 0), (bar, 0)]) // # notice the change
  --> onPartitionsAssigned([(foo, 0)])
System.out.println(consumer.assignment()); // prints [(foo, 0)] 

...

Code Block
consumer.subscribe(Pattern.compile("..."))
System.out.println(consumer.assignment()); // prints []

consumer.poll(0)
  --> onPartitionsRevoked([]) 
  --> onPartitionsAssigned([(foo, 0), (bar,0)])
System.out.println(consumer.assignment()); // prints [(foo, 0), (bar, 0)]
 
consumer.subscribe(Pattern.compile("f.."))
System.out.println(consumer.assignment()); // prints [(foo, 0), (bar, 0)]

consumer.poll(0)
  --> onPartitionsRevoked([(foo, 0), (bar, 0)]) // # notice the change
  --> onPartitionsAssigned([(foo, 0)])
System.out.println(consumer.assignment()); // prints [(foo, 0)]

...