Status
Current state: [One of " Under Discussion", "Accepted", "Rejected"]
Discussion thread: here (not up to date)
JIRA: KAFKA-14972
Proposed implementation: pull request 14071
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Design goal
The goal of this KIP is to allow consumer callbacks to call the consumer again from another thread.
Motivation
Rebalances cause a lot of message duplication. This can be prevented by doing commits in the partition-revoked callback. This KIP will make it much easier to do work in that callback when an async runtime is used.
The JVM-based KafkaConsumer contains a check (in method acquire) that rejects nested invocations from different threads. For programs that use an async runtime, this requirement is almost impossible to meet. The check is also stricter than required; we only need to validate that there is no concurrent access to the consumer.
Examples of affected async runtimes are Kotlin coroutines (see KAFKA-7143) and ZIO.
Here follows a condensed example of how we'd like to use ZIO in the rebalance listener callback from the zio-kafka library.
```scala
def onRevoked(revokedTps: Set[TopicPartition], consumer: KafkaConsumer) = {
  for {
    _ <- ZIO.logDebug(s"${revokedTps.size} partitions are revoked")
    state <- currentStateRef.get
    streamsToEnd = state.assignedStreams.filter(control => revokedTps.contains(control.tp)) // Note, we run 1 stream per partition.
    _ <- ZIO.foreachParDiscard(streamsToEnd)(_.end(consumer)) // <== Streams will commit not yet committed offsets
    _ <- awaitCommitsCompleted(consumer).timeout(15.seconds)
    _ <- ZIO.logTrace("onRevoked done")
  } yield ()
}
```
This code is run using the ZIO runtime as follows from the ConsumerRebalanceListener::onPartitionsRevoked method:
```scala
def onPartitionsRevoked(partitions: java.util.Collection[TopicPartition]): Unit = {
  Unsafe.unsafe { implicit u =>
    runtime.unsafe
      .run(onRevoked(partitions.asScala.toSet, consumer))
      .getOrThrowFiberFailure()
    ()
  }
}
```
(Note that this code is complex on purpose; starting a ZIO workflow from scratch is not something you would normally do.)
Look at line 6 of the first code block. In method end the stream will try to call consumer::commitAsync(offsets, callback). In awaitCommitsCompleted() (line 7) we call consumer::commitSync(Collections.emptyMap) to wait until all callbacks are invoked.
Since this code is running in the rebalance listener callback, KafkaConsumer enforces that the commit methods must be invoked from the same thread as the thread that invoked onPartitionsRevoked. Unfortunately, the ZIO runtime is inherently multi-threaded; tasks can be executed from any thread. There is no way ZIO could support this limitation without a major rewrite.
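To make the failure mode concrete, here is a minimal sketch of a thread-bound access check. It is a simplified model written for this page, not the Kafka source: the class ThreadBoundClient, its commit and poll methods, and the demo object are all hypothetical stand-ins. It only illustrates how a check keyed on the calling thread's id accepts a nested call from the polling thread and rejects the same call from any other thread.

```scala
import java.util.ConcurrentModificationException
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
import java.util.concurrent.{Callable, ExecutionException, Executors}

// Hypothetical, simplified model of a thread-bound access check (not Kafka code).
class ThreadBoundClient {
  private val NoThread = -1L
  private val ownerThreadId = new AtomicLong(NoThread)
  private val refCount = new AtomicInteger(0)

  // Succeeds for the owning thread (re-entrant) or when no thread owns the client.
  private def acquire(): Unit = {
    val id = Thread.currentThread().getId
    if (id != ownerThreadId.get() && !ownerThreadId.compareAndSet(NoThread, id))
      throw new ConcurrentModificationException("client is not safe for multi-threaded access")
    refCount.incrementAndGet()
  }

  private def release(): Unit =
    if (refCount.decrementAndGet() == 0) ownerThreadId.set(NoThread)

  // Stand-in for a commit call.
  def commit(): String = { acquire(); try { "committed" } finally { release() } }

  // Stand-in for poll: the callback runs while the polling thread still owns the client.
  def poll[A](callback: () => A): A = { acquire(); try { callback() } finally { release() } }
}

object ThreadBoundDemo {
  // Calls commit() from inside the poll callback, on the polling thread or on a pool thread.
  def commitFromCallback(onOtherThread: Boolean): Either[String, String] = {
    val client = new ThreadBoundClient
    val pool = Executors.newSingleThreadExecutor()
    try {
      client.poll[Either[String, String]] { () =>
        if (!onOtherThread) Right(client.commit())
        else {
          val task = new Callable[String] { def call(): String = client.commit() }
          try { Right(pool.submit(task).get()) }
          catch { case e: ExecutionException => Left(e.getCause.getClass.getSimpleName) }
        }
      }
    } finally { pool.shutdown() }
  }
}
```

In this model, commitFromCallback(onOtherThread = false) succeeds, while commitFromCallback(onOtherThread = true) fails with a ConcurrentModificationException even though the polling thread is merely waiting and no concurrent access takes place.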
Public Interfaces
Two methods will change from private to protected: org.apache.kafka.clients.consumer.KafkaConsumer::acquire and, in the same class, method ::release.
Proposed Changes
See section 'public interfaces' above.
The change will allow custom subclasses to implement acquire and release any way they like.
Implementation rules for subclasses that override acquire and release
Methods acquire and release ensure that only one thread can invoke the consumer at a time. Similarly, they ensure that only one thread at a time can invoke the consumer from code that is running in a consumer callback.
Methods acquire and release also need to make sure that memory writes from all threads involved are visible to each other.
When acquire and release are overridden, it is up to the implementation to uphold these requirements.
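The rules above can be illustrated with a small sketch. Everything in it is an assumption made for illustration: GuardedClient is a hypothetical stand-in for a consumer with overridable acquire and release, AnyThreadClient is one possible override, and duringCallback is an invented hook that a listener wrapper would call around the callback body. The sketch uses one semaphore permit per nesting level, so a call is accepted from any thread as long as no other call is active on the same level. It does not support re-entrant calls and assumes that all calls started in a callback finish before the callback returns.

```scala
import java.util.ConcurrentModificationException
import java.util.concurrent.{Callable, Executors, Semaphore}

// Hypothetical stand-in for a consumer with overridable acquire/release (not Kafka code).
abstract class GuardedClient {
  protected def acquire(): Unit
  protected def release(): Unit
  def commit(): String = { acquire(); try { "committed" } finally { release() } }
  def poll[A](callback: () => A): A = { acquire(); try { callback() } finally { release() } }
}

// Accepts calls from any thread, but never two at once on the same nesting level.
class AnyThreadClient extends GuardedClient {
  private val topLevel = new Semaphore(1)      // held by poll or another top-level call
  private val callbackLevel = new Semaphore(1) // held by a call made while a callback runs
  // Volatile, so callers on other threads observe the flag and the writes made before it was set.
  @volatile private var callbackRunning = false

  // Invented hook: marks the span in which a callback is executing.
  def duringCallback[A](body: => A): A = {
    callbackRunning = true
    try { body } finally { callbackRunning = false }
  }

  private def current: Semaphore = if (callbackRunning) callbackLevel else topLevel

  // A permit may be released by a different thread than the one that acquired it;
  // a release happens-before a later successful acquire of the same semaphore.
  override protected def acquire(): Unit =
    if (!current.tryAcquire()) throw new ConcurrentModificationException("concurrent access")

  override protected def release(): Unit = current.release()
}

object AnyThreadDemo {
  // Commits from a pool thread while the polling thread waits inside the callback.
  def commitFromOtherThread(): String = {
    val client = new AnyThreadClient
    val pool = Executors.newSingleThreadExecutor()
    try {
      client.poll[String] { () =>
        client.duringCallback {
          pool.submit(new Callable[String] { def call(): String = client.commit() }).get()
        }
      }
    } finally { pool.shutdown() }
  }

  // Without the callback marker, a second call competes for the permit that poll already holds.
  def secondTopLevelCallRejected(): Boolean = {
    val client = new AnyThreadClient
    client.poll[Boolean] { () =>
      try { client.commit(); false }
      catch { case _: ConcurrentModificationException => true }
    }
  }
}
```

Semaphores are used here because, unlike a ReentrantLock, they are not tied to the acquiring thread, which suits a runtime that may resume a task on a different thread. Other designs are possible; the only hard requirements are the rules listed above.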
Compatibility, Deprecation, and Migration Plan
For existing users nothing changes. There is no need to deprecate anything. No migration is needed.
Test Plan
Since the change does not alter the behavior of the library, no additional tests are needed.
Rejected Alternatives
See KIP-944 for a viable alternative.