Status

Current state: Under discussion

Discussion thread: here [Change the link from the KIP proposal email archive to your own email thread]

JIRA: KAFKA-7689

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Resetting the offsets of a consumer group is a relatively common operation. It is usually done to allow an application to reprocess data or to skip or drop records. Currently (thanks to KIP-122) it can be done using the kafka-consumer-groups.sh tool, but it would be useful to be able to do it directly with the AdminClient. Applications can depend on the AdminClient API far more easily than they can run the ConsumerGroupCommand class or, worse, reimplement its logic.

Additionally, kafka-consumer-groups.sh can currently run against only a single group at a time, and it needs to start a new consumer every time, which is rather inefficient when resetting many groups. Once these methods exist, the tool could be updated to use the AdminClient instead of its own logic.

Public Interfaces

We will add 4 new methods to the AdminClient public API:


AdminClient.java
    /**
     * <p>Commits offsets for the specified group.
     *
     * <p>This is a convenience method for {@link #commitOffsets(String, Map, CommitOffsetsOptions)} with default options.
     * See the overload for more details.
     *
     * @param groupId The group for which to commit offsets.
     * @param offsets A map of offsets by partition with associated metadata.
     * @return The CommitOffsetsResult.
     */
    public CommitOffsetsResult commitOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets) {
        return commitOffsets(groupId, offsets, new CommitOffsetsOptions());
    }

    /**
     * <p>Commits offsets for the specified group.
     *
     * <p>This operation is not transactional, so it may succeed for some partitions while failing for others.
     *
     * @param groupId The group for which to commit offsets.
     * @param offsets A map of offsets by partition with associated metadata.
     * @param options The options to use when committing the offsets.
     * @return The CommitOffsetsResult.
     */
    public abstract CommitOffsetsResult commitOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets, CommitOffsetsOptions options);

    /**
     * <p>Lists offsets for the specified partitions.
     *
     * <p>This is a convenience method for {@link #listOffsets(Collection, ListOffsetsOptions)} which returns the
     * latest uncommitted offsets.
     *
     * @param topicPartitions A collection of {@link TopicPartition} to retrieve offsets for.
     * @return The ListOffsetsResult.
     */
    public ListOffsetsResult listOffsets(Collection<TopicPartition> topicPartitions) {
        return listOffsets(topicPartitions, ListOffsetsOptions.latestUncommitted());
    }


    /**
     * <p>Lists offsets for the specified partitions.
     *
     * @param topicPartitions A collection of {@link TopicPartition} to retrieve offsets for.
     * @param options The options to use when retrieving the offsets.
     * @return The ListOffsetsResult.
     */
    public abstract ListOffsetsResult listOffsets(Collection<TopicPartition> topicPartitions, ListOffsetsOptions options);
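Because commitOffsets is not transactional, a caller that sees all() fail may want to know which partitions were actually committed. The following is a minimal sketch of that check, assuming per-partition futures like those returned by CommitOffsetsResult#values(). To stay self-contained it uses java.util.concurrent.CompletableFuture in place of KafkaFuture and plain strings such as "orders-0" in place of TopicPartition; the class PartialCommitDemo and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical illustration; CompletableFuture stands in for KafkaFuture
// and String partition names stand in for TopicPartition.
public class PartialCommitDemo {

    // Derives an "all" future from per-partition futures: it completes normally
    // only if every partition's commit succeeds, as documented for all().
    static CompletableFuture<Void> all(Map<String, CompletableFuture<Void>> values) {
        return CompletableFuture.allOf(values.values().toArray(new CompletableFuture[0]));
    }

    // Returns the partitions whose commit failed; assumes the futures are already complete.
    static List<String> failedPartitions(Map<String, CompletableFuture<Void>> values) {
        List<String> failed = new ArrayList<>();
        for (Map.Entry<String, CompletableFuture<Void>> e : values.entrySet()) {
            if (e.getValue().isCompletedExceptionally()) {
                failed.add(e.getKey());
            }
        }
        return failed;
    }

    public static void main(String[] args) throws InterruptedException {
        Map<String, CompletableFuture<Void>> values = new LinkedHashMap<>();
        values.put("orders-0", CompletableFuture.completedFuture(null));
        CompletableFuture<Void> failure = new CompletableFuture<>();
        failure.completeExceptionally(new IllegalStateException("simulated broker error"));
        values.put("orders-1", failure);

        try {
            all(values).get();
            System.out.println("all partitions committed");
        } catch (ExecutionException e) {
            // Only orders-1 failed; orders-0 keeps its new offset.
            System.out.println("failed partitions: " + failedPartitions(values));
        }
    }
}
```

Running main prints `failed partitions: [orders-1]`: the aggregate future fails, while the per-partition view shows that orders-0 was still committed.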



We will also add companion classes for these methods in the org.apache.kafka.clients.admin package:

  • CommitOffsetsResult
CommitOffsetsResult.java
/**
 * The result of the {@link AdminClient#commitOffsets(String, Map)} call.
 *
 * The API of this class is evolving, see {@link AdminClient} for details.
 */
@InterfaceStability.Evolving
public class CommitOffsetsResult {

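    public CommitOffsetsResult(Map<TopicPartition, KafkaFuture<Void>> futures) {}

    /**
     * Return a map from TopicPartition to futures which can be used to check the status of the commit for each partition.
     */
    public Map<TopicPartition, KafkaFuture<Void>> values() {}

    /**
     * Return a future which succeeds only if offsets for all specified partitions have been successfully committed.
     */
    public KafkaFuture<Void> all() {}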
}
  • CommitOffsetsOptions


    CommitOffsetsOptions.java
    /**
     * Options for the {@link AdminClient#commitOffsets(String, Map, CommitOffsetsOptions)} call.
     *
     * The API of this class is evolving, see {@link AdminClient} for details.
     */
    @InterfaceStability.Evolving
    public class CommitOffsetsOptions extends AbstractOptions<CommitOffsetsOptions> {
    }
  • ListOffsetsResult


    ListOffsetsResult.java
    /**
     * The result of the {@link AdminClient#listOffsets(Collection)} call.
     *
     * The API of this class is evolving, see {@link AdminClient} for details.
     */
    @InterfaceStability.Evolving
    public class ListOffsetsResult {
    
        public ListOffsetsResult(Map<TopicPartition, KafkaFuture<ListOffsetsResultInfo>> futures) {}
    
        /**
         * Return a map from TopicPartition to futures which can be used to retrieve the offsets
         */
        public Map<TopicPartition, KafkaFuture<ListOffsetsResultInfo>> values() {}
    
        /**
         * Return a future which succeeds only if offsets for all specified partitions have been successfully
         * retrieved.
         */
        public KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> all() {}
    
        static public class ListOffsetsResultInfo {
    
            ListOffsetsResultInfo(long offset, long timestamp) {}
    
            public long getOffset() {}
            public long getTimestamp() {}
        }
    }
  • ListOffsetsOptions


    ListOffsetsOptions.java
    /**
     * Options for {@link AdminClient#listOffsets(Collection, ListOffsetsOptions)}.
     *
     * The API of this class is evolving, see {@link AdminClient} for details.
     */
    @InterfaceStability.Evolving
    public class ListOffsetsOptions extends AbstractOptions<ListOffsetsOptions> {
    
        private final long timestamp;
        private final String isolationLevel;
    
        public static ListOffsetsOptions latestUncommitted() {}
    
        public static ListOffsetsOptions latestCommitted() {}
    
        public static ListOffsetsOptions earliestUncommitted() {}
    
        public static ListOffsetsOptions earliestCommitted() {}
    
        public ListOffsetsOptions(long timestamp, String isolationLevel) {}
    
        public long timestamp() {}
    
        public String isolationLevel() {}
    }
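The four factory methods on ListOffsetsOptions could be expressed in terms of the ListOffsets protocol, which uses the sentinel timestamps -1 (latest) and -2 (earliest), combined with the read_uncommitted and read_committed isolation levels. The sketch below is a dependency-free stand-in for the proposed class under those assumptions; the name ListOffsetsOptionsSketch and its validation rules are hypothetical, and the real class would extend AbstractOptions.

```java
// Hypothetical, dependency-free sketch of ListOffsetsOptions; the proposed
// class would extend AbstractOptions<ListOffsetsOptions> instead.
public final class ListOffsetsOptionsSketch {

    // Sentinel timestamps understood by the ListOffsets protocol.
    static final long LATEST_TIMESTAMP = -1L;
    static final long EARLIEST_TIMESTAMP = -2L;

    // Isolation levels, spelled as in the consumer's isolation.level setting.
    static final String READ_UNCOMMITTED = "read_uncommitted";
    static final String READ_COMMITTED = "read_committed";

    private final long timestamp;
    private final String isolationLevel;

    public ListOffsetsOptionsSketch(long timestamp, String isolationLevel) {
        // Accept either sentinel or a non-negative epoch-millisecond timestamp.
        if (timestamp < EARLIEST_TIMESTAMP) {
            throw new IllegalArgumentException("Invalid timestamp: " + timestamp);
        }
        if (!READ_UNCOMMITTED.equals(isolationLevel) && !READ_COMMITTED.equals(isolationLevel)) {
            throw new IllegalArgumentException("Unknown isolation level: " + isolationLevel);
        }
        this.timestamp = timestamp;
        this.isolationLevel = isolationLevel;
    }

    public static ListOffsetsOptionsSketch latestUncommitted() {
        return new ListOffsetsOptionsSketch(LATEST_TIMESTAMP, READ_UNCOMMITTED);
    }

    public static ListOffsetsOptionsSketch latestCommitted() {
        return new ListOffsetsOptionsSketch(LATEST_TIMESTAMP, READ_COMMITTED);
    }

    public static ListOffsetsOptionsSketch earliestUncommitted() {
        return new ListOffsetsOptionsSketch(EARLIEST_TIMESTAMP, READ_UNCOMMITTED);
    }

    public static ListOffsetsOptionsSketch earliestCommitted() {
        return new ListOffsetsOptionsSketch(EARLIEST_TIMESTAMP, READ_COMMITTED);
    }

    public long timestamp() {
        return timestamp;
    }

    public String isolationLevel() {
        return isolationLevel;
    }

    public static void main(String[] args) {
        ListOffsetsOptionsSketch options = latestCommitted();
        System.out.println(options.timestamp() + " " + options.isolationLevel()); // -1 read_committed

        // Offsets at or after 2019-01-01T00:00:00Z, ignoring aborted transactional data.
        ListOffsetsOptionsSketch byTime = new ListOffsetsOptionsSketch(1546300800000L, READ_COMMITTED);
        System.out.println(byTime.timestamp());
    }
}
```

With read_committed, "latest" is expected to resolve to the last stable offset rather than the high watermark, which is the distinction the Committed/Uncommitted factory pairs expose.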

Proposed Changes

I propose adding methods to the AdminClient to commit offsets for a group. Alongside these, I will add methods to retrieve log offsets (via the ListOffsets API) so that users can easily retrieve the start and end offsets of partitions, or the offsets corresponding to a given timestamp.
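As an illustration of how the two halves could combine, a reset tool might fetch start offsets (earliest) and end offsets (latest) with listOffsets, compute target offsets, and pass them to commitOffsets. The sketch below covers only the middle step for a "shift by N records" reset, clamping each result into the partition's available range; that clamping policy, the class ResetPlanner, and the use of String partition names instead of TopicPartition are all assumptions made for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: computes target offsets for a "shift by N" group reset.
public final class ResetPlanner {

    // Clamps an offset into [start, end]; start/end would come from
    // listOffsets with earliest and latest options respectively.
    static long clamp(long offset, long start, long end) {
        return Math.max(start, Math.min(offset, end));
    }

    // Shifts each committed offset by `shift` (negative rewinds), keeping it in range.
    // Assumes startOffsets and endOffsets contain every partition in `committed`.
    static Map<String, Long> shiftBy(Map<String, Long> committed,
                                     Map<String, Long> startOffsets,
                                     Map<String, Long> endOffsets,
                                     long shift) {
        Map<String, Long> targets = new LinkedHashMap<>();
        for (Map.Entry<String, Long> e : committed.entrySet()) {
            String partition = e.getKey();
            long start = startOffsets.get(partition);
            long end = endOffsets.get(partition);
            targets.put(partition, clamp(e.getValue() + shift, start, end));
        }
        return targets;
    }

    public static void main(String[] args) {
        Map<String, Long> committed = new LinkedHashMap<>();
        committed.put("orders-0", 200L);
        committed.put("orders-1", 15L);
        Map<String, Long> start = Map.of("orders-0", 100L, "orders-1", 10L);
        Map<String, Long> end = Map.of("orders-0", 500L, "orders-1", 40L);

        // Rewind by 50 records: orders-0 -> 150, orders-1 -> -35 clamped up to 10.
        System.out.println(shiftBy(committed, start, end, -50)); // {orders-0=150, orders-1=10}
    }
}
```

Each resulting offset would then be wrapped in an OffsetAndMetadata and passed to commitOffsets for the group. Clamping rather than failing keeps the reset usable when a partition's log start offset has moved past the desired position.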

The 4 new methods in AdminClient:

  • public CommitOffsetsResult commitOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets) {}
  • public abstract CommitOffsetsResult commitOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets, CommitOffsetsOptions options);
  • public ListOffsetsResult listOffsets(Collection<TopicPartition> topicPartitions) {}
  • public abstract ListOffsetsResult listOffsets(Collection<TopicPartition> topicPartitions, ListOffsetsOptions options);
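For a "reset to timestamp" operation, a caller could issue listOffsets with a ListOffsetsOptions carrying the target timestamp, then decide what to do for partitions that have no record at or after that time. The sketch below assumes such partitions are simply absent from the timestamp lookup and falls back to the partition's end offset for them; both that representation and the fallback policy are assumptions, and ResetToTimestamp is a hypothetical name using String partition names for brevity.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for a "reset to timestamp" group reset.
public final class ResetToTimestamp {

    // endOffsets: results of listOffsets with a "latest" option.
    // timestampOffsets: results of listOffsets with a timestamp option; partitions
    // without a record at or after the timestamp are assumed to be absent.
    static Map<String, Long> targets(Map<String, Long> endOffsets, Map<String, Long> timestampOffsets) {
        Map<String, Long> result = new LinkedHashMap<>();
        for (Map.Entry<String, Long> e : endOffsets.entrySet()) {
            // Fall back to the end offset when no offset was found for the timestamp.
            result.put(e.getKey(), timestampOffsets.getOrDefault(e.getKey(), e.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Long> end = new LinkedHashMap<>();
        end.put("orders-0", 500L);
        end.put("orders-1", 40L);
        Map<String, Long> byTime = Map.of("orders-0", 320L);
        System.out.println(targets(end, byTime)); // {orders-0=320, orders-1=40}
    }
}
```

Falling back to the end offset means the group simply consumes new data for partitions that received nothing since the timestamp; a stricter tool could instead report those partitions and skip committing them.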

Compatibility, Deprecation, and Migration Plan

No impact or migration plan required as this proposal is only adding new methods.

Rejected Alternatives

None
