...
Code Block | ||||
---|---|---|---|---|
| ||||
/**
* <p>Alters offsets for the specified group.
*
* <p>This is a convenience method for {@link #alterConsumerGroupOffsets(String, Map, AlterOffsetsOptions)} with default options.
* See the overload for more details.
*
* @param groupId The group for which to alter offsets.
* @param offsets A map of offsets by partition with associated metadata.
* @return The AlterOffsetsResult.
*/
public AlterOffsetsResult alterConsumerGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets) {
// Delegate to the overload that takes explicit options, passing default options.
return alterConsumerGroupOffsets(groupId, offsets, new AlterOffsetsOptions());
}
/**
* <p>Alters offsets for the specified group.
*
* <p>This operation is not transactional so it may succeed for some partitions while failing for others.
*
* @param groupId The group for which to alter offsets.
* @param offsets A map of offsets by partition with associated metadata. Partitions not specified in the map are ignored.
* @param options The options to use when altering the offsets.
* @return The AlterOffsetsResult.
*/
public abstract AlterOffsetsResult alterConsumerGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets, AlterOffsetsOptions options);
/**
* <p>Lists the offsets for the specified partitions.
*
* <p>This is a convenience method for {@link #listOffsets(Map, ListOffsetsOptions)} with default options.
*
* @param topicPartitionOffsets The mapping from partition to the OffsetSpec to look up.
* @return The ListOffsetsResult.
*/
public ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets) {
return listOffsets(topicPartitionOffsets, new ListOffsetsOptions());
}
/**
* <p>Lists the offsets for the specified partitions.
*
* @param topicPartitionOffsets The mapping from partition to the OffsetSpec to look up.
* @param options The options to use when retrieving the offsets.
* @return The ListOffsetsResult.
*/
public abstract ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets, ListOffsetsOptions options); |
...
Code Block | ||||
---|---|---|---|---|
| ||||
/** * The result of the {@link AdminClient#alterConsumerGroupOffsets(String, Map)} call. * * The API of this class is evolving, see {@link AdminClient} for details. */ @InterfaceStability.Evolving public class AlterOffsetsResult { public AlterOffsetsResult(Map<TopicPartition, KafkaFuture<Void>> futures) {} public Map<TopicPartition, KafkaFuture<Void>> values() {} public KafkaFuture<Map<TopicPartition, Void>> all() {} } |
AlterOffsetsOptions
Code Block language java title AlterOffsetsOptions.java /** * Options for the {@link AdminClient#alterConsumerGroupOffsets(String, Map, AlterOffsetsOptions)} call. * * The API of this class is evolving, see {@link AdminClient} for details. */ @InterfaceStability.Evolving public class AlterOffsetsOptions extends AbstractOptions<AlterOffsetsOptions> { }
ListOffsetsResult
Code Block language java title ListOffsetsResult.java /** * The result of the {@link AdminClient#listOffsets(Map)} call. * * The API of this class is evolving, see {@link AdminClient} for details. */ @InterfaceStability.Evolving public class ListOffsetsResult { public ListOffsetsResult(Map<TopicPartition, KafkaFuture<ListOffsetsResultInfo>> futures) {} /** * Return a map from TopicPartition to futures which can be used to retrieve the offsets */ public Map<TopicPartition, KafkaFuture<ListOffsetsResultInfo>> values() {} /** * Return a future which succeeds only if offsets for all specified partitions have been successfully * retrieved. */ public KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> all() {} static public class ListOffsetsResultInfo { ListOffsetsResultInfo(long offset, long timestamp, Optional<Integer> leaderEpoch) {} long offset() {} long timestamp() {} Optional<Integer> leaderEpoch() {} } }
ListOffsetsOptions
Code Block language java title ListOffsetsOptions.java /** * Options for {@link AdminClient#listOffsets()}. * * The API of this class is evolving, see {@link AdminClient} for details. */ @InterfaceStability.Evolving public class ListOffsetsOptions extends AbstractOptions<ListOffsetsOptions> { private final IsolationLevel isolationLevel; // isolationLevel default to READ_UNCOMMITTED public ListOffsetsOptions() {} public ListOffsetsOptions(IsolationLevel isolationLevel) {} public IsolationLevel isolationLevel() {} }
OffsetSpec
Code Block language java /** * This class allows to specify the desired query when retrieving offsets */ public class OffsetSpec { public OffsetSpec latest() {} public OffsetSpec earliest() {} public OffsetSpec forTimestamp(long timestamp) {} }
IsolationLevel (The existing IsolationLevel class is not part of the public API, hence we are creating this new file. The old class could then be removed)
Code Block language java /** * The isolation level */ public enum IsolationLevel { READ_UNCOMMITTED((byte) 0), READ_COMMITTED((byte) 1); }
Proposed Changes
The proposal of this KIP is to add methods to alter offsets for a group in the AdminClient.
Alongside these, methods to retrieve log offsets (ListOffset API) will be added so users can easily retrieve start and end offsets, or offsets for a timestamp, for partitions.
...