
Status

Current state: Under Discussion

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Apache Kafka has achieved great success as a highly scalable event-streaming platform. The way that consumer groups assign partitions to members of the group gives a powerful combination of ordering and scalability, but it does introduce coupling between the number of consumers in a consumer group and the number of partitions. Users of Kafka often have to “over-partition” simply to ensure they can have sufficient parallel consumption to cope with peak loads.

There are plenty of situations in which consumers could cooperatively consume from a stream of events without needing to be assigned exclusive access to specific topic-partitions. This, together with per-message acknowledgement and delivery counts, enables a class of use-cases traditionally built around the concept of a queue. For example, a queue is perfect for a situation in which messages are independent work items that can be processed concurrently by a pool of applications, and individually retried or acknowledged as processing completes. This is much easier to achieve using a queue rather than a partitioned topic with a consumer group.

This KIP introduces the concept of a share group as a way of enabling cooperative consumption using Kafka topics. It does not add the concept of a “queue” to Kafka per se, but rather introduces cooperative consumption to accommodate these queuing use-cases using regular Kafka topics. You can think of a share group as roughly equivalent to a “durable shared subscription” in existing systems.

This is indeed Queues for Kafka - queues done in a Kafka way, with no maximum queue depth and the ability to reset to a specific time for point-in-time recovery.

Proposed Changes

Share groups allow Kafka consumers to work together, cooperatively consuming and processing the records from topics. They are an alternative to consumer groups for situations in which finer-grained sharing is required.

The fundamental differences between a share group and a consumer group are:

  • The consumers in a share group cooperatively consume records without partition assignment

  • The number of consumers in a share group can exceed the number of partitions

  • Records are acknowledged on an individual basis, although the system is optimized to work in batches for improved efficiency

  • Delivery attempts to consumers in a share group are counted to enable automated handling of unprocessable records

Share groups are a new kind of group, alongside the existing consumer groups. A consumer specifies that it wants to use a share group using consumer configuration options group.type="share" and group.id.

All consumers in the same share group subscribed to the same topic cooperatively consume the records of that topic. If a topic is accessed by consumers in more than one share group, each share group cooperatively consumes from that topic independently of the other share groups.

Each consumer can dynamically set the list of topics it wants to subscribe to. In practice, all of the consumers in a share group will usually subscribe to the same topic or topics.

When a consumer in a share-group fetches records, it receives available records from any of the topic-partitions that match its subscriptions. Records are acquired for delivery to this consumer with a time-limited acquisition lock. While a record is acquired, it is not available for another consumer. By default, the lock duration is 30s, but it can also be controlled using the consumer configuration record.lock.duration.ms described later. The lock is automatically released once the lock duration has elapsed, and then the record is available to be given to another consumer. The consumer which holds the lock can deal with the record in the following ways (a code sketch follows the list):

  • The consumer can acknowledge successful processing of the record

  • The consumer can release the record, which makes the record available for another delivery attempt

  • The consumer can reject the record, which indicates that the record is unprocessable and does not make the record available for another delivery attempt

  • The consumer can do nothing, in which case the lock is automatically released when the lock duration has elapsed
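In terms of the client programming interface described later in this KIP, the first three outcomes map directly onto the new KafkaConsumer.acknowledge method and its AcknowledgeType argument. A minimal sketch, assuming a consumer and a batch of fetched records as in the later examples, and a hypothetical doProcessing method and TransientException type:

for (ConsumerRecord<String, String> record : records) {
    try {
        doProcessing(record);                                    // hypothetical application logic
        consumer.acknowledge(record, AcknowledgeType.ACCEPT);    // successful processing
    } catch (TransientException e) {                             // hypothetical exception worth retrying
        consumer.acknowledge(record, AcknowledgeType.RELEASE);   // make available for another delivery attempt
    } catch (Exception e) {
        consumer.acknowledge(record, AcknowledgeType.REJECT);    // unprocessable; no further delivery attempts
    }
    // Doing nothing is the fourth option: the acquisition lock elapses and the record
    // automatically becomes available to another consumer.
}
consumer.commitAsync();                                          // the acknowledgements take effect when committed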

The cluster limits the number of records acquired for consumers for each topic-partition in a share group. Once the limit is reached, fetching records will temporarily yield no further records until the number of acquired records reduces, as naturally happens when the locks time out. This limit is controlled by the broker share.record.lock.partition.limit configuration parameter. By limiting the duration of the acquisition lock and automatically releasing the locks, the broker ensures delivery progresses even in the presence of consumer failures.

Concepts

There are some concepts being introduced to Kafka to support share groups.

A share-group coordinator is the broker which is the group coordinator for a share group. The responsibility for being share-group coordinator for the cluster’s share groups is distributed among the brokers, exactly as for consumer groups. The share-group coordinator has the following responsibilities:

  • It maintains the list of share-group members.

  • It manages the topic-partition assignments for the share-group members. An initial, trivial implementation would be to give each member the list of all topic-partitions which match its subscriptions and then use the pull-based protocol to fetch records from all partitions. A more sophisticated implementation could use topic-partition load and lag metrics to distribute partitions among the consumers as a kind of autonomous, self-balancing partition assignment, steering more consumers to busier partitions, for example. Alternatively, a push-based fetching scheme could be used. Protocol details will follow later.

A share-partition is a topic-partition with a subscription in a share-group. For a topic-partition subscribed in more than one share group, each share group has its own share-partition.

A share-partition leader is a component of the broker which manages the share-group’s view of a topic-partition. It is co-located with the topic-partition leader, and the leadership of a share-partition follows the leadership of the topic-partition. The share-partition leader has the following responsibilities:

  • It fetches records from the local replica using the replica manager

  • It manages and persists the states of the in-flight records

In-flight records

For each share-partition, the share group adds some state management for the records being consumed. The starting offset of records which are eligible for consumption is known as the share start offset (SSO), and the last offset of records which are eligible for consumption is known as the share end offset (SEO). The records from the SSO up to the SEO are known as the in-flight records. So, a share-partition is essentially managing the consumption of the in-flight records.

The SEO is not necessarily at the end of the topic-partition; it advances as records beyond it are fetched. The segment of the topic-partition between the SSO and the SEO is a sliding window that moves as records are consumed. The share-partition leader limits the distance between the SSO and the SEO. The upper bound is controlled by the broker configuration share.record.lock.partition.limit. Unlike existing queuing systems, there’s no “maximum queue depth”, but there is a limit to the number of in-flight records at any point in time.

The records in a share-partition are in one of four states:

State          Description

Available      The record is available for a consumer

Acquired       The record has been acquired for a specific consumer, with a time-limited acquisition lock

Acknowledged   The record has been processed and acknowledged by a consumer

Archived       The record is not available for a consumer

All records before the SSO are in Archived state. All records after the SEO are in Available state, but not yet being delivered to consumers.

The records also have a delivery count in order to prevent unprocessable records being endlessly delivered to consumers. If a record is repeatedly causing exceptions during its processing, it is likely that it is a “poison message”, perhaps with a formatting or semantic error. Every time that a record is acquired by a consumer in a share group, its delivery count increments by 1. The first time a record is acquired, its delivery count is 1.

The state transitions look like this:

+--------------+
|  Available   |<------------------+
+--------------+                   |
       |                           |
       | acquired                  | - if (delivery count < share.delivery.count.limit)
       | for consumer              |     - released by consumer
       | (delivery count++)        |     - acquisition lock elapsed
       V                           |
+--------------+                   |
|   Acquired   |-------------------+
+--------------+                   |
       |                           |
       | accepted                  | - if (delivery count == share.delivery.count.limit)
       | by consumer               |     - released by consumer
       |                           |     - acquisition lock elapsed
       V                           | OR
+--------------+                   | - rejected by consumer as unprocessable
| Acknowledged |                   |
+--------------+                   |
       |                           |
       | SSO moves                 |
       | past record               |
       |                           |
       V                           |
+--------------+                   |
|   Archived   |<------------------+
+--------------+

When records are fetched for a consumer, the share-partition leader starts at the SSO and finds Available records. For each record it finds, it moves the record into Acquired state, increments its delivery count and adds it to a batch of acquired records to return to the consumer. The consumer then processes the records and acknowledges their consumption. The delivery attempt completes successfully and the records move into Acknowledged state.

Alternatively, if the consumer cannot process a record or its acquisition lock elapses, the delivery attempt completes unsuccessfully and the record’s next state depends on the delivery count. If the delivery count has reached the cluster’s delivery count limit, share.delivery.count.limit (5 by default), the record moves into Archived state and is not eligible for additional delivery attempts. If the delivery count has not reached the limit, the record moves back into Available state and can be delivered again.
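The choice between redelivery and archiving can be sketched in a few lines of Java. This is purely illustrative; the type and method names are not part of the broker implementation:

enum RecordState { AVAILABLE, ACQUIRED, ACKNOWLEDGED, ARCHIVED }

// Illustrative sketch of the rule described above, applied when a delivery attempt
// completes unsuccessfully (released by the consumer, lock elapsed, or rejected).
RecordState onUnsuccessfulDelivery(int deliveryCount, int deliveryCountLimit, boolean rejected) {
    if (rejected || deliveryCount >= deliveryCountLimit) {
        return RecordState.ARCHIVED;     // not eligible for additional delivery attempts
    }
    return RecordState.AVAILABLE;        // eligible for another delivery attempt
}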

Ordering

Share groups focus primarily on sharing, allowing consumers to be scaled independently of partitions. The records in a share-partition can be delivered out of order to a consumer, in particular when redeliveries occur.

For example, imagine two consumers in a share group consuming from a single-partition topic. The first consumer fetches records 100 to 109 inclusive and then crashes. At the same time, the second consumer fetches, processes and acknowledges records 110 to 119. When the second consumer fetches again, it gets records 100 to 109 with their delivery counts set to 2 because they are being redelivered. That’s exactly what you want, but the offsets do not necessarily increase monotonically in the same way as they do for a consumer group.

The records returned in a batch for a particular share-partition are guaranteed to be in order of increasing offset. There are no guarantees about the ordering of offsets between different batches.

Managing the SSO and SEO

The consumer group concepts of seeking and position do not apply to share groups. The SSO for each share-partition can be initialized for an empty share group, and the SSO and SEO then move forwards naturally as records are fetched and consumed.

When a topic subscription is added to a share group for the first time, the SSO is initialized for each share-partition. By default, the SSO for each share-partition is initialized to the latest offset for the corresponding topic-partitions.

Alternatively, there is an administrative action available using either AdminClient.alterShareGroupOffsets or the kafka-share-groups.sh tool to reset the SSO for an empty share group with no active members. This can be used to reset a share group to the start of a topic, a particular timestamp or the end of a topic, or to initialize consumption of a newly subscribed topic from its start. Resetting the SSO discards all of the in-flight record state and delivery counts.
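In code, the AdminClient route might look like the following sketch. It assumes AlterShareGroupOffsetsResult exposes an all() future in the style of the existing consumer-group methods, which this KIP has not yet specified:

// Sketch: reset the SSO for partition 0 of topic T1 in the empty share group S1 to offset 0.
// props holds the usual admin client configuration such as bootstrap.servers.
try (Admin admin = Admin.create(props)) {
    Map<TopicPartition, OffsetAndMetadata> offsets =
        Collections.singletonMap(new TopicPartition("T1", 0), new OffsetAndMetadata(0L));
    admin.alterShareGroupOffsets("S1", offsets, new AlterShareGroupOffsetsOptions())
         .all().get();    // assumed to complete once the reset has been applied
}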

For example, to start using a share group S1 to consume for the first time from the earliest offset of a topic T1, you could use:

$ kafka-share-groups.sh --bootstrap-server localhost:9092 --group S1 --topic T1 --reset-offsets --to-earliest --execute

If the number of partitions is increased for a topic with a subscription in a share group, the SSO for the newly created share-partitions is initialized to 0 (which is both the earliest and latest offset for an empty topic-partition). As a result, the behavior when the number of partitions is increased is well defined.

In-flight records example

An example of a share-partition showing the states looks like this:

+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|   0   |   1   |   2   |   3   |   4   |   5   |   6   |   7   |   8   |   9   |  ...  | <- offset
| Archv | Archv | Acqrd | Avail | Acqrd | Acked | Archv | Avail | Avail | Avail | Avail | <- state
|       |       |   1   |   2   |   1   |       |       |       |       |       |       | <- delivery count
+-------+-------+---^---+-------+-------+-------+-------+-------+-------+---^---+-------+
                    |                                                       |
                    +-- Share start offset (SSO)                            +-- Share end offset (SEO) 

The share group is currently managing the consumption of the in-flight records, which have offsets 2 to 8 inclusive.

  • All records earlier than offset 2 are in Archived state and are not in-flight

  • Records 2 and 4 have been acquired for consumption by a consumer, and their delivery counts have been incremented to 1

  • Record 3 has previously been acquired twice for consumption by a consumer, but went back into Available state

  • Record 5 has been acknowledged

  • Record 6 has previously been acquired for consumption by a consumer, was rejected because it cannot be processed, and is in Archived state

  • Records 7 and 8 are available for consumption by a consumer

  • All records starting with offset 9 and later are in Available state

The cluster records this information durably. In this example, the durable state contains the SSO position, the non-zero delivery count for offset 3, the Acknowledged state of offset 5, and the Archived state of offset 6.

Batching

Cooperative consumption is inherently record-based, but the expectation is that batching is used to maximise performance. For example:

  • When a consumer fetches records, the share-partition leader prefers to return complete record batches.

  • In the usual and optimal case, all of the records in a batch will be in Available state and can all be moved to Acquired state with the same acquisition lock time-out.

  • When the consumer has processed the fetched records, it can acknowledge delivery of all of the records as a single batch, transitioning them all into Acknowledged state.

So, when a set of consumers is cooperatively consuming from a topic using a share group, the natural unit of sharing is the record batch. The processing loop is roughly:

  • Fetch record batch

  • Process records

  • Acknowledge all records in batch

In the situation where some records in a batch have been released or rejected separately, subsequent fetches of those records are more likely to have gaps.

Client programming interface

The KafkaConsumer interface is enhanced to support share groups.

To join a share group, the client application instantiates a KafkaConsumer using the configuration parameters group.type="share"  and group.id to give the ID of the share group. Then, it uses KafkaConsumer.subscribe(Collection<String> topics) to provide the list of topics that it wishes to consume from. The consumer is not allowed to assign partitions itself.

Each call to KafkaConsumer.poll(Duration) fetches data from any of the topic-partitions for the topics to which it subscribed. It returns a set of in-flight records acquired for this consumer for the duration of the acquisition lock timeout. For efficiency, the consumer preferentially returns complete record sets with no gaps. The application then processes the records and acknowledges their delivery, either using explicit or implicit acknowledgement.

If the application calls the new KafkaConsumer.acknowledge(ConsumerRecord, AcknowledgeType) method for any record in the batch, it is using explicit acknowledgement. The calls to KafkaConsumer.acknowledge(ConsumerRecord, AcknowledgeType) must be issued in the order in which the records appear in the ConsumerRecords object, which will be in order of increasing offset for each share-partition. In this case:

  • The application calls KafkaConsumer.commitSync/Async() which commits the acknowledgements to Kafka. If any records in the batch were not acknowledged, they remain acquired and will be presented to the application in response to a future poll.

  • The application calls KafkaConsumer.poll(Duration) without committing first, which commits the acknowledgements to Kafka asynchronously. In this case, no exception is thrown by a failure to commit the acknowledgement. If any records in the batch were not acknowledged, they remain acquired and will be presented to the application in response to a future poll.

  • The application calls KafkaConsumer.close() which attempts to commit any pending acknowledgements and releases any remaining acquired records.

If the application does not call KafkaConsumer.acknowledge(ConsumerRecord, AcknowledgeType) for any record in the batch, it is using implicit acknowledgement. In this case:

  • The application calls KafkaConsumer.commitSync/Async() which implicitly acknowledges all of the delivered records as processed successfully and commits the acknowledgements to Kafka.

  • The application calls KafkaConsumer.poll(Duration) without committing, which also implicitly acknowledges all of the delivered records and commits the acknowledgements to Kafka asynchronously. In this case, no exception is thrown by a failure to commit the acknowledgements.

  • The application calls KafkaConsumer.close() which releases any acquired records without acknowledgement.

The KafkaConsumer guarantees that the records returned in the ConsumerRecords object for a specific share-partition are in order of increasing offset. For each share-partition, the share-partition leader guarantees that acknowledgements for the records in a batch are performed atomically.

Example - Acknowledging a batch of records (implicit acknowledgement)

In this example, a consumer using share group "myshare" subscribes to topic "foo". It processes all of the records in the batch and then calls KafkaConsumer.commitSync() which implicitly marks all of the records in the batch as successfully consumed and commits the acknowledgement synchronously with Kafka. Asynchronous commit would also be acceptable. Note that apart from the configuration group.type="share", this code would work with a consumer group. This is the only example for which that is true.

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("enable.auto.commit", "false");
props.setProperty("group.type", "share");
props.setProperty("group.id", "myshare");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());
consumer.subscribe(Arrays.asList("foo"));
while (true) { 
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));    // Returns a batch of acquired records
    for (ConsumerRecord<String, String> record : records) {
        doProcessing(record);
    }
    consumer.commitSync();                                                              // Commit the acknowledgement of all the records in the batch
}

Behind the scenes, the KafkaConsumer fetches records from the share-partition leader. The leader selects the records in Available state, and will return complete record batches (https://kafka.apache.org/documentation/#recordbatch) if possible. It moves the records into Acquired state, increments the delivery count, starts the acquisition lock timeout, and returns them to the KafkaConsumer. Then the KafkaConsumer keeps a map of the state of the records it has fetched and returns a batch to the application.

When the application calls KafkaConsumer.commitSync(), the KafkaConsumer updates the state map by marking all of the records in the batch as Acknowledged and it then commits the acknowledgements by sending the new state information to the share-partition leader. For each share-partition, the share-partition leader updates the record states atomically.

Example - Per-record acknowledgement (explicit acknowledgement)

In this example, the application uses the result of processing the records to acknowledge or reject the records in the batch.

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("enable.auto.commit", "false");
props.setProperty("group.type", "share");
props.setProperty("group.id", "myshare");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());
consumer.subscribe(Arrays.asList("foo"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));    // Returns a batch of acquired records
    for (ConsumerRecord<String, String> record : records) {
        try {
            doProcessing(record);
            consumer.acknowledge(record, AcknowledgeType.ACCEPT);                       // Mark the record as processed successfully
        } catch (Exception e) {
            consumer.acknowledge(record, AcknowledgeType.REJECT);                       // Mark the record as unprocessable
        }
    }
    consumer.commitAsync();                                                             // Commit the acknowledgements of all the records in the batch
}

In this example, each record processed is separately acknowledged using a call to the new KafkaConsumer.acknowledge(ConsumerRecord, AcknowledgeType) method. The AcknowledgeType argument indicates whether the record was processed successfully or not. In this case, the bad records are rejected, meaning that they’re not eligible for further delivery attempts. For a permanent error such as a deserialization error, this is appropriate. For a transient error which might not affect a subsequent processing attempt, AcknowledgeType.RELEASE is more appropriate because the record remains eligible for further delivery attempts.

The calls to KafkaConsumer.acknowledge(ConsumerRecord, AcknowledgeType) are simply updating the state map in the KafkaConsumer. It is only once KafkaConsumer.commitAsync() is called that the acknowledgements are committed by sending the new state information to the share-partition leader.

Example - Per-record acknowledgement, ending processing of the batch on an error (explicit acknowledgement)

In this example, the application stops processing the batch when it encounters an exception.

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("enable.auto.commit", "false");
props.setProperty("group.type", "share");
props.setProperty("group.id", "myshare");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());
consumer.subscribe(Arrays.asList("foo"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));    // Returns a batch of acquired records
    for (ConsumerRecord<String, String> record : records) {
        try {
            doProcessing(record);
            consumer.acknowledge(record, AcknowledgeType.ACCEPT);                       // Mark the record as processed successfully
        } catch (Exception e) {
            consumer.acknowledge(record, AcknowledgeType.REJECT);                       // Mark this record as unprocessable
            break;
        }
    }
    consumer.commitAsync();                                                             // Commit the acknowledgements of the acknowledged records only
}

There are the following cases in this example:

  1. The batch contains no records, in which case the application just polls again. The call to KafkaConsumer.commitAsync() just does nothing because the batch was empty.

  2. All of the records in the batch are processed successfully. The calls to KafkaConsumer.acknowledge(ConsumerRecord, AcknowledgeType.ACCEPT) mark all of the records in the batch as successfully processed.

  3. One of the records encounters an exception. The call to KafkaConsumer.acknowledge(ConsumerRecord, AcknowledgeType.REJECT) rejects that record. Earlier records in the batch have already been marked as successfully processed. The call to KafkaConsumer.commitAsync() commits the acknowledgements, but the records after the failed record remain Acquired as part of the same delivery attempt and will be presented to the application in response to another poll.

Access control

Share group access control is performed on the GROUP resource type, just the same as for consumer groups, with the same rules for the actions checked. A share group is just a new kind of group. An example follows the list below.

  • Operations which read information about a share group need permission to perform the DESCRIBE action on the named group resource

  • Operations which change information about a share group (such as consuming a record) need permission to perform the READ action on the named group resource
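For example, granting a principal permission to consume using share group S1 could use the standard ACL tooling unchanged. A sketch, with an illustrative principal name:

$ kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:alice --operation Read --group S1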

Managing durable share-partition state

The share-partition leader is responsible for recording the durable state for the share-partitions it leads. For each share-partition, we need to be able to recover:

  • The Share Start Offset (SSO)

  • The state of the in-flight records

  • The delivery counts of records whose delivery failed

The delivery counts are only maintained approximately and the Acquired state is not persisted. This minimises the amount of share-partition state that has to be logged. The expectation is that most records will be fetched and acknowledged in batches with only one delivery attempt.

Examples

Operation: Starting state of topic-partition with latest offset 100
State changes: SSO=100, SEO=100
Cumulative state: SSO=100, SEO=100

In the batched case with successful processing, there’s a state change per batch to move the SSO forwards.

Operation: Fetch records 100-109
State changes: SEO=110, records 100-109 (acquired, delivery count 1)
Cumulative state: SSO=100, SEO=110, records 100-109 (acquired, delivery count 1)

Operation: Acknowledge 100-109
State changes: SSO=110
Cumulative state: SSO=110, SEO=110

With a messier sequence of release and acknowledge, there’s a state change for each operation, which can act on multiple records.

Operation: Fetch records 110-119
State changes: SEO=120, records 110-119 (acquired, delivery count 1)
Cumulative state: SSO=110, SEO=120, records 110-119 (acquired, delivery count 1)

Operation: Release 110
State changes: record 110 (available, delivery count 1)
Cumulative state: SSO=110, SEO=120, record 110 (available, delivery count 1), records 111-119 (acquired, delivery count 1)

Operation: Acknowledge 119
State changes: record 119 acknowledged
Cumulative state: SSO=110, SEO=120, record 110 (available, delivery count 1), records 111-118 (acquired, delivery count 1), record 119 acknowledged

Operation: Fetch records 110, 120
State changes: SEO=121, record 110 (acquired, delivery count 2), record 120 (acquired, delivery count 1)
Cumulative state: SSO=110, SEO=121, record 110 (acquired, delivery count 2), records 111-118 (acquired, delivery count 1), record 119 acknowledged, record 120 (acquired, delivery count 1)

Operation: Lock timeout elapsed 111, 112
State changes: records 111-112 (available, delivery count 1)
Cumulative state: SSO=110, SEO=121, record 110 (acquired, delivery count 2), records 111-112 (available, delivery count 1), records 113-118 (acquired, delivery count 1), record 119 acknowledged, record 120 (acquired, delivery count 1)

Operation: Acknowledge 113-118
State changes: records 113-118 acknowledged
Cumulative state: SSO=110, SEO=121, record 110 (acquired, delivery count 2), records 111-112 (available, delivery count 1), records 113-119 acknowledged, record 120 (acquired, delivery count 1)

Operation: Fetch records 111, 112
State changes: records 111-112 (acquired, delivery count 2)
Cumulative state: SSO=110, SEO=121, records 110-112 (acquired, delivery count 2), records 113-119 acknowledged, record 120 (acquired, delivery count 1)

Operation: Acknowledge 110
State changes: SSO=111
Cumulative state: SSO=111, SEO=121, records 111-112 (acquired, delivery count 2), records 113-119 acknowledged, record 120 (acquired, delivery count 1)

Operation: Acknowledge 111, 112
State changes: SSO=120
Cumulative state: SSO=120, SEO=121, record 120 (acquired, delivery count 1)

Further details to follow as the design progresses.


Public Interfaces

This KIP introduces extensive additions to the public interfaces.

Client API changes

KafkaConsumer

This KIP introduces two new methods on KafkaConsumer which apply only to share groups:

  • void acknowledge(ConsumerRecord record)
  • void acknowledge(ConsumerRecord record, AcknowledgeType type)

Many of the existing KafkaConsumer methods do not apply to share groups and will result in an exception if used.

Method signature | Description | Applies to share groups?

void acknowledge(ConsumerRecord record) Acknowledge successful delivery of a record returned on the last poll(Duration). The acknowledgement is committed on the next commitSync() or commitAsync() call.

Only applies to share groups, otherwise throws a new InvalidGroupTypeException

void acknowledge(ConsumerRecord record, AcknowledgeType type) Acknowledge delivery of a record returned on the last poll(Duration) indicating whether it was processed successfully. The acknowledgement is committed on the next commitSync() or commitAsync() call.

Only applies to share groups, otherwise throws a new InvalidGroupTypeException

void assign(Collection<TopicPartition> partitions) Manually assign a list of partitions to this consumer.

No, throws a new InvalidGroupTypeException 

Set<TopicPartition> assignment() Get the set of partitions currently assigned to this consumer.

No, throws a new InvalidGroupTypeException 

Map<TopicPartition,Long> beginningOffsets(Collection<TopicPartition> partitions) Get the first offset for the given partitions. For a share group, returns the share start offset.

Yes

Map<TopicPartition,Long> beginningOffsets(Collection<TopicPartition> partitions, Duration timeout) Get the first offset for the given partitions. For a share group, returns the share start offset.

Yes

void close() Close the consumer, waiting for up to the default timeout of 30 seconds for any needed cleanup.

Yes

void close(Duration timeout) Tries to close the consumer cleanly within the specified timeout.

Yes

void commitAsync() Commit offsets returned on the last poll(Duration) for all the subscribed list of topics and partitions. For a share group, commits the acknowledgements for the records returned.

Yes

void commitAsync(Map<TopicPartition,OffsetAndMetadata> offsets, OffsetCommitCallback callback) Commit the specified offset for the specified list of topics and partitions to Kafka.

No, throws a new InvalidGroupTypeException 

void commitSync()Commit offsets returned on the last poll(Duration) for all the subscribed list of topics and partitions. For a share group, commits the acknowledgements for the records returned.

Yes

void commitSync(Duration timeout)Commit offsets returned on the last poll(Duration) for all the subscribed list of topics and partitions. For a share group, commits the acknowledgements for the records returned.

Yes

void commitSync(Map<TopicPartition,OffsetAndMetadata> offsets)Commit the specified offsets for the specified list of topics and partitions.

No, throws a new InvalidGroupTypeException 

void commitSync(Map<TopicPartition,OffsetAndMetadata> offsets, Duration timeout)Commit the specified offsets for the specified list of topics and partitions.

No, throws a new InvalidGroupTypeException 

Map<TopicPartition,OffsetAndMetadata> committed(Set<TopicPartition> partitions) Get the last committed offsets for the given partitions (whether the commit happened by this process or another).

No, throws a new InvalidGroupTypeException 

Map<TopicPartition,OffsetAndMetadata> committed(Set<TopicPartition> partitions, Duration timeout) Get the last committed offsets for the given partitions (whether the commit happened by this process or another).

No, throws a new InvalidGroupTypeException 

OptionalLong currentLag(TopicPartition topicPartition) Get the consumer's current lag on the partition.

No, throws a new InvalidGroupTypeException 

Map<TopicPartition,Long> endOffsets(Collection<TopicPartition> partitions) Get the end offsets for the given partitions. For a share group, returns the share end offset.

Yes

Map<TopicPartition,Long> endOffsets(Collection<TopicPartition> partitions, Duration timeout) Get the end offsets for the given partitions. For a share group, returns the share end offset.

Yes

void enforceRebalance() Alert the consumer to trigger a new rebalance by rejoining the group.

No, throws a new InvalidGroupTypeException 

ConsumerGroupMetadata groupMetadata() Return the current group metadata associated with this consumer.

No, throws a new InvalidGroupTypeException 

Map<String, List<PartitionInfo>> listTopics()Get metadata about partitions for all topics that the user is authorized to view.

Yes

Map<String, List<PartitionInfo>> listTopics(Duration timeout) Get metadata about partitions for all topics that the user is authorized to view.

Yes

Map<MetricName, ? extends Metric> metrics() Get the metrics kept by the consumer.

Yes

Map<TopicPartition,OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition,Long> timestampsToSearch) Look up the offsets for the given partitions by timestamp.

No, throws a new InvalidGroupTypeException 

Map<TopicPartition,OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition,Long> timestampsToSearch, Duration timeout) Look up the offsets for the given partitions by timestamp.

No, throws a new InvalidGroupTypeException 

List<PartitionInfo> partitionsFor(String topic) Get metadata about the partitions for a given topic.

Yes

List<PartitionInfo> partitionsFor(String topic,  Duration timeout) Get metadata about the partitions for a given topic.

Yes

void pause(Collection<TopicPartition> partitions) Suspend fetching from the requested partitions.

No, throws a new InvalidGroupTypeException 

Set<TopicPartition> paused() Get the set of partitions that were previously paused by a call to pause(Collection).

No, throws a new InvalidGroupTypeException 

ConsumerRecords<K,V> poll(Duration timeout) Fetch data for the topics or partitions specified using one of the subscribe/assign APIs.

Yes

long position(TopicPartition partition) Get the offset of the next record that will be fetched (if a record with that offset exists).

No, throws a new InvalidGroupTypeException 

long position(TopicPartition partition, Duration timeout) Get the offset of the next record that will be fetched (if a record with that offset exists).

No, throws a new InvalidGroupTypeException 

void resume(Collection<TopicPartition> partitions) Resume specified partitions which have been paused with pause(Collection) .

No, throws a new InvalidGroupTypeException 

void seek(TopicPartition partition, long offset) Overrides the fetch offsets that the consumer will use on the next poll(timeout) .

No, throws a new InvalidGroupTypeException 

void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata) Overrides the fetch offsets that the consumer will use on the next poll(timeout) .

No, throws a new InvalidGroupTypeException 

void seekToBeginning(Collection<TopicPartition> partitions) Seek to the first offset for each of the given partitions.

No, throws a new InvalidGroupTypeException 

void seekToEnd(Collection<TopicPartition> partitions) Seek to the last offset for each of the given partitions.

No, throws a new InvalidGroupTypeException 

void subscribe(Collection<String> topics) Subscribe to the given list of topics to get dynamically assigned partitions.

Yes

void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) Subscribe to the given list of topics to get dynamically assigned partitions.

No, throws a new InvalidGroupTypeException 

void subscribe(Pattern pattern) Subscribe to all topics matching specified pattern to get dynamically assigned partitions.

No, throws a new InvalidGroupTypeException 

void subscribe(Pattern pattern, ConsumerRebalanceListener listener) Subscribe to all topics matching specified pattern to get dynamically assigned partitions.

No, throws a new InvalidGroupTypeException 

Set<String> subscription() Get the current subscription.

Yes

void unsubscribe() Unsubscribe from topics currently subscribed with subscribe(Collection)  or subscribe(Pattern) .

Yes

void wakeup() Wakeup the consumer.

Yes

AcknowledgeType

The new org.apache.kafka.clients.consumer.AcknowledgeType enum distinguishes between the types of acknowledgement for a record consumed using a share group.

Enum constant | Description

ACCEPT (0) The record was consumed successfully.
RELEASE (1) The record was not consumed successfully. Release it for another delivery attempt.
REJECT (2) The record was not consumed successfully. Reject it and do not release it for another delivery attempt.

AdminClient

Add the following methods to the AdminClient interface.

Method signature | Description
AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId, Map<TopicPartition,OffsetAndMetadata> offsets, AlterShareGroupOffsetsOptions options) Alter offset information for a share group.
DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId, Set<TopicPartition> partitions, DeleteShareGroupOffsetsOptions options) Delete offset information for a set of partitions in a share group.
DeleteShareGroupResult deleteShareGroups(Collection<String> groupIds, DeleteShareGroupOptions options) Delete share groups from the cluster.
DescribeShareGroupsResult describeShareGroups(Collection<String> groupIds, DescribeShareGroupsOptions options) Describe some share groups in the cluster.
ListShareGroupOffsetsResult listShareGroupOffsets(String groupId, ListShareGroupOffsetsOptions options) List the share group offsets available in the cluster.
ListShareGroupsResult listShareGroups(ListShareGroupsOptions options) List the share groups available in the cluster.

The equivalence between the consumer group and share group interfaces is clear. There are some differences:

  • Altering the offsets for a share group resets the Share Start Offset for topic-partitions in the share group (share-partitions)
  • The members of a share group are not assigned partitions
  • A share group has only two states - EMPTY and STABLE
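As a sketch of how the new methods might be used, assuming ListShareGroupsResult exposes an all() future like the existing ListConsumerGroupsResult:

// Sketch: list every share group in the cluster.
// props holds the usual admin client configuration such as bootstrap.servers.
try (Admin admin = Admin.create(props)) {
    admin.listShareGroups(new ListShareGroupsOptions())
         .all().get()
         .forEach(listing -> System.out.println(listing));
}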

Command-line tools

A new tool called kafka-share-groups.sh is added for working with share groups. It has the following options:

Option

Description

--all-topics

Consider all topics assigned to a group in the `reset-offsets` process.

--bootstrap-server <String: server to connect to>

REQUIRED: The server(s) to connect to.

--command-config <String: command config property file>

Property file containing configs to be passed to Admin Client.

--delete

Pass in groups to delete topic partition offsets over the entire share group. For instance --group g1 --group g2

--delete-offsets

Delete offsets of a share group. Supports one share group at a time, and multiple topics.

--describe

Describe share group and list offset lag (number of records not yet processed) related to given group.

--dry-run

Only show results without executing changes on share groups. Supported operations: reset-offsets.

--execute

Execute operation. Supported operations: reset-offsets.

--group <String: share group>

The share group we wish to act on.

--list

List all share groups.

--members

Describe members of the group. This option may be used with the '--describe' option only.

--offsets

Describe the group and list all topic partitions in the group along with their offset lag. This is the default sub-action and may be used with the '--describe' option only.

--reset-offsets

Reset offsets of share group. Supports one share group at a time, and instances must be inactive.

--to-datetime <String: datetime>

Reset offsets to offset from datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss'.

--to-earliest

Reset offsets to earliest offset.

--to-latest

Reset offsets to latest offset.

--topic <String: topic>

The topic whose share group information should be deleted or topic which should be included in the reset offset process.

--version

Display Kafka version.

Here are some examples. 

To display a list of all share groups:

$ kafka-share-groups.sh --bootstrap-server localhost:9092 --list

To delete the information for topic T1 from inactive share group S1, which essentially resets the consumption of this topic in the share group:

$ kafka-share-groups.sh --bootstrap-server localhost:9092 --group S1 --topic T1 --delete-offsets

To set the starting offset for consuming topic T1 in inactive share group S1 to a specific date and time:

$ kafka-share-groups.sh --bootstrap-server localhost:9092 --group S1 --topic T1 --reset-offsets --to-datetime 1999-12-31T23:57:00.000 --execute

Configuration

Broker configuration

Configuration | Description | Values

share.group.enable
Whether to enable share groups on the broker.
Default false while the feature is being developed. Will become true in a future release.

share.delivery.count.limit
The maximum number of delivery attempts for a record delivered to a share group.
Default 5, minimum 2, maximum 10

share.record.lock.duration.ms
Share-group record acquisition lock duration in milliseconds.
Default 30000 (30 seconds), minimum 1000 (1 second), maximum 60000 (60 seconds)

share.record.lock.duration.max.ms
Share-group record acquisition lock maximum duration in milliseconds.
Default 60000 (60 seconds), minimum 1000 (1 second), maximum 3600000 (1 hour)

share.record.lock.partition.limit
Share-group record lock limit per share-partition.
Default 200, minimum 100, maximum 10000
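For example, a broker could override the defaults in server.properties with values inside the permitted ranges above. This is a sketch, not a recommendation:

share.group.enable=true
share.record.lock.duration.ms=45000
share.record.lock.partition.limit=500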

Client configuration

Configuration | Description | Values

group.type
Type of the group: "consumer" or "share".
Default "consumer"

record.lock.duration.ms
Record acquisition lock duration in milliseconds.
Default null, which uses the cluster configuration share.record.lock.duration.ms; minimum 1000; maximum limited by the cluster configuration share.record.lock.duration.max.ms
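For example, a consumer could join a share group with a longer acquisition lock duration. A sketch; the value is subject to the cluster’s share.record.lock.duration.max.ms:

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.type", "share");                  // use a share group rather than a consumer group
props.setProperty("group.id", "myshare");
props.setProperty("record.lock.duration.ms", "60000");     // overrides the cluster default share.record.lock.duration.ms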

Kafka protocol changes

Further details to follow as the design progresses.

Future Work

There are some obvious extensions to this idea which are not included in this KIP in order to keep the scope manageable.

This KIP introduces delivery counts and a maximum number of delivery attempts. An obvious future extension is the ability to copy records that failed to be delivered onto a dead-letter queue. This would of course give a way to handle poison messages without them permanently blocking processing.

A “browsing” consumer which does not modify the share group state or take acquisition locks could be supported. It would need a lesser permission (DESCRIBE) on the group than a regular consumer (READ). This is a little more complicated because it needs to have a position independent of the SSO so that it can traverse along the queue.

The focus in this KIP is on sharing rather than ordering. The concept can be extended to give key-based ordering so that partial ordering and fine-grained sharing can be achieved at the same time.

Finally, this KIP does not include support for acknowledging delivery using transactions for exactly-once semantics. Conceptually, this is quite straightforward but would take changes to the API.

Compatibility, Deprecation, and Migration Plan

The changes in this KIP add to the capabilities of Kafka rather than changing existing behavior.

Test Plan

Detail to follow.

Rejected Alternatives

None
