
This KIP aims to extend the current Consumer Group Reset Offset tooling (implemented by KIP-122 [1]) to Kafka Streams applications.

Currently, `kafka-streams-application-reset` only supports resetting input topics to the earliest offset, whereas the `reset-offsets` option of `kafka-consumer-groups` supports the additional scenarios detailed in KIP-122.

This KIP proposes adding the current `reset-offsets` options to `kafka-streams-application-reset` to provide more control over input-topic offsets.

This KIP also proposes removing the ZooKeeper dependency of `kafka-streams-application-reset` and using the AdminClient instead.

Public Interfaces

`kafka-streams-application-reset` currently supports the following features [2]:

  1. for any specified input topic, it resets all offsets to zero
  2. for any specified intermediate topic, seeks to the end for all partitions
  3. for all internal topics
    3.1. resets all offsets to zero
    3.2. deletes the topic

This new functionality will add the same features to the Consumer Group `reset-offsets` command:

  1. Input topics will be part of regular `--topics` option.
  2. There will be a new `--intermediate-topics` option to specify intermediate topics.
  3. Internal topics will be reset to zero.

The Application ID will be specified with the `--group` option.

Proposed Changes

1. New Intermediate Topics option

A new `--intermediate-topics` option will be available to specify Streams intermediate topics; for these topics, the Consumer Group offsets will be moved to the latest offset.

2. Internal Changes

  • The Group ID (`--group`) will also be treated as the Application ID when searching for internal topics.

  • Internal topics will be searched for and removed, and their Consumer Group offsets will be moved to the earliest offset.
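As a minimal sketch of how `--group` could double as the Application ID, the Java snippet below filters a list of topic names down to a Streams application's internal topics. It assumes the Kafka Streams naming convention `<application.id>-<name>-changelog` / `<application.id>-<name>-repartition`; the class and method names are hypothetical, and in the tool the topic list would presumably come from the AdminClient.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper (not the tool's actual code): selects the internal
// topics of a Streams application, assuming the Kafka Streams naming
// convention "<application.id>-<name>-changelog" or "-repartition".
class InternalTopics {

    static boolean isInternalTopic(String applicationId, String topic) {
        // The --group value is used as the application ID prefix.
        boolean hasPrefix = topic.startsWith(applicationId + "-");
        // Internal topics end with one of these two suffixes.
        boolean hasSuffix = topic.endsWith("-changelog") || topic.endsWith("-repartition");
        return hasPrefix && hasSuffix;
    }

    static List<String> findInternalTopics(String applicationId, List<String> allTopics) {
        return allTopics.stream()
                .filter(topic -> isInternalTopic(applicationId, topic))
                .collect(Collectors.toList());
    }
}
```

For example, with `app1` as the group, `app1-store-changelog` is selected while `foo` and `app10-store-changelog` are not, because the prefix check includes the trailing hyphen.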

This KIP proposes adding the following options:

Scenarios

At least one of the following scenarios can be defined for `input-topics` to proceed with the execution.

1. Reset to Datetime

--to-datetime YYYY-MM-DDTHH:mm:SS.sss±hh:mm

--to-datetime YYYY-MM-DDTHH:mm:SS.sssZ

--to-datetime YYYY-MM-DDTHH:mm:SS.sss

Datetime must be specified in ISO8601 format.

This option will translate the datetime to Epoch milliseconds, find the offsets by timestamp, and reset to those offsets.

If the Timezone is not specified, it will use UTC.

Reset to the first offset since 01 January 2017, 00:00:00 UTC:

--application-id app1 --input-topics foo --to-datetime 2017-01-01T00:00:00.000Z
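The datetime handling described above can be sketched with `java.time`. This is an illustrative assumption, not the tool's implementation: the hypothetical `DatetimeArg.toEpochMillis` accepts the three listed forms and falls back to UTC when no offset is given. The resulting epoch milliseconds would then be used to look up offsets by timestamp (for example through `KafkaConsumer#offsetsForTimes`).

```java
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeParseException;

// Hypothetical helper: converts a --to-datetime argument to epoch milliseconds.
class DatetimeArg {

    static long toEpochMillis(String datetime) {
        try {
            // Handles "...SS.sss+hh:mm" and "...SS.sssZ" (explicit offset).
            return OffsetDateTime.parse(datetime).toInstant().toEpochMilli();
        } catch (DateTimeParseException e) {
            // No offset given: interpret the local datetime as UTC.
            return LocalDateTime.parse(datetime).toInstant(ZoneOffset.UTC).toEpochMilli();
        }
    }
}
```

`DatetimeArg.toEpochMillis("2017-01-01T00:00:00.000")` and `DatetimeArg.toEpochMillis("2017-01-01T00:00:00.000Z")` yield the same instant, since a missing time zone defaults to UTC.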

2. Reset by Duration

--by-duration PnDTnHnMnS

Duration must be specified in ISO8601 format.

This option will subtract the duration from the current timestamp on the server, find the offsets for the resulting timestamp, and reset to those offsets. The specified duration does not take daylight saving time into account.

Reset to first offset since one week ago (from current timestamp):

--application-id app1 --input-topics foo,bar --by-duration P7D
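A minimal sketch of the `--by-duration` arithmetic, assuming the current time is passed in by the caller (in the proposal it is the server's current timestamp). `DurationArg` is a hypothetical name. `java.time.Duration.parse` accepts the `PnDTnHnMnS` form and treats a day as exactly 24 hours, which matches the statement that daylight saving effects are not considered.

```java
import java.time.Duration;

// Hypothetical helper: computes the target timestamp for --by-duration.
class DurationArg {

    static long targetTimestamp(String isoDuration, long nowMillis) {
        // Duration.parse accepts PnDTnHnMnS; a day is always exactly 24 hours,
        // so daylight saving transitions are ignored.
        Duration duration = Duration.parse(isoDuration);
        return nowMillis - duration.toMillis();
    }
}
```

A caller would pass something like `System.currentTimeMillis()` as `nowMillis`; taking it as a parameter keeps the arithmetic deterministic and easy to check.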

3. Reset to Earliest

--to-earliest

This option will reset offsets to the earliest offset using the Kafka Consumer's `#seekToBeginning`. This is the default scenario if no scenario is specified.

Reset to earliest offset available:

--application-id app1 --input-topics foo,bar --to-earliest

4. Reset to Latest

--to-latest

This option will reset offsets to the latest offset using the Kafka Consumer's `#seekToEnd`.

Reset to latest offset available:

--application-id app1 --input-topics foo,bar --to-latest

5. Reset to Offset

--to-offset

This option will reset offsets to a specific value.

Reset to offset 1 in all partitions:

--application-id app1 --input-topics foo,bar --to-offset 1

6. Shift Offset by 'n'

--shift-by n

This option will add the value `n` to the current offset and reset to the result. `n` can be positive or negative: the offset moves forward if `n` is positive and backward if it is negative.

If current offset + n is higher than the latest offset, new offset will be set to latest.

If current offset + n is lower than the earliest offset, new offset will be set to earliest.

Reset to current offset plus 5 positions:

--application-id app1 --input-topics foo,bar --shift-by 5
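The clamping rules above amount to bounding `current + n` by the partition's earliest and latest offsets. A minimal Java sketch, with the hypothetical helper `ShiftBy.shiftedOffset` and the earliest and latest offsets assumed to be already known for the partition:

```java
// Hypothetical helper: computes the new offset for --shift-by n,
// clamped to the [earliest, latest] range of the partition.
class ShiftBy {

    static long shiftedOffset(long current, long n, long earliest, long latest) {
        long target = current + n;
        if (target > latest) {
            return latest;      // past the end: use the latest offset
        }
        if (target < earliest) {
            return earliest;    // before the start: use the earliest offset
        }
        return target;
    }
}
```

With earliest offset 0 and latest offset 100, shifting offset 10 by 5 gives 15, shifting it by -20 gives 0, and shifting offset 95 by 10 gives 100.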

Execution Options

1. Plan (no execution arguments)

This execution option only prints the result of the scenario for the given scope (i.e., a dry run).

The output will look like this:

TOPIC                 PARTITION NEW-OFFSET NEW-LAG LOG-END-OFFSET CONSUMER-ID HOST CLIENT-ID
input-topic           0         90         10      100            -           -    -

Print the result of resetting all topics and partitions to the earliest offset:

--application-id app1 --input-topics foo,bar --to-earliest
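In the sample Plan output, `NEW-LAG` is the distance between the log end offset and the new offset (100 - 90 = 10). The hypothetical `PlanRow` helper below sketches how one row could be computed and formatted; the column widths are chosen to line up with the sample header, and the consumer, host, and client columns are simply printed as `-`, as in the sample.

```java
// Hypothetical helper: computes and formats one row of the Plan (dry-run)
// output. Widths match the header: TOPIC padded to 21 characters, then
// PARTITION (9), NEW-OFFSET (10), NEW-LAG (7), LOG-END-OFFSET (14),
// CONSUMER-ID (11), HOST (4) and CLIENT-ID.
class PlanRow {

    // NEW-LAG is what remains to be consumed after the reset.
    static long newLag(long newOffset, long logEndOffset) {
        return logEndOffset - newOffset;
    }

    static String format(String topic, int partition, long newOffset, long logEndOffset) {
        // Consumer ID, host and client ID are printed as "-", as in the sample output.
        return String.format("%-21s %-9d %-10d %-7d %-14d %-11s %-4s %s",
                topic, partition, newOffset, newLag(newOffset, logEndOffset),
                logEndOffset, "-", "-", "-");
    }
}
```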

2. Execute

--execute

This execution option will run the reset offset process based on the scenario and scope.

Print and execute the reset of all topics and partitions to the earliest offset:

--application-id app1 --input-topics foo,bar --to-earliest --execute

 

Proposed Changes

1. Add the scenario options described above to `kafka-streams-application-reset`.

2. Use the `kafka-consumer-groups` `reset-offsets` tool in the background to reset the offsets of input topics (including all partitions) when `kafka-streams-application-reset` runs.

3. Replace the `kafka-streams-application-reset` `dry-run` option with the Execution Options described above.

4. Remove the `kafka-streams-application-reset` `zookeeper` option, eliminating the ZooKeeper dependency.

Compatibility, Deprecation, and Migration Plan

This KIP does not require a migration plan. The `kafka-streams-application-reset` command will be deprecated.

Rejected Alternatives

As mentioned in the Motivation, the current `kafka-streams-application-reset` supports this functionality only partially, but it will be deprecated.

We will keep both tools, `kafka-streams-application-reset` and `reset-offsets`, reusing `reset-offsets` in the background when `kafka-streams-application-reset` is executed.


[1] KIP-122: Add Reset Consumer Group Offsets tooling

[2] https://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/
