...
The group and members represent the current state of a share group. This is reminiscent of a simplified consumer group.
Share Group
Name | Type | Description |
---|---|---|
Group ID | string | The group ID as configured by the consumer. The ID uniquely identifies the group. |
Group Epoch | int32 | The current epoch of the group. The epoch is incremented by the group coordinator when a new assignment is required for the group. |
Members | []Member | The set of members in the group. |
Partitions Metadata | []PartitionMetadata | The metadata of the partitions that the group is subscribed to. This is used to detect partition metadata changes. |

Member
Name | Type | Description |
---|---|---|
Member ID | string | The unique identifier of the member. It is generated by the coordinator upon the first heartbeat request and must be used throughout the lifetime of the member. |
Rack ID | string | The rack ID configured by the consumer. |
Client ID | string | The client ID configured by the consumer. |
Client Host | string | The client host of the consumer. |
Subscribed Topic Names | []string | The current set of subscribed topic names configured by the consumer. |
Server Assignor | string | The server-side assignor used by the group. |
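The two tables above can be read as a pair of nested value types. The following is a minimal sketch of that shape, not the coordinator's actual classes: the record names, the fields of PartitionMetadata, and the bumpEpoch helper are all assumptions made for illustration.

```java
import java.util.List;

// Hypothetical stand-in for the partition metadata entry; its fields are assumptions.
record PartitionMetadata(String topicName, int numPartitions) {}

// Mirrors the Member table: one field per row.
record Member(String memberId, String rackId, String clientId, String clientHost,
              List<String> subscribedTopicNames, String serverAssignor) {}

// Mirrors the Share Group table: one field per row.
record ShareGroup(String groupId, int groupEpoch, List<Member> members,
                  List<PartitionMetadata> partitionsMetadata) {

    // Illustrative helper: returns a copy with the group epoch incremented,
    // which is what happens when a new assignment is required.
    ShareGroup bumpEpoch() {
        return new ShareGroup(groupId, groupEpoch + 1, members, partitionsMetadata);
    }
}
```

Modelling the group as an immutable value keeps each epoch bump explicit: every state that needs a new assignment is a new ShareGroup instance with a higher epoch.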
Target Assignment
The target assignment of the group. This represents the assignment that all the members of the group will eventually converge to. It is a declarative assignment which is generated by the assignor based on the group state.
Name | Type | Description |
---|---|---|
Group ID | string | The group ID as configured by the consumer. The ID uniquely identifies the group. |
Assignment Epoch | int32 | The epoch of the assignment. It represents the epoch of the group used to generate the assignment. It will eventually match the group epoch. |
Assignment Error | int8 | The error reported by the assignor. |
Members | []Member | The assignment for each member. |

Member
Name | Type | Description |
---|---|---|
Member ID | string | The unique identifier of the member. |
Partitions | []TopicIdPartition | The set of partitions assigned to this member. |
Metadata | bytes | The metadata assigned to this member. |
Current Assignment
The current assignment of a member.
Name | Type | Description |
---|---|---|
Group ID | string | The group ID as configured by the consumer. The ID uniquely identifies the group. |
Member ID | string | The member ID of this member. |
Member Epoch | int32 | The current epoch of this member. The epoch is the assignment epoch of the assignment currently used by this member. |
Error | int8 | The error reported by the assignor. |
Partitions | []TopicIdPartition | The current partitions used by the member. |
Version | int16 | The version used to encode the metadata. |
Metadata | bytes | The current metadata used by the member. |
Rebalance process
The rebalance process is driven by the group coordinator and revolves around three kinds of epochs: the group epoch, the assignment epoch and the member epoch. This is intentionally very similar to how the process works for consumer groups in KIP-848.
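The relationship between the three epochs, as described in the tables above, can be sketched as a toy model. This is only an illustration of the ordering: the class and method names are hypothetical, and the model ignores everything else the coordinator does during a rebalance.

```java
// Hypothetical model of the three epochs; names and methods are illustrative assumptions.
class EpochModel {
    int groupEpoch = 0;       // incremented when a new assignment is required
    int assignmentEpoch = 0;  // group epoch that was used to generate the target assignment
    int memberEpoch = 0;      // assignment epoch of the assignment the member currently uses

    // A change to the group (for example, a member joining) requires a new assignment.
    void groupChanged() {
        groupEpoch++;
    }

    // The assignor generates a target assignment for the current group epoch.
    void computeTargetAssignment() {
        assignmentEpoch = groupEpoch;
    }

    // The member adopts the target assignment and takes on its epoch.
    void memberReconciles() {
        memberEpoch = assignmentEpoch;
    }

    // True once the member epoch and assignment epoch have caught up with the group epoch.
    boolean converged() {
        return memberEpoch == assignmentEpoch && assignmentEpoch == groupEpoch;
    }
}
```

In this model, a call to groupChanged() leaves the group unconverged until both computeTargetAssignment() and memberReconciles() have run, which matches the statement that the assignment epoch will eventually match the group epoch.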
...
Add another case to the org.apache.kafka.common.GroupType enum:
Enum constant | Description |
---|---|
SHARE("share") | Share group |
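As a rough illustration of what the new constant looks like, the sketch below defines a standalone enum. It is not the real org.apache.kafka.common.GroupType: the type name SketchGroupType, the CONSUMER placeholder, and the parse helper are assumptions; only SHARE("share") comes from the table above.

```java
import java.util.Locale;

// Hypothetical standalone enum; not the real org.apache.kafka.common.GroupType.
enum SketchGroupType {
    CONSUMER("consumer"),  // placeholder standing in for an existing constant (assumption)
    SHARE("share");        // the constant added by this section

    private final String displayName;

    SketchGroupType(String displayName) {
        this.displayName = displayName;
    }

    @Override
    public String toString() {
        return displayName;
    }

    // Case-insensitive lookup by display name; returns null when nothing matches.
    static SketchGroupType parse(String s) {
        if (s == null) {
            return null;
        }
        String wanted = s.toLowerCase(Locale.ROOT);
        for (SketchGroupType t : values()) {
            if (t.displayName.equals(wanted)) {
                return t;
            }
        }
        return null;
    }
}
```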
Command-line tools
kafka-share-groups.sh
...
A new tool called kafka-console-share-consumer.sh is added for reading data from Kafka topics using a share group and writing it to standard output. It is similar to kafka-console-consumer.sh, but it uses a share group and supports the various acknowledge modes. It has the following options:
Option | Description |
---|---|
--bootstrap-server <String: server to connect to> | REQUIRED: The server(s) to connect to. |
--consumer-config <String: config file> | Consumer config properties file. Note that [consumer-property] takes precedence over this config. |
--consumer-property <String: consumer_prop> | Consumer property in the form key=value. |
--enable-systest-events | Log lifecycle events of the consumer in addition to logging consumed messages. (This is specific to system tests.) |
--formatter <String: class> | The name of a class to use for formatting Kafka messages for display. (default: kafka.tools.DefaultMessageFormatter) |
--formatter-config <String: config file> | Config properties file to initialize the message formatter. Note that [property] takes precedence over this config. |
--group <String: share group id> | The share group id of the consumer. (default: share) |
--help | Print usage information. |
--key-deserializer <String: deserializer for keys> | The name of the class to use for deserializing keys. |
--max-messages <Integer: num_messages> | The maximum number of messages to consume before exiting. If not set, consumption is continual. |
--property <String: prop> | The properties to initialize the message formatter. Default properties include: print.timestamp=true|false print.key=true|false print.offset=true|false print.delivery=true|false print.partition=true|false print.headers=true|false print.value=true|false key.separator=<key.separator> line.separator=<line.separator> headers.separator=<line.separator> null.literal=<null.literal> key.deserializer=<key.deserializer> value.deserializer=<value.deserializer> header.deserializer=<header.deserializer> Users can also pass in customized properties for their formatter; more specifically, users can pass in properties keyed with 'key.deserializer.', 'value.deserializer.' and 'headers.deserializer.' prefixes to configure their deserializers. |
--reject | If specified, messages are rejected as they are consumed. |
--reject-message-on-error | If there is an error when processing a message, reject it instead of halting. |
--release | If specified, messages are released as they are consumed. |
--timeout-ms <Integer: timeout_ms> | If specified, exit if no message is available for consumption for the specified interval. |
--topic <String: topic> | REQUIRED: The topic to consume from. |
--value-deserializer <String: deserializer for values> | The name of the class to use for deserializing values. |
--version | Display Kafka version. |
kafka-producer-perf-test.sh
The following enhancements are made to the kafka-producer-perf-test.sh tool. The changes are intended to make this tool useful for observing the operation of share groups by generating a low message rate with predictable message payloads.
Option | Description |
---|---|
--throughput THROUGHPUT | (Existing option) Enhanced to permit fractional rates, such as 0.5 meaning 1 message every 2 seconds. |
--payload-monotonic | The payload is a monotonically increasing integer. |
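The arithmetic behind a fractional rate can be sketched as follows. This is not the tool's implementation: the class and method names are hypothetical, the sketch only handles positive rates, and encoding the counter as decimal text in UTF-8 is an assumption about how a monotonic payload might be represented.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper; not part of kafka-producer-perf-test.sh.
class PerfTestSketch {

    // Milliseconds between sends for a target rate in messages per second.
    // Only positive rates are handled in this sketch.
    static long intervalMs(double throughput) {
        if (throughput <= 0) {
            throw new IllegalArgumentException("throughput must be positive");
        }
        return Math.round(1000.0 / throughput);
    }

    // Payload for the n-th message when payloads increase monotonically.
    // Decimal text in UTF-8 is an assumed encoding.
    static byte[] monotonicPayload(long n) {
        return Long.toString(n).getBytes(StandardCharsets.UTF_8);
    }
}
```

With these definitions, intervalMs(0.5) yields 2000, matching the "1 message every 2 seconds" example in the table.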
Configuration
Broker configuration
...