THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
/**
 * Rebalance callback that handles partitions being revoked from this consumer.
 *
 * Ends the stream of every revoked partition, then waits (bounded by a 15 second timeout) for
 * those streams to complete their commits.
 *
 * @param revokedTopicPartitions the partitions that were revoked
 * @param consumer               the Kafka consumer, handed to each stream that is being ended
 */
def onRevoked(revokedTopicPartitions: Set[TopicPartition], consumer: KafkaConsumer) =
  for {
    _     <- ZIO.logDebug(s"${revokedTopicPartitions.size} partitions are revoked")
    state <- currentStateRef.get
    // Note, we run 1 stream per partition.
    streamsToEnd = state.assignedStreams.filter(control => revokedTopicPartitions.contains(control.tp))
    // Streams will commit not yet committed offsets.
    _ <- ZIO.foreachParDiscard(streamsToEnd)(_.end(consumer))
    // NOTE(review): the original text of this step was garbled
    // (`_.awaitCommitsCompleted()) consumer).timeout(15.seconds)`). It is reconstructed here as
    // passing `consumer` to `awaitCommitsCompleted`, mirroring `end(consumer)` above -- confirm
    // against the real signature. The timeout result is discarded, so hitting the 15 second limit
    // does not fail the callback.
    _ <- ZIO.foreachParDiscard(streamsToEnd)(_.awaitCommitsCompleted(consumer)).timeout(15.seconds)
    _ <- ZIO.logTrace("onRevoked done")
  } yield ()
...