...
Proposed Changes
We propose adding the following options to ConsumerGroupCommand to reset offsets:
- Plan: Print out the result of the operation (i.e. show a list of topic, partition, current offset, new offset to reset if the operation is executed). This will be the default option.
- Execute: Users will have to explicitly ask to execute the reset operation depending on the scenario and the scope specified.
- Export: Export the plan to a CSV file so that it can be executed later.
Each of these options operates on a single Consumer Group.
The following scenarios will be supported:
ID | Scenario | Description |
---|---|---|
1. | Reset to Datetime | When we want to reset offsets to a specific point in time. (e.g. to 1/1/2017 at 00:00 to reprocess all records from that year) |
2. | Reset from Duration | When we want to go back by some period of time (e.g. P7D to reprocess all records from one week ago) |
3. | Reset to Earliest | When we want to reprocess all the records available by partition. |
4. | Reset to Latest | When a Consumer Group has an offset lag and we don't want to process the missed records, only move to the latest offset. |
5. | Reset to Current Position | When we want to only print out and/or back up the current offset by partition. |
6. | Reset to Offset | When we want to move to a specific offset. |
7. | Shift Offset by 'n' | When we want to move forward or backward from the current position. 'n' can be a positive or negative value that will be added to the current offset to move to a new position. |
8. | Reset from file | When we have a file with the required offsets by topic/partition to reset to. |
And the following scopes:
- All Topics consumed by Consumer Group: This scope will consider all the topics that have been consumed by a Consumer Group.
- Specific List of Topics: This scope will consider all the topics defined by user.
- One Topic, All Partitions: This scope will consider only one topic and all partitions.
- One Topic, Specific Partition: This scope will consider only one topic and partition specified by user.
You can check the proof-of-concept implementation of this feature on this branch.
...
This option should be executed independently from other Consumer Group options (list, describe, delete, etc.)
...
Required Arguments
ID | Argument | Type | Description |
---|---|---|---|
1. | --group | Required | Consumer Group ID. |
...
ID | Scenario | Arguments | Considerations | Example |
---|---|---|---|---|
1. | Reset to Timestamp | --to-datetime YYYY-MM-DDTHH:mm:SS.sss | This option will translate the datetime to Epoch milliseconds, find the offsets by timestamp, and reset to those offsets. The datetime is interpreted in the default timezone of the machine where the client is running (i.e. ZoneId#systemDefault). | Reset to first offset since 01 January 2017, 00:00:00 hrs: --reset-offsets --group test.group --topic foo --to-datetime 2017-01-01T00:00:00 |
2. | Reset to Duration | --by-duration PnDTnHnMnS | This option will subtract the duration from the current timestamp on the server, find the offsets for the resulting timestamp, and reset to those offsets. The duration specified won't consider daylight saving effects. | Reset to first offset since one week ago (from current timestamp): --reset-offsets --group test.group --topic foo --by-duration P7D |
3. | Reset to Earliest | --to-earliest | This option will reset offsets to the earliest using Kafka Consumer's `#seekToBeginning` | Reset to earliest offset available: --reset-offsets --group test.group --topic foo --to-earliest |
4. | Reset to Latest | --to-latest | This option will reset offsets to the latest using Kafka Consumer's `#seekToEnd` | Reset to latest offset available: --reset-offsets --group test.group --topic foo --to-latest |
5. | Reset to Current Position | (no scenario arguments) | This option won't reset the offset. It will be used to print and export current offset. | Reset to current position: --reset-offsets --group test.group --topic foo |
6. | Reset to Offset | --to-offset | This option will reset offsets to a specific value. | Reset to offset 1 in all partitions: --reset-offsets --group test.group --topic foo --to-offset 1 |
7. | Shift Offset by 'n' | --shift-by n | This option will add the `n` value to the current offset, and reset to the result. `n` can be a positive or negative value, so the offset will be moved backward if it is negative, and forward if it is positive. If current offset + n is higher than the latest offset, the new offset will be set to latest. If current offset + n is lower than the earliest offset, the new offset will be set to earliest. | Reset to current offset plus 5 positions: --reset-offsets --group test.group --topic foo --shift-by 5 |
8. | Reset from File | --from-file PATH_TO_FILE | This option will take a Reset Plan CSV file with the offsets to reset by topics/partitions. It does not require a scope, because topics and partitions are defined in the file. | Reset using a file with reset plan: --reset-offsets --group test.group --from-file reset-plan.csv |
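The arithmetic behind scenarios 1, 2 and 7 can be illustrated with a minimal Java sketch. The class and method names below are hypothetical and are not taken from the tool; only the `java.time` calls are real APIs. The sketch assumes the datetime is given without a zone and that `PnDTnHnMnS` follows the ISO-8601 duration syntax accepted by `Duration.parse`, which treats a day as exactly 24 hours.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

// Hypothetical helper class: names are illustrative, not part of ConsumerGroupCommand.
class ResetScenarioSketch {

    // Scenario 1: interpret a zone-less datetime in the given zone
    // and convert it to Epoch milliseconds.
    static long toEpochMillis(String datetime, ZoneId zone) {
        return LocalDateTime.parse(datetime).atZone(zone).toInstant().toEpochMilli();
    }

    // Scenario 2: subtract an ISO-8601 duration (PnDTnHnMnS) from a reference timestamp.
    // Duration works in exact seconds, so daylight saving shifts are not considered.
    static long timestampBefore(long nowMillis, String duration) {
        return Instant.ofEpochMilli(nowMillis).minus(Duration.parse(duration)).toEpochMilli();
    }

    // Scenario 7: add n to the current offset and clamp the result
    // to the range [earliest, latest].
    static long shiftOffset(long current, long n, long earliest, long latest) {
        return Math.max(earliest, Math.min(latest, current + n));
    }

    public static void main(String[] args) {
        // Uses the client's default timezone, as described for --to-datetime.
        System.out.println(toEpochMillis("2017-01-01T00:00:00", ZoneId.systemDefault()));
        System.out.println(timestampBefore(System.currentTimeMillis(), "P7D"));
        // current offset 10, shift by 5, latest offset 12: clamped to latest.
        System.out.println(shiftOffset(10, 5, 0, 12));
    }
}
```

The resulting timestamps would then be used to look up offsets by timestamp; that lookup is not shown here.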
Scopes
The scopes will be defined by extending the existing `topic` argument, using the following format:
...
ID | Scope | Arguments | Description | Example |
---|---|---|---|---|
1. | All topics | --all-topics | In this case the tool will run the scenario considering all topics that have been consumed by the Consumer Group. It will consider all partitions assigned for each topic. | Reset offsets in all topics and partitions: --reset-offsets --group test.group --all-topics --to-earliest |
2. | List of Topics | --topic topic1 --topic topic2 | In this case, the tool will first validate that the input topics are being consumed by the Consumer Group, and then run the scenario. It will consider all partitions of each topic defined. | Reset offsets on all partitions of a list of topics: --reset-offsets --group test.group --topic foo --topic bar --to-earliest |
3. | Topics and List of Partitions | --topic topic1:0,1,2 --topic topic2:0 | In this case, the tool will first validate that each input topic is being consumed by the Consumer Group, and then run the scenario. It will consider only the partitions specified for each topic. | Reset offsets on specific partitions by topics: --reset-offsets --group test.group --topic foo:1,2 --topic bar:0,1,2,3 --to-earliest |
4. | Topic | --topic topic1 | In this case, the tool will first validate that the input topic is being consumed by the Consumer Group, and then run the scenario. It will consider all partitions. | Reset offsets on a specific topic, all partitions: --reset-offsets --group test.group --topic foo --to-earliest |
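As an illustration of the `--topic` formats in the table above, the following Java sketch parses values such as `foo` and `foo:1,2` into a topic-to-partitions map. The class name, the method name and the convention that an empty list means "all partitions" are assumptions of this sketch, not the tool's actual implementation. Splitting on `:` is safe because Kafka topic names cannot contain that character.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical parser for repeated --topic values; not the tool's actual code.
class TopicScopeSketch {

    // Maps each topic to the partitions listed after ':'.
    // An empty list stands for "all partitions" (a convention of this sketch).
    static Map<String, List<Integer>> parse(List<String> topicArgs) {
        Map<String, List<Integer>> scope = new LinkedHashMap<>();
        for (String arg : topicArgs) {
            int sep = arg.indexOf(':');
            String topic = sep < 0 ? arg : arg.substring(0, sep);
            List<Integer> partitions = new ArrayList<>();
            if (sep >= 0) {
                // Partition ids are comma-separated integers, e.g. "0,1,2".
                for (String p : arg.substring(sep + 1).split(",")) {
                    partitions.add(Integer.parseInt(p.trim()));
                }
            }
            scope.put(topic, partitions);
        }
        return scope;
    }

    public static void main(String[] args) {
        // Mirrors: --topic foo:1,2 --topic bar:0,1,2,3
        System.out.println(parse(Arrays.asList("foo:1,2", "bar:0,1,2,3")));
    }
}
```

Validating that each topic is actually consumed by the Consumer Group would happen after this parsing step and is not shown.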
Execution Options
ID | Option | Arguments | Description | Examples |
---|---|---|---|---|
1. | Plan | (no execution arguments) | This execution option will only print out the result of the scenario by scope. The output will look like this: TOPIC PARTITION CURRENT-OFFSET NEW-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID | Prints the result of resetting all topics and partitions to earliest: --reset-offsets --group test.group --all-topics --to-earliest |
2. | Execute | --execute | This execution option will run the reset offset process based on scenario and scope. | Prints and executes the reset of all topics and partitions to earliest: --reset-offsets --group test.group --all-topics --to-earliest --execute |
3. | Export | --export | This execution option will print out the reset plan in CSV format, which could later be used in scenario 8 (i.e. as a backup). | Prints the plan for resetting all topics and partitions to earliest in CSV format: --reset-offsets --group test.group --all-topics --to-earliest --export. Executes the reset and prints the plan in CSV format: --reset-offsets --group test.group --all-topics --to-earliest --export --execute |
Reset Plan File: CSV Format
...