...
Resetting the offsets of a consumer group is a relatively common operation. It is usually done to let an application reprocess data, or to skip or drop records. Currently (thanks to KIP-122) it can be done with the kafka-consumer-groups.sh tool, but it would be useful to be able to do it directly with the AdminClient. Applications can more easily depend on the AdminClient API than run the ConsumerGroupsCommand class or, worse, reimplement its logic.
Additionally, at the moment, the kafka-consumer-groups.sh tool can only run against a single group at a time and needs to start a new consumer every time, which is rather inefficient when resetting many groups. The tool could even be updated afterwards to use the AdminClient instead of having its own logic.
Public Interfaces
The plan is to add 4 new methods to the AdminClient public API:
...
CommitOffsetsOptions
```java
/**
 * Options for the {@link AdminClient#commitOffsets(String, org.apache.kafka.clients.consumer.OffsetAndMetadata, CommitOffsetsOptions)} call.
 *
 * The API of this class is evolving, see {@link AdminClient} for details.
 */
@InterfaceStability.Evolving
public class CommitOffsetsOptions extends AbstractOptions<CommitOffsetsOptions> {
}
```
ListOffsetsResult
```java
/**
 * The result of the {@link AdminClient#listOffsets(String)} call.
 *
 * The API of this class is evolving, see {@link AdminClient} for details.
 */
@InterfaceStability.Evolving
public class ListOffsetsResult {

    public ListOffsetsResult(Map<TopicPartition, KafkaFuture<ListOffsetsResultInfo>> futures) {}

    /**
     * Return a map from TopicPartition to futures which can be used to retrieve the offsets
     */
    public Map<TopicPartition, KafkaFuture<ListOffsetsResultInfo>> values() {}

    /**
     * Return a future which succeeds only if offsets for all specified partitions have been successfully
     * retrieved.
     */
    public KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> all() {}

    public static class ListOffsetsResultInfo {

        ListOffsetsResultInfo(long offset, long timestamp) {}

        long getOffset() {}

        long getTimestamp() {}
    }
}
```
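To illustrate the contract of `all()` ("succeeds only if offsets for all specified partitions have been successfully retrieved"), here is a minimal sketch that uses only the JDK. It is not the proposed implementation: `CompletableFuture` stands in for `KafkaFuture`, a plain `String` stands in for `TopicPartition`, and `ListOffsetsAllSketch` and `OffsetInfo` are hypothetical names introduced for this example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class ListOffsetsAllSketch {

    /** Hypothetical stand-in for ListOffsetsResultInfo: an offset and its timestamp. */
    public static final class OffsetInfo {
        public final long offset;
        public final long timestamp;

        public OffsetInfo(long offset, long timestamp) {
            this.offset = offset;
            this.timestamp = timestamp;
        }
    }

    /**
     * Combines per-partition futures into a single future that completes
     * successfully only if every per-partition future does. Partition names
     * are plain strings here (an assumption made to keep the sketch JDK-only).
     */
    public static CompletableFuture<Map<String, OffsetInfo>> all(
            Map<String, CompletableFuture<OffsetInfo>> futures) {
        CompletableFuture<Void> allDone =
                CompletableFuture.allOf(futures.values().toArray(new CompletableFuture<?>[0]));
        return allDone.thenApply(ignored -> {
            Map<String, OffsetInfo> result = new HashMap<>();
            // This runs only after every future completed normally, so join() does not block.
            futures.forEach((partition, future) -> result.put(partition, future.join()));
            return result;
        });
    }

    public static void main(String[] args) {
        Map<String, CompletableFuture<OffsetInfo>> futures = new HashMap<>();
        futures.put("my-topic-0", CompletableFuture.completedFuture(new OffsetInfo(42L, 1000L)));
        futures.put("my-topic-1", CompletableFuture.completedFuture(new OffsetInfo(7L, 2000L)));

        Map<String, OffsetInfo> offsets = all(futures).join();
        System.out.println("my-topic-0 -> " + offsets.get("my-topic-0").offset);
        System.out.println("my-topic-1 -> " + offsets.get("my-topic-1").offset);

        // A single failed partition makes the combined future fail as well.
        CompletableFuture<OffsetInfo> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("partition unavailable"));
        futures.put("my-topic-2", failed);
        System.out.println("combined future failed: " + all(futures).isCompletedExceptionally());
    }
}
```

Callers that want partial results despite individual failures would iterate over the per-partition futures (the role of `values()` in the proposed class) instead of relying on the combined future.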
ListOffsetsOptions
```java
/**
 * Options for {@link AdminClient#listOffsets()}.
 *
 * The API of this class is evolving, see {@link AdminClient} for details.
 */
@InterfaceStability.Evolving
public class ListOffsetsOptions extends AbstractOptions<ListOffsetsOptions> {

    private final long timestamp;
    private final String isolationLevel;

    public static ListOffsetsOptions latestUncommitted() {}

    public static ListOffsetsOptions latestCommitted() {}

    public static ListOffsetsOptions earliestUncommitted() {}

    public static ListOffsetsOptions earliestCommitted() {}

    public ListOffsetsOptions(long timestamp, String isolationLevel) {}

    public long timestamp() {}

    public String isolationLevel() {}
}
```
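One plausible reading of the four factory methods is that each one pairs a sentinel timestamp with an isolation level. The JDK-only sketch below shows that reading. The class name `ListOffsetsOptionsSketch` is hypothetical, the sentinel values -1 (latest) and -2 (earliest) are the ones used by Kafka's ListOffsets protocol, and the isolation level strings are an assumption borrowed from the consumer's `isolation.level` setting; the KIP text above does not spell out either choice.

```java
public class ListOffsetsOptionsSketch {

    // Sentinel timestamps used by Kafka's ListOffsets protocol.
    static final long LATEST_TIMESTAMP = -1L;
    static final long EARLIEST_TIMESTAMP = -2L;

    // Assumed isolation level names, mirroring the consumer's isolation.level values.
    static final String READ_UNCOMMITTED = "read_uncommitted";
    static final String READ_COMMITTED = "read_committed";

    private final long timestamp;
    private final String isolationLevel;

    public ListOffsetsOptionsSketch(long timestamp, String isolationLevel) {
        this.timestamp = timestamp;
        this.isolationLevel = isolationLevel;
    }

    public static ListOffsetsOptionsSketch latestUncommitted() {
        return new ListOffsetsOptionsSketch(LATEST_TIMESTAMP, READ_UNCOMMITTED);
    }

    public static ListOffsetsOptionsSketch latestCommitted() {
        return new ListOffsetsOptionsSketch(LATEST_TIMESTAMP, READ_COMMITTED);
    }

    public static ListOffsetsOptionsSketch earliestUncommitted() {
        return new ListOffsetsOptionsSketch(EARLIEST_TIMESTAMP, READ_UNCOMMITTED);
    }

    public static ListOffsetsOptionsSketch earliestCommitted() {
        return new ListOffsetsOptionsSketch(EARLIEST_TIMESTAMP, READ_COMMITTED);
    }

    public long timestamp() {
        return timestamp;
    }

    public String isolationLevel() {
        return isolationLevel;
    }

    public static void main(String[] args) {
        ListOffsetsOptionsSketch end = latestCommitted();
        System.out.println(end.timestamp() + " " + end.isolationLevel());

        // An explicit timestamp asks for the offset at or after that point in time.
        ListOffsetsOptionsSketch atTime =
                new ListOffsetsOptionsSketch(1_546_300_800_000L, READ_UNCOMMITTED);
        System.out.println(atTime.timestamp() + " " + atTime.isolationLevel());
    }
}
```

Under this reading, the public constructor covers the "offsets for a timestamp" case, while the factories cover the start and end offsets.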
Proposed Changes
The proposal of this KIP is to add methods to commit offsets for a group in the AdminClient. Alongside, methods to retrieve log offsets (ListOffset API) will be added so users can easily retrieve start and end offsets, or offsets for a timestamp, for partitions.
The 4 new methods in AdminClient:
...