Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: Correct formatting

...

The group and members represent the current state of a share group. This is reminiscent of a simplified consumer group.

Share Group
NameTypeDescription
Group IDstringThe group ID as configured by the consumer. The ID uniquely identifies the group.
Group Epochint32The current epoch of the group. The epoch is incremented by the group coordinator when a new assignment is required for the group.
Members

[]Member

The set of members in the group.
Partitions Metadata

[]PartitionMetadata

The metadata of the partitions that the group is subscribed to. This is used to detect partition metadata changes.
Member
Name

Type

Description
Member ID

string

The unique identifier of the member. It is generated by the coordinator upon the first heartbeat request and must be used throughout the lifetime of the member.
Rack ID

string

The rack ID configured by the consumer.
Client ID

string

The client ID configured by the consumer.
Client Host

string

The client host of the consumer.
Subscribed Topic Names

[]string

The current set of subscribed topic names configured by the consumer.
Server Assignor

string

The server-side assignor used by the group.

Target Assignment

The target assignment of the group. This represents the assignment that all the members of the group will eventually converge to. It is a declarative assignment which is generated by the assignor based on the group state.

Target Assignment
NameTypeDescription
Group ID stringThe group ID as configured by the consumer. The ID uniquely identifies the group.
Assignment Epochint32The epoch of the assignment. It represents the epoch of the group used to generate the assignment. It will eventually match the group epoch.
Assignment Errorint8The error reported by the assignor.
Members[]MemberThe assignment for each member.
Member
NameTypeDescription
Member IDstringThe unique identifier of the member.
Partitions[]TopicIdPartitionThe set of partitions assigned to this member.
MetadatabytesThe metadata assigned to this member.

Current Assignment

The current assignment of a member.

Current Assignment
NameTypeDescription
Group IDstringThe group ID as configured by the consumer. The ID uniquely identifies the group.
Member IDstringThe member ID of this member.
Member Epochint32The current epoch of this member. The epoch is the assignment epoch of the assignment currently used by this member.
Errorint8The error reported by the assignor.
Partitions[]TopicIdPartitionThe current partitions used by the member.
Versionint16The version used to encode the metadata.
MetadatabytesThe current metadata used by the member.

Rebalance process

The rebalance process is driven by the group coordinator and revolves around three kinds of epochs: the group epoch, the assignment epoch and the member epoch. This is intentionally very similar to how the process works for consumer groups in KIP-848.

...

Add another case to the org.apache.kafka.common.GroupType  enum:

Enum constantDescription
SHARE("share") Share group

Command-line tools

kafka-share-groups.sh

...

A new tool called kafka-console-share-consumer.sh  is added for reading data from Kafka topics using a share group and outputting to standard output. This is similar to kafka-console-consumer.sh  but using a share group and supporting the various acknowledge modes. It has the following options:

OptionDescription
--bootstrap-server <String: server to connect to>REQUIRED: The server(s) to connect to.
--consumer-config <String: config file>Consumer config properties file. Note that [consumer-property] takes precedence over this config.
--consumer-property <String: consumer_prop>

Consumer property in the form key=value.

--enable-systest-events

Log lifecycle events of the consumer in addition to logging consumed messages. (This is specific for system tests.)

--formatter <String: class>

The name of a class to use for formatting Kafka messages for display. (default: kafka.tools.DefaultMessageFormatter)

--formatter-config <String: config file>

Config properties file to initialize the message formatter. Note that [property] takes precedence of this config.

--group <String: share groud id>

The share group id of the consumer. (default: share)

--help

Print usage information.

--key-deserializer <String: deserializer for keys>

The name of the class to use for deserializing keys.

--max-messages <Integer: num_messages>

The maximum number of messages to consume before exiting. If not set, consumption is continual.

--property <String: prop>

The properties to initialize the message formatter. Default properties include:

 print.timestamp=true|false

 print.key=true|false

 print.offset=true|false

 print.delivery=true|false

 print.partition=true|false

 print.headers=true|false

 print.value=true|false

 key.separator=<key.separator>

 line.separator=<line.separator>

 headers.separator=<line.separator>

 null.literal=<null.literal>

 key.deserializer=<key.deserializer>

 value.deserializer=<value.deserializer>

 header.deserializer=<header.deserializer>

Users can also pass in customized properties for their formatter; more specifically, users can pass in properties keyed with 'key.deserializer.', 'value.deserializer.' and 'headers.deserializer.' prefixes to configure their deserializers.

--reject

If specified, messages are rejected as they are consumed.

--reject-message-on-error

If there is an error when processing a message, reject it instead of halting.

--release

If specified, messages are released as they are consumed.

--timeout-ms <Integer: timeout_ms>

If specified, exit if no message is available for consumption for the specific interval.

--topic <String: topic>

REQUIRED: The topic to consume from.

--value-deserializer <String: deserializer for values>

The name of the class to use for deserializing values.

--version

Display Kafka version.

kafka-producer-perf-test.sh

The following enhancements are made to the kafka-producer-perf-test.sh tool. The changes are intended to make this tool useful for observing the operation of share groups by generating a low message rate with predictable message payloads.

OptionDescription
--throughput THROUGHPUT(Existing option) Enhanced to permit fractional rates, such as 0.5 meaning 1 message every 2 seconds.
--payload-monotonicpayload is monotonically increasing integer.

Configuration

Broker configuration

...