...
Consumer Group | ||
---|---|---|
Name | Type | Description |
Group ID | string | The group ID as configured by the consumer. The ID uniquely identifies the group. |
Group Epoch | int32 | The current epoch of the group. The epoch is incremented by the group coordinator when a new assignment is required for the group. |
Members | []Member | The set of members in the group. |
Partitions Metadata | []PartitionMetadata | The metadata of the partitions that the group is subscribed to. This is used to detect partition metadata changes. |
Member | ||
Name | Type | Description |
Member ID | string | The unique identifier of the member. It is generated by the client once and must be used during its lifetime. The ID is similar to an incarnation ID. |
Instance ID | string | The instance ID configured by the consumer. |
Client ID | string | The client ID configured by the consumer. |
Client Host | string | The client ID host configured by the consumer. |
Subscribed Topic Names | []string | The current set of subscribed topic names configured by the consumer. |
Subscribed Topic Regex | string | The current subscription regular expression configured by the consumer. |
Server Assignor | string | The server side assignor used by the group. |
Client Assignors | []Assignor | The list of client-side assignors supported by the member. The order of this list defined the priority. |
Assignor | ||
Name | Type | Description |
Name | string | The unique name of the assignor. |
Reason | int8 | The reason why the metadata was updated. |
Minimum Version | int16 | The minimum version of the metadata schema supported by this assignor. |
Maximum Version | int16 | The maximum version of the metadata schema supported by this assignor. |
Version | int16 | The version used to encode the metadata. |
Metadata | bytes | The metadata provided by the consumer for this assignor. |
...
The new rebalance protocol relies on server side configurations such as group.consumer.heartbeat.interval.ms and group.consumer.session.timeout.ms. Our goal is to give administrator the ability to use and tweak those settings for their entire consumers fleet. However, it may not always be possible to have values fitting all workloads. Therefore, we propose to extend the IncrementalAlterConfigs and the DescribeConfigs API to support a new resource type called GROUP. This allows users to override the default defined by the administrators. The dynamic group configurations are described in the Public Interfaces section.
The group coordinator is responsible for storing those group configurations in order to keep their lifecycle tight to their group. When a group is deleted, we want the configuration to be deleted as well. This assumes that IncrementalAlterConfigs and the DescribeConfigs API will be routed to the group coordinator owning the group they are acting uponconfigurations are stored in the controller like all the other dynamic configurations in the cluster. This allows configurations to be installed independently from wether the group exists or not. Configurations are also preserved if the group is deleted. In ZK mode, the dynamic group configurations will be store in the /config/groups
znode. In KRaft mode, they are stored in the ConfigRecord
.
Regex Based Subscription
...
Code Block | ||||
---|---|---|---|---|
| ||||
{
"type": "data",
"name": "ConsumerGroupCurrentMemberAssignmentValue",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "MemberEpoch", "versions": "0+", "type": "int32" },
{ "name": "Error", "versions": "0+", "type": "int8" },
{ "name": "TopicPartitions", "versions": "0+",
"type": "[]TopicPartition", "fields": [
{ "name": "TopicId", "versions": "0+", "type": "uuid" },
{ "name": "Partitions", "versions": "0+", "type": "[]int32" }
]},
{ "name": "Version", "versions": "0+", "type": "int16" },
{ "name": "Metadata", "versions": "0+", "type": "bytes" }
],
} |
Group Configurations
GroupConfigurationKey
Code Block | ||||
---|---|---|---|---|
| ||||
{ "type": "data", "name": "GroupConfigurationKey", "validVersions": "8", "flexibleVersions": "none", "fields": [ { "name": "GroupId", "type": "string", "versions": "8" } ] } |
GroupConfigurationValue
Code Block | ||||
---|---|---|---|---|
| ||||
{
"type": "data",
"name": "GroupConfigurationValue",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "Configurations", "versions": "0+", "type": "[]Configuration",
"fields": [
{ "name": "Name", "type": "string", "versions": "0+",
"about": "The name of the configuration key." },
{ "name": "Value", "type": "string", "versions": "0+",
"about": "The value of the configuration." }
]}
]
} |
Broker API
The new PartitionAssignor interface will be introduced on the server side. Two implementations will be provided out of the box: RangeAssignor (range) and UniformAssignor (uniform).
...
Name | Type | Default | Doc |
---|---|---|---|
group.coordinator.threads | int | 1 | The number of threads used to run the state machines. |
group.consumer.session.timeout.ms | int | 30s | The timeout to detect client failures when using the consumer group protocol. |
group.consumer.min.session.timeout.ms | int | 45s | The minimum session timeout. |
group.consumer.max.session.timeout.ms | int | 60s | The maximum session timeout. |
group.consumer.heartbeat.interval.ms | int | 5s | The heartbeat interval given to the members. |
group.consumer.min.heartbeat.interval.ms | int | 5s | The minimum heartbeat interval. |
group.consumer.max.heartbeat.interval.ms | int | 15s | The maximum heartbeat interval. |
group.consumer.max.size | int | MaxValue | The maximum number of consumers that a single consumer group can accommodate. |
group.consumer.assignors | List | range, uniform | The server side assignors. |
Group Configurations
New dynamic group properties.
...