
...

The durable share-partition state is recorded using control records, in a similar way to the transaction markers introduced in KIP-98 - Exactly Once Delivery and Transactional Messaging. These control records are written onto the topic-partition whose delivery they reflect. This is important for performance reasons because it means the share-partition leader is able to read and write them directly on the topic-partition for which it is of course also the leader.

Two new control record types are introduced: SHARE_CHECKPOINT (5) and SHARE_DELTA (6). They are written into separate message sets with the Control flag set. This flag indicates that the records are not intended for application consumption. Indeed, these message sets are not returned to consumers at all, since they are intended only for the share-partition leader.

In order to recover the share-partition state, the share-partition leader has to read a SHARE_CHECKPOINT and zero or more SHARE_DELTA records which chain backwards to the SHARE_CHECKPOINT with the same checkpoint epoch. By applying the records in order, from earliest to latest, the state can be rebuilt.
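The recovery procedure can be sketched roughly as follows. This is a minimal illustration, not the broker implementation: the class and record names (ShareStateRecovery, ControlRecord, StateRange, OffsetState) are hypothetical, the sketch assumes a SHARE_DELTA carries the same kind of state tuples as a SHARE_CHECKPOINT, and it ignores the SPSO and SPEO for brevity.

```java
import java.util.List;
import java.util.TreeMap;

// Hypothetical, simplified model for illustration only; not the KIP's record format.
// Uses Java records (Java 16 or later).
final class ShareStateRecovery {
    // One run of offsets sharing a state code and delivery count.
    record StateRange(long baseOffset, long lastOffset, int state, int deliveryCount) {}

    // A control record; isCheckpoint distinguishes SHARE_CHECKPOINT from SHARE_DELTA.
    record ControlRecord(boolean isCheckpoint, int checkpointEpoch, List<StateRange> states) {}

    // Recovered state for a single offset.
    record OffsetState(int state, int deliveryCount) {}

    // Rebuilds per-offset state from control records given in log order.
    static TreeMap<Long, OffsetState> recover(List<ControlRecord> logOrder) {
        // Find the most recent checkpoint; it anchors the chain.
        int start = -1;
        for (int i = logOrder.size() - 1; i >= 0; i--) {
            if (logOrder.get(i).isCheckpoint()) {
                start = i;
                break;
            }
        }
        TreeMap<Long, OffsetState> result = new TreeMap<>();
        if (start < 0) {
            return result; // no checkpoint found, nothing to rebuild
        }
        int epoch = logOrder.get(start).checkpointEpoch();
        // Apply the checkpoint, then every later delta with the same epoch,
        // earliest to latest, so that later records overwrite earlier ones.
        for (int i = start; i < logOrder.size(); i++) {
            ControlRecord r = logOrder.get(i);
            if (r.checkpointEpoch() != epoch) {
                continue; // not part of this checkpoint's chain
            }
            for (StateRange s : r.states()) {
                for (long o = s.baseOffset(); o <= s.lastOffset(); o++) {
                    result.put(o, new OffsetState(s.state(), s.deliveryCount()));
                }
            }
        }
        return result;
    }
}
```

Expanding every range to individual offsets keeps the sketch short; a real implementation would more likely keep the ranges intact.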

To avoid having to scan the topic in order to find these records, the share-partition leader keeps a share snapshot file which lets it locate the control records more efficiently.

SHARE_CHECKPOINT

A SHARE_CHECKPOINT record contains a complete checkpoint of the share-partition state. It contains:

  • The group ID

  • The checkpoint epoch, which is an integer that increments with each SHARE_CHECKPOINT

  • The SPSO

  • The SPEO

  • An array of [BaseOffset, LastOffset, State, DeliveryCount] tuples, where each tuple describes a sequence of consecutive records with the same state and delivery count

Here are some examples of how the cumulative state from the previous table would be represented in SHARE_CHECKPOINT records:

Cumulative state | SHARE_CHECKPOINT
SPSO=100, SPEO=100


{
  "GroupId": "G1",
  "CheckpointEpoch": 1,
  "StartOffset": 100,
  "EndOffset": 100,
  "States": []
}


SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-112 (available, delivery count 1), records 113-118 (acquired, delivery count 1), record 119 acknowledged, record 120 (acquired, delivery count 1)


{
  "GroupId": "G1",
  "CheckpointEpoch": 1,
  "StartOffset": 110,
  "EndOffset": 121,
  "States": [
    {
      "BaseOffset": 110,
      "LastOffset": 110,
      "State": 0 (Available),
      "DeliveryCount": 1
    },
    {
      "BaseOffset": 111,
      "LastOffset": 112,
      "State": 0 (Available),
      "DeliveryCount": 1
    },
    {
      "BaseOffset": 113,
      "LastOffset": 118,
      "State": 0 (Available),
      "DeliveryCount": 0
    },
    {
      "BaseOffset": 119,
      "LastOffset": 119,
      "State": 2 (Acknowledged),
      "DeliveryCount": 1 (whatever it was when it was acknowledged)
    },
    {
      "BaseOffset": 120,
      "LastOffset": 120,
      "State": 0 (Available),
      "DeliveryCount": 0
    }
  ]
}


Note that the Acquired state is not recorded because it is transient. As a result, an Acquired record with a delivery count of 1 is recorded as Available with a delivery count of 0. In the unlikely event of a share-partition leader crash, memory of the in-flight delivery will be lost.
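The mapping from in-memory state to recorded state can be sketched as below. This is an illustration under assumptions: the CheckpointStates class, the InFlightState enum and the Range record are hypothetical names, and the enum does not model the numeric state codes used in the records.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: these names are hypothetical and not part of the KIP.
// Uses Java records (Java 16 or later).
final class CheckpointStates {
    // In-memory states; the enum order does not represent on-disk state codes.
    enum InFlightState { AVAILABLE, ACQUIRED, ACKNOWLEDGED }

    record Range(long baseOffset, long lastOffset, InFlightState state, int deliveryCount) {}

    // Maps one in-memory range to the form that would be written to a checkpoint.
    static Range toPersisted(Range r) {
        if (r.state() == InFlightState.ACQUIRED) {
            // Acquired is transient: record it as Available and undo the
            // delivery-count increment that was made when it was acquired.
            return new Range(r.baseOffset(), r.lastOffset(),
                    InFlightState.AVAILABLE, r.deliveryCount() - 1);
        }
        return r; // other states are recorded unchanged
    }

    // Converts ranges one by one, without merging adjacent ranges.
    static List<Range> toPersisted(List<Range> inMemory) {
        List<Range> out = new ArrayList<>();
        for (Range r : inMemory) {
            out.add(toPersisted(r));
        }
        return out;
    }
}
```

Applied to the second example above, record 110 (acquired, delivery count 2) becomes Available with delivery count 1, and records 113-118 (acquired, delivery count 1) become Available with delivery count 0.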

...

This KIP introduces a new interface for consuming records from a share group called org.apache.kafka.clients.consumer.ShareConsumer, with an implementation called org.apache.kafka.clients.consumer.KafkaShareConsumer. The interface stability is Evolving.

Method signature | Description
void acknowledge(ConsumerRecord record) | Acknowledge successful delivery of a record returned on the last poll(Duration). The acknowledgement is committed on the next commitSync() or commitAsync() call.
void acknowledge(ConsumerRecord record, AcknowledgeType type) | Acknowledge delivery of a record returned on the last poll(Duration) indicating whether it was processed successfully. The acknowledgement is committed on the next commitSync() or commitAsync() call.
void close() | Close the consumer, waiting for up to the default timeout of 30 seconds for any needed cleanup.
void close(Duration timeout) | Tries to close the consumer cleanly within the specified timeout.
void commitAsync() | Commits the acknowledgements for the records returned.
void commitSync() | Commits the acknowledgements for the records returned.
void commitSync(Duration timeout) | Commits the acknowledgements for the records returned.
Map<MetricName, ? extends Metric> metrics() | Get the metrics kept by the consumer.
ConsumerRecords<K,V> poll(Duration timeout) | Fetch data for the topics or partitions specified using the subscribe API.
void subscribe(Collection<String> topics) | Subscribe to the given list of topics to get dynamically assigned partitions.
Set<String> subscription() | Get the current subscription.
void unsubscribe() | Unsubscribe from topics currently subscribed with subscribe(Collection).
void wakeup() | Wakeup the consumer.
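The acknowledge-then-commit flow described for acknowledge, commitSync and poll can be illustrated with a small stand-in. FakeShareConsumer below is a hypothetical in-memory class written only for this sketch; it is not the Kafka client, it uses plain strings in place of ConsumerRecord, and its poll() takes no timeout. It shows only that an acknowledgement takes effect at the next commit.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical in-memory stand-in borrowing three method names from the interface.
// It is NOT the Kafka client and performs no network I/O.
final class FakeShareConsumer {
    private final List<String> backlog;                        // records not yet delivered
    private final Set<String> pendingAcks = new HashSet<>();   // acknowledged, not yet committed
    private final Set<String> committedAcks = new HashSet<>(); // acknowledgements already committed

    FakeShareConsumer(List<String> records) {
        this.backlog = new ArrayList<>(records);
    }

    // Delivers everything still in the backlog.
    List<String> poll() {
        List<String> batch = new ArrayList<>(backlog);
        backlog.clear();
        return batch;
    }

    // Remembers the acknowledgement locally; nothing is committed yet.
    void acknowledge(String record) {
        pendingAcks.add(record);
    }

    // Commits all acknowledgements gathered since the previous commit.
    void commitSync() {
        committedAcks.addAll(pendingAcks);
        pendingAcks.clear();
    }

    // Read-only view of committed acknowledgements, for inspection in this sketch.
    Set<String> committed() {
        return Set.copyOf(committedAcks);
    }

    // Typical loop body: poll, process and acknowledge each record, then commit.
    static Set<String> demo() {
        FakeShareConsumer consumer = new FakeShareConsumer(List.of("r1", "r2"));
        for (String record : consumer.poll()) {
            consumer.acknowledge(record); // processing would happen before this call
        }
        consumer.commitSync();
        return consumer.committed();
    }
}
```

With the real KafkaShareConsumer, the same loop shape would apply, with poll(Duration) returning ConsumerRecords and acknowledge taking a ConsumerRecord.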

...