...

  1. Launch a daemon thread that closes StreamThreads, GlobalStreamThread, and AdminClient. StreamThread and GlobalStreamThread have their own Producers and Consumers. All close methods are called without any timeout.
  2. Wait for the daemon thread to complete its job and change KafkaStreams state into NOT_RUNNING. (see KafkaStreams#waitOnState)

In short, the timeout in KafkaStreams#close does not mean "closing is guaranteed to complete before this timeout", but rather "block on close for at most this long, and if closing is still not complete, give up and return false", as described in the Javadoc:

Shutdown this {@code KafkaStreams} by signaling all the threads to stop, and then wait up to the timeout for the threads to join.
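
The "signal, then wait up to the timeout" behavior described above can be illustrated with a minimal, Kafka-free sketch. The class name, thread name, and the shutdownWork parameter are hypothetical and are not part of KafkaStreams; the sketch only shows that the caller stops waiting after the timeout while the shutdown work itself keeps running.

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Illustrative sketch only; not the actual KafkaStreams implementation.
final class BoundedWaitCloseSketch {

    /**
     * Runs the shutdown work on a daemon thread, then blocks for at most
     * {@code timeout}. Returns true if the work finished in time, false otherwise.
     * The shutdown work is never cancelled; the caller merely stops waiting.
     */
    static boolean close(final Runnable shutdownWork, final Duration timeout) {
        final CountDownLatch done = new CountDownLatch(1);
        final Thread shutdownThread = new Thread(() -> {
            try {
                shutdownWork.run();
            } finally {
                done.countDown(); // signal completion even if the work throws
            }
        }, "shutdown-helper-sketch");
        shutdownThread.setDaemon(true);
        shutdownThread.start();
        try {
            return done.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    public static void main(final String[] args) {
        final boolean fast = close(() -> { }, Duration.ofSeconds(1));
        final boolean slow = close(() -> {
            try {
                Thread.sleep(2_000); // simulates a client whose close() blocks
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, Duration.ofMillis(50));
        System.out.println("fast=" + fast + ", slow=" + slow);
    }
}
```

If the shutdown work blocks indefinitely (as a client closed with an unbounded timeout may), the method still returns false after the timeout, but the daemon thread stays blocked in the background.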

The problem is that the close semantics of the internal clients (i.e., Producers, Consumers, and AdminClient) differ from each other. Consumer is closed with a default timeout of KafkaConsumer#DEFAULT_CLOSE_TIMEOUT_MS (30 seconds). In contrast, the default close timeout of Producer and AdminClient is Duration.ofMillis(Long.MAX_VALUE). For this reason, a Kafka Streams application may take a long time, or even hang, while closing its internal Producers or AdminClient.

To resolve this problem, this KIP proposes to improve KafkaStreams#close by providing a close timeout for closing the internal clients.

Public Interfaces

There are 3 alternatives for providing a close timeout parameter.

Type A. Fix the close timeout as a constant

This KIP does not change the public interface; however, it makes a subtle change to the existing API's semantics. If this KIP is accepted, these semantics should be documented in as much detail as possible.

Proposed Changes

...

It just uses a constant, such as KafkaStreams#DEFAULT_CLOSE_TIMEOUT, as the client close timeout.

Pros

Easy to implement.

Cons

Users have no way to configure the close timeout. Moreover, some users may want the current close timeout, i.e., Duration.ofMillis(Long.MAX_VALUE).

Type B.

...

Take the given timeout parameter into account in the KafkaStreams#close operation; that is,

  • It tries to close as many resources as possible within the remaining timeout.

  • If this succeeds, it simply returns; if not, it closes the remaining resources with the default close timeout.
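
One way to read the two bullets above is as a deadline that is shared across all resources. The following is a minimal sketch under that assumption. The TimedCloseable interface, the closeAll method, and the fallbackTimeout parameter are hypothetical names introduced for illustration; they stand in for the internal clients and their default close timeout, and the sketch assumes a modest timeout (a value such as Long.MAX_VALUE milliseconds would overflow the nanosecond arithmetic).

```java
import java.time.Duration;
import java.util.List;

// Illustrative sketch only; names are hypothetical and not part of Kafka.
final class RemainingTimeoutCloseSketch {

    /** Hypothetical stand-in for a client that supports close-with-timeout. */
    interface TimedCloseable {
        void close(Duration timeout);
    }

    /**
     * Closes the resources in order. Each resource is given only the time left
     * until the shared deadline; once the budget is used up, every remaining
     * resource is closed with {@code fallbackTimeout} instead.
     *
     * Returns true if no resource had to fall back to {@code fallbackTimeout}.
     */
    static boolean closeAll(final List<TimedCloseable> resources,
                            final Duration timeout,
                            final Duration fallbackTimeout) {
        final long deadlineNanos = System.nanoTime() + timeout.toNanos();
        boolean allWithinBudget = true;
        for (final TimedCloseable resource : resources) {
            final long remainingNanos = deadlineNanos - System.nanoTime();
            if (remainingNanos > 0) {
                resource.close(Duration.ofNanos(remainingNanos));
            } else {
                allWithinBudget = false;
                resource.close(fallbackTimeout);
            }
        }
        return allWithinBudget;
    }

    public static void main(final String[] args) {
        final TimedCloseable client = t -> System.out.println("closing with timeout " + t);
        final boolean ok = closeAll(List.of(client, client),
                                    Duration.ofSeconds(1), Duration.ofSeconds(30));
        System.out.println("all closed within budget: " + ok);
    }
}
```

The resources are closed sequentially here for simplicity; whether the real implementation closes clients sequentially or concurrently is not specified in the text above.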

Provide a new configuration option

Add a new configuration option, close.wait.ms, that allows users to configure the close timeout (default: 500 ms).
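
As a rough illustration of how such an option might be read, the sketch below parses close.wait.ms from a java.util.Properties object and falls back to 500 ms. The option is only proposed here and does not exist in StreamsConfig; the class, constant, and method names are hypothetical.

```java
import java.time.Duration;
import java.util.Properties;

// Illustrative sketch only; close.wait.ms is a proposed option, not an existing one.
final class CloseWaitConfigSketch {

    static final String CLOSE_WAIT_MS_CONFIG = "close.wait.ms"; // proposed key
    static final long DEFAULT_CLOSE_WAIT_MS = 500L;             // proposed default

    /**
     * Reads the close timeout from the given properties, using the default when
     * the key is absent. Only String values are considered, because
     * Properties#getProperty returns null for non-String values.
     */
    static Duration closeWaitFrom(final Properties props) {
        final String raw = props.getProperty(CLOSE_WAIT_MS_CONFIG);
        final long millis = raw == null ? DEFAULT_CLOSE_WAIT_MS : Long.parseLong(raw.trim());
        if (millis < 0) {
            throw new IllegalArgumentException(CLOSE_WAIT_MS_CONFIG + " must be non-negative: " + millis);
        }
        return Duration.ofMillis(millis);
    }

    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.setProperty(CLOSE_WAIT_MS_CONFIG, "2000");
        System.out.println("close wait: " + closeWaitFrom(props));
    }
}
```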

Pros

Easy to configure.

Cons

Adds yet another configuration option to an already large set.

Type C. Extend KafkaStreams constructor

Add a new optional parameter (closeWaitTime) to the constructor of KafkaStreams, allowing users to configure the close timeout if required (default: 500 ms):

public KafkaStreams(final Topology topology, final Properties props, final Duration closeWaitTime);
public KafkaStreams(final Topology topology, final Properties props, final KafkaClientSupplier clientSupplier, final Duration closeWaitTime);
public KafkaStreams(final Topology topology, final Properties props, final Time time, final Duration closeWaitTime);
public KafkaStreams(final Topology topology, final Properties props, final KafkaClientSupplier clientSupplier, final Time time, final Duration closeWaitTime);

Pros

Allows users to configure the close timeout without adding a new configuration option.

Cons

This approach adds 4 overloaded constructors. With 2 required parameters and 2 optional parameters there are already 4 constructors, so the total becomes 8, which is too many.

This problem can be mitigated by deprecating the overloaded constructors and providing KafkaStreams.Builder instead. However, that is beyond the scope of this KIP.

Proposed Changes

With this approach, the internal client hang-up problem can be resolved. Moreover, KafkaStreams#close now respects the user's intention without changing the semantics (i.e., it at least *tries* to close the application within the given timeout).

Compatibility, Deprecation, and Migration Plan

...

By passing the existing unit tests.

Rejected Alternatives

...


Change the default close timeout for [Producer, AdminClient]#close

This approach does not allow users to configure a close timeout and breaks backward compatibility, so it was rejected.

These options have very limited functionality, so they were rejected.

Add a default timeout to Producer and AdminClient only (i.e., the timeout parameter is not taken into account in the KafkaStreams#close operation)

This approach is the simplest one; however, from the user's perspective, this approach totally ignores the user's intention.