Status

Current state: Under Discussion

Discussion thread: here

JIRA: KAFKA-4743

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

This KIP proposes a command-line tool that enables users to reset Consumer Group offsets.

...

* Some messages with earlier timestamp might also be consumed."

 

Proposed Changes

 

This process will be split into 3 steps:

  • Generate a Reset Plan file containing, for a specific Consumer Group, the offset to reset to for each partition.

  • Verify that the Reset Plan file is consistent with the current state of the Consumer Group offsets.

  • Execute the Reset Plan file, resetting offsets to the values specified in it.

We will also consider an option to generate and execute in one step, for users who don't need to validate the Reset Plan file.

These options will enable consumers to move backward or forward, depending on where they currently are in the topic.

For instance, suppose a topic 'T1' with 1 partition and 100 records (offsets 0-99) has 2 Consumer Groups: 'C1', which is inactive with current offset 20, and 'C2', which has been actively consuming records from T1 and is already past offset 80. If C1 is reset to a point in time where the offset is 80, it moves forward. But if C2 is reset to the same point in time, it moves backward.
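The direction of a reset is simply the sign of the difference between the target offset and the group's committed offset. The Python sketch below is illustrative only: reset_direction() is a hypothetical helper, and giving C2 a committed offset of 100 (all 100 records consumed) is an assumption, since the example only implies that C2 is past offset 80.

```python
def reset_direction(current_offset: int, target_offset: int) -> str:
    """Classify a reset by comparing the committed offset with the target offset."""
    if target_offset > current_offset:
        return "forward"
    if target_offset < current_offset:
        return "backward"
    return "unchanged"


# 'C1' is inactive at offset 20; 'C2' is assumed to have consumed all of T1 (offset 100).
print("C1:", reset_direction(20, 80))   # C1: forward
print("C2:", reset_direction(100, 80))  # C2: backward
```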

 


Public Interfaces

Consumer Group Reset Offset Tool

...

Additionally, this tool will support an operation to generate and execute in one step:

  • --generate-and-execute

Generate Reset Plan file

To reset Consumer Group offsets, we first need a Reset Plan file containing the specific offset to reset to for each partition. This file can be generated in 3 ways:

...

  • --group: defines the Consumer Group ID (i.e. 'group.id'). Required.

  • --topics: comma-separated list of topics. Optional; if not specified, all topics consumed by the Consumer Group are used.

  • --output-file: relative or absolute path to the file where the generated JSON will be written. Optional; if not specified, the JSON result is printed to the terminal.
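The '--topics' fallback can be sketched as follows, assuming the group's committed offsets are already available as a plain dictionary (a hypothetical stand-in for what the tool would read from the cluster). select_topics() is an illustrative name, not part of the proposed tool; it only shows how an omitted '--topics' falls back to every topic the group consumes.

```python
from typing import Dict, List, Optional

# Hypothetical input: the group's committed offsets, keyed by topic, then partition.
CommittedOffsets = Dict[str, Dict[int, int]]


def select_topics(committed: CommittedOffsets, topics_arg: Optional[str] = None) -> List[str]:
    """Resolve the --topics value into the list of topics to include in the plan."""
    if topics_arg is None:
        # --topics omitted: every topic the Consumer Group has committed offsets for.
        return sorted(committed)
    # --topics given: split the comma-separated list, dropping whitespace and empty items.
    return [topic.strip() for topic in topics_arg.split(",") if topic.strip()]


committed = {"topic2": {0: 7}, "topic1": {0: 5, 1: 9}}
print(select_topics(committed))                     # ['topic1', 'topic2']
print(select_topics(committed, "topic1, ,topic3"))  # ['topic1', 'topic3']
```

Topics named explicitly are kept even if the group does not consume them, leaving it to the verify step to flag them.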

 

Reset Plan File: JSON Format

 

```json
{
  "offsets": {
    "group_id": "ConsumerGroup1",
    "topics": {
      "topic1": [
        { "partition": 0, "offset": 0 },
        { "partition": 1, "offset": 0 }
      ],
      "topic2": [
        { "partition": 0, "offset": 0 },
        { "partition": 1, "offset": 0 }
      ]
    }
  },
  "version": 1
}
```
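A minimal sketch of reading a Reset Plan file back into a flat (topic, partition) to offset mapping, as the verify step's JSON-structure check would need. It assumes "topics" maps each topic name to a list of partition/offset entries and that "version" must be 1; load_reset_plan() is a hypothetical name.

```python
import json
from typing import Dict, Tuple


def load_reset_plan(text: str) -> Tuple[str, Dict[Tuple[str, int], int]]:
    """Parse Reset Plan JSON into (group_id, {(topic, partition): offset}).

    Missing keys raise KeyError; semantic problems raise ValueError.
    """
    doc = json.loads(text)
    if doc.get("version") != 1:
        raise ValueError(f"unsupported plan version: {doc.get('version')!r}")
    offsets = doc["offsets"]
    group_id = offsets["group_id"]
    plan: Dict[Tuple[str, int], int] = {}
    # Assumed layout: "topics" maps topic name -> list of {"partition", "offset"} entries.
    for topic, entries in offsets["topics"].items():
        for entry in entries:
            partition, offset = int(entry["partition"]), int(entry["offset"])
            if offset < 0:
                raise ValueError(f"negative offset for {topic}-{partition}")
            if (topic, partition) in plan:
                raise ValueError(f"duplicate entry for {topic}-{partition}")
            plan[(topic, partition)] = offset
    return group_id, plan
```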

 


Reset to Timestamp

This option will accept a DateTime value (https://www.w3.org/TR/xmlschema-2/#dateTime) as input. This value will be converted to epoch milliseconds, which are then used to look up the offset for each partition.
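The conversion and lookup can be sketched in Python. The lookup mirrors the semantics of KafkaConsumer#offsetsForTimes (the earliest offset whose timestamp is greater than or equal to the target) over a hypothetical in-memory list of (offset, timestamp) pairs, falling back to the log end offset when no record is that recent, which matches what the Test Plan expects. Reading a DateTime without a time zone as UTC is an assumption; the proposal does not say which zone applies.

```python
from datetime import datetime, timedelta, timezone
from typing import List, Tuple

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_millis(value: str) -> int:
    """Convert a dateTime string such as 2017-01-01T09:00:00.000 to epoch milliseconds."""
    # datetime.fromisoformat() accepts a trailing 'Z' only from Python 3.11, so normalize it.
    if value.endswith("Z"):
        value = value[:-1] + "+00:00"
    parsed = datetime.fromisoformat(value)
    if parsed.tzinfo is None:
        # Assumption: a value without a time zone is interpreted as UTC.
        parsed = parsed.replace(tzinfo=timezone.utc)
    # Integer division of timedeltas avoids float rounding in the millisecond result.
    return (parsed - EPOCH) // timedelta(milliseconds=1)


def offset_for_time(records: List[Tuple[int, int]], target_ms: int, log_end_offset: int) -> int:
    """Earliest offset whose timestamp is >= target_ms, else the log end (latest) offset.

    records: hypothetical (offset, timestamp_ms) pairs of one partition, in offset order.
    """
    for offset, timestamp_ms in records:
        if timestamp_ms >= target_ms:
            return offset
    return log_end_offset


print(to_epoch_millis("2017-01-01T09:00:00.000"))  # 1483261200000
```

A linear scan is used rather than a binary search because record timestamps within a partition are not guaranteed to be monotonic.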
 

...

bin/kafka-consumer-group-reset.sh --generate --group ConsumerGroup1 --topics topic1 --reset-to-timestamp 2017-01-01T09:00:00.000 --output-file kafka-consumer-group-reset.json

´´´

Reset From Period

This option will accept a Duration value (https://www.w3.org/TR/xmlschema-2/#duration) as input. This value will be subtracted from the current timestamp on the client's machine to obtain the epoch milliseconds used to look up the offset for each partition.
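A sketch of the period arithmetic, assuming a subset of the duration format (no leading minus sign) and applying the calendar part (years, months) before the fixed-length part, with the day clamped to the end of a shorter month. subtract_period() and period_to_epoch_millis() are hypothetical helpers; the client's clock is passed in as now so the result is reproducible.

```python
import calendar
import re
from datetime import datetime, timedelta, timezone

# Assumed subset of xsd:duration: PnYnMnDTnHnMnS, without a leading sign.
_DURATION = re.compile(
    r"^P(?:(\d+)Y)?(?:(\d+)M)?(?:(\d+)D)?"
    r"(?:T(?:(\d+)H)?(?:(\d+)M)?(?:(\d+(?:\.\d+)?)S)?)?$"
)


def subtract_period(now: datetime, period: str) -> datetime:
    """Return now minus a duration such as P1M or PT2H30M."""
    match = _DURATION.match(period)
    if match is None or not any(match.groups()) or period.endswith("T"):
        raise ValueError(f"invalid duration: {period!r}")
    years, months, days, hours, minutes = (int(g or 0) for g in match.groups()[:5])
    seconds = float(match.group(6) or 0)
    # Calendar part first: step back whole months, clamping the day (Mar 31 - P1M -> Feb 28/29).
    total = now.year * 12 + (now.month - 1) - (years * 12 + months)
    year, month0 = divmod(total, 12)
    day = min(now.day, calendar.monthrange(year, month0 + 1)[1])
    shifted = now.replace(year=year, month=month0 + 1, day=day)
    # Then the fixed-length part.
    return shifted - timedelta(days=days, hours=hours, minutes=minutes, seconds=seconds)


def period_to_epoch_millis(period: str, now: datetime) -> int:
    """Epoch milliseconds of now - period; now must be time zone aware."""
    start = subtract_period(now, period)
    return (start - datetime(1970, 1, 1, tzinfo=timezone.utc)) // timedelta(milliseconds=1)


now = datetime(2017, 3, 31, 12, tzinfo=timezone.utc)
print(subtract_period(now, "P1M").isoformat())  # 2017-02-28T12:00:00+00:00
```

In the tool, now would be the client's current time, for example datetime.now(timezone.utc).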

...

bin/kafka-consumer-group-reset.sh --generate --group ConsumerGroup1 --topics topic1 --reset-from-period P1M --output-file kafka-consumer-group-reset.json

´´´

Default

This option will generate a file with the Consumer Group's current offset for each partition. The resulting JSON file can be used as a template for manually modifying the offsets of any partition.
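The default template can be sketched as a rendering of the group's current committed offsets. The input dictionary stands in for offsets the tool would fetch from the cluster, build_template() is a hypothetical name, and the layout assumes "topics" maps each topic name to its list of partition/offset entries.

```python
import json
from typing import Dict


def build_template(group_id: str, committed: Dict[str, Dict[int, int]]) -> str:
    """Render a group's current committed offsets as Reset Plan JSON."""
    topics = {
        # Partitions are sorted so the template is stable and easy to edit by hand.
        topic: [{"partition": p, "offset": committed[topic][p]} for p in sorted(committed[topic])]
        for topic in sorted(committed)
    }
    plan = {"offsets": {"group_id": group_id, "topics": topics}, "version": 1}
    return json.dumps(plan, indent=2)


print(build_template("ConsumerGroup1", {"topic1": {0: 40, 1: 42}}))
```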

...

bin/kafka-consumer-group-reset.sh --generate --group ConsumerGroup1 --topics topic1 --output-file kafka-consumer-group-reset.json

´´´

Verify JSON file

Once the JSON file is generated, the tool offers an option to validate it against the current state of the cluster. The tool will validate the following:

...

bin/kafka-consumer-group-reset.sh --verify --reset-json-file kafka-consumer-group-reset.json

´´´
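The checks listed under Verify in the Test Plan can be sketched against a hypothetical snapshot of cluster state. ClusterView and verify_plan() are illustrative names; a real implementation would read topic metadata, the group's committed offsets, and each partition's oldest and latest offsets from the cluster.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Set, Tuple

TopicPartition = Tuple[str, int]


@dataclass
class ClusterView:
    """Hypothetical snapshot of the cluster state the verify step would read."""
    partitions: Dict[str, int]  # topic -> number of partitions
    consumed_topics: Set[str]  # topics with committed offsets for the group
    # (topic, partition) -> (oldest offset, latest offset)
    bounds: Dict[TopicPartition, Tuple[int, int]] = field(default_factory=dict)


def verify_plan(plan: Dict[TopicPartition, int], cluster: ClusterView) -> List[str]:
    """Return warnings for a parsed plan; an empty list means it can run as-is."""
    warnings: List[str] = []
    for (topic, partition), offset in sorted(plan.items()):
        if topic not in cluster.partitions:
            warnings.append(f"{topic}: topic does not exist; nothing will be executed for it")
        elif topic not in cluster.consumed_topics:
            warnings.append(f"{topic}: not consumed by the group; nothing will be executed for it")
        elif not 0 <= partition < cluster.partitions[topic]:
            warnings.append(f"{topic}-{partition}: partition does not exist; nothing will be executed for it")
        else:
            oldest, latest = cluster.bounds[(topic, partition)]
            if not oldest <= offset <= latest:
                # Out of range: the nearer bound is the oldest offset below, the latest above.
                target = oldest if offset < oldest else latest
                warnings.append(f"{topic}-{partition}: offset {offset} is out of range; it will be reset to {target}")
    return warnings
```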

Execute JSON file

Once the user is comfortable with the Reset Plan file, they can prepare the environment and execute the file.

...

Finally, the user can run the 'verify' option to confirm that the current offsets match the ones defined in the file.
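Execution can be sketched as below, assuming, as the Execute items in the Test Plan suggest, that the group must be inactive and that out-of-range offsets are moved to the nearer bound. execute_plan() is a hypothetical name, and the committed dictionary stands in for the group's offsets in the cluster; bounds is assumed to list only existing partitions of topics the group consumes, so anything else is skipped.

```python
from typing import Dict, Tuple

TopicPartition = Tuple[str, int]


def execute_plan(
    plan: Dict[TopicPartition, int],
    group_is_active: bool,
    bounds: Dict[TopicPartition, Tuple[int, int]],
    committed: Dict[TopicPartition, int],
) -> Dict[TopicPartition, int]:
    """Apply the plan to committed (a stand-in for the group's offsets) and return what was written."""
    if group_is_active:
        # Offsets are only reset while no consumer in the group is running.
        raise RuntimeError("Consumer Group is active; stop its consumers before executing the plan")
    written: Dict[TopicPartition, int] = {}
    for tp, offset in plan.items():
        if tp not in bounds:
            continue  # non-existent or not-consumed partition: skipped, not updated
        oldest, latest = bounds[tp]
        # Out-of-range offsets move to the nearer bound: oldest if below it, latest if above.
        written[tp] = min(max(offset, oldest), latest)
    committed.update(written)
    return written
```

Running the verify sketch again after this step would then report no out-of-range offsets for the written partitions.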

 

Compatibility, Deprecation, and Migration Plan

 

This KIP won’t require a Migration Plan.

...

All the other options will be available for releases from 0.10.0.0 onward, provided that KIP-97 is implemented.

Test Plan

 

  • A unit test to validate the command options accepted by each operation (generate, verify, execute)

  • A unit test to validate the functionality for each operation:

    • Generate:

      • Validate that all topics consumed by the Consumer Group are obtained when no '--topics' argument is specified

      • Validate that current offsets by partition are obtained when default option is executed

      • Validate that an offset is obtained when a datetime is specified

      • Validate that the oldest offset is obtained when a datetime is earlier than the oldest timestamp in the partition

      • Validate that the latest offset is obtained when a datetime is later than the latest timestamp in the partition

      • Validate that an offset is obtained when a period is specified

      • Validate that the oldest offset is obtained when the current timestamp minus the period is earlier than the oldest timestamp in the partition

      • Validate that the latest offset is obtained when the current timestamp minus the period is later than the latest timestamp in the partition

    • Verify:

      • Validate that JSON structure is correct

      • Validate that topics exist

      • Validate that partitions exist

      • Validate that topics are actually consumed by Consumer Group

      • Warn that a topic does not exist and nothing will be executed for it

      • Warn that a topic is not being consumed by the Consumer Group and nothing will be executed for it

      • Warn that a partition does not exist and nothing will be executed for it

      • Warn that a specified offset is out of range (i.e. outside the oldest-to-latest range) and that it will be reset to the oldest or latest offset, whichever is closer

    • Execute:

      • Validate that the Consumer Group is inactive.

      • Validate that offsets are updated in cluster after execution.

      • Warn that non-existent or not-consumed topics have not been updated

      • Warn that non-existent partitions have not been updated

      • Warn that out-of-range offsets have been updated to the oldest or latest offset, whichever is closer.

Rejected Alternatives

None


1. Reset to specific offset: Move Consumer Group to specific offset by partition.

...