...

def listGroupOffsets(groupId: String): Map[TopicPartition, PartitionData] = {
  // Locate the broker that acts as coordinator for this group.
  val coordinator = findCoordinator(groupId)
  // The empty partition list requests offsets for the whole group in one call.
  val responseBody = send(coordinator, ApiKeys.OFFSET_FETCH, new OffsetFetchRequest(groupId, List()))
  val response = responseBody.asInstanceOf[OffsetFetchResponse]
  // Convert the Java map in the response to an immutable Scala map.
  response.responseData().asScala.toMap
}

This method has two benefits over KafkaConsumer's committed method. First, we no longer need to create a dummy consumer to retrieve offsets. Second, a single API call returns all offsets within the group, whereas the existing describe group implementation makes one API call for each topic partition in the group.
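
The difference in request count can be illustrated with a small, self-contained sketch. Everything in it is hypothetical and exists only for illustration: `FakeCoordinator`, `fetchOne`, `fetchAll`, and the local `TopicPartition` case class are stand-ins, not Kafka classes, and the offsets are made-up sample values. It only models how many requests each approach would issue, assuming one request per partition for the existing approach and one request per group for the proposed one.

```scala
// Hypothetical stand-in for illustration; this is NOT Kafka's TopicPartition.
final case class TopicPartition(topic: String, partition: Int)

// Fake coordinator holding committed offsets per group and counting requests.
final class FakeCoordinator(committed: Map[String, Map[TopicPartition, Long]]) {
  var requestCount: Int = 0

  // Models the existing approach: one request per topic partition.
  def fetchOne(groupId: String, tp: TopicPartition): Option[Long] = {
    requestCount += 1
    committed.getOrElse(groupId, Map.empty[TopicPartition, Long]).get(tp)
  }

  // Models the proposed approach: one request returns every offset in the group.
  def fetchAll(groupId: String): Map[TopicPartition, Long] = {
    requestCount += 1
    committed.getOrElse(groupId, Map.empty[TopicPartition, Long])
  }
}

object OffsetFetchSketch {
  // Made-up sample data: one group with three partitions.
  val partitions: List[TopicPartition] = List(
    TopicPartition("orders", 0),
    TopicPartition("orders", 1),
    TopicPartition("payments", 0)
  )
  val offsets: Map[String, Map[TopicPartition, Long]] =
    Map("g1" -> partitions.zip(List(10L, 20L, 30L)).toMap)

  // Collects offsets by issuing one fetchOne request per partition.
  def perPartition(c: FakeCoordinator, groupId: String,
                   tps: Seq[TopicPartition]): Map[TopicPartition, Long] =
    tps.flatMap(tp => c.fetchOne(groupId, tp).map(offset => tp -> offset)).toMap

  def main(args: Array[String]): Unit = {
    val a = new FakeCoordinator(offsets)
    perPartition(a, "g1", partitions)
    println(s"per-partition requests: ${a.requestCount}")

    val b = new FakeCoordinator(offsets)
    b.fetchAll("g1")
    println(s"whole-group requests: ${b.requestCount}")
  }
}
```

Both approaches return the same map in this sketch; only the number of requests differs, growing with the partition count in the first case and staying at one in the second.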

...