...

As explained in the corresponding JIRA, consumer behavior is inconsistent when the empty ("") group id, which is the default, is used:

  • If the consumer subscribes to a topic using this group id, it gets an invalid group id error from the broker during a poll (the JoinGroupRequest fails group validation when the group id is "").
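    Code Block
    language: java
    title: Invalid use of consumer with default group id
    // Imports needed by this snippet; the statements below belong inside a method.
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    String topic = "foo";
    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    // group.id is not set, so the default ("") is used.
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
    consumer.subscribe(Collections.singleton(topic));
    while (true) {
        // With the default group id, poll fails with:
        // Exception in thread "main" org.apache.kafka.common.errors.InvalidGroupIdException: The configured groupId is invalid
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records)
            System.out.println(String.format("%s: %s", record.key(), record.value()));
    }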
  • A stand-alone consumer that assigns partitions to itself can use the default group id ("") without error. It can fetch the last committed offset, seek to a partition offset, fetch messages, and commit offsets without having to worry about the empty string group id.
    Code Block
    language: java
    title: Valid use of consumer with default group id
    // Imports needed by this snippet; the statements below belong inside a method.
    import java.util.Arrays;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    String topic = "foo";
    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    // group.id is not set, so the default ("") is used.
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
    // Manual assignment: no group management is involved.
    TopicPartition tp = new TopicPartition(topic, 0);
    consumer.assign(Arrays.asList(tp));
    consumer.seekToBeginning(Arrays.asList(tp));
    System.out.println(consumer.position(tp));
    consumer.seek(tp, 5);
    System.out.println(consumer.position(tp));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records)
            System.out.println(String.format("%s: %s", record.key(), record.value()));
    }

This is summarized in the table below.


               When using subscribe(...)    When using assign(...)
               (group management)           (standalone consumer)
group.id=""    InvalidGroupIdException        • Can fetch (last committed) offset
                                              • Can seek to specific offset
                                              • Can fetch from specific offset
                                              • Can commit offset

This can be confusing to users, as they would likely expect the consumer to treat the default group id the same way in these two cases. Using the empty string group id ("") is troublesome, and consumers are not normally expected to do that in a production setting. While in the long term the usage of "" as a group id should be disallowed, this KIP suggests some improvements to how this group id works.

Public Interfaces

  • KafkaConsumer semantics will be slightly modified for the stand-alone consumer to disallow fetching or committing offsets for the empty group id (""). If there is no current offset, the consumer will use the auto.offset.reset configuration to determine the next offset to fetch from.
  • The default group id will also change to null, so that consumers that want to use an empty ("") group id for coordinator-free consumption have to specify that group id explicitly.

Proposed Changes

This is a detailed list of improvements proposed by this KIP:

  1. Changing the default group id to null.
  2. A new OffsetCommit API version in which committing offsets for group id "" leads to an error.

Change Default group.id to null

The default value of the consumer config group.id will be changed from "" to null. This means that standalone consumers that want to keep using the group id "" have to provide the config explicitly (e.g. props.put(ConsumerConfig.GROUP_ID_CONFIG, "");).
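The difference between leaving group.id unset and setting it to "" can be sketched with plain java.util.Properties. This is a minimal illustration, not Kafka code: the class and helper names are hypothetical, and the string keys are assumed to match the corresponding ConsumerConfig constants.

```java
import java.util.Properties;

// Illustrative sketch only: builds consumer configuration without contacting a broker.
final class StandaloneConsumerConfigSketch {
    // Assumed to be the literal value behind ConsumerConfig.GROUP_ID_CONFIG.
    static final String GROUP_ID_CONFIG = "group.id";

    // Hypothetical helper: settings shared by both variants.
    static Properties baseConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    // Under the proposal, an unset group.id means null rather than "".
    static Properties withDefaultGroupId() {
        return baseConfig();
    }

    // A consumer that still wants the empty group id has to say so explicitly.
    static Properties withExplicitEmptyGroupId() {
        Properties props = baseConfig();
        props.put(GROUP_ID_CONFIG, "");
        return props;
    }

    public static void main(String[] args) {
        // Prints "null": no group id was configured.
        System.out.println(withDefaultGroupId().getProperty(GROUP_ID_CONFIG));
        // Prints "[]": the empty group id was configured on purpose.
        System.out.println("[" + withExplicitEmptyGroupId().getProperty(GROUP_ID_CONFIG) + "]");
    }
}
```

Either Properties object could then be passed to the KafkaConsumer constructor; only the second one opts in to the empty group id.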

This is how these two group ids will work:


                           When using subscribe(...)               When using assign(...)
                           (group management)                      (standalone consumer)
group.id=null (default)    InvalidGroupIdException                   • Cannot fetch offsets (uses auto.offset.reset config)
                                                                     • Cannot commit offsets (see updated OffsetCommit protocol below)
                                                                       and enable.auto.commit is automatically set to false.
group.id=""                InvalidGroupIdException (like before)     • Can seek to and fetch from offset (like before)
                                                                     • Cannot fetch offset (uses auto.offset.reset config)
                                                                     • Can no longer commit offset (see updated OffsetCommit protocol below)
                                                                       and enable.auto.commit is automatically set to false.
                                                                       OffsetCommit continues to work as before for older versions.

OffsetCommit API Enhancement

The current OffsetCommit protocol looks like this.

Code Block
title: OffsetCommit Protocol - Version 3
OffsetCommit Request (Version: 3) => group_id generation_id member_id retention_time [topics] 
  group_id => STRING
  generation_id => INT32
  member_id => STRING
  retention_time => INT64
  topics => topic [partitions] 
    topic => STRING
    partitions => partition offset metadata 
      partition => INT32
      offset => INT64
      metadata => NULLABLE_STRING


OffsetCommit Response (Version: 3) => throttle_time_ms [responses] 
  throttle_time_ms => INT32
  responses => topic [partition_responses] 
    topic => STRING
    partition_responses => partition error_code 
      partition => INT32
      error_code => INT16

The new version of the protocol is the same, with the additional constraint that committing offsets when group_id is "" results in a new type of error: ILLEGAL_OFFSET_COMMIT.

Code Block
title: OffsetCommit Protocol - Version 4
OffsetCommit Request (Version: 4) => group_id generation_id member_id retention_time [topics] 
  group_id => STRING
  generation_id => INT32
  member_id => STRING
  retention_time => INT64
  topics => topic [partitions] 
    topic => STRING
    partitions => partition offset metadata 
      partition => INT32
      offset => INT64
      metadata => NULLABLE_STRING


OffsetCommit Response (Version: 4) => throttle_time_ms [responses] 
  throttle_time_ms => INT32
  responses => topic [partition_responses] 
    topic => STRING
    partition_responses => partition error_code 
      partition => INT32
      error_code => INT16


Code Block
language: java
title: Offset commit error when standalone consumer commits offsets in a group with id=""
    ...
    ILLEGAL_OFFSET_COMMIT(73, "The standalone consumer belonging to a group with an empty (\"\") or null group id is not allowed to commit offsets",
        new ApiExceptionBuilder() {
            @Override
            public ApiException build(String message) {
                return new IllegalOffsetCommitException(message);
            }
        }),
    ...

Consumers with the default group id ("") that use assign(...) to consume from hand-picked partitions will not be able to commit offsets. If they attempt an offset commit they will receive an error. A new error will be introduced in org.apache.kafka.common.protocol.Errors for this purpose.
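The version-dependent rule described above can be modeled in a few lines. This is a sketch under stated assumptions, not broker code: the class, enum, and method names are hypothetical, and it assumes the restriction applies to OffsetCommit version 4 and to any later version, while version 3 and earlier keep accepting the empty group id.

```java
// Illustrative model of the proposed check; none of these types exist in Kafka.
final class OffsetCommitCheckSketch {
    enum Result { NONE, ILLEGAL_OFFSET_COMMIT }

    // First OffsetCommit version that rejects the empty group id (assumption: later versions too).
    static final short FIRST_RESTRICTED_VERSION = 4;

    // Hypothetical validation step for an incoming OffsetCommit request.
    // group_id is a non-nullable STRING in the protocol, so groupId is never null here.
    static Result validate(short requestVersion, String groupId) {
        if (groupId.isEmpty() && requestVersion >= FIRST_RESTRICTED_VERSION) {
            return Result.ILLEGAL_OFFSET_COMMIT;
        }
        // Older request versions and non-empty group ids pass this check.
        return Result.NONE;
    }

    public static void main(String[] args) {
        System.out.println(validate((short) 3, ""));         // NONE
        System.out.println(validate((short) 4, ""));         // ILLEGAL_OFFSET_COMMIT
        System.out.println(validate((short) 4, "my-group")); // NONE
    }
}
```

Gating the check on the request version is what lets older clients keep committing with "" while upgraded clients see the new error.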

Stand-alone consumers using the default group id will have to seek to an offset before they start fetching; otherwise the auto.offset.reset value will be used to determine the starting point of the fetch. They will no longer be able to start from a previously committed fetch position.
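How the starting point is chosen under this rule can be sketched as follows. This is a simplified model, not the consumer's implementation: the class, method, and parameter names are hypothetical, and an IllegalStateException stands in for whatever error the real consumer raises when auto.offset.reset is none and no position has been set.

```java
import java.util.OptionalLong;

// Illustrative model of choosing the first fetch offset when no committed offset is available.
final class StartPositionSketch {
    // The three documented values of auto.offset.reset.
    enum ResetPolicy { EARLIEST, LATEST, NONE }

    static long startingOffset(OptionalLong seekedOffset, ResetPolicy policy,
                               long logStartOffset, long logEndOffset) {
        // An explicit seek(...) always wins.
        if (seekedOffset.isPresent()) {
            return seekedOffset.getAsLong();
        }
        // Otherwise fall back to auto.offset.reset; committed offsets are never consulted.
        switch (policy) {
            case EARLIEST:
                return logStartOffset;
            case LATEST:
                return logEndOffset;
            default:
                throw new IllegalStateException("No position set and auto.offset.reset=none");
        }
    }

    public static void main(String[] args) {
        System.out.println(startingOffset(OptionalLong.of(5), ResetPolicy.LATEST, 0, 42));     // 5
        System.out.println(startingOffset(OptionalLong.empty(), ResetPolicy.EARLIEST, 0, 42)); // 0
        System.out.println(startingOffset(OptionalLong.empty(), ResetPolicy.LATEST, 0, 42));   // 42
    }
}
```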

Compatibility, Deprecation, and Migration Plan

The empty group id will be deprecated and its support will be removed in the next major release. For now, a server-side warning will be logged to indicate this deprecation.

Stand-alone consumers that used the default group id for fetching messages and committing offsets would no longer be able to do that. This should be acceptable because, in general, consumers are not expected to use the default group id for offset commit, which is a dangerous and troublesome practice. This KIP aims at closing the existing gap in using the default group id. Therefore, it is advised that these consumers start using valid group names for committing offsets.

Rejected Alternatives

  • Keeping "" as the default group id: This option was not chosen because the empty group id can currently be used unknowingly (because it is the default). Since using it is not a recommended practice and could lead to issues, it will no longer be the default. Anyone who wants to keep using it should specify it explicitly and be aware of the restrictions this KIP places on it.