Table of Contents |
---|
Status
Current state: Under Discussion
Discussion thread: here [Change the link from the KIP proposal email archive to your own email thread]
JIRA: here [Change the link from KAFKA-1 to your own ticket]4743
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
This KIP wants to introduce a command-line tool to enable users to reset Consumer Group offsets.
...
* Some messages with earlier timestamp might also be consumed."
Proposed Changes
This process will be separated in 3 steps:
Generate Reset Plan file with specific offsets by partition to reset, for a specific Consumer Group
Verify that the Reset Plan file is consistent and the current state of Consumer Group offsets
Execute the Reset Plan file, resetting offsets to the values specified in it.
Also, we will consider an option to generate and execute in one step, for users that don't need to validate the Reset Plan file.
This options will enable clients to move back and forward, depending on where they are currently in the topic.
For instance: if a topic 'T1' with 1 partition and 100 records (offsets 0-99) has 2 Consumer Groups: one Consumer Group 'C1' inactive, with current offset 20, and other Consumer Group 'C2' that has been actively consuming records from T1. If we reset to a point in time where offset is 80, clients will be moving forward. But if C2 is reset to the same point in time, it will be moving backward.
Public Interfaces
Consumer Group Reset Offset Tool
...
Additionally, this tool with support an additional operation to generate and execute in the one step:
--generate-and-execute
Generate Reset Plan file
To reset Consumer Group offsets first we will need a Reset Plan file with the specific offsets by partitions to reset to. This file could be generated in 3 ways:
...
--group: defines the Consumer Group ID (i.e. ´group.id´). Required.
--topics: comma-separated list of topics. Optional, if it’s not specified it will use all topics consumed by Consumer Group.
--output-file: relative/absolute path to an output file that will be populated with JSON file generated. Optional, if it’s not specified, JSON result will be printed in the terminal.
Reset Plan File: JSON Format
Code Block | ||
---|---|---|
| ||
{ “offsets”: { “group_id”: “ConsumerGroup1”, “topics”: [ “topic1”: [ { “partition”: 0, “offset”: 0 }, { “partition”: 1, “offset”: 0 } ], “topic2”: [ { “partition”: 0, “offset”: 0 }, { “partition”: 1, “offset”: 0 } ], ] }, “version”: 1 } |
Reset to Timestamp
This option will support a DateTime format (https://www.w3.org/TR/xmlschema-2/#dateTime) as input. This value will be transformed to epoch milliseconds and then used to get offsets by partition.
...
bin/kafka-consumer-group-reset.sh --generate --group ConsumerGroup1 --topics topic1 --reset-to-timestamp 2017-01-01T09:00:00.000 --output kafka-consumer-group-reset.json
´´´
Reset From Period
This options will support Duration format (https://www.w3.org/TR/xmlschema-2/#duration) as input. This value will be subtracted from current timestamp from client’s machine to obtain the epoch milliseconds to be used to get offsets by partition.
...
bin/kafka-consumer-group-reset.sh --generate --group ConsumerGroup1 --topics topic1 --reset-from-period P1M --output kafka-consumer-group-reset.json
´´´
Default
This option will generate a file with the current Consumer Groups offsets by partition. The result of this execution will generate a JSON file that could be used as a template to modify the offsets manually for all partitions.
...
bin/kafka-consumer-group-reset.sh --generate --group ConsumerGroup1 --topics topic1 --output kafka-consumer-group-reset.json
´´´
Verify JSON file
Once JSON file is generated, the tool will offer an option to validate it with the current state of the cluster. The tool will validate the following:
...
bin/kafka-consumer-group-reset.sh --verify --reset-json-file kafka-consumer-group-reset.json
´´´
Execute JSON file
Once user feels comfortable with the Reset Plan file, could prepare the environment to execute the file.
...
Finally user can use the ´verify´ option to validate that current and defined offset are the same.
Compatibility, Deprecation, and Migration Plan
This KIP won’t require a Migration Plan.
...
All the other options will be available for releases from 0.10.0.0 given that KIP-97 is implemented.
Test Plan
A unit test to validate correct command options by operation (generate, verify, execute)
A unit test to validate the functionality for each operation:
Generate:
Validate that topics consumed are obtained when no ´--topics´ argument is specified
Validate that current offsets by partition are obtained when default option is executed
Validate that an offset is obtained when a datetime is specified
Validate that oldest offset is obtained when a datetime is older than the oldest timestamp by partition
Validate that latest offset is obtained when a datetime is higher than the latest timestamp by partition
Validate that an offset is obtained when a period is specified
Validate that oldest offset is obtained when a period minus current timestamp is older than the oldest timestamp by partition
Validate that latest offset is obtained when a period minus current timestamp is higher than the latest timestamp by partition
Verify:
Validate that JSON structure is correct
Validate that topics exist
Validate that partitions exist
Validate that topics are actually consumed by Consumer Group
Warn that topic does not exist and nothing will be executed
Warn that topic is not been consumed by Consumer Group and nothing will be executed
Warn that partition does not exist and nothing will be executed.
Warn that specified offset is out of range (ie. from oldest to latest) and it will be reset to oldest or latest depending on which one is closer.
Execute:
Validate that ConsumerGroup is inactive.
Validate that offsets are updated in cluster after execution.
Warn that non-existing/not-consumed topic has not been updated
Warn that non-existing partitions has not been updated
- Warn that offsets out of bound has been updated to oldest or latest, depending which one is closer.
Rejected Alternatives
None
1. Reset to specific offset: Move Consumer Group to specific offset by partition.
...