...
This KIP considers including the current `reset-offsets` scenarios in `kafka-streams-application-reset` to provide more options for resetting input-topic offsets.
Public Interfaces
`kafka-streams-application-reset` currently supports the following features [2]:
...
Only one scenario should be defined for `input-topics` to proceed with the execution. If none is defined, `to-earliest` will be used by default.
1. | Reset to Datetime | --to-datetime YYYY-MM-DDTHH:mm:SS.sss±hh:mm --to-datetime YYYY-MM-DDTHH:mm:SS.sssZ --to-datetime YYYY-MM-DDTHH:mm:SS.sss | Datetime must be specified in ISO8601 format. This option will translate the datetime to Epoch milliseconds, find the offsets by timestamp, and reset to those offsets. If the Timezone is not specified, it will use UTC. | Reset to first offset since 01 January 2017, 00:00:00 hrs: --application-id app1 --input-topics foo --to-datetime 2017-01-01T00:00:00.000Z |
2. | Reset by Duration | --by-duration PnDTnHnMnS | Duration must be specified in ISO8601 format. This option will subtract the duration from the current timestamp on the server, find the offsets for the resulting timestamp, and reset to those offsets. The duration specified does not account for daylight saving effects. | Reset to first offset since one week ago (from current timestamp): --application-id app1 --input-topics foo,bar --by-duration P7D |
3. | Reset to Earliest (DEFAULT) | --to-earliest | This option will reset offsets to the earliest using Kafka Consumer's `#seekToBeginning`. This scenario is used by default if no scenario is specified. | Reset to earliest offset available: --application-id app1 --input-topics foo,bar --to-earliest |
4. | Reset to Latest | --to-latest | This option will reset offsets to the latest using Kafka Consumer's `#seekToEnd`. | Reset to latest offset available: --application-id app1 --input-topics foo,bar --to-latest |
5. | Reset to Offset | --to-offset | This option will reset offsets to a specific value. | Reset to offset 1 in all partitions: --application-id app1 --input-topics foo,bar --to-offset 1 |
6. | Shift Offset by 'n' | --shift-by n | This option will add the `n` value to the current offset, and reset to the result. `n` can be positive or negative: the offset is moved backward if `n` is negative, and forward if it is positive. If current offset + n is higher than the latest offset, the new offset will be set to latest. If current offset + n is lower than the earliest offset, the new offset will be set to earliest. | Reset to current offset plus 5 positions: --application-id app1 --input-topics foo,bar --shift-by 5 |
7. | Reset from file | --from-file PATH_TO_FILE | This option will take a Reset Plan CSV file with the offsets to reset by topics/partitions. It does not require scope, because topics and partitions are defined in the file. The topics included in the file are validated: they must not also be among the `Intermediate` and `Internal` topics managed by the tool. | Reset using a file with reset plan: --application-id app1 --from-file reset-plan.csv |
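As an illustration of how the datetime, duration, and shift scenarios above could resolve their targets, here is a minimal Java sketch that uses only `java.time`. The class `ResetTargets` and its method names are hypothetical and are not part of the tool. Looking up offsets by timestamp and performing the actual seek are deliberately left out.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeParseException;

/** Hypothetical helper for illustration only; not part of the Kafka code base. */
final class ResetTargets {

    /** --to-datetime: ISO 8601 datetime to epoch milliseconds, assuming UTC when no zone is given. */
    static long datetimeToEpochMillis(String text) {
        try {
            // Accepts the "...Z" and "...±hh:mm" forms.
            return OffsetDateTime.parse(text).toInstant().toEpochMilli();
        } catch (DateTimeParseException e) {
            // No offset in the input: interpret the local datetime as UTC.
            return LocalDateTime.parse(text).toInstant(ZoneOffset.UTC).toEpochMilli();
        }
    }

    /** --by-duration: subtract an ISO 8601 duration (PnDTnHnMnS) from the given "now". */
    static long durationToEpochMillis(String text, Instant now) {
        // Duration treats a day as exactly 24 hours, so daylight saving is ignored.
        return now.minus(Duration.parse(text)).toEpochMilli();
    }

    /** --shift-by: add n to the current offset and clamp the result to [earliest, latest]. */
    static long shiftBy(long current, long n, long earliest, long latest) {
        long target = current + n;
        return Math.max(earliest, Math.min(latest, target));
    }

    public static void main(String[] args) {
        System.out.println(datetimeToEpochMillis("2017-01-01T00:00:00.000Z"));
        System.out.println(durationToEpochMillis("P7D", Instant.now()));
        System.out.println(shiftBy(10L, 5L, 0L, 12L));
    }
}
```

The timestamps returned by the first two methods would then be used to find offsets by timestamp, as the table describes. Passing `now` explicitly is a choice made here so the duration arithmetic can be checked deterministically.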
Execution Options
1. | Execute | (no execution arguments) | This execution option will run the reset offset process based on scenario and scope. | Prints and executes the reset of all topics and partitions to earliest: --application-id app1 --input-topics foo,bar --to-earliest |
2. | Dry-Run | --dry-run | This execution option will only print out the result of the scenario by scope, without executing the reset. The output will look like this: INPUT TOPIC PARTITION NEW-OFFSET NEW-LAG LOG-END-OFFSET INTERMEDIATE TOPIC PARTITION NEW-OFFSET NEW-LAG LOG-END-OFFSET INTERNAL TOPIC PARTITION NEW-OFFSET NEW-LAG LOG-END-OFFSET | Prints the result of resetting all topics and partitions to earliest: --application-id app1 --input-topics foo,bar --to-earliest --dry-run |
Proposed Changes
1. Implement the reset-offsets scenario options described above in `kafka-streams-application-reset` using `KafkaConsumer`.
...
[2] https://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/