Status

Current state:  [One of " Under Discussion", "Accepted", "Rejected"]

Discussion thread: here (not up to date)

JIRA: KAFKA-14972

Proposed implementation: pull request 14071

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Design goal

The goal of this KIP is to allow consumer callbacks to call the consumer again from another thread.

Motivation

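Rebalances can cause many messages to be processed more than once: offsets that have not been committed when a partition is revoked are consumed again by the partition's next owner. This can be prevented by committing in the partition-revoked callback. This KIP makes it much easier to do work in that callback when an async runtime is used.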

The JVM-based KafkaConsumer contains a check (in method acquire) that rejects nested invocations from different threads. For programs that use an async runtime, this requirement is almost impossible to meet. The check is also stricter than required; we only need to validate that there is no concurrent access to the consumer.
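The following is a simplified, stand-alone model of such an owner-thread check, meant only to illustrate the behavior described above. It is not the actual KafkaConsumer source; the class name OwnerThreadGuard and the demo method are hypothetical. A nested call from the owning thread is accepted, while a nested call from any other thread is rejected, even though the two threads never touch the guard at the same moment.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/** Illustrative model of an owner-thread check; NOT the KafkaConsumer implementation. */
final class OwnerThreadGuard {
    private static final long NO_CURRENT_THREAD = -1L;

    private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
    private final AtomicInteger refCount = new AtomicInteger(0);

    /** Accepts the owning thread (re-entrantly) or, when free, the first thread to arrive. */
    void acquire() {
        long threadId = Thread.currentThread().getId();
        if (threadId != currentThread.get()
                && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId)) {
            throw new ConcurrentModificationException("guard is owned by another thread");
        }
        refCount.incrementAndGet();
    }

    /** Frees the guard once the outermost acquire() has been released. */
    void release() {
        if (refCount.decrementAndGet() == 0) {
            currentThread.set(NO_CURRENT_THREAD);
        }
    }

    /** Demo: returns true if a nested acquire() from a second thread is rejected. */
    static boolean nestedCallFromOtherThreadIsRejected() {
        OwnerThreadGuard guard = new OwnerThreadGuard();
        AtomicBoolean rejected = new AtomicBoolean(false);
        guard.acquire();                  // stands in for the thread that is inside poll()
        try {
            guard.acquire();              // nested call from the same thread is accepted
            guard.release();
            Thread other = new Thread(() -> {
                try {
                    guard.acquire();      // nested call from a different thread
                    guard.release();
                } catch (ConcurrentModificationException e) {
                    rejected.set(true);
                }
            });
            other.start();
            other.join();                 // the owner waits, so there is no concurrent access
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            guard.release();
        }
        return rejected.get();
    }
}
```

In the demo the owning thread is blocked in join() while the second thread runs, so the rejected call is sequential rather than concurrent. This is the situation an async runtime produces inside a callback.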

Examples of affected async runtimes are Kotlin coroutines (see KAFKA-7143) and ZIO.

Here follows a condensed example of how we'd like to use ZIO in the rebalance listener callback from the zio-kafka library.

onRevoked callback (Scala):
def onRevoked(revokedTps: Set[TopicPartition], consumer: KafkaConsumer) = {
  for {
    _            <- ZIO.logDebug(s"${revokedTps.size} partitions are revoked")
    state        <- currentStateRef.get
    streamsToEnd = state.assignedStreams.filter(control => revokedTps.contains(control.tp)) // Note, we run 1 stream per partition.
    _            <- ZIO.foreachParDiscard(streamsToEnd)(_.end(consumer))    // <== Streams will commit not yet committed offsets
    _            <- awaitCommitsCompleted(consumer).timeout(15.seconds)
    _            <- ZIO.logTrace("onRevoked done")
  } yield ()
}

This code is run on the ZIO runtime from the ConsumerRebalanceListener::onPartitionsRevoked method, as follows:

Running ZIO code from callback (Scala):
def onPartitionsRevoked(partitions: java.util.Collection[TopicPartition]): Unit = {
  Unsafe.unsafe { implicit u =>
    runtime.unsafe
           .run(onRevoked(partitions.asScala.toSet, consumer))
           .getOrThrowFiberFailure()
    ()
  }
}

(Note that this code is complex on purpose; starting a ZIO workflow from scratch is not something you would normally do.)

Look at line 6 of the first code block (the ZIO.foreachParDiscard call). In method end, the stream calls consumer::commitAsync(offsets, callback). In awaitCommitsCompleted() (line 7) we call consumer::commitSync(Collections.emptyMap) to wait until all callbacks have been invoked.

Since this code is running in the rebalance listener callback, KafkaConsumer enforces that the commit methods are invoked from the same thread that invoked onPartitionsRevoked. Unfortunately, the ZIO runtime is inherently multi-threaded; tasks can be executed on any thread. There is no way ZIO could satisfy this requirement without a major rewrite.

Public Interfaces

Two methods of org.apache.kafka.clients.consumer.KafkaConsumer will change from private to protected: acquire and release.

Proposed Changes

See section 'Public Interfaces' above.

The change will allow custom subclasses to implement acquire and release any way they like.

Implementation rules for subclasses that override acquire and release

Methods acquire and release ensure that only one thread can invoke the consumer at a time. Similarly, they ensure that only one thread at a time can invoke the consumer from code that is running in a consumer callback.

Methods acquire and release also need to make sure that memory writes from all threads involved are visible to each other.

When acquire and release are overridden, it is up to the implementation to uphold these requirements.
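As an illustration of these rules, here is a minimal sketch of a guard that a subclass might delegate its acquire and release to. Everything in it is an assumption made for this example: the class CallbackAwareGuard, its Level helper, and the enterCallback/exitCallback hooks (which a subclass would have to call itself, for instance from a wrapper around its rebalance listener) are hypothetical and are not part of Kafka or of the proposed implementation. Outside callbacks, one level gives exclusive access as before. While a callback runs, a second level accepts any thread, but only one at a time. All state lives in atomics and a volatile field, so a release by one thread happens-before the next acquire by another.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical guard for illustration only; not part of Kafka. */
final class CallbackAwareGuard {

    /** One level of exclusive, re-entrant access; once free, any thread may take it. */
    private static final class Level {
        private static final long FREE = -1L;
        private final AtomicLong owner = new AtomicLong(FREE);
        private final AtomicInteger refCount = new AtomicInteger(0);

        void acquire() {
            long me = Thread.currentThread().getId();
            if (me != owner.get() && !owner.compareAndSet(FREE, me)) {
                throw new ConcurrentModificationException("concurrent access to the consumer");
            }
            refCount.incrementAndGet();
        }

        void release() {
            if (refCount.decrementAndGet() == 0) {
                owner.set(FREE);          // volatile write: publishes this thread's writes
            }
        }

        boolean isFree() {
            return owner.get() == FREE;
        }
    }

    private final Level outer = new Level();   // calls made outside callbacks, e.g. poll()
    private final Level inner = new Level();   // calls made while a callback is running
    private volatile boolean inCallback = false;

    void acquire() { (inCallback ? inner : outer).acquire(); }

    void release() { (inCallback ? inner : outer).release(); }

    /** To be called by the polling thread just before it runs a callback. */
    void enterCallback() { inCallback = true; }

    /** To be called by the polling thread right after the callback returns. */
    void exitCallback() {
        if (!inner.isFree()) {
            throw new IllegalStateException("a consumer call from the callback is still running");
        }
        inCallback = false;
    }

    /** Demo: two threads call the consumer one after the other during a callback. */
    static boolean sequentialHandoverIsAccepted() {
        CallbackAwareGuard guard = new CallbackAwareGuard();
        AtomicInteger accepted = new AtomicInteger(0);
        guard.acquire();                  // polling thread enters poll()
        guard.enterCallback();
        for (int i = 0; i < 2; i++) {
            runAndJoin(() -> {
                guard.acquire();          // e.g. a commit issued from a runtime thread
                guard.release();
                accepted.incrementAndGet();
            });
        }
        guard.exitCallback();
        guard.release();
        return accepted.get() == 2;
    }

    /** Demo: a second thread is rejected while a call from the callback is in progress. */
    static boolean concurrentAccessIsRejected() {
        CallbackAwareGuard guard = new CallbackAwareGuard();
        AtomicBoolean rejected = new AtomicBoolean(false);
        guard.acquire();
        guard.enterCallback();
        guard.acquire();                  // a consumer call from the callback is in progress
        runAndJoin(() -> {
            try {
                guard.acquire();
                guard.release();
            } catch (ConcurrentModificationException e) {
                rejected.set(true);
            }
        });
        guard.release();
        guard.exitCallback();
        guard.release();
        return rejected.get();
    }

    private static void runAndJoin(Runnable task) {
        Thread thread = new Thread(task);
        thread.start();
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

One limitation of this sketch is that it cannot tell whether a thread that calls acquire during a callback actually belongs to the callback's workflow; it only guarantees that such calls never overlap.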

Compatibility, Deprecation, and Migration Plan

For existing users nothing changes. There is no need to deprecate anything. No migration is needed.

Test Plan

Since this change does not alter the behavior of the library, no additional tests are needed.

Rejected Alternatives

See KIP-944 for a viable alternative.