...
Share groups introduce two new APIs in the Kafka protocol.
...
for fetching and acknowledging records.
- ShareFetch - for fetching records from share-partition leaders
- ShareAcknowledge - for acknowledging delivery of records with share-partition leaders
...
A new enum org.apache.kafka.common.ShareGroupState
is added:
Enum constant |
---|
|
|
|
|
Its definition follows the pattern of ConsumerGroupState
with fewer states.
...
- ShareGroupHeartbeat - for consumers to form and maintain share groups
- ShareGroupDescribe - for describing share groups
- ShareFetch - for fetching records from share-partition leaders
- ShareAcknowledge - for acknowledging delivery of records with share-partition leaders
- Additional APIs not yet documented for the AdminAPI enhancements which will follow the obvious existing precedents
...
Code Block |
---|
{
  // Response schema for the ShareGroupHeartbeat API (version 0, flexible from version 0).
  "apiKey": TBD,
  "type": "response",
  "name": "ShareGroupHeartbeatResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED
  // - NOT_COORDINATOR
  // - COORDINATOR_NOT_AVAILABLE
  // - COORDINATOR_LOAD_IN_PROGRESS
  // - INVALID_REQUEST
  // - UNKNOWN_MEMBER_ID
  // - GROUP_MAX_SIZE_REACHED
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top-level error code, or 0 if there was no error" },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The top-level error message, or null if there was no error." },
    { "name": "MemberId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The member id generated by the coordinator. Only provided when the member joins with MemberEpoch == 0." },
    { "name": "MemberEpoch", "type": "int32", "versions": "0+",
      "about": "The member epoch." },
    { "name": "HeartbeatIntervalMs", "type": "int32", "versions": "0+",
      "about": "The heartbeat interval in milliseconds." },
    // The assignment is an inline struct; its partitions reuse the TopicPartitions common struct below.
    { "name": "Assignment", "type": "Assignment", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "null if not provided; the assignment otherwise.", "fields": [
        { "name": "Error", "type": "int8", "versions": "0+",
          "about": "The assigned error." },
        { "name": "AssignedTopicPartitions", "type": "[]TopicPartitions", "versions": "0+",
          "about": "The partitions assigned to the member." }
      ]}
  ],
  "commonStructs": [
    { "name": "TopicPartitions", "versions": "0+", "fields": [
        { "name": "TopicId", "type": "uuid", "versions": "0+",
          "about": "The topic ID." },
        { "name": "Partitions", "type": "[]int32", "versions": "0+",
          "about": "The partitions." }
      ]}
  ]
} |
ShareGroupDescribe API
The ShareGroupDescribe API is used to describe share groups.
Request schema
Code Block |
---|
{
  // Request schema for the ShareGroupDescribe API (version 0, flexible from version 0).
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "ShareGroupDescribeRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupIds", "type": "[]string", "versions": "0+", "entityType": "groupId",
      "about": "The ids of the groups to describe" },
    { "name": "IncludeAuthorizedOperations", "type": "bool", "versions": "0+",
      "about": "Whether to include authorized operations." }
  ]
} |
Response schema
Code Block |
---|
{
  // Response schema for the ShareGroupDescribe API (version 0, flexible from version 0).
  "apiKey": NN,
  "type": "response",
  "name": "ShareGroupDescribeResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - NOT_COORDINATOR (version 0+)
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - INVALID_GROUP_ID (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    // One entry per described group; errors are reported per group rather than at the top level.
    { "name": "Groups", "type": "[]DescribedGroup", "versions": "0+",
      "about": "Each described group.",
      "fields": [
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The describe error, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The top-level error message, or null if there was no error." },
        { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
          "about": "The group ID string." },
        { "name": "GroupState", "type": "string", "versions": "0+",
          "about": "The group state string, or the empty string." },
        { "name": "GroupEpoch", "type": "int32", "versions": "0+",
          "about": "The group epoch." },
        { "name": "AssignmentEpoch", "type": "int32", "versions": "0+",
          "about": "The assignment epoch." },
        { "name": "AssignorName", "type": "string", "versions": "0+",
          "about": "The selected assignor." },
        { "name": "Members", "type": "[]Member", "versions": "0+",
          "about": "The members.",
          "fields": [
            { "name": "MemberId", "type": "string", "versions": "0+",
              "about": "The member ID." },
            { "name": "InstanceId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
              "about": "The member instance ID." },
            { "name": "RackId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
              "about": "The member rack ID." },
            { "name": "MemberEpoch", "type": "int32", "versions": "0+",
              "about": "The current member epoch." },
            { "name": "ClientId", "type": "string", "versions": "0+",
              "about": "The client ID." },
            { "name": "ClientHost", "type": "string", "versions": "0+",
              "about": "The client host." },
            { "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+", "entityType": "topicName",
              "about": "The subscribed topic names." },
            // Refers to the Assignment struct declared under commonStructs.
            { "name": "Assignment", "type": "Assignment", "versions": "0+",
              "about": "The current assignment." }
          ]},
        { "name": "AuthorizedOperations", "type": "int32", "versions": "0+", "default": "-2147483648",
          "about": "32-bit bitfield to represent authorized operations for this group." }
      ]
    }
  ],
  "commonStructs": [
    { "name": "TopicPartitions", "versions": "0+", "fields": [
        { "name": "TopicId", "type": "uuid", "versions": "0+",
          "about": "The topic ID." },
        { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
          "about": "The topic name." },
        { "name": "Partitions", "type": "[]int32", "versions": "0+",
          "about": "The partitions." }
      ]},
    { "name": "Assignment", "versions": "0+", "fields": [
        { "name": "TopicPartitions", "type": "[]TopicPartitions", "versions": "0+",
          "about": "The assigned topic-partitions to the member." },
        { "name": "Error", "type": "int8", "versions": "0+",
          "about": "The assigned error." },
        { "name": "MetadataVersion", "type": "int32", "versions": "0+",
          "about": "The assignor metadata version." },
        { "name": "MetadataBytes", "type": "bytes", "versions": "0+",
          "about": "The assignor metadata bytes." }
      ]}
  ]
} |
ShareFetch API
The ShareFetch API is used by share group consumers to fetch acquired records from share-partition leaders.
Request schema
Code Block |
---|
{
  // Request schema for the ShareFetch API (version 0, flexible from version 0).
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "ShareFetchRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "entityType": "groupId",
      "about": "null if not provided or if it didn't change since the last fetch; the group identifier otherwise." },
    { "name": "MemberId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "null if not provided or if it didn't change since the last fetch; the member id generated by the coordinator otherwise." },
    // The default is written as a string, like every other default in these schemas.
    { "name": "AcquisitionTimeoutMs", "type": "int32", "versions": "0+", "default": "-1",
      "about": "-1 if it didn't change since the last fetch; the maximum time in milliseconds that the fetched records are acquired for the consumer." },
    { "name": "MaxWaitMs", "type": "int32", "versions": "0+",
      "about": "The maximum time in milliseconds to wait for the response." },
    { "name": "MinBytes", "type": "int32", "versions": "0+",
      "about": "The minimum bytes to accumulate in the response." },
    { "name": "MaxBytes", "type": "int32", "versions": "0+", "default": "0x7fffffff", "ignorable": true,
      "about": "The maximum bytes to fetch. See KIP-74 for cases where this limit may not be honored." },
    { "name": "SessionId", "type": "int32", "versions": "0+", "default": "0", "ignorable": true,
      "about": "The share session ID." },
    { "name": "SessionEpoch", "type": "int32", "versions": "0+", "default": "-1", "ignorable": true,
      "about": "The share session epoch, which is used for ordering requests in a session." },
    { "name": "Topics", "type": "[]FetchTopic", "versions": "0+",
      "about": "The topics to fetch.", "fields": [
        { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true,
          "about": "The unique topic ID"},
        { "name": "Partitions", "type": "[]FetchPartition", "versions": "0+",
          "about": "The partitions to fetch.", "fields": [
            { "name": "PartitionIndex", "type": "int32", "versions": "0+",
              "about": "The partition index." },
            { "name": "CurrentLeaderEpoch", "type": "int32", "versions": "0+", "default": "-1", "ignorable": true,
              "about": "The current leader epoch of the partition." },
            { "name": "PartitionMaxBytes", "type": "int32", "versions": "0+",
              "about": "The maximum bytes to fetch from this partition. See KIP-74 for cases where this limit may not be honored." }
          ]}
      ]},
    { "name": "ForgottenTopicsData", "type": "[]ForgottenTopic", "versions": "0+", "ignorable": false,
      "about": "In an incremental fetch request, the partitions to remove.", "fields": [
        { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true,
          "about": "The unique topic ID"},
        { "name": "Partitions", "type": "[]int32", "versions": "0+",
          "about": "The partitions indexes to forget." }
      ]}
  ]
} |
Response schema
Code Block |
---|
{
  // Response schema for the ShareFetch API (version 0, flexible from version 0).
  "apiKey": NN,
  "type": "response",
  "name": "ShareFetchResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+", "ignorable": true,
      "about": "The top level response error code." },
    { "name": "SessionId", "type": "int32", "versions": "0+", "default": "0", "ignorable": false,
      "about": "The share session ID." },
    { "name": "Responses", "type": "[]ShareFetchableTopicResponse", "versions": "0+",
      "about": "The response topics.", "fields": [
        { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true,
          "about": "The unique topic ID"},
        { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
          "about": "The topic partitions.", "fields": [
            { "name": "PartitionIndex", "type": "int32", "versions": "0+",
              "about": "The partition index." },
            { "name": "ErrorCode", "type": "int16", "versions": "0+",
              "about": "The error code, or 0 if there was no fetch error." },
            { "name": "LastStableOffset", "type": "int64", "versions": "0+", "default": "-1", "ignorable": true,
              "about": "The last stable offset (or LSO) of the partition. This is the last offset such that the state of all transactional records prior to this offset have been decided (ABORTED or COMMITTED)" },
            // Tagged field of PartitionData; tag numbers are scoped to the enclosing struct.
            { "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "0+", "taggedVersions": "0+", "tag": 0, "fields": [
                { "name": "LeaderId", "type": "int32", "versions": "0+",
                  "about": "The ID of the current leader or -1 if the leader is unknown." },
                { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
                  "about": "The latest known leader epoch." }
              ]},
            { "name": "AbortedTransactions", "type": "[]AbortedTransaction", "versions": "0+", "nullableVersions": "0+", "ignorable": true,
              "about": "The aborted transactions.", "fields": [
                { "name": "ProducerId", "type": "int64", "versions": "0+", "entityType": "producerId",
                  "about": "The producer id associated with the aborted transaction." },
                { "name": "FirstOffset", "type": "int64", "versions": "0+",
                  "about": "The first offset in the aborted transaction." }
              ]},
            { "name": "Records", "type": "records", "versions": "0+", "nullableVersions": "0+",
              "about": "The record data."},
            { "name": "AcquiredRecords", "type": "[]AcquiredRecords", "versions": "0+",
              "about": "The acquired records.", "fields": [
                { "name": "BaseOffset", "type": "int64", "versions": "0+",
                  "about": "The earliest offset in this batch of acquired records."},
                { "name": "LastOffset", "type": "int64", "versions": "0+",
                  "about": "The last offset of this batch of acquired records."},
                { "name": "DeliveryCount", "type": "int16", "versions": "0+",
                  "about": "The delivery count of this batch of acquired records."}
              ]}
          ]}
      ]},
    // "versions" is "0+" so the field exists in the only valid version ("0") and matches its taggedVersions.
    { "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "0+", "taggedVersions": "0+", "tag": 0,
      "about": "Endpoints for all current leaders enumerated in PartitionData with error NOT_LEADER_OR_FOLLOWER.", "fields": [
        { "name": "NodeId", "type": "int32", "versions": "0+", "mapKey": true, "entityType": "brokerId",
          "about": "The ID of the associated node." },
        { "name": "Host", "type": "string", "versions": "0+",
          "about": "The node's hostname." },
        { "name": "Port", "type": "int32", "versions": "0+",
          "about": "The node's port." },
        { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The rack of the node, or null if it has not been assigned to a rack." }
      ]}
  ]
} |
...
The ShareAcknowledge API is used by share group consumers to acknowledge delivery of records with share-partition leaders.
Request schema
Code Block |
---|
{
  // Request schema for the ShareAcknowledge API (version 0, flexible from version 0).
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "ShareAcknowledgeRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "SessionId", "type": "int32", "versions": "0+",
      "about": "The share session ID." },
    { "name": "SessionEpoch", "type": "int32", "versions": "0+",
      "about": "The share session epoch, which is used for ordering requests in a session." },
    // Acknowledgements are nested topic -> partition -> batch of offsets.
    { "name": "Topics", "type": "[]AcknowledgeTopic", "versions": "0+",
      "about": "The topics containing records to acknowledge.", "fields": [
        { "name": "TopicId", "type": "uuid", "versions": "0+",
          "about": "The unique topic ID"},
        { "name": "Partitions", "type": "[]AcknowledgePartition", "versions": "0+",
          "about": "The partitions containing records to acknowledge.", "fields": [
            { "name": "PartitionIndex", "type": "int32", "versions": "0+",
              "about": "The partition index." },
            { "name": "AcknowledgementBatches", "type": "[]AcknowledgementBatch", "versions": "0+",
              "about": "Record batches to acknowledge.", "fields": [
                { "name": "StartOffset", "type": "int64", "versions": "0+",
                  "about": "Start offset of batch of records to acknowledge."},
                { "name": "LastOffset", "type": "int64", "versions": "0+",
                  "about": "Last offset (inclusive) of batch of records to acknowledge."},
                { "name": "GapOffsets", "type": "[]int64", "versions": "0+",
                  "about": "Array of offsets in this range which do not correspond to records."},
                { "name": "AcknowledgeType", "type": "string", "versions": "0+", "default": "accept",
                  "about": "The type of acknowledgement, such as accept or release."}
              ]}
          ]}
      ]}
  ]
} |
Response schema
Code Block |
---|
{
  // Response schema for the ShareAcknowledge API (version 0, flexible from version 0).
  "apiKey": NN,
  "type": "response",
  "name": "ShareAcknowledgeResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+", "ignorable": true,
      "about": "The top level response error code." },
    { "name": "SessionId", "type": "int32", "versions": "0+", "default": "0", "ignorable": false,
      "about": "The share session ID." },
    { "name": "Responses", "type": "[]ShareAcknowledgeTopicResponse", "versions": "0+",
      "about": "The response topics.", "fields": [
        { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true,
          "about": "The unique topic ID"},
        { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
          "about": "The topic partitions.", "fields": [
            { "name": "PartitionIndex", "type": "int32", "versions": "0+",
              "about": "The partition index." },
            { "name": "ErrorCode", "type": "int16", "versions": "0+",
              "about": "The error code, or 0 if there was no error." },
            // Tagged field of PartitionData; tag numbers are scoped to the enclosing struct.
            { "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "0+", "taggedVersions": "0+", "tag": 0, "fields": [
                { "name": "LeaderId", "type": "int32", "versions": "0+",
                  "about": "The ID of the current leader or -1 if the leader is unknown." },
                { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
                  "about": "The latest known leader epoch." }
              ]}
          ]}
      ]},
    // "versions" is "0+" so the field exists in the only valid version ("0") and matches its taggedVersions.
    { "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "0+", "taggedVersions": "0+", "tag": 0,
      "about": "Endpoints for all current leaders enumerated in PartitionData with error NOT_LEADER_OR_FOLLOWER.", "fields": [
        { "name": "NodeId", "type": "int32", "versions": "0+", "mapKey": true, "entityType": "brokerId",
          "about": "The ID of the associated node." },
        { "name": "Host", "type": "string", "versions": "0+",
          "about": "The node's hostname." },
        { "name": "Port", "type": "int32", "versions": "0+",
          "about": "The node's port." },
        { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The rack of the node, or null if it has not been assigned to a rack." }
      ]}
  ]
} |
...