Status

Current state: Accepted

Discussion thread: here

JIRA: KAFKA-7689

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

AdminClient.java
    /**
     * <p>Alters offsets for the specified group.
     *
     * <p>This is a convenience method for {@link #alterConsumerGroupOffsets(String, Map, AlterOffsetsOptions)} with default options.
     * See the overload for more details.
     *
     * @param groupId The group for which to alter offsets.
     * @param offsets A map of offsets by partition with associated metadata.
     * @return The AlterOffsetsResult.
     */
    public AlterOffsetsResult alterConsumerGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets) {
        return alterConsumerGroupOffsets(groupId, offsets, new AlterOffsetsOptions());
    }

    /**
     * <p>Alters offsets for the specified group.
     *
     * <p>This operation is not transactional, so it may succeed for some partitions while failing for others.
     *
     * @param groupId The group for which to alter offsets.
     * @param offsets A map of offsets by partition with associated metadata. Partitions not specified in the map are ignored.
     * @param options The options to use when altering the offsets.
     * @return The AlterOffsetsResult.
     */
    public abstract AlterOffsetsResult alterConsumerGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets, AlterOffsetsOptions options);

    /**
     * <p>Lists offsets for the specified partitions.
     *
     * <p>This is a convenience method for {@link #listOffsets(Map, ListOffsetsOptions)} with default options.
     *
     * @param topicPartitionOffsets The mapping from partition to the OffsetSpec to look up.
     * @return The ListOffsetsResult.
     */
    public ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets) {
        return listOffsets(topicPartitionOffsets, new ListOffsetsOptions());
    }


    /**
     * <p>Lists offsets for the specified partitions.
     *
     * @param topicPartitionOffsets The mapping from partition to the OffsetSpec to look up.
     * @param options The options to use when retrieving the offsets
     * @return The ListOffsetsResult.
     */
    public abstract ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets, ListOffsetsOptions options);

...

AlterOffsetsResult.java
/**
 * The result of the {@link AdminClient#alterConsumerGroupOffsets(String, Map)} call.
 *
 * The API of this class is evolving, see {@link AdminClient} for details.
 */
@InterfaceStability.Evolving
public class AlterOffsetsResult {

    public AlterOffsetsResult(KafkaFuture<Map<TopicPartition, KafkaFuture<Void>>> futures) {}

    public KafkaFuture<Map<TopicPartition, KafkaFuture<Void>>> values() {}

    public KafkaFuture<Map<TopicPartition, Void>> all() {}
}


  • AlterOffsetsOptions

    AlterOffsetsOptions.java
    /**
     * Options for the {@link AdminClient#alterConsumerGroupOffsets(String, Map, AlterOffsetsOptions)} call.
     *
     * The API of this class is evolving, see {@link AdminClient} for details.
     */
    @InterfaceStability.Evolving
    public class AlterOffsetsOptions extends AbstractOptions<AlterOffsetsOptions> {
    }


  • ListOffsetsResult

    ListOffsetsResult.java
    /**
     * The result of the {@link AdminClient#listOffsets(Map)} call.
     *
     * The API of this class is evolving, see {@link AdminClient} for details.
     */
    @InterfaceStability.Evolving
    public class ListOffsetsResult {
    
        public ListOffsetsResult(Map<TopicPartition, KafkaFuture<ListOffsetsResultInfo>> futures) {}
    
        /**
         * Return a map from TopicPartition to futures which can be used to retrieve the offsets
         */
        public Map<TopicPartition, KafkaFuture<ListOffsetsResultInfo>> values() {}
    
        /**
         * Return a future which succeeds only if offsets for all specified partitions have been successfully
         * retrieved.
         */
        public KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> all() {}
    
        static public class ListOffsetsResultInfo {
    
            ListOffsetsResultInfo(long offset, long timestamp, Optional<Integer> leaderEpoch) {}
    
            long offset() {}
            long timestamp() {}
            Optional<Integer> leaderEpoch() {}
        }
    }


  • ListOffsetsOptions

    ListOffsetsOptions.java
    /**
     * Options for {@link AdminClient#listOffsets(Map, ListOffsetsOptions)}.
     *
     * The API of this class is evolving, see {@link AdminClient} for details.
     */
    @InterfaceStability.Evolving
    public class ListOffsetsOptions extends AbstractOptions<ListOffsetsOptions> {
    
        private final IsolationLevel isolationLevel;
    
        // isolationLevel defaults to READ_UNCOMMITTED
        public ListOffsetsOptions() {}
        public ListOffsetsOptions(IsolationLevel isolationLevel) {}
    
        public IsolationLevel isolationLevel() {}
    }


  • OffsetSpec

    OffsetSpec.java
    /**
     * Specifies the desired query when retrieving offsets.
     */
    public class OffsetSpec {
    
        public OffsetSpec latest() {}
    
        public OffsetSpec earliest() {}
    
        public OffsetSpec forTimestamp(long timestamp) {}
    }


  • IsolationLevel (The existing IsolationLevel class is not part of the public API, so this KIP adds a new public one. The old class can then be removed.)

    IsolationLevel.java
    /**
     * The isolation level 
     */
    public enum IsolationLevel {
        READ_UNCOMMITTED((byte) 0),
        READ_COMMITTED((byte) 1);
    }
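
The Javadoc for alterConsumerGroupOffsets notes that the operation is not transactional, so a caller should expect some partitions to succeed while others fail. The following is a minimal, self-contained sketch of what that means for a caller. It does not use the Kafka classes: java.util.concurrent.CompletableFuture stands in for KafkaFuture, partitions are plain strings, the result is simplified to one future per partition, and the rule that a negative offset fails is invented purely to trigger a failure. The class name PartialAlterSketch and the methods alter and all are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class PartialAlterSketch {

    // Hypothetical stand-in for the alter call: returns one future per partition.
    // The "negative offset fails" rule is an assumption made for illustration only.
    static Map<String, CompletableFuture<Void>> alter(Map<String, Long> offsets) {
        Map<String, CompletableFuture<Void>> results = new LinkedHashMap<>();
        for (Map.Entry<String, Long> entry : offsets.entrySet()) {
            CompletableFuture<Void> future = new CompletableFuture<>();
            if (entry.getValue() < 0) {
                future.completeExceptionally(
                    new IllegalArgumentException("negative offset for " + entry.getKey()));
            } else {
                future.complete(null);
            }
            results.put(entry.getKey(), future);
        }
        return results;
    }

    // Mirrors the idea behind all(): completes normally only if every partition succeeded.
    static CompletableFuture<Void> all(Map<String, CompletableFuture<Void>> perPartition) {
        return CompletableFuture.allOf(perPartition.values().toArray(new CompletableFuture[0]));
    }

    public static void main(String[] args) {
        Map<String, Long> request = new LinkedHashMap<>();
        request.put("orders-0", 42L);
        request.put("orders-1", -1L);

        Map<String, CompletableFuture<Void>> results = alter(request);

        // Inspect each partition individually: one succeeded, one failed.
        results.forEach((partition, future) ->
            System.out.println(partition + " failed: " + future.isCompletedExceptionally()));

        // The aggregate future fails as soon as any partition failed.
        try {
            all(results).join();
            System.out.println("all partitions altered");
        } catch (CompletionException e) {
            System.out.println("at least one partition failed: " + e.getCause().getMessage());
        }
    }
}
```

The point of the sketch is the calling pattern: use the aggregate future when only overall success matters, and fall back to the per-partition futures to find out which partitions need to be retried.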


Proposed Changes

This KIP proposes adding methods to the AdminClient to alter the offsets of a group. It also adds methods to retrieve log offsets (ListOffset API), so users can easily look up the start offset, the end offset, or the offset for a given timestamp for a set of partitions.
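
To make the three kinds of lookup concrete, here is a small sketch of how an earliest, latest, or timestamp query might resolve against a partition log. It is an in-memory model, not the Kafka implementation: the classes OffsetSpecSketch, LogRecord and Spec are hypothetical, and the semantics are assumptions (earliest is the first offset in the log, latest is the last offset plus one, and a timestamp query returns the first record whose timestamp is greater than or equal to the target). An empty log simply yields no result here, which is a simplification.

```java
import java.util.Arrays;
import java.util.List;
import java.util.OptionalLong;

public class OffsetSpecSketch {

    /** One record of a hypothetical in-memory partition log. */
    static final class LogRecord {
        final long offset;
        final long timestamp;

        LogRecord(long offset, long timestamp) {
            this.offset = offset;
            this.timestamp = timestamp;
        }
    }

    /** Hypothetical stand-in for the proposed OffsetSpec; not the Kafka class. */
    static final class Spec {
        enum Kind { EARLIEST, LATEST, TIMESTAMP }

        final Kind kind;
        final long timestamp;

        private Spec(Kind kind, long timestamp) {
            this.kind = kind;
            this.timestamp = timestamp;
        }

        static Spec earliest() { return new Spec(Kind.EARLIEST, -1L); }
        static Spec latest() { return new Spec(Kind.LATEST, -1L); }
        static Spec forTimestamp(long timestamp) { return new Spec(Kind.TIMESTAMP, timestamp); }
    }

    /** Resolves a spec against a log whose records are sorted by offset and timestamp. */
    static OptionalLong resolve(List<LogRecord> log, Spec spec) {
        if (log.isEmpty()) {
            return OptionalLong.empty(); // simplification: nothing to resolve
        }
        switch (spec.kind) {
            case EARLIEST:
                return OptionalLong.of(log.get(0).offset);
            case LATEST:
                // Assumed to be the offset the next record would receive.
                return OptionalLong.of(log.get(log.size() - 1).offset + 1);
            default:
                // First record at or after the requested timestamp, if any.
                for (LogRecord record : log) {
                    if (record.timestamp >= spec.timestamp) {
                        return OptionalLong.of(record.offset);
                    }
                }
                return OptionalLong.empty();
        }
    }

    public static void main(String[] args) {
        List<LogRecord> log = Arrays.asList(
            new LogRecord(10, 1000L), new LogRecord(11, 2000L), new LogRecord(12, 3000L));

        System.out.println("earliest:  " + resolve(log, Spec.earliest()));
        System.out.println("latest:    " + resolve(log, Spec.latest()));
        System.out.println("at 1500ms: " + resolve(log, Spec.forTimestamp(1500L)));
        System.out.println("at 9999ms: " + resolve(log, Spec.forTimestamp(9999L)));
    }
}
```

In this model a timestamp beyond the last record resolves to nothing, which is why the result is an OptionalLong rather than a plain long.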

...