...

  • The consumers in a share group cooperatively consume records with partitions that may be assigned to multiple consumers

  • The number of consumers in a share group can exceed the number of partitions

  • Records are acknowledged on an individual basis, although the system is optimized to work in batches for improved efficiency

  • Delivery attempts to consumers in a share group are counted to enable automated handling of unprocessable records

Share groups are a new type of group, alongside the existing consumer groups, adding "share" to the existing group types of "consumer" and "classic".

All consumers in the same share group subscribed to the same topic cooperatively consume the records of that topic. If a topic is accessed by consumers in more than one share group, each share group cooperatively consumes from that topic independently of the other share groups.

...

The cluster limits the number of records acquired for consumers for each topic-partition in a share group. Once the limit is reached, fetching records will temporarily yield no further records until the number of acquired records reduces, as naturally happens when the locks time out. This limit is controlled by the broker configuration property group.share.record.lock.partition.limit. By limiting the duration of the acquisition lock and automatically releasing the locks, the broker ensures delivery progresses even in the presence of consumer failures.
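The interaction between the per-partition limit and the lock timeout can be illustrated with a small model. This is only a sketch using plain JDK collections, not broker code: the class name, the `fetch` and `acknowledge` methods, and the millisecond clock passed in by the caller are all assumptions made for illustration.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

/** Toy model (not broker code) of the limit on acquired records for one share-partition. */
final class AcquisitionLimitSketch {
    private final int limit;              // stands in for group.share.record.lock.partition.limit
    private final long lockDurationMs;    // hypothetical acquisition lock duration
    private final TreeMap<Long, Long> acquired = new TreeMap<>(); // offset -> lock expiry time
    private final TreeSet<Long> released = new TreeSet<>();       // offsets awaiting redelivery
    private long nextOffset;              // next offset that has never been delivered

    AcquisitionLimitSketch(int limit, long lockDurationMs, long startOffset) {
        this.limit = limit;
        this.lockDurationMs = lockDurationMs;
        this.nextOffset = startOffset;
    }

    /** Acquires up to maxRecords records at time nowMs and returns how many were acquired. */
    int fetch(int maxRecords, long nowMs) {
        expireLocks(nowMs);
        int granted = 0;
        while (granted < maxRecords && acquired.size() < limit) {
            Long offset = released.pollFirst();   // redeliver released records first
            if (offset == null) {
                offset = nextOffset++;            // otherwise move on to a new record
            }
            acquired.put(offset, nowMs + lockDurationMs);
            granted++;
        }
        return granted;
    }

    /** Acknowledging a record frees its slot under the limit. */
    void acknowledge(long offset) {
        acquired.remove(offset);
    }

    /** Locks that have timed out are released so the records can be delivered again. */
    private void expireLocks(long nowMs) {
        Iterator<Map.Entry<Long, Long>> it = acquired.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Long> entry = it.next();
            if (entry.getValue() <= nowMs) {
                released.add(entry.getKey());
                it.remove();
            }
        }
    }

    int acquiredCount() {
        return acquired.size();
    }

    public static void main(String[] args) {
        AcquisitionLimitSketch partition = new AcquisitionLimitSketch(5, 30_000L, 100L);
        System.out.println(partition.fetch(10, 0L));       // limit reached after 5 records
        System.out.println(partition.fetch(10, 1_000L));   // nothing more while locks are held
        System.out.println(partition.fetch(10, 30_000L));  // locks timed out, records redelivered
    }
}
```

In this model a consumer that stops responding simply lets its locks expire, at which point the same offsets become fetchable again, which is the progress guarantee described above.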

...

  • It fetches the records from the replica manager from the local replica

  • It manages and persists the states of the in-flight records

Relationship with consumer groups

Consumer groups and share groups exist in the same namespace in a Kafka cluster. As a result, if there’s a consumer group with a particular name, you cannot create a share group with the same name, and vice versa. But consumer groups and share groups are quite different in terms of use, so attempts to perform operations for one kind of group on a group of the incorrect type will fail with a GroupIdNotFoundException . The new AdminClient.listGroups  method gives a way of listing groups of all types.

Because consumer groups and share groups are both created automatically on first use, the type of group that is created depends upon how the group name was first used. As a result, it is helpful to be able to ensure that a group of a particular name can only be created with a particular type. This is achieved by defining a group configuration property group.type , using the kafka-configs.sh  tool or the AdminClient.incrementalAlterConfigs  method. For example, you could use the following command to ensure that the group ID "G1" is to be used for a share group only.

$ bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type groups --entity-name G1 --alter --add-config group.type=share

If a regular Kafka consumer then attempts to use "G1" as a consumer group, the exception "InconsistentGroupProtocolException" will be thrown.
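The shared namespace and the effect of pinning a group type can be modelled in a few lines. The sketch below is illustrative only: the class, the `pinType` and `join` methods, and the use of `IllegalStateException` in place of the Kafka exceptions named above are assumptions, not part of any Kafka API.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model (not Kafka code) of consumer groups and share groups sharing one namespace. */
final class GroupNamespaceSketch {
    enum GroupType { CONSUMER, SHARE }

    private final Map<String, GroupType> groups = new HashMap<>();  // groups created so far
    private final Map<String, GroupType> pinned = new HashMap<>();  // stands in for the group.type config

    /** Restricts a group ID to one type before the group exists. */
    void pinType(String groupId, GroupType type) {
        pinned.put(groupId, type);
    }

    /** The first use creates the group with the requested type; later uses must match it. */
    void join(String groupId, GroupType requested) {
        GroupType required = pinned.get(groupId);
        if (required != null && required != requested) {
            throw new IllegalStateException(groupId + " is reserved for type " + required);
        }
        GroupType existing = groups.putIfAbsent(groupId, requested);
        if (existing != null && existing != requested) {
            throw new IllegalStateException(groupId + " already exists with type " + existing);
        }
    }

    GroupType typeOf(String groupId) {
        return groups.get(groupId);
    }

    public static void main(String[] args) {
        GroupNamespaceSketch namespace = new GroupNamespaceSketch();
        namespace.pinType("G1", GroupType.SHARE);
        try {
            namespace.join("G1", GroupType.CONSUMER);   // rejected: G1 is reserved for share groups
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        namespace.join("G1", GroupType.SHARE);          // accepted
        System.out.println(namespace.typeOf("G1"));
    }
}
```

Without the pin, whichever kind of client used the name first would decide the group's type, which is the situation the configuration property is meant to prevent.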

Share group membership

This KIP builds upon the new consumer group protocol in KIP-848: The Next Generation of the Consumer Rebalance Protocol.

Share group membership is controlled by the group coordinator. Consumers in a share group use the heartbeat mechanism to join, leave and confirm continued membership of the share group, using the new ShareGroupHeartbeat RPC. Share-partition assignment is also piggybacked on the heartbeat mechanism. Share groups only support server-side assignors. This KIP introduces just one assignor, org.apache.kafka.server.group.share.SimpleAssignor , which assigns all partitions of all subscribed topics to all members.

In the future, a more sophisticated share group assignor could balance the number of consumers assigned to the partitions, and it may well revoke partitions from existing members in order to improve the balance. The simple assignor isn’t that smart.
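The behaviour described for the simple assignor amounts to giving every member every partition of the topics it subscribes to. The following is a minimal sketch of that idea, not the code of org.apache.kafka.server.group.share.SimpleAssignor; the `assign` method, the map-based inputs and the "topic-partition" string format are assumptions for illustration.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/** Toy model (not the real assignor) of "all partitions of all subscribed topics to all members". */
final class SimpleAssignorSketch {
    /**
     * @param subscriptions   member ID -> topics the member is subscribed to
     * @param partitionCounts topic -> number of partitions
     * @return member ID -> assigned share-partitions, written as "topic-partition"
     */
    static Map<String, Set<String>> assign(Map<String, Set<String>> subscriptions,
                                           Map<String, Integer> partitionCounts) {
        Map<String, Set<String>> assignment = new TreeMap<>();
        for (Map.Entry<String, Set<String>> member : subscriptions.entrySet()) {
            Set<String> partitions = new TreeSet<>();
            for (String topic : member.getValue()) {
                int count = partitionCounts.getOrDefault(topic, 0);
                for (int p = 0; p < count; p++) {
                    partitions.add(topic + "-" + p);   // every member gets every partition
                }
            }
            assignment.put(member.getKey(), partitions);
        }
        return assignment;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> subscriptions = new TreeMap<>();
        subscriptions.put("consumer-1", Set.of("orders"));
        subscriptions.put("consumer-2", Set.of("orders"));
        subscriptions.put("consumer-3", Set.of("orders"));
        // Three members share two partitions, so members outnumber partitions.
        System.out.println(assign(subscriptions, Map.of("orders", 2)));
    }
}
```

Because nothing is ever taken away from an existing member, this scheme never needs to revoke partitions, at the cost of not balancing load across them.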

...

Here are the previous examples, showing the control records which record the cumulative state durably. Note that any SHARE_DELTA could be replaced with a SHARE_CHECKPOINT.

Operation

State changes

Cumulative state

Control records

Starting state of topic-partition with latest offset 100

SPSO=100, SPEO=100

SPSO=100, SPEO=100


Code Block
SHARE_CHECKPOINT offset 130:
{
  "GroupId": "G1",
  "CheckpointEpoch": 1,
  "StartOffset": 100,
  "EndOffset": 100,
  "States": []
}


In the batched case with successful processing, there’s a state change per batch to move the SPSO forwards


Fetch records 100-109

SPEO=110, records 100-109 (acquired, delivery count 1)

SPSO=100, SPEO=110, records 100-109 (acquired, delivery count 1)


Acknowledge 100-109

SPSO=110

SPSO=110, SPEO=110


Code Block
SHARE_DELTA offset 131:
{
  "GroupId": "G1",
  "CheckpointEpoch": 1,
  "BackOffset": 130,
  "States": [
    {
      "BaseOffset": 100,
      "LastOffset": 109,
      "State": 2 (Acknowledged),
      "DeliveryCount": 1
    }
  ]
}


With a messier sequence of release and acknowledge, there’s a state change for each operation which can act on multiple records


Fetch records 110-119

SPEO=120, records 110-119 (acquired, delivery count 1)

SPSO=110, SPEO=120, records 110-119 (acquired, delivery count 1)


Release 110

record 110 (available, delivery count 1)

SPSO=110, SPEO=120, record 110 (available, delivery count 1), records 111-119 (acquired, delivery count 1)


Code Block
SHARE_DELTA offset 132:
{
  "GroupId": "G1",
  "CheckpointEpoch": 1,
  "BackOffset": 131,
  "States": [
    {
      "BaseOffset": 110,
      "LastOffset": 110,
      "State": 0 (Available),
      "DeliveryCount": 1
    }
  ]
}

Note that the SPEO in the control records is 111 at this point. All records after this are in their first delivery attempt so this is an acceptable situation.

Acknowledge 119

record 110 (available, delivery count 1), records 111-118 acquired, record 119 acknowledged

SPSO=110, SPEO=120, record 110 (available, delivery count 1), records 111-118 (acquired, delivery count 1), record 119 acknowledged


Code Block
SHARE_DELTA offset 133:
{
  "GroupId": "G1",
  "CheckpointEpoch": 1,
  "BackOffset": 132,
  "States": [
    {
      "BaseOffset": 111,
      "LastOffset": 118,
      "State": 0 (Available),
      "DeliveryCount": 0
    },
    {
      "BaseOffset": 119,
      "LastOffset": 119,
      "State": 2 (Acknowledged),
      "DeliveryCount": 1
    }
  ]
}


Fetch records 110, 120

SPEO=121, record 110 (acquired, delivery count 2), record 120 (acquired, delivery count 1)

SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-118 (acquired, delivery count 1), record 119 acknowledged, record 120 (acquired, delivery count 1)


Lock timeout elapsed 111, 112

records 111-112 (available, delivery count 1)

SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-112 (available, delivery count 1), records 113-118 (acquired, delivery count 1), record 119 acknowledged, record 120 (acquired, delivery count 1)


Code Block
SHARE_DELTA offset 134:
{
  "GroupId": "G1",
  "CheckpointEpoch": 1,
  "BackOffset": 133,
  "States": [
    {
      "BaseOffset": 111,
      "LastOffset": 112,
      "State": 0 (Available),
      "DeliveryCount": 1
    }
  ]
}


Acknowledge 113-118

records 113-118 acknowledged

SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-112 (available, delivery count 1), records 113-119 acknowledged, record 120 (acquired, delivery count 1)


Code Block
SHARE_DELTA offset 135:
{
  "GroupId": "G1",
  "CheckpointEpoch": 1,
  "BackOffset": 134,
  "States": [
    {
      "BaseOffset": 113,
      "LastOffset": 118,
      "State": 2 (Acknowledged),
      "DeliveryCount": 1
    }
  ]
}


Fetch records 111,112

records 111-112 (acquired, delivery count 2)

SPSO=110, SPEO=121, records 110-112 (acquired, delivery count 2), records 113-119 acknowledged, record 120 (acquired, delivery count 1)


Acknowledge 110

SPSO=111

SPSO=111, SPEO=121, records 111-112 (acquired, delivery count 2), records 113-119 acknowledged, record 120 (acquired, delivery count 1)


Code Block
SHARE_DELTA offset 136:
{
  "GroupId": "G1",
  "CheckpointEpoch": 1,
  "BackOffset": 135,
  "States": [
    {
      "BaseOffset": 110,
      "LastOffset": 110,
      "State": 2 (Acknowledged),
      "DeliveryCount": 2
    }
  ]
}


Acknowledge 111,112

SPSO=120

SPSO=120, SPEO=121, record 120 (acquired, delivery count 1)


Code Block
SHARE_DELTA offset 137:
{
  "GroupId": "G1",
  "CheckpointEpoch": 1,
  "BackOffset": 136,
  "States": [
    {
      "BaseOffset": 111,
      "LastOffset": 112,
      "State": 2 (Acknowledged),
      "DeliveryCount": 2
    }
  ]
}

or alternatively, taking a new checkpoint:

Code Block
SHARE_CHECKPOINT offset 137:
{
  "GroupId": "G1",
  "CheckpointEpoch": 2,
  "StartOffset": 120,
  "EndOffset": 120,
  "States": []
}

Note that the delivery of 120 has not been recorded yet because it is the first delivery attempt and it is safe to recover the SPEO back to offset 120 and repeat the attempt.
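To see how the control records above rebuild the durable state, the sequence can be replayed through a small model. This is a sketch under stated assumptions rather than the broker's recovery code: the class and method names are hypothetical, a checkpoint is modelled with an empty state list only, each state entry of a SHARE_DELTA is applied with a separate call, and the starting point is SPSO=100, SPEO=100 as in the first row.

```java
import java.util.TreeMap;

/** Toy model (not broker code) that replays SHARE_CHECKPOINT and SHARE_DELTA records. */
final class ShareStateReplaySketch {
    static final int AVAILABLE = 0;      // state codes as shown in the control records
    static final int ACKNOWLEDGED = 2;

    long startOffset;                    // SPSO
    long endOffset;                      // SPEO as known from the control records
    final TreeMap<Long, int[]> states = new TreeMap<>();  // offset -> {state, deliveryCount}

    /** A checkpoint replaces all earlier state (modelled here with an empty state list). */
    void checkpoint(long start, long end) {
        states.clear();
        startOffset = start;
        endOffset = end;
    }

    /** A delta overlays one offset range, then the SPSO moves past acknowledged records. */
    void delta(long baseOffset, long lastOffset, int state, int deliveryCount) {
        for (long offset = baseOffset; offset <= lastOffset; offset++) {
            states.put(offset, new int[] {state, deliveryCount});
        }
        endOffset = Math.max(endOffset, lastOffset + 1);
        while (states.containsKey(startOffset) && states.get(startOffset)[0] == ACKNOWLEDGED) {
            states.remove(startOffset);
            startOffset++;
        }
    }

    /** Replays the records from the example table in order. */
    static ShareStateReplaySketch replayExample() {
        ShareStateReplaySketch s = new ShareStateReplaySketch();
        s.checkpoint(100, 100);                  // checkpoint at offset 130
        s.delta(100, 109, ACKNOWLEDGED, 1);      // delta at offset 131
        s.delta(110, 110, AVAILABLE, 1);         // delta at offset 132
        s.delta(111, 118, AVAILABLE, 0);         // delta at offset 133, first entry
        s.delta(119, 119, ACKNOWLEDGED, 1);      // delta at offset 133, second entry
        s.delta(111, 112, AVAILABLE, 1);         // delta at offset 134
        s.delta(113, 118, ACKNOWLEDGED, 1);      // delta at offset 135
        s.delta(110, 110, ACKNOWLEDGED, 2);      // delta at offset 136
        s.delta(111, 112, ACKNOWLEDGED, 2);      // delta at offset 137
        return s;
    }

    public static void main(String[] args) {
        ShareStateReplaySketch s = replayExample();
        System.out.println("SPSO=" + s.startOffset + ", SPEO=" + s.endOffset);
    }
}
```

Replaying the deltas ends with no per-record state left and both offsets at 120, which is the same state the alternative SHARE_CHECKPOINT at offset 137 records directly.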

Public Interfaces

This KIP introduces extensive additions to the public interfaces.

...

This KIP introduces a new interface for consuming records from a share group called org.apache.kafka.clients.consumer.ShareConsumer  with an implementation called org.apache.kafka.clients.consumer.KafkaShareConsumer . The interface stability is Evolving .

Method signature | Description
KafkaShareConsumer(Map<String, Object> configs) | Constructor
KafkaShareConsumer(Properties properties) | Constructor
KafkaShareConsumer(Map<String, Object> configs, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) | Constructor
KafkaShareConsumer(Properties properties, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) | Constructor
void acknowledge(ConsumerRecord record) | Acknowledge successful delivery of a record returned on the last poll(Duration). The acknowledgement is committed on the next commitSync(), commitAsync() or poll(Duration) call.
void acknowledge(ConsumerRecord record, AcknowledgeType type) | Acknowledge delivery of a record returned on the last poll(Duration) indicating whether it was processed successfully. The acknowledgement is committed on the next commitSync(), commitAsync() or poll(Duration) call.
Uuid clientInstanceId(Duration timeout) | Determines the client's unique client instance ID used for telemetry.
void close() | Close the consumer, waiting for up to the default timeout of 30 seconds for any needed cleanup.
void close(Duration timeout) | Tries to close the consumer cleanly within the specified timeout.
void commitAsync() | Commits the acknowledgements for the records returned.
void commitSync() | Commits the acknowledgements for the records returned.
void commitSync(Duration timeout) | Commits the acknowledgements for the records returned.
Map<MetricName, ? extends Metric> metrics() | Get the metrics kept by the consumer.
ConsumerRecords<K,V> poll(Duration timeout) | Fetch data for the topics or partitions specified using the subscribe API.
void subscribe(Collection<String> topics) | Subscribe to the given list of topics to get dynamically assigned partitions.
Set<String> subscription() | Get the current subscription.
void unsubscribe() | Unsubscribe from topics currently subscribed with subscribe(Collection).
void wakeup() | Wake up the consumer.

AcknowledgeType

The new org.apache.kafka.clients.consumer.AcknowledgeType enum distinguishes between the types of acknowledgement for a record consumed using a share group.

...

Method signature | Description
AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets) | Alter offset information for a share group.
AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets, AlterShareGroupOffsetsOptions options) | Alter offset information for a share group.
DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId, Set<TopicPartition> partitions) | Delete offset information for a set of partitions in a share group.
DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId, Set<TopicPartition> partitions, DeleteShareGroupOffsetsOptions options) | Delete offset information for a set of partitions in a share group.
DeleteShareGroupsResult deleteShareGroups(Collection<String> groupIds) | Delete share groups from the cluster.
DeleteShareGroupsResult deleteShareGroups(Collection<String> groupIds, DeleteShareGroupsOptions options) | Delete share groups from the cluster.
DescribeShareGroupsResult describeShareGroups(Collection<String> groupIds) | Describe some share groups in the cluster.
DescribeShareGroupsResult describeShareGroups(Collection<String> groupIds, DescribeShareGroupsOptions options) | Describe some share groups in the cluster.
ListShareGroupOffsetsResult listShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec> groupSpecs) | List the share group offsets available in the cluster for the specified share groups.
ListShareGroupOffsetsResult listShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec> groupSpecs, ListShareGroupOffsetsOptions options) | List the share group offsets available in the cluster for the specified share groups.
ListShareGroupsResult listShareGroups() | List the share groups available in the cluster.
ListShareGroupsResult listShareGroups(ListShareGroupsOptions options) | List the share groups available in the cluster.
ListGroupsResult listGroups() | List the groups available in the cluster.
ListGroupsResult listGroups(ListGroupsOptions options) | List the groups available in the cluster.

The equivalence between the consumer group and share group interfaces is clear. There are some differences:

...

Code Block
languagejava
    /**
     * Alters offsets for the specified group. In order to succeed, the group must be empty.
     *
     * <p>This is a convenience method for {@link #alterShareGroupOffsets(String, Map, AlterShareGroupOffsetsOptions)} with default options.
     * See the overload for more details.
     *
     * @param groupId The group for which to alter offsets.
     * @param offsets A map of offsets by partition with associated metadata.
     * @return The AlterShareGroupOffsetsResult.
     */
    default AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets) {
        return alterShareGroupOffsets(groupId, offsets, new AlterShareGroupOffsetsOptions());
    }

    /**
     * Alters offsets for the specified group. In order to succeed, the group must be empty.
     *
     * <p>This operation is not transactional, so it may succeed for some partitions while failing for others.
     *
     * @param groupId The group for which to alter offsets.
     * @param offsets A map of offsets by partition with associated metadata. Partitions not specified in the map are ignored.
     * @param options The options to use when altering the offsets.
     * @return The AlterShareGroupOffsetsResult.
     */
    AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets, AlterShareGroupOffsetsOptions options);

    /**
     * Delete offsets for a set of partitions in a share group with the default
     * options. This will succeed at the partition level only if the group is not actively
     * subscribed to the corresponding topic.
     *
     * <p>This is a convenience method for {@link #deleteShareGroupOffsets(String, Set, DeleteShareGroupOffsetsOptions)} with default options.
     * See the overload for more details.
     *
     * @return The DeleteShareGroupOffsetsResult.
     */
    default DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId, Set<TopicPartition> partitions) {
        return deleteShareGroupOffsets(groupId, partitions, new DeleteShareGroupOffsetsOptions());
    }

    /**
     * Delete offsets for a set of partitions in a share group. This will
     * succeed at the partition level only if the group is not actively subscribed
     * to the corresponding topic.
     *
     * @param options The options to use when deleting offsets in a share group.
     * @return The DeleteShareGroupOffsetsResult.
     */
    DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId,
        Set<TopicPartition> partitions,
        DeleteShareGroupOffsetsOptions options);

    /**
     * Delete share groups from the cluster with the default options.
     *
     * <p>This is a convenience method for {@link #deleteShareGroups(Collection, DeleteShareGroupsOptions)} with default options.
     * See the overload for more details.
     *
     * @param groupIds The IDs of the groups to delete.
     * @return The DeleteShareGroupsResult.
     */
    default DeleteShareGroupsResult deleteShareGroups(Collection<String> groupIds) {
        return deleteShareGroups(groupIds, new DeleteShareGroupsOptions());
    }

    /**
     * Delete share groups from the cluster.
     *
     * @param groupIds The IDs of the groups to delete.
     * @param options The options to use when deleting a share group.
     * @return The DeleteShareGroupsResult.
     */
    DeleteShareGroupsResult deleteShareGroups(Collection<String> groupIds, DeleteShareGroupsOptions options);

    /**
     * Describe some share groups in the cluster, with the default options.
     *
     * <p>This is a convenience method for {@link #describeShareGroups(Collection, DescribeShareGroupsOptions)}
     * with default options. See the overload for more details.
     *
     * @param groupIds The IDs of the groups to describe.
     * @return The DescribeShareGroupsResult.
     */
    default DescribeShareGroupsResult describeShareGroups(Collection<String> groupIds) {
        return describeShareGroups(groupIds, new DescribeShareGroupsOptions());
    }

    /**
     * Describe some share groups in the cluster.
     *
     * @param groupIds The IDs of the groups to describe.
     * @param options  The options to use when describing the groups.
     * @return The DescribeShareGroupsResult.
     */
    DescribeShareGroupsResult describeShareGroups(Collection<String> groupIds,
                                                 DescribeShareGroupsOptions options);

    /**
     * List the share group offsets available in the cluster for the specified share groups with the default options.
     *
     * <p>This is a convenience method for {@link #listShareGroupOffsets(Map, ListShareGroupOffsetsOptions)}
     * to list offsets of all partitions for the specified share groups with default options.
     *
     * @param groupSpecs Map of share group ids to a spec that specifies the topic partitions of the group to list offsets for.
     * @return The ListShareGroupOffsetsResult
     */
    default ListShareGroupOffsetsResult listShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec> groupSpecs) {
        return listShareGroupOffsets(groupSpecs, new ListShareGroupOffsetsOptions());
    }

    /**
     * List the share group offsets available in the cluster for the specified share groups.
     *
     * @param groupSpecs Map of share group ids to a spec that specifies the topic partitions of the group to list offsets for.
     * @param options The options to use when listing the share group offsets.
     * @return The ListShareGroupOffsetsResult
     */
    ListShareGroupOffsetsResult listShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec> groupSpecs, ListShareGroupOffsetsOptions options);

    /**
     * List the share groups available in the cluster with the default options.
     *
     * <p>This is a convenience method for {@link #listShareGroups(ListShareGroupsOptions)} with default options.
     * See the overload for more details.
     *
     * @return The ListShareGroupsResult.
     */
    default ListShareGroupsResult listShareGroups() {
        return listShareGroups(new ListShareGroupsOptions());
    }

    /**
     * List the share groups available in the cluster.
     *
     * @param options The options to use when listing the share groups.
     * @return The ListShareGroupsResult.
     */
    ListShareGroupsResult listShareGroups(ListShareGroupsOptions options);
 
 

    /**
     * List the groups available in the cluster with the default options.
     *
     * <p>This is a convenience method for {@link #listGroups(ListGroupsOptions)} with default options.
     * See the overload for more details.
     *
     * @return The ListGroupsResult.
     */
    default ListGroupsResult listGroups() {
        return listGroups(new ListGroupsOptions());
    }

    /**
     * List the groups available in the cluster.
     *
     * @param options The options to use when listing the groups.
     * @return The ListGroupsResult.
     */
    ListGroupsResult listGroups(ListGroupsOptions options);

AlterShareGroupOffsetsResult

Code Block
languagejava
package org.apache.kafka.clients.admin;
 
/**
 * The result of the {@link Admin#alterShareGroupOffsets(String, Map<TopicPartition, OffsetAndMetadata>, AlterShareGroupOffsetsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class AlterShareGroupOffsetsResult {
    /**
     * Return a future which succeeds only if all the alter offsets succeed.
     */
    public KafkaFuture<Void> all() {
    }

    /**
     * Return a future which can be used to check the result for a given partition.
     */
    public KafkaFuture<Void> partitionResult(final TopicPartition partition) {
    }
}

...

AlterShareGroupOffsetsOptions

Code Block
languagejava
package org.apache.kafka.clients.admin;
 
/**
 * Options for the {@link Admin#alterShareGroupOffsets(String, Map<TopicPartition, OffsetAndMetadata>, AlterShareGroupOffsetsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class AlterShareGroupOffsetsOptions extends AbstractOptions<AlterShareGroupOffsetsOptions> {
}

...

DeleteShareGroupOffsetsResult

Code Block
languagejava
package org.apache.kafka.clients.admin;
 
/**
 * The result of the {@link Admin#deleteShareGroupOffsets(String, Set<TopicPartition>, DeleteShareGroupOffsetsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class DeleteShareGroupOffsetsResult {
    /**
     * Return a future which succeeds only if all the deletions succeed.
     */
    public KafkaFuture<Void> all() {
    }

    /**
     * Return a future which can be used to check the result for a given partition.
     */
    public KafkaFuture<Void> partitionResult(final TopicPartition partition) {
    }
}

...

DeleteShareGroupOffsetsOptions

Code Block
languagejava
package org.apache.kafka.clients.admin;
 
/**
 * Options for the {@link Admin#deleteShareGroupOffsets(String, Set<TopicPartition>, DeleteShareGroupOffsetsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class DeleteShareGroupOffsetsOptions extends AbstractOptions<DeleteShareGroupOffsetsOptions> {
}

...

DeleteShareGroupsResult

Code Block
languagejava
package org.apache.kafka.clients.admin;
 
/**
 * The result of the {@link Admin#deleteShareGroups(Collection<String>, DeleteShareGroupsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class DeleteShareGroupsResult {
    /**
     * Return a future which succeeds only if all the deletions succeed.
     */
    public KafkaFuture<Void> all() {
    }

    /**
     * Return a map from group id to futures which can be used to check the status of individual deletions.
     */
    public Map<String, KafkaFuture<Void>> deletedGroups() {
    }
}

...

DeleteShareGroupsOptions

Code Block
languagejava
package org.apache.kafka.clients.admin;

/**
 * Options for the {@link Admin#deleteShareGroups(Collection<String>, DeleteShareGroupsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class DeleteShareGroupsOptions extends AbstractOptions<DeleteShareGroupsOptions> {
}

DescribeShareGroupsResult

Code Block
languagejava
package org.apache.kafka.clients.admin;
 
/**
 * The result of the {@link Admin#describeShareGroups(Collection<String>, DescribeShareGroupsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class DescribeShareGroupsResult {
    /**
     * Return a future which yields all ShareGroupDescription objects, if all the describes succeed.
     */
    public KafkaFuture<Map<String, ShareGroupDescription>> all() {
    }

    /**
     * Return a map from group id to futures which yield group descriptions.
     */
    public Map<String, KafkaFuture<ShareGroupDescription>> describedGroups() {
    }
}

ShareGroupDescription

This class does indeed reuse the MemberDescription  class intended for consumer groups. It is sufficiently general to work for share groups also.

Code Block
languagejava
package org.apache.kafka.clients.admin;

import org.apache.kafka.common.Node;
import org.apache.kafka.common.ShareGroupState;
import org.apache.kafka.common.acl.AclOperation;

/**
 * A detailed description of a single share group in the cluster.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ShareGroupDescription {
  public ShareGroupDescription(String groupId, Collection<MemberDescription> members, ShareGroupState state, Node coordinator);
  public ShareGroupDescription(String groupId, Collection<MemberDescription> members, ShareGroupState state, Node coordinator, Set<AclOperation> authorizedOperations);

  /**
   * The id of the share group.
   */
  public String groupId();

  /**
   * A list of the members of the share group.
   */
  public Collection<MemberDescription> members();

  /**
   * The share group state, or UNKNOWN if the state cannot be parsed.
   */
  public ShareGroupState state();

  /**
   * The share group coordinator, or null if the coordinator is not known.
   */
  public Node coordinator();

  /**
   * The authorized operations for this group, or null if that information is not known.
   */
  public Set<AclOperation> authorizedOperations();
}

...

DescribeShareGroupsOptions

Code Block
languagejava
package org.apache.kafka.clients.admin;
 
/**
 * Options for {@link Admin#describeShareGroups(Collection<String>, DescribeShareGroupsOptions)}.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class DescribeShareGroupsOptions extends AbstractOptions<DescribeShareGroupsOptions> {
    public DescribeShareGroupsOptions includeAuthorizedOperations(boolean includeAuthorizedOperations);

    public boolean includeAuthorizedOperations();
}

...

ListShareGroupOffsetsResult

The offset returned for each topic-partition is the share-partition start offset (SPSO).

Code Block
languagejava
package org.apache.kafka.clients.admin;
 
/**
 * The result of the {@link Admin#listShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec>, ListShareGroupOffsetsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ListShareGroupOffsetsResult {
    /**
     * Return a future which yields all Map<String, Map<TopicPartition, OffsetAndMetadata>> objects, if requests for all the groups succeed.
     */
    public KafkaFuture<Map<String, Map<TopicPartition, OffsetAndMetadata>>> all() {
    }

    /**
     * Return a future which yields a map of topic partitions to OffsetAndMetadata objects for the specified group.
     */
    public KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> partitionsToOffsetAndMetadata(String groupId) {
    }
}

...

ListShareGroupOffsetsOptions

Code Block
languagejava
package org.apache.kafka.clients.admin;
 
/**
 * Options for {@link Admin#listShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec>, ListShareGroupOffsetsOptions)}.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ListShareGroupOffsetsOptions extends AbstractOptions<ListShareGroupOffsetsOptions> {
}

ListShareGroupOffsetsSpec

Code Block
languagejava
package org.apache.kafka.clients.admin;
 
/**
 * Specification of share group offsets to list using {@link Admin#listShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec>, ListShareGroupOffsetsOptions)}.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ListShareGroupOffsetsSpec {
  public ListShareGroupOffsetsSpec();

  /**
   * Set the topic partitions whose offsets are to be listed for a share group.
   */
  public ListShareGroupOffsetsSpec topicPartitions(Collection<TopicPartition> topicPartitions);

  /**
   * Returns the topic partitions whose offsets are to be listed for a share group.
   */
  public Collection<TopicPartition> topicPartitions();
}

ListShareGroupsResult

Code Block
languagejava
package org.apache.kafka.clients.admin;

import org.apache.kafka.common.ShareGroupState;

/**
 * The result of the {@link Admin#listShareGroups(ListShareGroupsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ListShareGroupsResult {
    /**
     * Returns a future that yields either an exception, or the full set of share group listings.
     */
    public KafkaFuture<Collection<ShareGroupListing>> all() {
    }

    /**
     * Returns a future which yields just the valid listings.
     */
    public KafkaFuture<Collection<ShareGroupListing>> valid() {
    }
 
    /**
     * Returns a future which yields just the errors which occurred.
     */
    public KafkaFuture<Collection<Throwable>> errors() {
    }
}

...

ShareGroupListing

Code Block
languagejava
package org.apache.kafka.clients.admin;

import org.apache.kafka.common.ShareGroupState;

/**
 * A listing of a share group in the cluster.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ShareGroupListing {
  public ShareGroupListing(String groupId);
  public ShareGroupListing(String groupId, Optional<ShareGroupState> state);

  /**
   * The id of the share group.
   */
  public String groupId();

  /**
   * The share group state.
   */
  public Optional<ShareGroupState> state();
}

ListShareGroupsOptions

Code Block
languagejava
package org.apache.kafka.clients.admin;

import org.apache.kafka.common.ShareGroupState;

/**
 * Options for {@link Admin#listShareGroups(ListShareGroupsOptions)}.
 *
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ListShareGroupsOptions extends AbstractOptions<ListShareGroupsOptions> {
    /**
     * If states is set, only groups in these states will be returned. Otherwise, all groups are returned.
     */
    public ListShareGroupsOptions inStates(Set<ShareGroupState> states);

    /**
     * Return the list of States that are requested or empty if no states have been specified.
     */
    public Set<ShareGroupState> states();
}

ListGroupsResult

Code Block
languagejava
package org.apache.kafka.clients.admin;
  
/**
 * The result of the {@link Admin#listGroups(ListGroupsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ListGroupsResult {
    /**
     * Returns a future that yields either an exception, or the full set of group listings.
     */
    public KafkaFuture<Collection<GroupListing>> all() {
    }
 
    /**
     * Returns a future which yields just the valid listings.
     */
    public KafkaFuture<Collection<GroupListing>> valid() {
    }
  
    /**
     * Returns a future which yields just the errors which occurred.
     */
    public KafkaFuture<Collection<Throwable>> errors() {
    }
}

GroupListing

Code Block
languagejava
package org.apache.kafka.clients.admin;
 
import org.apache.kafka.common.GroupType;
 
/**
 * A listing of a group in the cluster.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class GroupListing {
  public GroupListing(String groupId, GroupType type);
 
  /**
   * The id of the group.
   */
  public String groupId();
 
  /**
   * The group type.
   */
  public GroupType type();
}

ListGroupsOptions

Code Block
languagejava
package org.apache.kafka.clients.admin;

import org.apache.kafka.common.GroupType;

/**
 * Options for {@link Admin#listGroups(ListGroupsOptions)}.
 *
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ListGroupsOptions extends AbstractOptions<ListGroupsOptions> {
    /**
     * If types is set, only groups of these types will be returned. Otherwise, all groups are returned.
     */
    public ListGroupsOptions types(Set<GroupType> types);

    /**
     * Return the list of types that are requested or empty if no types have been specified.
     */
    public Set<GroupType> types();
}
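
The filtering rule described in the Javadoc above (an unset or empty set of types returns every group) can be sketched in plain Java. This is only an illustration of the rule, not the broker or admin client implementation: the class `GroupTypeFilterSketch`, its nested `GroupType` enum and the method `groupIdsOfTypes` are hypothetical names introduced for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in types; the real GroupType lives in org.apache.kafka.common.
final class GroupTypeFilterSketch {
    enum GroupType { CONSUMER, CLASSIC, SHARE }

    /**
     * Returns the ids of the groups whose type is in `types`.
     * An empty `types` set means "no filter", so every group id is returned.
     */
    static List<String> groupIdsOfTypes(Map<String, GroupType> groups, Set<GroupType> types) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, GroupType> entry : groups.entrySet()) {
            if (types.isEmpty() || types.contains(entry.getValue())) {
                result.add(entry.getKey());
            }
        }
        return result;
    }
}
```

Treating the empty set as "all types" mirrors the wording of `types()`, which returns an empty set when no types have been specified.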

GroupType

Another case is added to the org.apache.kafka.common.GroupType  enum:

| Enum constant | Description |
|---|---|
| SHARE("share") | Share group |

ShareGroupState

A new enum org.apache.kafka.common.ShareGroupState  is added:

Enum constant

DEAD 

EMPTY 

STABLE 

UNKNOWN 

Its definition follows the pattern of ConsumerGroupState with fewer states.
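
As a rough sketch of what "follows the pattern of ConsumerGroupState" could mean, the enum below pairs each constant with a display name and offers a case-insensitive `parse` that falls back to `UNKNOWN`. The KIP only lists the four constants; the display names, the `parse` method and the name `ShareGroupStateSketch` are assumptions made for illustration.

```java
// Illustrative sketch only; the KIP names the constants but not the members shown here.
enum ShareGroupStateSketch {
    UNKNOWN("Unknown"),
    STABLE("Stable"),
    DEAD("Dead"),
    EMPTY("Empty");

    private final String name;

    ShareGroupStateSketch(String name) {
        this.name = name;
    }

    /** Case-insensitive lookup by display name; anything unrecognised maps to UNKNOWN. */
    static ShareGroupStateSketch parse(String name) {
        if (name == null) {
            return UNKNOWN;
        }
        for (ShareGroupStateSketch state : values()) {
            if (state.name.equalsIgnoreCase(name)) {
                return state;
            }
        }
        return UNKNOWN;
    }

    @Override
    public String toString() {
        return name;
    }
}
```

Mapping unrecognised names to `UNKNOWN` rather than throwing lets a client keep working when a broker reports a state the client does not know about.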

Command-line tools

kafka-share-groups.sh

A new tool called kafka-share-groups.sh is added for working with share groups. It has the following options:

Option

Description

--all-topics

Consider all topics assigned to a group in the `reset-offsets` process.

--bootstrap-server <String: server to connect to>

REQUIRED: The server(s) to connect to.

--command-config <String: command config property file>

Property file containing configs to be passed to Admin Client.

--delete

Pass in groups to delete topic partition offsets over the entire share group. For instance --group g1 --group g2

--delete-offsets

Delete offsets of share group. Supports one share group at a time, and multiple topics.

--describe

Describe share group and list offset lag (number of records not yet processed) related to given group.

--dry-run

Only show results without executing changes on share groups. Supported operations: reset-offsets.

--execute

Execute operation. Supported operations: reset-offsets.

--group <String: share group>

The share group we wish to act on.

--help

Print usage information.

--list

List all share groups.

--members

Describe members of the group. This option may be used with the '--describe' option only.

--offsets

Describe the group and list all topic partitions in the group along with their offset lag. This is the default sub-action of the '--describe' option and may be used with that option only.

--reset-offsets

Reset offsets of share group. Supports one share group at a time, and instances must be inactive.

--to-datetime <String: datetime>

Reset offsets to offset from datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss'.

--to-earliest

Reset offsets to earliest offset.

--to-latest

Reset offsets to latest offset.

--topic <String: topic>

The topic whose share group information should be deleted or topic which should be included in the reset offset process.

--version

Display Kafka version.

Here are some examples. 

To display a list of all share groups:

$ kafka-share-groups.sh --bootstrap-server localhost:9092 --list

To delete the information for topic T1  from inactive share group S1 , which essentially resets the consumption of this topic in the share group:

$ kafka-share-groups.sh --bootstrap-server localhost:9092 --group S1 --topic T1 --delete-offsets

To set the starting offset for consuming topic T1  in inactive share group S1  to a specific date and time:

$ kafka-share-groups.sh --bootstrap-server localhost:9092 --group S1 --topic T1 --reset-offsets --to-datetime 1999-12-31T23:57:00.000 --execute
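
To make the `--to-datetime` format concrete, the following sketch converts a value such as `1999-12-31T23:57:00.000` into an epoch timestamp in milliseconds, which is the form in which Kafka record timestamps are expressed. The class `ResetDatetimeSketch` is hypothetical, and interpreting the value as UTC is an assumption: the option description does not say which time zone applies when none is given.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, not part of the tool.
final class ResetDatetimeSketch {
    // Matches the documented 'YYYY-MM-DDTHH:mm:SS.sss' shape.
    private static final DateTimeFormatter FORMAT =
        DateTimeFormatter.ofPattern("uuuu-MM-dd'T'HH:mm:ss.SSS");

    /** Parses the datetime and returns epoch milliseconds, assuming the value is in UTC. */
    static long toEpochMillis(String datetime) {
        return LocalDateTime.parse(datetime, FORMAT)
                            .toInstant(ZoneOffset.UTC)
                            .toEpochMilli();
    }
}
```

Under the UTC assumption, `ResetDatetimeSketch.toEpochMillis("1999-12-31T23:57:00.000")` yields `946684620000`.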

kafka-console-share-consumer.sh

A new tool called kafka-console-share-consumer.sh  is added for reading data from Kafka topics using a share group and outputting to standard output. This is similar to kafka-console-consumer.sh  but using a share group and supporting the various acknowledge modes. It has the following options:

Option

Description

--bootstrap-server <String: server to connect to>

REQUIRED: The server(s) to connect to.

--consumer-config <String: config file>

Consumer config properties file. Note that [consumer-property] takes precedence over this config.

--consumer-property <String: consumer_prop>

Consumer property in the form key=value.

--enable-systest-events

Log lifecycle events of the consumer in addition to logging consumed messages. (This is specific for system tests.)

--formatter <String: class>

The name of a class to use for formatting Kafka messages for display. (default: kafka.tools.DefaultMessageFormatter)

--formatter-config <String: config file>

Config properties file to initialize the message formatter. Note that [property] takes precedence over this config.

--group <String: share group id>

The share group id of the consumer. (default: share)

--help

Print usage information.

--key-deserializer <String: deserializer for keys>

The name of the class to use for deserializing keys.

--max-messages <Integer: num_messages>

The maximum number of messages to consume before exiting. If not set, consumption is continual.

--property <String: prop>

The properties to initialize the message formatter. Default properties include:

 print.timestamp=true|false

 print.key=true|false

 print.offset=true|false

 print.delivery=true|false

 print.partition=true|false

 print.headers=true|false

 print.value=true|false

 key.separator=<key.separator>

 line.separator=<line.separator>

 headers.separator=<headers.separator>

 null.literal=<null.literal>

 key.deserializer=<key.deserializer>

 value.deserializer=<value.deserializer>

 header.deserializer=<header.deserializer>

Users can also pass in customized properties for their formatter; more specifically, users can pass in properties keyed with 'key.deserializer.', 'value.deserializer.' and 'headers.deserializer.' prefixes to configure their deserializers.

--reject

If specified, messages are rejected as they are consumed.

--reject-message-on-error

If there is an error when processing a message, reject it instead of halting.

--release

If specified, messages are released as they are consumed.

--timeout-ms <Integer: timeout_ms>

If specified, exit if no message is available for consumption for the specified interval.

--topic <String: topic>

REQUIRED: The topic to consume from.

--value-deserializer <String: deserializer for values>

The name of the class to use for deserializing values.

--version

Display Kafka version.

kafka-producer-perf-test.sh

The following enhancements are made to the kafka-producer-perf-test.sh tool. The changes are intended to make this tool useful for observing the operation of share groups by generating a low message rate with predictable message payloads.

| Option | Description |
|---|---|
| --throughput THROUGHPUT | (Existing option) Enhanced to permit fractional rates, such as 0.5 meaning 1 message every 2 seconds. |
| --payload-monotonic | Payload is a monotonically increasing integer. |
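
The arithmetic behind a fractional rate is simple enough to sketch. The example below derives the delay between sends from a messages-per-second rate and renders a counter as the payload. The class `LowRateProducerSketch` and both method names are hypothetical; rejecting non-positive rates, the decimal-text payload encoding and the choice of starting value are all assumptions, since the table does not specify them.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper illustrating the two options; not the tool's implementation.
final class LowRateProducerSketch {
    /** Delay between sends, in milliseconds, for a possibly fractional messages-per-second rate. */
    static long delayMillis(double throughput) {
        if (throughput <= 0) {
            throw new IllegalArgumentException("throughput must be positive in this sketch");
        }
        return Math.round(1000.0 / throughput);
    }

    /** Payload for message number n: the counter rendered as decimal text (an assumed encoding). */
    static byte[] monotonicPayload(long n) {
        return Long.toString(n).getBytes(StandardCharsets.UTF_8);
    }
}
```

With a rate of 0.5 the computed delay is 2000 ms, which matches the "1 message every 2 seconds" reading in the table.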

Configuration

Broker configuration

| Configuration | Description | Values |
|---|---|---|
| group.share.enable | Whether to enable share groups on the broker. | Default false while the feature is being developed. Will become true in a future release. |
| group.share.delivery.count.limit | The maximum number of delivery attempts for a record delivered to a share group. | Default 5, minimum 2, maximum 10 |
| group.share.record.lock.duration.ms | Share-group record acquisition lock duration in milliseconds. | Default 30000 (30 seconds), minimum 1000 (1 second), maximum 60000 (60 seconds) |
| group.share.record.lock.duration.max.ms | Share-group record acquisition lock maximum duration in milliseconds. | Default 60000 (60 seconds), minimum 1000 (1 second), maximum 3600000 (1 hour) |
| group.share.record.lock.partition.limit | Share-group record lock limit per share-partition. | Default 200, minimum 100, maximum 10000 |
| group.share.session.timeout.ms | The timeout to detect client failures when using the group protocol. | Default 45000 (45 seconds) |
| group.share.min.session.timeout.ms | The minimum session timeout. | Default 45000 (45 seconds) |
| group.share.max.session.timeout.ms | The maximum session timeout. | Default 60000 (60 seconds) |
| group.share.heartbeat.interval.ms | The heartbeat interval given to the members. | Default 5000 (5 seconds) |
| group.share.min.heartbeat.interval.ms | The minimum heartbeat interval. | Default 5000 (5 seconds) |
| group.share.max.heartbeat.interval.ms | The maximum heartbeat interval. | Default 15000 (15 seconds) |
| group.share.max.size | The maximum number of consumers that a single share group can accommodate. | Default 200 |
| group.share.assignors | The server-side assignors as a list of full class names. In the initial delivery, only the first one in the list is used. | A list of class names. Default "org.apache.server.group.share.SimpleAssignor" |

Group configuration

The following dynamic group configuration properties are added. These are properties for which it would be problematic to have consumers in the same share group using different behavior if the properties were specified in the consumer clients themselves.

| Configuration | Description | Values |
|---|---|---|
| group.share.isolation.level | Controls how to read records written transactionally. If set to "read_committed", the share group will only deliver transactional records which have been committed. If set to "read_uncommitted", the share group will return all messages, even transactional messages which have been aborted. Non-transactional records will be returned unconditionally in either mode. | Valid values "read_committed" and "read_uncommitted" (default) |
| group.share.auto.offset.reset | What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server. "earliest": automatically reset the offset to the earliest offset. "latest": automatically reset the offset to the latest offset. | Valid values "latest" (default) and "earliest" |
| group.share.record.lock.duration.ms | Record acquisition lock duration in milliseconds. | Default null, which uses the cluster configuration group.share.record.lock.duration.ms, minimum 1000, maximum limited by the cluster configuration group.share.record.lock.duration.max.ms |
| group.type | Ensures that a newly created group has the specified group type. | Valid values: "consumer" or "share" |
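
The interplay between the group-level `group.share.record.lock.duration.ms` and the two cluster-level settings can be sketched as a small resolution function. This is a sketch of the rule as the table states it, not the broker's code: the class `LockDurationSketch` and the method `effectiveLockDurationMs` are hypothetical, and rejecting an out-of-range value with an exception (rather than clamping it) is an assumption.

```java
// Hypothetical helper showing how a group-level override might be resolved.
final class LockDurationSketch {
    static final int MIN_LOCK_DURATION_MS = 1000;

    /**
     * @param groupValue      group-level group.share.record.lock.duration.ms, or null if unset
     * @param brokerDefaultMs cluster-level group.share.record.lock.duration.ms
     * @param brokerMaxMs     cluster-level group.share.record.lock.duration.max.ms
     */
    static int effectiveLockDurationMs(Integer groupValue, int brokerDefaultMs, int brokerMaxMs) {
        if (groupValue == null) {
            return brokerDefaultMs; // null means "use the cluster configuration"
        }
        if (groupValue < MIN_LOCK_DURATION_MS || groupValue > brokerMaxMs) {
            throw new IllegalArgumentException(
                "lock duration must be between " + MIN_LOCK_DURATION_MS + " and " + brokerMaxMs + " ms");
        }
        return groupValue;
    }
}
```

For example, with the broker defaults of 30000 and 60000, an unset group value resolves to 30000 and a group value of 45000 is accepted unchanged.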

Consumer configuration

The existing consumer configurations apply for share groups with the following exceptions:

  • auto.offset.reset : this is handled by a dynamic group configuration group.share.auto.offset.reset 
  • enable.auto.commit  and auto.commit.interval.ms : share groups do not support auto-commit
  • isolation.level : this is handled by a dynamic group configuration group.share.isolation.level 
  • partition.assignment.strategy : share groups do not support client-side partition assignors
  • interceptor.classes : interceptors are not supported
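
An application that reuses an existing consumer configuration for a share group may want to spot the properties in the list above before they cause surprises. The sketch below reports which of those keys are present in a configuration map. The class `ShareConsumerConfigSketch` and the method `notApplicableKeys` are hypothetical, and the KIP text here does not say whether the client ignores or rejects these properties, so the example only detects them.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper; the key names are the ones listed as exceptions for share groups.
final class ShareConsumerConfigSketch {
    static final Set<String> NOT_APPLICABLE = new LinkedHashSet<>(Arrays.asList(
        "auto.offset.reset",
        "enable.auto.commit",
        "auto.commit.interval.ms",
        "isolation.level",
        "partition.assignment.strategy",
        "interceptor.classes"));

    /** Returns, in sorted order, the configured keys that do not apply to share groups. */
    static Set<String> notApplicableKeys(Map<String, ?> consumerConfig) {
        Set<String> found = new TreeSet<>(consumerConfig.keySet());
        found.retainAll(NOT_APPLICABLE);
        return found;
    }
}
```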

Kafka protocol changes

This KIP introduces the following new APIs:

  • ShareGroupHeartbeat - for consumers to form and maintain share groups
  • ShareGroupDescribe - for describing share groups
  • ShareFetch - for fetching records from share-partition leaders
  • ShareAcknowledge - for acknowledging delivery of records with share-partition leaders
  • AlterShareGroupOffsets - for altering the share-partition start offsets for the share-partitions in a share group
  • DeleteShareGroupOffsets - for deleting the offsets for the share-partitions in a share group
  • DescribeShareGroupOffsets - for describing the offsets for the share-partitions in a share group

ShareGroupHeartbeat API

The ShareGroupHeartbeat API is used by share group consumers to form a group. The API allows members to advertise their subscriptions and their state. The group coordinator uses it to assign partitions to and revoke partitions from members. This API is also used as a liveness check.

Request schema

The member must set all the top-level fields when it joins for the first time or when an error occurs. Otherwise, it is expected only to fill in the fields which have changed since the last heartbeat.

Code Block
{
  "apiKey": TBD,
  "type": "request",
  "listeners": ["broker"],
  "name": "ShareGroupHeartbeatRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
      { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
        "about": "The group identifier." },
      { "name": "MemberId", "type": "string", "versions": "0+",
        "about": "The member ID generated by the coordinator. The member ID must be kept during the entire lifetime of the member." },
      { "name": "MemberEpoch", "type": "int32", "versions": "0+",
        "about": "The current member epoch; 0 to join the group; -1 to leave the group." },
      { "name": "RackId", "type": "string", "versions": "0+",  "nullableVersions": "0+", "default": "null",
        "about": "null if not provided or if it didn't change since the last heartbeat; the rack ID of consumer otherwise." },
      { "name": "RebalanceTimeoutMs", "type": "int32", "versions": "0+", "default": -1,
        "about": "-1 if it didn't change since the last heartbeat; the maximum time in milliseconds that the coordinator will wait on the member to revoke its partitions otherwise." },
      { "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+", "nullableVersions": "0+", "default": "null",
        "about": "null if it didn't change since the last heartbeat; the subscribed topic names otherwise." }
  ]
}

Response schema

The group coordinator will only send the Assignment field when it changes.

Code Block
{
  "apiKey": TBD,
  "type": "response",
  "name": "ShareGroupHeartbeatResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED
  // - NOT_COORDINATOR
  // - COORDINATOR_NOT_AVAILABLE
  // - COORDINATOR_LOAD_IN_PROGRESS
  // - INVALID_REQUEST
  // - UNKNOWN_MEMBER_ID
  // - GROUP_MAX_SIZE_REACHED
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top-level error code, or 0 if there was no error." },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The top-level error message, or null if there was no error." },
    { "name": "MemberId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The member ID generated by the coordinator. Only provided when the member joins with MemberEpoch == 0." },
    { "name": "MemberEpoch", "type": "int32", "versions": "0+",
      "about": "The member epoch." },
    { "name": "HeartbeatIntervalMs", "type": "int32", "versions": "0+",
      "about": "The heartbeat interval in milliseconds." },
    { "name": "Assignment", "type": "Assignment", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "null if not provided; the assignment otherwise.", "fields": [
        { "name": "Error", "type": "int8", "versions": "0+",
          "about": "The assigned error." },
        { "name": "AssignedTopicPartitions", "type": "[]TopicPartitions", "versions": "0+",
          "about": "The partitions assigned to the member." }
    ]}
  ],
  "commonStructs": [
    { "name": "TopicPartitions", "versions": "0+", "fields": [
        { "name": "TopicId", "type": "uuid", "versions": "0+",
          "about": "The topic ID." },
        { "name": "Partitions", "type": "[]int32", "versions": "0+",
          "about": "The partitions." }
    ]}
  ]
}
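
Because the group coordinator only sends the Assignment field when it changes, and only provides MemberId when the member joins with MemberEpoch == 0, a client has to carry both values forward between heartbeats. The following is a minimal Python sketch of that bookkeeping. The function `apply_heartbeat_response` and the dictionary-based state are assumptions made for illustration and are not defined by this KIP.

```python
from typing import Any, Dict


def apply_heartbeat_response(state: Dict[str, Any],
                             response: Dict[str, Any]) -> Dict[str, Any]:
    """Return the member state after a ShareGroupHeartbeat response.

    `state` is a hypothetical dict with MemberId, MemberEpoch and Assignment.
    A null MemberId or Assignment in the response means "keep what you have".
    """
    error_code = response.get("ErrorCode", 0)
    if error_code != 0:
        raise RuntimeError(response.get("ErrorMessage") or f"error code {error_code}")

    updated = dict(state)
    if response.get("MemberId") is not None:
        updated["MemberId"] = response["MemberId"]
    updated["MemberEpoch"] = response["MemberEpoch"]
    if response.get("Assignment") is not None:
        updated["Assignment"] = response["Assignment"]
    return updated
```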

ShareGroupDescribe API

The ShareGroupDescribe API is used to describe share groups.

Request schema

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "ShareGroupDescribeRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupIds", "type": "[]string", "versions": "0+", "entityType": "groupId",
      "about": "The ids of the groups to describe" },
    { "name": "IncludeAuthorizedOperations", "type": "bool", "versions": "0+",
      "about": "Whether to include authorized operations." }
  ]
}

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "ShareGroupDescribeResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - NOT_COORDINATOR (version 0+)
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - INVALID_GROUP_ID (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Groups", "type": "[]DescribedGroup", "versions": "0+",
      "about": "Each described group.",
      "fields": [
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The describe error, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The top-level error message, or null if there was no error." },
        { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
          "about": "The group ID string." },
        { "name": "GroupState", "type": "string", "versions": "0+",
          "about": "The group state string, or the empty string." },
        { "name": "GroupEpoch", "type": "int32", "versions": "0+",
          "about": "The group epoch." },
        { "name": "AssignmentEpoch", "type": "int32", "versions": "0+",
          "about": "The assignment epoch." },
        { "name": "AssignorName", "type": "string", "versions": "0+",
          "about": "The selected assignor." },
        { "name": "Members", "type": "[]Member", "versions": "0+",
          "about": "The members.",
          "fields": [
            { "name": "MemberId", "type": "string", "versions": "0+",
              "about": "The member ID." },
            { "name": "InstanceId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
              "about": "The member instance ID." },
            { "name": "RackId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
              "about": "The member rack ID." },
            { "name": "MemberEpoch", "type": "int32", "versions": "0+",
              "about": "The current member epoch." },
            { "name": "ClientId", "type": "string", "versions": "0+",
              "about": "The client ID." },
            { "name": "ClientHost", "type": "string", "versions": "0+",
              "about": "The client host." },
            { "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+", "entityType": "topicName",
              "about": "The subscribed topic names." },
            { "name": "Assignment", "type": "Assignment", "versions": "0+",
              "about": "The current assignment." }
          ]},
        { "name": "AuthorizedOperations", "type": "int32", "versions": "0+", "default": "-2147483648",
          "about": "32-bit bitfield to represent authorized operations for this group." }
      ]
    }
  ],
  "commonStructs": [
    { "name": "TopicPartitions", "versions": "0+", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic ID." },
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "The partitions." }
    ]},
    { "name": "Assignment", "versions": "0+", "fields": [
      { "name": "TopicPartitions", "type": "[]TopicPartitions", "versions": "0+",
        "about": "The assigned topic-partitions to the member." },
      { "name": "Error", "type": "int8", "versions": "0+",
        "about": "The assigned error." },
      { "name": "MetadataVersion", "type": "int32", "versions": "0+",
        "about": "The assignor metadata version." },
      { "name": "MetadataBytes", "type": "bytes", "versions": "0+",
        "about": "The assignor metadata bytes." }
    ]}
  ]
}

ShareFetch API

The ShareFetch API is used by share group consumers to fetch acquired records from share-partition leaders. It is also possible to piggyback acknowledgements in this request to reduce the number of round trips.

Request schema

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "ShareFetchRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "entityType": "groupId",
      "about": "null if not provided or if it didn't change since the last fetch; the group identifier otherwise." },
    { "name": "MemberId", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The member ID." },
    { "name": "MaxWaitMs", "type": "int32", "versions": "0+",
      "about": "The maximum time in milliseconds to wait for the response." },
    { "name": "MinBytes", "type": "int32", "versions": "0+",
      "about": "The minimum bytes to accumulate in the response." },
    { "name": "MaxBytes", "type": "int32", "versions": "0+", "default": "0x7fffffff", "ignorable": true,
      "about": "The maximum bytes to fetch.  See KIP-74 for cases where this limit may not be honored." },
    { "name": "Topics", "type": "[]FetchTopic", "versions": "0+",
      "about": "The topics to fetch.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID"},
      { "name": "Partitions", "type": "[]FetchPartition", "versions": "0+",
        "about": "The partitions to fetch.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "CurrentLeaderEpoch", "type": "int32", "versions": "0+", "default": "-1", "ignorable": true,
          "about": "The current leader epoch of the partition." },
        { "name": "PartitionMaxBytes", "type": "int32", "versions": "0+",
          "about": "The maximum bytes to fetch from this partition.  See KIP-74 for cases where this limit may not be honored." },
        { "name": "AcknowledgementBatches", "type": "[]AcknowledgementBatch", "versions": "0+",
          "about": "Record batches to acknowledge.", "fields": [
          { "name": "StartOffset", "type": "int64", "versions": "0+",
            "about": "Start offset of batch of records to acknowledge."},
          { "name": "LastOffset", "type": "int64", "versions": "0+",
            "about": "Last offset (inclusive) of batch of records to acknowledge."},
          { "name": "GapOffsets", "type": "[]int64", "versions": "0+",
            "about": "Array of offsets in this range which do not correspond to records."},
          { "name": "AcknowledgeType", "type": "int8", "versions": "0+", "default": "0",
            "about": "The type of acknowledgement - 0:Accept,1:Release,2:Reject."}
        ]}
    ]},
    { "name": "ForgottenTopicsData", "type": "[]ForgottenTopic", "versions": "0+", "ignorable": false,
      "about": "In an incremental fetch request, the partitions to remove.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID"},
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "The partitions indexes to forget." }
    ]}
  ]
}
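
Each AcknowledgementBatch covers the offsets from StartOffset to LastOffset inclusive, skips any offsets listed in GapOffsets, and applies a single AcknowledgeType (0:Accept, 1:Release, 2:Reject) to the rest. The following is a minimal Python sketch of that expansion. The function `expand_batch` is a hypothetical helper written for illustration, not broker or client code.

```python
from typing import Dict, List

# AcknowledgeType codes as listed in the schema.
ACK_TYPES = {0: "Accept", 1: "Release", 2: "Reject"}


def expand_batch(start_offset: int, last_offset: int,
                 gap_offsets: List[int], acknowledge_type: int = 0) -> Dict[int, str]:
    """Map each record offset in [start_offset, last_offset] to its acknowledgement type.

    Offsets listed in gap_offsets do not correspond to records and are skipped.
    """
    if last_offset < start_offset:
        raise ValueError("LastOffset must not precede StartOffset")
    if acknowledge_type not in ACK_TYPES:
        raise ValueError(f"unknown AcknowledgeType {acknowledge_type}")
    gaps = set(gap_offsets)
    return {offset: ACK_TYPES[acknowledge_type]
            for offset in range(start_offset, last_offset + 1)
            if offset not in gaps}
```

For example, a batch with StartOffset 10, LastOffset 14, GapOffsets [12] and AcknowledgeType 1 releases offsets 10, 11, 13 and 14.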

...

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "ShareFetchResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+", "ignorable": true,
      "about": "The top level response error code." },
    { "name": "Responses", "type": "[]ShareFetchableTopicResponse", "versions": "0+",
      "about": "The response topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID"},
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about": "The topic partitions.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no fetch error." },
        { "name": "LastStableOffset", "type": "int64", "versions": "0+", "default": "-1", "ignorable": true,
          "about": "The last stable offset (or LSO) of the partition. This is the last offset such that the state of all transactional records prior to this offset have been decided (ABORTED or COMMITTED)" },
        { "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "0+", "taggedVersions": "0+", "tag": 0, "fields": [
          { "name": "LeaderId", "type": "int32", "versions": "0+",
            "about": "The ID of the current leader or -1 if the leader is unknown." },
          { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
            "about": "The latest known leader epoch." }
        ]},
        { "name": "AbortedTransactions", "type": "[]AbortedTransaction", "versions": "0+", "nullableVersions": "0+", "ignorable": true,
          "about": "The aborted transactions.",  "fields": [
          { "name": "ProducerId", "type": "int64", "versions": "0+", "entityType": "producerId",
            "about": "The producer id associated with the aborted transaction." },
          { "name": "FirstOffset", "type": "int64", "versions": "0+",
            "about": "The first offset in the aborted transaction." }
        ]},
        { "name": "Records", "type": "records", "versions": "0+", "nullableVersions": "0+", "about": "The record data."},
        { "name": "AcquiredRecords", "type": "[]AcquiredRecords", "versions": "0+", "about": "The acquired records.", "fields":  [
          {"name": "BaseOffset", "type":  "int64", "versions": "0+", "about": "The earliest offset in this batch of acquired records."},
          {"name": "LastOffset", "type": "int64", "versions": "0+", "about": "The last offset of this batch of acquired records."},
          {"name": "DeliveryCount", "type": "int16", "versions": "0+", "about": "The delivery count of this batch of acquired records."}
        ]}
      ]}
    ]},
    { "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "0+", "taggedVersions": "0+", "tag": 0,
      "about": "Endpoints for all current leaders enumerated in PartitionData with error NOT_LEADER_OR_FOLLOWER.", "fields": [
      { "name": "NodeId", "type": "int32", "versions": "0+",
        "mapKey": true, "entityType": "brokerId", "about": "The ID of the associated node." },
      { "name": "Host", "type": "string", "versions": "0+",
        "about": "The node's hostname." },
      { "name": "Port", "type": "int32", "versions": "0+",
        "about": "The node's port." },
      { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
        "about": "The rack of the node, or null if it has not been assigned to a rack." }
    ]}
  ]
}
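
The AcquiredRecords array tells the consumer which offset ranges in the returned record data it has actually acquired, along with the delivery count for each range. The following is a minimal Python sketch of looking up the delivery count for one offset. It assumes that LastOffset is inclusive, which the schema does not state explicitly for this structure, and `delivery_count` is a hypothetical helper.

```python
from typing import List, Optional, Tuple

# (BaseOffset, LastOffset, DeliveryCount) for one entry of AcquiredRecords.
AcquiredRange = Tuple[int, int, int]


def delivery_count(acquired: List[AcquiredRange], offset: int) -> Optional[int]:
    """Return the delivery count for `offset`, or None if it was not acquired.

    Assumes LastOffset is inclusive.
    """
    for base_offset, last_offset, count in acquired:
        if base_offset <= offset <= last_offset:
            return count
    return None
```

A consumer following this sketch would process only the records for which the lookup returns a count, and could use the count to decide how to treat records that have already been delivered several times.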

...

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "ShareAcknowledgeRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "MemberId", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The member ID." },
    { "name": "Topics", "type": "[]AcknowledgeTopic", "versions": "0+",
      "about": "The topics containing records to acknowledge.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The unique topic ID"},
      { "name": "Partitions", "type": "[]AcknowledgePartition", "versions": "0+",
        "about": "The partitions containing records to acknowledge.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "AcknowledgementBatches", "type": "[]AcknowledgementBatch", "versions": "0+",
          "about": "Record batches to acknowledge.", "fields": [
          { "name": "StartOffset", "type": "int64", "versions": "0+",
            "about": "Start offset of batch of records to acknowledge."},
          { "name": "LastOffset", "type": "int64", "versions": "0+",
            "about": "Last offset (inclusive) of batch of records to acknowledge."},
          { "name": "GapOffsets", "type": "[]int64", "versions": "0+",
            "about": "Array of offsets in this range which do not correspond to records."},
          { "name": "AcknowledgeType", "type": "int8", "versions": "0+", "default": "0",
            "about": "The type of acknowledgement - 0:Accept,1:Release,2:Reject."}
        ]}
      ]}
    ]}
  ]
}

...

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "ShareAcknowledgeResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+", "ignorable": true,
      "about": "The top level response error code." },
    { "name": "SessionId", "type": "int32", "versions": "0+", "default": "0", "ignorable": false,
      "about": "The share session ID." },
    { "name": "Responses", "type": "[]ShareAcknowledgeTopicResponse", "versions": "0+",
      "about": "The response topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID"},
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about": "The topic partitions.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "0+", "taggedVersions": "0+", "tag": 0, "fields": [
          { "name": "LeaderId", "type": "int32", "versions": "0+",
            "about": "The ID of the current leader or -1 if the leader is unknown." },
          { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
            "about": "The latest known leader epoch." }
        ]}
      ]}
    ]},
    { "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "0+", "taggedVersions": "0+", "tag": 0,
      "about": "Endpoints for all current leaders enumerated in PartitionData with error NOT_LEADER_OR_FOLLOWER.", "fields": [
      { "name": "NodeId", "type": "int32", "versions": "0+",
        "mapKey": true, "entityType": "brokerId", "about": "The ID of the associated node." },
      { "name": "Host", "type": "string", "versions": "0+",
        "about": "The node's hostname." },
      { "name": "Port", "type": "int32", "versions": "0+",
        "about": "The node's port." },
      { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
        "about": "The rack of the node, or null if it has not been assigned to a rack." }
    ]}
  ]
}

...

The following new broker metrics should be added:

Metric Name

Type

Group

Tags

Description

JMX Bean

group-count

Gauge

group-coordinator-metrics

protocol: share

The total number of share groups managed by the group coordinator.

kafka.server:type=group-coordinator-metrics,name=group-count,protocol=share 

rebalance (rebalance-rate and rebalance-count)

Meter

group-coordinator-metrics

protocol: share

The count and rate of share group rebalances.

kafka.server:type=group-coordinator-metrics,name=rebalance-rate,protocol=share 


kafka.server:type=group-coordinator-metrics,name=rebalance-count,protocol=share 

num-partitions

Gauge

group-coordinator-metrics

protocol: share

The number of share partitions managed by the group coordinator.

kafka.server:type=group-coordinator-metrics,name=num-partitions,protocol=share 

group-count

Gauge

group-coordinator-metrics

protocol: share

state: {empty|stable|dead} 

The number of share groups in each state.

kafka.server:type=group-coordinator-metrics,name=group-count,protocol=share,state={empty|stable|dead}

share-acknowledgement (share-acknowledgement-rate and share-acknowledgement-count)

Meter

group-coordinator-metrics

protocol: share

The total number of offsets acknowledged for share groups.

kafka.server:type=group-coordinator-metrics,name=share-acknowledgement-rate,protocol=share 


kafka.server:type=group-coordinator-metrics,name=share-acknowledgement-count,protocol=share 

record-acknowledgement (record-acknowledgement-rate and record-acknowledgement-count)

Meter

group-coordinator-metrics

protocol: share

ack-type:{accept,release,reject} 


The number of records acknowledged per acknowledgement type.

kafka.server:type=group-coordinator-metrics,name=record-acknowledgement-rate,protocol=share,ack-type={accept,release,reject} 


kafka.server:type=group-coordinator-metrics,name=record-acknowledgement-count,protocol=share,ack-type={accept,release,reject} 

partition-load-time (partition-load-time-avg and partition-load-time-max)

Meter

group-coordinator-metrics

protocol: share 

The time taken to load the share partitions.

kafka.server:type=group-coordinator-metrics,name=partition-load-time-avg,protocol=share 


kafka.server:type=group-coordinator-metrics,name=partition-load-time-max,protocol=share  
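
The JMX bean names in the table follow one pattern: the `kafka.server` domain, then `type` set to the metric group, `name` set to the metric name, and one key per tag. The following is a minimal Python sketch that assembles a bean name from those parts. The helper `jmx_bean_name` is hypothetical and exists only to illustrate the naming pattern.

```python
from typing import Dict


def jmx_bean_name(metric_name: str, tags: Dict[str, str],
                  group: str = "group-coordinator-metrics") -> str:
    """Assemble a bean name in the form used by the table above.

    Tags are appended in the order given by the caller.
    """
    parts = [f"type={group}", f"name={metric_name}"]
    parts += [f"{key}={value}" for key, value in tags.items()]
    return "kafka.server:" + ",".join(parts)
```

For a tagged metric such as record-acknowledgement-rate, each concrete tag value (for example `ack-type=accept`) yields its own bean.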

Future Work

There are some obvious extensions to this idea which are not included in this KIP in order to keep the scope manageable.

...

The changes in this KIP add to the capabilities of Kafka rather than changing existing behavior.

Test Plan

The feature will be thoroughly tested with unit, integration and system tests.

Rejected Alternatives

Share group consumers use KafkaConsumer

...