
...

For the read_committed isolation level, the share group only consumes committed records. The share-partition leader itself is responsible for keeping track of the commit and abort markers and filtering out transactional records which have been aborted. So, the set of records eligible to become in-flight records consists only of non-transactional records and committed transactional records. The share-partition end offset (SPEO) can only move up to the last stable offset.

This processing has to occur on the broker because none of the clients receives all of the records. It can be performed with shallow iteration of the log.

In-flight records example

...

A new interface KafkaShareConsumer is introduced for consuming from share groups. It looks very similar to KafkaConsumer, trimmed down to the methods that apply to share groups.

In order to retain similarity with KafkaConsumer and make it easy for applications to move between the two interfaces, KafkaShareConsumer follows the same threading rules as KafkaConsumer. It is not thread-safe and only one thread at a time may call the methods of KafkaShareConsumer. Unsynchronized access will result in ConcurrentModificationException. The only exception to this rule is KafkaShareConsumer.wakeup(), which may be called from any thread.

To join a share group, the client application instantiates a KafkaShareConsumer using the configuration parameter group.id to give the ID of the share group. Then, it uses KafkaShareConsumer.subscribe(Collection<String> topics) to provide the list of topics that it wishes to consume from. The consumer is not allowed to assign partitions itself.
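
As a sketch of joining a share group (the bootstrap address, topic name, deserializer choice and variable names here are illustrative assumptions, not part of the KIP):

Code Block
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaShareConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
props.put("group.id", "myshare");                   // the ID of the share group to join

// Constructor taking explicit deserializers, per the method table below.
KafkaShareConsumer<String, String> consumer =
    new KafkaShareConsumer<>(props, new StringDeserializer(), new StringDeserializer());

// Subscribe to topics; the consumer cannot assign partitions itself.
consumer.subscribe(Collections.singleton("foo"));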

...

The KafkaShareConsumer guarantees that the records returned in the ConsumerRecords object for a specific share-partition are in order of increasing offset. For each share-partition, the share-partition leader guarantees that acknowledgements for the records in a batch are performed atomically. This makes error handling significantly more straightforward because there can be one error code per share-partition.


When the share-partition leader receives a request to acknowledge delivery, which can occur as a separate RPC or piggybacked on a request to fetch more records, it checks that the records being acknowledged are still in the Acquired state and acquired by the share group member trying to acknowledge them. If a record had reached its acquisition lock timeout and reverted to Available state, the attempt to acknowledge it will fail with org.apache.kafka.common.errors.TimeoutException, but the record may well be re-acquired for the same consumer and returned to it again.

Acknowledge commit callback

Acknowledgement errors are delivered to a new kind of callback called an acknowledge commit callback which can optionally be registered with a KafkaShareConsumer.

  • If the application uses KafkaShareConsumer.commitSync() to commit its acknowledgements, the results of the acknowledgements are returned to the application.

  • If the application uses KafkaShareConsumer.commitAsync() or KafkaShareConsumer.poll(Duration) to commit its acknowledgements, the results of the acknowledgements are only delivered if there is an acknowledge commit callback registered.

The acknowledge commit callback is called on the application thread and it is not permitted to call the methods of KafkaShareConsumer with the exception of KafkaShareConsumer.wakeup().

Example - Acknowledging a batch of records (implicit acknowledgement)

In this example, a consumer using share group "myshare" subscribes to topic "foo". It processes all of the records in the batch and then calls KafkaShareConsumer.commitSync(), which implicitly marks all of the records in the batch as successfully consumed and commits the acknowledgement synchronously with Kafka. Asynchronous commit would also be acceptable.
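
A minimal sketch of that flow, assuming a consumer configured as in the earlier snippet (process() is a hypothetical application method):

Code Block
import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

consumer.subscribe(Collections.singleton("foo"));
while (true) {
    // Fetch a batch of acquired records from the share-partition leaders.
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        process(record);   // hypothetical processing logic
    }
    // Implicitly marks all records returned by the last poll as successfully
    // consumed and commits the acknowledgements synchronously with Kafka.
    consumer.commitSync();
}

Replacing commitSync() with commitAsync() would commit the acknowledgements without blocking the application thread.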

...

Method signature | Description
KafkaShareConsumer(Map<String, Object> configs) | Constructor
KafkaShareConsumer(Properties properties) | Constructor
KafkaShareConsumer(Map<String, Object> configs, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) | Constructor
KafkaShareConsumer(Properties properties, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) | Constructor
void acknowledge(ConsumerRecord record) | Acknowledge successful delivery of a record returned on the last poll(Duration). The acknowledgement is committed on the next commitSync(), commitAsync() or poll(Duration) call.
void acknowledge(ConsumerRecord record, AcknowledgeType type) | Acknowledge delivery of a record returned on the last poll(Duration) indicating whether it was processed successfully. The acknowledgement is committed on the next commitSync(), commitAsync() or poll(Duration) call.
Uuid clientInstanceId(Duration timeout) | Determines the client's unique client instance ID used for telemetry.
void close() | Close the consumer, waiting for up to the default timeout of 30 seconds for any needed cleanup.
void close(Duration timeout) | Tries to close the consumer cleanly within the specified timeout.
void commitAsync() | Commits the acknowledgements for the records returned.
Map<TopicIdPartition, Optional<KafkaException>> commitSync() | Commits the acknowledgements for the records returned.
Map<TopicIdPartition, Optional<KafkaException>> commitSync(Duration timeout) | Commits the acknowledgements for the records returned.
Map<MetricName, ? extends Metric> metrics() | Get the metrics kept by the consumer.
ConsumerRecords<K,V> poll(Duration timeout) | Fetch data for the topics or partitions specified using the subscribe API.
void subscribe(Collection<String> topics) | Subscribe to the given list of topics to get dynamically assigned partitions.
Set<String> subscription() | Get the current subscription.
void unsubscribe() | Unsubscribe from topics currently subscribed with subscribe(Collection).
void wakeup() | Wakeup the consumer.

AcknowledgeCommitCallback

The new org.apache.kafka.clients.consumer.AcknowledgeCommitCallback can be implemented by the user to execute when acknowledgement completes. It is called on the application thread and is not permitted to call the methods of KafkaShareConsumer with the exception of KafkaShareConsumer.wakeup().

Method signature | Description
void onCompletion(Map<TopicIdPartition, Optional<KafkaException>> results) | A callback method the user can implement to provide asynchronous handling of request completion.
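
For illustration, a callback implementation might look like the following sketch. The class name is hypothetical, and how the callback is registered with the consumer is not shown in this excerpt:

Code Block
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.clients.consumer.AcknowledgeCommitCallback;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicIdPartition;

public class LoggingAckCallback implements AcknowledgeCommitCallback {
    @Override
    public void onCompletion(Map<TopicIdPartition, Optional<KafkaException>> results) {
        // Runs on the application thread; only KafkaShareConsumer.wakeup()
        // may be called on the consumer from here.
        results.forEach((partition, error) ->
            error.ifPresent(e ->
                System.err.println("Acknowledgement failed for " + partition + ": " + e.getMessage())));
    }
}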

AcknowledgeType

The new org.apache.kafka.clients.consumer.AcknowledgeType enum distinguishes between the types of acknowledgement for a record consumed using a share group.
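
For example, explicit acknowledgement might look like the following sketch, assuming enum constants ACCEPT, RELEASE and REJECT that mirror the 0:Accept,1:Release,2:Reject encoding in the ShareFetch schema below (process() and the error handling are illustrative):

Code Block
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
    try {
        process(record);                                       // hypothetical processing logic
        consumer.acknowledge(record, AcknowledgeType.ACCEPT);  // processed successfully
    } catch (Exception e) {
        // RELEASE makes the record available for another delivery attempt;
        // REJECT would mark it as unprocessable so it is not redelivered.
        consumer.acknowledge(record, AcknowledgeType.RELEASE);
    }
}
consumer.commitAsync();   // the acknowledgements are committed on the next commit or poll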

...

The ShareFetch API is used by share group consumers to fetch acquired records from share-partition leaders. It is also possible to piggyback acknowledgements in this request to reduce the number of round trips.


The first request from a share consumer to a share-partition leader broker establishes a share session by setting MemberId to the member ID it received from the group coordinator and ShareSessionEpoch to 0. Then each subsequent ShareFetch or ShareAcknowledge request specifies the MemberId and increments the ShareSessionEpoch by one. When the share consumer wishes to close the share session, it sets MemberId to the member ID and ShareSessionEpoch to -1.
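
For illustration, a share session might proceed as follows (the member ID value is invented):

Code Block
ShareFetch(MemberId="abc-123", ShareSessionEpoch=0)        // establishes the share session
ShareFetch(MemberId="abc-123", ShareSessionEpoch=1)        // next request in the session
ShareAcknowledge(MemberId="abc-123", ShareSessionEpoch=2)  // acknowledgements also advance the epoch
ShareFetch(MemberId="abc-123", ShareSessionEpoch=-1)       // closes the share session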

Request schema

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "ShareFetchRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "entityType": "groupId",
      "about": "null if not provided or if it didn't change since the last fetch; the group identifier otherwise." },
    { "name": "MemberId", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The member ID." },
    { "name": "MaxWaitMs", "type": "int32", "versions": "0+",
      "about": "The maximum time in milliseconds to wait for the response." },
    { "name": "MinBytes", "type": "int32", "versions": "0+",
      "about": "The minimum bytes to accumulate in the response." },
    { "name": "MaxBytes", "type": "int32", "versions": "0+", "default": "0x7fffffff", "ignorable": true,
      "about": "The maximum bytes to fetch.  See KIP-74 for cases where this limit may not be honored." },
    { "name": "Topics", "type": "[]FetchTopic", "versions": "0+",
      "about": "The topics to fetch.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true,
        "about": "The unique topic ID"},
      { "name": "Partitions", "type": "[]FetchPartition", "versions": "0+",
        "about": "The partitions to fetch.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "CurrentLeaderEpoch", "type": "int32", "versions": "0+", "default": "-1", "ignorable": true,
          "about": "The current leader epoch of the partition." },
        { "name": "PartitionMaxBytes", "type": "int32", "versions": "0+",
          "about": "The maximum bytes to fetch from this partition.  See KIP-74 for cases where this limit may not be honored." },
        { "name": "AcknowledgementBatches", "type": "[]AcknowledgementBatch", "versions": "0+",
          "about": "Record batches to acknowledge.", "fields": [
          { "name": "StartOffset", "type": "int64", "versions": "0+",
            "about": "Start offset of batch of records to acknowledge."},
          { "name": "LastOffset", "type": "int64", "versions": "0+",
            "about": "Last offset (inclusive) of batch of records to acknowledge."},
          { "name": "GapOffsets", "type": "[]int64", "versions": "0+",
            "about": "Array of offsets in this range which do not correspond to records."},
          { "name": "AcknowledgeType", "type": "int8", "versions": "0+", "default": "0",
            "about": "The type of acknowledgement - 0:Accept,1:Release,2:Reject."}
        ]}
      ]}
    ]},
    { "name": "ForgottenTopicsData", "type": "[]ForgottenTopic", "versions": "0+", "ignorable": false,
      "about": "The partitions to remove from this share session.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true,
        "about": "The unique topic ID"},
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "The partitions indexes to forget." }
    ]}
  ]
}

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "ShareFetchResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+", "ignorable": true,
      "about": "The top level response error code." },
    { "name": "Responses", "type": "[]ShareFetchableTopicResponse", "versions": "0+",
      "about": "The response topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true,
        "about": "The unique topic ID"},
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about": "The topic partitions.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no fetch error." },
        { "name": "AcknowledgeErrorCode", "type": "int16", "versions": "0+",
          "about": "The acknowledge error code, or 0 if there was no acknowledge error." },
        { "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "0+", "taggedVersions": "0+", "tag": 0, "fields": [
          { "name": "LeaderId", "type": "int32", "versions": "0+",
            "about": "The ID of the current leader or -1 if the leader is unknown." },
          { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
            "about": "The latest known leader epoch." }
        ]},
        { "name": "Records", "type": "records", "versions": "0+", "nullableVersions": "0+", "about": "The record data."},
        { "name": "AcquiredRecords", "type": "[]AcquiredRecords", "versions": "0+", "about": "The acquired records.", "fields":  [
          {"name": "BaseOffset", "type":  "int64", "versions": "0+", "about": "The earliest offset in this batch of acquired records."},
          {"name": "LastOffset", "type": "int64", "versions": "0+", "about": "The last offset of this batch of acquired records."},
          {"name": "DeliveryCount", "type": "int16", "versions": "0+", "about": "The delivery count of this batch of acquired records."}
        ]}
      ]}
    ]},
    { "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "16+", "taggedVersions": "0+", "tag": 0,
      "about": "Endpoints for all current leaders enumerated in PartitionData with error NOT_LEADER_OR_FOLLOWER.", "fields": [
      { "name": "NodeId", "type": "int32", "versions": "0+",
        "mapKey": true, "entityType": "brokerId", "about": "The ID of the associated node." },
      { "name": "Host", "type": "string", "versions": "0+",
        "about": "The node's hostname." },
      { "name": "Port", "type": "int32", "versions": "0+",
        "about": "The node's port." },
      { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
        "about": "The rack of the node, or null if it has not been assigned to a rack." }
    ]}
  ]
}

...