Status
Current state: Accepted
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
The group configurations are stored in the controller like all the other dynamic configurations in the cluster. This allows configurations to be installed regardless of whether the group exists. Configurations are also preserved if the group is deleted. In ZK mode, the dynamic group configurations are stored in the /config/groups znode. In KRaft mode, they are stored in the ConfigRecord.
...
- It has to set up the session timeouts for all the members (like today).
- It has to check whether the topic-partition metadata has changed and, if it has, potentially trigger a rebalance for the group.
- It has to check whether new topics match the regex subscriptions and trigger a rebalance for the group if any do.
- It has to check whether a new assignment is required for the group (group epoch != assignment epoch). If so, the group coordinator can either compute it directly using the server-side assignor or trigger a client-side assignment computation.
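The last two checks above can be sketched as follows. This is a minimal illustration, not the coordinator's implementation: the class and method names are hypothetical, and it assumes that regex subscriptions are evaluated with `java.util.regex` using full-match semantics, which the text does not specify.

```java
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper illustrating two of the periodic checks.
final class PeriodicGroupChecks {

    /** True if at least one newly created topic fully matches the regex subscription. */
    static boolean matchesAnyNewTopic(Pattern subscription, List<String> newTopicNames) {
        // Assumption: full-match semantics (matches), not substring search (find).
        return newTopicNames.stream().anyMatch(name -> subscription.matcher(name).matches());
    }

    /** True if the target assignment was computed for a different group epoch. */
    static boolean needsNewAssignment(int groupEpoch, int assignmentEpoch) {
        return groupEpoch != assignmentEpoch;
    }
}
```

When either method returns true, the coordinator would bump the group epoch or compute a new target assignment, respectively.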
...
- Looks up the group or creates it.
- Creates the member if the member epoch is zero; otherwise checks whether it exists. If it does not exist, UNKNOWN_MEMBER_ID is returned.
- Checks whether the member epoch matches the member epoch in its current assignment. FENCED_MEMBER_EPOCH is returned otherwise. The member is also removed from the group.
- There is an edge case here. When the group coordinator transitions a member to its target epoch, the heartbeat response with the new member epoch may be lost. In this case, the member will retry with the member epoch that it knows about and its request will be rejected with a FENCED_MEMBER_EPOCH. This is not optimal. Instead, the group coordinator could accept the request if the partitions owned by the member are a subset of the target partitions. If that is the case, it is safe to transition the member to its target epoch again.
- Updates the member's information, if any. The group epoch is incremented if there is any change. See the "Group Epoch - Trigger a rebalance" chapter for details about the rebalance triggers.
- Reconciles the member assignments as explained earlier in this document.
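The member epoch check, including the lost-response edge case, could look roughly like the sketch below. The names are hypothetical, partitions are represented as plain strings such as "topic-0" for brevity, and the sketch assumes that only a request carrying an older epoch qualifies for the subset rule; the text does not say how a newer epoch should be treated.

```java
import java.util.Set;

// Hypothetical sketch of the member epoch validation in the heartbeat handler.
final class HeartbeatEpochCheck {

    enum Result { OK, FENCED_MEMBER_EPOCH }

    /**
     * @param requestEpoch     epoch sent by the member in its heartbeat
     * @param currentEpoch     epoch recorded by the coordinator for the member
     * @param ownedPartitions  partitions the member reports as owned
     * @param targetPartitions partitions in the member's target assignment
     */
    static Result validate(int requestEpoch, int currentEpoch,
                           Set<String> ownedPartitions, Set<String> targetPartitions) {
        if (requestEpoch == currentEpoch) {
            return Result.OK;
        }
        // Edge case: the response carrying the new epoch was lost, so the member
        // retries with an older epoch. Accept it when everything it owns is still
        // part of its target assignment (assumption: older epochs only).
        if (requestEpoch < currentEpoch && targetPartitions.containsAll(ownedPartitions)) {
            return Result.OK;
        }
        return Result.FENCED_MEMBER_EPOCH;
    }
}
```

In the accepted edge case, the coordinator would simply transition the member to its target epoch again in the response.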
...
Code Block | ||||
---|---|---|---|---|
| ||||
{
"apiKey": TBD,
"type": "response",
"name": "ConsumerGroupHeartbeatResponse",
"validVersions": "0",
"flexibleVersions": "0+",
// Supported errors:
// - GROUP_AUTHORIZATION_FAILED
// - NOT_COORDINATOR
// - COORDINATOR_NOT_AVAILABLE
// - COORDINATOR_LOAD_IN_PROGRESS
// - INVALID_REQUEST
// - UNKNOWN_MEMBER_ID
// - FENCED_MEMBER_EPOCH
// - UNSUPPORTED_ASSIGNOR
// - UNRELEASED_INSTANCE_ID
// - GROUP_MAX_SIZE_REACHED
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The top-level error code, or 0 if there was no error" },
{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "The top-level error message, or null if there was no error." },
{ "name": "MemberId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "The member id generated by the coordinator. Only provided when the member joins with MemberEpoch == 0." },
{ "name": "MemberEpoch", "type": "int32", "versions": "0+",
"about": "The member epoch." },
{ "name": "ShouldComputeAssignment", "type": "bool", "versions": "0+",
"about": "True if the member should compute the assignment for the group." },
{ "name": "HeartbeatIntervalMs", "type": "int32", "versions": "0+",
"about": "The heartbeat interval in milliseconds." },
{ "name": "Assignment", "type": "Assignment", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "null if not provided; the assignment otherwise.", "fields": [
{ "name": "Error", "type": "int8", "versions": "0+",
"about": "The assigned error." },
{ "name": "AssignedTopicPartitions", "type": "[]TopicPartitions", "versions": "0+",
"about": "The partitions assigned to the member that can be used immediately." },
{ "name": "PendingTopicPartitions", "type": "[]TopicPartitions", "versions": "0+",
"about": "The partitions assigned to the member that cannot be used because they are not released by their former owners yet." },
{ "name": "MetadataVersion", "type": "int16", "versions": "0+",
"about": "The version of the metadata." },
{ "name": "MetadataBytes", "type": "bytes", "versions": "0+",
"about": "The assigned metadata." }
]}
],
"commonStructs": [
{ "name": "TopicPartitions", "versions": "0+", "fields": [
{ "name": "TopicId", "type": "uuid", "versions": "0+",
"about": "The topic ID." },
{ "name": "Partitions", "type": "[]int32", "versions": "0+",
"about": "The partitions." }
]}
]
} |
...
When the group coordinator handles a ConsumerGroupPrepareAssignmentRequest request:
- Checks whether the group exists. If it does not, GROUP_ID_NOT_FOUND is returned.
- Checks whether the member exists. If it does not, UNKNOWN_MEMBER_ID is returned.
- Checks whether the member epoch matches the current member epoch. If it does not, STALE_MEMBER_EPOCH is returned.
- Checks whether the member is the one chosen to compute the assignment. If it is not, UNKNOWN_MEMBER_ID is returned.
- Returns the state of the group.
...
When the group coordinator handles a ConsumerGroupInstallAssignmentRequest request:
- Checks whether the group exists. If it does not, GROUP_ID_NOT_FOUND is returned.
- Checks whether the member exists. If it does not, UNKNOWN_MEMBER_ID is returned.
- Checks whether the member epoch matches the current member epoch. If it does not, STALE_MEMBER_EPOCH is returned.
- Checks whether the member is the one chosen to compute the assignment. If it is not, UNKNOWN_MEMBER_ID is returned.
- Validates the assignment based on the information used to compute it. If it is not valid, INVALID_ASSIGNMENT is returned.
- Installs the new target assignment.
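The ordering of the checks above matters, because the first failing check determines the error code. A minimal sketch of that ordering follows, assuming a hypothetical in-memory `GroupState` and a precomputed `assignmentValid` flag standing in for the actual assignment validation.

```java
import java.util.Map;

// Hypothetical sketch of the validation order for an install-assignment request.
final class InstallAssignmentValidation {

    enum Error { NONE, GROUP_ID_NOT_FOUND, UNKNOWN_MEMBER_ID, STALE_MEMBER_EPOCH, INVALID_ASSIGNMENT }

    /** Hypothetical in-memory view of a group. */
    static final class GroupState {
        final Map<String, Integer> memberEpochs; // member id -> current member epoch
        final String chosenMemberId;             // member selected to compute the assignment

        GroupState(Map<String, Integer> memberEpochs, String chosenMemberId) {
            this.memberEpochs = memberEpochs;
            this.chosenMemberId = chosenMemberId;
        }
    }

    static Error validate(Map<String, GroupState> groups, String groupId,
                          String memberId, int memberEpoch, boolean assignmentValid) {
        GroupState group = groups.get(groupId);
        if (group == null) return Error.GROUP_ID_NOT_FOUND;

        Integer currentEpoch = group.memberEpochs.get(memberId);
        if (currentEpoch == null) return Error.UNKNOWN_MEMBER_ID;
        if (currentEpoch.intValue() != memberEpoch) return Error.STALE_MEMBER_EPOCH;

        // A known member that was not chosen is also reported as UNKNOWN_MEMBER_ID.
        if (!memberId.equals(group.chosenMemberId)) return Error.UNKNOWN_MEMBER_ID;

        if (!assignmentValid) return Error.INVALID_ASSIGNMENT;
        return Error.NONE; // the caller would now install the new target assignment
    }
}
```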
...
When the group coordinator handles a ConsumerGroupDescribeRequest request:
- Checks whether the group IDs exist. If a group ID does not exist, GROUP_ID_NOT_FOUND is returned.
- Looks up the groups and returns the response.
...
Code Block | ||||
---|---|---|---|---|
| ||||
{
  "type": "data",
  "name": "ConsumerGroupPartitionMetadataValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Epoch", "versions": "0+", "type": "int32" },
    { "name": "TopicPartitionMetadataTopics", "versions": "0+", "type": "[]TopicPartitionTopicMetadata", "fields": [
      { "name": "TopicId", "versions": "0+", "type": "uuid" },
      { "name": "NumPartitions", "versions": "0+", "type": "int32" }
    ]}
  ]
} |
...
Code Block | ||||
---|---|---|---|---|
| ||||
{
  "type": "data",
  "name": "ConsumerGroupMemberMetadataValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupEpoch", "versions": "0+", "type": "int32" },
    { "name": "InstanceId", "versions": "0+", "nullableVersions": "0+", "type": "string" },
    { "name": "RackId", "versions": "0+", "nullableVersions": "0+", "type": "string" },
    { "name": "ClientId", "versions": "0+", "type": "string" },
    { "name": "ClientHost", "versions": "0+", "type": "string" },
    { "name": "SubscribedTopicNames", "versions": "0+", "type": "[]string" },
    { "name": "SubscribedTopicRegex", "versions": "0+", "type": "string" },
    { "name": "Assignors", "versions": "0+", "type": "[]Assignor", "fields": [
      { "name": "Name", "versions": "0+", "type": "string" },
      { "name": "MinimumVersion", "versions": "0+", "type": "int16" },
      { "name": "MaximumVersion", "versions": "0+", "type": "int16" },
      { "name": "Reason", "versions": "0+", "type": "int8" },
      { "name": "Version", "versions": "0+", "type": "int16" },
      { "name": "Metadata", "versions": "0+", "type": "bytes" }
    ]}
  ]
} |
...
Code Block | ||||
---|---|---|---|---|
| ||||
package org.apache.kafka.server.group.consumer;

import java.util.Collection;
import java.util.Map;
import java.util.Optional;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;

public interface PartitionAssignor {

    class AssignmentSpec {
        /**
         * The members keyed by member id.
         */
        Map<String, AssignmentMemberSpec> members;

        /**
         * The topics' metadata keyed by topic id.
         */
        Map<Uuid, AssignmentTopicMetadata> topics;
    }

    class AssignmentMemberSpec {
        /**
         * The instance ID if provided.
         */
        Optional<String> instanceId;

        /**
         * The rack ID if provided.
         */
        Optional<String> rackId;

        /**
         * The topics that the member is subscribed to.
         */
        Collection<String> subscribedTopics;

        /**
         * The current target partitions of the member.
         */
        Collection<TopicPartition> targetPartitions;
    }

    class AssignmentTopicMetadata {
        /**
         * The topic name.
         */
        String topicName;

        /**
         * The number of partitions.
         */
        int numPartitions;
    }

    class GroupAssignment {
        /**
         * The member assignments keyed by member id.
         */
        Map<String, MemberAssignment> members;
    }

    class MemberAssignment {
        /**
         * The target partitions assigned to this member.
         */
        Collection<TopicPartition> targetPartitions;
    }

    /**
     * Unique name for this assignor.
     */
    String name();

    /**
     * Perform the group assignment given the current members and
     * topic metadata.
     *
     * @param assignmentSpec The assignment spec.
     * @return The new assignment for the group.
     */
    GroupAssignment assign(AssignmentSpec assignmentSpec) throws PartitionAssignorException;
} |
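To give a feel for what an assignor computes from the spec, here is a self-contained round-robin sketch. It deliberately does not implement the `PartitionAssignor` interface: it uses simplified stand-ins (member id to subscribed topic names, topic name to partition count, partitions rendered as "topic-partition" strings), and the class name is hypothetical. It is neither the UniformAssignor nor the RangeAssignor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical round-robin assignment over simplified inputs.
final class RoundRobinSketch {

    /**
     * @param subscriptions member id -> subscribed topic names
     * @param numPartitions topic name -> number of partitions
     * @return member id -> assigned partitions formatted as "topic-partition"
     */
    static Map<String, List<String>> assign(Map<String, List<String>> subscriptions,
                                            Map<String, Integer> numPartitions) {
        // Sorted maps keep the result deterministic across invocations.
        Map<String, List<String>> result = new TreeMap<>();
        for (String member : subscriptions.keySet()) {
            result.put(member, new ArrayList<>());
        }
        for (Map.Entry<String, Integer> topic : new TreeMap<>(numPartitions).entrySet()) {
            // Collect the members subscribed to this topic, in sorted order.
            List<String> subscribers = new ArrayList<>();
            for (String member : result.keySet()) {
                if (subscriptions.get(member).contains(topic.getKey())) {
                    subscribers.add(member);
                }
            }
            if (subscribers.isEmpty()) {
                continue; // nobody is subscribed, leave the topic unassigned
            }
            // Hand out the partitions one by one, cycling through the subscribers.
            for (int p = 0; p < topic.getValue(); p++) {
                String owner = subscribers.get(p % subscribers.size());
                result.get(owner).add(topic.getKey() + "-" + p);
            }
        }
        return result;
    }
}
```

A real assignor would additionally take the current target partitions, rack IDs and instance IDs from the spec into account to limit partition movement.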
Broker Metrics
The set of new metrics is not clear at the moment. We plan to amend the KIP later, once the implementation has progressed.
- Group count by type
- Group count by state
- Rebalance Rate
- Thread utilisation in percent
Existing generic group metrics have been migrated with the same metric names, except for NumGroups, which reported the number of generic groups. This metric changed to:
kafka.server:type=group-coordinator-metrics,name=group-count,protocol={consumer|generic}
- number of groups by type, where the type is the rebalance protocol
kafka.server:type=group-coordinator-metrics,name=partition-count,state={loading|active|failed}
- number of __consumer_offsets partitions based on state
kafka.server:type=group-coordinator-metrics,name=event-queue-size
- event accumulator queue size
kafka.server:type=group-coordinator-metrics,name=consumer-group-count,state={empty|assigning|reconciling|stable|dead}
- number of consumer groups based on state
consumer group rebalances sensor
- kafka.server:type=group-coordinator-metrics,name=consumer-group-rebalance-rate
- kafka.server:type=group-coordinator-metrics,name=consumer-group-rebalance-count
partition load sensor: __consumer_offsets partition load time
- kafka.server:type=group-coordinator-metrics,name=partition-load-time-max
- kafka.server:type=group-coordinator-metrics,name=partition-load-time-avg
thread idle ratio sensor: thread busy - idle ratio
- kafka.server:type=group-coordinator-metrics,name=thread-idle-ratio-min
- kafka.server:type=group-coordinator-metrics,name=thread-idle-ratio-avg
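The metric names above follow the JMX object name syntax (a domain followed by key=value properties). The sketch below only shows how such a name decomposes using the standard `javax.management.ObjectName` class. It assumes the names are exposed over JMX exactly as written, which depends on the metrics reporter in use, and the helper class is hypothetical.

```java
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

// Hypothetical helper that builds the group-count metric name for a protocol.
final class GroupCoordinatorMetricNames {

    static ObjectName groupCount(String protocol) {
        try {
            return new ObjectName(
                "kafka.server:type=group-coordinator-metrics,name=group-count,protocol=" + protocol);
        } catch (MalformedObjectNameException e) {
            // Happens only if the protocol contains characters that are illegal in a key value.
            throw new IllegalArgumentException("Invalid protocol: " + protocol, e);
        }
    }
}
```

For example, `groupCount("consumer").getKeyProperty("name")` yields `group-count`, and the domain is `kafka.server`.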
Broker Configurations
New properties in the broker configuration.
Name | Type | Default | Doc |
---|---|---|---|
group.coordinator.threads | int | 1 | The number of threads used to run the state machines. |
group.consumer.session.timeout.ms | int | 45s | The timeout to detect client failures when using the consumer group protocol. |
group.consumer.min.session.timeout.ms | int | 45s | The minimum session timeout. |
group.consumer.max.session.timeout.ms | int | 60s | The maximum session timeout. |
group.consumer.heartbeat.interval.ms | int | 5s | The heartbeat interval given to the members. |
group.consumer.min.heartbeat.interval.ms | int | 5s | The minimum heartbeat interval. |
group.consumer.max.heartbeat.interval.ms | int | 15s | The maximum heartbeat interval. |
group.consumer.max.size | int | MaxValue | The maximum number of consumers that a single consumer group can accommodate. |
group.consumer.assignors | list | org.apache.kafka.server.group.consumer.UniformAssignor, org.apache.kafka.server.group.consumer.RangeAssignor | The server side assignors as a list of full class names. The first one in the list is considered as the default assignor to be used in the case where the consumer does not specify an assignor. |
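One plausible reading of the min/max properties is that they bound the effective session timeout and heartbeat interval. The table does not state how they are enforced, so the following is only a sketch under that assumption, with a hypothetical helper name and the defaults from the table used as sample values.

```java
import java.time.Duration;

// Hypothetical bounds check for a timeout or interval value.
final class TimeoutBounds {

    /** True if min <= value <= max. */
    static boolean isWithinBounds(Duration value, Duration min, Duration max) {
        return value.compareTo(min) >= 0 && value.compareTo(max) <= 0;
    }
}
```

With the defaults above, a session timeout of 45s passes a check against the 45s minimum and 60s maximum, while 30s or 61s would not.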
...
The KIP proposes to rely on the IBP/MetadataVersion to decide whether a record or an API can be used. We have discussed the idea of using a dedicated feature flag instead of relying on metadata.version. That would allow decoupling the group coordinator from the quorum controller during upgrades. We also need to flesh out how to handle downgrades. We will do this in a future KIP.
...