
Status

Current state: Accepted (3.3.0)

Discussion thread: here 

JIRA: NA, PR: https://github.com/apache/kafka/pull/12337

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

OffsetFetchRequest is used by both the consumer (via consumer#committed) and the admin client (via admin#listConsumerGroupOffsets). It carries a boolean requireStable flag indicating whether to tolerate pending transactional offset commits in the group coordinator.

Today, the admin client's listConsumerGroupOffsets call always sets this flag to false, which makes it unsuitable for reading committed offsets under exactly-once semantics.

This proposal exposes the option in the admin client's ListConsumerGroupOffsetsOptions, so that the admin client can also request only stable committed offsets, that is, offsets that do not have pending transactional commits.

Public Interfaces

As the title suggests, we propose adding a setter and a getter for this boolean flag to ListConsumerGroupOffsetsOptions, so that the admin client can construct the OffsetFetchRequest with the flag set accordingly when sending it to the group coordinator.


public class ListConsumerGroupOffsetsOptions extends AbstractOptions<ListConsumerGroupOffsetsOptions> {

    private List<TopicPartition> topicPartitions = null;
    private boolean requireStable = false;

    /**
     * Set the topic partitions to list as part of the result.
     * {@code null} includes all topic partitions.
     *
     * @param topicPartitions List of topic partitions to include
     * @return This ListGroupOffsetsOptions
     */
    public ListConsumerGroupOffsetsOptions topicPartitions(List<TopicPartition> topicPartitions) {
        this.topicPartitions = topicPartitions;
        return this;
    }

    public List<TopicPartition> topicPartitions() {
        return topicPartitions;
    }

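    /**
     * New API: set whether only stable committed offsets should be returned,
     * i.e. offsets without pending transactional offset commits.
     * Defaults to {@code false}.
     */
    public ListConsumerGroupOffsetsOptions requireStable(final boolean requireStable) {
        this.requireStable = requireStable;
        return this;
    }

    /**
     * New API: return the current value of the requireStable flag.
     */
    public boolean requireStable() {
        return requireStable;
    }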
}
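
To illustrate how the flag is meant to flow from the options object into the request, here is a minimal, self-contained sketch. OptionsSketch, OffsetFetchRequestSketch and buildRequest are hypothetical stand-ins written for this example only; they are not the Kafka classes and do not reflect the actual admin client internals. With the real client, the corresponding call would presumably look like admin.listConsumerGroupOffsets(groupId, new ListConsumerGroupOffsetsOptions().requireStable(true)).

```java
public class RequireStableSketch {

    // Hypothetical stand-in for ListConsumerGroupOffsetsOptions (no Kafka dependency).
    static final class OptionsSketch {
        // Default is false, so callers that never touch the flag keep the old behavior.
        private boolean requireStable = false;

        OptionsSketch requireStable(final boolean requireStable) {
            this.requireStable = requireStable;
            return this; // fluent style, mirroring the proposed setter
        }

        boolean requireStable() {
            return requireStable;
        }
    }

    // Hypothetical stand-in for the request sent to the group coordinator.
    static final class OffsetFetchRequestSketch {
        final String groupId;
        final boolean requireStable;

        OffsetFetchRequestSketch(final String groupId, final boolean requireStable) {
            this.groupId = groupId;
            this.requireStable = requireStable;
        }
    }

    // Assumed mapping: the request simply copies the flag from the options.
    static OffsetFetchRequestSketch buildRequest(final String groupId, final OptionsSketch options) {
        return new OffsetFetchRequestSketch(groupId, options.requireStable());
    }

    public static void main(final String[] args) {
        // Existing callers: flag left at its default.
        OffsetFetchRequestSketch legacy = buildRequest("my-group", new OptionsSketch());
        // New callers: explicitly ask for stable offsets only.
        OffsetFetchRequestSketch stable =
            buildRequest("my-group", new OptionsSketch().requireStable(true));

        System.out.println("legacy requireStable = " + legacy.requireStable);
        System.out.println("stable requireStable = " + stable.requireStable);
    }
}
```

Because the setter returns the options object itself, it chains with the existing topicPartitions setter in the same fluent style.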



Proposed Changes

See above.

Compatibility, Deprecation, and Migration Plan

  • The added flag's default value is false, to remain compatible with the existing ListConsumerGroupOffsetsOptions.
