
...

When a batch of records is first read from the log and added to the in-flight records for a share-partition, the broker does not know whether the set of records between the batch’s base offset and the last offset contains any gaps, as might occur for example as a result of log compaction. When the broker does not know which offsets correspond to records, the batch is considered an unmaterialized record batch. Rather than forcing the broker to iterate through all of the records in all cases, which might require decompressing every batch, the broker can send unmaterialized record batches to consumers. It initially assumes that all offsets between the base offset and the last offset correspond to records. When the consumer processes the batch, it may find gaps and it reports these using the ShareAcknowledge API. This means that the presence of unmaterialized record batches containing gaps might temporarily inflate the number of in-flight records, but this will be resolved by the consumer acknowledgements.

Client programming interface

...

A new interface KafkaShareConsumer is introduced for consuming from share groups. It looks very similar to KafkaConsumer, trimmed down to the methods that apply to share groups. Introducing a new KafkaShareConsumer interface has two advantages compared with overloading KafkaConsumer to work with share groups:

...

To join a share group, the client application instantiates a KafkaShareConsumer using the configuration parameter group.id to give the ID of the share group. Then, it uses KafkaShareConsumer.subscribe(Collection<String> topics) to provide the list of topics that it wishes to consume from. The consumer is not allowed to assign partitions itself.
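To illustrate, joining a share group might look like the following sketch. It assumes that KafkaShareConsumer offers the same constructor shape as KafkaConsumer; the group name "myshare" and topic "foo" are reused from the examples later in this document.

Code Block
languagejava
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "myshare");                                               // The ID of the share group to join

// Assumption: KafkaShareConsumer mirrors the KafkaConsumer constructors
KafkaShareConsumer<String, String> consumer =
    new KafkaShareConsumer<>(props, new StringDeserializer(), new StringDeserializer());
consumer.subscribe(Arrays.asList("foo"));                                               // Topics to consume from; no manual partition assignment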

...

  1. The batch contains no records, in which case the application just polls again. The call to KafkaShareConsumer.commitAsync() does nothing because the batch was empty.

  2. All of the records in the batch are processed successfully. The calls to KafkaShareConsumer.acknowledge(ConsumerRecord, AcknowledgeType.ACCEPT) mark all records in the batch as successfully processed.

  3. One of the records encounters an exception. The call to KafkaShareConsumer.acknowledge(ConsumerRecord, AcknowledgeType.REJECT) rejects that record. Earlier records in the batch have already been marked as successfully processed. The call to KafkaShareConsumer.commitAsync() commits the acknowledgements, but the records after the failed record remain Acquired as part of the same delivery attempt and will be presented to the application in response to another poll. A sketch of this pattern appears below.
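Continuing the sketch above, this pattern uses only the KafkaShareConsumer methods listed under Public Interfaces later in this document:

Code Block
languagejava
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));    // Returns a batch of acquired records
    for (ConsumerRecord<String, String> record : records) {
        try {
            doProcessing(record);
            consumer.acknowledge(record, AcknowledgeType.ACCEPT);                       // Mark the record as processed successfully
        } catch (Exception e) {
            consumer.acknowledge(record, AcknowledgeType.REJECT);                       // Mark this record as unprocessable
            break;                                                                      // Stop processing the remainder of the batch
        }
    }
    consumer.commitAsync();                                                             // Commit the acknowledgements of the acknowledged records only
}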

Client programming interface option 2 - previous option included temporarily for comparison purposes

The KafkaConsumer interface is enhanced to support share groups.

To join a share group, the client application instantiates a KafkaConsumer using the configuration parameters group.type="share"  and group.id to give the ID of the share group. Then, it uses KafkaConsumer.subscribe(Collection<String> topics) to provide the list of topics that it wishes to consume from. The consumer is not allowed to assign partitions itself.

Each call to KafkaConsumer.poll(Duration) fetches data from any of the topic-partitions for the topics to which it subscribed. It returns a set of in-flight records acquired for this consumer for the duration of the acquisition lock timeout. For efficiency, the consumer preferentially returns complete record sets with no gaps. The application then processes the records and acknowledges their delivery, using either explicit or implicit acknowledgement.

If the application calls the new KafkaConsumer.acknowledge(ConsumerRecord, AcknowledgeType) method for any record in the batch, it is using explicit acknowledgement. The calls to KafkaConsumer.acknowledge(ConsumerRecord, AcknowledgeType) must be issued in the order in which the records appear in the ConsumerRecords object, which will be in order of increasing offset for each share-partition. In this case:

  • The application calls KafkaConsumer.commitSync/Async() which commits the acknowledgements to Kafka. If any records in the batch were not acknowledged, they remain acquired and will be presented to the application in response to a future poll.

  • The application calls KafkaConsumer.poll(Duration) without committing first, which commits the acknowledgements to Kafka asynchronously. In this case, no exception is thrown by a failure to commit the acknowledgement. If any records in the batch were not acknowledged, they remain acquired and will be presented to the application in response to a future poll.

  • The application calls KafkaConsumer.close() which attempts to commit any pending acknowledgements and releases any remaining acquired records.

If the application does not call KafkaConsumer.acknowledge(ConsumerRecord, AcknowledgeType) for any record in the batch, it is using implicit acknowledgement. In this case:

  • The application calls KafkaConsumer.commitSync/Async() which implicitly acknowledges all of the delivered records as processed successfully and commits the acknowledgements to Kafka.

  • The application calls KafkaConsumer.poll(Duration) without committing, which also implicitly acknowledges all of the delivered records and commits the acknowledgements to Kafka asynchronously. In this case, no exception is thrown by a failure to commit the acknowledgements.

  • The application calls KafkaConsumer.close() which releases any acquired records without acknowledgement.

The KafkaConsumer guarantees that the records returned in the ConsumerRecords object for a specific share-partition are in order of increasing offset. For each share-partition, the share-partition leader guarantees that acknowledgements for the records in a batch are performed atomically.

Example - Acknowledging a batch of records (implicit acknowledgement)

In this example, a consumer using share group "myshare" subscribes to topic "foo". It processes all of the records in the batch and then calls KafkaConsumer.commitSync() which implicitly marks all of the records in the batch as successfully consumed and commits the acknowledgement synchronously with Kafka. Asynchronous commit would also be acceptable. Note that apart from the configuration group.type="share", this code would work with a consumer group. This is the only example for which that is true.

Code Block
languagejava
Properties props = new Properties();

props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("enable.auto.commit", "false");
props.setProperty("group.type", "share");
props.setProperty("group.id", "myshare");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());
consumer.subscribe(Arrays.asList("foo"));
while (true) { 
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));    // Returns a batch of acquired records
    for (ConsumerRecord<String, String> record : records) {
        doProcessing(record);
    }
    consumer.commitSync();                                                              // Commit the acknowledgement of all the records in the batch
}

Behind the scenes, the KafkaConsumer fetches records from the share-partition leader. The leader selects the records in Available state and returns complete record batches (https://kafka.apache.org/documentation/#recordbatch) if possible. It moves the records into Acquired state, increments the delivery count, starts the acquisition lock timeout, and returns them to the KafkaConsumer. The KafkaConsumer then keeps a map of the state of the records it has fetched and returns a batch to the application.

When the application calls KafkaConsumer.commitSync(), the KafkaConsumer updates the state map by marking all of the records in the batch as Acknowledged and it then commits the acknowledgements by sending the new state information to the share-partition leader. For each share-partition, the share-partition leader updates the record states atomically.

Example - Per-record acknowledgement (explicit acknowledgement)

In this example, the application uses the result of processing the records to acknowledge or reject the records in the batch.

Code Block
languagejava
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("enable.auto.commit", "false");
props.setProperty("group.type", "share");
props.setProperty("group.id", "myshare");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());
consumer.subscribe(Arrays.asList("foo"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));    // Returns a batch of acquired records
    for (ConsumerRecord<String, String> record : records) {
        try {
            doProcessing(record);
            consumer.acknowledge(record, AcknowledgeType.ACCEPT);                       // Mark the record as processed successfully
        } catch (Exception e) {
            consumer.acknowledge(record, AcknowledgeType.REJECT);                       // Mark the record as unprocessable
        }
    }
    consumer.commitAsync();                                                             // Commit the acknowledgements of all the records in the batch
}

In this example, each record processed is separately acknowledged using a call to the new KafkaConsumer.acknowledge(ConsumerRecord, AcknowledgeType) method. The AcknowledgeType argument indicates whether the record was processed successfully or not. In this case, the bad records are rejected, meaning that they're not eligible for further delivery attempts. For a permanent error such as a deserialization error, this is appropriate. For a transient error which might not affect a subsequent processing attempt, AcknowledgeType.RELEASE is more appropriate because the record remains eligible for further delivery attempts.

The calls to KafkaConsumer.acknowledge(ConsumerRecord, AcknowledgeType) are simply updating the state map in the KafkaConsumer. It is only once KafkaConsumer.commitAsync() is called that the acknowledgements are committed by sending the new state information to the share-partition leader.
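To illustrate the distinction, the loop body of the example above could release records on transient failures and reject them on permanent ones. TransientException is a hypothetical application exception type, not part of the proposed API:

Code Block
languagejava
for (ConsumerRecord<String, String> record : records) {
    try {
        doProcessing(record);
        consumer.acknowledge(record, AcknowledgeType.ACCEPT);                           // Mark the record as processed successfully
    } catch (TransientException e) {                                                    // Hypothetical application exception for recoverable failures
        consumer.acknowledge(record, AcknowledgeType.RELEASE);                          // Remains eligible for another delivery attempt
    } catch (Exception e) {
        consumer.acknowledge(record, AcknowledgeType.REJECT);                           // Not eligible for further delivery attempts
    }
}
consumer.commitAsync();                                                                 // Commit the acknowledgements of all the records in the batch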

Example - Per-record acknowledgement, ending processing of the batch on an error (explicit acknowledgement)

In this example, the application stops processing the batch when it encounters an exception.

Code Block
languagejava
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("enable.auto.commit", "false");
props.setProperty("group.type", "share");
props.setProperty("group.id", "myshare");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());
consumer.subscribe(Arrays.asList("foo"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));    // Returns a batch of acquired records
    for (ConsumerRecord<String, String> record : records) {
        try {
            doProcessing(record);
            consumer.acknowledge(record, AcknowledgeType.ACCEPT);                       // Mark the record as processed successfully
        } catch (Exception e) {
            consumer.acknowledge(record, AcknowledgeType.REJECT);                       // Mark this record as unprocessable
            break;
        }
    }
    consumer.commitAsync();                                                             // Commit the acknowledgements of the acknowledged records only
}

This example has the following cases:

  1. The batch contains no records, in which case the application just polls again. The call to KafkaConsumer.commitAsync() does nothing because the batch was empty.

  2. All of the records in the batch are processed successfully. The calls to KafkaConsumer.acknowledge(ConsumerRecord, AcknowledgeType.ACCEPT) mark all records in the batch as successfully processed.

  3. One of the records encounters an exception. The call to KafkaConsumer.acknowledge(ConsumerRecord, AcknowledgeType.REJECT) rejects that record. Earlier records in the batch have already been marked as successfully processed. The call to KafkaConsumer.commitAsync() commits the acknowledgements, but the records after the failed record remain Acquired as part of the same delivery attempt and will be presented to the application in response to another poll.

Access control

Share group access control is performed on the GROUP resource type, just the same as consumer groups, with the same rules for the actions checked. A share group is just a new kind of group.

  • Operations which read information about a share group need permission to perform the DESCRIBE action on the named group resource

  • Operations which change information about a share group (such as consuming a record) need permission to perform the READ action on the named group resource

Managing durable share-partition state

The share-partition leader is responsible for recording the durable state for the share-partitions it leads. For each share-partition, we need to be able to recover:

  • The Share-Partition Start Offset (SPSO)

  • The state of the in-flight records

  • The delivery counts of records whose delivery failed

The delivery counts are only maintained approximately and the Acquired state is not persisted. This minimises the amount of share-partition state that has to be logged. The expectation is that most records will be fetched and acknowledged in batches with only one delivery attempt.
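To make this concrete, the recoverable state of a share-partition can be pictured as in the following sketch. The class and field names are purely illustrative and not part of this KIP:

Code Block
languagejava
// Illustrative sketch only: one possible shape for the recoverable state of a share-partition
enum RecordState { AVAILABLE, ACQUIRED, ACKNOWLEDGED }     // The Acquired state is not persisted

class InFlightRecord {
    RecordState state;
    int deliveryCount;                                     // Maintained approximately, for records whose delivery failed
}

class SharePartitionState {
    long startOffset;                                      // Share-Partition Start Offset (SPSO)
    long endOffset;                                        // Share-Partition End Offset (SPEO)
    SortedMap<Long, InFlightRecord> inFlight;              // In-flight records keyed by offset
}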

Examples

Each row shows the operation performed, the state changes it causes, and the cumulative state of the share-partition afterwards.

Operation: Starting state of topic-partition with latest offset 100
State changes: SPSO=100, SPEO=100
Cumulative state: SPSO=100, SPEO=100

In the batched case with successful processing, there's a state change per batch to move the SPSO forwards.

Operation: Fetch records 100-109
State changes: SPEO=110, records 100-109 (acquired, delivery count 1)
Cumulative state: SPSO=100, SPEO=110, records 100-109 (acquired, delivery count 1)

Operation: Acknowledge 100-109
State changes: SPSO=110
Cumulative state: SPSO=110, SPEO=110

With a messier sequence of release and acknowledge, there's a state change for each operation, which can act on multiple records.

Operation: Fetch records 110-119
State changes: SPEO=120, records 110-119 (acquired, delivery count 1)
Cumulative state: SPSO=110, SPEO=120, records 110-119 (acquired, delivery count 1)

Operation: Release 110
State changes: record 110 (available, delivery count 1)
Cumulative state: SPSO=110, SPEO=120, record 110 (available, delivery count 1), records 111-119 (acquired, delivery count 1)

Operation: Acknowledge 119
State changes: record 110 (available, delivery count 1), records 111-118 acquired, record 119 acknowledged
Cumulative state: SPSO=110, SPEO=120, record 110 (available, delivery count 1), records 111-118 (acquired, delivery count 1), record 119 acknowledged

Operation: Fetch records 110, 120
State changes: SPEO=121, record 110 (acquired, delivery count 2), record 120 (acquired, delivery count 1)
Cumulative state: SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-118 (acquired, delivery count 1), record 119 acknowledged, record 120 (acquired, delivery count 1)

Operation: Lock timeout elapsed 111, 112
State changes: records 111-112 (available, delivery count 1)
Cumulative state: SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-112 (available, delivery count 1), records 113-118 (acquired, delivery count 1), record 119 acknowledged, record 120 (acquired, delivery count 1)

Operation: Acknowledge 113-118
State changes: records 113-118 acknowledged
Cumulative state: SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-112 (available, delivery count 1), records 113-119 acknowledged, record 120 (acquired, delivery count 1)

Operation: Fetch records 111, 112
State changes: records 111-112 (acquired, delivery count 2)
Cumulative state: SPSO=110, SPEO=121, records 110-112 (acquired, delivery count 2), records 113-119 acknowledged, record 120 (acquired, delivery count 1)

Operation: Acknowledge 110
State changes: SPSO=111
Cumulative state: SPSO=111, SPEO=121, records 111-112 (acquired, delivery count 2), records 113-119 acknowledged, record 120 (acquired, delivery count 1)

Operation: Acknowledge 111, 112
State changes: SPSO=120
Cumulative state: SPSO=120, SPEO=121, record 120 (acquired, delivery count 1)

Further details to follow as the design progresses.

Public Interfaces

This KIP introduces extensive additions to the public interfaces.

Client API changes

KafkaShareConsumer - only applies to Client Programming interface option 1

This KIP introduces a new interface for consuming records from a share group.

Method signature

Description

void acknowledge(ConsumerRecord record)

Acknowledge successful delivery of a record returned on the last poll(Duration). The acknowledgement is committed on the next commitSync() or commitAsync() call.

void acknowledge(ConsumerRecord record, AcknowledgeType type)

Acknowledge delivery of a record returned on the last poll(Duration), indicating whether it was processed successfully. The acknowledgement is committed on the next commitSync() or commitAsync() call.

void close()

Close the consumer, waiting for up to the default timeout of 30 seconds for any needed cleanup.

void close(Duration timeout)

Tries to close the consumer cleanly within the specified timeout.

void commitAsync()

Commits the acknowledgements for the records returned.

void commitSync()

Commits the acknowledgements for the records returned.

void commitSync(Duration timeout)

Commits the acknowledgements for the records returned.

Map<MetricName, ? extends Metric> metrics()

Get the metrics kept by the consumer.

List<PartitionInfo> partitionsFor(String topic)

Get metadata about the partitions for a given topic.

List<PartitionInfo> partitionsFor(String topic, Duration timeout)

Get metadata about the partitions for a given topic.

ConsumerRecords<K,V> poll(Duration timeout)

Fetch data for the topics or partitions specified using the subscribe API.

void subscribe(Collection<String> topics)

Subscribe to the given list of topics to get dynamically assigned partitions.

Set<String> subscription()

Get the current subscription.

void unsubscribe()

Unsubscribe from topics currently subscribed with subscribe(Collection).

void wakeup()

Wakeup the consumer.

AcknowledgeType

The new org.apache.kafka.clients.consumer.AcknowledgeType enum distinguishes between the types of acknowledgement for a record consumed using a share group.

Enum constant

Description

ACCEPT (0)

The record was consumed successfully.

RELEASE (1)

The record was not consumed successfully. Release it for another delivery attempt.

REJECT (2)

The record was not consumed successfully. Reject it and do not release it for another delivery attempt.
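Based on this table, the enum might look like the following sketch. The numeric codes are taken from the table; their exact Java representation is an assumption:

Code Block
languagejava
package org.apache.kafka.clients.consumer;

public enum AcknowledgeType {
    ACCEPT((byte) 0),      // The record was consumed successfully
    RELEASE((byte) 1),     // Not consumed successfully; release it for another delivery attempt
    REJECT((byte) 2);      // Not consumed successfully; do not release it for another delivery attempt

    public final byte id;

    AcknowledgeType(byte id) {
        this.id = id;
    }
}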

AdminClient

Add the following methods on the AdminClient  interface.

Method signature

Description

AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId, Map<TopicPartition,OffsetAndMetadata> offsets, AlterShareGroupOffsetsOptions options)

Alter offset information for a share group.

DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId, Set<TopicPartition> partitions, DeleteShareGroupOffsetsOptions options)

Delete offset information for a set of partitions in a share group.

DeleteShareGroupResult deleteShareGroups(Collection<String> groupIds, DeleteShareGroupOptions options)

Delete share groups from the cluster.

DescribeShareGroupsResult describeShareGroups(Collection<String> groupIds, DescribeShareGroupsOptions options)

Describe some share groups in the cluster.

ListShareGroupOffsetsResult listShareGroupOffsets(String groupId, ListShareGroupOffsetsOptions options)

List the share group offsets available in the cluster.

ListShareGroupsResult listShareGroups(ListShareGroupsOptions options)

List the share groups available in the cluster.

The equivalence between the consumer group and share group interfaces is clear. There are some differences:

  • Altering the offsets for a share group resets the Share Start Offset for topic-partitions in the share group (share-partitions)
  • The members of a share group are not assigned partitions
  • A share group has only three states - EMPTY, STABLE and DEAD
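As an illustration, listing the share groups in a cluster might look like the following sketch. It assumes the new result classes follow the pattern of the existing consumer group Admin APIs; in particular, an all() method returning a KafkaFuture is an assumption, and exception handling is elided:

Code Block
languagejava
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");

try (Admin admin = Admin.create(props)) {
    ListShareGroupsResult result = admin.listShareGroups(new ListShareGroupsOptions());
    result.all().get().forEach(System.out::println);       // Assumption: all() returns a KafkaFuture of the share group listings
}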


KafkaConsumer - only applies to Client Programming interface option 2

This KIP introduces two new methods on KafkaConsumer  which apply only to share groups:

  • void acknowledge(ConsumerRecord record) 
  • void acknowledge(ConsumerRecord record, AcknowledgeType type)

Many of the existing KafkaConsumer methods do not apply to share groups and will result in an exception.

...

(The per-method table is elided here: the new acknowledge methods apply only to share groups and otherwise throw a new InvalidGroupTypeException, and each existing method either continues to apply or throws a new InvalidGroupTypeException when used with a share group.)


Command-line tools

A new tool is added for working with share groups called kafka-share-groups.sh . It has the following options:

Option

Description

--all-topics

Consider all topics assigned to a group in the `reset-offsets` process.

--bootstrap-server <String: server to connect to>

REQUIRED: The server(s) to connect to.

--command-config <String: command config property file>

Property file containing configs to be passed to Admin Client.

--delete

Pass in groups to delete topic partition offsets over the entire share group. For instance --group g1 --group g2

--delete-offsets

Delete offsets of share group. Supports one share group at a time, and multiple topics.

--describe

Describe share group and list offset lag (number of records not yet processed) related to given group.

--dry-run

Only show results without executing changes on share groups. Supported operations: reset-offsets.

--execute

Execute operation. Supported operations: reset-offsets.

--group <String: share group>

The share group we wish to act on.

--list

List all share groups.

--members

Describe members of the group. This option may be used with the '--describe' option only.

--offsets

Describe the group and list all topic partitions in the group along with their offset lag. This is the default sub-action and may be used with the '--describe' option only.

--reset-offsets

Reset offsets of share group. Supports one share group at a time, and instances must be inactive.

--to-datetime <String: datetime>

Reset offsets to offset from datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss'.

--to-earliest

Reset offsets to earliest offset.

--to-latest

Reset offsets to latest offset.

--topic <String: topic>

The topic whose share group information should be deleted or topic which should be included in the reset offset process.

--version

Display Kafka version.

...

Configuration

Description

group.type

Type of the group: "consumer" or "share". Only applies for Client Programming Interface option 2. Default "consumer".

record.lock.duration.ms

Record acquisition lock duration in milliseconds. The default is null, which uses the cluster configuration share.record.lock.duration.ms; minimum 1000; maximum limited by the cluster configuration share.record.lock.duration.max.ms.
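For example, a consumer could override the acquisition lock duration like this, subject to the cluster-defined minimum and maximum above. The group.type property is only needed with client programming interface option 2:

Code Block
languagejava
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.type", "share");                  // Only applies for client programming interface option 2
props.setProperty("group.id", "myshare");
props.setProperty("record.lock.duration.ms", "30000");     // 30-second acquisition lock, within the cluster's permitted range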

...

The ShareAcknowledge API is used by share group consumers to acknowledge delivery of records with share-partition leaders.

Request schema

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["zkBroker", "broker"],
  "name": "ShareAcknowledgeRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "SessionId", "type": "int32", "versions": "0+",
      "about": "The share session ID." },
    { "name": "SessionEpoch", "type": "int32", "versions": "0+",
      "about": "The share session epoch, which is used for ordering requests in a session." },
    { "name": "Topics", "type": "[]AcknowledgeTopic", "versions": "0+",
      "about": "The topics containing records to acknowledge.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The unique topic ID." },
      { "name": "Partitions", "type": "[]AcknowledgePartition", "versions": "0+",
        "about": "The partitions containing records to acknowledge.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "AcknowledgementBatches", "type": "[]AcknowledgementBatch", "versions": "0+",
          "about": "Record batches to acknowledge.", "fields": [
          { "name": "StartOffset", "type": "int64", "versions": "0+",
            "about": "Start offset of batch of records to acknowledge." },
          { "name": "LastOffset", "type": "int64", "versions": "0+",
            "about": "Last offset (inclusive) of batch of records to acknowledge." },
          { "name": "GapOffsets", "type": "[]int64", "versions": "0+",
            "about": "Array of offsets in this range which do not correspond to records." },
          { "name": "AcknowledgeType", "type": "string", "versions": "0+", "default": "accept",
            "about": "The type of acknowledgement, such as accept or release." }
        ]}
      ]}
    ]}
  ]
}

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "ShareAcknowledgeResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+", "ignorable": true,
      "about": "The top level response error code." },
    { "name": "SessionId", "type": "int32", "versions": "0+", "default": "0", "ignorable": false,
      "about": "The share session ID." },
    { "name": "Responses", "type": "[]ShareAcknowledgeTopicResponse", "versions": "0+",
      "about": "The response topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true,
        "about": "The unique topic ID." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about": "The topic partitions.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." }
      ]}
    ]}
  ]
}


Metrics

Further details to follow as the design progresses.

Future Work

There are some obvious extensions to this idea which are not included in this KIP in order to keep the scope manageable.

This KIP introduces delivery counts and a maximum number of delivery attempts. An obvious future extension is the ability to copy records that failed to be delivered onto a dead-letter queue. This would of course give a way to handle poison messages without them permanently blocking processing.

A “browsing” consumer which does not modify the share group state or take acquisition locks could be supported; it would need a lesser permission (DESCRIBE) on the group than a regular consumer (READ). This is a little more complicated because it needs to have a position independent of the SPSO so that it can traverse along the queue.

The focus in this KIP is on sharing rather than ordering. The concept can be extended to give key-based ordering so that partial ordering and fine-grained sharing can be achieved at the same time.

Finally, this KIP does not include support for acknowledging delivery using transactions for exactly-once semantics. Conceptually, this is quite straightforward but would take changes to the API.

Compatibility, Deprecation, and Migration Plan

The changes in this KIP add to the capabilities of Kafka rather than changing existing behavior.

Test Plan

Detail to follow.

Rejected Alternatives

Share group consumers use KafkaConsumer

In this option, the regular KafkaConsumer  was used by consumers to consume records from a share group, using a configuration parameter group.type  to choose between using a share group or a consumer group. While this means that existing Kafka consumers can trivially make use of share groups, there are some obvious downsides:

  1. An application using KafkaConsumer with a consumer group could be switched to a share group with very different semantics with just a configuration change. There is almost no chance that the application would work correctly.
  2. Libraries such as Kafka Connect which embed Kafka consumers would not work correctly with share groups without code changes beyond changing the configuration. As a result, there is a risk of breaking connectors due to misconfiguration of the group.type  configuration property.
  3. More than half of the KafkaConsumer  methods do not make sense for share groups, introducing a lot of unnecessary cruft.

As a result, the KIP now proposes an entirely different class KafkaShareConsumer  which gives a very similar interface to KafkaConsumer  but eliminates the downsides listed above.