Status

Current state: Draft Under Discussion

Discussion thread: here

...

  • If the consumer subscribes to a topic using the default group id ("") it will get an invalid group id error from the broker during a poll (JoinGroupRequest fails group validation if group id is "").

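    Invalid use of consumer with default group id (Java):

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    String topic = "foo";
    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

    // No group.id is set, so the consumer uses the default group id ("").
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
    consumer.subscribe(Collections.singleton(topic));

    while (true) {
        // Subscribing requires joining a group, so this poll fails with an invalid group id error.
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records)
            System.out.println(String.format("%s: %s", record.key(), record.value()));
    }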
  • For a stand-alone consumer that assigns partitions to itself, using the default group id ("") does not cause an error. It can seek to a partition offset and fetch messages without having to worry about the empty string group id.

    Valid use of consumer with default group id (Java):

    import java.util.Arrays;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    String topic = "foo";
    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

    // No group.id is set, so the consumer uses the default group id ("").
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

    // Manual assignment: the consumer picks its own partition and never joins a group.
    TopicPartition tp = new TopicPartition(topic, 0);
    consumer.assign(Arrays.asList(tp));
    consumer.seekToBeginning(Arrays.asList(tp));
    System.out.println(consumer.position(tp));
    consumer.seek(tp, 5);
    System.out.println(consumer.position(tp));

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records)
            System.out.println(String.format("%s: %s", record.key(), record.value()));
    }

This can be confusing to users, as they would likely expect the consumer to treat the default group id the same way in both cases. Using the default empty string group id is troublesome, and consumers are not normally expected to do so in a production setting. That is why this KIP proposes enforcing a restriction in this situation.

Public Interfaces

KafkaConsumer semantics will be slightly modified for the stand-alone consumer (that uses the assign(...) interface) to disallow committing offsets for the default group id ("").

Proposed Changes

Consumers with the default group id ("") that use assign(...) to consume from hand-picked partitions will not be able to commit offsets. If they attempt an offset commit, they will receive an error. Stand-alone consumers using the default group id will have to seek to an offset before they start fetching; otherwise, the auto.offset.reset value will be used to determine the starting point of the fetch. They will no longer be able to start from a previously committed fetch position.
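
As a rough illustration of the rule described above, the sketch below builds a configuration for a stand-alone consumer that leaves the group id at its default and relies on auto.offset.reset (the consumer setting that picks the starting position when there is no committed offset to resume from). The class name and the mayCommitOffsets helper are hypothetical and are not part of the Kafka client; the helper only mirrors the proposed check. Disabling enable.auto.commit is an assumption made here so that no background commit is attempted for the default group id.

```java
import java.util.Properties;

// Hypothetical sketch: the names below are illustrative, not Kafka client API.
class StandaloneConsumerConfigSketch {

    // Mirrors the proposed rule: the default group id ("") may not commit offsets.
    static boolean mayCommitOffsets(String groupId) {
        return groupId != null && !groupId.isEmpty();
    }

    // Configuration for a consumer that uses assign(...) and sets no group id.
    static Properties standaloneProperties() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Starting point of the fetch when the consumer does not seek explicitly.
        properties.put("auto.offset.reset", "earliest");
        // Assumption: turn off auto-commit so no commit is attempted for "".
        properties.put("enable.auto.commit", "false");
        return properties;
    }

    public static void main(String[] args) {
        Properties properties = standaloneProperties();
        // group.id is absent, so fall back to the default empty string.
        String groupId = properties.getProperty("group.id", "");
        System.out.println("group.id=\"" + groupId + "\" may commit offsets: "
                + mayCommitOffsets(groupId));
    }
}
```

These properties could be passed to a KafkaConsumer that then calls assign(...) and, optionally, seek(...) as in the stand-alone example earlier in this document.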

Compatibility, Deprecation, and Migration Plan

Stand-alone consumers that used the default group id for both fetching messages and committing offsets will no longer be able to commit offsets; fetching itself is not affected. This should be acceptable because, in general, consumers are not expected to use the default group id for offset commits, which is a dangerous and troublesome practice. This KIP aims to close the existing gap in how the default group id is handled. It is therefore advised that these consumers start using valid group names for committing offsets.
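
A minimal sketch of the migration step suggested above: a stand-alone consumer that needs to keep committing offsets sets an explicit, non-empty group.id in its configuration. The class name, the withGroupId helper, and the group name my-standalone-group are placeholders chosen for illustration only.

```java
import java.util.Properties;

// Hypothetical sketch: helper and names are illustrative, not Kafka client API.
class GroupIdMigrationSketch {

    // Returns a copy of the given configuration with an explicit, non-empty group id.
    static Properties withGroupId(Properties base, String groupId) {
        if (groupId == null || groupId.isEmpty()) {
            throw new IllegalArgumentException("group.id must be a non-empty string");
        }
        Properties copy = new Properties();
        copy.putAll(base);
        copy.put("group.id", groupId);
        return copy;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.put("bootstrap.servers", "localhost:9092");

        // Placeholder group name; any valid, non-empty name would do.
        Properties migrated = withGroupId(base, "my-standalone-group");
        System.out.println("group.id=" + migrated.getProperty("group.id"));
    }
}
```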

Rejected Alternatives

  • Changing the default group id from "" to null: This would still allow the empty string to be used for offset commits, and stand-alone consumers that relied on "" as their group id would have to set that group id explicitly in the consumer configuration. The option proposed in this KIP seems cleaner, as it disallows a bad practice and should have minimal to no backward compatibility impact.