Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: Added admin client interfaces

...

This KIP introduces a new interface for consuming records from a share group called org.apache.kafka.clients.consumer.KafkaShareConsumer .

Method signatureDescription
void acknowledge(ConsumerRecord record) Acknowledge successful delivery of a record returned on the last poll(Duration). The acknowledgement is committed on the next commitSync()  or commitAsync()  call.
void acknowledge(ConsumerRecord record, AcknowledgeType type) Acknowledge delivery of a record returned on the last poll(Duration) indicating whether it was processed successfully. The acknowledgement is committed on the next commitSync()  or commitAsync()  call.
void close() Close the consumer, waiting for up to the default timeout of 30 seconds for any needed cleanup.
void close(Duration timeout) Tries to close the consumer cleanly within the specified timeout.
void commitAsync() Commits the acknowledgements for the records returned.
void commitSync()Commits the acknowledgements for the records returned.
void commitSync(Duration timeout)Commits the acknowledgements for the records returned.
Map<MetricName, ? extends Metric> metrics() Get the metrics kept by the consumer.
ConsumerRecords<K,V> poll(Duration timeout) Fetch data for the topics or partitions specified using the subscribe API.
void subscribe(Collection<String> topics) Subscribe to the given list of topics to get dynamically assigned partitions.
Set<String> subscription() Get the current subscription.
void unsubscribe() Unsubscribe from topics currently subscribed with subscribe(Collection) .
void wakeup() Wakeup the consumer.

...

Add the following methods on the org.apache.kafka.client.admin.AdminClient  interface.

Method signatureDescription
AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets)Alter offset information for a share group.
AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets, AlterShareGroupOffsetsOptions options) Alter offset information for a share group.
DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId, Set<TopicPartition> partitions)Delete offset information for a set of partitions in a share group.
DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId, Set<TopicPartition> partitions, DeleteShareGroupOffsetsOptions options) Delete offset information for a set of partitions in a share group.
DeleteShareGroupResult deleteShareGroups(Collection<String> groupIds)Delete share groups from the cluster.
DeleteShareGroupResult deleteShareGroups(Collection<String> groupIds, DeleteShareGroupOptions options) Delete share groups from the cluster.
DescribeShareGroupsResult describeShareGroups(Collection<String> groupIds)Describe some share groups in the cluster.
DescribeShareGroupsResult describeShareGroups(Collection<String> groupIds, DescribeShareGroupsOptions options) Describe some share groups in the cluster.
ListShareGroupOffsetsResult listShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec> groupSpecs)List the share group offsets available in the cluster for the specified share groups.
ListShareGroupOffsetsResult listShareGroupOffsets(String groupIdMap<String, ListShareGroupOffsetsSpec> groupSpecs, ListShareGroupOffsetsOptions options) List the share group offsets available in the cluster for the specified share groups.
ListShareGroupsResult listShareGroups()List the share groups available in the cluster.
ListShareGroupsResult listShareGroups(ListShareGroupsOptions options) List the share groups available in the cluster.

...

  • Altering the offsets for a share group resets the Share-Partition Start Offset for topic-partitions in the share group (share-partitions)
  • The members of a share group are not assigned partitions
  • A share group has only three states - EMPTYSTABLE and DEAD 

GroupType

  • -partitions in the share group (share-partitions)
  • The members of a share group are not assigned distinct sets of partitions
  • A share group has only three states - EMPTYSTABLE and DEAD 

Here are the method signatures.

