...

  • The consumers in a share group cooperatively consume records with partitions that may be assigned to multiple consumers

  • The number of consumers in a share group can exceed the number of partitions

  • Records are acknowledged on an individual basis, although the system is optimized to work in batches for improved efficiency

  • Delivery attempts to consumers in a share group are counted to enable automated handling of unprocessable records

Share groups are a new type of group, alongside the existing consumer groups, adding "share" to the existing group types of "consumer" and "classic".

All consumers in the same share group subscribed to the same topic cooperatively consume the records of that topic. If a topic is accessed by consumers in more than one share group, each share group cooperatively consumes from that topic independently of the other share groups.

...

The cluster limits the number of records acquired for consumers for each topic-partition in a share group. Once the limit is reached, fetching records will temporarily yield no further records until the number of acquired records reduces, as naturally happens when acquired records are acknowledged or released, or when their locks time out. This limit is controlled by the broker configuration property group.share.record.lock.partition.limit. By limiting the duration of the acquisition lock and automatically releasing the locks, the broker ensures delivery progresses even in the presence of consumer failures.
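The per-partition cap can be pictured as a simple counter. This is a minimal sketch, assuming one counter per share-partition whose limit plays the role of group.share.record.lock.partition.limit; the class AcquisitionLimitSketch and its methods are hypothetical and not part of Kafka.

```java
// Hypothetical sketch: caps the number of records acquired for one share-partition.
// The limit stands in for group.share.record.lock.partition.limit; names are illustrative.
final class AcquisitionLimitSketch {
    private final int limit;   // maximum number of records that may be acquired at once
    private int acquired;      // records currently in the Acquired state

    AcquisitionLimitSketch(int limit) {
        if (limit <= 0) throw new IllegalArgumentException("limit must be positive");
        this.limit = limit;
    }

    // Returns how many of the requested records may be acquired now (possibly 0).
    int tryAcquire(int requested) {
        int granted = Math.max(0, Math.min(requested, limit - acquired));
        acquired += granted;
        return granted;
    }

    // Called when acquired records leave the Acquired state: acknowledged,
    // released, or their acquisition lock timed out.
    void onNoLongerAcquired(int count) {
        acquired = Math.max(0, acquired - count);
    }

    int acquired() { return acquired; }
}
```

A fetch against a share-partition at its limit simply gets nothing back until onNoLongerAcquired frees capacity, which mirrors the temporary "no further records" behaviour described above.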

...

A share-partition is a topic-partition with a subscription in a share group. For a topic-partition subscribed in more than one share group, each share group has its own share-partition.

...

  • It fetches the records from the replica manager from the local replica

  • It manages and persists the states of the in-flight records

This means that the fetch-from-follower optimization is not supported by share groups. The KIP does, however, include rack information so that consumers could preferentially fetch from share-partitions whose leadership is in the same rack.

Relationship with consumer groups

Consumer groups and share groups exist in the same namespace in a Kafka cluster. As a result, if there’s a consumer group with a particular name, you cannot create a share group with the same name, and vice versa. But consumer groups and share groups are quite different in terms of use, so attempts to perform operations for one kind of group on a group of the incorrect type will fail with a GroupIdNotFoundException. The new AdminClient.listGroups method gives a way of listing groups of all types.

Because consumer groups and share groups are both created automatically on first use, the type of group that is created depends upon how the group name was first used. As a result, it is helpful to be able to ensure that a group of a particular name can only be created with a particular type. This is achieved by defining a group configuration property group.type, using the kafka-configs.sh tool or the AdminClient.incrementalAlterConfigs method. For example, you could use the following command to ensure that the group ID "G1" is used for a share group only.

$ bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type groups --entity-name G1 --alter --add-config group.type=share

If a regular Kafka consumer then attempts to use "G1" as a consumer group, the exception "InconsistentGroupProtocolException" will be thrown.

Share group membership

This KIP builds upon the new consumer group protocol in KIP-848: The Next Generation of the Consumer Rebalance Protocol.

Share group membership is controlled by the group coordinator. Consumers in a share group use the heartbeat mechanism to join, leave and confirm continued membership of the share group, using the new ShareGroupHeartbeat RPC. Share-partition assignment is also piggybacked on the heartbeat mechanism. Share groups only support server-side assignors, which implement the new internal org.apache.kafka.coordinator.group.assignor.SharePartitionAssignor interface.

This KIP introduces just one assignor, org.apache.kafka.coordinator.group.assignor.SimpleShareAssignor, which assigns all partitions of all subscribed topics to all members. In the future, a more sophisticated share group assignor could balance the number of consumers assigned to the partitions, and it may well revoke partitions from existing members in order to improve the balance. The simple assignor isn’t that smart.
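The "everything to everyone" strategy can be sketched in a few lines. This is an illustration only, assuming each member receives every partition of each topic it subscribes to (when all members subscribe to the same topics, every member gets the same partitions, which is exactly the sharing a share group relies on). SimpleShareAssignorSketch and the "topic-partition" string encoding are hypothetical; this is not the code of SimpleShareAssignor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical sketch of assigning all partitions of all subscribed topics to all members.
// Partitions are written as "topic-partition" strings for brevity.
final class SimpleShareAssignorSketch {

    // memberSubscriptions: member ID -> topics the member subscribes to
    // partitionCounts:     topic name -> number of partitions in that topic
    // returns:             member ID -> every partition of every topic it subscribes to
    static Map<String, List<String>> assign(Map<String, Set<String>> memberSubscriptions,
                                            Map<String, Integer> partitionCounts) {
        Map<String, List<String>> assignment = new TreeMap<>();
        for (Map.Entry<String, Set<String>> member : memberSubscriptions.entrySet()) {
            List<String> partitions = new ArrayList<>();
            for (String topic : new TreeSet<>(member.getValue())) {   // sorted for a stable order
                int count = partitionCounts.getOrDefault(topic, 0);   // unknown topics yield nothing
                for (int p = 0; p < count; p++) {
                    partitions.add(topic + "-" + p);
                }
            }
            assignment.put(member.getKey(), partitions);
        }
        return assignment;
    }
}
```

No partition is ever withheld from a member to keep assignments disjoint, which is the key difference from a consumer group assignor.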

For a share group, a rebalance is a much less significant event than for a consumer group because there’s no fencing. When a partition is assigned to a member of a share group, it’s telling the member that it should fetch records from that partition, which it may well be sharing with the other members of the share group. The members are not aware of each other, and there’s no synchronization barrier or fencing involved. The group coordinator, using the server-side assignor, is responsible for telling the members which partitions they are assigned and revoked. But the aim is to give every member useful work, rather than to keep the members' assignments safely separated.

For a share group, the group coordinator does not need to persist the assignments, but it does need to persist the assignment epoch so that it doesn't move backwards if the group coordinator changes.

The reconciliation process for a share group is very simple because there is no fencing - the group coordinator revokes the partitions which are no longer in the target assignment of the member and assigns the new partitions to the member at the same time. There’s no need for the revocations to be acknowledged before new partitions are assigned. The member acknowledges changes to its assignment, but the group coordinator does not depend upon receiving the acknowledgement to proceed.
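Because there is no fencing, reconciliation for one member reduces to two set differences delivered together. The sketch below assumes partitions are identified by strings; ShareReconciliationSketch and AssignmentChange are hypothetical names used only for illustration.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of share group reconciliation for a single member.
// Revocations and new assignments are computed and delivered at the same time.
final class ShareReconciliationSketch {

    // The change delivered to a member in one heartbeat response.
    record AssignmentChange(Set<String> revoked, Set<String> assigned, Set<String> newAssignment) {}

    static AssignmentChange reconcile(Set<String> current, Set<String> target) {
        Set<String> revoked = new TreeSet<>(current);
        revoked.removeAll(target);        // partitions no longer in the target assignment
        Set<String> assigned = new TreeSet<>(target);
        assigned.removeAll(current);      // partitions newly in the target assignment
        // No waiting for the member to confirm revocations: it moves straight to the target.
        return new AssignmentChange(revoked, assigned, new TreeSet<>(target));
    }
}
```

A consumer group would have to hold back the newly assigned partitions until the revocations were acknowledged; here both halves go out in the same response.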

Data model

This is the data model maintained by the group coordinator for share groups.

Share Group and Member

The group and members represent the current state of a share group. This is reminiscent of a simplified consumer group.

Share Group

| Name | Type | Description |
| --- | --- | --- |
| Group ID | string | The group ID as configured by the consumer. The ID uniquely identifies the group. |
| Group Epoch | int32 | The current epoch of the group. The epoch is incremented by the group coordinator when a new assignment is required for the group. |
| Members | []Member | The set of members in the group. |
| Partitions Metadata | []PartitionMetadata | The metadata of the partitions that the group is subscribed to. This is used to detect partition metadata changes. |

Member

| Name | Type | Description |
| --- | --- | --- |
| Member ID | string | The unique identifier of the member. It is generated by the coordinator upon the first heartbeat request and must be used throughout the lifetime of the member. |
| Rack ID | string | The rack ID configured by the consumer. |
| Client ID | string | The client ID configured by the consumer. |
| Client Host | string | The client host of the consumer. |
| Subscribed Topic Names | []string | The current set of subscribed topic names configured by the consumer. |
| Server Assignor | string | The server-side assignor used by the group. |

Target Assignment

The target assignment of the group. This represents the assignment that all the members of the group will eventually converge to. It is a declarative assignment which is generated by the assignor based on the group state.
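As a rough illustration, the data model tables could be rendered as Java records like the following. The record names, the simplified PartitionMetadata (just a topic name and partition count), the map-based target assignment and the withNewEpoch helper are all assumptions for this sketch, not the group coordinator's actual classes.

```java
import java.util.List;
import java.util.Map;

// Hypothetical Java rendering of the share group data model described above.
record PartitionMetadata(String topicName, int numPartitions) {}   // simplified for illustration

record ShareGroupMember(
        String memberId,                     // generated by the coordinator on the first heartbeat
        String rackId,
        String clientId,
        String clientHost,
        List<String> subscribedTopicNames,
        String serverAssignor) {}

record ShareGroup(
        String groupId,
        int groupEpoch,                      // incremented when a new assignment is required
        Map<String, ShareGroupMember> members,
        List<PartitionMetadata> partitionsMetadata,
        Map<String, List<String>> targetAssignment) {   // member ID -> partitions it should converge to

    // Returns a copy with the epoch bumped and a new target assignment.
    ShareGroup withNewEpoch(Map<String, List<String>> newTargetAssignment) {
        return new ShareGroup(groupId, groupEpoch + 1, members, partitionsMetadata, newTargetAssignment);
    }
}
```

Using immutable records keeps each epoch's view of the group self-contained, which makes it easy to see that the epoch only ever moves forwards.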

...

Note that because the share groups are all consuming from the same log, the retention behavior for a topic applies to all of the share groups consuming from that topic.

Log compaction

When share groups are consuming from compacted topics, there is the possibility that in-flight records are cleaned while being consumed. In this case, the delivery flow for these records continues as normal because the disappearance of the cleaned records will only be discovered when they are next fetched from the log. This is analogous to a consumer group reading from a compacted topic - records which have been fetched by the consumer can continue to be processed, but if the consumer tried to fetch them again, it would discover they were no longer there.

When fetching records from a compacted topic, it is possible that record batches fetched have offset gaps which correspond to records the log cleaner removed. This simply results in gaps in the range of offsets of the in-flight records.

Reading transactional records

Each consumer in a consumer group has its own isolation level which controls how it handles records which were produced in transactions. For a share group, the concept of isolation level applies to the entire group, not each consumer.

The isolation level of a share group is controlled by the group configuration group.share.isolation.level.

For the read_uncommitted isolation level, which is the default, the share group consumes all transactional and non-transactional records.

For the read_committed isolation level, the share group only consumes committed records. The share-partition leader itself is responsible for keeping track of the commit and abort markers and filtering out transactional records which have been aborted. So, the set of records which are eligible to become in-flight records are non-transactional records and committed transactional records only. The SPEO can only move up to the last stable offset.

This processing has to occur on the broker because none of the clients receives all of the records. It can be performed with shallow iteration of the log.
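The read_committed filtering can be sketched using only batch-level metadata, which is what makes shallow iteration sufficient. This minimal sketch assumes the leader already knows the aborted transactions (producer ID, first offset, abort marker offset) and the last stable offset; ReadCommittedFilterSketch, Batch and AbortedTxn are hypothetical names, and real record batch headers carry more fields than shown.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of read_committed filtering by a share-partition leader.
// Only batch headers are inspected; record contents are never decoded.
final class ReadCommittedFilterSketch {

    record Batch(long baseOffset, long lastOffset, long producerId,
                 boolean transactional, boolean control) {}

    // An aborted transaction: producer ID, first offset, and the offset of its abort marker.
    record AbortedTxn(long producerId, long firstOffset, long abortMarkerOffset) {}

    // Batches must be supplied in offset order.
    static List<Batch> eligibleBatches(List<Batch> batches, List<AbortedTxn> aborted,
                                       long lastStableOffset) {
        List<Batch> eligible = new ArrayList<>();
        for (Batch b : batches) {
            if (b.lastOffset() >= lastStableOffset) {
                break;                  // the SPEO cannot move past the last stable offset
            }
            if (b.control()) {
                continue;               // commit and abort markers are never delivered
            }
            if (b.transactional() && isAborted(b, aborted)) {
                continue;               // drop data from aborted transactions
            }
            eligible.add(b);            // non-transactional or committed transactional data
        }
        return eligible;
    }

    private static boolean isAborted(Batch b, List<AbortedTxn> aborted) {
        for (AbortedTxn t : aborted) {
            if (t.producerId() == b.producerId()
                    && b.baseOffset() >= t.firstOffset()
                    && b.lastOffset() < t.abortMarkerOffset()) {
                return true;
            }
        }
        return false;
    }
}
```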

In-flight records example

An example of a share-partition showing the states looks like this:

Code Block
+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|   0   |   1   |   2   |   3   |   4   |   5   |   6   |   7   |   8   |   9   |  ...  | <- offset
| Archv | Archv | Acqrd | Avail | Acqrd | Acked | Archv | Avail | Avail | Avail | Avail | <- state
|       |       |   1   |   2   |   1   |       |       |       |       |       |       | <- delivery count
+-------+-------+---^---+-------+-------+-------+-------+-------+-------+---^---+-------+
                    |                                                       |
                    +-- Share-partition start offset (SPSO)                 +-- Share-partition end offset (SPEO)
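The per-record states in the diagram can be modeled in memory with a map keyed by offset. This is a minimal sketch, assuming a TreeMap of in-flight records and an exclusive SPEO; SharePartitionStateSketch and its methods are hypothetical, and transitions into Archived are not modeled. Replaying the sequence of operations from the state-change table in this document ends with SPSO=120 and SPEO=121.

```java
import java.util.TreeMap;

// Hypothetical in-memory model of a share-partition's in-flight records.
final class SharePartitionStateSketch {

    enum State { AVAILABLE, ACQUIRED, ACKNOWLEDGED, ARCHIVED }

    static final class InFlight {
        State state = State.AVAILABLE;
        int deliveryCount = 0;
    }

    private final TreeMap<Long, InFlight> inFlight = new TreeMap<>();
    private long spso;   // share-partition start offset
    private long speo;   // share-partition end offset (exclusive)

    SharePartitionStateSketch(long startOffset) {
        this.spso = startOffset;
        this.speo = startOffset;
    }

    // Fetch: acquires the next n records from the log, extending the in-flight window.
    void acquireNew(int n) {
        for (int i = 0; i < n; i++) {
            InFlight r = new InFlight();
            r.state = State.ACQUIRED;
            r.deliveryCount = 1;
            inFlight.put(speo++, r);
        }
    }

    // Fetch: re-acquires an Available in-flight record for another delivery attempt.
    void reacquire(long offset) {
        InFlight r = require(offset, State.AVAILABLE);
        r.state = State.ACQUIRED;
        r.deliveryCount++;
    }

    // An explicit release and a lock timeout both return the record to Available.
    void release(long offset) { require(offset, State.ACQUIRED).state = State.AVAILABLE; }

    void acknowledge(long offset) {
        require(offset, State.ACQUIRED).state = State.ACKNOWLEDGED;
        advanceStart();
    }

    // Moves the SPSO past records that need no further delivery. Offsets with no
    // entry (for example, removed by compaction) are skipped.
    private void advanceStart() {
        while (spso < speo) {
            InFlight r = inFlight.get(spso);
            if (r != null && r.state != State.ACKNOWLEDGED && r.state != State.ARCHIVED) {
                return;
            }
            inFlight.remove(spso);
            spso++;
        }
    }

    private InFlight require(long offset, State expected) {
        InFlight r = inFlight.get(offset);
        if (r == null || r.state != expected) {
            throw new IllegalStateException("offset " + offset + " is not " + expected);
        }
        return r;
    }

    long spso() { return spso; }
    long speo() { return speo; }
    State stateOf(long offset) { InFlight r = inFlight.get(offset); return r == null ? null : r.state; }
    int deliveryCountOf(long offset) { return inFlight.get(offset).deliveryCount; }
}
```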

...

A new interface KafkaShareConsumer is introduced for consuming from share groups. It looks very similar to KafkaConsumer trimmed down to the methods that apply to share groups.

In order to retain similarity with KafkaConsumer and make it easy for applications to move between the two interfaces, KafkaShareConsumer follows the same threading rules as KafkaConsumer. It is not thread-safe and only one thread at a time may call the methods of KafkaShareConsumer. Unsynchronized access will result in ConcurrentModificationException. The only exception to this rule is KafkaShareConsumer.wakeup(), which may be called from any thread.
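One light-weight way to enforce this rule is sketched below, assuming every public method except wakeup() brackets its body with acquire() and release(). SingleThreadGuardSketch is a hypothetical name; KafkaConsumer performs a comparable check internally, but this code is illustrative rather than taken from Kafka.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical single-thread guard: detects use from a second thread instead of locking.
final class SingleThreadGuardSketch {
    private final AtomicReference<Thread> owner = new AtomicReference<>();
    private int depth = 0;   // re-entrant calls from the owning thread

    void acquire() {
        Thread me = Thread.currentThread();
        if (owner.get() != me && !owner.compareAndSet(null, me)) {
            throw new ConcurrentModificationException(
                    "KafkaShareConsumer is not safe for multi-threaded access");
        }
        depth++;
    }

    void release() {
        if (--depth == 0) {
            owner.set(null);   // another thread may now use the consumer
        }
    }
}
```

Failing fast with ConcurrentModificationException, rather than blocking, surfaces a threading bug in the application immediately; wakeup() skips the guard so another thread can interrupt a blocking poll.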

To join a share group, the client application instantiates a KafkaShareConsumer using the configuration parameter group.id to give the ID of the share group. Then, it uses KafkaShareConsumer.subscribe(Collection<String> topics) to provide the list of topics that it wishes to consume from. The consumer is not allowed to assign partitions itself.

...

The KafkaShareConsumer guarantees that the records returned in the ConsumerRecords object for a specific share-partition are in order of increasing offset. For each share-partition, the share-partition leader guarantees that acknowledgements for the records in a batch are performed atomically. This makes error handling significantly more straightforward because there can be one error code per share-partition.

When the share-partition leader receives a request to acknowledge delivery, which can occur as a separate RPC or piggybacked on a request to fetch more records, it checks that the records being acknowledged are still in the Acquired state and acquired by the share group member trying to acknowledge them. If a record had reached its acquisition lock timeout and reverted to Available state, the attempt to acknowledge it will fail with org.apache.kafka.common.errors.TimeoutException, but the record may well be re-acquired for the same consumer and returned to it again.
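The acceptance check can be sketched as follows, assuming each acquired record carries the acquiring member ID and a lock deadline, and that lock expiry is checked lazily when the acknowledgement arrives. AcknowledgeCheckSketch and its Outcome values are hypothetical simplifications; LOCK_TIMED_OUT stands in for the org.apache.kafka.common.errors.TimeoutException described above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the checks made before accepting an acknowledgement.
final class AcknowledgeCheckSketch {

    enum Outcome { OK, LOCK_TIMED_OUT, NOT_ACQUIRED_BY_MEMBER }

    private record Acquisition(String memberId, long lockDeadlineMs) {}

    // Offset -> current acquisition; absent means the record is not Acquired.
    private final Map<Long, Acquisition> acquired = new HashMap<>();

    void acquire(long offset, String memberId, long nowMs, long lockDurationMs) {
        acquired.put(offset, new Acquisition(memberId, nowMs + lockDurationMs));
    }

    Outcome acknowledge(long offset, String memberId, long nowMs) {
        Acquisition a = acquired.get(offset);
        if (a == null || !a.memberId().equals(memberId)) {
            return Outcome.NOT_ACQUIRED_BY_MEMBER;   // not Acquired, or acquired by another member
        }
        acquired.remove(offset);
        if (nowMs >= a.lockDeadlineMs()) {
            return Outcome.LOCK_TIMED_OUT;           // lock expired: record reverts to Available
        }
        return Outcome.OK;                           // record moves to Acknowledged
    }
}
```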

Acknowledge commit callback

Acknowledgement errors are delivered to a new kind of callback called an acknowledge commit callback, which can optionally be registered with a KafkaShareConsumer.

  • If the application uses KafkaShareConsumer.commitSync() to commit its acknowledgements, the results of the acknowledgements are returned to the application.

  • If the application uses KafkaShareConsumer.commitAsync() or KafkaShareConsumer.poll(Duration) to commit its acknowledgements, the results of the acknowledgements are only delivered if there is an acknowledge commit callback registered.

The acknowledge commit callback is called on the application thread and it is not permitted to call the methods of KafkaShareConsumer with the exception of KafkaShareConsumer.wakeup().

Example - Acknowledging a batch of records (implicit acknowledgement)

In this example, a consumer using share group "myshare" subscribes to topic "foo". It processes all of the records in the batch and then calls KafkaShareConsumer.commitSync() which implicitly marks all of the records in the batch as successfully consumed and commits the acknowledgement synchronously with Kafka. Asynchronous commit would also be acceptable.

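Code Block
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaShareConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "myshare");

KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props, new StringDeserializer(), new StringDeserializer());
consumer.subscribe(Arrays.asList("foo"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));    // Returns a batch of acquired records
    for (ConsumerRecord<String, String> record : records) {
        doProcessing(record);                                                           // Application-supplied processing method
    }
    consumer.commitSync();                                                              // Commit the acknowledgement of all the records in the batch
}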

...

| Operation | State changes | Cumulative state |
| --- | --- | --- |
| Starting state of topic-partition with latest offset 100 | SPSO=100, SPEO=100 | SPSO=100, SPEO=100 |
| In the batched case with successful processing, there’s a state change per batch to move the SPSO forwards | | |
| Fetch records 100-109 | SPEO=110, records 100-109 (acquired, delivery count 1) | SPSO=100, SPEO=110, records 100-109 (acquired, delivery count 1) |
| Acknowledge 100-109 | SPSO=110 | SPSO=110, SPEO=110 |
| With a messier sequence of release and acknowledge, there’s a state change for each operation which can act on multiple records | | |
| Fetch records 110-119 (consumer 1 gets 110-112, consumer 2 gets 113-118, consumer 3 gets 119) | SPEO=120, records 110-119 (acquired, delivery count 1) | SPSO=110, SPEO=120, records 110-119 (acquired, delivery count 1) |
| Release 110 (consumer 1) | record 110 (available, delivery count 1) | SPSO=110, SPEO=120, record 110 (available, delivery count 1), records 111-119 (acquired, delivery count 1) |
| Acknowledge 119 (consumer 3) | record 110 (available, delivery count 1), records 111-118 acquired, record 119 acknowledged | SPSO=110, SPEO=120, record 110 (available, delivery count 1), records 111-118 (acquired, delivery count 1), record 119 acknowledged |
| Fetch records 110, 120 (consumer 1) | SPEO=121, record 110 (acquired, delivery count 2), record 120 (acquired, delivery count 1) | SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-118 (acquired, delivery count 1), record 119 acknowledged, record 120 (acquired, delivery count 1) |
| Lock timeout elapsed 111, 112 (consumer 1's records) | records 111-112 (available, delivery count 1) | SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-112 (available, delivery count 1), records 113-118 (acquired, delivery count 1), record 119 acknowledged, record 120 (acquired, delivery count 1) |
| Acknowledge 113-118 (consumer 2) | records 113-118 acknowledged | SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-112 (available, delivery count 1), records 113-119 acknowledged, record 120 (acquired, delivery count 1) |
| Fetch records 111, 112 (consumer 3) | records 111-112 (acquired, delivery count 2) | SPSO=110, SPEO=121, records 110-112 (acquired, delivery count 2), records 113-119 acknowledged, record 120 (acquired, delivery count 1) |
| Acknowledge 110 (consumer 1) | SPSO=111 | SPSO=111, SPEO=121, records 111-112 (acquired, delivery count 2), records 113-119 acknowledged, record 120 (acquired, delivery count 1) |
| Acknowledge 111, 112 (consumer 3) | SPSO=120 | SPSO=120, SPEO=121, record 120 (acquired, delivery count 1) |

...

The durable share-partition state is recorded using control records, in a similar way to the transaction markers introduced in KIP-98 - Exactly Once Delivery and Transactional Messaging. These control records are written onto the topic-partition whose delivery they reflect. This is important for performance reasons because it means the share-partition leader is able to read and write them directly on the topic-partition for which it is of course also the leader.

Two new control record types are introduced: SHARE_CHECKPOINT (5) and SHARE_DELTA (6). They are written into separate message sets with the Control flag set. This flag indicates that the records are not intended for application consumption. Indeed, these message sets are not returned to any consumers at all since they are just intended for the share-partition leader.

In order to recover the share-partition state, the share-partition leader has to read a SHARE_CHECKPOINT and zero or more SHARE_DELTA records which chain backwards to the SHARE_CHECKPOINT with the same checkpoint epoch. By applying the records in order, from earliest to latest, the state can be rebuilt.

To avoid having to scan the topic in order to find these records, the share-partition leader keeps a share snapshot file which lets it locate the control records more efficiently.

When a control record is written as a result of an operation such as a ShareAcknowledge RPC, the control record must be written and fully replicated before the RPC response is sent.

SHARE_CHECKPOINT

A SHARE_CHECKPOINT record contains a complete checkpoint of the share-partition state. It contains:

  • The group ID

  • The checkpoint epoch, which is an integer that increments with each SHARE_CHECKPOINT

  • The SPSO

  • The SPEO

  • An array of [BaseOffset, LastOffset, State, DeliveryCount] tuples where each tuple contains information for a sequence of records with the same state and delivery count

Here are some examples of how the cumulative state from the previous table would be represented in SHARE_CHECKPOINT records:

Cumulative state SPSO=100, SPEO=100 is represented by this SHARE_CHECKPOINT:


Code Block
{
  "GroupId": "G1",
  "CheckpointEpoch": 1,
  "StartOffset": 100,
  "EndOffset": 100,
  "States": []
}


Cumulative state SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-112 (available, delivery count 1), records 113-118 (acquired, delivery count 1), record 119 acknowledged, record 120 (acquired, delivery count 1) is represented by this SHARE_CHECKPOINT:


Code Block
{
  "GroupId": "G1",
  "CheckpointEpoch": 1,
  "StartOffset": 110,
  "EndOffset": 121,
  "States": [
    {
      "BaseOffset": 110,
      "LastOffset": 110,
      "State": 0 (Available),
      "DeliveryCount": 1
    },
    {
      "BaseOffset": 111,
      "LastOffset": 112,
      "State": 0 (Available),
      "DeliveryCount": 1
    },
    {
      "BaseOffset": 113,
      "LastOffset": 118,
      "State": 0 (Available),
      "DeliveryCount": 0
    },
    {
      "BaseOffset": 119,
      "LastOffset": 119,
      "State": 2 (Acknowledged),
      "DeliveryCount": 1 (whatever it was when it was acknowledged)
    },
    {
      "BaseOffset": 120,
      "LastOffset": 120,
      "State": 0 (Available),
      "DeliveryCount": 0
    }
  ]
}


Note that the Acquired state is not recorded because it’s transient. As a result, an Acquired record with a delivery count of 1 is recorded as Available with a delivery count of 0. In the unlikely event of a share-partition leader crash, memory of the in-flight delivery will be lost.
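The encoding rule can be sketched as a single pass over the in-flight records, assuming they are held in a TreeMap keyed by offset. CheckpointEncoderSketch and its types are hypothetical names. Applied to the second cumulative state above (SPSO=110, SPEO=121), it produces four tuples rather than five: record 110 and records 111-112 have the same recorded state (Available, delivery count 1), so they are merged into one run. Replaying either form yields the same state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: in-memory state -> [BaseOffset, LastOffset, State, DeliveryCount] tuples.
final class CheckpointEncoderSketch {

    enum State { AVAILABLE, ACQUIRED, ACKNOWLEDGED, ARCHIVED }

    record InFlight(State state, int deliveryCount) {}

    record StateBatch(long baseOffset, long lastOffset, State state, int deliveryCount) {}

    static List<StateBatch> encode(TreeMap<Long, InFlight> inFlight) {
        List<StateBatch> out = new ArrayList<>();
        for (Map.Entry<Long, InFlight> e : inFlight.entrySet()) {
            long offset = e.getKey();
            State state = e.getValue().state();
            int count = e.getValue().deliveryCount();
            if (state == State.ACQUIRED) {
                // Acquired is transient: record the in-progress attempt as not having happened.
                state = State.AVAILABLE;
                count = count - 1;
            }
            StateBatch last = out.isEmpty() ? null : out.get(out.size() - 1);
            if (last != null && last.lastOffset() + 1 == offset
                    && last.state() == state && last.deliveryCount() == count) {
                // Extend the previous run of records with identical state and count.
                out.set(out.size() - 1, new StateBatch(last.baseOffset(), offset, state, count));
            } else {
                out.add(new StateBatch(offset, offset, state, count));
            }
        }
        return out;
    }
}
```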

SHARE_DELTA

A SHARE_DELTA record contains a partial update to the share-partition state. It contains:

  • The group ID
  • The checkpoint epoch of the SHARE_CHECKPOINT it applies to
  • An array of [BaseOffset, LastOffset, State, DeliveryCount] tuples
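Rebuilding the durable state means applying one SHARE_CHECKPOINT and then, in log order, every SHARE_DELTA with the same checkpoint epoch. This is a minimal sketch, assuming the records have already been read and decoded; ShareStateReplaySketch and its nested types are hypothetical. Starting from a checkpoint with SPSO=SPEO=100 and applying the SHARE_DELTA records at offsets 131 to 137 in the examples below yields SPSO=120 and SPEO=120, the same as the alternative SHARE_CHECKPOINT shown there.

```java
import java.util.List;
import java.util.TreeMap;

// Hypothetical sketch of recovering share-partition state from control records.
final class ShareStateReplaySketch {

    enum State { AVAILABLE, ACQUIRED, ACKNOWLEDGED, ARCHIVED }

    record StateBatch(long baseOffset, long lastOffset, State state, int deliveryCount) {}
    record Checkpoint(int epoch, long startOffset, long endOffset, List<StateBatch> states) {}
    record Delta(int epoch, List<StateBatch> states) {}
    record Entry(State state, int deliveryCount) {}

    private final TreeMap<Long, Entry> entries = new TreeMap<>();
    private long spso;
    private long speo;

    ShareStateReplaySketch(Checkpoint checkpoint, List<Delta> deltasInLogOrder) {
        spso = checkpoint.startOffset();
        speo = checkpoint.endOffset();
        apply(checkpoint.states());
        for (Delta delta : deltasInLogOrder) {
            if (delta.epoch() != checkpoint.epoch()) {
                throw new IllegalArgumentException("SHARE_DELTA does not chain to this SHARE_CHECKPOINT");
            }
            apply(delta.states());
        }
        // Move the SPSO past records that need no further delivery.
        while (spso < speo) {
            Entry e = entries.get(spso);
            if (e == null || (e.state() != State.ACKNOWLEDGED && e.state() != State.ARCHIVED)) {
                break;
            }
            entries.remove(spso);
            spso++;
        }
    }

    private void apply(List<StateBatch> batches) {
        for (StateBatch b : batches) {
            for (long offset = b.baseOffset(); offset <= b.lastOffset(); offset++) {
                if (offset >= spso) {   // ignore state for offsets already behind the SPSO
                    entries.put(offset, new Entry(b.state(), b.deliveryCount()));
                }
            }
            speo = Math.max(speo, b.lastOffset() + 1);   // the durable SPEO may trail the in-memory one
        }
    }

    long spso() { return spso; }
    long speo() { return speo; }
    Entry entryAt(long offset) { return entries.get(offset); }
}
```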

Examples with control records

Here are the previous examples, showing the control records which record the cumulative state durably. Note that any SHARE_DELTA could be replaced with a SHARE_CHECKPOINT. This example omits the details about consumer instances.

Operation

State changes

Cumulative state

Control records

Starting state of topic-partition with latest offset 100

SPSO=100, SPEO=100

SPSO=100, SPEO=100


Code Block
SHARE_CHECKPOINT offset 130:
{
  "GroupId": "G1",
  "CheckpointEpoch": 1,
  "StartOffset": 100,
  "EndOffset": 100,
  "States": []
}


In the batched case with successful processing, there’s a state change per batch to move the SPSO forwards


Fetch records 100-109

SPEO=110, records 100-109 (acquired, delivery count 1)

SPSO=100, SPEO=110, records 100-109 (acquired, delivery count 1)


Acknowledge 100-109

SPSO=110

SPSO=110, SPEO=110


Code Block
SHARE_DELTA offset 131:
{
  "GroupId": "G1",
  "CheckpointEpoch": 1,
  "States": [
    {
      "BaseOffset": 100,
      "LastOffset": 109,
      "State": 2 (Acknowledged),
      "DeliveryCount": 1
    }
  ]
}


With a messier sequence of release and acknowledge, there’s a state change for each operation which can act on multiple records


Fetch records 110-119

SPEO=120, records 110-119 (acquired, delivery count 1)

SPSO=110, SPEO=120, records 110-119 (acquired, delivery count 1)


Release 110

record 110 (available, delivery count 1)

SPSO=110, SPEO=120, record 110 (available, delivery count 1), records 111-119 (acquired, delivery count 1)


Code Block
SHARE_DELTA offset 132:
{
  "GroupId": "G1",
  "CheckpointEpoch": 1,
  "States": [
    {
      "BaseOffset": 110,
      "LastOffset": 110,
      "State": 0 (Available),
      "DeliveryCount": 1
    }
  ]
}

Note that the SPEO in the control records is 111 at this point. All records after this are in their first delivery attempt so this is an acceptable situation.

Acknowledge 119

record 110 (available, delivery count 1), records 111-118 acquired, record 119 acknowledged

SPSO=110, SPEO=120, record 110 (available, delivery count 1), records 111-118 (acquired, delivery count 1), record 119 acknowledged


Code Block
SHARE_DELTA offset 133:
{
  "GroupId": "G1",
  "CheckpointEpoch": 1,
  "States": [
    {
      "BaseOffset": 111,
      "LastOffset": 118,
      "State": 0 (Available),
      "DeliveryCount": 0
    },
    {
      "BaseOffset": 119,
      "LastOffset": 119,
      "State": 2 (Acknowledged),
      "DeliveryCount": 1
    }
  ]
}


Fetch records 110, 120

SPEO=121, record 110 (acquired, delivery count 2), record 120 (acquired, delivery count 1)

SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-118 (acquired, delivery count 1), record 119 acknowledged, record 120 (acquired, delivery count 1)


Lock timeout elapsed 111, 112

records 111-112 (available, delivery count 1)

SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-112 (available, delivery count 1), records 113-118 (acquired, delivery count 1), record 119 acknowledged, record 120 (acquired, delivery count 1)


Code Block
SHARE_DELTA offset 134:
{
  "GroupId": "G1",
  "CheckpointEpoch": 1,
  "States": [
    {
      "BaseOffset": 111,
      "LastOffset": 112,
      "State": 0 (Available),
      "DeliveryCount": 1
    }
  ]
}


Acknowledge 113-118

records 113-118 acknowledged

SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-112 (available, delivery count 1), records 113-119 acknowledged, record 120 (acquired, delivery count 1)


Code Block
SHARE_DELTA offset 135:
{
  "GroupId": "G1",
  "CheckpointEpoch": 1,
  "States": [
    {
      "BaseOffset": 113,
      "LastOffset": 118,
      "State": 2 (Acknowledged),
      "DeliveryCount": 1
    }
  ]
}


Fetch records 111,112

records 111-112 (acquired, delivery count 2)

SPSO=110, SPEO=121, records 110-112 (acquired, delivery count 2), records 113-119 acknowledged, record 120 (acquired, delivery count 1)


Acknowledge 110

SPSO=111

SPSO=111, SPEO=121, records 111-112 (acquired, delivery count 2), records 113-119 acknowledged, record 120 (acquired, delivery count 1)


Code Block
SHARE_DELTA offset 136:
{
  "GroupId": "G1",
  "CheckpointEpoch": 1,
  "States": [
    {
      "BaseOffset": 110,
      "LastOffset": 110,
      "State": 2 (Acknowledged),
      "DeliveryCount": 2
    }
  ]
}


Acknowledge 111,112

SPSO=120

SPSO=120, SPEO=121, record 120 (acquired, delivery count 1)


Code Block
SHARE_DELTA offset 137:
{
  "GroupId": "G1",
  "CheckpointEpoch": 1,
  "States": [
    {
      "BaseOffset": 111,
      "LastOffset": 112,
      "State": 2 (Acknowledged),
      "DeliveryCount": 2
    }
  ]
}

or alternatively, taking a new checkpoint:

Code Block
SHARE_CHECKPOINT offset 137:
{
  "GroupId": "G1",
  "CheckpointEpoch": 2,
  "StartOffset": 120,
  "EndOffset": 120,
  "States": []
}

Note that the delivery of 120 has not been recorded yet because it is the first delivery attempt and it is safe to recover the SPEO back to offset 120 and repeat the attempt.

Recovering share-partition state and interactions with log cleaning

A share-partition is a topic-partition with a subscription in a share group. The share-partition is essentially a view of the topic-partition, managed by the share-partition leader, with durable state stored on the topic-partition in SHARE_CHECKPOINT and SHARE_DELTA control records.

In order to recreate the share-partition state when a broker becomes the leader of a share-partition, it must read the most recent SHARE_CHECKPOINT and any subsequent SHARE_DELTA control records, which will all have the same checkpoint epoch. In order to minimise the amount of log scanning required, it’s important to write SHARE_CHECKPOINT records frequently, and also to have an efficient way of finding the most recent SHARE_CHECKPOINT record.
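Rebuilding the state amounts to loading the checkpoint and then applying each later delta with the same checkpoint epoch in log order, so that a newer tuple for an offset overwrites an older one. The following is a minimal sketch under stated assumptions. ReplayBatch, ReplayCheckpoint, ReplayDelta and ShareStateReplay are hypothetical stand-ins for the control-record fields used in the examples. The State codes are the ones shown there (0 = Available, 2 = Acknowledged). The durable SPEO is taken to be the highest recorded offset plus one.

```java
import java.util.List;
import java.util.TreeMap;

/** Hypothetical stand-in for one [BaseOffset, LastOffset, State, DeliveryCount] tuple. */
record ReplayBatch(long baseOffset, long lastOffset, int state, int deliveryCount) {}

/** Hypothetical stand-in for a SHARE_CHECKPOINT control record. */
record ReplayCheckpoint(int checkpointEpoch, long startOffset, long endOffset, List<ReplayBatch> states) {}

/** Hypothetical stand-in for a SHARE_DELTA control record. */
record ReplayDelta(int checkpointEpoch, List<ReplayBatch> states) {}

final class ShareStateReplay {
    static final int ACKNOWLEDGED = 2; // "State": 2 (Acknowledged) in the examples

    long spso;
    long speo;
    /** In-flight offsets in [spso, speo) mapped to {state, deliveryCount}. */
    final TreeMap<Long, int[]> inFlight = new TreeMap<>();

    private ShareStateReplay() {}

    /** Applies the checkpoint, then each delta with the same checkpoint epoch, in log order. */
    static ShareStateReplay replay(ReplayCheckpoint cp, List<ReplayDelta> deltasInLogOrder) {
        ShareStateReplay s = new ShareStateReplay();
        s.spso = cp.startOffset();
        s.speo = cp.endOffset();
        s.apply(cp.states());
        for (ReplayDelta d : deltasInLogOrder) {
            if (d.checkpointEpoch() == cp.checkpointEpoch()) {
                s.apply(d.states()); // later tuples overwrite earlier ones for the same offset
            }
        }
        // Move the SPSO forward over contiguous acknowledged offsets and drop them.
        while (s.inFlight.containsKey(s.spso) && s.inFlight.get(s.spso)[0] == ACKNOWLEDGED) {
            s.inFlight.remove(s.spso);
            s.spso++;
        }
        return s;
    }

    private void apply(List<ReplayBatch> batches) {
        for (ReplayBatch b : batches) {
            for (long o = b.baseOffset(); o <= b.lastOffset(); o++) {
                inFlight.put(o, new int[] {b.state(), b.deliveryCount()});
            }
            // The durable SPEO covers every offset that has been recorded.
            speo = Math.max(speo, b.lastOffset() + 1);
        }
    }
}
```

Replaying a checkpoint with StartOffset 100 together with the deltas from the worked example yields SPSO=120 and SPEO=120. This matches the alternative SHARE_CHECKPOINT with CheckpointEpoch 2.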

For each share-partition, the offset of the most recent SHARE_CHECKPOINT record is called the Share Checkpoint Offset (SCO). The Earliest Share Offset (ESO) is the earliest of the share checkpoint offsets for all of the share groups with a subscription to the topic-partition.

  • The log cleaner can clean all SHARE_CHECKPOINT and SHARE_DELTA records before the SCO.
  • The log cleaner must not clean SHARE_CHECKPOINT and SHARE_DELTA records at or after the SCO.

In practice, the ESO is used as the cut-off point for cleaning of these control records.
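The cleaning rule reduces to a minimum and a comparison. This is a minimal sketch that assumes the SCO of every share group subscribed to the topic-partition is already known. ShareCleaningSketch and its methods are hypothetical names. The sketch also treats a topic-partition with no share groups as having nothing cleanable; that is the sketch's own assumption and is not specified by the KIP.

```java
import java.util.Map;
import java.util.OptionalLong;

final class ShareCleaningSketch {
    private ShareCleaningSketch() {}

    /** ESO: the earliest SCO across all share groups subscribed to this topic-partition. */
    static OptionalLong earliestShareOffset(Map<String, Long> scoByGroup) {
        return scoByGroup.values().stream().mapToLong(Long::longValue).min();
    }

    /**
     * A SHARE_CHECKPOINT or SHARE_DELTA record is cleanable only if it lies strictly before
     * the ESO, so every group's latest checkpoint and everything after it are retained.
     * Assumption of this sketch: with no share groups there is no ESO and nothing is cleaned.
     */
    static boolean cleanable(long controlRecordOffset, Map<String, Long> scoByGroup) {
        OptionalLong eso = earliestShareOffset(scoByGroup);
        return eso.isPresent() && controlRecordOffset < eso.getAsLong();
    }
}
```

Using the minimum across groups is what makes one cut-off safe for every group: no group's most recent checkpoint can fall before it.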

Administration

Several components work together to manage share groups. The group coordinator is responsible for assignment, membership and the state of the group. The share-partition leaders are responsible for delivery and acknowledgement. The following table summarises the administration operations and how they work.

Operation | Supported by | Notes
Create share group | Group coordinator | This occurs as a side-effect of a ShareGroupHeartbeat. The group coordinator writes a record to the consumer offsets topic to persist the group's existence.
List share groups | Group coordinator |
List share group offsets | Group coordinator and share-partition leaders |
Describe share group | Group coordinator |
Alter share group offsets | Share-partition leaders | The share-partition leader makes a durable share-partition state update for each share-partition affected.
Delete share group offsets | Share-partition leaders | The share-partition leader makes a durable share-partition state update for each share-partition affected.
Delete share group | Group coordinator working with share-partition leaders | Only empty share groups can be deleted. The share-partition leaders first delete the share group offsets, and then the share group itself is deleted; this is not an atomic operation. The share-partition leader makes a durable share-partition state update for each share-partition affected. The group coordinator writes a tombstone record to the consumer offsets topic to persist the group deletion.

Public Interfaces

This KIP introduces extensive additions to the public interfaces.

Client API changes

KafkaShareConsumer

This KIP introduces a new interface for consuming records from a share group called org.apache.kafka.clients.consumer.ShareConsumer with an implementation called org.apache.kafka.clients.consumer.KafkaShareConsumer. The interface stability is Evolving.

Code Block
languagejava
@InterfaceStability.Evolving
public interface ShareConsumer<K, V> {

    /**
     * Get the current subscription. Will return the same topics used in the most recent call to
     * {@link #subscribe(Collection)}, or an empty set if no such call has been made.
     *
     * @return The set of topics currently subscribed to
     */
    Set<String> subscription();

    /**
     * Subscribe to the given list of topics to get dynamically assigned partitions.
     * <b>Topic subscriptions are not incremental. This list will replace the current
     * assignment, if there is one.</b> If the given list of topics is empty, it is treated the same as {@link #unsubscribe()}.
     *
     * <p>
     * As part of group management, the coordinator will keep track of the list of consumers that belong to a particular
     * group and will trigger a rebalance operation if any one of the following events is triggered:
     * <ul>
     * <li>A member joins or leaves the share group
     * <li>An existing member of the share group is shut down or fails
     * <li>The number of partitions changes for any of the subscribed topics
     * <li>A subscribed topic is created or deleted
     * </ul>
     *
     * @param topics The list of topics to subscribe to
     *
     * @throws IllegalArgumentException If topics is null or contains null or empty elements
     * @throws KafkaException for any other unrecoverable errors
     */
    void subscribe(Collection<String> topics);

    /**
     * Unsubscribe from topics currently subscribed with {@link #subscribe(Collection)}.
     *
     * @throws KafkaException for any other unrecoverable errors
     */
    void unsubscribe();

    /**
     * Fetch data for the topics specified using {@link #subscribe(Collection)}. It is an error to not have
     * subscribed to any topics before polling for data.
     *
     * <p>
     * This method returns immediately if there are records available. Otherwise, it will await the passed timeout.
     * If the timeout expires, an empty record set will be returned.
     *
     * @param timeout The maximum time to block (must not be greater than {@link Long#MAX_VALUE} milliseconds)
     *
     * @return map of topic to records since the last fetch for the subscribed list of topics
     *
     * @throws AuthenticationException if authentication fails. See the exception for more details
     * @throws AuthorizationException if caller lacks Read access to any of the subscribed
     *             topics or to the configured groupId. See the exception for more details
     * @throws InterruptException if the calling thread is interrupted before or while this method is called
     * @throws InvalidTopicException if the current subscription contains any invalid
     *             topic (per {@link org.apache.kafka.common.internals.Topic#validate(String)})
     * @throws WakeupException if {@link #wakeup()} is called before or while this method is called
     * @throws KafkaException for any other unrecoverable errors (e.g. invalid groupId or
     *             session timeout, errors deserializing key/value pairs,
     *             or any new error cases in future versions)
     * @throws IllegalArgumentException if the timeout value is negative
     * @throws IllegalStateException if the consumer is not subscribed to any topics
     * @throws ArithmeticException if the timeout is greater than {@link Long#MAX_VALUE} milliseconds.
     */
    ConsumerRecords<K, V> poll(Duration timeout);

    /**
     * Acknowledge successful delivery of a record returned on the last {@link #poll(Duration)} call.
     * The acknowledgement is committed on the next {@link #commitSync()}, {@link #commitAsync()} or
     * {@link #poll(Duration)} call.
     *
     * <p>
     * Records for each topic-partition must be acknowledged in the order they were returned from
     * {@link #poll(Duration)}. By using this method, the consumer is using
     * <b>explicit acknowledgement</b>.
     *
     * @param record The record to acknowledge
     *
     * @throws IllegalArgumentException if the record being acknowledged doesn't meet the ordering requirement
     * @throws IllegalStateException if the record is not waiting to be acknowledged, or the consumer has already
     *                               used implicit acknowledgement
     */
    void acknowledge(ConsumerRecord<K, V> record);

    /**
     * Acknowledge delivery of a record returned on the last {@link #poll(Duration)} call indicating whether
     * it was processed successfully. The acknowledgement is committed on the next {@link #commitSync()},
     * {@link #commitAsync()} or {@link #poll(Duration)} call. By using this method, the consumer is using
     * <b>explicit acknowledgement</b>.
     *
     * <p>
     * Records for each topic-partition must be acknowledged in the order they were returned from
     * {@link #poll(Duration)}.
     *
     * @param record The record to acknowledge
     * @param type The acknowledge type which indicates whether it was processed successfully
     *
     * @throws IllegalArgumentException if the record being acknowledged doesn't meet the ordering requirement
     * @throws IllegalStateException if the record is not waiting to be acknowledged, or the consumer has already
     *                               used implicit acknowledgement
     */
    void acknowledge(ConsumerRecord<K, V> record, AcknowledgeType type);

    /**
     * Commit the acknowledgements for the records returned. If the consumer is using explicit acknowledgement,
     * the acknowledgements to commit have been indicated using {@link #acknowledge(ConsumerRecord)} or
     * {@link #acknowledge(ConsumerRecord, AcknowledgeType)}. If the consumer is using implicit acknowledgement,
     * all the records returned by the latest call to {@link #poll(Duration)} are acknowledged.
     * <p>
     * This is a synchronous commit and will block until either the commit succeeds, an unrecoverable error is
     * encountered (in which case it is thrown to the caller), or the timeout expires.
     *
     * @return A map of the results for each topic-partition for which delivery was acknowledged.
     *         If the acknowledgement failed for a topic-partition, an exception is present.
     *
     * @throws InterruptException If the thread is interrupted while blocked.
     * @throws KafkaException for any other unrecoverable errors
     */
    Map<TopicIdPartition, Optional<KafkaException>> commitSync();

    /**
     * Commit the acknowledgements for the records returned. If the consumer is using explicit acknowledgement,
     * the acknowledgements to commit have been indicated using {@link #acknowledge(ConsumerRecord)} or
     * {@link #acknowledge(ConsumerRecord, AcknowledgeType)}. If the consumer is using implicit acknowledgement,
     * all the records returned by the latest call to {@link #poll(Duration)} are acknowledged.
     * <p>
     * This is a synchronous commit and will block until either the commit succeeds, an unrecoverable error is
     * encountered (in which case it is thrown to the caller), or the timeout expires.
     *
     * @param timeout The maximum amount of time to await completion of the acknowledgement
     *
     * @return A map of the results for each topic-partition for which delivery was acknowledged.
     *         If the acknowledgement failed for a topic-partition, an exception is present.
     *
     * @throws IllegalArgumentException If the {@code timeout} is negative.
     * @throws InterruptException If the thread is interrupted while blocked.
     * @throws KafkaException for any other unrecoverable errors
     */
    Map<TopicIdPartition, Optional<KafkaException>> commitSync(Duration timeout);

    /**
     * Commit the acknowledgements for the records returned. If the consumer is using explicit acknowledgement,
     * the acknowledgements to commit have been indicated using {@link #acknowledge(ConsumerRecord)} or
     * {@link #acknowledge(ConsumerRecord, AcknowledgeType)}. If the consumer is using implicit acknowledgement,
     * all the records returned by the latest call to {@link #poll(Duration)} are acknowledged.
     *
     * @throws KafkaException for any other unrecoverable errors
     */
    void commitAsync();

    /**
     * Sets the acknowledge commit callback which can be used to handle acknowledgement completion.
     *
     * @param callback The acknowledge commit callback
     */
    void setAcknowledgeCommitCallback(AcknowledgeCommitCallback callback);

    /**
     * Determines the client's unique client instance ID used for telemetry. This ID is unique to
     * this specific client instance and will not change after it is initially generated.
     * The ID is useful for correlating client operations with telemetry sent to the broker and
     * to its eventual monitoring destinations.
     * <p>
     * If telemetry is enabled, this will first require a connection to the cluster to generate
     * the unique client instance ID. This method waits up to {@code timeout} for the consumer
     * client to complete the request.
     * <p>
     * Client telemetry is controlled by the {@link ConsumerConfig#ENABLE_METRICS_PUSH_CONFIG}
     * configuration option.
     *
     * @param timeout The maximum time to wait for consumer client to determine its client instance ID.
     *                The value must be non-negative. Specifying a timeout of zero means do not
     *                wait for the initial request to complete if it hasn't already.
     *
     * @return The client's assigned instance id used for metrics collection.
     *
     * @throws IllegalArgumentException If the {@code timeout} is negative.
     * @throws IllegalStateException If telemetry is not enabled because config `{@code enable.metrics.push}`
     *                               is set to `{@code false}`.
     * @throws InterruptException If the thread is interrupted while blocked.
     * @throws KafkaException If an unexpected error occurs while trying to determine the client
     *                        instance ID, though this error does not necessarily imply the
     *                        consumer client is otherwise unusable.
     */
    Uuid clientInstanceId(Duration timeout);

    /**
     * Get the metrics kept by the consumer
     */
    Map<MetricName, ? extends Metric> metrics();

    /**
     * Close the consumer, waiting for up to the default timeout of 30 seconds for any needed cleanup.
     * This will commit acknowledgements if possible within the default timeout.
     * See {@link #close(Duration)} for details. Note that {@link #wakeup()} cannot be used to interrupt close.
     *
     * @throws InterruptException If the thread is interrupted before or while this method is called
     * @throws KafkaException for any other error during close
     */
    void close();

    /**
     * Tries to close the consumer cleanly within the specified timeout. This method waits up to
     * {@code timeout} for the consumer to complete acknowledgements and leave the group.
     * If the consumer is unable to complete acknowledgements and gracefully leave the group
     * before the timeout expires, the consumer is force closed. Note that {@link #wakeup()} cannot be
     * used to interrupt close.
     *
     * @param timeout The maximum time to wait for consumer to close gracefully. The value must be
     *                non-negative. Specifying a timeout of zero means do not wait for pending requests to complete.
     *
     * @throws IllegalArgumentException If the {@code timeout} is negative.
     * @throws InterruptException If the thread is interrupted before or while this method is called
     * @throws KafkaException for any other error during close
     */
    void close(Duration timeout);

    /**
     * Wake up the consumer. This method is thread-safe and is useful in particular to abort a long poll.
     * The thread which is blocking in an operation will throw {@link WakeupException}.
     * If no thread is blocking in a method which can throw {@link WakeupException},
     * the next call to such a method will raise it instead.
     */
    void wakeup();
}

The following constructors are provided for KafkaShareConsumer.

Method signature | Description
KafkaShareConsumer(Map<String, Object> configs) | Constructor
KafkaShareConsumer(Properties properties) | Constructor
KafkaShareConsumer(Map<String, Object> configs, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) | Constructor
KafkaShareConsumer(Properties properties, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) | Constructor

AcknowledgeCommitCallback

The new org.apache.kafka.clients.consumer.AcknowledgeCommitCallback can be implemented by the user to run code when acknowledgement completes. It is called on the application thread and is not permitted to call the methods of KafkaShareConsumer, with the exception of KafkaShareConsumer.wakeup().

Method signature | Description
void onComplete(Map<TopicIdPartition, Set<OffsetAndMetadata>> offsets, Exception exception) | A callback method the user can implement to provide asynchronous handling of request completion.

Parameters:

  • offsets - A map of the offsets that this callback applies to.
  • exception - The exception thrown during processing of the request, or null if the acknowledgement completed successfully.

Exceptions:

  • WakeupException - if KafkaShareConsumer.wakeup() is called.
  • InterruptException - if the calling thread is interrupted.
  • AuthorizationException - if not authorized to the topic or group.
  • KafkaException - for any other unrecoverable errors.

ConsumerRecord

Add the following method on the org.apache.kafka.clients.consumer.ConsumerRecord class.

Method signature | Description
Optional<Short> deliveryCount() | Get the delivery count for the record if available. The delivery count is available for records delivered using a share group and Optional.empty() otherwise.
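Because deliveryCount() is present for share group deliveries, an application can use it to stop retrying a record that keeps failing. The sketch below shows one such policy under stated assumptions. AckChoice is a hypothetical stand-in for AcknowledgeType, so the code compiles without kafka-clients. maxAttempts is an application-level limit, not a Kafka setting. A missing count is treated as a first attempt.

```java
import java.util.Optional;

// Hypothetical stand-in mirroring AcknowledgeType's ACCEPT, RELEASE and REJECT,
// so that this sketch compiles without the kafka-clients library.
enum AckChoice { ACCEPT, RELEASE, REJECT }

final class DeliveryCountPolicy {
    private DeliveryCountPolicy() {}

    /**
     * Chooses an acknowledgement for a record after one processing attempt.
     *
     * @param processedOk   whether the application processed the record successfully
     * @param deliveryCount the value ConsumerRecord.deliveryCount() would return
     * @param maxAttempts   an application-chosen limit (hypothetical, not a Kafka config)
     */
    static AckChoice choose(boolean processedOk, Optional<Short> deliveryCount, int maxAttempts) {
        if (processedOk) {
            return AckChoice.ACCEPT;
        }
        // Assumption: a missing count is treated as a first attempt.
        int attempts = deliveryCount.map(Short::intValue).orElse(1);
        // Stop retrying once this attempt has reached the limit; otherwise allow a redelivery.
        return attempts >= maxAttempts ? AckChoice.REJECT : AckChoice.RELEASE;
    }
}
```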

A new constructor is also added:

Code Block
languagejava
    /**
     * Creates a record to be received from a specified topic and partition
     *
     * @param topic The topic this record is received from
     * @param partition The partition of the topic this record is received from
     * @param offset The offset of this record in the corresponding Kafka partition
     * @param timestamp The timestamp of the record.
     * @param timestampType The timestamp type
     * @param serializedKeySize The length of the serialized key
     * @param serializedValueSize The length of the serialized value
     * @param key The key of the record, if one exists (null is allowed)
     * @param value The record contents
     * @param headers The headers of the record
     * @param leaderEpoch Optional leader epoch of the record (may be empty for legacy record formats)
     * @param deliveryCount Optional delivery count of the record (may be empty when deliveries are not counted)
     */
    public ConsumerRecord(String topic,
                          int partition,
                          long offset,
                          long timestamp,
                          TimestampType timestampType,
                          int serializedKeySize,
                          int serializedValueSize,
                          K key,
                          V value,
                          Headers headers,
                          Optional<Integer> leaderEpoch,
                          Optional<Short> deliveryCount)

AcknowledgeType

The new org.apache.kafka.clients.consumer.AcknowledgeType enum distinguishes between the types of acknowledgement for a record consumed using a share group.

Enum constant | Description
ACCEPT (0) | The record was consumed successfully.
RELEASE (1) | The record was not consumed successfully. Release it for another delivery attempt.
REJECT (2) | The record was not consumed successfully. Reject it and do not release it for another delivery attempt.
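To make the three acknowledgement types concrete, here is a hypothetical per-record state machine that is consistent with the worked examples. Acquiring a record starts a new delivery attempt and increments its delivery count. RELEASE makes the record available again without changing the count. REJECT removes the record from further delivery. InFlightRecord, InFlightState and AckType are illustrative names only. AckType mirrors AcknowledgeType so the sketch compiles without kafka-clients. Lock timeouts, which also return a record to Available, are not modelled.

```java
// Hypothetical in-flight states for this sketch; the KIP's own state names may differ.
enum InFlightState { AVAILABLE, ACQUIRED, ACKNOWLEDGED, REJECTED }

// Stand-in mirroring the AcknowledgeType constants.
enum AckType { ACCEPT, RELEASE, REJECT }

final class InFlightRecord {
    InFlightState state = InFlightState.AVAILABLE;
    int deliveryCount = 0;

    /** Acquiring a record for a consumer counts as a new delivery attempt. */
    void acquire() {
        if (state != InFlightState.AVAILABLE) {
            throw new IllegalStateException("record is not available: " + state);
        }
        state = InFlightState.ACQUIRED;
        deliveryCount++;
    }

    /** Applies a consumer's acknowledgement to an acquired record. */
    void acknowledge(AckType type) {
        if (state != InFlightState.ACQUIRED) {
            throw new IllegalStateException("record is not acquired: " + state);
        }
        state = switch (type) {
            case ACCEPT -> InFlightState.ACKNOWLEDGED;
            case RELEASE -> InFlightState.AVAILABLE; // eligible for another delivery attempt
            case REJECT -> InFlightState.REJECTED;   // never delivered again
        };
    }
}
```

In the worked examples, record 110 follows this path: it is acquired (count 1), released, acquired again (count 2) and then accepted.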

AdminClient

Add the following methods on the org.apache.kafka.clients.admin.AdminClient interface.

Method signature | Description
AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets) | Alter offset information for a share group.
AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets, AlterShareGroupOffsetsOptions options) | Alter offset information for a share group.
DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId, Set<TopicPartition> partitions) | Delete offset information for a set of partitions in a share group.
DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId, Set<TopicPartition> partitions, DeleteShareGroupOffsetsOptions options) | Delete offset information for a set of partitions in a share group.
DeleteShareGroupsResult deleteShareGroups(Collection<String> groupIds) | Delete share groups from the cluster.
DeleteShareGroupsResult deleteShareGroups(Collection<String> groupIds, DeleteShareGroupsOptions options) | Delete share groups from the cluster.
DescribeShareGroupsResult describeShareGroups(Collection<String> groupIds) | Describe some share groups in the cluster.
DescribeShareGroupsResult describeShareGroups(Collection<String> groupIds, DescribeShareGroupsOptions options) | Describe some share groups in the cluster.
ListShareGroupOffsetsResult listShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec> groupSpecs) | List the share group offsets available in the cluster for the specified share groups.
ListShareGroupOffsetsResult listShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec> groupSpecs, ListShareGroupOffsetsOptions options) | List the share group offsets available in the cluster for the specified share groups.
ListShareGroupsResult listShareGroups() | List the share groups available in the cluster.
ListShareGroupsResult listShareGroups(ListShareGroupsOptions options) | List the share groups available in the cluster.
ListGroupsResult listGroups() | List the groups available in the cluster.
ListGroupsResult listGroups(ListGroupsOptions options) | List the groups available in the cluster.

The share group methods mirror the existing consumer group methods, with some differences:

  • Altering the offsets for a share group resets the Share-Partition Start Offset for topic-partitions in the share group (share-partitions)
  • The members of a share group are not assigned distinct sets of partitions
  • A share group has only three states: EMPTY, STABLE and DEAD

Here are the method signatures.

Code Block
languagejava
    /**
     * Alters offsets for the specified group. In order to succeed, the group must be empty.
     *
     * <p>This is a convenience method for {@link #alterShareGroupOffsets(String, Map, AlterShareGroupOffsetsOptions)} with default options.
     * See the overload for more details.
     *
     * @param groupId The group for which to alter offsets.
     * @param offsets A map of offsets by partition with associated metadata.
     * @return The AlterShareGroupOffsetsResult.
     */
    default AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets) {
        return alterShareGroupOffsets(groupId, offsets, new AlterShareGroupOffsetsOptions());
    }

    /**
     * Alters offsets for the specified group. In order to succeed, the group must be empty.
     *
     * <p>This operation is not transactional so it may succeed for some partitions while failing for others.
     *
     * @param groupId The group for which to alter offsets.
     * @param offsets A map of offsets by partition with associated metadata. Partitions not specified in the map are ignored.
     * @param options The options to use when altering the offsets.
     * @return The AlterShareGroupOffsetsResult.
     */
    AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets, AlterShareGroupOffsetsOptions options);

    /**
     * Delete offsets for a set of partitions in a share group with the default
     * options. This will succeed at the partition level only if the group is not actively
     * subscribed to the corresponding topic.
     *
     * <p>This is a convenience method for {@link #deleteShareGroupOffsets(String, Set, DeleteShareGroupOffsetsOptions)} with default options.
     * See the overload for more details.
     *
     * @return The DeleteShareGroupOffsetsResult.
     */
    default DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId, Set<TopicPartition> partitions) {
        return deleteShareGroupOffsets(groupId, partitions, new DeleteShareGroupOffsetsOptions());
    }

    /**
     * Delete offsets for a set of partitions in a share group. This will
     * succeed at the partition level only if the group is not actively subscribed
     * to the corresponding topic.
     *
     * @param options The options to use when deleting offsets in a share group.
     * @return The DeleteShareGroupOffsetsResult.
     */
    DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId,
        Set<TopicPartition> partitions,
        DeleteShareGroupOffsetsOptions options);

    /**
     * Delete share groups from the cluster with the default options
Code Block
{
  "GroupId": "G1",
  "CheckpointEpoch": 1,
  "StartOffset": 110,
  "EndOffset": 121,
  "States": [
    {
      "BaseOffset": 110,
      "LastOffset": 110,
      "State": 0 (Available),
      "DeliveryCount": 1
    },
    {
      "BaseOffset": 111,
      "LastOffset": 112,
      "State": 0 (Available),
      "DeliveryCount": 1
    },
    {
      "BaseOffset": 113,
      "LastOffset": 118,
      "State": 0 (Available),
      "DeliveryCount": 0
    },
    {
      "BaseOffset": 119,
      "LastOffset": 119,
      "State": 2 (Acknowledged),
      "DeliveryCount": 1 (whatever it was when it was acknowledged)
    },
    {
      "BaseOffset": 120,
      "LastOffset": 120,
      "State": 0 (Available),
      "DeliveryCount": 0
    }
  ]
}

Note that the Acquired state is not recorded because it is transient. As a result, an Acquired record with a delivery count of 1 is recorded as Available with a delivery count of 0. In the unlikely event that a share-partition leader crashes, memory of the in-flight delivery is lost.
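The mapping from in-memory state to durable state can be sketched as a small function. Following the checkpoint above, the sketch assumes that an Acquired record with delivery count n is written as Available with delivery count n - 1. MemState, DurableState and CheckpointMapper are hypothetical names, and only the three states that appear in this example are covered.

```java
// Hypothetical in-memory states, covering only those used in this example.
enum MemState { AVAILABLE, ACQUIRED, ACKNOWLEDGED }

/** A durable tuple using the State codes shown above: 0 = Available, 2 = Acknowledged. */
record DurableState(int state, int deliveryCount) {}

final class CheckpointMapper {
    private CheckpointMapper() {}

    static DurableState toDurable(MemState state, int deliveryCount) {
        return switch (state) {
            // Acquired is transient: write it as Available and roll back the in-flight attempt,
            // so after a leader crash that delivery attempt is simply repeated.
            case ACQUIRED -> new DurableState(0, deliveryCount - 1);
            case AVAILABLE -> new DurableState(0, deliveryCount);
            case ACKNOWLEDGED -> new DurableState(2, deliveryCount);
        };
    }
}
```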

SHARE_DELTA

A SHARE_DELTA record contains a partial update to the share-partition state. It contains:

  • The group ID
  • The checkpoint epoch of the SHARE_CHECKPOINT it applies to
  • The offset of the preceding control record with the same checkpoint epoch
  • An array of [BaseOffset, LastOffset, State, DeliveryCount]  tuples
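Each SHARE_DELTA carries the offset of the preceding control record with the same checkpoint epoch (BackOffset in the examples). This means a leader that locates the latest delta can walk back to its SHARE_CHECKPOINT without scanning the whole log. The Java sketch below assumes a hypothetical in-memory index from log offset to control record. ControlRecord and BackOffsetChain are illustrative names only; a broker would read these records from the log instead.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.Map;

/** Hypothetical view of a control record; backOffset is null for a SHARE_CHECKPOINT. */
record ControlRecord(long offset, boolean checkpoint, Long backOffset) {}

final class BackOffsetChain {
    private BackOffsetChain() {}

    /**
     * Follows BackOffset links from the latest control record until a SHARE_CHECKPOINT is
     * reached, returning the records in log order (checkpoint first, latest delta last).
     */
    static List<ControlRecord> chainFrom(long latestOffset, Map<Long, ControlRecord> byOffset) {
        Deque<ControlRecord> chain = new ArrayDeque<>();
        ControlRecord current = byOffset.get(latestOffset);
        while (current != null) {
            chain.addFirst(current);
            if (current.checkpoint()) {
                return List.copyOf(chain);
            }
            if (current.backOffset() == null) {
                break; // a delta without a back link cannot reach its checkpoint
            }
            current = byOffset.get(current.backOffset());
        }
        throw new IllegalStateException("no SHARE_CHECKPOINT reachable from offset " + latestOffset);
    }
}
```

The resulting chain is exactly the input a replay needs: the checkpoint followed by its deltas in log order.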

Examples with control records

Here are the previous examples, showing the control records which record the cumulative state durably. Note that any SHARE_DELTA could be replaced with a SHARE_CHECKPOINT.

Each step below gives the operation, then the state changes, the cumulative state and the control records written.

Starting state of topic-partition with latest offset 100

SPSO=100, SPEO=100

SPSO=100, SPEO=100

Code Block
SHARE_CHECKPOINT offset 130:
{
  "GroupId": "G1",
  "CheckpointEpoch": 1,
  "StartOffset": 100,
  "EndOffset": 100,
  "States": []
}

In the batched case with successful processing, there’s a state change per batch to move the SPSO forwards.

Fetch records 100-109

SPEO=110, records 100-109 (acquired, delivery count 1)

SPSO=100, SPEO=110, records 100-109 (acquired, delivery count 1)

Acknowledge 100-109

SPSO=110

SPSO=110, SPEO=110

Code Block
SHARE_DELTA offset 131:
{
  "GroupId": "G1",
  "CheckpointEpoch": 1,
  "BackOffset": 130,
  "States": [
    {
      "BaseOffset": 100,
      "LastOffset": 109,
      "State": 2 (Acknowledged),
      "DeliveryCount": 1
    }
  ]
}

With a messier sequence of release and acknowledge, there’s a state change for each operation, which can act on multiple records.

Fetch records 110-119

SPEO=120, records 110-119 (acquired, delivery count 1)

SPSO=110, SPEO=120, records 110-119 (acquired, delivery count 1)

Release 110

record 110 (available, delivery count 1)

SPSO=110, SPEO=120, record 110 (available, delivery count 1), records 111-119 (acquired, delivery count 1)

Code Block
SHARE_DELTA offset 132:
{
  "GroupId": "G1",
  "CheckpointEpoch": 1,
  "BackOffset": 131,
  "States": [
    {
      "BaseOffset": 110,
      "LastOffset": 110,
      "State": 0 (Available),
      "DeliveryCount": 1
    }
  ]
}

Note that the SPEO in the control records is 111 at this point. All records after this are in their first delivery attempt so this is an acceptable situation.

Acknowledge 119

record 110 (available, delivery count 1), records 111-118 acquired, record 119 acknowledged

SPSO=110, SPEO=120, record 110 (available, delivery count 1), records 111-118 (acquired, delivery count 1), record 119 acknowledged

Code Block
SHARE_DELTA offset 133:
{
  "GroupId": "G1",
  "CheckpointEpoch": 1,
  "BackOffset": 132,
  "States": [
    {
      "BaseOffset": 111,
      "LastOffset": 118,
      "State": 0 (Available),
      "DeliveryCount": 0
    },
    {
      "BaseOffset": 119,
      "LastOffset": 119,
      "State": 2 (Acknowledged),
      "DeliveryCount": 1
    }
  ]
}

Fetch records 110, 120

SPEO=121, record 110 (acquired, delivery count 2), record 120 (acquired, delivery count 1)

SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-118 (acquired, delivery count 1), record 119 acknowledged, record 120 (acquired, delivery count 1)

Lock timeout elapsed 111, 112

records 111-112 (available, delivery count 1)

SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-112 (available, delivery count 1), records 113-118 (acquired, delivery count 1), record 119 acknowledged, record 120 (acquired, delivery count 1)

Code Block
SHARE_DELTA offset 134:
{
  "GroupId": "G1",
  "CheckpointEpoch": 1,
  "BackOffset": 133,
  "States": [
    {
      "BaseOffset": 111,
      "LastOffset": 112,
      "State": 0 (Available),
      "DeliveryCount": 1
    }
  ]
}

Acknowledge 113-118

records 113-118 acknowledged

SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-112 (available, delivery count 1), records 113-119 acknowledged, record 120 (acquired, delivery count 1)

Code Block
SHARE_DELTA offset 135:
{
  "GroupId": "G1",
  "CheckpointEpoch": 1,
  "BackOffset": 134,
  "States": [
    {
      "BaseOffset": 113,
      "LastOffset": 118,
      "State": 2 (Acknowledged),
      "DeliveryCount": 1
    }
  ]
}

Fetch records 111,112

records 111-112 (acquired, delivery count 2)

SPSO=110, SPEO=121, records 110-112 (acquired, delivery count 2), records 113-119 acknowledged, record 120 (acquired, delivery count 1)

Acknowledge 110

SPSO=111

SPSO=111, SPEO=121, records 111-112 (acquired, delivery count 2), records 113-119 acknowledged, record 120 (acquired, delivery count 1)

Code Block
SHARE_DELTA offset 136:
{
  "GroupId": "G1",
  "CheckpointEpoch": 1,
  "BackOffset": 135,
  "States": [
    {
      "BaseOffset": 110,
      "LastOffset": 110,
      "State": 2 (Acknowledged),
      "DeliveryCount": 2
    }
  ]
}

Acknowledge 111,112

SPSO=120

SPSO=120, SPEO=121, record 120 (acquired, delivery count 1)

Code Block
SHARE_DELTA offset 137:
{
  "GroupId": "G1",
  "CheckpointEpoch": 1,
  "BackOffset": 136,
  "States": [
    {
      "BaseOffset": 111,
      "LastOffset": 112,
      "State": 2 (Acknowledged),
      "DeliveryCount": 2
    }
  ]
}

or alternatively, taking a new checkpoint:

Code Block
SHARE_CHECKPOINT offset 137:
{
  "GroupId": "G1",
  "CheckpointEpoch": 2,
  "StartOffset": 120,
  "EndOffset": 120,
  "States": []
}

Note that the delivery of 120 has not been recorded yet because it is the first delivery attempt and it is safe to recover the SPEO back to offset 120 and repeat the attempt.

Code Block
languagejava
    /**
     * Delete share groups from the cluster with the default options.
     *
     * <p>This is a convenience method for {@link #deleteShareGroups(Collection, DeleteShareGroupsOptions)} with default options.
     * See the overload for more details.
     *
     * @param groupIds The IDs of the groups to delete.
     * @return The DeleteShareGroupsResult.
     */
    default DeleteShareGroupsResult deleteShareGroups(Collection<String> groupIds) {
        return deleteShareGroups(groupIds, new DeleteShareGroupsOptions());
    }

    /**
     * Delete share groups from the cluster.
     *
     * @param groupIds The IDs of the groups to delete.
     * @param options The options to use when deleting a share group.
     * @return The DeleteShareGroupsResult.
     */
    DeleteShareGroupsResult deleteShareGroups(Collection<String> groupIds, DeleteShareGroupsOptions options);

    /**
     * DeleteDescribe offsetssome for a set of partitionsshare groups in athe sharecluster, group with the default
     * options. This will succeed at the partition level only if the group is not actively
     * subscribed to the corresponding topic.
     *
     * <p>This is a convenience method for {@link #deleteShareGroupOffsets#describeShareGroups(StringCollection, Map, DeleteShareGroupOffsetsOptionsDescribeShareGroupsOptions)}
 with default options.
  * with default *options. See the overload for more details.
     *
    
     * @param groupIds The IDs of the groups to describe.
     * @return The DeleteShareGroupOffsetsResultDescribeShareGroupsResult.
     */
    default DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId, Set<TopicPartition> partitions DescribeShareGroupsResult describeShareGroups(Collection<String> groupIds) {
        return deleteShareGroupOffsetsdescribeShareGroups(groupIdgroupIds, partitions, new DeleteShareGroupOffsetsOptionsDescribeShareGroupsOptions());
    }

     /**
     * DeleteDescribe offsetssome forshare agroups setin of partitions in a share group. This willthe cluster.
     *
     * succeed at the partition level only if the group is not actively subscribed @param groupIds The IDs of the groups to describe.
     * @param options  The options to use when describing the groups.
     * to@return theThe corresponding topicDescribeShareGroupsResult.
     */
    DescribeShareGroupsResult describeShareGroups(Collection<String> groupIds,
  * @param options The options to use when deleting offsets in a share group.
     * @return The DeleteShareGroupOffsetsResult.
     */
    DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId,
        Set<TopicPartition> partitions,
        DeleteShareGroupOffsetsOptionsDescribeShareGroupsOptions options);

      /**
     * List Deletethe share group groupsoffsets available fromin the cluster for the specified share groups with the default options.
     *
     * <p>This is a convenience method for {@link #deleteShareGroups(Collection<String>, DeleteShareGroupsOptions)} method for {@link #listShareGroupOffsets(Map, ListShareGroupOffsetsOptions)}
     * to list offsets of all partitions for the specified share groups with default options.
     *
 See  the  overload* for@param moregroupSpecs details.
Map of share group ids *
to a spec that specifies *the @param groupIds The IDstopic partitions of the groupsgroup to list deleteoffsets for.
     * @return The DeleteShareGroupsResult.ListShareGroupOffsetsResult
     */
    default DeleteShareGroupsResultListShareGroupOffsetsResult deleteShareGroups(Collection<String> groupIdslistShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec> groupSpecs) {
        return deleteShareGroupslistShareGroupOffsets(groupIdsgroupSpecs, new DeleteShareGroupsOptionsListShareGroupOffsetsOptions());
    }

    /**
     * List Deletethe share groups fromgroup offsets available in the cluster for the specified share groups.
     *
     * @param groupIds The IDs groupSpecs Map of share group ids to a spec that specifies the topic partitions of the groupsgroup to deletelist offsets for.
       * @param options The options to use when deletinglisting athe share group offsets.
     * @return The DeleteShareGroupsResult.ListShareGroupOffsetsResult
     */
    DeleteShareGroupsResultListShareGroupOffsetsResult deleteShareGroups(Collection<String> groupIdslistShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec> groupSpecs, DeleteShareGroupsOptionsListShareGroupOffsetsOptions options);

    /**
     * DescribeList somethe share groups available in the cluster, with the default options.
     *
     * <p>This is a convenience method for {@link #describeShareGroups(Collection, DescribeShareGroupsOptions#listShareGroups(ListShareGroupsOptions)}
 with default options.
  * with default options.* See the overload for more details.
     *
     * @param groupIds The IDs of the groups to describe.*
     * @return The DescribeShareGroupsResultListShareGroupsResult.
     */
    default DescribeShareGroupsResultListShareGroupsResult describeShareGroupslistShareGroups(Collection<String> groupIds) {
        return describeShareGroupslistShareGroups(groupIds, new DescribeShareGroupsOptionsListShareGroupsOptions());
    }

     /**
     * DescribeList somethe share groups available in the cluster.
     *
     * @param groupIdsoptions The IDs of options to use when listing the share groups to describe.
     * @return The ListShareGroupsResult.
     * @param/
    ListShareGroupsResult listShareGroups(ListShareGroupsOptions options);
  The options to use when describing
    /**
     * List the groups.
 available in the cluster *with @returnthe Thedefault DescribeShareGroupsResultoptions.
     */
    DescribeShareGroupsResult describeShareGroups(Collection<String> groupIds,
     * <p>This is a convenience method for {@link #listGroups(ListGroupsOptions)} with default options.
     * See the overload for more details.
     *
     * @return The ListGroupsResult.
     */
    default ListGroupsResult listGroups() {
        return listGroups(new ListGroupsOptions());
   DescribeShareGroupsOptions options);}

      /**
     * List the sharegroups group offsets available in the cluster for the specified share groups with the default options cluster.
     *
     * @param <p>Thisoptions isThe aoptions convenienceto methoduse forwhen {@link #listShareGroupOffsets(Map, ListShareGroupOffsetsOptions)}listing the groups.
     * to@return listThe offsetsListGroupsResult.
 of all partitions for the*/
 specified share groups with default options.
     *
     * @param groupSpecs Map of share group ids to a spec that specifies the topic partitions of the group to list offsets for.
     * @return The ListShareGroupOffsetsResultListGroupsResult listGroups(ListGroupsOptions);

AlterShareGroupOffsetsResult

Code Block
languagejava
package org.apache.kafka.clients.admin;

/**
 * The result of the {@link Admin#alterShareGroupOffsets(String, Map, AlterShareGroupOffsetsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class AlterShareGroupOffsetsResult {
    /**
     * Return a future which succeeds if all the alter offsets succeed.
     */
    public KafkaFuture<Void> all() {
    }

    /**
     * Return a future which can be used to check the result for a given partition.
     */
    public KafkaFuture<Void> partitionResult(final TopicPartition partition) {
    }
}

AlterShareGroupOffsetsOptions

Code Block
languagejava
package org.apache.kafka.clients.admin;

/**
 * Options for the {@link Admin#alterShareGroupOffsets(String, Map, AlterShareGroupOffsetsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class AlterShareGroupOffsetsOptions extends AbstractOptions<AlterShareGroupOffsetsOptions> {
}

DeleteShareGroupOffsetsResult

Code Block
languagejava
package org.apache.kafka.clients.admin;

/**
 * The result of the {@link Admin#deleteShareGroupOffsets(String, Set, DeleteShareGroupOffsetsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class DeleteShareGroupOffsetsResult {
    /**
     * Return a future which succeeds only if all the deletions succeed.
     */
    public KafkaFuture<Void> all() {
    }

    /**
     * Return a future which can be used to check the result for a given partition.
     */
    public KafkaFuture<Void> partitionResult(final TopicPartition partition) {
    }
}

DeleteShareGroupOffsetsOptions

Code Block
languagejava
package org.apache.kafka.clients.admin;

/**
 * Options for the {@link Admin#deleteShareGroupOffsets(String, Set, DeleteShareGroupOffsetsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class DeleteShareGroupOffsetsOptions extends AbstractOptions<DeleteShareGroupOffsetsOptions> {
}

DeleteShareGroupsResult

Code Block
languagejava
package org.apache.kafka.clients.admin;

/**
 * The result of the {@link Admin#deleteShareGroups(Collection, DeleteShareGroupsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class DeleteShareGroupsResult {
    /**
     * Return a future which succeeds only if all the deletions succeed.
     */
    public KafkaFuture<Void> all() {
    }

    /**
     * Return a map from group id to futures which can be used to check the status of individual deletions.
     */
    public Map<String, KafkaFuture<Void>> deletedGroups() {
    }
}

...

DeleteShareGroupsOptions

Code Block
languagejava
package org.apache.kafka.clients.admin;

/**
 * Options for the {@link Admin#deleteShareGroups(Collection, DeleteShareGroupsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class DeleteShareGroupsOptions extends AbstractOptions<DeleteShareGroupsOptions> {
}

...

DescribeShareGroupsResult

Code Block
languagejava
package org.apache.kafka.clients.admin;

/**
 * The result of the {@link Admin#describeShareGroups(Collection, DescribeShareGroupsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class DescribeShareGroupsResult {
    /**
     * Return a future which yields all ShareGroupDescription objects, if all the describes succeed.
     */
    public KafkaFuture<Map<String, ShareGroupDescription>> all() {
    }

    /**
     * Return a map from group id to futures which yield group descriptions.
     */
    public Map<String, KafkaFuture<ShareGroupDescription>> describedGroups() {
    }
}
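The split between `describedGroups()` (one future per group) and `all()` (a single future that succeeds only if every describe succeeds) is a common Admin client pattern. The Java sketch below shows how the per-group futures can be combined into one. It uses `java.util.concurrent.CompletableFuture` as a stand-in for `KafkaFuture`, and `PerGroupFutures` is a hypothetical helper, not part of the proposed API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Illustrative only: CompletableFuture stands in for KafkaFuture; this is not the Admin client's code.
final class PerGroupFutures {
    // Once every per-group future has completed, yields all values keyed by group id,
    // or completes exceptionally if any of them failed.
    static <V> CompletableFuture<Map<String, V>> all(Map<String, CompletableFuture<V>> perGroup) {
        return CompletableFuture
                .allOf(perGroup.values().toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> {
                    Map<String, V> results = new HashMap<>();
                    // Every future is complete here, so join() does not block.
                    perGroup.forEach((groupId, future) -> results.put(groupId, future.join()));
                    return results;
                });
    }

    public static void main(String[] args) {
        Map<String, CompletableFuture<String>> described = new HashMap<>();
        described.put("g1", CompletableFuture.completedFuture("description of g1"));
        described.put("g2", CompletableFuture.failedFuture(new IllegalStateException("group not found")));
        // The individual future for g1 still succeeds...
        System.out.println(described.get("g1").join());
        // ...while the combined future fails because g2 failed.
        System.out.println(all(described).isCompletedExceptionally()); // true
    }
}
```

The same shape applies to `DeleteShareGroupsResult.deletedGroups()` and its `all()` method.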

...

ShareGroupDescription

This class reuses the MemberDescription class intended for consumer groups. It is sufficiently general to work for share groups too.

Code Block
languagejava
package org.apache.kafka.clients.admin;

import org.apache.kafka.common.Node;
import org.apache.kafka.common.ShareGroupState;
import org.apache.kafka.common.acl.AclOperation;

/**
 * A detailed description of a single share group in the cluster.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ShareGroupDescription {
  public ShareGroupDescription(String groupId, Collection<MemberDescription> members, ShareGroupState state, Node coordinator);
  public ShareGroupDescription(String groupId, Collection<MemberDescription> members, ShareGroupState state, Node coordinator, Set<AclOperation> authorizedOperations);

  /**
   * The id of the share group.
   */
  public String groupId();

  /**
   * A list of the members of the share group.
   */
  public Collection<MemberDescription> members();

  /**
   * The share group state, or UNKNOWN if the state cannot be parsed.
   */
  public ShareGroupState state();

  /**
   * The share group coordinator, or null if the coordinator is not known.
   */
  public Node coordinator();

  /**
   * The authorized operations for this group, or null if that information is not known.
   */
  public Set<AclOperation> authorizedOperations();
}

DescribeShareGroupsOptions

Code Block
languagejava
package org.apache.kafka.clients.admin;

/**
 * Options for {@link Admin#describeShareGroups(Collection, DescribeShareGroupsOptions)}.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class DescribeShareGroupsOptions extends AbstractOptions<DescribeShareGroupsOptions> {
    public DescribeShareGroupsOptions includeAuthorizedOperations(boolean includeAuthorizedOperations);

    public boolean includeAuthorizedOperations();
}

ListShareGroupOffsetsResult

The offset returned for each topic-partition is the share-partition start offset (SPSO).

Code Block
languagejava
package org.apache.kafka.clients.admin;

/**
 * The result of the {@link Admin#listShareGroupOffsets(Map, ListShareGroupOffsetsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ListShareGroupOffsetsResult {
    /**
     * Return a future which yields all Map<String, Map<TopicPartition, OffsetAndMetadata>> objects, if requests for all the groups succeed.
     */
    public KafkaFuture<Map<String, Map<TopicPartition, OffsetAndMetadata>>> all() {
    }

    /**
     * Return a future which yields a map of topic partitions to OffsetAndMetadata objects for the specified group.
     */
    public KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> partitionsToOffsetAndMetadata(String groupId) {
    }
}

ListShareGroupOffsetsOptions

Code Block
languagejava
package org.apache.kafka.clients.admin;

/**
 * Options for {@link Admin#listShareGroupOffsets(Map, ListShareGroupOffsetsOptions)}.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ListShareGroupOffsetsOptions extends AbstractOptions<ListShareGroupOffsetsOptions> {
}

ListShareGroupOffsetsSpec

Code Block
languagejava
package org.apache.kafka.clients.admin;

/**
 * Specification of share group offsets to list using {@link Admin#listShareGroupOffsets(Map, ListShareGroupOffsetsOptions)}.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ListShareGroupOffsetsSpec {
  public ListShareGroupOffsetsSpec();

  /**
   * Set the topic partitions whose offsets are to be listed for a share group.
   */
  public ListShareGroupOffsetsSpec topicPartitions(Collection<TopicPartition> topicPartitions);

  /**
   * Returns the topic partitions whose offsets are to be listed for a share group.
   */
  public Collection<TopicPartition> topicPartitions();
}

ListShareGroupsResult

Code Block
languagejava
package org.apache.kafka.clients.admin;

/**
 * The result of the {@link Admin#listShareGroups(ListShareGroupsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ListShareGroupsResult {
    /**
     * Returns a future that yields either an exception, or the full set of share group listings.
     */
    public KafkaFuture<Collection<ShareGroupListing>> all() {
    }

    /**
     * Returns a future which yields just the valid listings.
     */
    public KafkaFuture<Collection<ShareGroupListing>> valid() {
    }

    /**
     * Returns a future which yields just the errors which occurred.
     */
    public KafkaFuture<Collection<Throwable>> errors() {
    }
}
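`ListShareGroupsResult` separates the `valid()` listings from the `errors()`, while `all()` fails if anything went wrong. The Java sketch below models that behaviour with plain lists instead of futures. `ListingResponse` and `ListingOutcome` are hypothetical names, and having `all()` surface the first error is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: each response carries either group ids or an error; plain lists replace KafkaFuture.
record ListingResponse(List<String> groupIds, Throwable error) {}

final class ListingOutcome {
    final List<String> valid = new ArrayList<>();    // like valid()
    final List<Throwable> errors = new ArrayList<>(); // like errors()

    ListingOutcome(List<ListingResponse> responses) {
        for (ListingResponse r : responses) {
            if (r.error() != null) {
                errors.add(r.error());
            } else {
                valid.addAll(r.groupIds());
            }
        }
    }

    // Like all(): the full set of listings, or an exception if any request failed.
    List<String> all() {
        if (!errors.isEmpty()) {
            throw new IllegalStateException("listing incomplete", errors.get(0));
        }
        return valid;
    }

    public static void main(String[] args) {
        ListingOutcome outcome = new ListingOutcome(List.of(
                new ListingResponse(List.of("g1", "g2"), null),
                new ListingResponse(List.of(), new RuntimeException("broker unavailable"))));
        System.out.println(outcome.valid);         // [g1, g2]
        System.out.println(outcome.errors.size()); // 1
    }
}
```

A tool that can tolerate partial results would read `valid()` and report `errors()`, whereas one that needs a complete picture would use `all()`.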

...

ShareGroupListing

Code Block
languagejava
package org.apache.kafka.clients.admin;

import org.apache.kafka.common.ShareGroupState;

/**
 * A listing of a share group in the cluster.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ShareGroupListing {
  public ShareGroupListing(String groupId);
  public ShareGroupListing(String groupId, Optional<ShareGroupState> state);

  /**
   * The id of the share group.
   */
  public String groupId();

  /**
   * The share group state.
   */
  public Optional<ShareGroupState> state();
}

ListShareGroupsOptions

Code Block
languagejava
package org.apache.kafka.clients.admin;

import org.apache.kafka.common.ShareGroupState;

/**
 * Options for {@link Admin#listShareGroups(ListShareGroupsOptions)}.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ListShareGroupsOptions extends AbstractOptions<ListShareGroupsOptions> {
    /**
     * If states is set, only groups in these states will be returned. Otherwise, all groups are returned.
     */
    public ListShareGroupsOptions inStates(Set<ShareGroupState> states);

    /**
     * Return the set of states that are requested, or empty if no states have been specified.
     */
    public Set<ShareGroupState> states();
}

...

ListGroupsResult

Code Block
languagejava
package org.apache.kafka.clients.admin;

/**
 * The result of the {@link Admin#listGroups(ListGroupsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ListGroupsResult {
    /**
     * Returns a future that yields either an exception, or the full set of group listings.
     */
    public KafkaFuture<Collection<GroupListing>> all() {
    }

    /**
     * Returns a future which yields just the valid listings.
     */
    public KafkaFuture<Collection<GroupListing>> valid() {
    }

    /**
     * Returns a future which yields just the errors which occurred.
     */
    public KafkaFuture<Collection<Throwable>> errors() {
    }
}

...

GroupListing

Code Block
languagejava
package org.apache.kafka.clients.admin;

import org.apache.kafka.common.GroupType;

/**
 * A listing of a group in the cluster.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class GroupListing {
  public GroupListing(String groupId, GroupType type);

  /**
   * The id of the group.
   */
  public String groupId();

  /**
   * The group type.
   */
  public GroupType type();
}

...

ListGroupsOptions

Code Block
languagejava
package org.apache.kafka.clients.admin;

import org.apache.kafka.common.GroupType;

/**
 * Options for {@link Admin#listGroups(ListGroupsOptions)}.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ListGroupsOptions extends AbstractOptions<ListGroupsOptions> {
    /**
     * If types is set, only groups of these types will be returned. Otherwise, all groups are returned.
     */
    public ListGroupsOptions types(Set<GroupType> types);

    /**
     * Return the set of types that are requested, or empty if no types have been specified.
     */
    public Set<GroupType> types();
}
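The filtering rule in `ListGroupsOptions` amounts to the Java sketch below; the equivalent `inStates` rule in `ListShareGroupsOptions` works the same way. `GroupKind`, `GroupEntry` and `GroupTypeFilter` are hypothetical stand-ins for `GroupType` and `GroupListing`. The constants follow the group types "classic", "consumer" and "share" described at the start of this KIP, and the real enum may differ.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical stand-ins for GroupType and GroupListing; not the Kafka classes.
enum GroupKind { CLASSIC, CONSUMER, SHARE }

record GroupEntry(String groupId, GroupKind type) {}

final class GroupTypeFilter {
    // If types is non-empty, only groups of those types are returned; otherwise all groups are returned.
    static List<GroupEntry> filter(List<GroupEntry> groups, Set<GroupKind> types) {
        if (types.isEmpty()) {
            return groups;
        }
        return groups.stream()
                .filter(g -> types.contains(g.type()))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<GroupEntry> groups = List.of(
                new GroupEntry("payments", GroupKind.CONSUMER),
                new GroupEntry("work-queue", GroupKind.SHARE));
        System.out.println(filter(groups, Set.of(GroupKind.SHARE)).size()); // 1
        System.out.println(filter(groups, Set.of()).size());                // 2
    }
}
```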

GroupType

Another case is added to the org.apache.kafka.common.GroupType  enum:

...

Option

Description

--all-topics

Consider all topics assigned to a group in the `reset-offsets` process.

--bootstrap-server <String: server to connect to>

REQUIRED: The server(s) to connect to.

--command-config <String: command config property file>

Property file containing configs to be passed to Admin Client.

--delete

Pass in groups to delete topic partition offsets over the entire share group. For instance, --group g1 --group g2.

--delete-offsets

Delete offsets of a share group. Supports one share group at a time, and multiple topics.

--describe

Describe a share group and list the offset lag (number of records not yet processed) for the given group.

--dry-run

Only show results without executing changes on share groups. Supported operations: reset-offsets.

--execute

Execute operation. Supported operations: reset-offsets.

--group <String: share group>

The share group we wish to act on.

--help

Print usage information.

--list

List all share groups.

--members

Describe members of the group. This option may be used with the '--describe' option only.

--offsets

Describe the group and list all topic partitions in the group along with their offset lag. This is the default sub-action of the '--describe' option and may be used with that option only.

--reset-offsets

Reset offsets of a share group. Supports one share group at a time, and the group's consumer instances must be inactive.

--state [String]

When specified with '--describe', includes the state of the group. When specified with '--list', it displays the state of all groups. It can also be used to list groups with specific states.

--timeout <Long: timeout (ms)>

The timeout that can be set for some use cases. For example, it can be used when describing the group to specify the maximum amount of time in milliseconds to wait before the group stabilizes (when the group is just created, or is going through some changes). (default: 5000)   

--to-datetime <String: datetime>

Reset offsets to offset from datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss'.

--to-earliest

Reset offsets to earliest offset.

--to-latest

Reset offsets to latest offset.

--topic <String: topic>

The topic whose share group information should be deleted or topic which should be included in the reset offset process.

--version

Display Kafka version.

...

Configuration

Description

Values

group.share.enable

Whether to enable share groups on the broker.

Default false while the feature is being developed. Will become true in a future release.

group.share.delivery.count.limit

The maximum number of delivery attempts for a record delivered to a share group.

Default 5, minimum 2, maximum 10

group.share.record.lock.duration.ms

Share-group record acquisition lock duration in milliseconds.

Default 30000 (30 seconds), minimum 1000 (1 second), maximum 60000 (60 seconds)

group.share.record.lock.duration.max.ms

Share-group record acquisition lock maximum duration in milliseconds.

Default 60000 (60 seconds), minimum 1000 (1 second), maximum 3600000 (1 hour)

group.share.record.lock.partition.limit

Share-group record lock limit per share-partition.

Default 200, minimum 100, maximum 10000
group.share.session.timeout.ms 

The timeout to detect client failures when using the group protocol.

Default 45000 (45 seconds)
group.share.min.session.timeout.ms 

The minimum session timeout.

Default 45000 (45 seconds)
group.share.max.session.timeout.ms 

The maximum session timeout.

Default 60000 (60 seconds)
group.share.heartbeat.interval.ms 

The heartbeat interval given to the members.

Default 5000 (5 seconds)
group.share.min.heartbeat.interval.ms 

The minimum heartbeat interval.

Default 5000 (5 seconds)
group.share.max.heartbeat.interval.ms 

The maximum heartbeat interval.

Default 15000 (15 seconds)
group.share.max.groups 

The maximum number of share groups.

Default 10, minimum 1, maximum 100
group.share.max.size 

The maximum number of consumers that a single share group can accommodate.

Default 200, minimum 10, maximum 1000
group.share.assignors 

The server-side assignors as a list of full class names. In the initial delivery, only the first one in the list is used.

A list of class names. Default "org.apache.server.group.share.SimpleAssignor"
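To make group.share.delivery.count.limit concrete, the following Java sketch models what could happen to an acquired record when its acquisition lock is released or times out. It assumes that a record whose delivery count has reached the limit is archived instead of being made available for another attempt; the enum and class names are hypothetical and are not part of the Kafka codebase.

```java
// Hypothetical record states. The ShareDeltaValue schema lists 0:Available,
// 2:Acked and 4:Archived; ACQUIRED is included here only for completeness.
enum RecordState { AVAILABLE, ACQUIRED, ACKED, ARCHIVED }

final class DeliveryLimitSketch {
    // Bounds from the group.share.delivery.count.limit row of the table above.
    static final int MIN_DELIVERY_COUNT_LIMIT = 2;
    static final int MAX_DELIVERY_COUNT_LIMIT = 10;

    static void validateLimit(int limit) {
        if (limit < MIN_DELIVERY_COUNT_LIMIT || limit > MAX_DELIVERY_COUNT_LIMIT) {
            throw new IllegalArgumentException("group.share.delivery.count.limit out of range: " + limit);
        }
    }

    /**
     * State of an acquired record after its lock is released or times out.
     * Assumption: once the record has been delivered `limit` times it is
     * archived rather than offered to a consumer again.
     */
    static RecordState afterLockReleased(int deliveryCount, int limit) {
        validateLimit(limit);
        return deliveryCount >= limit ? RecordState.ARCHIVED : RecordState.AVAILABLE;
    }
}
```

With the default limit of 5, a record released after its fourth delivery becomes available again, while one released after its fifth delivery is archived.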

...

Configuration

Description

Values

group.share.isolation.level 

Controls how to read records written transactionally. If set to "read_committed", the share group will only deliver transactional records which have been committed. If set to "read_uncommitted", the share group will return all messages, even transactional messages which have been aborted. Non-transactional records will be returned unconditionally in either mode.

Valid values "read_committed"  and "read_uncommitted" (default)

group.share.auto.offset.reset 

What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server:

  • "earliest" : automatically reset the offset to the earliest offset

  • "latest" : automatically reset the offset to the latest offset

Valid values "latest"  (default) and "earliest" 

group.share.record.lock.duration.ms 

Record acquisition lock duration in milliseconds.

null, which uses the cluster configuration group.share.record.lock.duration.ms; minimum 1000; maximum limited by the cluster configuration group.share.record.lock.duration.max.ms

group.type 

Ensures that a newly created group has the specified group type.

Valid values: "consumer"  or "share" 
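The group-level group.share.record.lock.duration.ms falls back to the cluster setting when it is null, and is otherwise bounded below by 1000 ms and above by the cluster-level group.share.record.lock.duration.max.ms. A minimal Java sketch of that resolution follows; the class name is hypothetical, and a null group configuration is represented as an empty OptionalLong.

```java
import java.util.OptionalLong;

final class LockDurationSketch {
    // Minimum from the group-level group.share.record.lock.duration.ms row above.
    static final long GROUP_MIN_LOCK_DURATION_MS = 1_000L;

    /**
     * Effective acquisition lock duration for one share group.
     *
     * @param groupValueMs     group-level group.share.record.lock.duration.ms; empty means null
     * @param clusterDefaultMs cluster-level group.share.record.lock.duration.ms
     * @param clusterMaxMs     cluster-level group.share.record.lock.duration.max.ms
     */
    static long effectiveLockDurationMs(OptionalLong groupValueMs, long clusterDefaultMs, long clusterMaxMs) {
        if (groupValueMs.isEmpty()) {
            return clusterDefaultMs;            // null: inherit the cluster setting
        }
        long value = groupValueMs.getAsLong();
        if (value < GROUP_MIN_LOCK_DURATION_MS || value > clusterMaxMs) {
            throw new IllegalArgumentException(
                "lock duration " + value + " ms is outside [" + GROUP_MIN_LOCK_DURATION_MS + ", " + clusterMaxMs + "]");
        }
        return value;
    }
}
```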

Consumer configuration

The existing consumer configurations apply for share groups with the following exceptions:

...

  • ShareGroupHeartbeat - for consumers to form and maintain share groups
  • ShareGroupDescribe - for describing share groups
  • ShareFetch - for fetching records from share-partition leaders
  • ShareAcknowledge - for acknowledging delivery of records with share-partition leaders
  • AlterShareGroupOffsets - for altering the share-partition start offsets for the share-partitions in a share group
  • DeleteShareGroupOffsets - for deleting the offsets for the share-partitions in a share group
  • DescribeShareGroupOffsets - for describing the offsets for the share-partitions in a share group

Error codes

This KIP adds the following error codes to the Kafka protocol.

  • INVALID_RECORD_STATE  - The record state is invalid. The acknowledgement of delivery could not be completed.

ShareGroupHeartbeat API

The ShareGroupHeartbeat API is used by share group consumers to form a group. The API allows members to advertise their subscriptions and their state. The group coordinator uses it to assign partitions to and revoke partitions from members. This API is also used as a liveness check.

...

Code Block
{
  "apiKey": TBD,
  "type": "request",
  "listeners": ["broker"],
  "name": "ShareGroupHeartbeatRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
      { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
        "about": "The group identifier." },
      { "name": "MemberId", "type": "string", "versions": "0+",
        "about": "The member ID generated by the coordinator. The member ID must be kept during the entire lifetime of the member." },
      { "name": "MemberEpoch", "type": "int32", "versions": "0+",
        "about": "The current member epoch; 0 to join the group; -1 to leave the group." },
      { "name": "RackId", "type": "string", "versions": "0+",  "nullableVersions": "0+", "default": "null",
        "about": "null if not provided or if it didn't change since the last heartbeat; the rack ID of consumer otherwise." },
      { "name": "RebalanceTimeoutMs", "type": "int32", "versions": "0+", "default": -1,
        "about": "-1 if it didn't change since the last heartbeat; the maximum time in milliseconds that the coordinator will wait on the member to revoke its partitions otherwise." },
      { "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+", "nullableVersions": "0+", "default": "null",
        "about": "null if it didn't change since the last heartbeat; the subscribed topic names otherwise." }
  ]
}
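The "null if it didn't change since the last heartbeat" convention means that a member must remember what it last sent. The Java sketch below shows one way a client could track this, together with the MemberEpoch values 0 (join) and -1 (leave). HeartbeatFields and HeartbeatStateSketch are hypothetical names; the sketch assumes an empty MemberId on the very first request and does not model rejoining after being fenced.

```java
import java.util.List;
import java.util.Objects;

// Fields of one ShareGroupHeartbeatRequest, named after the schema above.
// Hypothetical client-side type, not part of the Kafka client library.
record HeartbeatFields(String groupId, String memberId, int memberEpoch, String rackId,
                       int rebalanceTimeoutMs, List<String> subscribedTopicNames) {}

final class HeartbeatStateSketch {
    private final String groupId;
    private String memberId = "";          // assumption: empty until the coordinator assigns one
    private int memberEpoch = 0;           // 0 = join the group
    private String lastRackId;             // last values actually sent
    private int lastRebalanceTimeoutMs = -1;
    private List<String> lastTopics;

    HeartbeatStateSketch(String groupId) {
        this.groupId = groupId;
    }

    /** Builds the next heartbeat, sending null (or -1) for fields unchanged since the last one. */
    HeartbeatFields nextHeartbeat(String rackId, int rebalanceTimeoutMs, List<String> topics) {
        List<String> topicsCopy = List.copyOf(topics);
        HeartbeatFields fields = new HeartbeatFields(
            groupId,
            memberId,
            memberEpoch,
            Objects.equals(rackId, lastRackId) ? null : rackId,
            rebalanceTimeoutMs == lastRebalanceTimeoutMs ? -1 : rebalanceTimeoutMs,
            topicsCopy.equals(lastTopics) ? null : topicsCopy);
        lastRackId = rackId;
        lastRebalanceTimeoutMs = rebalanceTimeoutMs;
        lastTopics = topicsCopy;
        return fields;
    }

    /** Applies a successful ShareGroupHeartbeatResponse. */
    void onResponse(String responseMemberId, int responseEpoch) {
        if (responseMemberId != null) {
            memberId = responseMemberId;   // only provided when joining with MemberEpoch == 0
        }
        memberEpoch = responseEpoch;
    }

    /** Heartbeat that leaves the group: MemberEpoch = -1. */
    HeartbeatFields leaveHeartbeat() {
        return new HeartbeatFields(groupId, memberId, -1, null, -1, null);
    }
}
```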

...

Code Block
{
  "apiKey": TBD,
  "type": "response",
  "name": "ShareGroupHeartbeatResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - NOT_COORDINATOR (version 0+)  
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - UNKNOWN_MEMBER_ID (version 0+)
  // - GROUP_MAX_SIZE_REACHED (version 0+)
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top-level error code, or 0 if there was no error" },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The top-level error message, or null if there was no error." },
    { "name": "MemberId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The member ID generated by the coordinator. Only provided when the member joins with MemberEpoch == 0." },
    { "name": "MemberEpoch", "type": "int32", "versions": "0+",
      "about": "The member epoch." },
    { "name": "HeartbeatIntervalMs", "type": "int32", "versions": "0+",
      "about": "The heartbeat interval in milliseconds." },
    { "name": "Assignment", "type": "Assignment", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "null if not provided; the assignment otherwise.", "fields": [
        { "name": "Error", "type": "int8", "versions": "0+",
          "about": "The assigned error." },
        { "name": "AssignedTopicPartitions", "type": "[]TopicPartitions", "versions": "0+",
          "about": "The partitions assigned to the member." }
    ]}
  ],
  "commonStructs": [
    { "name": "TopicPartitions", "versions": "0+", "fields": [
        { "name": "TopicId", "type": "uuid", "versions": "0+",
          "about": "The topic ID." },
        { "name": "Partitions", "type": "[]int32", "versions": "0+",
          "about": "The partitions." }
    ]}
  ]
}

...

The ShareFetch API is used by share group consumers to fetch acquired records from share-partition leaders. It is also possible to piggyback acknowledgements in this request to reduce the number of round trips.

The first request from a share consumer to a share-partition leader broker establishes a share session by setting MemberId to the member ID it received from the group coordinator and ShareSessionEpoch to 0. Then each subsequent ShareFetch or ShareAcknowledge request specifies the MemberId and increments the ShareSessionEpoch by one. When the share consumer wishes to close the share session, it sets MemberId to the member ID and ShareSessionEpoch to -1.
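The share session epoch rules amount to a small counter that a consumer keeps for each share-partition leader broker. The hypothetical Java sketch below starts at 0, increments by one per request, and uses -1 to close. The wrap from Integer.MAX_VALUE back to 1 is an assumption borrowed from how regular fetch sessions (KIP-227) avoid overflow; this KIP does not specify it.

```java
// Hypothetical per-leader tracker for ShareSessionEpoch.
final class ShareSessionEpochSketch {
    static final int OPEN_EPOCH = 0;    // first request opens the share session
    static final int CLOSE_EPOCH = -1;  // request that closes the share session

    private int nextEpoch = OPEN_EPOCH;
    private boolean closed = false;

    /** Epoch to send in the next ShareFetch or ShareAcknowledge request. */
    int nextRequestEpoch() {
        if (closed) {
            throw new IllegalStateException("share session is closed; call reopen() first");
        }
        int epoch = nextEpoch;
        // Assumption (modelled on KIP-227 fetch sessions): wrap to 1, never back to 0 or negative.
        nextEpoch = (epoch == Integer.MAX_VALUE) ? 1 : epoch + 1;
        return epoch;
    }

    /** Epoch to send in the request that closes the share session. */
    int closeEpoch() {
        closed = true;
        return CLOSE_EPOCH;
    }

    /** Starts a fresh share session, which again begins with epoch 0. */
    void reopen() {
        closed = false;
        nextEpoch = OPEN_EPOCH;
    }
}
```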

Request schema

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "ShareFetchRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "entityType": "groupId",
      "about": "null if not provided or if it didn't change since the last fetch; the group identifier otherwise." },
    { "name": "MemberId", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The member ID." },
    { "name": "ShareSessionEpoch", "type": "int32", "versions": "0+",
      "about": "The current share session epoch: 0 to open a share session; -1 to close it; otherwise increments for consecutive requests." },
    { "name": "MaxWaitMs", "type": "int32", "versions": "0+",
      "about": "The maximum time in milliseconds to wait for the response." },
    { "name": "MinBytes", "type": "int32", "versions": "0+",
      "about": "The minimum bytes to accumulate in the response." },
    { "name": "MaxBytes", "type": "int32", "versions": "0+", "default": "0x7fffffff", "ignorable": true,
      "about": "The maximum bytes to fetch.  See KIP-74 for cases where this limit may not be honored." },
    { "name": "Topics", "type": "[]FetchTopic", "versions": "0+",
      "about": "The topics to fetch.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID."},
      { "name": "Partitions", "type": "[]FetchPartition", "versions": "0+",
        "about": "The partitions to fetch.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "CurrentLeaderEpoch", "type": "int32", "versions": "0+", "default": "-1", "ignorable": true,
          "about": "The current leader epoch of the partition." },
        { "name": "PartitionMaxBytes", "type": "int32", "versions": "0+",
          "about": "The maximum bytes to fetch from this partition.  See KIP-74 for cases where this limit may not be honored." },
        { "name": "AcknowledgementBatches", "type": "[]AcknowledgementBatch", "versions": "0+",
          "about": "Record batches to acknowledge.", "fields": [
          { "name": "StartOffset", "type": "int64", "versions": "0+",
            "about": "Start offset of batch of records to acknowledge."},
          { "name": "LastOffset", "type": "int64", "versions": "0+",
            "about": "Last offset (inclusive) of batch of records to acknowledge."},
          { "name": "GapOffsets", "type": "[]int64", "versions": "0+",
            "about": "Array of offsets in this range which do not correspond to records."},
          { "name": "AcknowledgeType", "type": "int8", "versions": "0+", "default": "0",
            "about": "The type of acknowledgement - 0:Accept,1:Release,2:Reject."}
        ]}
      ]}
    ]},
    { "name": "ForgottenTopicsData", "type": "[]ForgottenTopic", "versions": "0+", "ignorable": false,
      "about": "The partitions to remove from this share session.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID."},
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "The partitions indexes to forget." }
    ]}
  ]
}
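AcknowledgementBatches (used here and in ShareAcknowledge) lets a consumer piggyback acknowledgements compactly: a run of offsets with the same AcknowledgeType shares one batch, and GapOffsets lists offsets inside the range that hold no record (for example, offsets left empty by compaction). The Java sketch below coalesces per-record acknowledgements into such batches. The enum, record and class names are hypothetical; the AcknowledgeType codes are taken from the schema (0:Accept, 1:Release, 2:Reject).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;

enum AcknowledgeType {
    ACCEPT((byte) 0), RELEASE((byte) 1), REJECT((byte) 2);
    final byte code;
    AcknowledgeType(byte code) { this.code = code; }
}

// Mirrors one AcknowledgementBatch entry of the request schema.
record AcknowledgementBatch(long startOffset, long lastOffset, List<Long> gapOffsets, byte acknowledgeType) {}

final class AckBatchSketch {
    /**
     * Coalesces per-record acknowledgements (ordered by offset) into batches.
     * A batch is extended while the next acknowledged offset has the same type
     * and every offset skipped in between is a known gap.
     */
    static List<AcknowledgementBatch> coalesce(NavigableMap<Long, AcknowledgeType> acks, Set<Long> gaps) {
        List<AcknowledgementBatch> out = new ArrayList<>();
        Long start = null, last = null;
        AcknowledgeType type = null;
        List<Long> batchGaps = new ArrayList<>();
        for (Map.Entry<Long, AcknowledgeType> e : acks.entrySet()) {
            long offset = e.getKey();
            boolean contiguous = last != null && onlyGapsBetween(last, offset, gaps);
            if (contiguous && e.getValue() == type) {
                for (long g = last + 1; g < offset; g++) batchGaps.add(g);  // record the bridged gaps
                last = offset;
            } else {
                if (start != null) out.add(new AcknowledgementBatch(start, last, List.copyOf(batchGaps), type.code));
                start = offset;
                last = offset;
                type = e.getValue();
                batchGaps = new ArrayList<>();
            }
        }
        if (start != null) out.add(new AcknowledgementBatch(start, last, List.copyOf(batchGaps), type.code));
        return out;
    }

    private static boolean onlyGapsBetween(long from, long to, Set<Long> gaps) {
        for (long o = from + 1; o < to; o++) {
            if (!gaps.contains(o)) return false;
        }
        return true;
    }
}
```

For example, accepting offsets 10, 11 and 13 (with 12 a gap) and releasing 14 yields two batches: 10 to 13 with GapOffsets [12] and type 0, then 14 to 14 with type 1.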

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "ShareFetchResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - TOPIC_AUTHORIZATION_FAILED (version 0+)
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - NOT_LEADER_OR_FOLLOWER (version 0+)
  // - UNKNOWN_TOPIC_ID (version 0+)
  // - INVALID_RECORD_STATE (version 0+)
  // - KAFKA_STORAGE_ERROR (version 0+)
  // - CORRUPT_MESSAGE (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - UNKNOWN_SERVER_ERROR (version 0+)
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+", "ignorable": true,
      "about": "The top level response error code." },
    { "name": "Responses", "type": "[]ShareFetchableTopicResponse", "versions": "0+",
      "about": "The response topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID."},
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about": "The topic partitions.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no fetch error." },
        { "name": "AcknowledgeErrorCode", "type": "int16", "versions": "0+",
          "about": "The acknowledge error code, or 0 if there was no acknowledge error." },
        { "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "0+", "fields": [
          { "name": "LeaderId", "type": "int32", "versions": "0+",
            "about": "The ID of the current leader or -1 if the leader is unknown." },
          { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
            "about": "The latest known leader epoch." }
        ]},
        { "name": "Records", "type": "records", "versions": "0+", "nullableVersions": "0+", "about": "The record data."},
        { "name": "AcquiredRecords", "type": "[]AcquiredRecords", "versions": "0+", "about": "The acquired records.", "fields":  [
          {"name": "BaseOffset", "type":  "int64", "versions": "0+", "about": "The earliest offset in this batch of acquired records."},
          {"name": "LastOffset", "type": "int64", "versions": "0+", "about": "The last offset of this batch of acquired records."},
          {"name": "DeliveryCount", "type": "int16", "versions": "0+", "about": "The delivery count of this batch of acquired records."}
        ]}
      ]}
    ]},
    { "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "0+",
      "about": "Endpoints for all current leaders enumerated in PartitionData with error NOT_LEADER_OR_FOLLOWER.", "fields": [
      { "name": "NodeId", "type": "int32", "versions": "0+",
        "mapKey": true, "entityType": "brokerId", "about": "The ID of the associated node." },
      { "name": "Host", "type": "string", "versions": "0+",
        "about": "The node's hostname." },
      { "name": "Port", "type": "int32", "versions": "0+",
        "about": "The node's port." },
      { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
        "about": "The rack of the node, or null if it has not been assigned to a rack." }
    ]}
  ]
}
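Each AcquiredRecords entry describes a contiguous offset range and its delivery count. The Java sketch below maps a record offset from the Records data to its delivery count. It assumes that Records may contain records outside every acquired range (for example, when a returned record batch extends beyond what was acquired) and that such records were not acquired by this consumer and should not be processed; the names are hypothetical.

```java
import java.util.List;
import java.util.OptionalInt;

// Mirrors one AcquiredRecords entry of the response schema.
record AcquiredRange(long baseOffset, long lastOffset, short deliveryCount) {}

final class AcquiredRecordsSketch {
    /**
     * Delivery count for a record offset, or empty if the offset lies outside
     * every acquired range (assumed: not acquired, so not to be processed).
     */
    static OptionalInt deliveryCountFor(long offset, List<AcquiredRange> acquired) {
        for (AcquiredRange r : acquired) {
            if (offset >= r.baseOffset() && offset <= r.lastOffset()) {
                return OptionalInt.of(r.deliveryCount());
            }
        }
        return OptionalInt.empty();
    }
}
```

A linear scan keeps the sketch short; if the ranges are ordered by offset, a binary search would work equally well.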

...

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "ShareAcknowledgeRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "MemberId", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The member ID." },
    { "name": "ShareSessionEpoch", "type": "int32", "versions": "0+",
      "about": "The current share session epoch, which is used for ordering requests in a session: 0 to open a share session; -1 to close it; otherwise increments for consecutive requests." },
      { "name": "Topics", "type": "[]AcknowledgeTopic", "versions": "0+",
      "about": "The topics containing records to acknowledge.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The unique topic ID."},
      { "name": "Partitions", "type": "[]AcknowledgePartition", "versions": "0+",
        "about": "The partitions containing records to acknowledge.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "AcknowledgementBatches", "type": "[]AcknowledgementBatch", "versions": "0+",
          "about": "Record batches to acknowledge.", "fields": [
          { "name": "StartOffset", "type": "int64", "versions": "0+",
            "about": "Start offset of batch of records to acknowledge."},
          { "name": "LastOffset", "type": "int64", "versions": "0+",
            "about": "Last offset (inclusive) of batch of records to acknowledge."},
          { "name": "GapOffsets", "type": "[]int64", "versions": "0+",
            "about": "Array of offsets in this range which do not correspond to records."},
          { "name": "AcknowledgeType", "type": "int8", "versions": "0+", "default": "0",
            "about": "The type of acknowledgement - 0:Accept,1:Release,2:Reject."}
        ]}
      ]}
    ]}
  ]
}

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "ShareAcknowledgeResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - TOPIC_AUTHORIZATION_FAILED (version 0+)
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - NOT_LEADER_OR_FOLLOWER (version 0+)
  // - UNKNOWN_TOPIC_ID (version 0+)
  // - INVALID_RECORD_STATE (version 0+)
  // - KAFKA_STORAGE_ERROR (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - UNKNOWN_SERVER_ERROR (version 0+)
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+", "ignorable": true,
      "about": "The top level response error code." },
    { "name": "SessionId", "type": "int32", "versions": "0+", "default": "0", "ignorable": false,
      "about": "The share session ID." },
    { "name": "Responses", "type": "[]ShareAcknowledgeTopicResponse", "versions": "0+",
      "about": "The response topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID."},
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about": "The topic partitions.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "0+", "taggedVersions": "0+", "tag": 0, "fields": [
          { "name": "LeaderId", "type": "int32", "versions": "0+",
            "about": "The ID of the current leader or -1 if the leader is unknown." },
          { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
            "about": "The latest known leader epoch." }
        ]}
      ]}
    ]},
    { "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "0+",
      "about": "Endpoints for all current leaders enumerated in PartitionData with error NOT_LEADER_OR_FOLLOWER.", "fields": [
      { "name": "NodeId", "type": "int32", "versions": "0+",
        "mapKey": true, "entityType": "brokerId", "about": "The ID of the associated node." },
      { "name": "Host", "type": "string", "versions": "0+",
        "about": "The node's hostname." },
      { "name": "Port", "type": "int32", "versions": "0+",
        "about": "The node's port." },
      { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
        "about": "The rack of the node, or null if it has not been assigned to a rack." }
    ]}
  ]
}
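When a partition in a ShareFetch or ShareAcknowledge response carries NOT_LEADER_OR_FOLLOWER, CurrentLeader and NodeEndpoints together let the consumer locate the new share-partition leader directly from the response. The Java sketch below performs that lookup, assuming NodeEndpoints has been decoded into a map keyed by NodeId (the schema marks NodeId as "mapKey"). The record and class names are hypothetical; 6 is Kafka's existing code for NOT_LEADER_OR_FOLLOWER.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical decoded forms of the NodeEndpoint and LeaderIdAndEpoch structures.
record NodeEndpoint(int nodeId, String host, int port, String rack) {}
record LeaderIdAndEpoch(int leaderId, int leaderEpoch) {}

final class LeaderRedirectSketch {
    static final short NOT_LEADER_OR_FOLLOWER = 6;   // existing Kafka protocol error code

    /**
     * Endpoint of the new leader for a partition that reported
     * NOT_LEADER_OR_FOLLOWER, or empty if the response does not identify one.
     */
    static Optional<NodeEndpoint> newLeaderEndpoint(short partitionErrorCode,
                                                    LeaderIdAndEpoch currentLeader,
                                                    Map<Integer, NodeEndpoint> nodeEndpointsById) {
        if (partitionErrorCode != NOT_LEADER_OR_FOLLOWER || currentLeader == null || currentLeader.leaderId() < 0) {
            return Optional.empty();   // no redirect needed, or leader unknown (-1)
        }
        return Optional.ofNullable(nodeEndpointsById.get(currentLeader.leaderId()));
    }
}
```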

...

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "AlterShareGroupOffsetsRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
    "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
      "about": "The group identifier." },
    { "name": "Topics", "type": "[]AlterShareGroupOffsetsRequestTopic", "versions": "0+",
      "about": "The topics to alter offsets for.",  "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName", "mapKey": true,
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]AlterShareGroupOffsetsRequestPartition", "versions": "0+",
        "about": "Each partition to alter offsets for.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "StartOffset", "type": "int64", "versions": "0+",
          "about": "The share-partition start offset." }
      ]}
    ]}
  ]
}

...

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "AlterShareGroupOffsetsResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - NOT_COORDINATOR (version 0+)
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - GROUP_NOT_EMPTY (version 0+)
  // - KAFKA_STORAGE_ERROR (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - UNKNOWN_SERVER_ERROR (version 0+)
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Responses", "type": "[]AlterShareGroupOffsetsResponseTopic", "versions": "0+",
      "about": "The results for each topic.", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true,
        "about": "The unique topic ID." },
      { "name": "Partitions", "type": "[]AlterShareGroupOffsetsResponsePartition", "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "ignorable": true, "default": "null",
          "about": "The error message, or null if there was no error." }
      ]}
    ]}
  ]
}

...

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "DeleteShareGroupOffsetsResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - NOT_COORDINATOR (version 0+)
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - GROUP_NOT_EMPTY (version 0+)
  // - KAFKA_STORAGE_ERROR (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - UNKNOWN_SERVER_ERROR (version 0+)
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Responses", "type": "[]DeleteShareGroupOffsetsResponseTopic", "versions": "0+",
      "about": "The results for each topic.", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true,
        "about": "The unique topic ID." },
      { "name": "Partitions", "type": "[]DeleteShareGroupOffsetsResponsePartition", "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "ignorable": true, "default": "null",
          "about": "The error message, or null if there was no error." }
      ]}
    ]}
  ]
}

...

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "DescribeShareGroupOffsetsResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - NOT_COORDINATOR (version 0+)
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - UNKNOWN_SERVER_ERROR (version 0+)
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Responses", "type": "[]DescribeShareGroupOffsetsResponseTopic", "versions": "0+",
      "about": "The results for each topic.", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true,
        "about": "The unique topic ID." },
      { "name": "Partitions", "type": "[]DescribeShareGroupOffsetsResponsePartition", "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "StartOffset", "type": "int64", "versions": "0+",
          "about": "The share-partition start offset."},
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "ignorable": true, "default": "null",
          "about": "The error message, or null if there was no error." }
      ]}
    ]}
  ]
}

...

Code Block
{
  "type": "data",
  "name": "ShareDeltaValue",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
   { "name": "GroupId", "type": "string", "versions": "0",
     "about": "The group identifier." },
   { "name": "CheckpointEpoch", "type": "uint16", "versions": "0",
     "about": "The checkpoint epoch, increments with each checkpoint." },
   { "name": "States", "type": "[]State", "versions": "0", "fields": [
      { "name": "BaseOffset", "type": "int64", "versions": "0",
        "about": "The base offset of this state batch." },
      { "name": "LastOffset", "type": "int64", "versions": "0",
        "about": "The last offset of this state batch." },
      { "name": "State", "type": "int8", "versions": "0",
        "about": "The state - 0:Available,2:Acked,4:Archived." },
      { "name": "DeliveryCount", "type": "int16", "versions": "0",
        "about": "The delivery count." }
   ]}
  ]
}
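Share-partition state could be rebuilt by loading the latest checkpoint and replaying the ShareDelta records written after it. The Java sketch below models such a replay in memory, one map entry per offset, using the state codes from ShareDeltaValue. How deltas relate to checkpoints is an assumption here: a delta is applied only if its CheckpointEpoch matches the loaded checkpoint, and later batches overwrite earlier ones for the same offset. All names are hypothetical.

```java
import java.util.List;
import java.util.TreeMap;

// One entry of the States array in ShareDeltaValue.
record StateBatch(long baseOffset, long lastOffset, byte state, short deliveryCount) {}

final class ShareStateReplaySketch {
    // offset -> state of that single offset; deliberately simple, a real
    // implementation would keep ranges instead of one entry per offset.
    private final TreeMap<Long, StateBatch> byOffset = new TreeMap<>();
    private int checkpointEpoch = -1;

    /** Starts from a checkpoint: discards previous state and loads its batches. */
    void loadCheckpoint(int epoch, List<StateBatch> states) {
        byOffset.clear();
        checkpointEpoch = epoch;
        apply(states);
    }

    /**
     * Applies a delta on top of the current checkpoint. Assumption: a delta with a
     * different CheckpointEpoch belongs to a superseded checkpoint and is ignored.
     */
    boolean applyDelta(int epoch, List<StateBatch> states) {
        if (epoch != checkpointEpoch) return false;
        apply(states);
        return true;
    }

    StateBatch stateOf(long offset) {
        return byOffset.get(offset);
    }

    private void apply(List<StateBatch> states) {
        for (StateBatch b : states) {
            for (long o = b.baseOffset(); o <= b.lastOffset(); o++) {
                byOffset.put(o, new StateBatch(o, o, b.state(), b.deliveryCount()));
            }
        }
    }
}
```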

Index structure for locating share-partition state

More information needs to be added to describe how the index for locating the share-partition state is arranged.

Metrics

Broker Metrics

The following new broker metrics should be added:

Metric Name

Type

Group

Tags

Description

JMX Bean

group-count

Gauge

group-coordinator-metrics

protocol: share

The total number of share groups managed by the group coordinator.

kafka.server:type=group-coordinator-metrics,name=group-count,protocol=share 

rebalance (rebalance-rate and rebalance-count)

Meter

group-coordinator-metrics

protocol: share

The count and rate of share group rebalances.

kafka.server:type=group-coordinator-metrics,name=rebalance-rate,protocol=share 


kafka.server:type=group-coordinator-metrics,name=rebalance-count,protocol=share 

num-partitions

Gauge

group-coordinator-metrics

protocol: share

The number of share-partitions managed by the group coordinator.

kafka.server:type=group-coordinator-metrics,name=num-partitions,protocol=share 

group-count

Gauge

group-coordinator-metrics

protocol: share

state: {empty|stable|dead}

The number of share groups in each state.

kafka.server:type=group-coordinator-metrics,name=group-count,protocol=share,state={empty|stable|dead}

share-acknowledgement (share-acknowledgement-rate and share-acknowledgement-count)

Meter

group-coordinator-metrics

protocol: share

The total number of offsets acknowledged for share groups.

kafka.server:type=group-coordinator-metrics,name=share-acknowledgement-rate,protocol=share

kafka.server:type=group-coordinator-metrics,name=share-acknowledgement-count,protocol=share

record-acknowledgement (record-acknowledgement-rate and record-acknowledgement-count)

Meter

group-coordinator-metrics

protocol: share

ack-type:{accept,release,reject} 


The number of records acknowledged per acknowledgement type.

kafka.server:type=group-coordinator-metrics,name=record-acknowledgement-rate,protocol=share,ack-type={accept,release,reject} 


kafka.server:type=group-coordinator-metrics,name=record-acknowledgement-count,protocol=share,ack-type={accept,release,reject}

partition-load-time (partition-load-time-avg and partition-load-time-max)

Meter

group-coordinator-metrics

protocol: share 

The time taken to load the share-partitions.

kafka.server:type=group-coordinator-metrics,name=partition-load-time-avg,protocol=share 


kafka.server:type=group-coordinator-metrics,name=partition-load-time-max,protocol=share  
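The JMX bean names in the table follow a regular pattern, so a monitoring tool could build them programmatically. The Java sketch below uses the standard javax.management.ObjectName class; the helper class name is hypothetical, and the metric names are those listed in the table.

```java
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

final class ShareMetricNamesSketch {
    private static final String PREFIX = "kafka.server:type=group-coordinator-metrics";

    /** JMX name of a share-protocol coordinator metric, e.g. "group-count" or "rebalance-rate". */
    static ObjectName coordinatorMetric(String metricName) {
        return parse(PREFIX + ",name=" + metricName + ",protocol=share");
    }

    /** JMX name of the per-state group-count gauge; state is "empty", "stable" or "dead". */
    static ObjectName groupCountForState(String state) {
        return parse(PREFIX + ",name=group-count,protocol=share,state=" + state);
    }

    /** Property-list pattern matching every share-protocol coordinator metric. */
    static ObjectName allShareMetricsPattern() {
        return parse(PREFIX + ",protocol=share,*");
    }

    private static ObjectName parse(String name) {
        try {
            return new ObjectName(name);
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException("bad JMX name: " + name, e);
        }
    }
}
```

Inside a broker JVM (or over a remote JMX connection), passing allShareMetricsPattern() to MBeanServer.queryNames(pattern, null) would list the matching MBeans.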

 

Future Work

There are some obvious extensions to this idea which are not included in this KIP in order to keep the scope manageable.

...

Compatibility, Deprecation, and Migration Plan

The changes in this KIP add to the capabilities of Kafka rather than changing existing behavior.


Kafka Broker Migration

This KIP builds upon KIP-848 which introduced the new group coordinator and the new records for the __consumer_offsets  topic. The pre-KIP-848 group coordinator will not recognize the new records, so this downgrade is not supported.

Downgrading to a software version that supports the new group coordinator but does not support share groups is supported. This KIP adds a new version for the ConsumerGroupMetadataValue  record to include the group type. If the software version does not understand the v1 record type, it will assume the records apply to a consumer group of the same name. We should make sure this is a harmless situation.

More information needs to be added here based on the share-partition persistence mechanism. Details are still under consideration.

Test Plan

The feature will be thoroughly tested with unit, integration and system tests. We will also carry out performance testing, both to understand the performance of share groups and to understand the impact of this new feature on brokers.

Rejected Alternatives

Share group consumers use KafkaConsumer

...