...

The reconciliation process for a share group is very simple because there is no fencing. The group coordinator revokes the partitions which are no longer in the member's target assignment and assigns the new partitions to the member at the same time. The revocations do not need to be acknowledged before the new partitions are assigned. The member acknowledges changes to its assignment, but the group coordinator does not depend on receiving the acknowledgement to proceed.
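The reconciliation step above reduces to two set differences computed together. The following is a minimal sketch under stated assumptions: partitions are modelled as plain strings, and the class and method names are hypothetical rather than part of the group coordinator.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of share-group reconciliation. Partitions are modelled as
// strings such as "orders-0"; the class and method names are illustrative only.
final class ShareReconciliationSketch {

    // The outcome of one reconciliation step for one member.
    static final class Delta {
        final Set<String> revoked; // currently assigned, but no longer in the target
        final Set<String> added;   // in the target, but not yet assigned

        Delta(Set<String> revoked, Set<String> added) {
            this.revoked = revoked;
            this.added = added;
        }
    }

    // Computes both halves of the change at once. Nothing here waits for the member
    // to acknowledge the revocations, because share groups have no fencing.
    static Delta reconcile(Set<String> current, Set<String> target) {
        Set<String> revoked = new TreeSet<>(current);
        revoked.removeAll(target);
        Set<String> added = new TreeSet<>(target);
        added.removeAll(current);
        return new Delta(revoked, added);
    }

    // The member's new assignment: revocations and additions applied together.
    static Set<String> apply(Set<String> current, Delta delta) {
        Set<String> next = new TreeSet<>(current);
        next.removeAll(delta.revoked);
        next.addAll(delta.added);
        return next;
    }
}
```

Because the revoked and added sets are delivered in the same response, a member never passes through an intermediate "revoked but not yet reassigned" phase.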

Heartbeat and session

The member uses the ConsumerGroupHeartbeat API to establish a session with the group coordinator. The member is expected to send a heartbeat every group.share.heartbeat.interval.ms in order to keep its session open. If it does not send at least one heartbeat within group.share.session.timeout.ms, the group coordinator removes the member from the group. The member learns the heartbeat interval from the ConsumerGroupHeartbeat API response.

Because there is intentionally no fencing, a member that is removed from the group for failing to heartbeat does not lose the ability to fetch and acknowledge records at the protocol level. A failure to heartbeat most likely means that the consumer has died. If the consumer merely failed to heartbeat because of a temporary pause, it could in theory continue to fetch and acknowledge records. When it finally sends a heartbeat and realises it has been removed from the group, it should stop fetching records, because its assignment has been revoked, and rejoin the group.
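The session-timeout rule can be sketched on the coordinator side as a map from member to last heartbeat time. This is a minimal illustration, not the coordinator's implementation. The class `ShareSessionTracker` is hypothetical, time is passed in explicitly for determinism, and `sessionTimeoutMs` stands in for group.share.session.timeout.ms.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical coordinator-side sketch of share-group session expiry. Time is passed
// in as milliseconds so the logic is deterministic; sessionTimeoutMs plays the role
// of group.share.session.timeout.ms.
final class ShareSessionTracker {
    private final long sessionTimeoutMs;
    private final Map<String, Long> lastHeartbeatMs = new HashMap<>();

    ShareSessionTracker(long sessionTimeoutMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
    }

    // Records the time of a member's latest heartbeat.
    void onHeartbeat(String memberId, long nowMs) {
        lastHeartbeatMs.put(memberId, nowMs);
    }

    boolean isMember(String memberId) {
        return lastHeartbeatMs.containsKey(memberId);
    }

    // Removes every member whose last heartbeat is older than the session timeout and
    // returns their ids in sorted order.
    List<String> expire(long nowMs) {
        List<String> expired = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastHeartbeatMs.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMs - entry.getValue() > sessionTimeoutMs) {
                expired.add(entry.getKey());
                it.remove();
            }
        }
        Collections.sort(expired);
        return expired;
    }
}
```

Note that nothing in this sketch stops an expired member from fetching. That matches the text: removal only revokes the assignment, and the consumer discovers this on its next heartbeat.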

Static membership

Share groups do not support static membership. Because the membership of a share group is more ephemeral, there’s less advantage to maintaining an assignment when a member has temporarily left but will rejoin within the session timeout.

Share group states

Share groups do not have the ASSIGNING state because only server-side assignors are supported, and do not need the RECONCILING state because there’s no need for all members to converge before the group enters the STABLE state.

  • EMPTY - When a share group is created or the last member leaves the group, the share group is EMPTY.
  • STABLE - When a share group has active members, the share group is STABLE.
  • DEAD - When the share group remains EMPTY for a configured period, the group coordinator transitions it to DEAD to delete it.
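The three states and their transitions are small enough to model as an enum and a transition function. This is a sketch under assumptions. `emptyForMs` and `deadAfterMs` are hypothetical parameters standing in for "how long the group has been EMPTY" and "the configured period". There is deliberately no ASSIGNING or RECONCILING state.

```java
// Hypothetical model of the share-group states listed above. emptyForMs is how long
// the group has been EMPTY, and deadAfterMs stands in for the configured period
// after which an EMPTY group becomes DEAD; both names are illustrative.
enum ShareGroupState { EMPTY, STABLE, DEAD }

final class ShareGroupStates {
    static ShareGroupState next(ShareGroupState current, int memberCount,
                                long emptyForMs, long deadAfterMs) {
        switch (current) {
            case STABLE:
                // The group stays STABLE while it has active members.
                return memberCount > 0 ? ShareGroupState.STABLE : ShareGroupState.EMPTY;
            case EMPTY:
                if (memberCount > 0) {
                    return ShareGroupState.STABLE;
                }
                // An EMPTY group is transitioned to DEAD once the period has elapsed.
                return emptyForMs >= deadAfterMs ? ShareGroupState.DEAD : ShareGroupState.EMPTY;
            case DEAD:
                // Terminal: the coordinator is deleting the group.
                return ShareGroupState.DEAD;
            default:
                throw new IllegalStateException("unknown state: " + current);
        }
    }
}
```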

In-flight records

For each share-partition, the share group adds some state management for the records being consumed. The starting offset of the records which are eligible for consumption is known as the share-partition start offset (SPSO), and the last offset of the records which are eligible for consumption is known as the share-partition end offset (SPEO). The records from the SPSO up to and including the SPEO are known as the in-flight records. In essence, a share-partition manages the consumption of its in-flight records.
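The in-flight window is an inclusive offset range, which a tiny value class makes concrete. This is a hypothetical sketch: `InFlightWindow` is not a Kafka class, and it tracks only the two bounds, not per-record delivery state.

```java
// Hypothetical sketch of a share-partition's in-flight window. Both bounds are
// inclusive: the SPSO is the first eligible offset and the SPEO the last.
final class InFlightWindow {
    private final long spso;
    private final long speo;

    InFlightWindow(long spso, long speo) {
        if (speo < spso) {
            throw new IllegalArgumentException("SPEO " + speo + " is before SPSO " + spso);
        }
        this.spso = spso;
        this.speo = speo;
    }

    // True if the offset lies in [SPSO, SPEO].
    boolean isInFlight(long offset) {
        return offset >= spso && offset <= speo;
    }

    // Number of offsets in the window. This equals the number of in-flight records
    // only if every offset corresponds to a record (no gaps).
    long offsetCount() {
        return speo - spso + 1;
    }
}
```

For example, with SPSO 100 and SPEO 149, offsets 100 and 149 are both in flight and the window spans 50 offsets.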

...

  • ShareFetch for fetching records from share-partition leaders
  • ShareAcknowledge for acknowledging delivery with share-partition leaders

The ShareFetch API works very much like incremental fetch, using a concept called a share session. Each share session contains a set of topic-partitions which are managed in the share-partition leaders. The share-partition leader manages the fetching of records and the in-flight record state for its share-partitions. The consumer adds and removes topic-partitions from its share session using the ShareFetch API, in the same way that the Fetch API is used for incremental fetch. The difference is that with the Fetch API the consumer specifies the fetch offset, whereas with the ShareFetch API the consumer just fetches records and the share-partition leader decides which records to return.
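The share-session bookkeeping can be sketched as a set that each request updates incrementally. This is a minimal model under assumptions. `ShareSessionSketch` and `onShareFetch` are hypothetical names, and the order "additions first, then removals" is a choice made for this sketch. The real request fields are not described in this section.

```java
import java.util.Collections;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical model of a share session held by a share-partition leader. Each
// ShareFetch request may add and remove topic-partitions; partitions not mentioned
// stay in the session, as with incremental fetch. There is deliberately no fetch
// offset: which records to return is decided by the leader, not the consumer.
final class ShareSessionSketch {
    private final Set<String> partitions = new TreeSet<>();

    // Applies one request's changes (additions first, then removals) and returns
    // the partitions the leader would fetch for.
    Set<String> onShareFetch(Set<String> toAdd, Set<String> toRemove) {
        partitions.addAll(toAdd);
        partitions.removeAll(toRemove);
        return Collections.unmodifiableSet(new TreeSet<>(partitions));
    }
}
```

A request that adds and removes nothing still fetches for the whole remembered set, which is what makes the session incremental.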

To ensure that no share-partition is starved of fetched records, the share-partition leader rotates the order of the share-partitions for which it returns partition information. This ensures that it eventually returns data for every partition which has data available.
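One simple way to rotate the order is round-robin: each response starts one position later in the partition list. The sketch below assumes that policy. `PartitionRotation` is hypothetical, and the leader's actual rotation rule may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: the leader starts each response at a different position in its
// list of share-partitions, so partitions late in the list are not always cut off
// when a response fills up. Advancing by exactly one is an assumption of this sketch.
final class PartitionRotation {
    private final List<String> partitions;
    private int start = 0;

    PartitionRotation(List<String> partitions) {
        this.partitions = new ArrayList<>(partitions);
    }

    // Returns the order to use for the next response, then advances the start by one.
    List<String> nextOrder() {
        int n = partitions.size();
        List<String> order = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            order.add(partitions.get((start + i) % n));
        }
        if (n > 0) {
            start = (start + 1) % n;
        }
        return order;
    }
}
```

After as many responses as there are partitions, every partition has been first in line once.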

When a batch of records is first read from the log and added to the in-flight records for a share-partition, the broker does not know whether the set of records between the batch’s base offset and last offset contains any gaps, as might occur, for example, as a result of log compaction. When the broker does not know which offsets correspond to records, the batch is considered an unmaterialized record batch. Rather than forcing the broker to iterate through all of the records in all cases, which might require decompressing every batch, the broker can send unmaterialized record batches to consumers. It initially assumes that all offsets between the base offset and the last offset correspond to records. When the consumer processes the batch, it may find gaps, and it reports these using the ShareAcknowledge API. This means that unmaterialized record batches containing gaps might temporarily inflate the number of in-flight records, but the consumer acknowledgements will resolve this.
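The temporary inflation and its resolution can be sketched by counting a batch's offsets minus the gaps reported so far. This is hypothetical. `UnmaterializedBatch` and `reportGaps` are illustrative names, and how gaps are encoded in a ShareAcknowledge request is not shown here.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch: an unmaterialized batch is first tracked as if every offset in
// [baseOffset, lastOffset] were a record; when the consumer reports gaps, those
// offsets stop counting as in-flight records.
final class UnmaterializedBatch {
    private final long baseOffset;
    private final long lastOffset;
    private final Set<Long> gaps = new TreeSet<>();

    UnmaterializedBatch(long baseOffset, long lastOffset) {
        if (lastOffset < baseOffset) {
            throw new IllegalArgumentException("lastOffset is before baseOffset");
        }
        this.baseOffset = baseOffset;
        this.lastOffset = lastOffset;
    }

    // Until gaps are reported, every offset in the batch counts as a record.
    long inFlightCount() {
        return (lastOffset - baseOffset + 1) - gaps.size();
    }

    // Records offsets the consumer found to have no record, e.g. after compaction.
    // All offsets are validated before any are applied.
    void reportGaps(Set<Long> gapOffsets) {
        for (long offset : gapOffsets) {
            if (offset < baseOffset || offset > lastOffset) {
                throw new IllegalArgumentException("offset " + offset + " is outside the batch");
            }
        }
        gaps.addAll(gapOffsets);
    }
}
```

For a batch spanning offsets 100 to 109, the count starts at 10. It drops to 7 once the consumer reports that offsets 103, 104, and 107 were gaps.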

Client programming interface option 1 - new, preferred option

...

This KIP introduces a new interface for consuming records from a share group.

Method signature | Description
void acknowledge(ConsumerRecord record) | Acknowledge successful delivery of a record returned on the last poll(Duration). The acknowledgement is committed on the next commitSync() or commitAsync() call.
void acknowledge(ConsumerRecord record, AcknowledgementType type) | Acknowledge delivery of a record returned on the last poll(Duration), indicating whether it was processed successfully. The acknowledgement is committed on the next commitSync() or commitAsync() call.
Map<TopicPartition,Long> beginningOffsets(Collection<TopicPartition> partitions) | Get the first offset for the given partitions. For a share group, returns the share start offset.
Map<TopicPartition,Long> beginningOffsets(Collection<TopicPartition> partitions, Duration timeout) | Get the first offset for the given partitions. For a share group, returns the share start offset.
void close() | Close the consumer, waiting for up to the default timeout of 30 seconds for any needed cleanup.
void close(Duration timeout) | Tries to close the consumer cleanly within the specified timeout.
void commitAsync() | Commits the acknowledgements for the records returned.
void commitSync() | Commits the acknowledgements for the records returned.
void commitSync(Duration timeout) | Commits the acknowledgements for the records returned.
Map<TopicPartition,Long> endOffsets(Collection<TopicPartition> partitions) | Get the end offsets for the given partitions. For a share group, returns the share end offset.
Map<TopicPartition,Long> endOffsets(Collection<TopicPartition> partitions, Duration timeout) | Get the end offsets for the given partitions. For a share group, returns the share end offset.
Map<String, List<PartitionInfo>> listTopics() | Get metadata about partitions for all topics that the user is authorized to view.
Map<String, List<PartitionInfo>> listTopics(Duration timeout) | Get metadata about partitions for all topics that the user is authorized to view.
Map<MetricName, ? extends Metric> metrics() | Get the metrics kept by the consumer.
List<PartitionInfo> partitionsFor(String topic) | Get metadata about the partitions for a given topic.
List<PartitionInfo> partitionsFor(String topic, Duration timeout) | Get metadata about the partitions for a given topic.
ConsumerRecords<K,V> poll(Duration timeout) | Fetch data for the topics or partitions specified using the subscribe API.
void subscribe(Collection<String> topics) | Subscribe to the given list of topics to get dynamically assigned partitions.
Set<String> subscription() | Get the current subscription.
void unsubscribe() | Unsubscribe from topics currently subscribed with subscribe(Collection).
void wakeup() | Wake up the consumer.
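The acknowledgement lifecycle in the table above can be modelled in memory: acknowledgements for records from the last poll are buffered, and a commit sends them all together. This is a hypothetical sketch, not the new interface, whose name is not given in this section. `AckOutcome` is a stand-in and is not the KIP's AcknowledgementType. Rejecting offsets that the last poll did not return is also an assumption of the sketch.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical outcome type; it is NOT the KIP's AcknowledgementType, whose values
// are not listed in this section.
enum AckOutcome { PROCESSED, NOT_PROCESSED }

// Hypothetical in-memory model of the acknowledge/commit lifecycle.
final class AckLifecycleSketch {
    private final Set<Long> lastPolled = new HashSet<>();
    private final Map<Long, AckOutcome> pending = new LinkedHashMap<>();
    private final Map<Long, AckOutcome> committed = new LinkedHashMap<>();

    // Stands in for poll(Duration): remembers which offsets the last poll returned.
    void onPoll(Collection<Long> offsets) {
        lastPolled.clear();
        lastPolled.addAll(offsets);
    }

    // Like acknowledge(record): records successful processing.
    void acknowledge(long offset) {
        acknowledge(offset, AckOutcome.PROCESSED);
    }

    // Like acknowledge(record, type). The acknowledgement is only buffered here;
    // rejecting offsets outside the last poll is an assumption of this sketch.
    void acknowledge(long offset, AckOutcome outcome) {
        if (!lastPolled.contains(offset)) {
            throw new IllegalStateException("offset " + offset + " was not returned by the last poll");
        }
        pending.put(offset, outcome);
    }

    // Like commitSync(): all buffered acknowledgements are committed together.
    int commitSync() {
        int count = pending.size();
        committed.putAll(pending);
        pending.clear();
        return count;
    }

    int pendingCount() {
        return pending.size();
    }

    // Returns null if no acknowledgement for the offset has been committed yet.
    AckOutcome committedOutcome(long offset) {
        return committed.get(offset);
    }
}
```

The point the model makes is that acknowledge() on its own has no effect on the broker. Only the next commitSync() or commitAsync() delivers the buffered acknowledgements.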

KafkaConsumer - only applies to client programming interface option 2

...

Method signature | Description | Applies to share groups?
void acknowledge(ConsumerRecord record) Acknowledge successful delivery of a record returned on the last poll(Duration). The acknowledgement is committed on the next commitSync() or commitAsync() call.

Only applies to share groups, otherwise throws a new InvalidGroupTypeException

void acknowledge(ConsumerRecord record, AcknowledgementType type) Acknowledge delivery of a record returned on the last poll(Duration), indicating whether it was processed successfully. The acknowledgement is committed on the next commitSync() or commitAsync() call.

Only applies to share groups, otherwise throws a new InvalidGroupTypeException

void assign(Collection<TopicPartition> partitions) Manually assign a list of partitions to this consumer.

No, throws a new InvalidGroupTypeException 

Set<TopicPartition> assignment() Get the set of partitions currently assigned to this consumer.

No, throws a new InvalidGroupTypeException 

Map<TopicPartition,Long> beginningOffsets(Collection<TopicPartition> partitions) Get the first offset for the given partitions. For a share group, returns the share start offset.

Yes

Map<TopicPartition,Long> beginningOffsets(Collection<TopicPartition> partitions, Duration timeout) Get the first offset for the given partitions. For a share group, returns the share start offset.

Yes

void close() Close the consumer, waiting for up to the default timeout of 30 seconds for any needed cleanup.

Yes

void close(Duration timeout) Tries to close the consumer cleanly within the specified timeout.

Yes

void commitAsync() Commit offsets returned on the last poll(Duration) for all the subscribed list of topics and partitions. For a share group, commits the acknowledgements for the records returned.

Yes

void commitAsync(Map<TopicPartition,OffsetAndMetadata> offsets, OffsetCommitCallback callback) Commit the specified offset for the specified list of topics and partitions to Kafka.

No, throws a new InvalidGroupTypeException 

void commitSync() Commit offsets returned on the last poll(Duration) for all the subscribed list of topics and partitions. For a share group, commits the acknowledgements for the records returned.

Yes

void commitSync(Duration timeout) Commit offsets returned on the last poll(Duration) for all the subscribed list of topics and partitions. For a share group, commits the acknowledgements for the records returned.

Yes

void commitSync(Map<TopicPartition,OffsetAndMetadata> offsets) Commit the specified offsets for the specified list of topics and partitions.

No, throws a new InvalidGroupTypeException 

void commitSync(Map<TopicPartition,OffsetAndMetadata> offsets, Duration timeout) Commit the specified offsets for the specified list of topics and partitions.

No, throws a new InvalidGroupTypeException 

Map<TopicPartition,OffsetAndMetadata> committed(Set<TopicPartition> partitions) Get the last committed offsets for the given partitions (whether the commit happened by this process or another).

No, throws a new InvalidGroupTypeException 

Map<TopicPartition,OffsetAndMetadata> committed(Set<TopicPartition> partitions, Duration timeout) Get the last committed offsets for the given partitions (whether the commit happened by this process or another).

No, throws a new InvalidGroupTypeException 

OptionalLong currentLag(TopicPartition topicPartition) Get the consumer's current lag on the partition.

No, throws a new InvalidGroupTypeException 

Map<TopicPartition,Long> endOffsets(Collection<TopicPartition> partitions) Get the end offsets for the given partitions. For a share group, returns the share end offset.

Yes

Map<TopicPartition,Long> endOffsets(Collection<TopicPartition> partitions, Duration timeout) Get the end offsets for the given partitions. For a share group, returns the share end offset.

Yes

void enforceRebalance() Alert the consumer to trigger a new rebalance by rejoining the group.

No, throws a new InvalidGroupTypeException 

ConsumerGroupMetadata groupMetadata() Return the current group metadata associated with this consumer.

No, throws a new InvalidGroupTypeException 

Map<String, List<PartitionInfo>> listTopics() Get metadata about partitions for all topics that the user is authorized to view.

Yes

Map<String, List<PartitionInfo>> listTopics(Duration timeout) Get metadata about partitions for all topics that the user is authorized to view.

Yes

Map<MetricName, ? extends Metric> metrics() Get the metrics kept by the consumer.

Yes

Map<TopicPartition,OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition,Long> timestampsToSearch) Look up the offsets for the given partitions by timestamp.

No, throws a new InvalidGroupTypeException 

Map<TopicPartition,OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition,Long> timestampsToSearch, Duration timeout) Look up the offsets for the given partitions by timestamp.

No, throws a new InvalidGroupTypeException 

List<PartitionInfo> partitionsFor(String topic) Get metadata about the partitions for a given topic.

Yes

List<PartitionInfo> partitionsFor(String topic, Duration timeout) Get metadata about the partitions for a given topic.

Yes

void pause(Collection<TopicPartition> partitions) Suspend fetching from the requested partitions.

No, throws a new InvalidGroupTypeException 

Set<TopicPartition> paused() Get the set of partitions that were previously paused by call to pause(Collection).

No, throws a new InvalidGroupTypeException 

ConsumerRecords<K,V> poll(Duration timeout) Fetch data for the topics or partitions specified using one of the subscribe/assign APIs.

Yes

long position(TopicPartition partition) Get the offset of the next record that will be fetched (if a record with that offset exists).

No, throws a new InvalidGroupTypeException 

long position(TopicPartition partition, Duration timeout) Get the offset of the next record that will be fetched (if a record with that offset exists).

No, throws a new InvalidGroupTypeException 

void resume(Collection<TopicPartition> partitions) Resume specified partitions which have been paused with pause(Collection).

No, throws a new InvalidGroupTypeException 

void seek(TopicPartition partition, long offset) Overrides the fetch offsets that the consumer will use on the next poll(timeout).

No, throws a new InvalidGroupTypeException 

void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata) Overrides the fetch offsets that the consumer will use on the next poll(timeout).

No, throws a new InvalidGroupTypeException 

void seekToBeginning(Collection<TopicPartition> partitions) Seek to the first offset for each of the given partitions.

No, throws a new InvalidGroupTypeException 

void seekToEnd(Collection<TopicPartition> partitions) Seek to the last offset for each of the given partitions.

No, throws a new InvalidGroupTypeException 

void subscribe(Collection<String> topics) Subscribe to the given list of topics to get dynamically assigned partitions.

Yes

void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) Subscribe to the given list of topics to get dynamically assigned partitions.

No, throws a new InvalidGroupTypeException 

void subscribe(Pattern pattern) Subscribe to all topics matching specified pattern to get dynamically assigned partitions.

No, throws a new InvalidGroupTypeException 

void subscribe(Pattern pattern, ConsumerRebalanceListener listener) Subscribe to all topics matching specified pattern to get dynamically assigned partitions.

No, throws a new InvalidGroupTypeException 

Set<String> subscription() Get the current subscription.

Yes

void unsubscribe() Unsubscribe from topics currently subscribed with subscribe(Collection) or subscribe(Pattern).

Yes

void wakeup() Wake up the consumer.

Yes
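Under option 2, a single consumer class has to reject the methods that do not apply to the kind of group it has joined. The sketch below shows one way to express that guard. `GroupTypeGuard` and `GroupKind` are hypothetical. `InvalidGroupTypeException` here is a local stand-in for the new exception named in the table, whose real package and superclass are not given in this section.

```java
// Hypothetical stand-in for the new InvalidGroupTypeException; its real package and
// superclass are not given in this section.
class InvalidGroupTypeException extends RuntimeException {
    InvalidGroupTypeException(String message) {
        super(message);
    }
}

enum GroupKind { CONSUMER, SHARE }

// Hypothetical sketch of the guard a combined consumer could run at the top of each
// method, following the "Applies to share groups?" column.
final class GroupTypeGuard {
    private final GroupKind kind;

    GroupTypeGuard(GroupKind kind) {
        this.kind = kind;
    }

    // For methods such as assign(), seek() or pause(), which do not apply to share groups.
    void requireNotShareGroup(String method) {
        if (kind == GroupKind.SHARE) {
            throw new InvalidGroupTypeException(method + " is not supported for a share group");
        }
    }

    // For acknowledge(), which only applies to share groups.
    void requireShareGroup(String method) {
        if (kind != GroupKind.SHARE) {
            throw new InvalidGroupTypeException(method + " is only supported for a share group");
        }
    }
}
```

Methods marked "Yes" in the table, such as poll() or close(), would need no guard at all.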

...

The following dynamic group configuration properties are added. They are set on the group rather than in the consumer clients, because it would be problematic for consumers in the same share group to behave differently if each client could specify its own values.

Configuration | Description | Values
group.share.isolation.level 

Controls how to read records written transactionally. If set to "read_committed", the share group will only deliver transactional records which have been committed. If set to "read_uncommitted", the share group will deliver all records, including transactional records which have been aborted. Non-transactional records are delivered unconditionally in either mode.

Valid values: "read_committed" and "read_uncommitted" (default)

group.share.auto.offset.reset 

What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server:

  • "earliest" : automatically reset the offset to the earliest offset

  • "latest" : automatically reset the offset to the latest offset

Valid values: "latest" (default) and "earliest"
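The effect of the two group configurations can be sketched as two small decision functions. This is hypothetical. `ShareGroupConfigSketch` and `TxnStatus` are illustrative names. Treating an "existing" offset as one within [logStartOffset, logEndOffset] is an assumption of the sketch. Only the string values match the documented ones.

```java
import java.util.OptionalLong;

// How a record was written, as seen by the share-partition leader (hypothetical type).
enum TxnStatus { NON_TRANSACTIONAL, COMMITTED, ABORTED, OPEN }

// Hypothetical sketch of the two group-level configurations above. The string values
// match the documented ones; the class, methods and parameters are illustrative.
final class ShareGroupConfigSketch {

    // group.share.isolation.level: should a record with this status be delivered?
    static boolean deliverable(String isolationLevel, TxnStatus status) {
        switch (isolationLevel) {
            case "read_uncommitted":
                // Everything is delivered, including records from aborted transactions.
                return true;
            case "read_committed":
                // Only non-transactional records and records from committed transactions.
                return status == TxnStatus.NON_TRANSACTIONAL || status == TxnStatus.COMMITTED;
            default:
                throw new IllegalArgumentException("invalid isolation level: " + isolationLevel);
        }
    }

    // group.share.auto.offset.reset: keep the current offset if it still exists on the
    // server (taken here to mean it lies in [logStartOffset, logEndOffset]); otherwise
    // reset it according to the policy.
    static long startOffset(OptionalLong current, String autoOffsetReset,
                            long logStartOffset, long logEndOffset) {
        if (current.isPresent()
                && current.getAsLong() >= logStartOffset
                && current.getAsLong() <= logEndOffset) {
            return current.getAsLong();
        }
        switch (autoOffsetReset) {
            case "earliest":
                return logStartOffset;
            case "latest":
                return logEndOffset;
            default:
                throw new IllegalArgumentException("invalid reset policy: " + autoOffsetReset);
        }
    }
}
```

For example, with a log spanning offsets 10 to 50 and no initial offset, the default "latest" starts the share-partition at 50. Setting the value to "earliest" starts it at 10.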

Consumer configuration

The following new configuration properties are added for the consumer.

...