...

As a result, lag monitoring based on committed offsets is hard to rely on: if a consumer is not subscribed to a topic, its lag on that topic will just keep growing. The leftover offsets also prevent the broker from cleaning up unused state.

Public Interfaces

Request/Response

We will add a new API to delete committed offsets.

...

public enum Errors {
	...
	GROUP_SUBSCRIBED_TO_TOPIC(86, "The consumer group is actively subscribed to the topic", GroupSubscribedToTopicException::new);
	...
}

public class GroupSubscribedToTopicException extends ApiException {
    public GroupSubscribedToTopicException(String message) {
        super(message);
    }
}

Admin Client

This capability will be exposed through the admin client in the following API:

interface Admin {

  /** 
   * Delete committed offsets for a set of partitions in a consumer group. This will
   * succeed at the partition level only if the group is not actively subscribed
   * to the corresponding topic.
   */
  DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsets(
     String groupId,
     Set<TopicPartition> partitions,
     DeleteConsumerGroupOffsetsOptions options);
}

@InterfaceStability.Evolving
public class DeleteConsumerGroupOffsetsResult {
  public KafkaFuture<Void> partitionResult(TopicPartition partition);
  public KafkaFuture<Void> all();
}

@InterfaceStability.Evolving
public class DeleteConsumerGroupOffsetsOptions extends AbstractOptions<DeleteConsumerGroupOffsetsOptions> {
}
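
The partition-level behavior described in the Javadoc above can be illustrated with a small in-memory model. This is only a sketch under stated assumptions: `OffsetDeletionModel`, its methods, and the string status values are hypothetical and are not Kafka classes, and the model ignores everything except the "group is subscribed to the topic" check (for example, it treats deleting an offset that was never committed as a no-op, which is an assumption rather than the specified behavior).

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Toy in-memory model of one consumer group; hypothetical, not a Kafka class. */
final class OffsetDeletionModel {
    static final String OK = "OK";
    static final String GROUP_SUBSCRIBED_TO_TOPIC = "GROUP_SUBSCRIBED_TO_TOPIC";

    // Topics the group is actively subscribed to.
    private final Set<String> subscribedTopics;
    // Committed offsets keyed by "topic-partition".
    private final Map<String, Long> committedOffsets = new HashMap<>();

    OffsetDeletionModel(Set<String> subscribedTopics) {
        this.subscribedTopics = new HashSet<>(subscribedTopics);
    }

    private static String key(String topic, int partition) {
        return topic + "-" + partition;
    }

    void commit(String topic, int partition, long offset) {
        committedOffsets.put(key(topic, partition), offset);
    }

    boolean hasOffset(String topic, int partition) {
        return committedOffsets.containsKey(key(topic, partition));
    }

    /**
     * Tries to delete the offset of each partition independently and
     * returns a per-partition status, mirroring the idea of partitionResult().
     */
    Map<String, String> deleteOffsets(String topic, int... partitions) {
        Map<String, String> status = new LinkedHashMap<>();
        for (int partition : partitions) {
            if (subscribedTopics.contains(topic)) {
                // Rejected: the group is still subscribed to this topic.
                status.put(key(topic, partition), GROUP_SUBSCRIBED_TO_TOPIC);
            } else {
                // Assumption: removing an absent offset is a silent no-op here.
                committedOffsets.remove(key(topic, partition));
                status.put(key(topic, partition), OK);
            }
        }
        return status;
    }

    public static void main(String[] args) {
        OffsetDeletionModel group = new OffsetDeletionModel(Set.of("foo"));
        group.commit("foo", 0, 42L);
        group.commit("test", 0, 7L);
        group.commit("test", 1, 9L);

        System.out.println(group.deleteOffsets("test", 0, 1));
        System.out.println(group.deleteOffsets("foo", 0));
    }
}
```

In this model the offsets for `test` are removed while the offset for `foo` is kept and reported with the new error name, since the group is still subscribed to `foo`.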
    

Consumer Group Delete Offset options

Main action

--delete-offsets

This action should be executed independently from other Consumer Group actions.

Required Arguments

ID   Argument             Type       Description
1    --bootstrap-server   Required   Server to connect to.
2    --group              Required   Consumer Group ID.
3    --topic              Required   Topics/Partitions:
                                     --topic <topic name>:<partition numbers>
                                     ex: --topic topic1 --topic topic2:0,1,2
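
To make the `--topic <topic name>:<partition numbers>` format concrete, here is a minimal sketch of how such values could be parsed. `TopicArgParser` and `parse` are hypothetical names used only for illustration and are not the tool's actual implementation; the sketch assumes that a topic given without partitions maps to an empty partition list.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper for illustration; not part of the Kafka tooling. */
final class TopicArgParser {

    /**
     * Parses repeated --topic values into a topic -> partitions map.
     * An empty list means no partitions were given for that topic.
     */
    static Map<String, List<Integer>> parse(List<String> topicArgs) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        for (String arg : topicArgs) {
            int colon = arg.indexOf(':');
            String topic = colon < 0 ? arg : arg.substring(0, colon);
            if (topic.isEmpty()) {
                throw new IllegalArgumentException("Missing topic name in: " + arg);
            }
            List<Integer> partitions = result.computeIfAbsent(topic, t -> new ArrayList<>());
            if (colon >= 0) {
                // Partition numbers are comma-separated, e.g. "0,1,2".
                for (String p : arg.substring(colon + 1).split(",")) {
                    partitions.add(Integer.parseInt(p.trim()));
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Mirrors the example: --topic topic1 --topic topic2:0,1,2
        System.out.println(parse(List.of("topic1", "topic2:0,1,2")));
    }
}
```

Malformed partition numbers surface as a `NumberFormatException` from `Integer.parseInt`; a real tool would likely report a friendlier error.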

Execution

Group Error

$ kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete-offsets --group unknown --topic test:0,1 --topic foo

Error: Deletion of offsets failed due to: <error message>

Topic/Partition

$ kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete-offsets --group group1 --topic test:0,1 --topic foo

TOPIC                          PARTITION       STATUS
foo                            Not Provided    Error: <error message>
test                           0               Error: <error message>
test                           1               Error: <error message>

Metrics

We are also taking this opportunity to address a gap in metrics reporting. Currently we have no visibility into the rate of writes to the internal __consumer_offsets topic. With this KIP, there are four cases to distinguish, so we will add one meter for each.

...