This page is meant as a template for writing a KIP. To create a KIP choose Tools->Copy on this page and modify with your content and replace the heading with the next KIP number and a description of your issue. Replace anything in italics with your own description.
Status
Current state: Vote
Discussion thread: here
JIRA: NA
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
OffsetFetchRequest is used in both consumer (via consumer#committed
) and admin (via admin#listConsumerGroupOffsets
), and it has a boolean requireStable flag indicating whether to tolerate pending transactional offset commits in the group coordinator.
Today, admin client's listConsumerGroupOffsets
call always set this flag to false, which makes it not useful for getting committed offsets with exactly once semantics.
This proposal aims at opening this option in admin client's ListConsumerGroupOffsetsOptions, so that admin client can also request only getting stable committed offsets that does not have pending transactional offsets.
Public Interfaces
As titled, we propose to add setter and getter for this boolean flag inside the ListConsumerGroupOffsetsOptions, so that the admin client can correspondingly construct the OffsetFetchRequest with this flag set when sending to the group coordinator.
public class ListConsumerGroupOffsetsOptions extends AbstractOptions<ListConsumerGroupOffsetsOptions> { private List<TopicPartition> topicPartitions = null; private boolean requireStable = false; /** * Set the topic partitions to list as part of the result. * {@code null} includes all topic partitions. * * @param topicPartitions List of topic partitions to include * @return This ListGroupOffsetsOptions */ public ListConsumerGroupOffsetsOptions topicPartitions(List<TopicPartition> topicPartitions) { this.topicPartitions = topicPartitions; return this; } public List<TopicPartition> topicPartitions() { return topicPartitions; } // new API public ListConsumerGroupOffsetsOptions requireStable(final boolean requireStable) { this.requireStable = requireStable; return this; } // new API public boolean shouldRequireStable() { return requireStable; } }
Proposed Changes
See above.
Compatibility, Deprecation, and Migration Plan
- The added flag's default value is
false
, to be compatible with the old ListConsumerGroupOffsetsOption.