Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagescala
firstline1
titleonRevoked callback
linenumberstrue
def onRevoked(revokedTopicPartitions: Set[TopicPartition], consumer: KafkaConsumer) = {
  for {
    _            <- ZIO.logDebug(s"${revokedTps.size} partitions are revoked")
    state        <- currentStateRef.get
    streamsToEnd = state.assignedStreams.filter(control => revokedTps.contains(control.tp)) // Note, we run 1 stream per partition.
    _            <- ZIO.foreachParDiscard(streamsToEnd)(_.end(consumer))    // <== Streams will commit not yet committed offsets
    _            <- ZIO.foreachParDiscard(streamsToEnd)(_.awaitCommitsCompleted())
                       consumer).timeout(15.seconds)
    _            <- ZIO.logTrace("onRevoked done")
  } yield ()
}

...