...
Code Block language java
/**
 * <p>Resets offsets for the specified group.
 *
 * <p>This is a convenience method for {@link #resetConsumerGroupOffsets(String, Map, ResetOffsetsOptions)} with default options.
 * See the overload for more details.
 *
 * @param groupId The group for which to reset offsets.
 * @param offsets A map of offsets by partition with associated metadata.
 * @return The ResetOffsetsResult.
 */
public ResetOffsetsResult resetConsumerGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets) {
    return resetConsumerGroupOffsets(groupId, offsets, new ResetOffsetsOptions());
}

/**
 * <p>Resets offsets for the specified group.
 *
 * <p>This operation is not transactional, so it may succeed for some partitions while failing for others.
 *
 * @param groupId The group for which to reset offsets.
 * @param offsets A map of offsets by partition with associated metadata.
 * @param options The options to use when resetting the offsets.
 * @return The ResetOffsetsResult.
 */
public abstract ResetOffsetsResult resetConsumerGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets, ResetOffsetsOptions options);

/**
 * <p>Lists offsets for the specified partitions.
 *
 * <p>This is a convenience method for {@link #listOffsets(Map, ListOffsetsOptions)} which returns the
 * latest uncommitted offsets.
 *
 * @param topicPartitionOffsets The mapping from partition to the OffsetSpec to look up.
 * @return The ListOffsetsResult.
 */
public ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets) {
    return listOffsets(topicPartitionOffsets, new ListOffsetsOptions.latestUncommitted());
}

/**
 * <p>Lists offsets for the specified partitions.
 *
 * @param topicPartitionOffsets The mapping from partition to the OffsetSpec to look up.
 * @param options The options to use when retrieving the offsets.
 * @return The ListOffsetsResult.
 */
public abstract ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets, ListOffsetsOptions options);
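Because the reset operation is not transactional, callers would need to inspect the outcome for each partition rather than a single overall status. The following is a minimal, self-contained sketch of that behaviour, not the proposed implementation. Everything in it is hypothetical: `FakeOffsetStore` stands in for the broker-side offset storage, plain `String` keys stand in for `TopicPartition`, `Long` values stand in for `OffsetAndMetadata`, and `CompletableFuture` stands in for `KafkaFuture`.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

public class ResetOffsetsSketch {

    /** Hypothetical in-memory stand-in for the storage of a group's committed offsets. */
    static final class FakeOffsetStore {
        private final Set<String> knownPartitions;
        private final Map<String, Long> committed = new HashMap<>();

        FakeOffsetStore(Set<String> knownPartitions) {
            this.knownPartitions = knownPartitions;
        }

        /**
         * Applies each requested offset independently and returns one future per partition.
         * A failure on one partition does not roll back partitions that already succeeded.
         */
        Map<String, CompletableFuture<Void>> reset(Map<String, Long> offsets) {
            Map<String, CompletableFuture<Void>> results = new LinkedHashMap<>();
            for (Map.Entry<String, Long> entry : offsets.entrySet()) {
                CompletableFuture<Void> future = new CompletableFuture<>();
                if (knownPartitions.contains(entry.getKey())) {
                    committed.put(entry.getKey(), entry.getValue());
                    future.complete(null);
                } else {
                    future.completeExceptionally(
                        new IllegalArgumentException("Unknown partition: " + entry.getKey()));
                }
                results.put(entry.getKey(), future);
            }
            return results;
        }

        /** Returns the committed offset for the partition, or null if none was stored. */
        Long committedOffset(String partition) {
            return committed.get(partition);
        }
    }

    public static void main(String[] args) {
        FakeOffsetStore store = new FakeOffsetStore(Set.of("orders-0"));

        Map<String, Long> request = new LinkedHashMap<>();
        request.put("orders-0", 42L); // known partition: expected to succeed
        request.put("orders-9", 7L);  // unknown partition: expected to fail

        // Report the outcome partition by partition.
        store.reset(request).forEach((partition, future) ->
            System.out.println(partition + " -> "
                + (future.isCompletedExceptionally() ? "failed" : "reset")));
    }
}
```

In this sketch the offset for `orders-0` stays committed even though `orders-9` fails, which is the partial-success situation the Javadoc above warns about.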
...
ResetOffsetsOptions
Code Block language java title ResetOffsetsOptions.java
/**
 * Options for the {@link AdminClient#resetConsumerGroupOffsets(String, Map, ResetOffsetsOptions)} call.
 *
 * The API of this class is evolving, see {@link AdminClient} for details.
 */
@InterfaceStability.Evolving
public class ResetOffsetsOptions extends AbstractOptions<ResetOffsetsOptions> {
}
ListOffsetsResult
Code Block language java title ListOffsetsResult.java
/**
 * The result of the {@link AdminClient#listOffsets(Map)} call.
 *
 * The API of this class is evolving, see {@link AdminClient} for details.
 */
@InterfaceStability.Evolving
public class ListOffsetsResult {

    public ListOffsetsResult(Map<TopicPartition, KafkaFuture<ListOffsetsResultInfo>> futures) {}

    /**
     * Return a map from TopicPartition to futures which can be used to retrieve the offsets.
     */
    public Map<TopicPartition, KafkaFuture<ListOffsetsResultInfo>> values() {}

    /**
     * Return a future which succeeds only if offsets for all specified partitions have been successfully
     * retrieved.
     */
    public KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> all() {}

    public static class ListOffsetsResultInfo {
        ListOffsetsResultInfo(long offset, long timestamp, Optional<Integer> leaderEpoch) {}
        long offset() {}
        long timestamp() {}
        Optional<Integer> leaderEpoch() {}
    }
}
ListOffsetsOptions
Code Block language java title ListOffsetsOptions.java
/**
 * Options for {@link AdminClient#listOffsets(Map, ListOffsetsOptions)}.
 *
 * The API of this class is evolving, see {@link AdminClient} for details.
 */
@InterfaceStability.Evolving
public class ListOffsetsOptions extends AbstractOptions<ListOffsetsOptions> {

    private final String isolationLevel;

    public ListOffsetsOptions(String isolationLevel) {}

    public String isolationLevel() {}
}
OffsetSpec
Code Block language java
/**
 * This class specifies which offset to look up when retrieving offsets.
 */
public class OffsetSpec {
    public OffsetSpec latest() {}
    public OffsetSpec earliest() {}
    public OffsetSpec forTimestamp(long timestamp) {}
}
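To illustrate what each `OffsetSpec` variant is meant to select, here is a small self-contained sketch that resolves a spec against a toy in-memory partition log. It is an illustration under stated assumptions, not the proposed implementation: the `PartitionLog` class and its `resolve` method are hypothetical, the factories are written as static methods for convenience (the listing above does not declare them static), and the lookup rules mirror the usual Kafka conventions (earliest is the log start offset, latest is the offset of the next record to be written, and a timestamp lookup returns the first offset whose timestamp is greater than or equal to the requested one).

```java
import java.util.OptionalLong;

public class OffsetSpecSketch {

    /** Hypothetical stand-in for the proposed OffsetSpec, using static factories. */
    static final class OffsetSpec {
        enum Kind { EARLIEST, LATEST, TIMESTAMP }

        final Kind kind;
        final long timestamp; // only meaningful when kind == TIMESTAMP

        private OffsetSpec(Kind kind, long timestamp) {
            this.kind = kind;
            this.timestamp = timestamp;
        }

        static OffsetSpec earliest() { return new OffsetSpec(Kind.EARLIEST, -1L); }
        static OffsetSpec latest() { return new OffsetSpec(Kind.LATEST, -1L); }
        static OffsetSpec forTimestamp(long timestamp) { return new OffsetSpec(Kind.TIMESTAMP, timestamp); }
    }

    /** Toy partition log: the record at index i has offset startOffset + i. */
    static final class PartitionLog {
        final long startOffset;
        final long[] timestamps; // assumed to be in non-decreasing order

        PartitionLog(long startOffset, long[] timestamps) {
            this.startOffset = startOffset;
            this.timestamps = timestamps;
        }

        /** Resolves a spec to an offset, or empty if no record matches a timestamp lookup. */
        OptionalLong resolve(OffsetSpec spec) {
            switch (spec.kind) {
                case EARLIEST:
                    return OptionalLong.of(startOffset);
                case LATEST:
                    // End offset: the offset the next appended record would receive.
                    return OptionalLong.of(startOffset + timestamps.length);
                default:
                    // First record whose timestamp is >= the requested timestamp.
                    for (int i = 0; i < timestamps.length; i++) {
                        if (timestamps[i] >= spec.timestamp) {
                            return OptionalLong.of(startOffset + i);
                        }
                    }
                    return OptionalLong.empty();
            }
        }
    }

    public static void main(String[] args) {
        // Three records at offsets 5, 6 and 7 with timestamps 100, 200 and 300.
        PartitionLog log = new PartitionLog(5L, new long[] {100L, 200L, 300L});

        System.out.println("earliest: " + log.resolve(OffsetSpec.earliest()));
        System.out.println("latest: " + log.resolve(OffsetSpec.latest()));
        System.out.println("timestamp 150: " + log.resolve(OffsetSpec.forTimestamp(150L)));
        System.out.println("timestamp 400: " + log.resolve(OffsetSpec.forTimestamp(400L)));
    }
}
```

Modelling the three cases as one type lets a single `listOffsets` call mix start, end and timestamp lookups across partitions, which a plain `Long` timestamp per partition could only express through sentinel values.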
Proposed Changes
This KIP proposes adding methods to the AdminClient to reset offsets for a consumer group. It also adds methods to retrieve log offsets (ListOffset API), so that users can easily look up the start offset, the end offset, or the offset for a given timestamp for a set of partitions.
...
public ResetOffsetsResult resetConsumerGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets) {}
public abstract ResetOffsetsResult resetConsumerGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets, ResetOffsetsOptions options);
public ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets) {}
public abstract ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets, ListOffsetsOptions options);
...