Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
titleAdminClient.java
    /**
     * <p>Resets offsets for the specified group.
     *
     * <p>This is a convenience method for {@link #resetConsumerGroupOffsets(String, Map, ResetOffsetsOptions)} with default options.
     * See the overload for more details.
     *
     * @param groupId The group for which to reset offsets.
     * @param offsets A map of offsets by partition with associated metadata.
     * @return The ResetOffsetsResult.
     */
    public ResetOffsetsResult resetConsumerGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets) {
        return resetOffsets(groupId, offsets, new ResetOffsetsOptions());
    }

    /**
     * <p>Resets offsets for the specified group.
     *
     * <p>This operation is not transactional so it may succeed for some partitions while fail for others.
     *
     * @param groupId The group for which to reset offsets.
     * @param offsets A map of offsets by partition with associated metadata.
     * @param options The options to use when resetting the offsets.
     * @return The ResetOffsetsResult.
     */
    public abstract ResetOffsetsResult resetConsumerGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets, ResetOffsetsOptions options);

    /**
     * <p>List offset for the specified partitions.
     *
     * <p>This is a convenience method for {@link #listOffsets(Collection, ListOffsetsOptions)} which returns the
     * latest uncommitted offsets.
     *
     * @param topicPartitionOffsets The mapping from partition to the timestampOffsetSpec to look up.
     * @return The ListOffsetsResult.
     */
    public ListOffsetsResult listOffsets(Map<TopicPartition, Long>OffsetSpec> topicPartitionOffsets) {
        return listOffsets(topicPartitionOffsets, new ListOffsetsOptions.latestUncommitted());
    }


    /**
     * <p>List offset for the specified partitions.
     *
     * @param topicPartitionOffsets The mapping from partition to the timestampOffsetSpec to look up.
     * @param options The options to use when retrieving the offsets
     * @return The ListOffsetsResult.
     */
    public abstract ListOffsetsResult listOffsets(Map<TopicPartition, Long>OffsetSpec> topicPartitionOffsets, ListOffsetsOptions options);

...

  • CommitOffsetsOptions


    Code Block
    languagejava
    titleCommitOffsetsOptions.java
    /**
     * Options for the {@link AdminClient#resetOffsets(String, org.apache.kafka.clients.consumer.OffsetAndMetadata, ResetOffsetsOptions) call.
     *
     * The API of this class is evolving, see {@link AdminClient} for details.
     */
    @InterfaceStability.Evolving
    public class ResetOffsetsOptions extends AbstractOptions<ResetOffsetsOptions> {
    }


  • ListOffsetsResult


    Code Block
    languagejava
    titleListOffsetsResult.java
    /**
     * The result of the {@link AdminClient#listOffsets(String)} call.
     *
     * The API of this class is evolving, see {@link AdminClient} for details.
     */
    @InterfaceStability.Evolving
    public class ListOffsetsResult {
    
        public ListOffsetsResult(Map<TopicPartition, KafkaFuture<ListOffsetsResultInfo>> futures) {}
    
        /**
         * Return a map from TopicPartition to futures which can be used to retrieve the offsets
         */
        public Map<TopicPartition, KafkaFuture<ListOffsetsResultInfo>> values() {}
    
        /**
         * Return a future which succeeds only if offsets for all specified partitions have been successfully
         * retrieved.
         */
        public KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> all() {}
    
        static public class ListOffsetsResultInfo {
    
            ListOffsetsResultInfo(long offset, long timestamp, Optional<Integer> leaderEpoch) {}
    
            long offset() {}
            long timestamp() {}
            Optional<Integer> leaderEpoch() {}
        }
    }


  • ListOffsetsOptions



    Code Block
    languagejava
    titleListOffsetsOptions.java
    /**
     * Options for {@link AdminClient#listOffsets()}.
     *
     * The API of this class is evolving, see {@link AdminClient} for details.
     */
    @InterfaceStability.Evolving
    public class ListOffsetsOptions extends AbstractOptions<ListOffsetsOptions> {
    
        private final String isolationLevel;
    
        public ListOffsetsOptions(String isolationLevel) {}
    
        public String isolationLevel() {}
    }


  • OffsetSpec


    Code Block
    languagejava
    /**  
     * This class allows to specify the desired query when retrieving offsets
     */
    public class OffsetSpec {
    
        public OffsetSpec latest() {}
    
        public OffsetSpec earliest() {}
    
        public OffsetSpec forTimestamp(long timestamp) {}
    }


Proposed Changes

The proposal of this KIP is to add methods to reset offsets for a group in the AdminClient. Alongside, methods to retrieve log offsets (ListOffset API) so users can easily retrieve start and end offsets or offsets for a timestamp for partitions will be added.

...

  • public ResettOffsetsResult resetConsumerGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets) {}
  • public abstract ResettOffsetsResult resetConsumerGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets, ResetOffsetsOptions options);
  • public ListOffsetsResult listOffsets(Map<TopicPartition, Long> OffsetSpec> topicPartitionOffsets) {}
  • public abstract ListOffsetsResult listOffsets(Map<TopicPartition, Long> OffsetSpec> topicPartitionOffsets, ListOffsetsOptions options);

...