...
Consumer Group | ||
---|---|---|
Name | Type | Description |
Group ID | string | The group ID as configured by the consumer. The ID uniquely identifies the group. |
Group Epoch | int32 | The current epoch of the group. The epoch is incremented by the group coordinator when a new assignment is required for the group. |
Members | []Member | The set of members in the group. |
Partitions Metadata | []PartitionMetadata | The metadata of the partitions that the group is subscribed to. This is used to detect partition metadata changes. |
Member | ||
Name | Type | Description |
Member ID | string | The unique identifier of the member. It is generated by the coordinator upon the first heartbeat request and must be used during the lifetime of the member. The ID is similar to an incarnation ID. |
Instance ID | string | The instance ID configured by the consumer. |
Rack ID | string | The rack ID configured by the consumer. |
Client ID | string | The client ID configured by the consumer. |
Client Host | string | The client host configured by the consumer. |
Subscribed Topic Names | []string | The current set of subscribed topic names configured by the consumer. |
Subscribed Topic Regex | string | The current subscription regular expression configured by the consumer. |
Server Assignor | string | The server side assignor used by the group. |
Client Assignors | []Assignor | The list of client-side assignors supported by the member. The order of this list defines the priority. |
Assignor | ||
Name | Type | Description |
Name | string | The unique name of the assignor. |
Reason | int8 | The reason why the metadata was updated. |
Minimum Version | int16 | The minimum version of the metadata schema supported by this assignor. |
Maximum Version | int16 | The maximum version of the metadata schema supported by this assignor. |
Version | int16 | The version used to encode the metadata. |
Metadata | bytes | The metadata provided by the consumer for this assignor. |
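As a rough illustration of the data model in the tables above, the three structures could be sketched as Python dataclasses. This is only a sketch: the snake_case field names, the defaults, and the `bump_epoch` helper are assumptions made for the example, and the Partitions Metadata field is left out because its element type is not described here.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Assignor:
    name: str              # unique name of the assignor
    reason: int            # int8: why the metadata was updated
    minimum_version: int   # int16: lowest supported metadata schema version
    maximum_version: int   # int16: highest supported metadata schema version
    version: int           # int16: version used to encode the metadata
    metadata: bytes = b""


@dataclass
class Member:
    member_id: str
    instance_id: Optional[str] = None
    rack_id: Optional[str] = None
    client_id: str = ""
    client_host: str = ""
    subscribed_topic_names: List[str] = field(default_factory=list)
    subscribed_topic_regex: Optional[str] = None
    server_assignor: Optional[str] = None
    # Ordered by priority: the first entry is the preferred assignor.
    client_assignors: List[Assignor] = field(default_factory=list)


@dataclass
class ConsumerGroup:
    group_id: str
    group_epoch: int = 0
    members: List[Member] = field(default_factory=list)

    def bump_epoch(self) -> int:
        """Hypothetical helper: increment the epoch when a new assignment is required."""
        self.group_epoch += 1
        return self.group_epoch


group = ConsumerGroup(group_id="orders")
group.members.append(Member(member_id="m-1", subscribed_topic_names=["orders"]))
new_epoch = group.bump_epoch()
```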
...
Every member is uniquely identified by a UUID. This is called the Member ID. This UUID is generated on the server side and given to the member when it joins the group. It is used in all communication with the group coordinator and must be kept during the entire life span of the member (e.g. the consumer). In that sense, it is similar to an incarnation ID.
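The handling of the Member ID on the coordinator side could look roughly like the sketch below. It assumes that a joining member sends an empty Member ID together with a member epoch of 0; the function name `resolve_member_id` and the `known_ids` set are hypothetical and only serve the illustration.

```python
import uuid

JOIN_EPOCH = 0  # member epoch used to join the group


def resolve_member_id(request_member_id: str, member_epoch: int, known_ids: set) -> str:
    """Return the Member ID the coordinator uses for this heartbeat.

    Assumption: a new member joins with an empty ID and epoch 0, and the
    coordinator answers with a freshly generated UUID.
    """
    if member_epoch == JOIN_EPOCH and not request_member_id:
        new_id = str(uuid.uuid4())
        known_ids.add(new_id)
        return new_id
    if request_member_id not in known_ids:
        raise KeyError(f"unknown member id: {request_member_id!r}")
    # Existing members keep the same ID for their entire life span.
    return request_member_id
```

A member that has received its ID keeps sending it unchanged, so the second call with the returned ID yields the same value.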
...
Code Block | ||
---|---|---|
| ||
{ "apiKey": TBD, "type": "request", "listeners": ["zkBroker", "broker"], "name": "ConsumerGroupHeartbeatRequest", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId", "about": "The group identifier." }, { "name": "MemberId", "type": "string", "versions": "0+", "about": "The member id generated by the server. The member id must be kept during the entire lifetime of the member." }, { "name": "MemberEpoch", "type": "int32", "versions": "0+", "about": "The current member epoch; 0 to join the group; -1 to leave the group." }, { "name": "InstanceId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "null if not provided or if it didn't change since the last heartbeat; the instance Id otherwise." }, { "name": "RackId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "null if not provided or if it didn't change since the last heartbeat; the rack ID of consumer otherwise." }, { "name": "RebalanceTimeoutMs", "type": "int32", "versions": "0+", "default": -1, "about": "-1 if it didn't change since the last heartbeat; the maximum time in milliseconds that the coordinator will wait on the member to revoke its partitions otherwise." }, { "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "null if it didn't change since the last heartbeat; the subscribed topic names otherwise." }, { "name": "SubscribedTopicRegex", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "null if it didn't change since the last heartbeat; the subscribed topic regex otherwise." }, { "name": "ServerAssignor", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "null if not used or if it didn't change since the last heartbeat; the server side assignor to use otherwise." 
}, { "name": "ClientAssignors", "type": "[]Assignor", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "null if not used or if it didn't change since the last heartbeat; the list of client-side assignors otherwise.", "fields": [ { "name": "Name", "type": "string", "versions": "0+", "about": "The name of the assignor." }, { "name": "MinimumVersion", "type": "int16", "versions": "0+", "about": "The minimum supported version for the metadata." }, { "name": "MaximumVersion", "type": "int16", "versions": "0+", "about": "The maximum supported version for the metadata." }, { "name": "Reason", "type": "int8", "versions": "0+", "about": "The reason of the metadata update." }, { "name": "MetadataVersion", "type": "int16", "versions": "0+", "about": "The version of the metadata." }, { "name": "MetadataBytes", "type": "bytes", "versions": "0+", "about": "The metadata." } ]}, { "name": "TopicPartitions", "type": "[]TopicPartitions", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "null if it didn't change since the last heartbeat; the partitions owned by the member.", "fields": [ { "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The topic ID." }, { "name": "Partitions", "type": "[]int32", "versions": "0+", "about": "The partitions." } ]} ] } |
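The "null if it didn't change since the last heartbeat" convention in the schema above can be illustrated with a small client-side sketch. The function `build_heartbeat` and the plain-dict representation of a request are assumptions made for this example; they are not part of the proposed API.

```python
from typing import Any, Dict, Optional

# Fields that the schema describes as null when unchanged since the last heartbeat.
NULLABLE_FIELDS = (
    "InstanceId", "RackId", "SubscribedTopicNames", "SubscribedTopicRegex",
    "ServerAssignor", "ClientAssignors", "TopicPartitions",
)


def build_heartbeat(group_id: str, member_id: str, member_epoch: int,
                    current: Dict[str, Any],
                    last_sent: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Build a heartbeat request that only carries fields changed since `last_sent`."""
    last_sent = last_sent or {}
    request: Dict[str, Any] = {
        "GroupId": group_id,
        "MemberId": member_id,
        "MemberEpoch": member_epoch,  # 0 to join, -1 to leave
    }
    for name in NULLABLE_FIELDS:
        value = current.get(name)
        # None stands for null: either not provided or unchanged.
        request[name] = None if value == last_sent.get(name) else value
    # RebalanceTimeoutMs is not nullable; -1 marks "unchanged".
    timeout = current.get("RebalanceTimeoutMs", -1)
    unchanged = timeout == last_sent.get("RebalanceTimeoutMs")
    request["RebalanceTimeoutMs"] = -1 if unchanged else timeout
    return request


state = {"RackId": "rack-a", "SubscribedTopicNames": ["orders"], "RebalanceTimeoutMs": 30000}
first = build_heartbeat("group-1", "", 0, state)                  # full state on join
second = build_heartbeat("group-1", "member-1", 1, state, state)  # nothing changed
```

In this sketch the first request carries the full state, while the second one carries only the identifiers and the epoch, which keeps steady-state heartbeats small.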
...
The target assignment is stored in a single record.
...
in N + 1 records where N is the number of members in the group. The records for the members are written first, followed by the assignment metadata. When a new assignment is computed, the group coordinator compares it with the current assignment and only writes the difference between the two assignments to the __consumer_offsets partition. The assignment must be atomic, so the group coordinator ensures that all the records are written in a single batch. This limits the size of an assignment update to the maximum batch size, which is 1MB by default. Given the incremental nature of the protocol, 1MB should be sufficient in most cases.
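The write path described above could be sketched as follows. This is a simplified illustration, not the coordinator implementation: `build_assignment_batch` is a hypothetical name, records are plain key/value dicts, JSON length stands in for the real serialized size, and members that disappear from the target assignment are ignored.

```python
import json
from typing import Dict, List, Tuple

MAX_BATCH_BYTES = 1024 * 1024  # 1MB, the default batch size mentioned in the text

# member id -> topic id -> partitions
Assignment = Dict[str, Dict[str, List[int]]]
Record = Tuple[dict, dict]


def build_assignment_batch(group_id: str, epoch: int,
                           current: Assignment, target: Assignment,
                           max_bytes: int = MAX_BATCH_BYTES) -> List[Record]:
    """Return the records for one atomic batch: changed members first, metadata last."""
    records: List[Record] = []
    for member_id, partitions in sorted(target.items()):
        if current.get(member_id) != partitions:  # only write the difference
            key = {"GroupId": group_id, "MemberId": member_id}
            records.append((key, {"TopicPartitions": partitions}))
    # The assignment metadata record is written after the member records.
    records.append(({"GroupId": group_id}, {"AssignmentEpoch": epoch}))
    size = sum(len(json.dumps(k).encode()) + len(json.dumps(v).encode())
               for k, v in records)
    if size > max_bytes:
        raise ValueError(f"assignment delta of {size} bytes exceeds {max_bytes} bytes")
    return records


current = {"m1": {"topic-a": [0, 1]}, "m2": {"topic-a": [2]}}
target = {"m1": {"topic-a": [0, 1]}, "m2": {"topic-a": [2, 3]}}
batch = build_assignment_batch("group-1", 5, current, target)
```

Here only the record for `m2` and the metadata record end up in the batch, because the assignment of `m1` did not change.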
ConsumerGroupTargetAssignmentMetadataKey
Code Block | ||||
---|---|---|---|---|
| ||||
{ "type": "data", "name": "ConsumerGroupTargetAssignmentMetadataKey", "validVersions": "6", "flexibleVersions": "none", "fields": [ { "name": "GroupId", "type": "string", "versions": "6" } ] } |
...
ConsumerGroupTargetAssignmentMetadataValue
Code Block | ||||
---|---|---|---|---|
| ||||
{ "type": "data", "name": "ConsumerGroupTargetAssignmentMetadataValue", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "AssignmentEpoch", "versions": "0+", "type": "int32" } ] } |
The AssignmentEpoch corresponds to the group epoch used to compute the assignment. It is not necessarily the last one.
ConsumerGroupTargetAssignmentMemberKey
Code Block | ||||
---|---|---|---|---|
| ||||
{ "type": "data", "name": "ConsumerGroupTargetAssignmentMemberKey", "validVersions": "7", "flexibleVersions": "none", "fields": [ { "name": "GroupId", "type": "string", "versions": "7" }, { "name": "MemberId", "type": "string", "versions": "7" } ] } |
ConsumerGroupTargetAssignmentMemberValue
Code Block | ||||
---|---|---|---|---|
| ||||
{ "type": "data", "name": "ConsumerGroupTargetAssignmentMemberValue", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "AssignmentEpoch", "versions": "0+", "type": "int32" }, { "name": "MemberId", "versions": "0+", "type": "string" }, { "name": "Error", "versions": "0+", "type": "int8" }, { "name": "TopicPartitions", "versions": "0+", "type": "[]TopicPartition", "fields": [ { "name": "TopicId", "versions": "0+", "type": "uuid" }, { "name": "Partitions", "versions": "0+", "type": "[]int32" } ]}, { "name": "Version", "versions": "0+", "type": "int16" }, { "name": "Metadata", "versions": "0+", "type": "bytes" } ] } |
...
Current Member Assignment
...
Code Block | ||||
---|---|---|---|---|
| ||||
{ "type": "data", "name": "ConsumerGroupCurrentMemberAssignmentKey", "validVersions": "8", "flexibleVersions": "none", "fields": [ { "name": "GroupId", "type": "string", "versions": "8" }, { "name": "MemberId", "type": "string", "versions": "8" } ] } |
...
Code Block | ||||
---|---|---|---|---|
| ||||
{ "type": "data", "name": "ConsumerGroupCurrentMemberAssignmentValue", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "MemberEpoch", "versions": "0+", "type": "int32" }, { "name": "Error", "versions": "0+", "type": "int8" }, { "name": "TopicPartitions", "versions": "0+", "type": "[]TopicPartition", "fields": [ { "name": "TopicId", "versions": "0+", "type": "uuid" }, { "name": "Partitions", "versions": "0+", "type": "[]int32" } ]}, { "name": "Version", "versions": "0+", "type": "int16" }, { "name": "Metadata", "versions": "0+", "type": "bytes" } ] } |
Offsets
OffsetCommitValue
Code Block | ||||
---|---|---|---|---|
| ||||
{ "type": "data", "name": "OffsetCommitValue", "validVersions": "0-4", "flexibleVersions": "4+", "fields": [ { "name": "offset", "type": "int64", "versions": "0+" }, { "name": "leaderEpoch", "type": "int32", "versions": "3+", "default": -1, "ignorable": true }, { "name": "metadata", "type": "string", "versions": "0+" }, { "name": "commitTimestamp", "type": "int64", "versions": "0+" }, { "name": "expireTimestamp", "type": "int64", "versions": "1", "default": -1, "ignorable": true }, // Adds TopicId field. { "name": "topicId", "type": "uuid", "versions": "4", "ignorable": true } ] } |
...
The group membership protocol is also used outside of Apache Kafka. For instance, the Confluent Schema Registry uses it for leader election. It is not clear whether we really want to support such cases in the future. If we do, we could also define a new set of APIs for it. That would be much cleaner in the long run.
Metadata Transactions
The KIP proposes to rely on the atomicity of the batch to write the assignment to the __consumer_offsets partition. This means that the assignment or, to be precise, the delta between two assignments cannot be larger than 1MB, where 1MB is the default batch size. In the future, we could imagine doing something similar to KIP-868 Metadata Transactions in the group coordinator. The solution outlined in KIP-868 does not work in our context because the __consumer_offsets topic is compacted. However, we could imagine a similar approach. We will tackle this in the future if needed.
Upgrade / Downgrade
The KIP proposes to rely on the IBP/MetadataVersion to decide whether a record or an API can be used. We have discussed the idea of using a dedicated feature flag instead of relying on metadata.version. That would allow decoupling the group coordinator from the quorum controller during upgrades. We also need to flesh out how to handle downgrades. We will do this in a future KIP.