Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Examples of affected async runtimes are Kotlin co-routines (see KAFKA-7143) and Zio.

Here follows a condensed example of how we'd like to use ZIO in the rebalance listener callback from the zio-kafka library.

Code Block
languagescala
firstline1
titleonRevoked callback
linenumberstrue
def onRevoked(revokedTopicPartitions: Set[TopicPartition], consumer: KafkaConsumer) = {
  for {
    _            <- ZIO.logDebug(s"${revokedTps.size} partitions are revoked")
    state        <- currentStateRef.get
    streamsToEnd = state.assignedStreams.filter(control => revokedTps.contains(control.tp)) // Note, we run 1 stream per partition.
    _            <- ZIO.foreachParDiscard(streamsToEnd)(_.end(consumer))    // <== Streams will commit not yet committed offsets
    _            <- ZIO.foreachParDiscard(streamsToEnd)(_.awaitCommitsCompleted())
                       .timeout(15.seconds)
    _            <- ZIO.logTrace("onRevoked done")
  } yield ()
}

This code is run using the ZIO-runtime as follows from the {{ConsumerRebalanceListener::onPartitionsRevoked}} method:

Code Block
languagescala
titleRunning ZIO code from callback
linenumberstrue
def onPartitionsRevoked(partitions: java.util.Collection[TopicPartition]): Unit = {
  Unsafe.unsafe { implicit u =>
    runtime.unsafe
           .run(onRevoked(partitions.asScala.toSet, consumer))
           .getOrThrowFiberFailure()
   ()
  }
}

(Note that this code is complex on purpose, starting a ZIO workflow from scratch is not something you would normally do.)

Look at line 6 of the first code block. In method end the stream will try to call consumer::commitAsync(offsets, callback). In awaitCommitsCompleted() we call consumer::commitSync(Collections.emptyMap) to wait untill all callbacks are invoked.

Since this code is running in the rebalance listener callback, KafkaConsumer enforces that the commit methods must be invoked from the same thread as the thread that invoked onPartitionsRevoked. Unfortunately, the ZIO runtime is inherently multi-threaded; tasks can be executed from any thread. There is no way Zio could support this limitation without a major rewrite.

Public Interfaces

Two new methods will be added to org.apache.kafka.clients.consumer.KafkaConsumer: getThreadAccessKey and setThreadAccessKey.

...