...
topic-1,0,100
topic-1,1,200
topic-1,2,0
topic-2,0,0
Implementation Details
These options will be implemented inside the `ConsumerGroupCommand`. The `resetOffsets` operation will looks like this:
Code Block | ||
---|---|---|
| ||
def resetOffsets(): Map[PartitionAssignmentState, Long] = {
val groupId = opts.options.valueOf(opts.groupOpt)
val (state, assignments) = describeGroup() //(1)
assignments match {
case None =>
// applies to both old and new consumer
printError(s"The consumer group '$groupId' does not exist.")
Map.empty
case Some(assignments) =>
state match {
case Some("Dead") =>
printError(s"Consumer group '$groupId' does not exist.")
Map.empty
case Some("Empty") => //(2)
val assignmentsToReset = getAssignmentsToReset(assignments) //(3)
val assignmentsPrepared = prepareAssignmentsToReset(assignmentsToReset) //(4)
val execute = opts.options.has(opts.executeOpt)
if(execute) //(5)
resetAssignments(assignmentsPrepared)
assignmentsPrepared
case Some("PreparingRebalance") | Some("AwaitingSync") =>
printError(s"Consumer group '$groupId' offsets cannot be reset if it is rebalancing.")
Map.empty
case Some("Stable") =>
printError(s"Consumer group '$groupId' offsets cannot be reset if it has members active.")
Map.empty
case other =>
// the control should never reach here
throw new KafkaException(s"Expected a valid consumer group state, but found '${other.getOrElse("NONE")}'.")
}
}
} |
(1) It will get assignments from `describeGroup` operation
(2) Then use `getAssignmentsToReset` to filter the assignments with the values defined by `–topic` of or `--all-topics`.
(3) Then get new offset by assignments using `prepareAssignmentsToReset`.
(4) It will be only executed when the ConsumerGroup selected is inactive o avoid race conditions.
(5) It will be executed only if it is asked explictly. This will consist in create a Consumer using the same `group.id` and use:
Code Block | ||
---|---|---|
| ||
consumer.seek(topicPartition, offset)
consumer.commitSync() |
To change the offsets.
Compatibility, Deprecation, and Migration Plan
...