
...

Note that because the share groups are all consuming from the same log, the retention behavior for a topic applies to all of the share groups consuming from that topic.

Log compaction

When share groups are consuming from compacted topics, there is the possibility that in-flight records are cleaned while being consumed. In this case, the delivery flow for these records continues as normal because the disappearance of the cleaned records will only be discovered when they are next fetched from the log. This is analogous to a consumer group reading from a compacted topic - records which have been fetched by the consumer can continue to be processed, but if the consumer tried to fetch them again, it would discover they were no longer there.

When fetching records from a compacted topic, it is possible that the record batches fetched have offset gaps corresponding to records that the log cleaner removed. This simply results in gaps in the range of offsets of the in-flight records.

Reading transactional records

...

Two new control record types are introduced: SHARE_CHECKPOINT (5) and SHARE_DELTA (6). They are written into separate message sets with the Control flag set. This flag indicates that the records are not intended for application consumption. Indeed, these message sets are not returned to any consumers at all since they are just intended for the share-partition leader.

When a control record is written as a result of an operation such as a ShareAcknowledge RPC, the control record must be written and fully replicated before the RPC response is sent.

SHARE_CHECKPOINT

A SHARE_CHECKPOINT record contains a complete checkpoint of the share-partition state. It contains:

...

In practice, the ESO is used as the cut-off point for cleaning of these control records.


Administration

Several components work together to create share groups. The group coordinator is responsible for assignment, membership and the state of the group. The share-partition leaders are responsible for delivery and acknowledgement. The following table summarises the administration operations and how they work.

Operation | Supported by | Notes
Create share group | Group coordinator | This occurs as a side-effect of a ShareGroupHeartbeat. The group coordinator writes a record to the consumer offsets topic to persist the group's existence.
List share groups | Group coordinator |
List share group offsets | Group coordinator and share-partition leaders |
Describe share group | Group coordinator |
Alter share group offsets | Share-partition leaders | The share-partition leader makes a durable share-partition state update for each share-partition affected.
Delete share group offsets | Share-partition leaders | The share-partition leader makes a durable share-partition state update for each share-partition affected.
Delete share group | Group coordinator working with share-partition leaders | Only empty share groups can be deleted. The share-partition leaders first delete the share group offsets, and then the group coordinator deletes the share group, so it is not an atomic operation. The share-partition leader makes a durable share-partition state update for each share-partition affected. The group coordinator writes a tombstone record to the consumer offsets topic to persist the group deletion.

Public Interfaces

This KIP introduces extensive additions to the public interfaces.

Client API changes

KafkaShareConsumer

This KIP introduces a new interface for consuming records from a share group called org.apache.kafka.clients.consumer.ShareConsumer with an implementation called org.apache.kafka.clients.consumer.KafkaShareConsumer. The interface stability is Evolving.

Code Block
languagejava
@InterfaceStability.Evolving
public interface ShareConsumer<K, V> {

    /**
     * Get the current subscription. Will return the same topics used in the most recent call to
     * {@link #subscribe(Collection)}, or an empty set if no such call has been made.
     *
     * @return The set of topics currently subscribed to
     */
    Set<String> subscription();

    /**
     * Subscribe to the given list of topics to get dynamically assigned partitions.
     * <b>Topic subscriptions are not incremental. This list will replace the current
     * assignment, if there is one.</b> If the given list of topics is empty, it is treated the same as {@link #unsubscribe()}.
     *
     * <p>
     * As part of group management, the coordinator will keep track of the list of consumers that belong to a particular
     * group and will trigger a rebalance operation if any one of the following events are triggered:
     * <ul>
     * <li>A member joins or leaves the share group
     * <li>An existing member of the share group is shut down or fails
     * <li>The number of partitions changes for any of the subscribed topics
     * <li>A subscribed topic is created or deleted
     * </ul>
     *
     * @param topics The list of topics to subscribe to
     *
     * @throws IllegalArgumentException If topics is null or contains null or empty elements
     * @throws KafkaException for any other unrecoverable errors
     */
    void subscribe(Collection<String> topics);

    /**
     * Unsubscribe from topics currently subscribed with {@link #subscribe(Collection)}.
     *
     * @throws KafkaException for any other unrecoverable errors
     */
    void unsubscribe();

    /**
     * Fetch data for the topics specified using {@link #subscribe(Collection)}. It is an error to not have
     * subscribed to any topics before polling for data.
     *
     * <p>
     * This method returns immediately if there are records available. Otherwise, it will await the passed timeout.
     * If the timeout expires, an empty record set will be returned.
     *
     * @param timeout The maximum time to block (must not be greater than {@link Long#MAX_VALUE} milliseconds)
     *
     * @return map of topic to records since the last fetch for the subscribed list of topics
     *
     * @throws AuthenticationException if authentication fails. See the exception for more details
     * @throws AuthorizationException if caller lacks Read access to any of the subscribed
     *             topics or to the configured groupId. See the exception for more details
     * @throws InterruptException if the calling thread is interrupted before or while this method is called
     * @throws InvalidTopicException if the current subscription contains any invalid
     *             topic (per {@link org.apache.kafka.common.internals.Topic#validate(String)})
     * @throws WakeupException if {@link #wakeup()} is called before or while this method is called
     * @throws KafkaException for any other unrecoverable errors (e.g. invalid groupId or
     *             session timeout, errors deserializing key/value pairs,
     *             or any new error cases in future versions)
     * @throws IllegalArgumentException if the timeout value is negative
     * @throws IllegalStateException if the consumer is not subscribed to any topics
     * @throws ArithmeticException if the timeout is greater than {@link Long#MAX_VALUE} milliseconds.
     */
    ConsumerRecords<K, V> poll(Duration timeout);

    /**
     * Acknowledge successful delivery of a record returned on the last {@link #poll(Duration)} call.
     * The acknowledgement is committed on the next {@link #commitSync()}, {@link #commitAsync()} or
     * {@link #poll(Duration)} call.
     *
     * <p>
     * Records for each topic-partition must be acknowledged in the order they were returned from
     * {@link #poll(Duration)}. By using this method, the consumer is using
     * <b>explicit acknowledgement</b>.
     *
     * @param record The record to acknowledge
     *
     * @throws IllegalArgumentException if the record being acknowledged doesn't meet the ordering requirement
     * @throws IllegalStateException if the record is not waiting to be acknowledged, or the consumer has already
     *                               used implicit acknowledgement
     */
    void acknowledge(ConsumerRecord<K, V> record);

    /**
     * Acknowledge delivery of a record returned on the last {@link #poll(Duration)} call indicating whether
     * it was processed successfully. The acknowledgement is committed on the next {@link #commitSync()},
     * {@link #commitAsync()} or {@link #poll(Duration)} call. By using this method, the consumer is using
     * <b>explicit acknowledgement</b>.
     *
     * <p>
     * Records for each topic-partition must be acknowledged in the order they were returned from
     * {@link #poll(Duration)}.
     *
     * @param record The record to acknowledge
     * @param type The acknowledge type which indicates whether it was processed successfully
     *
     * @throws IllegalArgumentException if the record being acknowledged doesn't meet the ordering requirement
     * @throws IllegalStateException if the record is not waiting to be acknowledged, or the consumer has already
     *                               used implicit acknowledgement
     */
    void acknowledge(ConsumerRecord<K, V> record, AcknowledgeType type);

    /**
     * Commit the acknowledgements for the records returned. If the consumer is using explicit acknowledgement,
     * the acknowledgements to commit have been indicated using {@link #acknowledge(ConsumerRecord)} or
     * {@link #acknowledge(ConsumerRecord, AcknowledgeType)}. If the consumer is using implicit acknowledgement,
     * all the records returned by the latest call to {@link #poll(Duration)} are acknowledged.
     * <p>
     * This is a synchronous commit and will block until either the commit succeeds, an unrecoverable error is
     * encountered (in which case it is thrown to the caller), or the timeout expires.
     *
     * @return A map of the results for each topic-partition for which delivery was acknowledged.
     *         If the acknowledgement failed for a topic-partition, an exception is present.
     *
     * @throws InterruptException If the thread is interrupted while blocked.
     * @throws KafkaException for any other unrecoverable errors
     */
    Map<TopicIdPartition, Optional<KafkaException>> commitSync();

    /**
     * Commit the acknowledgements for the records returned. If the consumer is using explicit acknowledgement,
     * the acknowledgements to commit have been indicated using {@link #acknowledge(ConsumerRecord)} or
     * {@link #acknowledge(ConsumerRecord, AcknowledgeType)}. If the consumer is using implicit acknowledgement,
     * all the records returned by the latest call to {@link #poll(Duration)} are acknowledged.
     * <p>
     * This is a synchronous commit and will block until either the commit succeeds, an unrecoverable error is
     * encountered (in which case it is thrown to the caller), or the timeout expires.
     *
     * @param timeout The maximum amount of time to await completion of the acknowledgement
     *
     * @return A map of the results for each topic-partition for which delivery was acknowledged.
     *         If the acknowledgement failed for a topic-partition, an exception is present.
     *
     * @throws IllegalArgumentException If the {@code timeout} is negative.
     * @throws InterruptException If the thread is interrupted while blocked.
     * @throws KafkaException for any other unrecoverable errors
     */
    Map<TopicIdPartition, Optional<KafkaException>> commitSync(Duration timeout);

    /**
     * Commit the acknowledgements for the records returned. If the consumer is using explicit acknowledgement,
     * the acknowledgements to commit have been indicated using {@link #acknowledge(ConsumerRecord)} or
     * {@link #acknowledge(ConsumerRecord, AcknowledgeType)}. If the consumer is using implicit acknowledgement,
     * all the records returned by the latest call to {@link #poll(Duration)} are acknowledged.
     *
     * @throws KafkaException for any other unrecoverable errors
     */
    void commitAsync();

    /**
     * Sets the acknowledge commit callback which can be used to handle acknowledgement completion.
     *
     * @param callback The acknowledge commit callback
     */
    void setAcknowledgeCommitCallback(AcknowledgeCommitCallback callback);

    /**
     * Determines the client's unique client instance ID used for telemetry. This ID is unique to
     * this specific client instance and will not change after it is initially generated.
     * The ID is useful for correlating client operations with telemetry sent to the broker and
     * to its eventual monitoring destinations.
     * <p>
     * If telemetry is enabled, this will first require a connection to the cluster to generate
     * the unique client instance ID. This method waits up to {@code timeout} for the consumer
     * client to complete the request.
     * <p>
     * Client telemetry is controlled by the {@link ConsumerConfig#ENABLE_METRICS_PUSH_CONFIG}
     * configuration option.
     *
     * @param timeout The maximum time to wait for consumer client to determine its client instance ID.
     *                The value must be non-negative. Specifying a timeout of zero means do not
     *                wait for the initial request to complete if it hasn't already.
     *
     * @return The client's assigned instance id used for metrics collection.
     *
     * @throws IllegalArgumentException If the {@code timeout} is negative.
     * @throws IllegalStateException If telemetry is not enabled because config `{@code enable.metrics.push}`
     *                               is set to `{@code false}`.
     * @throws InterruptException If the thread is interrupted while blocked.
     * @throws KafkaException If an unexpected error occurs while trying to determine the client
     *                        instance ID, though this error does not necessarily imply the
     *                        consumer client is otherwise unusable.
     */
    Uuid clientInstanceId(Duration timeout);

    /**
     * Get the metrics kept by the consumer
     */
    Map<MetricName, ? extends Metric> metrics();

    /**
     * Close the consumer, waiting for up to the default timeout of 30 seconds for any needed cleanup.
     * This will commit acknowledgements if possible within the default timeout.
     * See {@link #close(Duration)} for details. Note that {@link #wakeup()} cannot be used to interrupt close.
     *
     * @throws InterruptException If the thread is interrupted before or while this method is called
     * @throws KafkaException for any other error during close
     */
    void close();

    /**
     * Tries to close the consumer cleanly within the specified timeout. This method waits up to
     * {@code timeout} for the consumer to complete acknowledgements and leave the group.
     * If the consumer is unable to complete acknowledgements and gracefully leave the group
     * before the timeout expires, the consumer is force closed. Note that {@link #wakeup()} cannot be
     * used to interrupt close.
     *
     * @param timeout The maximum time to wait for consumer to close gracefully. The value must be
     *                non-negative. Specifying a timeout of zero means do not wait for pending requests to complete.
     *
     * @throws IllegalArgumentException If the {@code timeout} is negative.
     * @throws InterruptException If the thread is interrupted before or while this method is called
     * @throws KafkaException for any other error during close
     */
    void close(Duration timeout);

    /**
     * Wake up the consumer. This method is thread-safe and is useful in particular to abort a long poll.
     * The thread which is blocking in an operation will throw {@link WakeupException}.
     * If no thread is blocking in a method which can throw {@link WakeupException},
     * the next call to such a method will raise it instead.
     */
    void wakeup();
}

The following constructors are provided for KafkaShareConsumer.

Method signature | Description
KafkaShareConsumer(Map<String, Object> configs) | Constructor
KafkaShareConsumer(Properties properties) | Constructor
KafkaShareConsumer(Map<String, Object> configs, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) | Constructor
KafkaShareConsumer(Properties properties, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) | Constructor
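
To illustrate how these pieces fit together, here is a minimal sketch of an application consuming from a share group using implicit acknowledgement. It assumes the share group is named using the familiar group.id configuration; the group, topic and server names are illustrative only.

Code Block
languagejava
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaShareConsumer;
import org.apache.kafka.clients.consumer.ShareConsumer;

public class ShareGroupExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-share-group"); // the share group's ID (assumed configuration key)
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        ShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props);
        try {
            consumer.subscribe(Collections.singleton("my-topic"));
            while (true) {
                // Implicit acknowledgement: every record returned by poll() is
                // acknowledged by the next commitSync(), commitAsync() or poll() call.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.key() + " => " + record.value());
                }
                consumer.commitSync();
            }
        } finally {
            consumer.close();
        }
    }
}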

AcknowledgeCommitCallback

The new org.apache.kafka.clients.consumer.AcknowledgeCommitCallback can be implemented by the user to execute when acknowledgement completes. It is called on the application thread and is not permitted to call the methods of KafkaShareConsumer with the exception of KafkaShareConsumer.wakeup().

Method signature | Description
void onComplete(Map<TopicIdPartition, Set<OffsetAndMetadata>> offsets, Exception exception) | A callback method the user can implement to provide asynchronous handling of request completion.

Parameters:

offsets - A map of the offsets that this callback applies to.

exception - The exception thrown during processing of the request, or null if the acknowledgement completed successfully.

Exceptions:

WakeupException - if KafkaShareConsumer.wakeup() is called.

InterruptException - if the calling thread is interrupted.

AuthorizationException - if not authorized to the topic or group.

KafkaException - for any other unrecoverable errors.
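
For example, here is a minimal sketch of a callback that simply logs failed topic-partitions. Since the interface declares a single method, it is assumed here that it can be implemented as a lambda.

Code Block
languagejava
// A minimal sketch: log any topic-partitions whose acknowledgements failed.
consumer.setAcknowledgeCommitCallback((offsets, exception) -> {
    if (exception != null) {
        System.err.println("Acknowledgement failed for " + offsets.keySet()
            + ": " + exception.getMessage());
    }
});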

ConsumerRecord

Add the following method on the org.apache.kafka.clients.consumer.ConsumerRecord class.

Method signature | Description
Optional<Short> deliveryCount() | Get the delivery count for the record, if available. The delivery count is available for records delivered using a share group, and Optional.empty() is returned otherwise.

A new constructor is also added:

Code Block
languagejava
    /**
     * Creates a record to be received from a specified topic and partition
     *
     * @param topic The topic this record is received from
     * @param partition The partition of the topic this record is received from
     * @param offset The offset of this record in the corresponding Kafka partition
     * @param timestamp The timestamp of the record.
     * @param timestampType The timestamp type
     * @param serializedKeySize The length of the serialized key
     * @param serializedValueSize The length of the serialized value
     * @param key The key of the record, if one exists (null is allowed)
     * @param value The record contents
     * @param headers The headers of the record
     * @param leaderEpoch Optional leader epoch of the record (may be empty for legacy record formats)
     * @param deliveryCount Optional delivery count of the record (may be empty when deliveries not counted)
     */
    public ConsumerRecord(String topic,
                          int partition,
                          long offset,
                          long timestamp,
                          TimestampType timestampType,
                          int serializedKeySize,
                          int serializedValueSize,
                          K key,
                          V value,
                          Headers headers,
                          Optional<Integer> leaderEpoch,
                          Optional<Short> deliveryCount)
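
For example, an application could inspect the delivery count to treat records that have already been redelivered several times differently. This is a sketch; the threshold, the process method and the use of the AcknowledgeType constants (named here after the acknowledge types defined in this KIP) are illustrative.

Code Block
languagejava
// A sketch using deliveryCount() to spot redelivered records. The threshold
// of 3 deliveries and process() are illustrative, not part of this KIP.
for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
    short deliveries = record.deliveryCount().orElse((short) 1);
    if (deliveries > 3) {
        // Repeatedly redelivered - reject rather than process again
        consumer.acknowledge(record, AcknowledgeType.REJECT);
    } else {
        process(record); // application-defined processing
        consumer.acknowledge(record, AcknowledgeType.ACCEPT);
    }
}
consumer.commitSync();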


AcknowledgeType

The new org.apache.kafka.clients.consumer.AcknowledgeType enum distinguishes between the types of acknowledgement for a record consumed using a share group.
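
Based on the 0:Accept, 1:Release, 2:Reject encoding used in the RPC schemas below, the enum constants are referred to here as ACCEPT, RELEASE and REJECT. The following sketch shows explicit acknowledgement choosing the type based on the outcome of processing; TransientException and process() are hypothetical application-level names.

Code Block
languagejava
// A sketch of explicit acknowledgement. RELEASE makes the record available
// for another delivery attempt; REJECT archives it as unprocessable.
for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
    try {
        process(record); // application-defined processing
        consumer.acknowledge(record, AcknowledgeType.ACCEPT);
    } catch (TransientException e) { // hypothetical application exception type
        // Temporary failure - let another delivery attempt try again
        consumer.acknowledge(record, AcknowledgeType.RELEASE);
    } catch (Exception e) {
        // Permanent failure - this record cannot be processed
        consumer.acknowledge(record, AcknowledgeType.REJECT);
    }
}
consumer.commitSync();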

...

Code Block
languagejava
package org.apache.kafka.clients.admin;

import org.apache.kafka.common.GroupType;

/**
 * Options for {@link Admin#listGroups(ListGroupsOptions)}.
 *
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ListGroupsOptions extends AbstractOptions<ListGroupsOptions> {
    /**
     * If types is set, only groups of these types will be returned. Otherwise, all groups are returned.
     */
    public ListGroupsOptions types(Set<GroupType> types);

    /**
     * Return the list of types that are requested or empty if no types have been specified.
     */
    public Set<GroupType> types();
}
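
As an illustration, an administrative client could use this to list only share groups. GroupType.SHARE and the shape of the result are assumptions here, by analogy with the existing group listing APIs.

Code Block
languagejava
// A sketch listing only the share groups in the cluster. GroupType.SHARE and
// the result handling are assumptions by analogy with ListConsumerGroupsResult.
try (Admin admin = Admin.create(props)) {
    ListGroupsOptions options = new ListGroupsOptions().types(Set.of(GroupType.SHARE));
    admin.listGroups(options).all().get()
        .forEach(listing -> System.out.println(listing));
}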

...

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "ShareFetchRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "entityType": "groupId",
      "about": "null if not provided or if it didn't change since the last fetch; the group identifier otherwise otherwise." },
    { "name": "MemberId", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The member ID." },
    { "name": "MemberIdShareSessionEpoch", "type": "stringint32", "versions": "0+",
      "nullableVersionsabout": "0+",
      "about": "The member IDThe current share session epoch: 0 to open a share session; -1 to close it; otherwise increments for consecutive requests." },
      { "name": "MaxWaitMs", "type": "int32", "versions": "0+",
      "about": "The maximum time in milliseconds to wait for the response." },
    { "name": "MinBytes", "type": "int32", "versions": "0+",
      "about": "The minimum bytes to accumulate in the response." },
    { "name": "MaxBytes", "type": "int32", "versions": "0+", "default": "0x7fffffff", "ignorable": true,
      "about": "The maximum bytes to fetch.  See KIP-74 for cases where this limit may not be honored." },
    { "name": "Topics", "type": "[]FetchTopic", "versions": "0+",
      "about": "The topics to fetch.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID."},
      { "name": "Partitions", "type": "[]FetchPartition", "versions": "0+",
        "about": "The partitions to fetch.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "CurrentLeaderEpoch", "type": "int32", "versions": "0+", "default": "-1", "ignorable": true,
          "about": "The current leader epoch of the partition." },
        { "name": "PartitionMaxBytes", "type": "int32", "versions": "0+",
          "about": "The maximum bytes to fetch from this partition.  See KIP-74 for cases where this limit may not be honored." },
        { "name": "AcknowledgementBatches", "type": "[]AcknowledgementBatch", "versions": "0+",
          "about": "Record batches to acknowledge.", "fields": [
          { "name": "StartOffset", "type": "int64", "versions": "0+",
            "about": "Start offset of batch of records to acknowledge."},
          { "name": "LastOffset", "type": "int64", "versions": "0+",
            "about": "Last offset (inclusive) of batch of records to acknowledge."},
          { "name": "GapOffsets", "type": "[]int64", "versions": "0+",
            "about": "Array of offsets in this range which do not correspond to records."},
          { "name": "AcknowledgeType", "type": "int8", "versions": "0+", "default": "0",
            "about": "The type of acknowledgement - 0:Accept,1:Release,2:Reject."}
        ]}
    ]},
    { "name": "ForgottenTopicsData", "type": "[]ForgottenTopic", "versions": "0+", "ignorable": false,
      "about": "The partitions to remove from this share session.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID."},
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "The partitions indexes to forget." }
    ]}
  ]
}
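
To make the share-session flow concrete, here is a sketch of how a follow-on request in an existing session might be populated, using the message classes which Kafka's protocol generator would derive from this schema. The class and setter names follow the generator's conventions and are illustrative.

Code Block
languagejava
// A sketch of the second request in a share session (epoch 1), acknowledging
// one batch of records while fetching more. Class and setter names follow the
// conventions of Kafka's generated protocol messages and are illustrative.
ShareFetchRequestData request = new ShareFetchRequestData()
    .setGroupId(null)             // unchanged since the last fetch
    .setMemberId(memberId)
    .setShareSessionEpoch(1)      // 0 opens a session; -1 closes it
    .setMaxWaitMs(500)
    .setMinBytes(1)
    .setMaxBytes(1024 * 1024);

ShareFetchRequestData.AcknowledgementBatch batch = new ShareFetchRequestData.AcknowledgementBatch()
    .setStartOffset(100)
    .setLastOffset(109)
    .setGapOffsets(List.of(104L))  // offset 104 held no record, e.g. compacted away
    .setAcknowledgeType((byte) 0); // 0:Accept

request.setTopics(List.of(new ShareFetchRequestData.FetchTopic()
    .setTopicId(topicId)
    .setPartitions(List.of(new ShareFetchRequestData.FetchPartition()
        .setPartitionIndex(0)
        .setPartitionMaxBytes(1024 * 1024)
        .setAcknowledgementBatches(List.of(batch))))));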

...

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "ShareAcknowledgeRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "MemberId", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The member ID." },
    { "name": "ShareSessionEpoch", "type": "int32", "versions": "0+",
      "about": "The current share session epoch: 0 to open a share session; -1 to close it; otherwise increments for consecutive requests." },
    { "name": "Topics", "type": "[]AcknowledgeTopic", "versions": "0+",
      "about": "The topics containing records to acknowledge.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The unique topic ID."},
      { "name": "Partitions", "type": "[]AcknowledgePartition", "versions": "0+",
        "about": "The partitions containing records to acknowledge.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "AcknowledgementBatches", "type": "[]AcknowledgementBatch", "versions": "0+",
          "about": "Record batches to acknowledge.", "fields": [
          { "name": "StartOffset", "type": "int64", "versions": "0+",
            "about": "Start offset of batch of records to acknowledge."},
          { "name": "LastOffset", "type": "int64", "versions": "0+",
            "about": "Last offset (inclusive) of batch of records to acknowledge."},
          { "name": "GapOffsets", "type": "[]int64", "versions": "0+",
            "about": "Array of offsets in this range which do not correspond to records."},
          { "name": "AcknowledgeType", "type": "int8", "versions": "0+", "default": "0",
            "about": "The type of acknowledgement - 0:Accept,1:Release,2:Reject."}
        ]}
      ]}
    ]}
  ]
}

...

Code Block
{
  "type": "data",
  "name": "ShareDeltaValue",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
   { "name": "GroupId", "type": "string", "versions": "0",
     "about": "The group identifier." },
   { "name": "CheckpointEpoch", "type": "uint16", "versions": "0",
     "about": "The checkpoint epoch, increments with each checkpoint." },
   { "name": "States", "type": "[]State", "versions": "0", "fields": [
      { "name": "BaseOffset", "type": "int64", "versions": "0",
        "about": "The base offset of this state batch." },
      { "name": "LastOffset", "type": "int64", "versions": "0",
        "about": "The last offset of this state batch." },
      { "name": "State", "type": "int8", "versions": "0",
        "about": "The state - 0:Available,2:Acked,4:Archived." },
      { "name": "DeliveryCount", "type": "int16", "versions": "0",
        "about": "The delivery count." }
   ]}
  ]
}
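
As an illustration, a SHARE_DELTA written after a consumer accepted offsets 100 to 104 and released offset 105 might carry a value like the following, shown as JSON for readability (the actual record value uses the serialized form of the schema above). Per the encoding above, state 2 means Acked and state 0 means Available, so the released record becomes available for redelivery with its delivery count intact.

Code Block
{
  "GroupId": "my-share-group",
  "CheckpointEpoch": 3,
  "States": [
    { "BaseOffset": 100, "LastOffset": 104, "State": 2, "DeliveryCount": 1 },
    { "BaseOffset": 105, "LastOffset": 105, "State": 0, "DeliveryCount": 1 }
  ]
}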

Index structure for locating share-partition state

More information needs to be added to describe how the index for locating the share-partition state is arranged.

Metrics

Broker Metrics

The following new broker metrics should be added:

...

The feature will be thoroughly tested with unit, integration and system tests. We will also carry out performance testing, both to understand the performance of share groups and to understand the impact of this new feature on brokers.

Rejected Alternatives

Share group consumers use KafkaConsumer

...