Proposed Changes

We are considering adding options to ConsumerGroupCommand to reset offsets.

There will be 3 execution options available:
  1. Plan: Print out the result of the operation (i.e. show a list of topic, partition, current offset, new offset to reset if the operation is executed). This will be the default option.
  2. Execute: Users will have to explicitly ask to execute the reset operation depending on the scenario and the scope specified.
  3. Export: Export plan to a CSV file to execute it later.

 

The following scenarios will be supported:

| ID | Scenario | Description |
|----|----------|-------------|
| 1. | Reset to Datetime | When we want to reset offsets to a specific point in time (e.g. to 1/1/2017 at 00:00 to reprocess all records from this year). |
| 2. | Reset from Duration | When we want to go back by some period of time (e.g. P7D to reprocess all records from one week ago). |
| 3. | Reset to Earliest | When we want to reprocess all the records available by partition. |
| 4. | Reset to Latest | When a Consumer Group has an offset lag and we don't want to process the missing records, only move to the latest offset. |
| 5. | Reset to Current Position | When we only want to print out and/or back up the current offset by partition. |
| 6. | Reset to Offset | When we want to move to a specific offset. |
| 7. | Shift Offset by 'n' | When we want to move forward or backward from the current position. 'n' can be a positive or negative value that will be added to the current offset to move to a new position. |
| 8. | Reset from File | When we have a file with the required offsets by topic/partition to reset to. |
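
The two time-based scenarios both reduce to computing a target timestamp in epoch milliseconds, which is then used to look up offsets. The Python sketch below shows only that arithmetic. The function names are hypothetical, the timezone is passed in explicitly instead of being read from the system default, and only day/time durations (PnDTnHnMnS) are handled; these are assumptions of the sketch, not statements about the tool.

```python
import re
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

# Day/time subset of ISO-8601 durations (assumption: years and months are not handled).
_DURATION = re.compile(
    r"^P(?:(?P<days>\d+)D)?"
    r"(?:T(?:(?P<hours>\d+)H)?(?:(?P<minutes>\d+)M)?(?:(?P<seconds>\d+)S)?)?$"
)


def datetime_to_epoch_millis(text, tz=timezone.utc):
    """Interpret 'YYYY-MM-DDTHH:mm:SS.sss' in timezone tz and return epoch milliseconds."""
    local = datetime.strptime(text, "%Y-%m-%dT%H:%M:%S.%f").replace(tzinfo=tz)
    return (local - EPOCH) // timedelta(milliseconds=1)


def parse_duration(text):
    """Parse a duration such as 'P7D' or 'PT1H30M' into a timedelta."""
    match = _DURATION.match(text)
    parts = {} if match is None else {
        name: int(value) for name, value in match.groupdict().items() if value is not None
    }
    if not parts:
        raise ValueError(f"unsupported duration: {text!r}")
    return timedelta(**parts)


def millis_before(now_millis, duration_text):
    """Subtract the duration from a reference timestamp given in epoch milliseconds."""
    return now_millis - parse_duration(duration_text) // timedelta(milliseconds=1)
```

For example, `millis_before(now, "P7D")` yields the timestamp one week (7 x 24 hours) before `now`. Because the subtraction is a fixed number of milliseconds, daylight saving transitions are not taken into account.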

     


    These operations will be executed per Consumer Group and will have the following scopes:

    1. All Topics consumed by Consumer Group: This scope will consider all the topics that have been consumed by a Consumer Group.
    2. Specific List of Topics: This scope will consider all the topics specified by the user.
    3. One Topic, All Partitions: This scope will consider only one topic and all of its partitions.
    4. One Topic, Specific Partition: This scope will consider only one topic and the partition specified by the user.

     


    You can check the proof-of-concept implementation of this feature on this branch.

    Public Interfaces

    Consumer Group Reset Offset options

    Main option

    --reset-offsets

    This option should be executed independently from other Consumer Group options (list, describe, delete, etc.)

    ...

    | ID | Argument | Type | Description |
    |----|----------|------|-------------|
    | 1. | --group | Required | Consumer Group ID. |

    Scenarios

    At least one of the scenarios should be defined to proceed with the execution

    | ID | Scenario | Arguments | Description | Considerations |
    |----|----------|-----------|-------------|----------------|
    | 1. | Reset to Timestamp | --reset-to-datetime YYYY-MM-DDTHH:mm:SS.sss | This option will translate the datetime to epoch milliseconds, find the offsets by timestamp, and reset to those offsets. | It will use the datetime specified plus the default timezone where the client is running (i.e. ZoneId#systemDefault). |
    | 2. | Reset to Duration | --reset-to-duration PnYnMnDTnHnMnS | This option will subtract the duration from the current timestamp on the server, find the offsets using that timestamp, and reset to those offsets. | The duration specified won't consider daylight saving effects. |
    | 3. | Reset to Earliest | --reset-to-earliest | This option will reset offsets to the earliest using Kafka Consumer's `#seekToBeginning`. | |
    | 4. | Reset to Latest | --reset-to-latest | This option will reset offsets to the latest using Kafka Consumer's `#seekToEnd`. | |
    | 5. | Reset to Current Position | (no scenario arguments) | This option won't reset the offset. It will be used to print and export the current offset. | |
    | 6. | Reset to Offset | --to-offset | This option will reset offsets to a specific value. | |
    | 7. | Shift Offset by 'n' | --shift-offset-by n | This option will add the `n` value to the current offset, and reset to the result. | `n` can be a positive or negative value, so the offset will be moved backward if it is negative and forward if it is positive. If current offset + n is higher than the latest offset, the new offset will be set to latest. If current offset + n is lower than the earliest offset, the new offset will be set to earliest. |
    | 8. | Reset from File | --reset-from-file PATH_TO_FILE | This option will take a Reset Plan CSV file with the offsets to reset by topics/partitions. | |
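
The bounds handling described for shifting by `n` can be sketched in a few lines of Python. The function name `shift_offset` and its parameters are hypothetical and chosen only for illustration; this is a sketch of the stated rule, not the tool's implementation.

```python
def shift_offset(current, n, earliest, latest):
    """Add n to the current offset and keep the result within [earliest, latest]."""
    target = current + n
    # Below the earliest offset: fall back to earliest. Above the latest: fall back to latest.
    return max(earliest, min(target, latest))
```

With `earliest=10` and `latest=200`, shifting a current offset of 100 by -5 gives 95, while shifting it by -500 or +500 gives 10 or 200 respectively.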

    Scopes

    At least one of the scopes should be defined to proceed with the execution

    | ID | Scope | Arguments | Description |
    |----|-------|-----------|-------------|
    | 1. | All topics | --all-topics | In this case the tool will run the scenario considering all topics that have been consumed by the Consumer Group. It will consider all partitions. |
    | 2. | List of Topics | --topics topic1,topic2... | In this case, the tool will first validate that the input topics are being consumed by the Consumer Group, and then run the scenario. It will consider all partitions. |
    | 3. | Topic | --topic topic1 | In this case, the tool will first validate that the input topic is being consumed by the Consumer Group, and then run the scenario. It will consider all partitions. |
    | 4. | Topic and List of Partitions | --topic topic1 --partitions 0,1,2 | In this case, the tool will first validate that the input topic is being consumed by the Consumer Group, and then run the scenario. It will consider only the partitions specified. |
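
As an illustration of how a scope could be turned into a concrete list of topic/partitions, here is a minimal Python sketch. The function `select_partitions`, its parameters, and the shape of the `consumed` mapping are all hypothetical; the validation mirrors the descriptions above (topics must be consumed by the group, partitions only apply to a single topic) under that assumption.

```python
def select_partitions(consumed, all_topics=False, topics=None, partitions=None):
    """Resolve a scope into (topic, partition) pairs.

    consumed maps each topic consumed by the group to its list of partition ids.
    """
    if all_topics:
        chosen = sorted(consumed)
    elif topics:
        chosen = list(topics)
    else:
        raise ValueError("a scope must be specified")

    # Validate that every requested topic is consumed by the group.
    unknown = [t for t in chosen if t not in consumed]
    if unknown:
        raise ValueError(f"topics not consumed by the group: {unknown}")

    if partitions is None:
        return [(t, p) for t in chosen for p in consumed[t]]

    # A partition list is only meaningful together with exactly one topic.
    if len(chosen) != 1:
        raise ValueError("partitions can only be given with a single topic")
    topic = chosen[0]
    missing = [p for p in partitions if p not in consumed[topic]]
    if missing:
        raise ValueError(f"partitions not consumed for {topic}: {missing}")
    return [(topic, p) for p in partitions]
```

The returned pairs are what a scenario would then be applied to, one partition at a time.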

    ...

    | ID | Option | Arguments | Description |
    |----|--------|-----------|-------------|
    | 1. | Plan | (no execution arguments) | This execution option will only print out the result of the scenario by scope. |
    | 2. | Execute | --execute | This execution option will run the reset offset process based on scenario and scope. |
    | 3. | Export | --export | This execution option will print out the reset plan in CSV format, which could later be used in scenario 8 (i.e. as a backup). |

    The output of the Plan option will look like this:

    TOPIC                 PARTITION CURRENT-OFFSET NEW-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
    record-metadata       0         100            0          100            0   -           -    -

     

    Reset Plan File: CSV Format

    The CSV schema will consist of:

    <topic>,<partition>,<offset>

     

    Sample:

    topic-1,0,100
    topic-1,1,200
    topic-1,2,0
    topic-2,0,0
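
A reader or writer for this format is straightforward. The Python sketch below parses a plan in the `<topic>,<partition>,<offset>` schema into a dictionary and writes it back out; the function names `parse_reset_plan` and `format_reset_plan` are hypothetical and this is not the tool's own code.

```python
import csv
import io


def parse_reset_plan(text):
    """Parse '<topic>,<partition>,<offset>' lines into {(topic, partition): offset}."""
    plan = {}
    for row in csv.reader(io.StringIO(text)):
        if not row:  # skip blank lines
            continue
        topic, partition, offset = row
        plan[(topic.strip(), int(partition))] = int(offset)
    return plan


def format_reset_plan(plan):
    """Render a plan back into CSV lines, ordered by topic and partition."""
    return "\n".join(f"{t},{p},{o}" for (t, p), o in sorted(plan.items()))
```

Parsing the sample above gives four entries, for example `("topic-1", 1)` mapped to `200`, and formatting that result reproduces the same four lines.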

    ...


    Compatibility, Deprecation, and Migration Plan

    ...

    All the other options will be available for releases from 0.10.0.0 given that KIP-97 is implemented.

     Test Plan

    • A unit test to validate that --reset-offsets is executed independently from other Consumer Group options (list, delete, describe, etc.)
    • A unit test to validate that only one scenario is specified
    • A unit test to validate that only one scope is specified
    • A unit test to validate that only one execution option is specified
    • A unit test for each combination of scenario, scope and execution option.
    • A unit test to validate that when the calculated offset for a partition is lower than the earliest offset, the tool resets the offset to earliest (e.g. when we specify --to-offset 0 and the earliest offset is 10)
    • A unit test to validate that when the calculated offset for a partition is higher than the latest offset, the tool resets the offset to latest (e.g. when we specify an offset greater than the latest offset)
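
The "only one" checks in this plan could be exercised against a small helper like the following Python sketch. The name `pick_one` is hypothetical, and treating "plan" as the fallback when no execution option is given is an assumption based on Plan being described as the default.

```python
def pick_one(kind, chosen, default=None):
    """Return the single selected item, or the default when nothing was selected."""
    if len(chosen) > 1:
        raise ValueError(f"only one {kind} may be specified, got: {sorted(chosen)}")
    if not chosen:
        if default is None:
            raise ValueError(f"a {kind} must be specified")
        return default
    return chosen[0]
```

A test can then assert that two scenarios (or two scopes) raise an error, while an empty list of execution options falls back to the default.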

    ...