...
- Eager Mode - In the eager mode, the consumer revokes all its partitions before re-joining the group during a rebalance.
- Cooperative Mode - In the cooperative mode, the consumer only revokes the partitions that it does not own anymore before rejoining the group. So two rebalances are required to move a partition from member A to member B. One rebalance to revoke the partition from A and another rebalance to assign the partition to B.
The two modes must be treated differently. At the moment, the group coordinator does not know which mode is used by the group. This information will be added to the protocol.
Note that the upgrade path will only work from the consumer protocol version 3 (as described in KIP-792).
...
Code Block | ||
---|---|---|
| ||
{ "apiKey": TBD, "type": "request", "listeners": ["zkBroker", "broker"], "name": "ConsumerGroupHeartbeatRequest", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId", "about": "The group identifier." }, { "name": "MemberId", "type": "string", "versions": "0+", "about": "The member id generated by the server. The member id must be kept during the entire lifetime of the member." }, { "name": "MemberEpoch", "type": "int32", "versions": "0+", "default": "-1", "about": "The current member epoch; 0 to join the group; -1 to leave the group." }, { "name": "InstanceId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "null it not provided or if it didn't change since the last heartbeat; the instance Id otherwise." }, { "name": "RebalanceTimeoutMs", "type": "int32", "versions": "0+", "default": -1, "about": "-1 if it didn't chance since the last heartbeat; the maximum time in milliseconds that the coordinator will wait on the member to revoke its partitions otherwise." }, { "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "null if it didn't change since the last heartbeat; the subscribed topic names otherwise." }, { "name": "SubscribedTopicRegex", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "null if it didn't change since the last heartbeat; the subscribed topic regex otherwise" }, { "name": "ServerAssignor", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "null if not used or if it didn't change since the last heartbeat; the server side assignor to use otherwise." }, { "name": "ClientAssignors", "type": "[]Assignor", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "null if not used or if it didn't change since the last heartbeat; the list of client-side assignors otherwise.", "fields": [ { "name": "Name", "type": "string", "versions": "0+", "about": "The name of the assignor." }, { "name": "MinimumVersion", "type": "int32", "versions": "0+", "about": "The minimum supported version for the metadata." }, { "name": "MaximumVersion", "type": "int32", "versions": "0+", "about": "The maximum supported version for the metadata." }, { "name": "Version", "type": "int32", "versions": "0+", "about": "The version of the metadata." }, { "name": "Reason", "type": "byteint8", "versions": "0+", "about": "The reason of the metadata update." }, { "name": "Metadata", "type": "bytes", "versions": "0+", "about": "The metadata." } ]}, { "name": "TopicPartitions", "type": "[]TopicPartition", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "null if it didn't change since the last heartbeat; the topics owned by the member.", "fields": [ { "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The topic ID." }, { "name": "Partitions", "type": "[]int32", "versions": "0+", "about": "The partitions." } ]} ] } |
...
Code Block | ||||
---|---|---|---|---|
| ||||
{ "apiKey": TBD, "type": "response", "name": "ConsumerGroupHeartbeatResponse", "validVersions": "0", "flexibleVersions": "0+", // Supported errors: // - GROUP_AUTHORIZATION_FAILED // - NOT_COORDINATOR // - COORDINATOR_NOT_AVAILABLE // - COORDINATOR_LOAD_IN_PROGRESS // - INVALID_REQUEST // - UNKNOWN_MEMBER_ID // - FENCED_MEMBER_EPOCH // - UNSUPPORTED_ASSIGNOR // - COMPUTE_ASSIGNMENT "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The top-level error code, or 0 if there was no error" }, { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "The top-level error message, or null if there was no error." }, { "name": "MemberEpoch", "type": "int32", "versions": "0+", "about": "The member epoch." }, { "name": "HeartbeatIntervalMs", "type": "int32", "versions": "0+", "about": "The heartbeat interval in milliseconds." }, { "name": "Assignment", "type": "Assignment", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "null if not provided; the assignment otherwise." "fields": [ { "name": "TopicPartitions", "type": "[]TopicPartition", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "The assigned topic-partitions to the member otherwise.", "fields": [ { "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The topic ID." }, { "name": "Partitions", "type": "[]int32", "versions": "0+", "about": "The partitions." } ]}, { "name": "Error", "type": "byteint8", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "The assigned error." } { "name": "Metadata", "type": "bytes", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "The assigned metadata." } ] ] } |
...
Upon receiving the COMPUTE_ASSIGNMENT error, the consumer starts the assignment process.
Upon received receiving the UNKNOWN_MEMBER_ID or FENCED_MEMBER_EPOCH error, the consumer abandon all its partitions and rejoins with the same member id and the epoch 0.
...
Code Block | ||||
---|---|---|---|---|
| ||||
{ "apiKey": TBD, "type": "response", "name": "ConsumerGroupPrepareAssignmentResponse", "validVersions": "0", "flexibleVersions": "0+", // Supported errors: // - GROUP_AUTHORIZATION_FAILED // - NOT_COORDINATOR // - COORDINATOR_NOT_AVAILABLE // - COORDINATOR_LOAD_IN_PROGRESS // - INVALID_REQUEST // - INVALID_GROUP_ID // - GROUP_ID_NOT_FOUND // - UNKNOWN_MEMBER_ID // - FENCED_MEMBER_EPOCH "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The top-level error code, or 0 if there was no error" }, { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "The top-level error message, or null if there was no error." }, { "name": "GroupEpoch", "type": "int32", "versions": "0+", "about": "The group epoch." }, { "name": "ClientAssignorName", "type": "string", "versions": "0+", "about": "The selected assignor." }, { "name": "Members", "type": "[]Member", "versions": "0+", "about": "The members.", "fields": [ { "name": "MemberId", "type": "string", "versions": "0+", "about": "The member ID." }, { "name": "MemberEpoch", "type": "int32", "versions": "0+", "about": "The member epoch." }, { "name": "InstanceId", "type": "string", "versions": "0+", "about": "The member instance ID." }, { "name": "Subscriptions", "type": "[]uuid", "versions": "0+", "about": "The subscribed topic IDs." }, { "name": "AssignorVersion", "type": "int32", "versions": "0+", "about": "The version of the metadata." }, { "name": "AssignorReason", "type": "byteint8", "versions": "0+", "about": "The reason of the metadata update." }, { "name": "AssignorMetadata", "type": "bytes", "versions": "0+", "about": "The assignor metadata." }, { "name": "TopicPartitions", "type": "[]TopicPartition", "versions": "0+", "about": "The assigned topic-partitions to the member.", "fields": [ { "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The topic ID." }, { "name": "Partitions", "type": "[]int32", "versions": "0+", "about": "The partitions." } ]} ]}, { "name": "PartitionMetadata", "type": "[]Metadata", "versions": "0+", "about": "The topic-partition metadata.", "fields": [ { "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The topic ID." }, { "name": "NumPartitions", "type": "int32", "versions": "0+", "about": "The number of partitions." } ]} ] } |
...
If the response contains no error, the member calls the client side assignor with the group state.
Upon received receiving the UNKNOWN_MEMBER_ID error, the consumer abandon the process.
Upon received receiving the FENCED_MEMBER_EPOCH error, the consumer retries when receiving its next heartbeat response with its member epoch.
...
Code Block | ||||
---|---|---|---|---|
| ||||
{ "apiKey": TBD, "type": "request", "listeners": ["zkBroker", "broker"], "name": "ConsumerGroupInstallAssignmentRequest", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId", "about": "The group identifier." }, { "name": "MemberId", "type": "string", "versions": "0+", "about": "The member id assigned by the group coordinator." }, { "name": "MemberEpoch", "type": "int32", "versions": "0+", "about": "The member epoch." }, { "name": "GroupEpoch", "type": "int32", "versions": "0+", "about": "The group epoch." }, { "name": "Error", "type": "byteint8", "versions": "0+", "about": "The assignment error; or zero if the assignment is successful." }, { "name": "Members", "type": "[]Member", "versions": "0+", "about": "The members.", "fields": [ { "name": "MemberId", "type": "string", "versions": "0+", "about": "The member ID." }, { "name": "Partitions", "type": "[]TopicPartition", "versions": "0+", "about": "The assigned topic-partitions to the member.", "fields": [ { "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The topic ID." }, { "name": "Partitions", "type": "[]int32", "versions": "0+", "about": "The partitions." } ]}, { "name": "MetadataVersion", "type": "int32", "versions": "0+", "about": "The metadata version." } { "name": "MetadataBytes", "type": "bytes", "versions": "0+", "about": "The metadata bytes." } ]} ] } |
...
If the response contains no error, the member is done.
Upon received receiving the FENCED_MEMBER_EPOCH error, the consumer retries when receiving its next heartbeat response with its member epoch.
Upon received the receiving any other errors, the consumer abandon the process.
...
Code Block | ||
---|---|---|
| ||
{ "apiKey": 16, "type": "response", "name": "ListGroupsResponse", // Version 1 adds the throttle time. // // Starting in version 2, on quota violation, brokers send out // responses before throttling. // // Version 3 is the first flexible version. // // Version 4 adds the GroupState field (KIP-518). // // Version 5 adds the GroupType field (KIP-848). "validVersions": "0-5", "flexibleVersions": "3+", "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true, "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The error code, or 0 if there was no error." }, { "name": "Groups", "type": "[]ListedGroup", "versions": "0+", "about": "Each group in the response.", "fields": [ { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId", "about": "The group ID." }, { "name": "ProtocolType", "type": "string", "versions": "0+", "about": "The group protocol type." }, { "name": "GroupState", "type": "string", "versions": "4+", "ignorable": true, "about": "The group state name." }, { "name": "GroupType", "type": "string", "versions": "5+", "ignorable": true, "about": "The group state name." } ]} ] } |
...
OffsetCommit API
The version of the API is bumped to 9.
Request Schema
...
language | js |
---|---|
linenumbers | true |
Required ACL
Request Validation
Request Handling
...
We propose to rename GenerationId to GenerationIdOrMemberEpoch.
Request Handling
When the group id corresponds to a consumer group using the new rebalance protocol, the provided member epoch must match the expected member epoch.
Response Schema
The response can return FENCED_MEMBER_EPOCH.
OffsetFetch API
The version of the API is bumped to 9.
Request Schema
Code Block | ||||
---|---|---|---|---|
|
Response Handling
OffsetCommit API
Update MemberId to UUID, add MemberEpoch
Request Schema
...
language | js |
---|---|
linenumbers | true |
{
"apiKey": 9,
"type": "request",
"listeners": ["zkBroker", "broker"],
"name": "OffsetFetchRequest",
// In version 0, the request read offsets from ZK.
//
// Starting in version 1, the broker supports fetching offsets from the internal __consumer_offsets topic.
//
// Starting in version 2, the request can contain a null topics array to indicate that offsets
// for all topics should be fetched. It also returns a top level error code
// for group or coordinator level errors.
//
// Version 3, 4, and 5 are the same as version 2.
//
// Version 6 is the first flexible version.
//
// Version 7 is adding the require stable flag.
//
// Version 8 is adding support for fetching offsets for multiple groups at a time
//
// Version 9 adds GenerationIdOrMemberEpoch and MemberId fields (KIP-848).
"validVersions": "0-9",
"flexibleVersions": "6+",
"fields": [
{ "name": "GroupId", "type": "string", "versions": "0-7", "entityType": "groupId",
"about": "The group to fetch offsets for." },
// New fields.
{ "name": "GenerationIdOrMemberEpoch", "type": "int32", "versions": "9+", "default": "-1", "ignorable": true,
"about": "The generation of the group." },
{ "name": "MemberId", "type": "string", "versions": "9+", "ignorable": true,
"about": "The member ID assigned by the group coordinator." },
// End of new fields.
{ "name": "Topics", "type": "[]OffsetFetchRequestTopic", "versions": "0-7", "nullableVersions": "2-7",
"about": "Each topic we would like to fetch offsets for, or null to fetch offsets for all topics.", "fields": [
{ "name": "Name", "type": "string", "versions": "0-7", "entityType": "topicName",
"about": "The topic name."},
{ "name": "PartitionIndexes", "type": "[]int32", "versions": "0-7",
"about": "The partition indexes we would like to fetch offsets for." }
]},
{ "name": "Groups", "type": "[]OffsetFetchRequestGroup", "versions": "8+",
"about": "Each group we would like to fetch offsets for", "fields": [
{ "name": "groupId", "type": "string", "versions": "8+", "entityType": "groupId",
"about": "The group ID."},
{ "name": "Topics", "type": "[]OffsetFetchRequestTopics", "versions": "8+", "nullableVersions": "8+",
"about": "Each topic we would like to fetch offsets for, or null to fetch offsets for all topics.", "fields": [
{ "name": "Name", "type": "string", "versions": "8+", "entityType": "topicName",
"about": "The topic name."},
{ "name": "PartitionIndexes", "type": "[]int32", "versions": "8+",
"about": "The partition indexes we would like to fetch offsets for." }
]}
]},
{"name": "RequireStable", "type": "bool", "versions": "7+", "default": "false",
"about": "Whether broker should hold on returning unstable offsets but set a retriable error code for the partitions."}
]
} |
Request Handling
The MemberId and the GenerationIdOrMemberEpoch are verified. FENCED_MEMBER_EPOCH or UNKNOWN_MEMBER_ID is returned.
Response Schema
The response is the same. Only new FENCED_MEMBER_EPOCH or UNKNOWN_MEMBER_ID errors could be returned.
Response Handling
Upon receiving the FENCED_MEMBER_EPOCH error, the consumer retries when receiving its next heartbeat response with its member epoch.
DescribeConfigs API
The API is the same but supports a new resource type: GROUP (16). When GROUP is used, the resource name corresponds to the group id.
AlterIncrementalConfigs API
The API is the same but supports a new resource type: GROUP (16). When GROUP is used, the resource name corresponds to the group id.
Records
Records
This section describes the new record types required for the new protocol. The storage layout is based on the data model described earlier in this document.
As explained earlier, they will be persisted in the __consumer_offsets compacted topic. The compacted topic based storage requires a dedicated key type per record type in order for the compaction to work. The current protocol already uses versions from 0 to 2 (included) for the keys.
Group Metadata
Groups can be rather large so we propose to use several records to store a group in order to not be limited by the maximum batch size (1MB by default). Therefore we propose to store group metadata with two records types: the ConsumerGroupMetadata and the ConsumerGroupMemberMetadata.
A group with X members will be stored with X+2 records. One ConsumerGroupMemberMetadata per member, one ConsumerGroupPartitionMetadata, and one ConsumerGroupMetadata for the group at the end. Atomicity is not a concern here. All the records can be applied independently.
Moreover, the whole group does not necessarily have to be written for every epoch. Members who have not changed could be omitted as the compacted topic will retain their previous state anyway.
When a member is deleted, a tombstone for him is written to the partition.
ConsumerGroupMetadataKey
Code Block | ||||
---|---|---|---|---|
| ||||
{
"type": "data",
"name": "ConsumerGroupMetadataKey",
"validVersions": "3",
"flexibleVersions": "none",
"fields": [
{ "name": "GroupId", "type": "string", "versions": "3" }
]
} |
ConsumerGroupMetadataValue
Code Block | ||||
---|---|---|---|---|
| ||||
{
"type": "data",
"name": "ConsumerGroupMetadataValue",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "Epoch", "versions": "0+", "type": "int32" }
],
} |
ConsumerGroupPartitionMetadataKey
Code Block | ||||
---|---|---|---|---|
| ||||
{
"type": "data",
"name": "ConsumerGroupPartitionMetadataKey",
"validVersions": "4",
"flexibleVersions": "none",
"fields": [
{ "name": "GroupId", "type": "string", "versions": "4" }
]
} |
ConsumerGroupPartitionMetadataValue
Code Block | ||||
---|---|---|---|---|
| ||||
{
"type": "data",
"name": "ConsumerGroupPartitionMetadataValue",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "Epoch", "versions": "0+", "type": "int32" },
{ "name": "TopicPartitionMetadata", "versions": "0+",
"type": "[]TopicPartition", "fields": [
{ "name": "TopicId", "versions": "0+", "type": "uuid" },
{ "name": "NumPartitions", "versions": "0+", "type": "int32" }
]}
],
} |
ConsumerGroupMemberMetadataKey
Code Block | ||||
---|---|---|---|---|
| ||||
{
"type": "data",
"name": "ConsumerGroupMemberMetadataKey",
"validVersions": "5",
"flexibleVersions": "none",
"fields": [
{ "name": "GroupId", "type": "string", "versions": "5" },
{ "name": "MemberId", "type": "string", "versions": "5" }
]
} |
ConsumerGroupMemberMetadataValue
Code Block | ||||
---|---|---|---|---|
| ||||
{
"type": "data",
"name": "ConsumerGroupMemberMetadataValue",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "GroupEpoch", "versions": "0+", "type": "int32" },
{ "name": "InstanceId", "versions": "0+", "type": "string" },
{ "name": "ClientId", "versions": "0+", "type": "string" },
{ "name": "ClientHost", "versions": "0+", "type": "string" },
{ "name": "SubscribedTopicNames", "versions": "0+", "type": "[]string" },
{ "name": "SubscribedTopicRegex", "versions": "0+", "type": "string" },
{ "name": "Assignors", "versions": "0+",
"type": "[]Assignor", "fields": [
{ "name": "Name", "versions": "0+", "type": "string" },
{ "name": "MinimumVersion", "versions": "0+", "type": "int32" },
{ "name": "MaximumVersion", "versions": "0+", "type": "int32" },
{ "name": "Version", "versions": "0+", "type": "int32" },
{ "name": "Reason", "versions": "0+", "type": "int8" },
{ "name": "Metadata", "versions": "0+", "type": "bytes" }
]}
],
} |
Target Assignment
The target assignment is stored in a single record.
ConsumerGroupTargetAssignmentKey
Code Block | ||||
---|---|---|---|---|
| ||||
{
"type": "data",
"name": "ConsumerGroupTargetAssignmentKey",
"validVersions": "6",
"flexibleVersions": "none",
"fields": [
{ "name": "GroupId", "type": "string", "versions": "5" }
]
} |
ConsumerGroupTargetAssignmentValue
Code Block | ||||
---|---|---|---|---|
| ||||
{
"type": "data",
"name": "ConsumerGroupTargetAssignmentValue",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "Epoch", "versions": "0+", "type": "int32" },
{ "name": "AssignorName", "versions": "0+", "type": "string" },
{ "name": "AssignmentEpoch", "versions": "0+", "type": "int32" },
{ "name": "Members", "versions": "0+", "type": "[]Member", "fields": [
{ "name": "MemberId", "versions": "0+", "type": "string" },
{ "name": "TopicPartitions", "versions": "0+",
"type": "[]TopicPartition", "fields": [
{ "name": "TopicId", "versions": "0+", "type": "uuid" },
{ "name": "Partitions", "versions": "0+", "type": "[]int32" }
]},
{ "name": "MetadataError", "versions": "0+", "type": "int8" },
{ "name": "MetadataVersion", "versions": "0+", "type": "int32" },
{ "name": "MetadataBytes", "versions": "0+", "type": "bytes" }
]
]
} |
ConsumerGroupTargetMemberAssignmentKey
Code Block | ||||
---|---|---|---|---|
| ||||
{
"type": "data",
"name": "ConsumerGroupTargetMemberAssignmentKey",
"validVersions": "6",
"flexibleVersions": "none",
"fields": [
{ "name": "GroupId", "type": "string", "versions": "6" },
{ "name": "MemberId", "type": "string", "versions": "6" },
]
} |
ConsumerGroupTargetMemberAssignmentValue
Code Block | ||||
---|---|---|---|---|
| ||||
{
"type": "data",
"name": "ConsumerGroupTargetMemberAssignmentValue",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "AssignmentEpoch", "versions": "0+", "type": "int32" },
{ "name": "MemberId", "versions": "0+", "type": "string" },
{ "name": "TopicPartitions", "versions": "0+",
"type": "[]TopicPartition", "fields": [
{ "name": "TopicId", "versions": "0+", "type": "uuid" },
{ "name": "Partitions", "versions": "0+", "type": "[]int32" }
]},
{ "name": "MetadataError", "versions": "0+", "type": "int8" },
{ "name": "MetadataVersion", "versions": "0+", "type": "int32" },
{ "name": "MetadataBytes", "versions": "0+", "type": "bytes" }
],
} |
Current Member Assignment
The current member assignment represents, as the name suggests, the current assignment of a given member.
When a member is deleted from the group, a tombstone for him is written to the partition.
ConsumerGroupCurrentMemberAssignmentKey
Code Block | ||||
---|---|---|---|---|
| ||||
{
"type": "data",
"name": "ConsumerGroupCurrentMemberAssignmentKey",
"validVersions": "7",
"flexibleVersions": "none",
"fields": [
{ "name": "GroupId", "type": "string", "versions": "7" },
{ "name": "MemberId", "type": "string", "versions": "7" },
]
} |
ConsumerGroupCurrentMemberAssignmentValue
Code Block | ||||
---|---|---|---|---|
| ||||
{
"type": "data",
"name": "ConsumerGroupCurrentMemberAssignmentValue",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "Epoch", "versions": "0+", "type": "int32" },
{ "name": "TopicPartitions", "versions": "0+",
"type": "[]TopicPartition", "fields": [
{ "name": "TopicId", "versions": "0+", "type": "uuid" },
{ "name": "Partitions", "versions": "0+", "type": "[]int32" }
]},
{ "name": "MetadataError", "versions": "0+", "type": "int8" },
{ "name": "MetadataVersion", "versions": "0+", "type": "int32" },
{ "name": "MetadataBytes", "versions": "0+", "type": "bytes" }
],
} |
Broker API
Broker side assignor API - Basically the same as the consumer but without metadata.
TODO
Broker Metrics
- Group count by type
- Group count by state
- Rebalance Rate
Broker Configurations
New properties in the broker configuration.
Name | Type | Default | Doc |
---|---|---|---|
group.consumer.session.timeout.ms | int | 30s | The timeout to detect client failures when using the consumer group protocol. |
group.consumer.min.session.timeout.ms | int | 45s | The minimum session timeout. |
group.consumer.max.session.timeout.ms | int | 60s | The maximum session timeout. |
group.consumer.heartbeat.interval.ms | int | 5s | The heartbeat interval given to the members. |
group.consumer.min.heartbeat.interval.ms | int | 5s | The minimum heartbeat interval. |
group.consumer.max.heartbeat.interval.ms | int | 15s | The maximum heartbeat interval. |
group.consumer.max.size | int | MaxValue | The maximum number of consumers that a single consumer group can accommodate. |
group.consumer.assignors | List | range, uniform | The server side assignors. |
Group Configurations
New dynamic group properties.
Name | Type | Default | Doc |
---|---|---|---|
group.consumer.session.timeout.ms | int | 30s | The timeout to detect client failures when using the consumer group protocol. |
group.consumer.heartbeat.interval.ms | int | 5s | The heartbeat interval given to the members. |
Consumer API
New PartitionAssignor interface
The new PartitionAssignor interface will be introduced to replace the ConsumerPartitionAssignor interface. The interface is defined as follow:
Code Block | ||||
---|---|---|---|---|
| ||||
package org.apache.kafka.clients.consumer;
public interface PartitionAssignor {
class Group {
/**
* The members.
*/
List<GroupMember> members;
/**
* The mapping from topic ID to number of partitions
* as provided by the group coordinator
*/
Map<Uuid, Integer> topicMetadata;
}
class GroupMember {
/**
* The member ID.
*/
String memberId;
/**
* The instance ID if provided.
*/
Optional<String> instanceId;
/**
* The set of topic IDs that the member is subscribed to.
*/
List<Uuid> subscribedTopicIds;
/**
* The reason reported by the member.
*/
byte reason;
/**
* The version of the metadata encoded in {{@link GroupMember#metadata()}}.
*/
int version;
/**
* The custom metadata provided by the member as defined
* by {{@link PartitionAssignor#metadata()}}.
*/
ByteBuffer metadata;
/**
* The partitions owned by the member at the current epoch.
*/
List<TopicIdPartition> ownedPartitions;
}
class Assignment {
/**
* The member assignment.
*/
List<MemberAssignment> members;
}
class MemberAssignment {
/**
* The member ID.
*/
String memberId;
/**
* The assigned partitions.
*/
List<TopicIdPartition> partitions;
/**
* The error reported by the assignor.
*/
byte error;
/**
* The version of the metadata encoded in {{@link GroupMember#metadata()}}.
*/
int version;
/**
* The custom metadata provided by the assignor.
*/
ByteBuffer metadata;
}
class Metadata {
/**
* The reason reported by the assignor.
*/
byte reason;
/**
* The version of the metadata encoded in {{@link Metadata#metadata()}}.
*/
int version;
/**
* The custom metadata provided by the assignor.
*/
ByteBuffer metadata;
}
/**
* Unique name for this assignor.
*/
String name();
/**
* The minimum version.
*/
int minimumVersion();
/**
* The maximum version.
*/
int maximumVersion();
/**
* Return serialized data that will be sent to the assignor.
*/
Metadata metadata();
/**
* Perform the group assignment given the current members and
* topic metadata.
*
* @param group The group state.
* @return The new assignment for the group.
*/
Assignment assign(Group group);
/**
* Callback which is invoked when the member received a new
* assignment from the assignor/group coordinator.
*/
void onAssignment(MemberAssignment assignment);
} |
Deprecate Consumer#enforceRebalance and Consumer#enforceRebalance(String)
The enforceRebalance methods are no longer necessary and will be deprecated in a future release.
Deprecate ConsumerPartitionAssignor interface.
The ConsumerPartitionAssignor interface will be deprecated in a future (major) release.
Consumer Configurations
Name | Type | Default | Doc |
---|---|---|---|
group.protocol | enum | generic | A flag which indicates if the new protocol should be used or not. It could be: generic or consumer |
group.remote.assignor | string | uniform | The server side assignor to use. It cannot be used in conjunction with group.local.assignor. |
group.local.assignors | list | empty | The list of client side (local) assignors. It cannot be used in conjunction with group.remote.assignor. |
Streams Metadata
TODO
Streams Configurations
Name | Type | Default | Doc |
---|---|---|---|
group.protocol | enum | generic | A flag which indicates if the new protocol should be used or not. It could be: generic or consumer |
Admin API
Admin#listConsumerGroups
The Admin#listConsumerGroups will be extended to support querying group types and retrieving/querying the new group states.
Code Block | ||||
---|---|---|---|---|
| ||||
public class ListConsumerGroupsOptions extends AbstractOptions<ListConsumerGroupsOptions> {
/**
* If types is set, only groups with these types will be returned.
*/
public ListConsumerGroupsOptions withTypes(Set<String> types) {
this.types = types;
}
/**
* Returns the list of Types that are requested or empty if no types
* have been specified.
*/
public Set<String> types() {
return types;
}
}
public class ConsumerGroupListing {
/**
* Consumer Group type, generic by default.
*/
public String type() {
return type;
}
}
public enum ConsumerGroupState {
UNKNOWN("Unknown"),
PREPARING_REBALANCE("PreparingRebalance"),
COMPLETING_REBALANCE("CompletingRebalance"),
STABLE("Stable"),
DEAD("Dead"),
EMPTY("Empty"),
ASSIGNING("Assigning"),
RECONCILING("Reconciling");
} |
Admin#describeConsumerGroups
The Admin#describeConsumerGroups will be extended to expose the new information related to the new protocol.
Code Block | ||||
---|---|---|---|---|
| ||||
public class ConsumerGroupDescription {
public String type() {
return type;
}
}
public class MemberDescription {
// Current Assignment
public MemberAssignment assignment() {}
// Target Assignment
public MemberAssignment targetAssignment() {}
}
public class MemberAssignment {
/**
* The reason reported by the assignor.
*/
byte error;
/**
* The version of the metadata encoded in {{@link Metadata#metadata()}}.
*/
int version;
/**
* The custom metadata provided by the assignor.
*/
ByteBuffer metadata;
/**
* The partitions owned by the member at the current epoch.
*/
List<TopicIdPartition> ownedPartitions;
} |
Admin#incrementalAlterConfigs and Admin#describeConfigs
The GROUP resource type is added.
Code Block | ||||
---|---|---|---|---|
| ||||
public final class ConfigResource {
/**
* Type of resource.
*/
public enum Type {
GROUP((byte) 16), BROKER_LOGGER((byte) 8), BROKER((byte) 4), TOPIC((byte) 2), UNKNOWN((byte) 0);
}
} |
kafka-consumer-groups
The kafka-consumer-group command line tool will be extended to support the –type filter which allows to list or to describe groups implementing a specific type.
Code Block | ||||
---|---|---|---|---|
| ||||
kafka-consumer-groups.sh -–bootstrap-server localhost:9092 -–list -–type <comma separated list of types>
kafka-consumer-groups.sh -–bootstrap-server localhost:9092 -–describe -–type <comma separated list of types> |
Required ACL
Request Validation
Request Handling
Response Schema
...
language | js |
---|---|
linenumbers | true |
Response Handling
OffsetFetch API
Add MemberId/MemberEpoch
Request Schema
...
language | js |
---|---|
linenumbers | true |
Required ACL
Request Validation
Request Handling
Response Schema
...
language | js |
---|---|
linenumbers | true |
Response Handling
DescribeConfigs API
Add group resource.
Request Schema
...
language | js |
---|---|
linenumbers | true |
Required ACL
Request Validation
Request Handling
Response Schema
...
language | js |
---|---|
linenumbers | true |
Response Handling
AlterIncrementalConfigs API
Add group resource.
Request Schema
...
language | js |
---|---|
linenumbers | true |
Required ACL
Request Validation
Request Handling
Response Schema
...
language | js |
---|---|
linenumbers | true |
Response Handling
Records
TODO
Broker API
Broker side assignor API - Basically the same as the consumer but without metadata.
Broker Metrics
TODO
Broker Configurations
- session timeout
- heartbeat interval
- min/max for both
Group Configurations
- session timeout
- heartbeat interval
Consumer API
Client side assignor
Consumer Configurations
- flag
- server side assignor
- client side assignor
Streams Metadata
TODO
Streams Configurations
- flag
Admin API
...
Case Studies
Let’s look at a few examples to illustrate the rebalance logic. Let’s assume that the group is subscribed to the topic foo which has 3 partitions.
...