Table of Contents |
---|
Status
Current state: Under discussionAccepted
Discussion thread: here [Change the link from the KIP proposal email archive to your own email thread]
JIRA: KAFKA-7689
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
Code Block | ||||
---|---|---|---|---|
| ||||
/**
* <p>Alters offsets for the specified group.
*
* <p>This is a convenience method for {@link #alterConsumerGroupOffsets(String, Map, AlterOffsetsOptions)} with default options.
* See the overload for more details.
*
* @param groupId The group for which to alter offsets.
* @param offsets A map of offsets by partition with associated metadata.
* @return The AlterOffsetsResult.
*/
public AlterOffsetsResult alterConsumerGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets) {
return alterOffsets(groupId, offsets, new AlterOffsetsOptions());
}
/**
* <p>Alters offsets for the specified group.
*
* <p>This operation is not transactional so it may succeed for some partitions while fail for others.
*
* @param groupId The group for which to alter offsets.
* @param offsets A map of offsets by partition with associated metadata. Partitions not specified in the map are ignored.
* @param options The options to use when altering the offsets.
* @return The AlterOffsetsResult.
*/
public abstract AlterOffsetsResult alterConsumerGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets, AlterOffsetsOptions options);
/**
* <p>List offset for the specified partitions.
*
* <p>This is a convenience method for {@link #listOffsets(Collection, ListOffsetsOptions)}
*
* @param topicPartitionOffsets The mapping from partition to the OffsetSpec to look up.
* @return The ListOffsetsResult.
*/
public ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets) {
return listOffsets(topicPartitionOffsets, new ListOffsetsOptions());
}
/**
* <p>List offset for the specified partitions.
*
* @param topicPartitionOffsets The mapping from partition to the OffsetSpec to look up.
* @param options The options to use when retrieving the offsets
* @return The ListOffsetsResult.
*/
public abstract ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets, ListOffsetsOptions options); |
...
Code Block | ||||
---|---|---|---|---|
| ||||
/** * The result of the {@link AdminClient#alterOffsets(String, Map)} call. * * The API of this class is evolving, see {@link AdminClient} for details. */ @InterfaceStability.Evolving public class AlterOffsetsResult { public AlterOffsetsResult(Map<TopicPartitionKafkaFuture<Map<TopicPartition, KafkaFuture<Void>>KafkaFuture<Void>>> futures) {} public Map<TopicPartitionKafkaFuture<Map<TopicPartition, KafkaFuture<Void>>KafkaFuture<Void>>> values() {} public KafkaFuture<Void>KafkaFuture<Map<TopicPartition, Void>> all() {} } |
AlterOffsetsOptions
Code Block language java title CommitOffsetsOptionsAlterOffsetsOptions.java /** * Options for the {@link AdminClient#alterOffsets() call. * * The API of this class is evolving, see {@link AdminClient} for details. */ @InterfaceStability.Evolving public class AlterOffsetsOptions extends AbstractOptions<AlterOffsetsOptions> { }
ListOffsetsResult
Code Block language java title ListOffsetsResult.java /** * The result of the {@link AdminClient#listOffsets(String)} call. * * The API of this class is evolving, see {@link AdminClient} for details. */ @InterfaceStability.Evolving public class ListOffsetsResult { public ListOffsetsResult(Map<TopicPartition, KafkaFuture<ListOffsetsResultInfo>> futures) {} /** * Return a map from TopicPartition to futures which can be used to retrieve the offsets */ public Map<TopicPartition, KafkaFuture<ListOffsetsResultInfo>> values() {} /** * Return a future which succeeds only if offsets for all specified partitions have been successfully * retrieved. */ public KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> all() {} static public class ListOffsetsResultInfo { ListOffsetsResultInfo(long offset, long timestamp, Optional<Integer> leaderEpoch) {} long offset() {} long timestamp() {} Optional<Integer> leaderEpoch() {} } }
ListOffsetsOptions
Code Block language java title ListOffsetsOptions.java /** * Options for {@link AdminClient#listOffsets()}. * * The API of this class is evolving, see {@link AdminClient} for details. */ @InterfaceStability.Evolving public class ListOffsetsOptions extends AbstractOptions<ListOffsetsOptions> { private final StringIsolationLevel isolationLevel; // isolationLevel default to readREAD_uncommittedUNCOMMITTED public ListOffsetsOptions() {} public ListOffsetsOptions(StringIsolationLevel isolationLevel) {} public StringIsolationLevel isolationLevel() {} }
OffsetSpec
Code Block language java title OffsetSpec.java /** * This class allows to specify the desired query when retrieving offsets */ public class OffsetSpec { public OffsetSpec latest() {} public OffsetSpec earliest() {} public OffsetSpec forTimestamp(long timestamp) {} }
IsolationLevel (The existing IsolationLevel class is not part of the public API, hence we are creating this new file. The old class could then be removed)
Code Block language java title IsolationLevel.java /** * The isolation level */ public enum IsolationLevel { READ_UNCOMMITTED((byte) 0), READ_COMMITTED((byte) 1); }
Proposed Changes
The proposal of this KIP is to add methods to alter offsets for a group in the AdminClient
. Alongside, methods to retrieve log offsets (ListOffset API) so users can easily retrieve start and end offsets or offsets for a timestamp for partitions will be added.
...