Status

Current state: Adopted

Discussion thread: here

Vote thread: here

JIRA: KAFKA-6987

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

The admin client makes extensive use of KafkaFuture in its *Result classes to give clients the choice between synchronous and asynchronous programming styles. KafkaFuture originally had to work on versions of Java which didn't support CompletionStage/CompletableFuture, and so was intended to provide a useful subset of that missing JDK functionality. That constraint no longer applies. It would be beneficial to provide a richer API, like that of CompletableFuture, or at least of CompletionStage, and also compatibility with 3rd party APIs which require an actual CompletionStage or CompletableFuture instance.

This KIP proposes to add a KafkaFuture.toCompletionStage() method as a backward compatible solution to this problem.

Adding toCompletionStage() is sufficient because CompletionStage itself exposes toCompletableFuture(), so anyone who needs an actual CompletableFuture (e.g. for interoperating with 3rd party APIs that require one) can get one. However, CompletableFuture exposes methods for future completion which should not be called by users (only the Admin client should be completing the returned futures), so calling these will be prevented. It is expected that users wanting to block on the completion of the KafkaFuture would use kafkaFuture.get(), rather than calling kafkaFuture.toCompletionStage().toCompletableFuture().get(), so the need to access the CompletableFuture should be rare.
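As a rough illustration of the interoperability argument, the sketch below combines several stages with CompletableFuture.allOf, a JDK method that only accepts CompletableFuture arguments. It uses JDK classes only: partitionCountStage is a hypothetical stand-in for a stage obtained from kafkaFuture.toCompletionStage(), and the class and method names are assumptions made for this example.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;

class StageInteropSketch {

    // Hypothetical stand-in for kafkaFuture.toCompletionStage(); a real caller
    // would obtain the stage from a KafkaFuture returned by the Admin client.
    static CompletionStage<Integer> partitionCountStage(int count) {
        return CompletableFuture.completedFuture(count);
    }

    // CompletableFuture.allOf only accepts CompletableFuture arguments, so each
    // stage is converted with toCompletableFuture() before being combined.
    static CompletableFuture<Integer> totalPartitions(List<CompletionStage<Integer>> stages) {
        List<CompletableFuture<Integer>> futures = stages.stream()
                .map(CompletionStage::toCompletableFuture)
                .collect(Collectors.toList());
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                // Every future is complete once allOf completes, so join() does not block here.
                .thenApply(ignored -> futures.stream().mapToInt(CompletableFuture::join).sum());
    }

    public static void main(String[] args) {
        int total = totalPartitions(List.of(partitionCountStage(3), partitionCountStage(5))).join();
        System.out.println("total partitions: " + total);
    }
}
```

Code that only needs chaining (thenApply, thenCombine and so on) can stay on the CompletionStage interface and never touch the CompletableFuture at all.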

Now also seems like a good opportunity to:

  • Remove the @InterfaceStability.Evolving annotation on KafkaFuture to reflect the reality that changing this class incompatibly would cause too much user code to break.
  • Deprecate the static class KafkaFuture.Function, which already had Javadoc noting that KafkaFuture.BaseFunction was preferred.
  • Annotate KafkaFuture.BaseFunction and .BiFunction with @FunctionalInterface, like the corresponding interfaces in java.util.function.

The methods of future admin client *Result classes would continue to use KafkaFuture for the sake of consistency.

...

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
 * A flexible future which supports call chaining and other asynchronous programming patterns.
 *
 * <h3>Relation to {@code CompletableFuture}</h3>
 * <p>This class exists because support for a Future-like construct was once needed on Java versions predating
 * the addition of {@code CompletableFuture}. It is now possible to obtain a {@code CompletionStage} from a
 * {@code KafkaFuture} instance by calling {@link #toCompletionStage()}.
 * If converting {@link KafkaFuture#whenComplete(BiConsumer)} or {@link KafkaFuture#thenApply(BaseFunction)} to
 * {@link CompletableFuture#whenComplete(java.util.function.BiConsumer)} or
 * {@link CompletableFuture#thenApply(java.util.function.Function)} be aware that the returned
 * {@code KafkaFuture} will fail with an {@code ExecutionException}, whereas a {@code CompletableFuture} fails
 * with a {@code CompletionException}.
 */
public abstract class KafkaFuture<T> implements Future<T> {

  // ... existing methods ...

  /**
   * Get a CompletionStage with the same completion properties as this KafkaFuture.
   * The returned instance will complete when this future completes and in the same way
   * (with the same result or exception).
   *
   * <p>Calling toCompletableFuture() on the returned instance will yield a CompletableFuture,
   * but invocation of the completion methods (complete() and other methods in the complete*()
   * and obtrude*() families) on that CompletableFuture instance will result in
   * UnsupportedOperationException being thrown. Unlike a minimal CompletableFuture,
   * the get*() and other methods of that CompletableFuture not inherited from CompletionStage
   * will work normally.
   *
   * <p>If you want to block on the completion of a KafkaFuture you should use
   * {@link #get()}, {@link #get(long, TimeUnit)} or {@link #getNow(Object)}, rather than calling
   * {@code .toCompletionStage().toCompletableFuture().get()} etc.
   */
  public abstract CompletionStage<T> toCompletionStage();

  /**
   * A function which takes objects of type A and returns objects of type B.
   *
   * @deprecated Replaced by the functional interface {@link BaseFunction}.
   */
  @Deprecated // adding this
  public static abstract class Function<A, B> implements BaseFunction<A, B> { }

}
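The class Javadoc above warns about a difference in exception wrapping. The sketch below shows only the CompletableFuture side of that difference, using JDK classes alone: a callback on a dependent stage sees a CompletionException wrapping the original failure, while a blocking get() on the same stage reports an ExecutionException. The class and method names are assumptions made for this example, and no Kafka classes are involved.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

class ExceptionWrappingSketch {

    // A failed source with one dependent stage derived via thenApply.
    static CompletableFuture<Integer> failedDependent() {
        CompletableFuture<String> source = new CompletableFuture<>();
        CompletableFuture<Integer> dependent = source.thenApply(String::length);
        source.completeExceptionally(new IllegalStateException("boom"));
        return dependent;
    }

    // Name of the exception type passed to a whenComplete callback on the dependent stage.
    static String seenByCallback() {
        String[] seen = new String[1];
        failedDependent().whenComplete((value, error) -> seen[0] = error.getClass().getSimpleName());
        return seen[0];
    }

    // Unwrapping the CompletionException in a callback recovers the original failure.
    static String causeSeenByCallback() {
        String[] seen = new String[1];
        failedDependent().whenComplete((value, error) -> {
            Throwable cause = (error instanceof CompletionException) ? error.getCause() : error;
            seen[0] = cause.getClass().getSimpleName();
        });
        return seen[0];
    }

    // Name of the exception type thrown by a blocking get() on the dependent stage.
    static String thrownByGet() throws InterruptedException {
        try {
            failedDependent().get();
            return "none";
        } catch (ExecutionException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("callback sees: " + seenByCallback());
        System.out.println("callback cause: " + causeSeenByCallback());
        System.out.println("get() throws: " + thrownByGet());
    }
}
```

Code migrated from KafkaFuture callbacks to CompletionStage callbacks may therefore need to check for CompletionException where it previously checked for ExecutionException, or unwrap the cause as causeSeenByCallback does.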

...

Some work has already been done to thoroughly test the existing KafkaFuture API and reimplement it using a CompletableFuture internally.

To get the required completion-safety properties a new (internal) KafkaCompletableFuture class, a subclass of CompletableFuture, will be introduced. This KIP will allow access to the instance of that subclass wrapped by a KafkaFutureImpl, and that instance will be completed within KafkaFutureImpl via a different method than the complete/completeExceptionally that it inherits from CompletableFuture.
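A minimal sketch of how such a completion-safe subclass could look is given below. It is not the actual KafkaCompletableFuture: GuardedFuture, internalComplete and internalCompleteExceptionally are hypothetical names chosen for this example, and only the complete, completeExceptionally and obtrude* methods are guarded here.

```java
import java.util.concurrent.CompletableFuture;

class CompletionSafeFutureSketch {

    // Hypothetical stand-in for the internal CompletableFuture subclass described above.
    static class GuardedFuture<T> extends CompletableFuture<T> {

        // Internal completion paths, meant to be called only by the owning library code.
        boolean internalComplete(T value) {
            return super.complete(value);
        }

        boolean internalCompleteExceptionally(Throwable error) {
            return super.completeExceptionally(error);
        }

        // The public completion methods inherited from CompletableFuture are rejected.
        @Override
        public boolean complete(T value) {
            throw new UnsupportedOperationException("users may not complete this future");
        }

        @Override
        public boolean completeExceptionally(Throwable error) {
            throw new UnsupportedOperationException("users may not complete this future");
        }

        @Override
        public void obtrudeValue(T value) {
            throw new UnsupportedOperationException("users may not complete this future");
        }

        @Override
        public void obtrudeException(Throwable error) {
            throw new UnsupportedOperationException("users may not complete this future");
        }
    }

    // Tries the user-facing path first, then the internal one, and reports what happened.
    static String demo() {
        GuardedFuture<String> future = new GuardedFuture<>();
        String userAttempt;
        try {
            future.complete("from user");
            userAttempt = "accepted";
        } catch (UnsupportedOperationException e) {
            userAttempt = "rejected";
        }
        future.internalComplete("from owner");
        return userAttempt + ", value=" + future.join();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because the guarded instance is still a CompletableFuture, get(), join() and the chaining methods behave normally. A fuller version would likely also guard the other completion methods added in later Java versions, such as completeAsync and completeOnTimeout.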

KafkaFutureImpl would gain a new public constructor for wrapping a KafkaCompletableFuture, which will allow the implementation of KafkaFuture#allOf() to be simplified.

Compatibility, Deprecation, and Migration Plan

The addition of toCompletionStage() is backwards compatible.

As noted, KafkaFuture.Function will be formally deprecated. The lambda-compatible BaseFunction has existed and been documented as preferred since Kafka 1.1.
The actual removal of KafkaFuture.Function can be done in some future major version of Kafka.

...