Status

Current state: Accepted

Discussion thread: here 

JIRA: KAFKA-4743  

Released: 0.11.0.0

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

This option should be executed independently from other Consumer Group options (list, describe, delete, etc.) and it will only support new consumers.

Required Arguments

ID | Argument | Type | Description
1. | --group | Required | Consumer Group ID.
2. | --bootstrap-server | Required | Server to connect to.

Scenarios

At least one of the scenarios should be defined to proceed with the execution.

ID | Scenario | Arguments | Considerations | Example

1. Reset to Datetime

--to-datetime YYYY-MM-DDTHH:mm:SS.sss±hh:mm

--to-datetime YYYY-MM-DDTHH:mm:SS.sssZ

--to-datetime YYYY-MM-DDTHH:mm:SS.sss

Datetime must be specified in ISO8601 format.

This option will translate the datetime to Epoch milliseconds, find the offsets by timestamp, and reset to those offsets.

If the timezone is not specified, UTC is used.

Reset to first offset since 01 January 2017, 00:00:00 hrs:

--reset-offsets --group test.group --topic foo --to-datetime 2017-01-01T00:00:00Z
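
The translation from an ISO8601 datetime to Epoch milliseconds described for this scenario can be sketched as follows. This is only an illustrative Python sketch, not the tool's implementation: the helper name `to_epoch_millis` is hypothetical, and it assumes UTC whenever the input carries no zone designator.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_millis(text: str) -> int:
    """Convert an ISO8601 datetime string to epoch milliseconds (hypothetical helper)."""
    # Older Python versions reject a trailing "Z", so rewrite it as an explicit offset.
    if text.endswith("Z"):
        text = text[:-1] + "+00:00"
    parsed = datetime.fromisoformat(text)
    # Assumption: a datetime without a zone designator is interpreted as UTC.
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    # Integer division of timedeltas avoids floating-point rounding.
    return (parsed - EPOCH) // timedelta(milliseconds=1)


print(to_epoch_millis("2017-01-01T00:00:00Z"))  # 1483228800000
```

The resulting value is what a lookup of offsets by timestamp would take as input for each partition in scope.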

2. Reset by Duration

--by-duration PnDTnHnMnS

Duration must be specified in ISO8601 format.

This option will subtract the duration from the current timestamp on the server, find the offsets for the resulting timestamp, and reset to those offsets. The duration specified does not take daylight saving effects into account.

Reset to first offset since one week ago (from current timestamp):

--reset-offsets --group test.group --topic foo --by-duration P7D
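
As a rough illustration of the duration arithmetic, the sketch below parses a `PnDTnHnMnS` value and subtracts it from a given timestamp. The function names are hypothetical, the regular expression covers only the day/hour/minute/second form shown above, and the "current timestamp" is passed in as a parameter rather than read from a server.

```python
import re
from datetime import timedelta

# Matches only the PnDTnHnMnS form; every component is optional.
_DURATION = re.compile(r"P(?:(\d+)D)?(?:T(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?)?")


def parse_duration(text: str) -> timedelta:
    """Parse a PnDTnHnMnS duration into a fixed-length timedelta (hypothetical helper)."""
    match = _DURATION.fullmatch(text)
    if match is None or not any(match.groups()):
        raise ValueError(f"not a PnDTnHnMnS duration: {text!r}")
    days, hours, minutes, seconds = (int(g) if g else 0 for g in match.groups())
    # A timedelta is a fixed span, so daylight saving shifts are not considered.
    return timedelta(days=days, hours=hours, minutes=minutes, seconds=seconds)


def target_timestamp_millis(now_millis: int, duration_text: str) -> int:
    """Return the timestamp (epoch ms) that lies `duration_text` before `now_millis`."""
    return now_millis - parse_duration(duration_text) // timedelta(milliseconds=1)
```

For example, `target_timestamp_millis(now, "P7D")` yields the timestamp exactly 7 × 24 hours before `now`.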

3. Reset to Earliest

--to-earliest

This option will reset offsets to the earliest using Kafka Consumer's `#seekToBeginning`.

Reset to earliest offset available:

--reset-offsets --group test.group --topic foo --to-earliest

4. Reset to Latest

--to-latest

This option will reset offsets to the latest using Kafka Consumer's `#seekToEnd`.

Reset to latest offset available:

--reset-offsets --group test.group --topic foo --to-latest

5. Reset to Current Position

(no scenario arguments)

This option won't reset the offset. It will be used to print and export the current offset.

Reset to current position:

--reset-offsets --group test.group --topic foo

6. Reset to Offset

--to-offset

This option will reset offsets to a specific value.

Reset to offset 1 in all partitions:

--reset-offsets --group test.group --topic foo --to-offset 1

7. Shift Offset by 'n'

--shift-by n

This option will add the `n` value to the current offset, and reset to the result. `n` can be a positive or negative value, so the offset will be moved backward if it is negative, and forward if it is positive.

If current offset + n is higher than the latest offset, new offset will be set to latest.

If current offset + n is lower than the earliest offset, new offset will be set to earliest.

Reset to current offset plus 5 positions:

--reset-offsets --group test.group --topic foo --shift-by 5
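
The clamping rules for this scenario amount to bounding `current offset + n` between the earliest and latest offsets. A minimal sketch, with a hypothetical function name and all offsets passed in as plain integers:

```python
def shift_offset(current: int, n: int, earliest: int, latest: int) -> int:
    """Shift `current` by `n`, clamped to the [earliest, latest] range (hypothetical helper)."""
    shifted = current + n
    # Above the latest offset: fall back to latest.
    # Below the earliest offset: fall back to earliest.
    return max(earliest, min(latest, shifted))
```

With an earliest offset of 10 and a latest offset of 200, shifting offset 100 by 5 gives 105, while shifting it by 500 or by -500 gives 200 or 10 respectively.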

8. Reset from File

--from-file PATH_TO_FILE

This option will take a Reset Plan CSV file with the offsets to reset by topics/partitions. It does not require scope, because topics and partitions are defined in the file.

Reset using a file with reset plan:

--reset-offsets --group test.group --from-file reset-plan.csv
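
To make the idea of a reset plan file concrete, the sketch below loads one into a mapping keyed by topic and partition. The column layout `topic,partition,offset` is an assumption made purely for illustration, since the CSV layout is not specified in this section, and `read_reset_plan` is a hypothetical helper.

```python
import csv
from typing import Dict, Iterable, Tuple


def read_reset_plan(lines: Iterable[str]) -> Dict[Tuple[str, int], int]:
    """Read rows of `topic,partition,offset` (assumed layout) into a dict."""
    plan: Dict[Tuple[str, int], int] = {}
    for row in csv.reader(lines):
        if not row:
            continue  # skip blank lines
        topic, partition, offset = row
        plan[(topic, int(partition))] = int(offset)
    return plan
```

Because every row names its own topic and partition, no separate scope argument is needed to interpret the file.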

...

ID | Option | Arguments | Description | Examples

1. Plan

(no execution arguments)

This execution option will only print out the result of the scenario by scope.

The output will look like this:

TOPIC                 PARTITION CURRENT-OFFSET CURRENT-LAG NEW-OFFSET NEW-LAG LOG-END-OFFSET CONSUMER-ID HOST CLIENT-ID
foo 0 100 0 90 10 100 - - -

Prints result of resetting all topics and partitions to earliest:

--reset-offsets --group test.group --all-topics --to-earliest
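
The sample output row above is consistent with each lag column being the log end offset minus the corresponding offset. The sketch below builds one such row under that assumption; the relationship is inferred from the sample values only, and `plan_row` is a hypothetical helper.

```python
def plan_row(topic: str, partition: int, current_offset: int,
             new_offset: int, log_end_offset: int) -> dict:
    """Build one plan row; lag is assumed to be log end offset minus offset."""
    return {
        "TOPIC": topic,
        "PARTITION": partition,
        "CURRENT-OFFSET": current_offset,
        "CURRENT-LAG": log_end_offset - current_offset,
        "NEW-OFFSET": new_offset,
        "NEW-LAG": log_end_offset - new_offset,
        "LOG-END-OFFSET": log_end_offset,
    }
```

Calling `plan_row("foo", 0, 100, 90, 100)` reproduces the lag values of the sample row: a current lag of 0 and a new lag of 10.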

2. Execute

--execute

This execution option will run the reset offset process based on scenario and scope.

Prints and executes resetting all topics and partitions to earliest:

--reset-offsets --group test.group --all-topics --to-earliest --execute

3. Export

--export

This execution option will print out the reset plan in CSV format, which can later be used as input for scenario 8 (for instance, as a backup).

Prints the plan for resetting all topics and partitions to earliest in CSV format:

--reset-offsets --group test.group --all-topics --to-earliest --export

Executes the reset and prints the plan for resetting all topics and partitions to earliest in CSV format:

--reset-offsets --group test.group --all-topics --to-earliest --export --execute

...

Nevertheless, it is possible to achieve the same result since release 0.10.0 using the timestamp metadata of each record, doing a linear search starting from the latest offset. However, we do not plan to support this option because it would be expensive and slow.

All the other options will be available for releases from 0.10.0.0 given that KIP-97 is implemented.

This KIP will not support old consumers that store offsets in Zookeeper.

Test Plan

  • A unit test to validate that --reset-offsets is executed independently from other Consumer Group options (list, delete, describe, etc.)
  • A unit test to validate that only one scenario is specified
  • A unit test to validate that only one scope is specified
  • A unit test to validate that only one execution option is specified
  • A unit test by combination of scenario, scope and execution option.
  • A unit test to validate that when the calculated offset for a partition is lower than the earliest offset, the tool resets the offset to earliest (e.g. when --to-offset 0 is specified and the earliest offset is 10)
  • A unit test to validate that when the calculated offset for a partition is higher than the latest offset, the tool resets the offset to latest (e.g. when --to-offset 20 is specified and the latest offset is 10)
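
The "only one scenario" validation in the list above could be checked along these lines. This is a sketch under assumptions: the flag names are the scenario arguments listed in this document, the helper name is hypothetical, and the absence of any scenario flag is treated as scenario 5 (current position).

```python
from typing import List, Optional

# Scenario arguments as listed in the Scenarios table.
SCENARIO_FLAGS = (
    "--to-datetime", "--by-duration", "--to-earliest", "--to-latest",
    "--to-offset", "--shift-by", "--from-file",
)


def validate_single_scenario(args: List[str]) -> Optional[str]:
    """Return the single scenario flag in `args`, or None if there is none."""
    used = [flag for flag in SCENARIO_FLAGS if flag in args]
    if len(used) > 1:
        raise ValueError("only one scenario may be specified, got: " + ", ".join(used))
    return used[0] if used else None
```

A unit test would then assert that a command line combining, say, `--to-earliest` and `--to-latest` is rejected.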

...