...
```scala
def onRevoked(revokedTps: Set[TopicPartition], consumer: KafkaConsumer) = {
  for {
    _ <- ZIO.logDebug(s"${revokedTps.size} partitions are revoked")
    state <- currentStateRef.get
    streamsToEnd = state.assignedStreams.filter(control => revokedTps.contains(control.tp))
    // Note, we run 1 stream per partition.
    _ <- ZIO.foreachDiscard(streamsToEnd)(_.end(consumer)) // <== Streams will commit not yet committed offsets
    _ <- awaitCommitsCompleted(consumer).timeout(15.seconds)
    _ <- ZIO.logTrace("onRevoked done")
  } yield ()
}
```
...
Since this code runs inside the rebalance listener callback, KafkaConsumer requires that the commit methods are invoked from the same thread that invoked onPartitionsRevoked. Unfortunately, the ZIO runtime is inherently multi-threaded; a task can be executed on any thread. ZIO cannot satisfy this same-thread requirement without a major rewrite.
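To make the constraint concrete, the following is a minimal sketch of a thread-ownership guard of the kind described above. It is not Kafka's actual code: `SingleThreadGuard`, `GuardDemo` and `rejectedFromOtherThread` are hypothetical names, and the model assumes only that the consumer remembers which thread is currently inside it and rejects every other thread with a `ConcurrentModificationException`.

```scala
import java.util.ConcurrentModificationException
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicLong}

// Hypothetical stand-in for the consumer's thread check; not Kafka code.
final class SingleThreadGuard {
  private val NoOwner = -1L
  private val owner = new AtomicLong(NoOwner) // id of the thread currently inside
  private val depth = new AtomicInteger(0)    // re-entrant call depth of the owner

  def acquire(): Unit = {
    val me = Thread.currentThread().getId
    // Either we already own the guard (re-entrant call) or we must claim it.
    if (owner.get() != me && !owner.compareAndSet(NoOwner, me))
      throw new ConcurrentModificationException("not safe for multi-threaded access")
    depth.incrementAndGet()
  }

  def release(): Unit =
    if (depth.decrementAndGet() == 0) owner.set(NoOwner)
}

object GuardDemo {
  // Returns true if a call from a second thread is rejected while the
  // first thread is still "inside" the consumer (for example, in a callback).
  def rejectedFromOtherThread(): Boolean = {
    val guard = new SingleThreadGuard
    val rejected = new AtomicBoolean(false)
    guard.acquire() // the polling thread enters, the callback is now running
    val other = new Thread(() => {
      try { guard.acquire(); guard.release() } // e.g. a commit issued from another thread
      catch { case _: ConcurrentModificationException => rejected.set(true) }
    })
    other.start()
    other.join()
    guard.release()
    rejected.get()
  }
}
```

In this model a nested call from the owning thread succeeds, while the same call from any other thread fails. This mirrors the situation in `onRevoked`: a runtime that schedules tasks on arbitrary threads cannot guarantee that the commit is issued from the owning thread.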
The million-dollar question: why can this code not run on a single thread?
Because we want to use ZIO, and ZIO does not support pinning this work to a single thread.
Public Interfaces
Two new methods will be added to org.apache.kafka.clients.consumer.KafkaConsumer: getThreadAccessKey and setThreadAccessKey.
...