Code Block
languagejava
    /**
     * Alters offsets for the specified group. In order to succeed, the group must be empty.
     *
     * <p>This is a convenience method for {@link #alterShareGroupOffsets(String, Map, AlterShareGroupOffsetsOptions)} with default options.
     * See the overload for more details.
     *
     * @param groupId The group for which to alter offsets.
     * @param offsets A map of offsets by partition with associated metadata.
     * @return The AlterShareGroupOffsetsResult.
     */
    default AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets) {
        return alterShareGroupOffsets(groupId, offsets, new AlterShareGroupOffsetsOptions());
    }

    /**
     * Alters offsets for the specified group. In order to succeed, the group must be empty.
     *
     * <p>This operation is not transactional so it may succeed for some partitions while fail for others.
     *
     * @param groupId The group for which to alter offsets.
     * @param offsets A map of offsets by partition with associated metadata. Partitions not specified in the map are ignored.
     * @param options The options to use when altering the offsets.
     * @return The AlterShareGroupOffsetsResult.
     */
    AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets, AlterShareGroupOffsetsOptions options);

    /**
     * Delete offsets for a set of partitions in a share group with the default
     * options. This will succeed at the partition level only if the group is not actively
     * subscribed to the corresponding topic.
     *
     * <p>This is a convenience method for {@link #deleteShareGroupOffsets(String, Map, DeleteShareGroupOffsetsOptions)} with default options.
     * See the overload for more details.
     *
     * @return The DeleteShareGroupOffsetsResult.
     */
    default DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId, Set<TopicPartition> partitions) {
        return deleteShareGroupOffsets(groupId, partitions, new DeleteShareGroupOffsetsOptions());
    }

    /**
     * Delete offsets for a set of partitions in a share group. This will
     * succeed at the partition level only if the group is not actively subscribed
     * to the corresponding topic.
     *
     * @param options The options to use when deleting offsets in a share group.
     * @return The DeleteShareGroupOffsetsResult.
     */
    DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId,
        Set<TopicPartition> partitions,
        DeleteShareGroupOffsetsOptions options);

    /**
     * Delete share groups from the cluster with the default options.
     *
     * <p>This is a convenience method for {@link #deleteShareGroups(Collection<String>, DeleteShareGroupsOptions)} with default options.
     * See the overload for more details.
     *
     * @param groupIds The IDs of the groups to delete.
     * @return The DeleteShareGroupsResult.
     */
    default DeleteShareGroupsResult deleteShareGroups(Collection<String> groupIds) {
        return deleteShareGroups(groupIds, new DeleteShareGroupsOptions());
    }

    /**
     * Delete share groups from the cluster.
     *
     * @param groupIds The IDs of the groups to delete.
     * @param options The options to use when deleting a share group.
     * @return The DeleteShareGroupsResult.
     */
    DeleteShareGroupsResult deleteShareGroups(Collection<String> groupIds, DeleteShareGroupsOptions options);

    /**
     * Describe some share groups in the cluster, with the default options.
     *
     * <p>This is a convenience method for {@link #describeShareGroups(Collection, DescribeShareGroupsOptions)}
     * with default options. See the overload for more details.
     *
     * @param groupIds The IDs of the groups to describe.
     * @return The DescribeShareGroupsResult.
     */
    default DescribeShareGroupsResult describeShareGroups(Collection<String> groupIds) {
        return describeShareGroups(groupIds, new DescribeShareGroupsOptions());
    }

    /**
     * Describe some share groups in the cluster.
     *
     * @param groupIds The IDs of the groups to describe.
     * @param options  The options to use when describing the groups.
     * @return The DescribeShareGroupsResult.
     */
    DescribeShareGroupsResult describeShareGroups(Collection<String> groupIds,
                                                 DescribeShareGroupsOptions options);

    /**
     * List the share group offsets available in the cluster for the specified share groups with the default options.
     *
     * <p>This is a convenience method for {@link #listShareGroupOffsets(Map, ListShareGroupOffsetsOptions)}
     * to list offsets of all partitions for the specified share groups with default options.
     *
     * @param groupSpecs Map of share group ids to a spec that specifies the topic partitions of the group to list offsets for.
     * @return The ListShareGroupOffsetsResult
     */
    default ListShareGroupOffsetsResult listShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec> groupSpecs) {
        return listShareGroupOffsets(groupSpecs, new ListShareGroupOffsetsOptions());
    }

    /**
     * List the share group offsets available in the cluster for the specified share groups.
     *
     * @param groupSpecs Map of share group ids to a spec that specifies the topic partitions of the group to list offsets for.
     * @param options The options to use when listing the share group offsets.
     * @return The ListShareGroupOffsetsResult
     */
    ListShareGroupOffsetsResult listShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec> groupSpecs, ListShareGroupOffsetsOptions options);

    /**
     * List the share groups available in the cluster with the default options.
     *
     * <p>This is a convenience method for {@link #listShareGroups(ListShareGroupsOptions)} with default options.
     * See the overload for more details.
     *
     * @return The ListShareGroupsResult.
     */
    default ListShareGroupsResult listShareGroups() {
        return listShareGroups(new ListShareGroupsOptions());
    }

    /**
     * List the share groups available in the cluster.
     *
     * @param options The options to use when listing the share groups.
     * @return The ListShareGroupsResult.
     */
    ListShareGroupsResult listShareGroups(ListShareGroupsOptions options);
 

AlterShareGroupOffsetsResult

Code Block
languagejava
package org.apache.kafka.clients.admin;

/**
 * The result of the {@link Admin#alterShareGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata>), AlterShareGroupOffsetsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class AlterShareGroupOffsetsResult {
    /**
     * Return a future which succeeds if all the alter offsets succeed.
     */
    public KafkaFuture<Void> all() {
    }

    /**
     * Return a future which can be used to check the result for a given partition.
     */
    public KafkaFuture<Void> partitionResult(final TopicPartition partition) {
    }
}

