
...

This KIP proposes including the current `reset-offsets` scenarios in `kafka-streams-application-reset` to provide more options for resetting input-topic offsets.

 


Public Interfaces

"kafka-streams-application-reset" currently supports the following features [2]:

...

Only one scenario may be defined for `input-topics` for the execution to proceed. If no scenario is defined, `to-earliest` is used by default.
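The selection rule above can be sketched as follows. This is a minimal illustration, not the tool's code: the class and method names are hypothetical, and rejecting more than one scenario with an `IllegalArgumentException` is an assumption about how the rule would be enforced.

```java
import java.util.List;

final class ScenarioSelection {

    // Picks the reset scenario for input-topics from the scenario flags given
    // on the command line (hypothetical representation: one string per flag).
    static String select(List<String> givenScenarioFlags) {
        if (givenScenarioFlags.isEmpty()) {
            // No scenario given: fall back to the documented default.
            return "--to-earliest";
        }
        if (givenScenarioFlags.size() > 1) {
            // Assumed enforcement: more than one scenario is rejected.
            throw new IllegalArgumentException(
                    "Only one reset scenario may be specified, got: " + givenScenarioFlags);
        }
        return givenScenarioFlags.get(0);
    }
}
```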

1. Reset to Datetime

--to-datetime YYYY-MM-DDTHH:mm:SS.sss±hh:mm

--to-datetime YYYY-MM-DDTHH:mm:SS.sssZ

--to-datetime YYYY-MM-DDTHH:mm:SS.sss

Datetime must be specified in ISO 8601 format.

This option translates the datetime to epoch milliseconds, finds the offsets by timestamp, and resets to those offsets.

If no timezone is specified, UTC is used.

Reset to the first offset since 01 January 2017, 00:00:00 UTC:

--application-id app1 --input-topics foo --to-datetime 2017-01-01T00:00:00.000Z
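The two steps of this scenario (datetime to epoch milliseconds, then offsets by timestamp) can be sketched with `java.time`. This is a minimal sketch under stated assumptions: the class and method names are hypothetical, the partition is simulated as an in-memory list of record timestamps, and `offsetForTime` only mimics the documented semantics of `KafkaConsumer#offsetsForTimes` (the earliest offset whose timestamp is greater than or equal to the target, or none).

```java
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeParseException;
import java.util.List;

final class ResetToDatetime {

    // Converts an ISO 8601 datetime to epoch milliseconds.
    // Accepts "...±hh:mm" and "...Z"; a value without a zone is read as UTC.
    static long toEpochMillis(String datetime) {
        try {
            return OffsetDateTime.parse(datetime).toInstant().toEpochMilli();
        } catch (DateTimeParseException noOffset) {
            // No offset in the string: parse as a local datetime pinned to UTC.
            return LocalDateTime.parse(datetime).toInstant(ZoneOffset.UTC).toEpochMilli();
        }
    }

    // Simulated by-timestamp lookup for one partition (hypothetical stand-in):
    // recordTimestamps.get(i) is the timestamp of the record at offset i.
    // Returns the earliest offset whose timestamp is >= targetMs, or -1 if none.
    static long offsetForTime(List<Long> recordTimestamps, long targetMs) {
        for (int offset = 0; offset < recordTimestamps.size(); offset++) {
            if (recordTimestamps.get(offset) >= targetMs) {
                return offset;
            }
        }
        return -1;
    }
}
```

For the example above, `toEpochMillis("2017-01-01T00:00:00.000Z")` yields 1483228800000, and the same value without the trailing `Z` gives the same result because UTC is assumed.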

2. Reset by Duration

--by-duration PnDTnHnMnS

Duration must be specified in ISO 8601 format.

This option subtracts the duration from the current timestamp on the server, finds the offsets for the resulting timestamp, and resets to those offsets. The duration does not take daylight saving time into account.

Reset to the first offset from one week ago (relative to the current timestamp):

--application-id app1 --input-topics foo,bar --by-duration P7D
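The timestamp computation for `--by-duration` can be sketched with `java.time.Duration`, whose `parse` accepts the `PnDTnHnMn.nS` form. The class and method names are hypothetical. `Duration` treats a day as exactly 24 hours, which matches the statement that daylight saving effects are not considered.

```java
import java.time.Duration;

final class ResetByDuration {

    // Target timestamp for --by-duration: the current time minus the ISO 8601
    // duration. Days count as exactly 24 hours (no daylight saving adjustment).
    static long targetTimestamp(long nowMs, String isoDuration) {
        Duration duration = Duration.parse(isoDuration);
        return nowMs - duration.toMillis();
    }
}
```

A caller would pass `System.currentTimeMillis()` as `nowMs`; the result is then used for the same by-timestamp lookup as `--to-datetime`.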

3. Reset to Earliest (DEFAULT)

--to-earliest

This option resets offsets to the earliest available offset using the Kafka Consumer's `#seekToBeginning`. This scenario is used by default if no scenario is specified.

Reset to the earliest available offset:

--application-id app1 --input-topics foo,bar --to-earliest

4. Reset to Latest

--to-latest

This option resets offsets to the latest offset using the Kafka Consumer's `#seekToEnd`.

Reset to the latest available offset:

--application-id app1 --input-topics foo,bar --to-latest

5. Reset to Offset

--to-offset

This option resets offsets to a specific value.

Reset to offset 1 in all partitions:

--application-id app1 --input-topics foo,bar --to-offset 1
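Scenarios 3 to 5 differ only in how the target offset of each partition is chosen, which can be sketched as a pure function over a partition's offset range. This is a hypothetical model, not the tool's code: the names are illustrative, `latest` is taken to be the log-end offset (where `seekToEnd` positions a consumer under the default `read_uncommitted` isolation level), and the `--to-offset` value is applied as given because clamping for that scenario is not specified here.

```java
final class BasicScenarios {

    // Hypothetical snapshot of one partition's offsets.
    static final class PartitionOffsets {
        final long earliest; // first offset still available in the log
        final long latest;   // log-end offset: where the next record will be written

        PartitionOffsets(long earliest, long latest) {
            this.earliest = earliest;
            this.latest = latest;
        }
    }

    enum Scenario { TO_EARLIEST, TO_LATEST, TO_OFFSET }

    // Offset a partition would be reset to. toOffsetValue is used only by
    // TO_OFFSET and is returned unchanged (no range check in this sketch).
    static long resetTarget(Scenario scenario, PartitionOffsets p, long toOffsetValue) {
        switch (scenario) {
            case TO_EARLIEST:
                return p.earliest; // position reached by seekToBeginning
            case TO_LATEST:
                return p.latest;   // position reached by seekToEnd
            case TO_OFFSET:
                return toOffsetValue;
            default:
                throw new IllegalArgumentException("Unknown scenario: " + scenario);
        }
    }
}
```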

6. Shift Offset by 'n'

--shift-by n

This option adds `n` to the current offset and resets to the result. `n` can be positive or negative: a negative value moves the offset backward, and a positive value moves it forward.

If the current offset + n is higher than the latest offset, the new offset is set to the latest offset.

If the current offset + n is lower than the earliest offset, the new offset is set to the earliest offset.

Reset to current offset plus 5 positions:

--application-id app1 --input-topics foo,bar --shift-by 5
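The shift-and-clamp rule above can be written as a small function. The class and method names are hypothetical; `earliest` and `latest` stand for the partition's earliest and latest offsets.

```java
final class ShiftBy {

    // --shift-by n: move the current offset by n (negative moves backward),
    // then clamp the result to the partition's [earliest, latest] range.
    static long shiftedOffset(long current, long n, long earliest, long latest) {
        long target = current + n;
        if (target > latest) {
            return latest;
        }
        if (target < earliest) {
            return earliest;
        }
        return target;
    }
}
```

For example, with offsets 0 to 100 and a current offset of 10, `--shift-by 5` gives 15, while a current offset of 98 would be clamped to 100.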

7. Reset from File

--from-file PATH_TO_FILE

This option takes a Reset Plan CSV file containing the offsets to reset to, per topic and partition. It does not require a scope, because topics and partitions are defined in the file.

The topics in the file are validated: none of them may also be one of the `Intermediate` or `Internal` topics managed by the tool.

Reset using a file containing a reset plan:

--application-id app1 --from-file reset-plan.csv
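Loading and validating a reset plan might look like the sketch below. The CSV layout is an assumption (one `topic,partition,offset` line per partition, since the layout is not defined here), as are the class name and the `"topic-partition"` map key. Lines can be read with `Files.readAllLines(Paths.get(path))`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class ResetPlan {

    // Parses reset-plan lines, assuming the hypothetical layout
    // "topic,partition,offset", and rejects topics that are also intermediate
    // or internal topics managed by the tool. Keys are "topic-partition".
    static Map<String, Long> parse(List<String> lines,
                                   Set<String> intermediateTopics,
                                   Set<String> internalTopics) {
        Map<String, Long> plan = new HashMap<>();
        for (String rawLine : lines) {
            String line = rawLine.trim();
            if (line.isEmpty()) {
                continue; // skip blank lines
            }
            String[] fields = line.split(",");
            if (fields.length != 3) {
                throw new IllegalArgumentException("Expected topic,partition,offset but got: " + line);
            }
            String topic = fields[0].trim();
            int partition = Integer.parseInt(fields[1].trim());
            long offset = Long.parseLong(fields[2].trim());
            if (intermediateTopics.contains(topic) || internalTopics.contains(topic)) {
                throw new IllegalArgumentException(
                        "Topic " + topic + " is managed by the tool and cannot be in the reset plan");
            }
            plan.put(topic + "-" + partition, offset);
        }
        return plan;
    }
}
```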

Execution Options

1. Execute (no execution arguments)

This execution option runs the offset reset process based on the scenario and scope.

Print and execute a reset of all topics and partitions to earliest:

--application-id app1 --input-topics foo,bar --to-earliest

2. Dry-Run

--dry-run

This execution option only prints the result of applying the scenario to the scope, without resetting any offsets (i.e. a dry run).

The output will look like this:

INPUT
TOPIC                 PARTITION  NEW-OFFSET  NEW-LAG  LOG-END-OFFSET
input-topic           0          90          10       100

INTERMEDIATE
TOPIC                 PARTITION  NEW-OFFSET  NEW-LAG  LOG-END-OFFSET
intermediate-topic    0          0           100      100

INTERNAL
TOPIC                 PARTITION  NEW-OFFSET  NEW-LAG  LOG-END-OFFSET
internal-topic        0          0           0        0

Print the result of resetting all topics and partitions to earliest:

--application-id app1 --input-topics foo,bar --to-earliest --dry-run
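In the sample dry-run output, NEW-LAG appears to be LOG-END-OFFSET minus NEW-OFFSET (100 - 90 = 10 for `input-topic`, 100 - 0 = 100 for `intermediate-topic`). The sketch below builds one row on that inferred rule. The class name and the column widths are illustrative assumptions, not the tool's actual formatting code.

```java
final class DryRunRow {

    // NEW-LAG as inferred from the sample rows: LOG-END-OFFSET - NEW-OFFSET.
    static long newLag(long newOffset, long logEndOffset) {
        return logEndOffset - newOffset;
    }

    // One row with fixed-width columns: TOPIC, PARTITION, NEW-OFFSET, NEW-LAG,
    // LOG-END-OFFSET. The explicit space after each field keeps columns apart
    // even when a value fills its width.
    static String format(String topic, int partition, long newOffset, long logEndOffset) {
        return String.format("%-21s %-10d %-11d %-8d %d",
                topic, partition, newOffset, newLag(newOffset, logEndOffset), logEndOffset);
    }
}
```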

 


Proposed Changes

1. Implement the reset-offset scenario options described above in `kafka-streams-application-reset` using `KafkaConsumer`.

...

[2] https://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/