Status

Current state: Accepted

Discussion thread: here

JIRA: KAFKA-7689

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Resetting the offsets of a consumer group is a relatively common operation. It is usually done to let an application reprocess data or to skip/drop records. Currently (thanks to KIP-122) it can be done with the kafka-consumer-groups.sh tool; however, it would be nice to be able to do it directly with the AdminClient. Applications can more easily depend on the AdminClient API than run the ConsumerGroupCommand class or, worse, reimplement its logic. The ability to retrieve committed offsets is already available in the AdminClient and has proved very useful. Additionally, KIP-496 is adding the ability to delete committed offsets. This justifies integrating the consumer group reset/alter functionality directly into the AdminClient.

Public Interfaces

The plan is to add 4 new methods to the AdminClient public API:

...

Code Block
languagejava
titleAdminClient.java
    /**
     * <p>Alters offsets for the specified group.
     *
     * <p>This is a convenience method for {@link #alterConsumerGroupOffsets(String, Map, AlterOffsetsOptions)} with default options.
     * See the overload for more details.
     *
     * @param groupId The group for which to alter offsets.
     * @param offsets A map of offsets by partition with associated metadata.
     * @return The AlterOffsetsResult.
     */
    public AlterOffsetsResult alterConsumerGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets) {
        return alterConsumerGroupOffsets(groupId, offsets, new AlterOffsetsOptions());
    }

    /**
     * <p>Alters offsets for the specified group.
     *
     * <p>This operation is not transactional so it may succeed for some partitions while failing for others.
     *
     * @param groupId The group for which to alter offsets.
     * @param offsets A map of offsets by partition with associated metadata. Partitions not specified in the map are ignored.
     * @param options The options to use when altering the offsets.
     * @return The AlterOffsetsResult.
     */
    public abstract AlterOffsetsResult alterConsumerGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets, AlterOffsetsOptions options);

    /**
     * <p>List offset for the specified partitions.
     *
     * <p>This is a convenience method for {@link #listOffsets(Map, ListOffsetsOptions)} with default options
     * (READ_UNCOMMITTED isolation level).
     *
     * @param topicPartitionOffsets The mapping from partition to the OffsetSpec to look up.
     * @return The ListOffsetsResult.
     */
    public ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets) {
        return listOffsets(topicPartitionOffsets, new ListOffsetsOptions());
    }


    /**
     * <p>List offset for the specified partitions.
     *
     * @param topicPartitionOffsets The mapping from partition to the OffsetSpec to look up.
     * @param options The options to use when retrieving the offsets.
     * @return The ListOffsetsResult.
     */
    public abstract ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets, ListOffsetsOptions options);

...
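
The javadoc above notes that altering offsets is not transactional: some partitions may succeed while others fail. The following is a minimal sketch of how a caller might observe that, not the KIP's implementation. It assumes a simplified result shape, uses `CompletableFuture` as a stand-in for `KafkaFuture`, and identifies partitions by plain strings; `AlterResultSketch` and its `of` helper are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the result type; CompletableFuture replaces KafkaFuture.
final class AlterResultSketch {
    private final Map<String, CompletableFuture<Void>> futures;

    AlterResultSketch(Map<String, CompletableFuture<Void>> futures) {
        this.futures = futures;
    }

    // Per-partition outcome: each future completes or fails independently.
    Map<String, CompletableFuture<Void>> values() {
        return futures;
    }

    // Completes normally only if every partition succeeded; fails if any partition failed.
    CompletableFuture<Void> all() {
        return CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0]));
    }

    // Hypothetical helper: build a result from a map of partition -> "did the alter succeed".
    static AlterResultSketch of(Map<String, Boolean> outcomeByPartition) {
        Map<String, CompletableFuture<Void>> futures = new LinkedHashMap<>();
        outcomeByPartition.forEach((partition, ok) -> {
            CompletableFuture<Void> future = new CompletableFuture<>();
            if (ok) {
                future.complete(null);
            } else {
                future.completeExceptionally(
                    new IllegalStateException("alter failed for " + partition));
            }
            futures.put(partition, future);
        });
        return new AlterResultSketch(futures);
    }
}
```

A caller that needs all-or-nothing behaviour would check `all()` and, on failure, inspect `values()` to find which partitions have to be retried.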

We will also add companion objects under org.apache.kafka.clients.admin for these methods:

  • AlterOffsetsResult
Code Block
languagejava
titleAlterOffsetsResult.java
/**
 * The result of the {@link AdminClient#alterConsumerGroupOffsets(String, Map)} call.
 *
 * The API of this class is evolving, see {@link AdminClient} for details.
 */
@InterfaceStability.Evolving
public class AlterOffsetsResult {

    public AlterOffsetsResult(KafkaFuture<Map<TopicPartition, KafkaFuture<Void>>> futures) {}

    public KafkaFuture<Map<TopicPartition, KafkaFuture<Void>>> values() {}

    public KafkaFuture<Map<TopicPartition, Void>> all() {}
}


  • AlterOffsetsOptions

    Code Block
    languagejava
    titleAlterOffsetsOptions.java
    /**
     * Options for the {@link AdminClient#alterConsumerGroupOffsets(String, Map, AlterOffsetsOptions)} call.
     *
     * The API of this class is evolving, see {@link AdminClient} for details.
     */
    @InterfaceStability.Evolving
    public class AlterOffsetsOptions extends AbstractOptions<AlterOffsetsOptions> {
    }


  • ListOffsetsResult

    Code Block
    languagejava
    titleListOffsetsResult.java
    /**
     * The result of the {@link AdminClient#listOffsets(Map)} call.
     *
     * The API of this class is evolving, see {@link AdminClient} for details.
     */
    @InterfaceStability.Evolving
    public class ListOffsetsResult {
    
        public ListOffsetsResult(Map<TopicPartition, KafkaFuture<ListOffsetsResultInfo>> futures) {}
    
        /**
         * Return a map from TopicPartition to futures which can be used to retrieve the offsets
         */
        public Map<TopicPartition, KafkaFuture<ListOffsetsResultInfo>> values() {}
    
        /**
         * Return a future which succeeds only if offsets for all specified partitions have been successfully
         * retrieved.
         */
        public KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> all() {}
    
        static public class ListOffsetsResultInfo {
    
            ListOffsetsResultInfo(long offset, long timestamp, Optional<Integer> leaderEpoch) {}
    
            long offset() {}
            long timestamp() {}
            Optional<Integer> leaderEpoch() {}
        }
    }


  • ListOffsetsOptions

    Code Block
    languagejava
    titleListOffsetsOptions.java
    /**
     * Options for {@link AdminClient#listOffsets()}.
     *
     * The API of this class is evolving, see {@link AdminClient} for details.
     */
    @InterfaceStability.Evolving
    public class ListOffsetsOptions extends AbstractOptions<ListOffsetsOptions> {
    
        private final IsolationLevel isolationLevel;
    
        // isolationLevel defaults to READ_UNCOMMITTED
        public ListOffsetsOptions() {}
        public ListOffsetsOptions(IsolationLevel isolationLevel) {}
    
        public IsolationLevel isolationLevel() {}
    }


  • OffsetSpec

    Code Block
    languagejava
    titleOffsetSpec.java
    /**  
     * This class specifies the desired query when retrieving offsets.
     */
    public class OffsetSpec {
    
        public OffsetSpec latest() {}
    
        public OffsetSpec earliest() {}
    
        public OffsetSpec forTimestamp(long timestamp) {}
    }


  • IsolationLevel (The existing IsolationLevel class is not part of the public API, hence we are creating this new file. The old class could then be removed)

    Code Block
    languagejava
    titleIsolationLevel.java
    /**
     * The isolation level 
     */
    public enum IsolationLevel {
        READ_UNCOMMITTED((byte) 0),
        READ_COMMITTED((byte) 1);
    }
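
The OffsetSpec and IsolationLevel stubs above leave their bodies open. As a rough sketch only, one way to fill them in is shown below. It assumes static factory methods and the ListOffsets protocol convention of -1 for "latest" and -2 for "earliest" as sentinel timestamps. The class names `OffsetSpecSketch` and `IsolationLevelSketch`, the `id()`/`forId()` accessors, and the rejection of negative timestamps are illustrative choices, not part of the proposal.

```java
// Hypothetical sketch of OffsetSpec: each spec carries the timestamp sent in the lookup.
final class OffsetSpecSketch {
    // Assumed sentinel timestamps (ListOffsets protocol convention).
    static final long LATEST_TIMESTAMP = -1L;
    static final long EARLIEST_TIMESTAMP = -2L;

    private final long timestamp;

    private OffsetSpecSketch(long timestamp) {
        this.timestamp = timestamp;
    }

    static OffsetSpecSketch latest() {
        return new OffsetSpecSketch(LATEST_TIMESTAMP);
    }

    static OffsetSpecSketch earliest() {
        return new OffsetSpecSketch(EARLIEST_TIMESTAMP);
    }

    static OffsetSpecSketch forTimestamp(long timestamp) {
        // Negative values are reserved for the sentinels in this sketch.
        if (timestamp < 0) {
            throw new IllegalArgumentException("timestamp must be >= 0, got " + timestamp);
        }
        return new OffsetSpecSketch(timestamp);
    }

    long timestamp() {
        return timestamp;
    }
}

// Hypothetical sketch of IsolationLevel: the byte id is what would go on the wire.
enum IsolationLevelSketch {
    READ_UNCOMMITTED((byte) 0),
    READ_COMMITTED((byte) 1);

    private final byte id;

    IsolationLevelSketch(byte id) {
        this.id = id;
    }

    byte id() {
        return id;
    }

    static IsolationLevelSketch forId(byte id) {
        for (IsolationLevelSketch level : values()) {
            if (level.id == id) {
                return level;
            }
        }
        throw new IllegalArgumentException("Unknown isolation level id: " + id);
    }
}
```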


Proposed Changes

The proposal of this KIP is to add methods to alter offsets for a group in the AdminClient. Alongside these, methods to retrieve log offsets (ListOffset API) will be added so users can easily retrieve the start and end offsets of partitions, or their offsets for a given timestamp.

The 4 new methods in AdminClient:

  • public AlterOffsetsResult alterConsumerGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets) {}
  • public abstract AlterOffsetsResult alterConsumerGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets, AlterOffsetsOptions options);
  • public ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets) {}
  • public abstract ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets, ListOffsetsOptions options);
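
The two pairs of methods are meant to compose: a reset looks up the target offsets and then writes them for the group. The sketch below illustrates that flow on an in-memory stand-in rather than on Kafka itself. `InMemoryOffsetAdmin`, its methods, and the use of plain strings for partitions are all hypothetical and exist only to make the example self-contained.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for the two proposed operations; not Kafka code.
final class InMemoryOffsetAdmin {
    private final Map<String, Long> logEndOffsets = new HashMap<>();           // partition -> end offset
    private final Map<String, Map<String, Long>> committed = new HashMap<>();  // group -> partition -> offset

    void setLogEndOffset(String partition, long offset) {
        logEndOffsets.put(partition, offset);
    }

    // Plays the role of listOffsets(...) with a "latest" spec for every partition.
    Map<String, Long> listLatestOffsets(List<String> partitions) {
        Map<String, Long> result = new HashMap<>();
        for (String partition : partitions) {
            Long end = logEndOffsets.get(partition);
            if (end == null) {
                throw new IllegalArgumentException("Unknown partition: " + partition);
            }
            result.put(partition, end);
        }
        return result;
    }

    // Plays the role of alterConsumerGroupOffsets(...): partitions absent from the map are left untouched.
    void alterConsumerGroupOffsets(String groupId, Map<String, Long> offsets) {
        committed.computeIfAbsent(groupId, g -> new HashMap<>()).putAll(offsets);
    }

    Map<String, Long> committedOffsets(String groupId) {
        return committed.getOrDefault(groupId, Collections.<String, Long>emptyMap());
    }

    // The reset itself: look up the target offsets, then write them for the group.
    static void resetToLatest(InMemoryOffsetAdmin admin, String groupId, List<String> partitions) {
        admin.alterConsumerGroupOffsets(groupId, admin.listLatestOffsets(partitions));
    }
}
```

With the real API the same two steps would go through `listOffsets` and `alterConsumerGroupOffsets`, with futures in between; the stand-in keeps everything synchronous for brevity.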

Regarding the implementation, instead of relying on KafkaConsumer to perform the alter, I propose to directly send OffsetCommit requests. This is principally for 2 reasons:

  • Be more performant. In order to alter offsets, a KafkaConsumer has to be created for each group. It first needs to subscribe or be assigned topic/partitions before being able to alter offsets. Additionally, we'd need to pause consumption so that the calls to poll that perform the alter operation don't return records. That would require more connections/disconnections to the cluster and more network requests.
  • Avoid having the AdminClient depend on KafkaConsumer. Currently AdminClient does not depend on another client.

Compatibility, Deprecation, and Migration Plan

No impact or migration plan required as this proposal is only adding new methods and not changing any current behaviour.

Rejected Alternatives

  • Implement offset reset like it is currently done in ConsumerGroupCommand. As mentioned in the motivation section, this is inefficient.