AlterShareGroupOffsetsOptions

Code Block
languagejava
package org.apache.kafka.client.admin;
 
/**
 * Options for the {@link Admin#alterShareGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata>), AlterShareGroupOffsetsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class AlterShareGroupOffsetsOptions extends AbstractOptions<AlterShareGroupOffsetsOptions> {
}

DeleteShareGroupOffsetsResult

Code Block
languagejava
package org.apache.kafka.clients.admin;
 
/**
 * The result of the {@link Admin#deleteShareGroupOffsets(String, Set<TopicPartition>, DeleteShareGroupOffsetsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class DeleteShareGroupOffsetsResult {
    /**
     * Return a future which succeeds only if all the deletions succeed.
     */
    public KafkaFuture<Void> all() {
    }

    /**
     * Return a future which can be used to check the result for a given partition.
     */
    public KafkaFuture<Void> partitionResult(final TopicPartition partition) {
    }
}

DeleteShareGroupOffsetsOptions

Code Block
languagejava
package org.apache.kafka.client.admin;
 
/**
 * Options for the {@link Admin#deleteShareGroupOffsets(String, Set<TopicPartition>, DeleteShareGroupOffsetsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class DeleteShareGroupOffsetsOptions extends AbstractOptions<DeleteShareGroupOffsetsOptions> {
}

DeleteShareGroupsResult

Code Block
languagejava
package org.apache.kafka.clients.admin;
 
/**
 * The result of the {@link Admin#deleteShareGroups(Collection<String>, DeleteShareGroupsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class DeleteShareGroupsResult {
    /**
     * Return a future which succeeds only if all the deletions succeed.
     */
    public KafkaFuture<Void> all() {
    }

    /**
     * Return a map from group id to futures which can be used to check the status of individual deletions.
     */
    public Map<String, KafkaFuture<Void>> deletedGroups() {
    }
}

DeleteShareGroupsOptions

Code Block
languagejava
package org.apache.kafka.client.admin;
 
/**
 * Options for the {@link Admin#deleteShareGroups(Collection<String>, DeleteShareGroupsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class DeleteShareGroupsOptions extends AbstractOptions<DeleteShareGroupsOptions> {
}

DescribeShareGroupsResult

Code Block
languagejava
package org.apache.kafka.clients.admin;
 
/**
 * The result of the {@link Admin#describeShareGroups(Collection<String>, DescribeShareGroupsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class DescribeShareGroupsResult {
    /**
     * Return a future which yields all ShareGroupDescription objects, if all the describes succeed.
     */
    public KafkaFuture<Map<String, ShareGroupDescription>> all() {
    }

    /**
     * Return a map from group id to futures which yield group descriptions.
     */
    public Map<String, KafkaFuture<ShareGroupDescription>> describedGroups() {
    }
}

ShareGroupDescription

This class does indeed reuse the MemberDescription  class intended for consumer groups. It is sufficiently general to work for share groups also.

Code Block
languagejava
package org.apache.kafka.client.admin;

import org.apache.kafka.common.Node;
import org.apache.kafka.common.ShareGroupState;
import org.apache.kafka.common.acl.AclOperation;

/**
 * A detailed description of a single share group in the cluster.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ShareGroupDescription {
  public ShareGroupDescription(String groupId, Collection<MemberDescription> members, ShareGroupState state, Node coordinator);
  public ShareGroupDescription(String groupId, Collection<MemberDescription> members, ShareGroupState state, Node coordinator, Set<AclOperation> authorizedOperations);

  /**
   * The id of the share group.
   */
  public String groupId();

  /**
   * A list of the members of the share group.
   */
  public Collection<MemberDescription> members();

  /**
   * The share group state, or UNKNOWN if the state cannot be parsed.
   */
  public ShareGroupState state();

  /**
   * The share group coordinator, or null if the coordinator is not known.
   */
  public Node coordinator();

  /**
   * The authorized operations for this group, or null if that information is not known.
   */
  public Set<AclOperation> authorizedOperations();
}

DescribeShareGroupsOptions

Code Block
languagejava
package org.apache.kafka.client.admin;
 
