Table of Contents |
---|
Status
Current state: Under DiscussionAdopted
Discussion thread: here
JIRA:
Jira | ||||||
---|---|---|---|---|---|---|
|
Jira | ||||||
---|---|---|---|---|---|---|
|
...
The impact of these problems is that it is difficult to rely on lag monitoring using committed offsets. If a consumer is not subscribed to a topic, the lag will just grow. Also, it prevents the broker from cleaning up unused state.
Public Interfaces
Request/Response
We will add a new API to delete committed offsets.
...
Code Block |
---|
{ "apiKey": X, "type": "response", "name": "OffsetDeleteResponse", "validVersions": "0", "fields": [ { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The top-level error code, or 0 if there was no error." }, { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true, "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, { "name": "Topics", "type": "[]OffsetDeleteResponseTopic", "versions": "0+", "about": "The responses for each topic.", "fields": [ { "name": "Name", "type": "string", "versions": "0+", "about": "The topic name." }, { "name": "Partitions", "type": "[]OffsetDeleteResponsePartition", "versions": "0+", "about": "The responses for each partition in the topic.", "fields": [ { "name": "PartitionIndex", "type": "int32", "versions": "0+", "about": "The partition index." }, { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The error code, or 0 if there was no error." } ]} ]} ] } |
The error sent back to the client when a group is actively subscribed to the topic is documented bellow:
Code Block |
---|
public enum Errors {
...
GROUP_SUBSCRIBED_TO_TOPIC(86, "The consumer group is actively subscribed to the topic", GroupSubscribedToTopicException::new);
...
}
public class GroupSubscribedToTopicException extends ApiException {
public GroupSubscribedToTopicException(String message) {
super(message);
}
} |
Admin Client
This capability will be exposed through the admin client in the following API:
Code Block |
---|
interface Admin { /** * Delete committed offsets for a set of partitions in a consumer group. This will * succeed at the partition level only if the group is not actively subscribed * to the corresponding topic. */ DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsets( String groupId, Set<TopicPartition> partitions, DeleteConsumerGroupOffsetsOptions options) } @InterfaceStability.Evolving public class DeleteConsumerGroupOffsetsResult { public KafkaFuture<Void> partitionResult(TopicPartition partition); public KafkaFuture<Void> all(); } @InterfaceStability.Evolving public class DeleteConsumerGroupOffsetsOptions extends AbstractOptions<DeleteConsumerGroupOffsetsOptions> { } |
Consumer Group Delete Offset options
Main action
--delete-offsets
This action should be executed independently from other Consumer Group actions.
Required Arguments
ID | Argument | Type | Description |
---|---|---|---|
1. | --bootstrap-server | Required | Server to connect to. |
2. | --group | Required | Consumer Group ID. |
3. | --topic | Required | Topics/Partitions: --topic <topic name>:<partition numbers> |
Execution
Group Error
Code Block |
---|
$ kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete-offsets --group unknown --topic test:0,1 --topic foo
Error: Deletion of offsets failed due to: <error message> |
Topic/Partition
Code Block |
---|
$ kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete-offsets --group group1 --topic test:0,1 --topic foo
TOPIC PARTITION STATUS
foo Not Provided Error: <error message>
test 0 Error: <error message>
test 1 Error: <error message> |
Metrics
We are also taking this opportunity to address a gap in metrics reporting. Currently we do not have any visibility about the rate of writes to the internal __consumer_offsets topic. With this KIP, there are four cases to distinguish, so we will add one meter for each.
- `kafka.coordinatorserver:type=group-coordinator-metrics,name=offset-commits`: Marked for every committed offset. (Note this is different than the offset commit request rate which only gives visibility at the request level)
- `kafka.coordinatorserver:type=group-coordinator-metrics,name=offset-expirations`: Marked for every expired offset
- `kafka.coordinatorserver:type=group-coordinator-metrics,name=offset-deletions`: Marked for every administrative deletion
- `kafka.coordinatorserver:type=group-coordinator-metrics,name=group-completed-rebalances`: Marked every time a rebalance completes (which causes a group metadata entry to be appended to the log).
...