...

    {
      "apiKey": 16,
      "type": "request",
      "listeners": ["zkBroker", "broker"],
      "name": "ListGroupsRequest",
      // Version 1 and 2 are the same as version 0.
      //
      // Version 3 is the first flexible version.
      //
      // Version 4 adds the StatesFilter field (KIP-518).
      //
      // Version 5 adds the TypesFilter field (KIP-848).
      "validVersions": "0-5",
      "flexibleVersions": "3+",
      "fields": [
        { "name": "StatesFilter", "type": "[]string", "versions": "4+",
          "about": "The states of the groups we want to list. If empty all groups are returned with their state." },
        { "name": "TypesFilter", "type": "[]string", "versions": "5+",
          "about": "The types of the groups we want to list. If empty all groups are returned" }
      ]
    }

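The filter semantics stated in the field descriptions (an empty filter matches every group) can be sketched as follows. This is a minimal illustration and not the broker's implementation: the `Group` record, the `list_groups` helper, the sample state and type strings, and the exact-match comparison are all assumptions made for the example.

```python
from dataclasses import dataclass
from typing import Iterable, List, Sequence


@dataclass(frozen=True)
class Group:
    # Hypothetical in-memory view of a group; not a Kafka class.
    group_id: str
    state: str
    group_type: str


def list_groups(groups: Iterable[Group],
                states_filter: Sequence[str] = (),
                types_filter: Sequence[str] = ()) -> List[Group]:
    """Return the groups matching both filters; an empty filter matches everything."""
    states = set(states_filter)
    types = set(types_filter)
    return [
        g for g in groups
        if (not states or g.state in states)
        and (not types or g.group_type in types)
    ]


# Sample data; the state and type values are illustrative only.
groups = [
    Group("g1", "Stable", "consumer"),
    Group("g2", "Empty", "classic"),
    Group("g3", "Stable", "classic"),
]
```

With both filters empty, `list_groups(groups)` returns all three groups; passing `types_filter=["consumer"]` narrows the result to `g1`.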
Required ACL
- Describe Group
Response Schema
The GroupType field is introduced. It represents the type of the group.
...
We propose to rename GenerationId to GenerationIdOrMemberEpoch.
Required ACL
- Read Group
Request Handling
When the group id corresponds to a consumer group using the new rebalance protocol, the provided member epoch must match the expected member epoch.
...

    {
      "apiKey": 9,
      "type": "request",
      "listeners": ["zkBroker", "broker"],
      "name": "OffsetFetchRequest",
      // In version 0, the request read offsets from ZK.
      //
      // Starting in version 1, the broker supports fetching offsets from the internal __consumer_offsets topic.
      //
      // Starting in version 2, the request can contain a null topics array to indicate that offsets
      // for all topics should be fetched. It also returns a top level error code
      // for group or coordinator level errors.
      //
      // Version 3, 4, and 5 are the same as version 2.
      //
      // Version 6 is the first flexible version.
      //
      // Version 7 is adding the require stable flag.
      //
      // Version 8 is adding support for fetching offsets for multiple groups at a time
      //
      // Version 9 adds GenerationIdOrMemberEpoch and MemberId fields (KIP-848).
      "validVersions": "0-9",
      "flexibleVersions": "6+",
      "fields": [
        { "name": "GroupId", "type": "string", "versions": "0-7", "entityType": "groupId",
          "about": "The group to fetch offsets for." },
        // New fields.
        { "name": "GenerationIdOrMemberEpoch", "type": "int32", "versions": "9+", "default": "-1", "ignorable": true,
          "about": "The generation of the group." },
        { "name": "MemberId", "type": "string", "versions": "9+", "ignorable": true,
          "about": "The member ID assigned by the group coordinator." },
        // End of new fields.
        { "name": "Topics", "type": "[]OffsetFetchRequestTopic", "versions": "0-7", "nullableVersions": "2-7",
          "about": "Each topic we would like to fetch offsets for, or null to fetch offsets for all topics.", "fields": [
          { "name": "Name", "type": "string", "versions": "0-7", "entityType": "topicName",
            "about": "The topic name."},
          { "name": "PartitionIndexes", "type": "[]int32", "versions": "0-7",
            "about": "The partition indexes we would like to fetch offsets for." }
        ]},
        { "name": "Groups", "type": "[]OffsetFetchRequestGroup", "versions": "8+",
          "about": "Each group we would like to fetch offsets for", "fields": [
          { "name": "groupId", "type": "string", "versions": "8+", "entityType": "groupId",
            "about": "The group ID."},
          { "name": "Topics", "type": "[]OffsetFetchRequestTopics", "versions": "8+", "nullableVersions": "8+",
            "about": "Each topic we would like to fetch offsets for, or null to fetch offsets for all topics.", "fields": [
            { "name": "Name", "type": "string", "versions": "8+", "entityType": "topicName",
              "about": "The topic name."},
            { "name": "PartitionIndexes", "type": "[]int32", "versions": "8+",
              "about": "The partition indexes we would like to fetch offsets for." }
          ]}
        ]},
        {"name": "RequireStable", "type": "bool", "versions": "7+", "default": "false",
         "about": "Whether broker should hold on returning unstable offsets but set a retriable error code for the partitions."}
      ]
    }

Required ACL
- Describe Group
Request Handling
The MemberId and the GenerationIdOrMemberEpoch are verified. If the verification fails, FENCED_MEMBER_EPOCH or UNKNOWN_MEMBER_ID is returned.
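The verification can be sketched as below. The mapping of failures to error codes is an assumption for illustration: an unknown member id yields UNKNOWN_MEMBER_ID and an epoch mismatch yields FENCED_MEMBER_EPOCH. The `verify_member` helper and the `members` mapping are hypothetical, and requests that carry no member id are not covered.

```python
from typing import Dict, Optional


def verify_member(members: Dict[str, int],
                  member_id: str,
                  member_epoch: int) -> Optional[str]:
    """Return an error name, or None when the member id and epoch are accepted.

    `members` maps each known member id to its expected member epoch
    (a hypothetical stand-in for the coordinator's group state).
    """
    if member_id not in members:
        # Assumption: an unrecognized member id maps to UNKNOWN_MEMBER_ID.
        return "UNKNOWN_MEMBER_ID"
    if members[member_id] != member_epoch:
        # Assumption: a mismatching epoch maps to FENCED_MEMBER_EPOCH.
        return "FENCED_MEMBER_EPOCH"
    return None


# Hypothetical group state: one member at epoch 7.
members = {"member-a": 7}
```

Here `verify_member(members, "member-a", 7)` accepts the request, while an epoch of 6 or an unknown member id produces the corresponding error name.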
...
Upon receiving the FENCED_MEMBER_EPOCH error, the consumer waits for its next heartbeat response, which carries its member epoch, and then retries.
DescribeConfigs API
Request Schema
The API schema is the same but supports a new resource type: the ResourceType field can be set to GROUP (16). When GROUP is used, the resource name corresponds to the group id.
Required ACL
- Describe Config on the group.
Response Schema
No changes.
IncrementalAlterConfigs API
The API is the same but supports a new resource type: GROUP (16). When GROUP is used, the resource name corresponds to the group id.
Request Schema
The schema is the same but the ResourceType field can be set to GROUP (16).
Required ACL
- Alter Config on the group.
Response Schema
No changes.
Records
This section describes the new record types required for the new protocol. The storage layout is based on the data model described earlier in this document.
...
New properties in the broker configuration.
| Name | Type | Default | Doc |
|---|---|---|---|
| group.new.coordinator.enable | bool | false | Whether to enable the new group coordinator and the consumer group protocol. |
| group.coordinator.threads | int | 1 | The number of threads used to run the state machines. |
| group.consumer.session.timeout.ms | int | 45s | The timeout to detect client failures when using the consumer group protocol. |
| group.consumer.min.session.timeout.ms | int | 45s | The minimum session timeout. |
| group.consumer.max.session.timeout.ms | int | 60s | The maximum session timeout. |
| group.consumer.heartbeat.interval.ms | int | 5s | The heartbeat interval given to the members. |
| group.consumer.min.heartbeat.interval.ms | int | 5s | The minimum heartbeat interval. |
| group.consumer.max.heartbeat.interval.ms | int | 15s | The maximum heartbeat interval. |
| group.consumer.max.size | int | MaxValue | The maximum number of consumers that a single consumer group can accommodate. |
| group.consumer.assignors | List | range, uniform | The server side assignors. |
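One way to read the min/max properties is as bounds on the corresponding value. The sketch below checks that relationship for the heartbeat interval using the defaults from the table, expressed in milliseconds. `check_bounds` is a hypothetical helper, and rejecting an out-of-range value (rather than clamping it) is an assumption of this example, not something the table specifies.

```python
def check_bounds(name: str, value_ms: int, min_ms: int, max_ms: int) -> None:
    """Raise ValueError unless min_ms <= value_ms <= max_ms."""
    if min_ms > max_ms:
        raise ValueError(f"{name}: minimum {min_ms} ms exceeds maximum {max_ms} ms")
    if not min_ms <= value_ms <= max_ms:
        raise ValueError(
            f"{name}: {value_ms} ms is outside [{min_ms} ms, {max_ms} ms]"
        )


# Defaults from the table above, converted to milliseconds.
HEARTBEAT_INTERVAL_MS = 5_000
MIN_HEARTBEAT_INTERVAL_MS = 5_000
MAX_HEARTBEAT_INTERVAL_MS = 15_000

# The default heartbeat interval lies within its bounds, so this passes silently.
check_bounds(
    "group.consumer.heartbeat.interval.ms",
    HEARTBEAT_INTERVAL_MS,
    MIN_HEARTBEAT_INTERVAL_MS,
    MAX_HEARTBEAT_INTERVAL_MS,
)
```

A value such as 20000 ms would fall outside the 5000 to 15000 ms range and raise a `ValueError` in this sketch.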
...