/**
 * Options for {@link Admin#describeShareGroups(Collection<String>, DescribeShareGroupsOptions)}.
 *
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class DescribeShareGroupsOptions extends AbstractOptions<DescribeShareGroupsOptions> {
    public DescribeShareGroupsOptions includeAuthorizedOperations(boolean includeAuthorizedOperations);

    public boolean includeAuthorizedOperations();
}

ListShareGroupOffsetsResult

The offset returned for each topic-partition is the share-partition start offset (SPSO).

Code Block
languagejava
package org.apache.kafka.clients.admin;
 
/**
 * The result of the {@link Admin#listShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec>, ListShareGroupOffsetsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ListShareGroupOffsetsResult {
    /**
     * Return a future which yields all Map<String, Map<TopicPartition, OffsetAndMetadata> objects, if requests for all the groups succeed.
     */
    public KafkaFuture<Map<String, Map<TopicPartition, OffsetAndMetadata>>> all() {
    }

    /**
     * Return a future which yields a map of topic partitions to OffsetAndMetadata objects for the specified group.
     */
    public KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> partitionsToOffsetAndMetadata(String groupId) {
    }
}

ListShareGroupOffsetsOptions

Code Block
languagejava
package org.apache.kafka.client.admin;
 
/**
 * Options for {@link Admin#listShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec>, ListShareGroupOffsetsOptions)}.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ListShareGroupOffsetsOptions extends AbstractOptions<ListShareGroupOffsetsOptions> {
}

ListShareGroupOffsetsSpec

Code Block
languagejava
package org.apache.kafka.client.admin;
 
/**
 * Specification of share group offsets to list using {@link Admin#listShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec>, ListShareGroupOffsetsOptions)}.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ListShareGroupOffsetsSpec {
  public ListShareGroupOffsetsSpec();

  /**
   * Set the topic partitions whose offsets are to be listed for a share group.
   */
  ListShareGroupOffsetsSpec topicPartitions(Collection<TopicPartition> topicPartitions);

  /**
   * Returns the topic partitions whose offsets are to be listed for a share group.
   */
  Collection<TopicPartition> topicPartitions();
}

ListShareGroupsResult

Code Block
languagejava
package org.apache.kafka.clients.admin;
 
/**
 * The result of the {@link Admin#listShareGroups(ListShareGroupsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ListShareGroupsResult {
    /**
     * Returns a future that yields either an exception, or the full set of share group listings.
     */
    public KafkaFuture<Collection<ShareGroupListing>> all() {
    }

    /**
     * Returns a future which yields just the valid listings.
     */
    public KafkaFuture<Collection<ShareGroupListing>> valid() {
    }
 
    /**
     * Returns a future which yields just the errors which occurred.
     */
    public KafkaFuture<Collection<Throwable>> errors() {
    }
}

ShareGroupListing

Code Block
languagejava
package org.apache.kafka.client.admin;

import org.apache.kafka.common.ShareGroupState;

/**
 * A listing of a share group in the cluster.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ShareGroupListing {
  public ShareGroupListing(String groupId);
  public ShareGroupListing(String groupId, Optional<ShareGroupState> state);

  /**
   * The id of the share group.
   */
  public String groupId();

  /**
   * The share group state.
   */
  public Optional<ShareGroupState> state();
}

ListShareGroupsOptions

Code Block
languagejava
package org.apache.kafka.client.admin;

import org.apache.kafka.common.ShareGroupState;

/**
 * Options for {@link Admin#listShareGroups(ListShareGroupsOptions)}.
 *
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ListShareGroupsOptions extends AbstractOptions<ListShareGroupsOptions> {
    /**
     * If states is set, only groups in these states will be returned. Otherwise, all groups are returned.
     */
    public ListShareGroupsOptions inStates(Set<ShareGroupState> states);

    /**
     * Return the list of States that are requested or empty if no states have been specified.
     */
    public Set<ShareGroupState> states();
}

GroupType

Another case is added Add another case to the org.apache.kafka.common.GroupType  enum:

Enum constantDescription
SHARE("share") Share group

ShareGroupState

A new enum org.apache.kafka.common.ShareGroupState  is added:

Enum constant

DEAD 

EMPTY 

STABLE 

UNKNOWN 

Its definition follows the pattern of ConsumerGroupState with fewer states.

Command-line tools

kafka-share-groups.sh

...

This KIP introduces delivery counts and a maximum number of delivery attempts. An obvious future extension is the ability to copy records that failed to be delivered onto a dead-letter queue. This would of course give a way to handle poison messages without them permanently blocking processing.A “browsing” consumer which does not modify the share group state or take acquisition locks could be supported which needs lesser permission ( DESCRIBE ) on the group than a proper consumer ( READ ). This is a little more complicated because it needs to have a position independent of the SPSO so that it can traverse along the queue.

The focus in this KIP is on sharing rather than ordering. The concept can be extended to give key-based ordering so that partial ordering and fine-grained sharing can be achieved at the same time.

...

As a result, the KIP now proposes an entirely different class KafkaShareConsumer  which gives a very similar interface as KafkaConsumer  but eliminates the downsides listed above.