...
Code Block | ||||
---|---|---|---|---|
| ||||
{ "type": "data", "name": "ConsumerGroupCurrentMemberAssignmentValue", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "Epoch", "versions": "0+", "type": "int32" }, { "name": "TopicPartitions", "versions": "0+", "type": "[]TopicPartition", "fields": [ { "name": "TopicId", "versions": "0+", "type": "uuid" }, { "name": "Partitions", "versions": "0+", "type": "[]int32" } ]}, { "name": "MetadataError", "versions": "0+", "type": "int8" }, { "name": "MetadataVersion", "versions": "0+", "type": "int32" }, { "name": "MetadataBytes", "versions": "0+", "type": "bytes" } ], } |
Broker API
Broker side assignor API - Basically the same as the consumer but without metadata.
TODO
Broker Metrics
- Group count by type
- Group count by state
- Rebalance Rate
Broker Configurations
New properties in the broker configuration.
...
The new PartitionAssignor interface will be introduced on the server side. Two implementations will be provided out of the box: RangeAssignor (range) and UniformAssignor (uniform).
Code Block | ||||
---|---|---|---|---|
| ||||
package org.apache.kafka.server.group.consumer;
public interface PartitionAssignor {
class Group {
/**
* The members.
*/
List<GroupMember> members;
/**
* The mapping from topic ID to number of partitions
* as provided by the group coordinator
*/
Map<Uuid, Integer> topicMetadata;
}
class GroupMember {
/**
* The member ID.
*/
String memberId;
/**
* The instance ID if provided.
*/
Optional<String> instanceId;
/**
* The set of topic IDs that the member is subscribed to.
*/
List<Uuid> subscribedTopicIds;
/**
* The partitions owned by the member at the current epoch.
*/
List<TopicIdPartition> ownedPartitions;
}
class Assignment {
/**
* The member assignment.
*/
List<MemberAssignment> members;
}
class MemberAssignment |
Group Configurations
New dynamic group properties.
...
Consumer API
New PartitionAssignor interface
The new PartitionAssignor interface will be introduced to replace the ConsumerPartitionAssignor interface. The interface is defined as follow:
Code Block | ||||
---|---|---|---|---|
| ||||
package org.apache.kafka.clients.consumer; public interface PartitionAssignor { class Group { /** * The membersmember ID. */ List<GroupMember> String membersmemberId; /** * The mapping from topic ID to number of partitions * as provided by the group coordinator */ Map<Uuid, Integer> topicMetadataassigned partitions. */ List<TopicIdPartition> partitions; } class GroupMember { /** * TheUnique member ID. */ String memberId; /** * The instance ID if provided. */ Optional<String> instanceId; name for this assignor. */ String name(); /** * ThePerform setthe ofgroup topicassignment IDsgiven that the membercurrent members and * topic metadata. * * @param group The group state. * @return The new assignment for the group. */ Assignment assign(Group group); } |
Broker Metrics
- Group count by type
- Group count by state
- Rebalance Rate
Broker Configurations
New properties in the broker configuration.
Name | Type | Default | Doc |
---|---|---|---|
group.consumer.session.timeout.ms | int | 30s | The timeout to detect client failures when using the consumer group protocol. |
group.consumer.min.session.timeout.ms | int | 45s | The minimum session timeout. |
group.consumer.max.session.timeout.ms | int | 60s | The maximum session timeout. |
group.consumer.heartbeat.interval.ms | int | 5s | The heartbeat interval given to the members. |
group.consumer.min.heartbeat.interval.ms | int | 5s | The minimum heartbeat interval. |
group.consumer.max.heartbeat.interval.ms | int | 15s | The maximum heartbeat interval. |
group.consumer.max.size | int | MaxValue | The maximum number of consumers that a single consumer group can accommodate. |
group.consumer.assignors | List | range, uniform | The server side assignors. |
Group Configurations
New dynamic group properties.
Name | Type | Default | Doc |
---|---|---|---|
group.consumer.session.timeout.ms | int | 30s | The timeout to detect client failures when using the consumer group protocol. |
group.consumer.heartbeat.interval.ms | int | 5s | The heartbeat interval given to the members. |
Consumer API
New PartitionAssignor interface
The new PartitionAssignor interface will be introduced to replace the ConsumerPartitionAssignor interface. The interface is defined as follow:
Code Block | ||||
---|---|---|---|---|
| ||||
package org.apache.kafka.clients.consumer; public interface PartitionAssignor { class Group { /** * The members. */ List<GroupMember> members; /** * The mapping from topic ID to number of partitions * as provided by the group coordinator */ Map<Uuid, Integer> topicMetadata; } class GroupMember { /** * The member ID. */ String memberId; /** * The instance ID if provided. */ Optional<String> instanceId; /** * The set of topic IDs that the member is subscribed to. */ List<Uuid> subscribedTopicIds; /** * The reason reported by the member. */ byte reason; /** * The version of the metadata encoded in {{@link GroupMember#metadata()}}. */ int version; /** * The custom metadata provided by the member as defined * by {{@link PartitionAssignor#metadata()}}. */ ByteBuffer metadata; /** * The partitions owned by the member at the current epoch. */ List<TopicIdPartition> ownedPartitions; } class Assignment { /** * The member assignment. */ List<MemberAssignment> members; } class MemberAssignment { /** * The member ID. */ String memberId; /** * The assigned partitions. */ List<TopicIdPartition> partitions; /** * The error reported by the assignor. is subscribed to. */ List<Uuid> subscribedTopicIds; /** * The reason reported by the member. */ byte reason; /** * The version of the metadata encoded in {{@link GroupMember#metadata()}}. */ int version; /** * The custom metadata provided by the member as defined * by {{@link PartitionAssignor#metadata()}}. */ ByteBuffer metadata; /** * The partitions owned by the member at the current epoch. */ List<TopicIdPartition> ownedPartitions; } class Assignment { /** * The member assignment. */ List<MemberAssignment> members; } class MemberAssignment { /** * The member ID. */ String memberId; /** * The assigned partitions. */ List<TopicIdPartition> partitions; /** * The error reported by the assignor. */ byte error; /** * The version of the metadata encoded in {{@link GroupMember#metadata()}}. */ int version; /** * The custom metadata provided by the assignor. */ ByteBuffer metadata; } class Metadata { /** * The reason reported by the assignor. */ byte reason; /** * The version of the metadata encoded in {{@link Metadata#metadata()}}. */ int version; /** * The custom metadata provided by the assignor. */ ByteBuffer metadata; } /** * Unique name for this assignor. */ String name(); /** * The minimum version. */ int minimumVersion(); /** * The maximum version this assignor. */ String name(); /** * The minimum version. */ int minimumVersion(); /** * The maximum version. */ int maximumVersion(); /** * Return serialized data that will be sent to the assignor. */ Metadata metadata(); /** * Perform the group assignment given the current members and * topic metadata. * * @param group The group state. * @return The new assignment for the group. */ int Assignment maximumVersionassign(Group group); /** * Return serialized data that will be sent to Callback which is invoked when the member received a new * assignment from the assignor/group coordinator. */ Metadata void metadataonAssignment(); /** * Perform the group assignment given the current members and * topic metadata. * * @param group The group state. * @return The new assignment for the group. */ Assignment assign(Group group); /** * Callback which is invoked when the member received a new * assignment from the assignor/group coordinator. */ void onAssignment(MemberAssignment assignment); } |
Deprecate Consumer#enforceRebalance and Consumer#enforceRebalance(String)
The enforceRebalance methods are no longer necessary and will be deprecated in a future release.
Deprecate ConsumerPartitionAssignor interface.
The ConsumerPartitionAssignor interface will be deprecated in a future (major) release.
Consumer Configurations
...
A flag which indicates if the new protocol should be used or not. It could be: generic or consumer
...
MemberAssignment assignment);
} |
Deprecate Consumer#enforceRebalance and Consumer#enforceRebalance(String)
The enforceRebalance methods are no longer necessary and will be deprecated in a future release.
Deprecate ConsumerPartitionAssignor interface.
The ConsumerPartitionAssignor interface will be deprecated in a future (major) release.
Consumer Configurations
Name | Type | Default | Doc |
---|---|---|---|
group.protocol | enum | generic | A flag which indicates if the new protocol should be used or not. It could be: generic or consumer |
group.remote.assignor | string | uniform | The server side assignor to use. It cannot be used in conjunction with group.local.assignor. |
group.local.assignors | list | empty | The list of client side (local) assignors. It cannot be used in conjunction with group.remote.assignor. |
Streams Metadata
TODO
name | type | note |
processId | uuid / static | Inherited. In the future we may also remove this field when one instance only use one consumer |
userEndPoint | bytes / static | Inherited |
clientTags | map / static | Inherited |
errorCode | int8 / dynamic | Inherited and enhanced. Communicate rebalance reasons, possible values:
|
topologyHash | uuid / dynamic | Only updatable when errorCode is not “none”. |
taskLag | array / dynamic | Only updatable when errorCode is not “none”. |
name | type | note |
activeTasks | list | Inherited |
standbyTasks | map | Modified, only contain normal standby tasks |
warmupTasks | map | New, warm-up standby tasks |
partitionsByHost | map | Merged, global assignment information used for IQ |
errorCode | int8 | Inherited, possible values:
|
Streams Metadata
...
Streams Configurations
Name | Type | Default | Doc |
---|---|---|---|
group.protocol | enum | generic | A flag which indicates if the new protocol should be used or not. It could be: generic or consumer |
Admin API
Admin#listConsumerGroups
...