...
Enum constant | Description |
---|---|
SHARE("share") | Share group |
Command-line tools
kafka-share-groups.sh
A new tool called kafka-share-groups.sh
is added for working with share groups. It has the following options:
...
$ kafka-share-groups.sh --bootstrap-server localhost:9092 --group S1 --topic T1 --reset-offsets --to-datetime 1999-12-31T23:57:00.000 --execute
kafka-console-share-consumer.sh
A new tool called kafka-console-share-consumer.sh
is added for reading data from Kafka topics using a share group and writing it to standard output. It is similar to kafka-console-consumer.sh
, but it uses a share group and supports the various acknowledgement modes. It has the following options:
Option | Description |
---|---|
--bootstrap-server <String: server to connect to> | REQUIRED: The server(s) to connect to. |
--consumer-config <String: config file> | Consumer config properties file. Note that [consumer-property] takes precedence over this config. |
--consumer-property <String: consumer_prop> | Consumer property in the form key=value. |
--enable-systest-events | Log lifecycle events of the consumer in addition to logging consumed messages. (This is specific for system tests.) |
--formatter <String: class> | The name of a class to use for formatting Kafka messages for display. (default: kafka.tools.DefaultMessageFormatter) |
--formatter-config <String: config file> | Config properties file to initialize the message formatter. Note that [property] takes precedence over this config. |
--group <String: share group id> | The share group id of the consumer. (default: share) |
--help | Print usage information. |
--key-deserializer <String: deserializer for keys> | The name of the class to use for deserializing keys. |
--max-messages <Integer: num_messages> | The maximum number of messages to consume before exiting. If not set, consumption is continual. |
--property <String: prop> | The properties to initialize the message formatter. Default properties include: print.timestamp=true\|false, print.key=true\|false, print.offset=true\|false, print.delivery=true\|false, print.partition=true\|false, print.headers=true\|false, print.value=true\|false, key.separator=<key.separator>, line.separator=<line.separator>, headers.separator=<line.separator>, null.literal=<null.literal>, key.deserializer=<key.deserializer>, value.deserializer=<value.deserializer>, header.deserializer=<header.deserializer>. Users can also pass in customized properties for their formatter; more specifically, users can pass in properties keyed with 'key.deserializer.', 'value.deserializer.' and 'headers.deserializer.' prefixes to configure their deserializers. |
--reject | If specified, messages are rejected as they are consumed. |
--reject-message-on-error | If there is an error when processing a message, reject it instead of halting. |
--release | If specified, messages are released as they are consumed. |
--timeout-ms <Integer: timeout_ms> | If specified, exit if no message is available for consumption for the specified interval. |
--topic <String: topic> | REQUIRED: The topic to consume from. |
--value-deserializer <String: deserializer for values> | The name of the class to use for deserializing values. |
--version | Display Kafka version. |
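Putting the options above together, a minimal invocation (the topic and group names here are illustrative, not part of the specification) consumes from topic T1 as a member of share group S1 and releases each record after displaying it:
$ kafka-console-share-consumer.sh --bootstrap-server localhost:9092 --topic T1 --group S1 --release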
kafka-producer-perf-test.sh
The following enhancements are made to the kafka-producer-perf-test.sh
tool. The changes are intended to make this tool useful for observing the operation of share groups by generating a low message rate with predictable message payloads.
Option | Description |
---|---|
--throughput THROUGHPUT | (Existing option) Enhanced to permit fractional rates, such as 0.5 meaning 1 message every 2 seconds. |
--payload-monotonic | Payload is a monotonically increasing integer. |
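The semantics of the two enhancements above can be sketched as follows. This is an illustrative model only, not the actual kafka-producer-perf-test.sh implementation: a (possibly fractional) messages-per-second rate maps to an inter-message delay, and the monotonic payload is simply the message sequence number.

```python
# Illustrative sketch of the kafka-producer-perf-test.sh enhancements.
# Not the tool's actual implementation; function names are hypothetical.

def inter_message_delay_seconds(throughput: float) -> float:
    """Seconds to wait between messages for a given messages/sec rate.

    A fractional rate such as 0.5 therefore means one message every
    2 seconds, which makes low-rate observation of share groups easy.
    """
    if throughput <= 0:
        raise ValueError("throughput must be positive")
    return 1.0 / throughput


def payload_for(sequence: int) -> bytes:
    """Monotonically increasing integer payload (--payload-monotonic sketch)."""
    return str(sequence).encode("utf-8")


print(inter_message_delay_seconds(0.5))  # 2.0
print(payload_for(7))                    # b'7'
```

Predictable payloads of this form make it straightforward to spot gaps, duplicates, or redelivery when observing a share group's consumers.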
Configuration
Broker configuration
...