Status

Current state: Under Discussion

Discussion thread: here

JIRA: KAFKA-4743 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

This KIP proposes a command-line tool that lets users reset Consumer Group offsets.

Today, if users want to reprocess old records (i.e. records whose offset is smaller than the current consumer offset), they need to modify client-side code (e.g. using "KafkaConsumer#seek()" operations).

This process involves changing client-side code, redeploying the application, and eventually rolling back the changes so that the application consumes records as usual. It is cumbersome because users have to know how many partitions each topic has, which offset each partition is at, and which offset they want to move to.

Resetting offsets from outside the application provides a cleaner and more consistent way to reprocess records and move along the topic.

Currently, there are 2 indexes that allow users to reprocess records: the offset index and the timestamp index.

The offset index represents a pointer over the sequential id that each record has within a partition, and has been available since Kafka's inception.

The timestamp index (added in release 0.10.1.0 by KIP-33) represents a pointer over the epoch milliseconds of record creation, which can be defined by the Producer or the Broker, depending on the Timestamp Type.

From KIP-33:

"Searching by timestamp will have better accuracy. The guarantees provided are:

* The messages whose timestamp are after the searched timestamp will be consumed.

* Some messages with earlier timestamp might also be consumed."
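The lookup behind a timestamp search can be sketched with an in-memory model. This is only an illustration, not the broker's implementation: it assumes `timestamps_ms[i]` holds the timestamp of offset `i` and that timestamps never decrease. The function name `offset_for_timestamp` is hypothetical. A real broker may start from a slightly earlier offset, as the quoted guarantees allow.

```python
from bisect import bisect_left


def offset_for_timestamp(timestamps_ms, target_ms):
    """Return the first offset whose timestamp is >= target_ms.

    Assumption: timestamps_ms[i] is the timestamp of offset i and the list
    is non-decreasing. A target older than every record yields 0 (the oldest
    offset); a target newer than every record yields len(timestamps_ms)
    (the position right after the latest record).
    """
    return bisect_left(timestamps_ms, target_ms)


# Hypothetical partition with four records.
log = [1000, 2000, 2000, 3000]
print(offset_for_timestamp(log, 2000))
```

In this model, a target older than the oldest record maps to the oldest offset and a target newer than the latest record maps to the end of the log.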

 

Proposed Changes

 

This process will be separated into 3 steps:

 
  • Generate Reset Plan file with specific offsets by partition to reset, for a specific Consumer Group

  • Verify that the Reset Plan file is consistent with the current state of Consumer Group offsets

  • Execute the Reset Plan file, resetting offsets to the values specified in it.

 

 

Also, we will consider an option to generate and execute in one step, for users that don't need to validate the Reset Plan file.

 


 

These options will enable clients to move backward and forward, depending on where they currently are in the topic.

 

For instance, suppose a topic 'T1' with 1 partition and 100 records (offsets 0-99) has 2 Consumer Groups: 'C1', which is inactive with current offset 20, and 'C2', which has been actively consuming records from T1 and has a current offset beyond 80. If C1 is reset to a point in time where the offset is 80, it will move forward. If C2 is reset to the same point in time, it will move backward.
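The example can be expressed as a small comparison. This is a minimal sketch: the function name `reset_direction` is hypothetical, and the current offset of 95 for C2 is an assumed value, since the text only implies that C2 is past offset 80.

```python
def reset_direction(current_offset, target_offset):
    """Describe which way a group moves when reset to target_offset."""
    if target_offset > current_offset:
        return "forward"
    if target_offset < current_offset:
        return "backward"
    return "unchanged"


# C1 is inactive at offset 20; C2 is assumed to be at offset 95.
print("C1:", reset_direction(20, 80))
print("C2:", reset_direction(95, 80))
```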

 


Public Interfaces

Consumer Group Reset Offset Tool

This tool will offer 3 main operations:

  • --generate: This operation will generate a Consumer Group Offset Reset Plan file.

  • --verify: This operation will validate that Consumer Group Offsets values defined in the Reset Plan file are valid in the cluster.

  • --execute: This operation will reset Consumer Group Offsets using values defined in the Reset Plan file.

 

Additionally, this tool will support an operation to generate and execute in one step:

  • --generate-and-execute

Generate Reset Plan file

To reset Consumer Group offsets, we first need a Reset Plan file with the specific offsets by partition to reset to. This file can be generated in 3 ways:

  • --reset-to-datetime YYYY-MM-DDTHH:mm:SS.sss

  • --reset-from-period PnYnMnDTnHnMnS

  • Default

 

This process accepts 3 arguments:

  • --group: defines the Consumer Group ID (i.e. ´group.id´). Required.

  • --topics: comma-separated list of topics. Optional; if not specified, all topics consumed by the Consumer Group are used.

  • --output-file: relative or absolute path to an output file that will be populated with the generated JSON. Optional; if not specified, the JSON result is printed to the terminal.
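The operations and generate arguments listed above can be modelled with a small argument parser. This is an illustrative sketch only, not the proposed tool: the helper names `build_parser` and `parse` are hypothetical, and the rule that `--group` is checked only for the generate operations is an assumption drawn from the argument list.

```python
import argparse


def build_parser():
    """Model the tool's options as described in this KIP (sketch only)."""
    parser = argparse.ArgumentParser(prog="kafka-consumer-group-reset")

    # Exactly one operation must be chosen.
    operations = parser.add_mutually_exclusive_group(required=True)
    for name in ("generate", "verify", "execute", "generate-and-execute"):
        operations.add_argument("--" + name, action="store_true")

    parser.add_argument("--group", help="Consumer Group ID (group.id)")
    parser.add_argument("--topics", type=lambda text: text.split(","),
                        help="comma-separated topics; default: all consumed")
    parser.add_argument("--output-file",
                        help="where to write the plan; default: terminal")

    # At most one way of choosing the target offsets; none means Default.
    modes = parser.add_mutually_exclusive_group()
    modes.add_argument("--reset-to-datetime")
    modes.add_argument("--reset-from-period")
    return parser


def parse(argv):
    parser = build_parser()
    args = parser.parse_args(argv)
    # Assumption: --group is required whenever a plan is generated.
    if (args.generate or args.generate_and_execute) and not args.group:
        parser.error("--group is required when generating a Reset Plan")
    return args


args = parse(["--generate", "--group", "ConsumerGroup1",
              "--topics", "topic1,topic2"])
print(args.group, args.topics, args.output_file)
```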

 

Reset Plan File: JSON Format

 

{
  "offsets": {
    "group_id": "ConsumerGroup1",
    "topics": {
      "topic1": [
        {
          "partition": 0,
          "offset": 0
        },
        {
          "partition": 1,
          "offset": 0
        }
      ],
      "topic2": [
        {
          "partition": 0,
          "offset": 0
        },
        {
          "partition": 1,
          "offset": 0
        }
      ]
    }
  },
  "version": 1
}
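As an illustration of how such a file could be produced, the following sketch builds the structure and serializes it. It assumes that "topics" maps each topic name to a list of partition/offset entries, which is one valid JSON reading of the format above. The function name `build_reset_plan` and its input shape are hypothetical.

```python
import json


def build_reset_plan(group_id, offsets_by_topic):
    """Build a Reset Plan dictionary.

    offsets_by_topic maps topic name -> {partition: offset}. This input
    shape is an assumption made for the sketch.
    """
    topics = {
        topic: [{"partition": partition, "offset": offset}
                for partition, offset in sorted(partitions.items())]
        for topic, partitions in offsets_by_topic.items()
    }
    return {"offsets": {"group_id": group_id, "topics": topics}, "version": 1}


plan = build_reset_plan("ConsumerGroup1", {"topic1": {0: 0, 1: 0},
                                           "topic2": {0: 0, 1: 0}})
print(json.dumps(plan, indent=2))
```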

 


Reset to Timestamp

This option will support a DateTime format (https://www.w3.org/TR/xmlschema-2/#dateTime) as input. This value will be transformed to epoch milliseconds and then used to get offsets by partition.
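The conversion to epoch milliseconds can be sketched as follows. This is a minimal example: it assumes that an input without a time zone is interpreted as UTC, which this KIP does not specify, and the function name `datetime_to_epoch_millis` is hypothetical.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def datetime_to_epoch_millis(text):
    """Convert a YYYY-MM-DDTHH:mm:SS.sss string to epoch milliseconds."""
    parsed = datetime.fromisoformat(text)
    if parsed.tzinfo is None:
        # Assumption: treat values without a time zone as UTC.
        parsed = parsed.replace(tzinfo=timezone.utc)
    # Integer division of timedeltas avoids floating-point rounding.
    return (parsed - EPOCH) // timedelta(milliseconds=1)


print(datetime_to_epoch_millis("2017-01-01T09:00:00.000"))  # 1483261200000
```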
 


 

Pros:

  • This option will be the most precise when using timestamps, as it moves to a specific point in time.

Cons:

  • This option will require users to know exactly where they want to move.

 

Command:

´´´

bin/kafka-consumer-group-reset.sh --generate --group ConsumerGroup1 --topics topic1 --reset-to-datetime 2017-01-01T09:00:00.000 --output-file kafka-consumer-group-reset.json

´´´

Reset From Period

This option will support the Duration format (https://www.w3.org/TR/xmlschema-2/#duration) as input. This value will be subtracted from the current timestamp of the client’s machine to obtain the epoch milliseconds used to get offsets by partition.
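The subtraction can be sketched as below. This is only an illustration under stated assumptions: it accepts whole-number fields only, steps back years and months on the calendar (clamping the day to the end of the month), and the function name `subtract_period` is hypothetical. How the tool would actually treat months of different lengths is not specified in this KIP.

```python
import calendar
import re
from datetime import datetime, timedelta, timezone

# PnYnMnDTnHnMnS with integer fields only (a simplification).
DURATION = re.compile(
    r"^P(?:(?P<years>\d+)Y)?(?:(?P<months>\d+)M)?(?:(?P<days>\d+)D)?"
    r"(?:T(?:(?P<hours>\d+)H)?(?:(?P<minutes>\d+)M)?(?:(?P<seconds>\d+)S)?)?$"
)


def subtract_period(now, period):
    """Return the point in time that lies `period` before `now`."""
    match = DURATION.match(period)
    if match is None or not any(match.groupdict().values()):
        raise ValueError(f"not a supported duration: {period!r}")
    parts = {name: int(value or 0) for name, value in match.groupdict().items()}

    # Step back whole years and months, clamping the day when needed
    # (e.g. 31 March minus one month becomes 28 or 29 February).
    month_index = (now.year * 12 + now.month - 1
                   - (parts["years"] * 12 + parts["months"]))
    year, month0 = divmod(month_index, 12)
    day = min(now.day, calendar.monthrange(year, month0 + 1)[1])
    shifted = now.replace(year=year, month=month0 + 1, day=day)

    return shifted - timedelta(days=parts["days"], hours=parts["hours"],
                               minutes=parts["minutes"],
                               seconds=parts["seconds"])


now = datetime(2017, 3, 31, 12, 0, tzinfo=timezone.utc)
target = subtract_period(now, "P1M")
print(target.isoformat(), int(target.timestamp() * 1000))
```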

 

Pros:

  • This option will be the easiest to use, as it does not require knowing a specific timestamp; it yields a point in time that depends on when the command is executed.

  • Useful for debugging/development purposes where you just need to reprocess a batch of records from a previous period.

 

Cons:

  • This option will be less accurate than the previous one, because the resulting point in time depends on when the command is executed.

 

Command:

´´´

bin/kafka-consumer-group-reset.sh --generate --group ConsumerGroup1 --topics topic1 --reset-from-period P1M --output-file kafka-consumer-group-reset.json

´´´

Default

This option will generate a JSON file with the current Consumer Group offsets by partition. The file can be used as a template to modify the offsets manually for all partitions.

This option will be useful for clusters prior to release 0.10.0.0, to create an initial file in which specific offsets can be set later. It can also be used to capture a fixed point before starting the application, so that the group can later be reset to that point.

 

Pros:

  • Easiest way to generate a JSON file with the right format.

 

Cons:

  • Could be cumbersome for users to modify specific offsets by partition.

 

Command:

´´´

bin/kafka-consumer-group-reset.sh --generate --group ConsumerGroup1 --topics topic1 --output-file kafka-consumer-group-reset.json

´´´

Verify JSON file

Once the JSON file is generated, the tool will offer an option to validate it against the current state of the cluster. The tool will validate the following:

 

  • Topics exist

  • Partitions exist

  • Offsets exist

  • Latest/Current/Oldest offset

 

If topics or partitions don’t exist in the cluster, the tool will warn that those topics/partitions won’t be processed.

If an offset is no longer available (e.g. retention has removed it from the log), the tool will warn that the offset will be reset to the oldest or latest one (i.e. resetting the offset using the ‘auto.offset.reset’ policy).

The Latest/Current/Oldest offset information gives the user an idea of where in the log the offset will be set.
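The offset check can be sketched as a range comparison. This is a minimal illustration, assuming the "closest bound" rule mentioned in the Test Plan rather than the ‘auto.offset.reset’ policy. The function name `check_planned_offset` and the warning text are hypothetical.

```python
def check_planned_offset(planned, oldest, latest):
    """Return (effective_offset, warning) for one partition.

    Assumption: an out-of-range offset is moved to the closer bound, so a
    value below `oldest` becomes `oldest` and a value above `latest`
    becomes `latest`. `warning` is None when the offset is in range.
    """
    if oldest <= planned <= latest:
        return planned, None
    effective = oldest if planned < oldest else latest
    warning = (f"offset {planned} is out of range [{oldest}, {latest}]; "
               f"it would be reset to {effective}")
    return effective, warning


for planned in (5, 50, 150):
    print(check_planned_offset(planned, oldest=10, latest=99))
```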

 

This process will require 1 argument:

 

  • --reset-json-file: relative or absolute path to JSON file generated/customized.

 

Command:

´´´

bin/kafka-consumer-group-reset.sh --verify --reset-json-file kafka-consumer-group-reset.json

´´´

Execute JSON file

Once the user is comfortable with the Reset Plan file, they can prepare the environment to execute it.

To execute the reset process, the tool will require and validate that the Consumer Group is inactive first, to avoid inconsistencies on the client side.

This process will require 1 argument:

  • --reset-json-file: relative or absolute path to JSON file generated/customized.

 

Command:

´´´

bin/kafka-consumer-group-reset.sh --execute --reset-json-file kafka-consumer-group-reset.json

´´´

Finally, the user can use the ´verify´ option to validate that the current and defined offsets are the same.

 

Compatibility, Deprecation, and Migration Plan

 

This KIP won’t require a Migration Plan.


‘reset-to-datetime’ and ‘reset-from-period’ will require the Timestamp Index, so they will only be available from version 0.10.1.0 onward.

 

Nevertheless, it is possible to achieve the same result since release 0.10.0 using the timestamp metadata of each record, doing a linear search starting from the latest record. However, we do not plan to support this option because it would be expensive and slow.

 

All the other options will be available for releases from 0.10.0.0 onward, provided that KIP-97 is implemented.

Test Plan

 

  • A unit test to validate correct command options by operation (generate, verify, execute)

  • A unit test to validate the functionality for each operation:

    • Generate:

      • Validate that topics consumed are obtained when no ´--topics´ argument is specified

      • Validate that current offsets by partition are obtained when default option is executed

      • Validate that an offset is obtained when a datetime is specified

      • Validate that oldest offset is obtained when a datetime is older than the oldest timestamp by partition

      • Validate that latest offset is obtained when a datetime is later than the latest timestamp by partition

      • Validate that an offset is obtained when a period is specified

      • Validate that oldest offset is obtained when current timestamp minus a period is older than the oldest timestamp by partition

      • Validate that latest offset is obtained when current timestamp minus a period is later than the latest timestamp by partition

    • Verify:

      • Validate that JSON structure is correct

      • Validate that topics exist

      • Validate that partitions exist

      • Validate that topics are actually consumed by Consumer Group

      • Warn that topic does not exist and nothing will be executed

      • Warn that topic is not being consumed by Consumer Group and nothing will be executed

      • Warn that partition does not exist and nothing will be executed.

      • Warn that specified offset is out of range (i.e. outside oldest to latest) and it will be reset to oldest or latest depending on which one is closer.

    • Execute:

      • Validate that ConsumerGroup is inactive.

      • Validate that offsets are updated in cluster after execution.

      • Warn that non-existing/not-consumed topic has not been updated

      • Warn that non-existing partitions have not been updated

      • Warn that out-of-bounds offsets have been updated to oldest or latest, depending on which one is closer.

Rejected Alternatives

None


1. Reset to specific offset: Move Consumer Group to specific offset by partition.

2. Reset to datetime: Find last offset by datetime, and reset offset by partition.

3. Reset by period: Find last offset from datetime calculated from an input period and current timestamp (e.g. 5 days ago.).


