Status

Current state: Adopted

Discussion thread: here

JIRA: KAFKA-8730. Some of the metrics are documented in KAFKA-3556.

...

The impact of these problems is that it is difficult to rely on lag monitoring using committed offsets. If a consumer group is no longer subscribed to a topic, its lag will just keep growing. These problems also prevent the broker from cleaning up unused state.

Public Interfaces

Request/Response

We will add a new API to delete committed offsets.

...

{
  "apiKey": X,
  "type": "response",
  "name": "OffsetDeleteResponse",
  "validVersions": "0",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top-level error code, or 0 if there was no error." },
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Topics", "type": "[]OffsetDeleteResponseTopic", "versions": "0+",
      "about": "The responses for each topic.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]OffsetDeleteResponsePartition", "versions": "0+",
        "about": "The responses for each partition in the topic.",  "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." }
      ]}
    ]}
  ]
}
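
As an illustration of how a client might read this response, the sketch below treats a non-zero top-level ErrorCode as a failure of the whole request and otherwise reports each partition's own ErrorCode. The classes are hypothetical stand-ins that mirror the schema's field names; they are not Kafka's generated message classes, and the "topic-partition" key format is an assumption made for readability.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the OffsetDeleteResponse schema; not Kafka's generated class.
final class OffsetDeleteResponseSketch {
    static final class Partition {
        final int partitionIndex;   // PartitionIndex
        final short errorCode;      // partition-level ErrorCode, 0 if no error
        Partition(int partitionIndex, int errorCode) {
            this.partitionIndex = partitionIndex;
            this.errorCode = (short) errorCode;
        }
    }

    static final class Topic {
        final String name;                  // Name
        final List<Partition> partitions;   // Partitions
        Topic(String name, Partition... partitions) {
            this.name = name;
            this.partitions = Arrays.asList(partitions);
        }
    }

    final short errorCode;      // top-level ErrorCode, 0 if no error
    final List<Topic> topics;   // Topics

    OffsetDeleteResponseSketch(int errorCode, Topic... topics) {
        this.errorCode = (short) errorCode;
        this.topics = Arrays.asList(topics);
    }

    /**
     * Returns "topic-partition" -> partition-level error code (0 means success).
     * Throws if the top-level error code is set, since the whole request failed.
     */
    Map<String, Short> partitionErrors() {
        if (errorCode != 0) {
            throw new IllegalStateException("Request failed with top-level error code " + errorCode);
        }
        Map<String, Short> errors = new LinkedHashMap<>();
        for (Topic topic : topics) {
            for (Partition partition : topic.partitions) {
                errors.put(topic.name + "-" + partition.partitionIndex, partition.errorCode);
            }
        }
        return errors;
    }
}
```

Checking the top-level code first matches the two failure shapes shown in the Execution section: a group-level error for the whole request, or a per-partition status table.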

The error sent back to the client when a group is actively subscribed to the topic is documented below:

public enum Errors {
    ...
    GROUP_SUBSCRIBED_TO_TOPIC(86, "The consumer group is actively subscribed to the topic", GroupSubscribedToTopicException::new);
    ...
}

public class GroupSubscribedToTopicException extends ApiException {
    public GroupSubscribedToTopicException(String message) {
        super(message);
    }
}

Admin Client

This capability will be exposed through the admin client in the following API:

interface Admin {

  /** 
   * Delete committed offsets for a set of partitions in a consumer group. This will
   * succeed at the partition level only if the group is not actively subscribed
   * to the corresponding topic.
   */
  DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsets(
     String groupId,
     Set<TopicPartition> partitions,
     DeleteConsumerGroupOffsetsOptions options);
}

@InterfaceStability.Evolving
public class DeleteConsumerGroupOffsetsResult {
  public KafkaFuture<Void> partitionResult(TopicPartition partition);
  public KafkaFuture<Void> all();
}

@InterfaceStability.Evolving
public class DeleteConsumerGroupOffsetsOptions extends AbstractOptions<DeleteConsumerGroupOffsetsOptions> {
}
    

Consumer Group Delete Offset options

Main action

--delete-offsets

This action should be executed independently from other Consumer Group actions.

Required Arguments

ID   Argument             Type       Description
1.   --bootstrap-server   Required   Server to connect to.
2.   --group              Required   Consumer Group ID.
3.   --topic              Required   Topics/Partitions: --topic <topic name>:<partition numbers>
                                     ex: --topic topic1 --topic topic2:0,1,2
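
The `--topic` syntax above could be parsed along the following lines. This is a minimal sketch, not the tool's actual implementation: the class name `TopicArgParser` is hypothetical, and representing "no partitions given" as an empty list is an assumption.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper; not part of kafka-consumer-groups.sh.
final class TopicArgParser {
    /**
     * Parses --topic values such as "topic1" or "topic2:0,1,2" into
     * topic -> partition numbers. An empty list means no partitions were given.
     */
    static Map<String, List<Integer>> parse(List<String> topicArgs) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        for (String arg : topicArgs) {
            int colon = arg.indexOf(':');
            String topic = colon < 0 ? arg : arg.substring(0, colon);
            if (topic.isEmpty()) {
                throw new IllegalArgumentException("Missing topic name in: " + arg);
            }
            // Repeated --topic values for the same topic accumulate partitions.
            List<Integer> partitions = result.computeIfAbsent(topic, t -> new ArrayList<>());
            if (colon >= 0) {
                for (String p : arg.substring(colon + 1).split(",")) {
                    // Non-numeric input raises NumberFormatException.
                    partitions.add(Integer.parseInt(p.trim()));
                }
            }
        }
        return result;
    }
}
```

With the example from the table, `--topic topic1 --topic topic2:0,1,2` yields `topic1` with no partitions and `topic2` with partitions 0, 1 and 2.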

Execution

Group Error

$ kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete-offsets --group unknown --topic test:0,1 --topic foo

Error: Deletion of offsets failed due to: <error message>

Topic/Partition

$ kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete-offsets --group group1 --topic test:0,1 --topic foo

TOPIC                          PARTITION       STATUS
foo                            Not Provided    Error: <error message>
test                           0               Error: <error message>
test                           1               Error: <error message>

Metrics

We are also taking this opportunity to address a gap in metrics reporting. Currently we have no visibility into the rate of writes to the internal __consumer_offsets topic. With this KIP, there are four cases to distinguish, so we will add one meter for each.

  • `kafka.server:type=group-coordinator-metrics,name=offset-commits`: Marked for every committed offset. (Note that this is different from the offset commit request rate, which only gives visibility at the request level.)
  • `kafka.server:type=group-coordinator-metrics,name=offset-expirations`: Marked for every expired offset.
  • `kafka.server:type=group-coordinator-metrics,name=offset-deletions`: Marked for every administrative deletion.
  • `kafka.server:type=group-coordinator-metrics,name=group-completed-rebalances`: Marked every time a rebalance completes (which causes a group metadata entry to be appended to the log).
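
To make the note on the first meter concrete, the sketch below contrasts a request-level count with a per-offset count. It uses plain `LongAdder` counters rather than Kafka's metrics classes, and the class and method names are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.atomic.LongAdder;

// Simplified illustration only; the broker uses its own metrics library, not LongAdder.
final class OffsetCommitCounters {
    final LongAdder commitRequests = new LongAdder(); // one mark per commit request
    final LongAdder offsetCommits = new LongAdder();  // one mark per committed offset

    /** Records one commit request carrying an offset for each listed partition. */
    void onCommitRequest(Map<String, Long> offsetsByPartition) {
        commitRequests.increment();
        offsetCommits.add(offsetsByPartition.size());
    }
}
```

A single request that commits offsets for three partitions increments the request counter once and the per-offset counter three times, which is why the per-offset meter tracks writes to the offsets topic more closely.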

...