Status
Current state: Under Discussion
Discussion thread: here
JIRA: KAFKA-4743
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
This KIP proposes a command-line tool that lets users reset Consumer Group offsets.
Today, users who want to reprocess old records (i.e. records whose offset is smaller than the current consumer offset) must modify client-side code (e.g. using "KafkaConsumer#seek()" operations).
This involves either writing a separate program that starts a consumer and commits offsets, or changing client-side code, redeploying the application, and eventually rolling those changes back to consume records as usual. The process is cumbersome because users have to know how many partitions each topic has, which offset each partition is at, and which offset they want to move to.
Providing a way to reset offsets from outside the application gives users a cleaner and more consistent way to reprocess records and to move along a topic.
Currently, two indexes allow users to locate the records they want to reprocess: the offset index and the timestamp index.
The offset index is built on the sequential ID (offset) that each record has within its partition, and has been available since Kafka's inception.
The timestamp index (added in release 0.10.1.0 by KIP-33) maps record timestamps, in epoch milliseconds, to offsets. Depending on the Timestamp Type, the timestamp is either the record creation time set by the Producer or a time set by the Broker.
From KIP-33:
"Searching by timestamp will have better accuracy. The guarantees provided are:
* The messages whose timestamp are after the searched timestamp will be consumed.
* Some messages with earlier timestamp might also be consumed."
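The two guarantees quoted above can be illustrated with a simplified in-memory model. The sketch below assumes a partition is just a list of `(offset, timestamp_ms)` pairs and looks up the earliest offset whose timestamp is greater than or equal to the target, which is the contract documented for `KafkaConsumer#offsetsForTimes`; it is not how the broker's sparse time index is actually implemented, and the helper names are hypothetical. Because timestamps are not required to increase with the offset, a record with an earlier timestamp can sit after the returned offset and will be consumed too.

```python
from typing import List, Optional, Tuple

# Simplified model of one partition: (offset, timestamp_ms) pairs ordered by
# offset. This is an illustration, not the broker's sparse time index.
Record = Tuple[int, int]


def offset_for_timestamp(log: List[Record], target_ms: int) -> Optional[int]:
    """Return the earliest offset whose timestamp is >= target_ms, or None."""
    for offset, timestamp in log:
        if timestamp >= target_ms:
            return offset
    return None


def records_consumed_after_seek(log: List[Record], target_ms: int) -> List[Record]:
    """Records a consumer would read after seeking to the looked-up offset."""
    start = offset_for_timestamp(log, target_ms)
    if start is None:
        return []
    return [record for record in log if record[0] >= start]


# Timestamps need not increase with offset (e.g. producer-assigned CreateTime),
# so offset 3 carries an earlier timestamp than offset 2.
log = [(0, 1000), (1, 2000), (2, 3000), (3, 1500), (4, 4000)]
print(records_consumed_after_seek(log, 2500))
# [(2, 3000), (3, 1500), (4, 4000)]
```

Every record with a timestamp of at least 2500 (offsets 2 and 4) is consumed, and offset 3 is consumed as well even though its timestamp is earlier, which matches both guarantees.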
Proposed Changes
We propose adding options to ConsumerGroupCommand to reset offsets, supporting the following scenarios:
- Reset by time:
- Reset to Datetime: When we want to reset offsets to a specific point in time (e.g. to 1/1/2017 at 00:00 to reprocess all records from this year).
- Reset to Period: When we want to go back a given period of time (e.g. P7D to reprocess all records from the last week).
- Reset to Earliest: When we want to reprocess all records still available in each partition.
- Reset to Latest: When a Consumer Group is lagging and we don't want to process the pending records, only move to the latest offset.
- Reset to Current Position: When we only want to print out and/or back up the current offset of each partition.
- Reset by position:
- Reset to Offset: When we want to move to a specific offset.
- Reset to Current Offset Minus 'n' offsets: When we want to reprocess a specific number of records in each partition (e.g. `minus 5` will reprocess the 5 records preceding the current Consumer Group offset in each partition).
- Reset to Current Offset Plus 'n' offsets: When we want to skip a specific number of records after the current Consumer Group offset in each partition (e.g. `plus 5` will move the Consumer Group offset 5 positions forward).
- Reset by file:
- From exported plan: When we want to reset offsets to the values stored in a previously exported plan file.
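The position-based scenarios reduce to simple arithmetic on each partition's committed offset. The sketch below assumes, as the Test Plan describes, that any result outside the partition's available range is clamped to the earliest or latest offset; the function names are hypothetical and not part of ConsumerGroupCommand.

```python
def clamp(offset: int, earliest: int, latest: int) -> int:
    """Keep a computed offset within the partition's [earliest, latest] range."""
    return max(earliest, min(offset, latest))


def reset_to(target: int, earliest: int, latest: int) -> int:
    """Reset to Offset: move to an explicit offset."""
    return clamp(target, earliest, latest)


def reset_minus(current: int, n: int, earliest: int, latest: int) -> int:
    """Current Offset Minus n: step back so the previous n records are reprocessed."""
    return clamp(current - n, earliest, latest)


def reset_plus(current: int, n: int, earliest: int, latest: int) -> int:
    """Current Offset Plus n: step forward so the next n records are skipped."""
    return clamp(current + n, earliest, latest)


# Example partition: earliest available offset 10, latest offset 100,
# group currently committed at offset 50.
print(reset_minus(50, 5, 10, 100))   # 45
print(reset_plus(50, 5, 10, 100))    # 55
print(reset_to(0, 10, 100))          # 10 (clamped to earliest)
print(reset_plus(98, 5, 10, 100))    # 100 (clamped to latest)
```

The same functions apply independently to every partition selected by the scope.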
These operations will be executed per Consumer Group and will support the following scopes:
- All Topics consumed by Consumer Group: This scope will consider all the topics that have been consumed by the Consumer Group.
- Specific List of Topics: This scope will consider only the topics specified by the user.
- One Topic, All Partitions: This scope will consider only one topic and all of its partitions.
- One Topic, Specific Partitions: This scope will consider only one topic and the partitions specified by the user.
There will be three execution options:
- Plan (default): Print the result of the operation (i.e. show the offsets that would be set if the operation were executed).
- Execute: Reset the offsets. Users will have to request execution explicitly.
- Export: Export the plan to a JSON file so that it can be executed later.
Public Interfaces
Consumer Group Reset Offset options
Main option
--reset-offset
This option should be used independently of the other Consumer Group options (list, describe, delete, etc.).
Additional Arguments
ID | Argument | Type | Description |
---|---|---|---|
1. | --group | Required | Consumer Group ID. |
Scenarios
ID | Scenario | Arguments | Description |
---|---|---|---|
1.a. | Reset to Datetime | --reset-to-datetime YYYY-MM-DDTHH:mm:SS.sss | This option will translate the datetime to epoch milliseconds, find the offsets by timestamp, and reset to those offsets. |
1.b. | Reset to Period | --reset-to-period PnYnMnDTnHnMnS | This option will subtract the period from the current timestamp on the server, find the offsets using the resulting timestamp, and reset to those offsets. |
1.c. | Reset to Earliest | --reset-to-earliest | This option will reset offsets to the earliest available offsets, using the Kafka Consumer's `auto.offset.reset` set to `earliest`. |
1.d. | Reset to Latest | --reset-to-latest | This option will reset offsets to the latest offsets, using the Kafka Consumer's `auto.offset.reset` set to `latest`. |
1.e. | Reset to Current Position | (no scenario arguments) | This option won't change the offsets. It will be used to print and export the current offsets. |
2.a. | Reset to Offset | --reset-to n | This option will reset offsets to a specific value `n`. |
2.b. | Reset to Current Offset Minus 'n' offsets | --reset-minus n | This option will subtract `n` from the current offset and reset to the result. |
2.c. | Reset to Current Offset Plus 'n' offsets | --reset-plus n | This option will add `n` to the current offset and reset to the result. |
3.a. | Reset from File | --reset-from-file PATH_TO_FILE | This option will take a Reset Plan file with the offsets to reset for each partition. |
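Scenarios 1.a and 1.b both produce a timestamp in epoch milliseconds that is then used to look up offsets. The sketch below shows only that conversion step. It assumes the datetime is interpreted as UTC (the KIP does not specify a time zone) and, for brevity, supports only the day, hour, minute, and second parts of the ISO-8601 period format; years and months are rejected because their length depends on the calendar. The current server time is passed in explicitly, and all helper names are hypothetical.

```python
import re
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
ONE_MS = timedelta(milliseconds=1)

# Subset of ISO-8601 durations: PnDTnHnMnS (no years or months).
PERIOD_PATTERN = re.compile(
    r"P(?:(?P<days>\d+)D)?"
    r"(?:T(?:(?P<hours>\d+)H)?(?:(?P<minutes>\d+)M)?(?:(?P<seconds>\d+)S)?)?"
)


def datetime_to_epoch_ms(value: str) -> int:
    """Convert YYYY-MM-DDTHH:mm:SS.sss, assumed to be UTC, to epoch milliseconds."""
    parsed = datetime.strptime(value, "%Y-%m-%dT%H:%M:%S.%f").replace(tzinfo=timezone.utc)
    return (parsed - EPOCH) // ONE_MS  # exact integer division of timedeltas


def period_to_timedelta(value: str) -> timedelta:
    """Parse a PnDTnHnMnS period; raise ValueError for anything else."""
    match = PERIOD_PATTERN.fullmatch(value)
    if match is None or value.endswith("T") or not any(match.groupdict().values()):
        raise ValueError(f"unsupported period: {value!r}")
    parts = {name: int(amount) for name, amount in match.groupdict().items() if amount}
    return timedelta(**parts)


def period_to_search_timestamp(value: str, now_ms: int) -> int:
    """Subtract the period from the current server time (epoch milliseconds)."""
    return now_ms - period_to_timedelta(value) // ONE_MS


now_ms = datetime_to_epoch_ms("2017-01-08T00:00:00.000")
print(datetime_to_epoch_ms("2017-01-01T00:00:00.000"))  # 1483228800000
print(period_to_search_timestamp("P7D", now_ms))        # 1483228800000
```

Both calls yield the same timestamp here because P7D subtracted from 2017-01-08 lands on 2017-01-01.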
Scopes
ID | Scope | Arguments | Description |
---|---|---|---|
1. | All topics | (no scope arguments) | In this case, the tool will run the scenario for all topics that have been consumed by the Consumer Group. It will consider all partitions. |
2. | List of Topics | --topics topic1,topic2... | In this case, the tool will first validate that the input topics have been consumed by the Consumer Group, and then run the scenario. It will consider all partitions. |
3. | Topic | --topic topic1 | In this case, the tool will first validate that the input topic has been consumed by the Consumer Group, and then run the scenario. It will consider all partitions. |
4. | Topic and List of Partitions | --topic topic1 --partitions 0,1,2 | In this case, the tool will first validate that the input topic has been consumed by the Consumer Group, and then run the scenario. It will consider only the specified partitions. |
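The scope arguments decide which partitions a scenario is applied to. A minimal sketch, assuming the tool already knows which topics and partitions the group has consumed (passed in here as a plain dictionary); a single `--topic` is modelled as a one-element list, and `resolve_scope` is a hypothetical helper name.

```python
from typing import Dict, List, Optional, Tuple

TopicPartition = Tuple[str, int]


def resolve_scope(
    consumed: Dict[str, List[int]],
    topics: Optional[List[str]] = None,
    partitions: Optional[List[int]] = None,
) -> List[TopicPartition]:
    """Return the (topic, partition) pairs that the reset should touch.

    consumed maps every topic the group has consumed to its partitions.
    topics=None selects all consumed topics; partitions requires one topic.
    """
    if partitions is not None and (topics is None or len(topics) != 1):
        raise ValueError("--partitions can only be used with a single --topic")
    selected = list(consumed) if topics is None else topics
    # Validate that every requested topic has been consumed by the group.
    not_consumed = [topic for topic in selected if topic not in consumed]
    if not_consumed:
        raise ValueError(f"topics not consumed by this group: {not_consumed}")
    return [
        (topic, partition)
        for topic in selected
        for partition in sorted(consumed[topic] if partitions is None else partitions)
    ]


consumed = {"orders": [0, 1, 2], "payments": [0, 1]}
print(resolve_scope(consumed))                        # scope 1: all topics
print(resolve_scope(consumed, ["payments"]))          # scopes 2 and 3
print(resolve_scope(consumed, ["orders"], [0, 2]))    # scope 4
```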
Execution Options
ID | Option | Arguments | Description |
---|---|---|---|
1. | Plan | (no execution arguments) | This execution option will only print out the result of the scenario for the selected scope. |
2. | Execute | --execute | This execution option will reset the offsets based on the scenario and scope. |
3. | Export | --export path_to_file | This execution option will export the plan to a file that can later be used to reset the offsets (e.g. as a backup). |
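The three execution options differ only in what happens to a computed plan. A minimal sketch, assuming the plan is a dictionary from `(topic, partition)` to the new offset and that committing is delegated to a caller-supplied `commit` function (a hypothetical hook; in a real tool it would wrap a consumer offset commit). The exported JSON keys `topics` by topic name; `run`, `plan_report`, and `plan_document` are illustrative names only.

```python
import json
from typing import Callable, Dict, Optional, Tuple

TopicPartition = Tuple[str, int]
Plan = Dict[TopicPartition, int]


def plan_report(plan: Plan) -> str:
    """Plan (default): one 'topic<TAB>partition<TAB>offset' line per partition."""
    return "\n".join(f"{topic}\t{partition}\t{offset}"
                     for (topic, partition), offset in sorted(plan.items()))


def plan_document(group_id: str, plan: Plan) -> dict:
    """Build the JSON-serialisable Reset Plan used by the Export option."""
    topics: Dict[str, list] = {}
    for (topic, partition), offset in sorted(plan.items()):
        topics.setdefault(topic, []).append({"partition": partition, "offset": offset})
    return {"offsets": {"group_id": group_id, "topics": topics}, "version": 1}


def run(group_id: str, plan: Plan, execute: bool = False,
        export_path: Optional[str] = None,
        commit: Optional[Callable[[str, Plan], None]] = None) -> str:
    """Apply at most one execution option and return the printed report."""
    if execute and export_path is not None:
        raise ValueError("--execute and --export are mutually exclusive")
    if execute:
        if commit is None:
            raise ValueError("--execute needs a commit function")
        commit(group_id, plan)  # hypothetical hook that commits the new offsets
    elif export_path is not None:
        with open(export_path, "w", encoding="utf-8") as handle:
            json.dump(plan_document(group_id, plan), handle, indent=2)
    return plan_report(plan)


plan = {("orders", 1): 45, ("orders", 0): 40}
print(run("ConsumerGroup1", plan))  # Plan only: nothing is committed or written
```

Keeping Plan as the fall-through case means a reset never happens unless `--execute` is given explicitly.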
Reset Plan File: JSON Format
{
  "offsets": {
    "group_id": "ConsumerGroup1",
    "topics": {
      "topic1": [ { "partition": 0, "offset": 0 }, { "partition": 1, "offset": 0 } ],
      "topic2": [ { "partition": 0, "offset": 0 }, { "partition": 1, "offset": 0 } ]
    }
  },
  "version": 1
}
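Reading a Reset Plan file back (scenario 3.a) amounts to flattening it into per-partition offsets. A minimal sketch, assuming `topics` maps each topic name to a list of `{partition, offset}` entries and that only `version` 1 is understood; `load_reset_plan` is a hypothetical name.

```python
import json
from typing import Dict, Tuple

TopicPartition = Tuple[str, int]


def load_reset_plan(text: str) -> Tuple[str, Dict[TopicPartition, int]]:
    """Parse a Reset Plan into (group_id, {(topic, partition): offset})."""
    document = json.loads(text)
    if document.get("version") != 1:
        raise ValueError(f"unsupported plan version: {document.get('version')!r}")
    offsets = document["offsets"]
    plan: Dict[TopicPartition, int] = {}
    for topic, entries in offsets["topics"].items():
        for entry in entries:
            plan[(topic, int(entry["partition"]))] = int(entry["offset"])
    return offsets["group_id"], plan


example = """
{ "offsets": { "group_id": "ConsumerGroup1",
               "topics": { "topic1": [ { "partition": 0, "offset": 0 },
                                       { "partition": 1, "offset": 0 } ] } },
  "version": 1 }
"""
group_id, plan = load_reset_plan(example)
print(group_id, plan)  # ConsumerGroup1 {('topic1', 0): 0, ('topic1', 1): 0}
```

Checking `version` first leaves room to change the file layout later without misreading older plans.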
Compatibility, Deprecation, and Migration Plan
This KIP does not require a migration plan.
`--reset-to-datetime` and `--reset-to-period` require the timestamp index, so they will only be available from release 0.10.1.0.
It is possible to achieve the same result on release 0.10.0 by using the per-record timestamp metadata and doing a linear search starting from the latest offset. However, we do not plan to support this approach because it would be expensive and slow.
All other options will be available for releases from 0.10.0.0 onward, provided that KIP-97 is implemented.
Test Plan
- A unit test to validate that --reset-offset is executed independently of other Consumer Group options (list, delete, describe, etc.)
- A unit test to validate that only one scenario is specified
- A unit test to validate that only one scope is specified
- A unit test to validate that only one execution option is specified
- A unit test for each combination of scenario, scope, and execution option
- A unit test to validate that when the calculated offset for a partition is smaller than the earliest offset, the tool resets the offset to the earliest (e.g. when --reset-to 0 is specified and the earliest offset is 10)
- A unit test to validate that when the calculated offset for a partition is larger than the latest offset, the tool resets the offset to the latest (e.g. when --reset-to 20 is specified and the latest offset is 10)
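The argument rules covered by the first four tests can be summarised as a single check. A minimal sketch, assuming the parsed command line is available as a set of flag names; the flag lists mirror the tables in Public Interfaces, `--list`, `--describe`, and `--delete` stand for the other Consumer Group options, and `validate_reset_args` is a hypothetical function that only reports errors.

```python
from typing import List, Set

SCENARIO_FLAGS = ["--reset-to-datetime", "--reset-to-period", "--reset-to-earliest",
                  "--reset-to-latest", "--reset-to", "--reset-minus", "--reset-plus",
                  "--reset-from-file"]
EXECUTION_FLAGS = ["--execute", "--export"]
OTHER_ACTIONS = ["--list", "--describe", "--delete"]


def validate_reset_args(flags: Set[str]) -> List[str]:
    """Return validation errors for a --reset-offset command line (empty if valid)."""
    errors: List[str] = []
    if "--group" not in flags:
        errors.append("--group is required")
    others = [flag for flag in OTHER_ACTIONS if flag in flags]
    if others:
        errors.append(f"--reset-offset cannot be combined with {others}")
    # Zero scenario flags is allowed: that is scenario 1.e (current position).
    if sum(flag in flags for flag in SCENARIO_FLAGS) > 1:
        errors.append("specify at most one scenario")
    if "--topic" in flags and "--topics" in flags:
        errors.append("specify at most one scope")
    if "--partitions" in flags and "--topic" not in flags:
        errors.append("--partitions requires --topic")
    if sum(flag in flags for flag in EXECUTION_FLAGS) > 1:
        errors.append("specify at most one execution option")
    return errors


print(validate_reset_args({"--reset-offset", "--group", "--reset-to-earliest", "--execute"}))
# []
print(validate_reset_args({"--reset-offset", "--group", "--reset-minus", "--reset-plus"}))
# ['specify at most one scenario']
```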
Rejected Alternatives
None