It is generated by the
client once and must be used during its lifetime.
Table of Contents | ||
---|---|---|
|
...
Consumer Group | ||
---|---|---|
Name | Type | Description |
Group ID | string | The group ID as configured by the consumer. The ID uniquely identifies the group. |
Group Epoch | int32 | The current epoch of the group. The epoch is incremented by the group coordinator when a new assignment is required for the group. |
Members | []Member | The set of members in the group. |
Partitions Metadata | []PartitionMetadata | The metadata of the partitions that the group is subscribed to. This is used to detect partition metadata changes. |
Member | ||
Name | Type | Description |
Member ID | string | The unique identifier of the member. It is generated by the client once coordinator upon the first heartbeat request and must be used during its the lifetime of the member. The ID is similar to an incarnation ID. |
Instance ID | string | The instance ID configured by the consumer. |
Rack ID | string | The rack ID configured by the consumer. |
Client ID | string | The client ID configured by the consumer. |
Client Host | string | The client host configured by the consumer. |
Subscribed Topic Names | []string | The current set of subscribed topic names configured by the consumer. |
Subscribed Topic Regex | string | The current subscription regular expression configured by the consumer. |
Server Assignor | string | The server side assignor used by the group. |
Client Assignors | []Assignor | The list of client-side assignors supported by the member. The order of this list defined the priority. |
Assignor | ||
Name | Type | Description |
Name | string | The unique name of the assignor. |
Reason | int8 | The reason why the metadata was updated. |
Minimum Version | int16 | The minimum version of the metadata schema supported by this assignor. |
Maximum Version | int16 | The maximum version of the metadata schema supported by this assignor. |
Version | int16 | The version used to encode the metadata. |
Metadata | bytes | The metadata provided by the consumer for this assignor. |
...
The rebalance timeout is provided by the member when it joins the group. It is basically the max poll interval configured on the client side. The timer starts ticking when the heartbeat response request is sent out processed by the group coordinator.
...
Code Block | ||
---|---|---|
| ||
{
"apiKey": TBD,
"type": "request",
"listeners": ["zkBroker", "broker"],
"name": "ConsumerGroupHeartbeatRequest",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
"about": "The group identifier." },
{ "name": "MemberId", "type": "string", "versions": "0+",
"about": "The member id generated by the server. The member id must be kept during the entire lifetime of the member." },
{ "name": "MemberEpoch", "type": "int32", "versions": "0+", "default": "-1",
"about": "The current member epoch; 0 to join the group; -1 to leave the group." },
{ "name": "InstanceId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "null if not provided or if it didn't change since the last heartbeat; the instance Id otherwise." },
{ "name": "RackId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "null if not provided or if it didn't change since the last heartbeat; the rack ID of consumer otherwise." },
{ "name": "RebalanceTimeoutMs", "type": "int32", "versions": "0+", "default": -1,
"about": "-1 if it didn't chance since the last heartbeat; the maximum time in milliseconds that the coordinator will wait on the member to revoke its partitions otherwise." },
{ "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "null if it didn't change since the last heartbeat; the subscribed topic names otherwise." },
{ "name": "SubscribedTopicRegex", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "null if it didn't change since the last heartbeat; the subscribed topic regex otherwise" },
{ "name": "ServerAssignor", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "null if not used or if it didn't change since the last heartbeat; the server side assignor to use otherwise." },
{ "name": "ClientAssignors", "type": "[]Assignor", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "null if not used or if it didn't change since the last heartbeat; the list of client-side assignors otherwise.", "fields": [
{ "name": "Name", "type": "string", "versions": "0+",
"about": "The name of the assignor." },
{ "name": "MinimumVersion", "type": "int16", "versions": "0+",
"about": "The minimum supported version for the metadata." },
{ "name": "MaximumVersion", "type": "int16", "versions": "0+",
"about": "The maximum supported version for the metadata." },
{ "name": "Reason", "type": "int8", "versions": "0+",
"about": "The reason of the metadata update." },
{ "name": "MetadataVersion", "type": "int16", "versions": "0+",
"about": "The version of the metadata." },
{ "name": "MetadataBytes", "type": "bytes", "versions": "0+",
"about": "The metadata." }
]},
{ "name": "TopicPartitions", "type": "[]TopicPartitions", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "null if it didn't change since the last heartbeat; the partitions owned by the member.", "fields": [
{ "name": "TopicId", "type": "uuid", "versions": "0+",
"about": "The topic ID." },
{ "name": "Partitions", "type": "[]int32", "versions": "0+",
"about": "The partitions." }
]}
]
} |
...
- GroupId must be non-empty.
- MemberId must be non-empty.
- MemberEpoch must be >= -1.
- InstanceId, if provided, must be non-empty.
- RebalanceTimeoutMs must be larger than zero in the first heartbeat request.SubscribedTopicNames and SubscribedTopicRegex cannot be used together.
- SubscribedTopicNames or SubscribedTopicRegex must be, at minimum, in the first heartbeat requestheartbeat request when member epoch is 0.
- SubscribedTopicRegex must be a valid regular expression.
- ServerAssignor and ClientAssignors cannot be used together.
- Assignor.Name must be non-empty.
- Assignor.MinimumVersion must be >= -1.
- Assignor.MaximumVersion must be >= 0 and >= Assignor.MinimumVersion.
- Assignor.Version must be in the >= Assignor.MinimumVersion and <= Assignor.MaximumVersion.
...
If the response contains no error, the member is done with the assignment process.
Upon receiving the STALE_MEMBER_EPOCH error, the consumer retries when receiving its next heartbeat response with its member epoch.
...
Code Block | ||
---|---|---|
| ||
{ "apiKey": 16, "type": "response", "name": "ListGroupsResponse", // Version 1 adds the throttle time. // // Starting in version 2, on quota violation, brokers send out // responses before throttling. // // Version 3 is the first flexible version. // // Version 4 adds the GroupState field (KIP-518). // // Version 5 adds the GroupType field (KIP-848). "validVersions": "0-5", "flexibleVersions": "3+", "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true, "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The error code, or 0 if there was no error." }, { "name": "Groups", "type": "[]ListedGroup", "versions": "0+", "about": "Each group in the response.", "fields": [ { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId", "about": "The group ID." }, { "name": "ProtocolType", "type": "string", "versions": "0+", "about": "The group protocol type." }, { "name": "GroupState", "type": "string", "versions": "4+", "ignorable": true, "about": "The group state name." }, { "name": "GroupType", "type": "string", "versions": "5+", "ignorable": true, "about": "The group statetype name." } ]} ] } |
Response Handling
...