Status
Current state: Accepted
Discussion thread: here
JIRA: here
Released: 0.11.0.0
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
This KIP introduces a command-line tool that enables users to reset Consumer Group offsets.
Today, if users want to reprocess old records (i.e. records whose offset is lower than the current consumer offset), they need to modify client-side code (e.g. using "KafkaConsumer#seek()" operations).
This process involves either writing their own program that starts a consumer and commits offsets, or changing client-side code, redeploying the application, and eventually rolling back those changes to consume records as usual. This is cumbersome, given that users have to know how many partitions each topic has, at which offset each partition currently is, and to which offset they want to move.
...
* Some messages with an earlier timestamp might also be consumed."
Proposed Changes
This process will be separated into 3 steps:
Generate a Reset Plan file with specific offsets by partition to reset, for a specific Consumer Group
Verify the consistency of the Reset Plan file against the current state of Consumer Group offsets
Execute the Reset Plan file, resetting offsets to the values specified in it.
We will also consider an option to generate and execute in one step, for users who don't need to validate the Reset Plan file.
These options will enable clients to move backward and forward, depending on where they currently are in the topic.
For instance: suppose a topic 'T1' with 1 partition and 100 records (offsets 0-99) has 2 Consumer Groups: 'C1', which is inactive with current offset 20, and 'C2', which has been actively consuming records from T1. If C1 is reset to a point in time where the offset is 80, it will move forward. But if C2 is reset to the same point in time, it will move backward.
We propose adding options to ConsumerGroupCommand to reset offsets.
- Plan: Print out the result of the operation (i.e. show a list of topic, partition, current offset, new offset to reset if the operation is executed). This will be the default option.
- Execute: Users will have to explicitly ask to execute the reset operation depending on the scenario and the scope specified.
- Export: Export plan to a CSV file to execute it later.
Each of these options affects only one Consumer Group.
The following scenarios will be supported:
ID | Scenario | Description |
---|---|---|
1. | Reset to Datetime | When we want to reset offsets to a specific point in time (e.g. to 1/1/2017 at 00:00 to reprocess all records from this year). |
2. | Reset from Duration | When we want to go back some period of time (e.g. P7D to reprocess all records from one week ago). |
3. | Reset to Earliest | When we want to reprocess all the records available by partition. |
4. | Reset to Latest | When a Consumer Group has an offset lag and we don't want to process the missing records, only move to the latest. |
5. | Reset to Current Position | When we only want to print out and/or back up the current offset by partition. |
6. | Reset to Offset | When we want to move to a specific offset. |
7. | Shift Offset by 'n' | When we want to move forward or backward from the current position. 'n' can be a positive or negative value that will be added to the current offset to move to a new position. |
8. | Reset from file | When we have a file with the required offsets by topic/partition to reset to. |
And the following scopes:
- All Topics consumed by Consumer Group: This scope will consider all the topics that have been consumed by a Consumer Group.
- Specific List of Topics: This scope will consider the topics defined by the user.
- One Topic, All Partitions: This scope will consider only one topic and all its partitions.
- One Topic, Specific Partition: This scope will consider only one topic and the partition specified by the user.
You can check the proof-of-concept implementation of this feature in this pull request.
Public Interfaces
Consumer Group Reset Offset
...
This tool will offer 3 main operations:
--generate: This operation will generate a Consumer Group Offset Reset Plan file.
--verify: This operation will validate that the Consumer Group offset values defined in the Reset Plan file are valid in the cluster.
--execute: This operation will reset Consumer Group offsets using the values defined in the Reset Plan file.
Additionally, this tool will support an operation to generate and execute in one step:
--generate-and-execute
Generate Reset Plan file
To reset Consumer Group offsets, we first need a Reset Plan file with the specific offsets by partition to reset to. This file can be generated in 3 ways:
...
--reset-to-datetime YYYY-MM-DDTHH:mm:SS.sss
...
options
Main option
--reset-offsets
This option should be executed independently from other Consumer Group options (list, describe, delete, etc.) and it will only support new consumers.
Required Arguments
ID | Argument | Type | Description |
---|---|---|---|
1. | --group | Required | Consumer Group ID. |
2. | --bootstrap-server | Required | Server to connect to. |
Scenarios
At least one of the scenarios should be defined to proceed with the execution
ID | Scenario | Arguments | Considerations | Example |
---|---|---|---|---|
1. | Reset to Datetime | --to-datetime YYYY-MM-DDTHH:mm:SS.sss±hh:mm --to-datetime YYYY-MM-DDTHH:mm:SS.sssZ --to-datetime YYYY-MM-DDTHH:mm:SS.sss | Datetime must be specified in ISO 8601 format. This option will translate the datetime to Epoch milliseconds, find the offsets by timestamp, and reset to those offsets. If the time zone is not specified, it will use UTC. | Reset to first offset since 01 January 2017, 00:00:00 hrs: --reset-offsets --group test.group --topic foo --to-datetime 2017-01-01T00:00:00Z |
2. | Reset by Duration | --by-duration PnDTnHnMnS | Duration must be specified in ISO 8601 format. This option will subtract the duration from the current timestamp on the server, find the offsets using that subtracted timestamp, and reset to those offsets. The duration specified won't consider daylight saving effects. | Reset to first offset since one week ago (from current timestamp): --reset-offsets --group test.group --topic foo --by-duration P7D |
...
Default
...
--topics: comma-separated list of topics. Optional; if not specified, all topics consumed by the Consumer Group are used.
...
--output-file: relative or absolute path to the output file to populate with the generated JSON. Optional; if not specified, the JSON result is printed to the terminal.
3. | Reset to Earliest | --to-earliest | This option will reset offsets to the earliest using Kafka Consumer's `#seekToBeginning` | Reset to earliest offset available: --reset-offsets --group test.group --topic foo --to-earliest |
4. | Reset to Latest | --to-latest | This option will reset offsets to the latest using Kafka Consumer's `#seekToEnd` | Reset to latest offset available: --reset-offsets --group test.group --topic foo --to-latest |
5. | Reset to Current Position | (no scenario arguments) | This option won't reset the offset. It will be used to print and export the current offset. | Reset to current position: --reset-offsets --group test.group --topic foo |
6. | Reset to Offset | --to-offset | This option will reset offsets to a specific value. | Reset to offset 1 in all partitions: --reset-offsets --group test.group --topic foo --to-offset 1 |
7. | Shift Offset by 'n' | --shift-by n | This option will add the `n` value to the current offset, and reset to the result. `n` can be a positive or negative value, so the offset will be moved backward if it is negative, and forward if it is positive. If current offset + n is higher than the latest offset, the new offset will be set to latest. If current offset + n is lower than the earliest offset, the new offset will be set to earliest. | Reset to current offset plus 5 positions: --reset-offsets --group test.group --topic foo --shift-by 5 |
8. | Reset from File | --from-file PATH_TO_FILE | This option will take a Reset Plan CSV file with the offsets to reset by topic/partition. It does not require a scope, because topics and partitions are defined in the file. | Reset using a file with reset plan: --reset-offsets --group test.group --from-file reset-plan.csv |
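The clamping rule of scenario 7 can be sketched as a pure function. This is a minimal sketch that assumes the earliest and latest offsets of the partition are already known (for example from the consumer's `beginningOffsets`/`endOffsets` lookups); the object and method names are hypothetical and not part of `ConsumerGroupCommand`.

```scala
object ShiftBy {
  /** Hypothetical helper applying the `--shift-by n` rule to one partition.
    * The result is clamped to [earliest, latest], so a shift past either
    * end of the log lands on that end instead of a non-existent offset.
    */
  def shiftedOffset(current: Long, n: Long, earliest: Long, latest: Long): Long = {
    require(earliest <= latest, s"earliest ($earliest) must not exceed latest ($latest)")
    val target = current + n
    if (target > latest) latest          // shifted past the end of the log
    else if (target < earliest) earliest // shifted before the oldest retained offset
    else target
  }
}
```

For example, with a partition holding offsets 10 to 100, `shiftedOffset(20, -30, 10, 100)` returns 10 rather than the invalid offset -10.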
Scopes
The scopes will be defined by extending the existing `topic` argument, using the following format:
```
--topic <topic-name>:<partition numbers>
ex: --topic topic1 --topic topic2:0,1,2
```
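A `--topic` value in this format can be split into a topic name and an optional partition list. The sketch below is illustrative only: `TopicScope`, `Scope` and `parse` are hypothetical names, and it relies on Kafka topic names not containing `:` (legal characters are ASCII letters, digits, `.`, `_` and `-`).

```scala
object TopicScope {
  /** Hypothetical parsed form of one --topic argument.
    * partitions = None means "all partitions of the topic".
    */
  final case class Scope(topic: String, partitions: Option[List[Int]])

  def parse(arg: String): Scope = {
    require(arg.nonEmpty, "empty --topic value")
    // Split only on the first ':' so the partition list stays intact.
    arg.split(":", 2) match {
      case Array(topic) => Scope(topic, None)
      case Array(topic, parts) =>
        // Non-numeric partition ids throw NumberFormatException.
        val ids = parts.split(",").toList.map(_.trim).filter(_.nonEmpty).map(_.toInt)
        require(ids.nonEmpty, s"no partitions given for topic '$topic'")
        Scope(topic, Some(ids))
    }
  }
}
```

With this sketch, `--topic topic1 --topic topic2:0,1,2` yields `Scope("topic1", None)` and `Scope("topic2", Some(List(0, 1, 2)))`.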
ID | Scope | Arguments | Description | Example |
---|---|---|---|---|
1. | All topics | --all-topics | In this case the tool will run the scenario considering all topics that have been consumed by the Consumer Group. It will consider all partitions assigned for each topic. | Reset offsets in all topics and partitions: --reset-offsets --group test.group --all-topics --to-earliest |
2. | List of Topics | --topic topic1 --topic topic2 | In this case, the tool first validates that the input topics are being consumed by the Consumer Group, and then runs the scenario. It will consider all partitions of each topic defined. | Reset offsets on all partitions of a list of topics: --reset-offsets --group test.group --topic foo --topic bar --to-earliest |
3. | Topics and List of Partitions | --topic topic1:0,1,2 --topic topic2:0 | In this case, the tool first validates that each input topic is being consumed by the Consumer Group, and then runs the scenario. It will consider only the partitions specified for each topic. | Reset offsets on specific partitions by topic: --reset-offsets --group test.group --topic foo:1,2 --topic bar:0,1,2,3 --to-earliest |
4. | Topic | --topic topic1 | In this case, the tool first validates that the input topic is being consumed by the Consumer Group, and then runs the scenario. It will consider all partitions. | Reset offsets on a specific topic, all partitions: --reset-offsets --group test.group --topic foo --to-earliest |
Execution Options
ID | Option | Arguments | Description | Examples |
---|---|---|---|---|
1. | Plan | (no execution arguments) | This execution option will only print out the result of the scenario by scope. The output will look like this: TOPIC PARTITION NEW-OFFSET NEW-LAG LOG-END-OFFSET CONSUMER-ID HOST CLIENT-ID | Prints result of resetting all topics and partitions to earliest: --reset-offsets --group test.group --all-topics --to-earliest |
2. | Execute | --execute | This execution option will run the reset offset process based on scenario and scope. | Prints and executes resetting all topics and partitions to earliest: --reset-offsets --group test.group --all-topics --to-earliest --execute |
3. | Export | --export | This execution option will print out the reset plan in CSV format, which can later be used in scenario 8 (e.g. as a backup). | Prints plan for resetting all topics and partitions to earliest in CSV format: --reset-offsets --group test.group --all-topics --to-earliest --export Executes and prints plan for resetting all topics and partitions to earliest in CSV format: --reset-offsets --group test.group --all-topics --to-earliest --export --execute |
Reset Plan File: CSV Format
The CSV schema will consist of:
<topic>,<partition>,<offset>
Sample:
topic-1,0,100
topic-1,1,200
topic-1,2,0
topic-2,0,0
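Reading a Reset Plan file back (as scenario 8 does) amounts to splitting each line into its three fields. The following is a minimal sketch, assuming blank lines are ignored and malformed lines are rejected with an exception; `ResetPlanCsv` and `Entry` are hypothetical names, not the tool's actual classes.

```scala
object ResetPlanCsv {
  /** One line of a Reset Plan file: <topic>,<partition>,<offset>. */
  final case class Entry(topic: String, partition: Int, offset: Long)

  /** Parses Reset Plan lines, skipping blank ones (hypothetical helper). */
  def parse(lines: Seq[String]): Seq[Entry] =
    lines.map(_.trim).filter(_.nonEmpty).map { line =>
      line.split(",").map(_.trim) match {
        case Array(topic, partition, offset) =>
          Entry(topic, partition.toInt, offset.toLong)
        case _ =>
          throw new IllegalArgumentException(
            s"expected <topic>,<partition>,<offset> but got: $line")
      }
    }
}
```

Applied to the sample above, this yields four entries, the first being `Entry("topic-1", 0, 100)`.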
Implementation Details
These options will be implemented inside the `ConsumerGroupCommand`. The `resetOffsets` operation will look like this:
```scala
def resetOffsets(): Map[PartitionAssignmentState, Long] = {
  val groupId = opts.options.valueOf(opts.groupOpt)
  val (state, assignments) = describeGroup() //(1)
  assignments match {
    case None =>
      // applies to both old and new consumer
      printError(s"The consumer group '$groupId' does not exist.")
      Map.empty
    case Some(assignments) =>
      state match {
        case Some("Dead") =>
          printError(s"Consumer group '$groupId' does not exist.")
          Map.empty
        case Some("Empty") => //(4) only an inactive group can be reset
          val assignmentsToReset = getAssignmentsToReset(assignments) //(2)
          val assignmentsPrepared = prepareAssignmentsToReset(assignmentsToReset) //(3)
          val execute = opts.options.has(opts.executeOpt)
          if (execute) //(5)
            resetAssignments(assignmentsPrepared)
          assignmentsPrepared
        case Some("PreparingRebalance") | Some("AwaitingSync") =>
          printError(s"Consumer group '$groupId' offsets cannot be reset if it is rebalancing.")
          Map.empty
        case Some("Stable") =>
          printError(s"Consumer group '$groupId' offsets cannot be reset if it has members active.")
          Map.empty
        case other =>
          // the control should never reach here
          throw new KafkaException(s"Expected a valid consumer group state, but found '${other.getOrElse("NONE")}'.")
      }
  }
}
```
(1) It gets the assignments from the `describeGroup` operation.
(2) Then it uses `getAssignmentsToReset` to filter the assignments by the values given with `--topic` or `--all-topics`.
(3) Then it gets the new offset for each assignment using `prepareAssignmentsToReset`.
(4) The reset is only performed when the selected Consumer Group is inactive (state `Empty`), to avoid race conditions.
(5) The reset is executed only if it is explicitly requested with `--execute`. This consists of creating a Consumer with the same `group.id` and using:
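```scala
import scala.collection.JavaConverters._ // provides .asJava

// Assign the partition manually (no group rebalance), move the position, then commit it for the group.
consumer.assign(List(topicPartition).asJava)
consumer.seek(topicPartition, offset)
consumer.commitSync()
```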
To change the offsets.
Compatibility, Deprecation, and Migration Plan
Reset Plan File: JSON Format
```json
{
  "offsets": {
    "group_id": "ConsumerGroup1",
    "topics": {
      "topic1": [
        { "partition": 0, "offset": 0 },
        { "partition": 1, "offset": 0 }
      ],
      "topic2": [
        { "partition": 0, "offset": 0 },
        { "partition": 1, "offset": 0 }
      ]
    }
  },
  "version": 1
}
```
Reset to Timestamp
This option will support a DateTime format (https://www.w3.org/TR/xmlschema-2/#dateTime) as input. This value will be transformed to epoch milliseconds and then used to get offsets by partition.
Pros:
This option is the most precise, as it moves to a specific point in time.
Cons:
This option will require users to know exactly where they want to move.
Command:
```
bin/kafka-consumer-group-reset.sh --generate --group ConsumerGroup1 --topics topic1 --reset-to-timestamp 2017-01-01T09:00:00.000 --output kafka-consumer-group-reset.json
```
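The datetime-to-epoch translation can be sketched with `java.time`. This is a minimal sketch, assuming the three accepted shapes are an explicit offset (`±hh:mm`), a `Z` suffix, or no zone at all, in which case UTC is used as the scenario table specifies; `DatetimeToEpoch` is a hypothetical name. The resulting milliseconds are what a timestamp-based offset lookup (such as `KafkaConsumer#offsetsForTimes`) would take as input.

```scala
import java.time.{LocalDateTime, OffsetDateTime, ZoneOffset}
import java.time.format.DateTimeParseException

object DatetimeToEpoch {
  /** Converts an ISO 8601 datetime to epoch milliseconds (hypothetical helper). */
  def toEpochMillis(input: String): Long =
    try OffsetDateTime.parse(input).toInstant.toEpochMilli // "...Z" or "...+01:00"
    catch {
      case _: DateTimeParseException =>
        // No zone given: interpret the local datetime as UTC.
        LocalDateTime.parse(input).toInstant(ZoneOffset.UTC).toEpochMilli
    }
}
```

All of `2017-01-01T00:00:00Z`, `2017-01-01T00:00:00.000` and `2017-01-01T01:00:00.000+01:00` map to the same instant, 1483228800000 ms.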
Reset From Period
This option will support the Duration format (https://www.w3.org/TR/xmlschema-2/#duration) as input. This value will be subtracted from the current timestamp on the client's machine to obtain the epoch milliseconds used to get offsets by partition.
Pros:
This option will be the easiest to use, as it does not require knowing a specific timestamp; instead, the point in time depends on when you execute the command.
Useful for debugging/development purposes where you just need to reprocess a bunch of records from a previous period.
Cons:
This option will be less accurate than the previous one.
Command:
```
bin/kafka-consumer-group-reset.sh --generate --group ConsumerGroup1 --topics topic1 --reset-from-period P1M --output kafka-consumer-group-reset.json
```
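The subtraction described above can be sketched with `java.time.Duration`. This is a minimal sketch under two stated assumptions: the caller supplies "now" explicitly (so the behavior is testable), and only the `PnDTnHnMnS` form from the scenario table is accepted. `java.time.Duration` treats a day as exactly 24 hours, which matches the note that daylight saving effects are not considered; it rejects month-based values such as `P1M`, which would need calendar arithmetic (e.g. `java.time.Period` applied to a `ZonedDateTime`). `DurationToEpoch` is a hypothetical name.

```scala
import java.time.{Duration, Instant}

object DurationToEpoch {
  /** Epoch milliseconds `iso8601Duration` before `now` (hypothetical helper).
    * Accepts PnDTnHnMnS; throws DateTimeParseException for units like months.
    */
  def startTimestamp(iso8601Duration: String, now: Instant): Long =
    now.minus(Duration.parse(iso8601Duration)).toEpochMilli
}
```

For instance, `startTimestamp("P7D", Instant.ofEpochMilli(1483228800000L))` gives 1482624000000, exactly 604800000 ms (one week) earlier.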
Default
This option will generate a file with the current Consumer Group's offsets by partition. The result of this execution will be a JSON file that can be used as a template to modify the offsets manually for all partitions.
This option will be useful for clusters prior to release 0.10.0.0, to create an initial file where users can specify offsets later. It can also be used to record a fixed point in time before starting the application, to reset to that point later.
Pros:
Easiest way to generate a JSON file with the right format.
Cons:
Could be cumbersome for users to modify specific offsets by partition.
Command:
```
bin/kafka-consumer-group-reset.sh --generate --group ConsumerGroup1 --topics topic1 --output kafka-consumer-group-reset.json
```
Verify JSON file
Once JSON file is generated, the tool will offer an option to validate it with the current state of the cluster. The tool will validate the following:
Topics exist
Partitions exist
Offsets exist
Latest/Current/Oldest offset
If topics or partitions don’t exist in the cluster, tool will advise that those topics/partitions won’t be processed.
If an offset is no longer available (e.g. retention removed it from the log), the tool will advise that it will reset the offset to the oldest or latest (i.e. resetting the offset using the ‘auto.offset.reset’ policy).
Latest/Current/Oldest offset information will give the user the idea about where in the log the offset will be set.
This process will require 1 argument:
--reset-json-file: relative or absolute path to JSON file generated/customized.
Command:
```
bin/kafka-consumer-group-reset.sh --verify --reset-json-file kafka-consumer-group-reset.json
```
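The per-entry checks of the verify step can be sketched as a pure classification against a snapshot of the cluster. This is a sketch under stated assumptions: the snapshot is a hypothetical `Map` from topic to partition to `(oldest, latest)` offsets, and out-of-range offsets are moved to the closer end of the log, as the Verify test plan describes. None of these names come from the tool itself.

```scala
object VerifyPlan {
  /** Hypothetical outcome of verifying one Reset Plan entry. */
  sealed trait Check
  case object TopicMissing extends Check
  case object PartitionMissing extends Check
  final case class InRange(offset: Long) extends Check
  final case class OutOfRange(requested: Long, adjusted: Long) extends Check

  /** `logRanges`: topic -> (partition -> (oldest, latest)) as seen in the cluster. */
  def check(topic: String, partition: Int, offset: Long,
            logRanges: Map[String, Map[Int, (Long, Long)]]): Check =
    logRanges.get(topic) match {
      case None => TopicMissing
      case Some(partitions) =>
        partitions.get(partition) match {
          case None => PartitionMissing
          case Some((oldest, latest)) =>
            if (offset >= oldest && offset <= latest) InRange(offset)
            // Below the range, oldest is the closer end; above it, latest is.
            else if (offset < oldest) OutOfRange(offset, oldest)
            else OutOfRange(offset, latest)
        }
    }
}
```

With `Map("topic-1" -> Map(0 -> (10L, 100L)))`, requesting offset 5 on `topic-1`/0 yields `OutOfRange(5, 10)`, which the tool would report as a warning rather than an error.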
Execute JSON file
Once the user is comfortable with the Reset Plan file, they can prepare the environment to execute it.
To execute the reset process, the tool will require and validate that the Consumer Group has become inactive first, to avoid inconsistencies on the client side.
This process will require 1 argument:
--reset-json-file: relative or absolute path to JSON file generated/customized.
Command:
```
bin/kafka-consumer-group-reset.sh --execute --reset-json-file kafka-consumer-group-reset.json
```
Finally, the user can use the `verify` option to validate that the current and defined offsets are the same.
Compatibility, Deprecation, and Migration Plan
...
This KIP won’t require a Migration Plan.
‘reset-to-datetime’ and ‘reset-fromto-period’ will require Timestamp Index, which will make them only available from version 0.10.1.0.
Nevertheless, since release 0.10.0 it is possible to achieve the same result using per-record timestamp metadata, doing a linear search starting from the latest offset. However, we do not plan to support this approach because it would be expensive and slow.
All the other options will be available for releases from 0.10.0.0 given that KIP-97 is implemented.
...
This KIP will not support old consumers that store offsets in ZooKeeper.
Test Plan
...
- A unit test to validate
...
- that --reset-offset is executed independently from other Consumer Group options (list, delete, describe, etc.)
- A unit test to validate
...
Generate:
...
- that only one scenario is specified
...
Validate that current offsets by partition are obtained when default option is executed
...
Validate that an offset is obtained when a datetime is specified
...
Validate that the oldest offset is obtained when a datetime is older than the oldest timestamp by partition
...
Validate that the latest offset is obtained when a datetime is later than the latest timestamp by partition
...
Validate that an offset is obtained when a period is specified
...
Validate that the oldest offset is obtained when the current timestamp minus the period is older than the oldest timestamp by partition
...
Validate that the latest offset is obtained when the current timestamp minus the period is later than the latest timestamp by partition
- A unit test to validate that only one scope is specified
- A unit test to validate that only one execution option is specified
- A unit test by combination of scenario, scope and execution option.
- A unit test to validate that when the calculated offset for a partition is lower than the earliest offset, the tool resets the offset to earliest (e.g. when --to-offset 0 is specified and the earliest offset is 10)
- A unit test to validate that when the calculated offset for a partition is higher than the latest offset, the tool resets the offset to latest (e.g. when --to-offset 200 is specified and the latest offset is 100)
...
Verify:
Validate that JSON structure is correct
Validate that topics exist
Validate that partitions exist
Validate that topics are actually consumed by Consumer Group
Warn that topic does not exist and nothing will be executed
Warn that topic is not being consumed by Consumer Group and nothing will be executed
Warn that partition does not exist and nothing will be executed.
Warn that specified offset is out of range (i.e. outside the range from oldest to latest) and it will be reset to oldest or latest, depending on which one is closer.
...
Execute:
Validate that ConsumerGroup is inactive.
Validate that offsets are updated in cluster after execution.
Warn that non-existing/not-consumed topic has not been updated
Warn that non-existing partitions have not been updated
Warn that out-of-bound offsets have been updated to oldest or latest, depending on which one is closer.
Rejected Alternatives
None
1. Reset to specific offset: Move Consumer Group to specific offset by partition.
2. Reset to datetime: Find last offset by datetime, and reset offset by partition.
3. Reset by period: Find last offset from a datetime calculated from an input period and the current timestamp (e.g. 5 days ago).
This process will be separated into 3 steps:
Generate a JSON file with specific offsets by partition to reset
Verify the JSON file's consistency against the current state of Consumer Group offsets
Execute the JSON file, resetting offsets to the values specified.
It will also consider an option to generate and execute in one step.
These options will enable clients to move backward and forward, depending on where they currently are in the topic.
For instance: suppose a topic T1 with 1 partition and 100 records (offsets 0-99) has an inactive Consumer Group C1 with current offset 20. If we reset C1 to a point in time where the offset is 80, it will move forward. If another Consumer Group C2, which has been actively consuming records from T1, is reset to the same point in time, it will move backward.