...

Resetting the offsets of a consumer group is a relatively common operation. It is usually done to let an application reprocess data, or to skip/drop records. Currently (thanks to KIP-122) it can be done with the kafka-consumer-groups.sh tool, but it would be nice to be able to do it directly with the AdminClient. Applications can more easily depend on the AdminClient API than run the ConsumerGroupCommand class or, worse, reimplement its logic. Additionally, this allows updating a consumer group's position without having to join the group, seek all partitions and finally commit, which is slow and requires many requests. Instead, we can rely directly on the protocol and send a single commit request to the correct coordinator.
Additionally, at the moment, kafka-consumer-groups.sh can only run against a single group at a time and needs to start a new consumer every time, which is rather inefficient when resetting many groups. This tool could even be updated afterwards to use the AdminClient instead of having its own logic. This justifies integrating the consumer group reset functionality directly into the AdminClient.

Public Interfaces

The plan is to add four new methods to the AdminClient public API:

...

  • public CommitOffsetsResult commitConsumerGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets) {}
  • public abstract CommitOffsetsResult commitConsumerGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets, CommitOffsetsOptions options);
  • public ListOffsetsResult listOffsets(Map<TopicPartition, Long> topicPartitionOffsets) {}
  • public abstract ListOffsetsResult listOffsets(Map<TopicPartition, Long> topicPartitionOffsets, ListOffsetsOptions options);
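As an illustration only, the four methods above could be combined into a "reset to earliest" flow along the following lines. This is a minimal, self-contained sketch, not the proposed implementation: TopicPartition and OffsetAndMetadata are small stand-ins for the real Kafka classes, the options classes are empty placeholders, the return types are simplified (void and a plain Map instead of CommitOffsetsResult and ListOffsetsResult), and InMemoryAdmin and resetToEarliest are hypothetical names. It also assumes that the Long passed to listOffsets is a timestamp, with -2 and -1 acting as the earliest/latest sentinels as in the ListOffsets protocol request; the text above does not state this.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class ResetOffsetsSketch {

    // Minimal stand-in for org.apache.kafka.common.TopicPartition.
    static final class TopicPartition {
        final String topic;
        final int partition;
        TopicPartition(String topic, int partition) { this.topic = topic; this.partition = partition; }
        @Override public boolean equals(Object o) {
            if (!(o instanceof TopicPartition)) return false;
            TopicPartition other = (TopicPartition) o;
            return partition == other.partition && topic.equals(other.topic);
        }
        @Override public int hashCode() { return Objects.hash(topic, partition); }
    }

    // Minimal stand-in for org.apache.kafka.clients.consumer.OffsetAndMetadata.
    static final class OffsetAndMetadata {
        final long offset;
        OffsetAndMetadata(long offset) { this.offset = offset; }
    }

    // Placeholder option holders; their fields are not specified in the text above.
    static final class CommitOffsetsOptions {}
    static final class ListOffsetsOptions {}

    // Assumption: the Long in listOffsets is a timestamp using these sentinels.
    static final long LATEST = -1L;
    static final long EARLIEST = -2L;

    // Shape of the four methods: each convenience overload delegates to the abstract one.
    // Return types are simplified for the sketch.
    abstract static class AdminSketch {
        public void commitConsumerGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets) {
            commitConsumerGroupOffsets(groupId, offsets, new CommitOffsetsOptions());
        }
        public abstract void commitConsumerGroupOffsets(
                String groupId, Map<TopicPartition, OffsetAndMetadata> offsets, CommitOffsetsOptions options);

        public Map<TopicPartition, Long> listOffsets(Map<TopicPartition, Long> topicPartitionOffsets) {
            return listOffsets(topicPartitionOffsets, new ListOffsetsOptions());
        }
        public abstract Map<TopicPartition, Long> listOffsets(
                Map<TopicPartition, Long> topicPartitionOffsets, ListOffsetsOptions options);
    }

    // Hypothetical in-memory implementation so the flow can run without a cluster.
    static final class InMemoryAdmin extends AdminSketch {
        final Map<TopicPartition, long[]> logBounds = new HashMap<>(); // {earliest, latest}
        final Map<String, Map<TopicPartition, Long>> committed = new HashMap<>();

        @Override
        public void commitConsumerGroupOffsets(
                String groupId, Map<TopicPartition, OffsetAndMetadata> offsets, CommitOffsetsOptions options) {
            Map<TopicPartition, Long> group = committed.computeIfAbsent(groupId, g -> new HashMap<>());
            offsets.forEach((tp, om) -> group.put(tp, om.offset));
        }

        @Override
        public Map<TopicPartition, Long> listOffsets(
                Map<TopicPartition, Long> topicPartitionOffsets, ListOffsetsOptions options) {
            Map<TopicPartition, Long> result = new HashMap<>();
            topicPartitionOffsets.forEach((tp, ts) -> {
                long[] bounds = logBounds.get(tp);
                if (ts == EARLIEST) result.put(tp, bounds[0]);
                else if (ts == LATEST) result.put(tp, bounds[1]);
                else throw new IllegalArgumentException("only sentinel timestamps are modelled here");
            });
            return result;
        }
    }

    // Resolve the earliest offset of each partition, then commit it for the group.
    static Map<TopicPartition, Long> resetToEarliest(
            AdminSketch admin, String groupId, Iterable<TopicPartition> partitions) {
        Map<TopicPartition, Long> query = new HashMap<>();
        for (TopicPartition tp : partitions) query.put(tp, EARLIEST);
        Map<TopicPartition, Long> resolved = admin.listOffsets(query);
        Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
        resolved.forEach((tp, offset) -> toCommit.put(tp, new OffsetAndMetadata(offset)));
        admin.commitConsumerGroupOffsets(groupId, toCommit);
        return resolved;
    }

    public static void main(String[] args) {
        InMemoryAdmin admin = new InMemoryAdmin();
        TopicPartition tp = new TopicPartition("orders", 0);
        admin.logBounds.put(tp, new long[] {5L, 42L});
        resetToEarliest(admin, "billing", Collections.singletonList(tp));
        System.out.println(admin.committed.get("billing").get(tp));
    }
}
```

The point of the sketch is the calling pattern: the caller never joins the group or creates a consumer, it only resolves target offsets and hands them to a single commit call for the group.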

Regarding the implementation, rather than relying on KafkaConsumer to perform the commit, I propose sending OffsetCommit requests directly. This is principally for two reasons:

  • Better performance. To commit offsets, a KafkaConsumer has to be created for each group. It first needs to subscribe to topics or be assigned partitions before it can commit offsets. Additionally, we would need to pause consumption so that the calls to poll needed to perform the commit do not return records. All of this requires more connections to and disconnections from the cluster, and more network requests.
  • Avoid having the AdminClient depend on KafkaConsumer. Currently AdminClient does not depend on another client.

Compatibility, Deprecation, and Migration Plan

No compatibility impact is expected and no migration plan is required, as this proposal only adds new methods and does not change any current behaviour.

Rejected Alternatives

  • Implement offset reset like it is currently done in ConsumerGroupCommand. As mentioned in the motivation section, this is inefficient.