...
The impact of these problems is that lag monitoring based on committed offsets is hard to rely on. If a consumer is no longer subscribed to a topic for which offsets were committed, the reported lag just keeps growing. The leftover offsets also prevent the broker from cleaning up unused state.
Public Interfaces
Request/Response
We will add a new API to delete committed offsets.
...
Code Block |
---|
public enum Errors {
    ...
    GROUP_SUBSCRIBED_TO_TOPIC(86, "The consumer group is actively subscribed to the topic", GroupSubscribedToTopicException::new);
    ...
}
public class GroupSubscribedToTopicException extends ApiException {
    public GroupSubscribedToTopicException(String message) {
        super(message);
    }
}
|
Admin Client
This capability will be exposed through the admin client in the following API:
Code Block |
---|
interface Admin {
    /**
     * Delete committed offsets for a set of partitions in a consumer group. This will
     * succeed at the partition level only if the group is not actively subscribed
     * to the corresponding topic.
     */
    DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsets(
        String groupId,
        Set<TopicPartition> partitions,
        DeleteConsumerGroupOffsetsOptions options);
}
@InterfaceStability.Evolving
public class DeleteConsumerGroupOffsetsResult {
    public KafkaFuture<Void> partitionResult(TopicPartition partition);
    public KafkaFuture<Void> all();
}
@InterfaceStability.Evolving
public class DeleteConsumerGroupOffsetsOptions extends AbstractOptions<DeleteConsumerGroupOffsetsOptions> {
}
|
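The partition-level rule in the Javadoc above can be illustrated with a small in-memory model. This is only a sketch of the stated semantics, not the broker or admin client implementation: the class `OffsetDeletionSketch` and all of its methods are hypothetical, a `false` return stands in for the `GROUP_SUBSCRIBED_TO_TOPIC` error, and treating deletion of a missing offset as success is an assumption of the sketch.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical toy model of one consumer group's committed offsets.
final class OffsetDeletionSketch {
    // topic -> (partition -> committed offset)
    private final Map<String, Map<Integer, Long>> committed = new HashMap<>();
    // topics the group is actively subscribed to
    private final Set<String> subscribedTopics = new HashSet<>();

    void commit(String topic, int partition, long offset) {
        committed.computeIfAbsent(topic, t -> new HashMap<>()).put(partition, offset);
    }

    void subscribe(String topic) {
        subscribedTopics.add(topic);
    }

    boolean hasOffset(String topic, int partition) {
        Map<Integer, Long> byPartition = committed.get(topic);
        return byPartition != null && byPartition.containsKey(partition);
    }

    // Returns false (standing in for GROUP_SUBSCRIBED_TO_TOPIC) when the group
    // is subscribed to the topic; otherwise removes the offset and returns true.
    // Assumption: deleting an offset that does not exist counts as success here.
    boolean deleteOffset(String topic, int partition) {
        if (subscribedTopics.contains(topic)) {
            return false;
        }
        Map<Integer, Long> byPartition = committed.get(topic);
        if (byPartition != null) {
            byPartition.remove(partition);
        }
        return true;
    }

    public static void main(String[] args) {
        OffsetDeletionSketch group = new OffsetDeletionSketch();
        group.commit("test", 0, 42L);
        group.commit("foo", 0, 7L);
        group.subscribe("test");
        System.out.println(group.deleteOffset("test", 0)); // false: still subscribed
        System.out.println(group.deleteOffset("foo", 0));  // true: not subscribed
    }
}
```

Each partition is decided independently, which mirrors why the result type exposes both `partitionResult(TopicPartition)` and `all()`.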
Consumer Group Delete Offset options
Main action
--delete-offsets
This action should be executed independently from other Consumer Group actions.
Required Arguments
ID | Argument | Type | Description |
---|---|---|---|
1. | --bootstrap-server | Required | Server to connect to. |
2. | --group | Required | Consumer Group ID. |
3. | --topic | Required | Topics/Partitions: --topic <topic name>:<partition numbers> |
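As an illustration of the `--topic <topic name>:<partition numbers>` format, the following sketch splits one argument value into a topic name and a list of partitions. It is not the tool's actual parser: the class `TopicArg` and its `parse` method are hypothetical, and an empty partition list is used here to represent a value such as `foo` that names only a topic.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical holder for one parsed --topic value.
final class TopicArg {
    final String topic;
    final List<Integer> partitions; // empty when only a topic name was given

    private TopicArg(String topic, List<Integer> partitions) {
        this.topic = topic;
        this.partitions = partitions;
    }

    // Parses "test:0,1" into topic "test" with partitions [0, 1], and "foo"
    // into topic "foo" with no partitions. Malformed partition numbers
    // (for example "test:" or "test:a") throw NumberFormatException.
    static TopicArg parse(String value) {
        int colon = value.indexOf(':');
        if (colon < 0) {
            return new TopicArg(value, Collections.emptyList());
        }
        List<Integer> partitions = new ArrayList<>();
        for (String p : value.substring(colon + 1).split(",")) {
            partitions.add(Integer.parseInt(p.trim()));
        }
        return new TopicArg(value.substring(0, colon), partitions);
    }
}
```

Under these assumptions, `TopicArg.parse("test:0,1")` yields topic `test` with partitions 0 and 1, while `TopicArg.parse("foo")` yields topic `foo` with an empty partition list.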
Execution
Group Error
Code Block |
---|
$ kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete-offsets --group unknown --topic test:0,1 --topic foo
Error: Deletion of offsets failed due to: <error message> |
Topic/Partition
Code Block |
---|
$ kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete-offsets --group group1 --topic test:0,1 --topic foo
TOPIC      PARTITION        STATUS
foo        Not Provided     Error: <error message>
test       0                Error: <error message>
test       1                Error: <error message> |
Metrics
We are also taking this opportunity to address a gap in metrics reporting. Currently, we have no visibility into the rate of writes to the internal __consumer_offsets topic. With this KIP, there are four cases to distinguish, so we will add one meter for each.
...