Table of Contents |
---|
Status
Current state: Under DiscussionAdopted
Discussion thread: here
JIRA:
Jira | ||||||
---|---|---|---|---|---|---|
|
Jira | ||||||
---|---|---|---|---|---|---|
|
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
The impact of these problems is that it is difficult to rely on lag monitoring using committed offsets. If a consumer is not subscribed to a topic, the lag will just grow. Also, it prevents the broker from cleaning up unused state.
Public Interfaces
Request/Response
We will add a new API to delete committed offsets.
...
Code Block |
---|
{
"apiKey": X,
"type": "response",
"name": "OffsetDeleteResponse",
"validVersions": "0",
"fields": [ { "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The top-level error code, or 0 if there was no error." },
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "Topics", "type": "[]OffsetDeleteResponseTopic", "versions": "0+",
"about": "The responses for each topic.", "fields": [
{ "name": "Name", "type": "string", "versions": "0+",
"about": "The topic name." },
{ "name": "Partitions", "type": "[]OffsetDeleteResponsePartition", "versions": "0+",
"about": "The responses for each partition in the topic.", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
"about": "The partition index." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The error code, or 0 if there was no error." }
]}
]}
]
} |
The error sent back to the client when a group is actively subscribed to the topic is documented bellow:
Code Block |
---|
public enum Errors {
...
GROUP_SUBSCRIBED_TO_TOPIC(86, "The consumer group is actively subscribed to the topic", GroupSubscribedToTopicException::new);
...
}
public class GroupSubscribedToTopicException extends ApiException {
public GroupSubscribedToTopicException(String message) {
super(message);
}
} |
Admin Client
This capability will be exposed through the admin client in the following API:
Code Block |
---|
interface Admin {
/**
* Delete committed offsets for a set of partitions in a consumer group. This will
* succeed at the partition level only if the group is not actively subscribed
* to the corresponding topic.
*/
DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsets(
String groupId,
Set<TopicPartition> partitions,
DeleteConsumerGroupOffsetsOptions options)
}
@InterfaceStability.Evolving
public class DeleteConsumerGroupOffsetsResult {
public KafkaFuture<Void> partitionResult(TopicPartition partition);
public KafkaFuture<Void> all();
}
@InterfaceStability.Evolving
public class DeleteConsumerGroupOffsetsOptions extends AbstractOptions<DeleteConsumerGroupOffsetsOptions> {
}
|
Consumer Group Delete Offset options
Main action
--delete-offsets
This action should be executed independently from other Consumer Group actions.
Required Arguments
ID | Argument | Type | Description |
---|---|---|---|
1. | --bootstrap-server | Required | Server to connect to. |
2. | --group | Required | Consumer Group ID. |
3. | --topic | Required | Topics/Partitions: --topic <topic name>:<partition numbers> |
Execution
Group Error
Code Block |
---|
$ kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete-offsets --group unknown --topic test:0,1 --topic foo Error: Deletion of offsets failed due to: <error message> |
Topic/Partition
Code Block |
---|
$ kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete-offsets --group group1 --topic test:0,1 --topic foo
TOPIC PARTITION STATUS
foo Not Provided Error: <error message>
test 0 Error: <error message>
test 1 Error: <error message> |
Metrics
We are also taking this opportunity to address a gap in metrics reporting. Currently we do not have any visibility about the rate of writes to the internal __consumer_offsets topic. With this KIP, there are four cases to distinguish, so we will add one meter for each.
- `kafka.coordinatorserver:type=group-coordinator-metrics,name=offset-commits`: Marked for every committed offset. (Note this is different than the offset commit request rate which only gives visibility at the request level)
- `kafka.coordinatorserver:type=group-coordinator-metrics,name=offset-expirations`: Marked for every expired offset
- `kafka.coordinatorserver:type=group-coordinator-metrics,name=offset-deletions`: Marked for every administrative deletion
- `kafka.coordinatorserver:type=group-coordinator-metrics,name=group-completed-rebalances`: Marked every time a rebalance completes (which causes a group metadata entry to be appended to the log).
...