...

topic-1,0,100
topic-1,1,200
topic-1,2,0
topic-2,0,0
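
The sample rows above look like one `topic,partition,offset` entry per line. Assuming that column order (it is read off the sample, not confirmed here), a minimal sketch of parsing such a file into a lookup map could look like the following. `OffsetCsvSketch` and `parse` are hypothetical names used for illustration only, not part of `ConsumerGroupCommand`.

```scala
object OffsetCsvSketch {
  // Parses lines such as "topic-1,0,100" into ((topic, partition) -> offset).
  // ASSUMPTION: columns are topic, partition, offset, as suggested by the sample rows.
  def parse(lines: Seq[String]): Map[(String, Int), Long] =
    lines.map(_.trim).filter(_.nonEmpty).map { line =>
      line.split(",") match {
        case Array(topic, partition, offset) =>
          (topic.trim, partition.trim.toInt) -> offset.trim.toLong
        case _ =>
          // Reject rows that do not have exactly three fields.
          throw new IllegalArgumentException(
            s"Expected 'topic,partition,offset' but found '$line'")
      }
    }.toMap
}
```

Keying the map by `(topic, partition)` makes it straightforward to look up the target offset for a single partition; a malformed row fails fast instead of being silently skipped.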

Implementation Details 

These options will be implemented inside the `ConsumerGroupCommand`. The `resetOffsets` operation will look like this:

```scala
def resetOffsets(): Map[PartitionAssignmentState, Long] = {
  val groupId = opts.options.valueOf(opts.groupOpt)
  val (state, assignments) = describeGroup() //(1)
  assignments match {
    case None =>
      // applies to both old and new consumer
      printError(s"The consumer group '$groupId' does not exist.")
      Map.empty
    case Some(assignments) =>
      state match {
        case Some("Dead") =>
          printError(s"Consumer group '$groupId' does not exist.")
          Map.empty
        case Some("Empty") => //(2)
          val assignmentsToReset = getAssignmentsToReset(assignments) //(3)
          val assignmentsPrepared = prepareAssignmentsToReset(assignmentsToReset) //(4)
          val execute = opts.options.has(opts.executeOpt)
          if (execute) //(5)
            resetAssignments(assignmentsPrepared)
          assignmentsPrepared
        case Some("PreparingRebalance") | Some("AwaitingSync") =>
          printError(s"Consumer group '$groupId' offsets cannot be reset if it is rebalancing.")
          Map.empty
        case Some("Stable") =>
          printError(s"Consumer group '$groupId' offsets cannot be reset if it has members active.")
          Map.empty
        case other =>
          // the control should never reach here
          throw new KafkaException(s"Expected a valid consumer group state, but found '${other.getOrElse("NONE")}'.")
      }
  }
}
```

(1) The group state and its assignments are obtained from the `describeGroup` operation.

(2) The reset proceeds only when the selected consumer group is inactive (state `Empty`), to avoid race conditions with active members.

(3) `getAssignmentsToReset` filters the assignments down to the topics selected with `--topic` or `--all-topics`.

(4) `prepareAssignmentsToReset` computes the new offset for each of the selected assignments.

(5) The reset is applied only if it is requested explicitly (`opts.executeOpt` in the code above). It consists of creating a Consumer with the same `group.id` and using:

```scala
// The partition must already be assigned to this consumer before seek() is called
consumer.seek(topicPartition, offset) // move the position to the target offset
consumer.commitSync()                 // commit the current position for the group
```

to change the offsets.
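
The filter, prepare, and execute steps of `resetOffsets` can be sketched in isolation, without a broker, as shown below. This is only an illustration under stated assumptions: `ResetPlanSketch`, `Assignment`, `selectAssignments`, `preparePlan`, and `run` are hypothetical stand-ins, not the actual `ConsumerGroupCommand` types or methods, and the `commit` callback stands in for the consumer-based commit described above.

```scala
object ResetPlanSketch {
  // Hypothetical, simplified stand-in for the tool's assignment state.
  final case class Assignment(topic: String, partition: Int, currentOffset: Long)

  // Filter step: keep only partitions of the selected topics,
  // or every partition when all topics are requested.
  def selectAssignments(all: Seq[Assignment],
                        topics: Set[String],
                        allTopics: Boolean): Seq[Assignment] =
    if (allTopics) all else all.filter(a => topics.contains(a.topic))

  // Prepare step: pair each selected partition with its new offset.
  // `target` stands in for however the new offset is computed.
  def preparePlan(selected: Seq[Assignment],
                  target: Assignment => Long): Map[Assignment, Long] =
    selected.map(a => a -> target(a)).toMap

  // Execute step: always return the plan, but only invoke `commit`
  // when execution was explicitly requested.
  def run(plan: Map[Assignment, Long], execute: Boolean)
         (commit: Map[Assignment, Long] => Unit): Map[Assignment, Long] = {
    if (execute) commit(plan)
    plan
  }
}
```

Returning the plan in both cases mirrors `resetOffsets`, which returns `assignmentsPrepared` whether or not the reset was executed, so a run without the execute option acts as a dry run.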

 

Compatibility, Deprecation, and Migration Plan

...