Status

Current state: Under Discussion

Discussion thread: here

JIRA: KAFKA-7996

Released: (Not decided yet)

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

As of 2.2.0, KafkaStreams#close works as follows:

  1. Launch a daemon thread that closes StreamThreads, GlobalStreamThread, and AdminClient. StreamThread and GlobalStreamThread have their own Producers and Consumers. All close methods are called without any timeout.
  2. Wait for the daemon thread to complete its job and change the KafkaStreams state to NOT_RUNNING (see KafkaStreams#waitOnState).

In short, the timeout in KafkaStreams#close does not mean "closing must complete before this timeout"; it means "block on close for at most this long, and if closing has still not completed, give up and return false", as described in the Javadoc:

Shutdown this {@code KafkaStreams} by signaling all the threads to stop, and then wait up to the timeout for the threads to join.
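The blocking semantics described above can be sketched roughly as follows. This is only an illustration, not the actual KafkaStreams implementation: the class name BlockingCloseSketch and the shutdownWork parameter are hypothetical, and a CountDownLatch stands in for the state wait done by KafkaStreams#waitOnState.

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: mimics "signal shutdown, then wait up to the timeout".
class BlockingCloseSketch {

    /**
     * Runs shutdownWork on a daemon thread and waits for it for at most timeout.
     * Returns true if the work finished in time, false if the caller gave up.
     * The shutdown work itself is NOT interrupted or bounded by the timeout.
     */
    public static boolean close(Runnable shutdownWork, Duration timeout) {
        CountDownLatch done = new CountDownLatch(1);
        Thread closer = new Thread(() -> {
            try {
                shutdownWork.run();   // may itself block indefinitely
            } finally {
                done.countDown();     // stands in for the NOT_RUNNING transition
            }
        }, "close-thread-sketch");
        closer.setDaemon(true);
        closer.start();
        try {
            return done.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Note that when the call returns false, the daemon thread may still be blocked inside shutdownWork, which is exactly the situation this KIP is concerned with.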

The problem is that the internal clients (i.e., Producers, Consumers, and AdminClient) have different close semantics. Consumer is closed with a default timeout of KafkaConsumer#DEFAULT_CLOSE_TIMEOUT_MS (30 seconds). In contrast, the default close timeout of Producer and AdminClient is Duration.ofMillis(Long.MAX_VALUE). For this reason, a Kafka Streams application may take a long time to close, or even hang, while closing its internal Producers or AdminClient.

To resolve this problem, this KIP proposes to improve KafkaStreams#close.

Public Interfaces

This KIP does not make any change to the public interfaces; however, it makes a subtle change to the semantics of the existing API. If this KIP is accepted, these semantics should be documented in as much detail as possible.

Proposed Changes

  1. Change the default close timeout of Producer and AdminClient to a more reasonable value instead of Long.MAX_VALUE.

  2. Take the given timeout parameter into account in the KafkaStreams#close operation; that is,

    • It tries to close as many resources as possible within the remaining timeout.

    • If it succeeds, it simply returns; if not, it closes the remaining resources with the default close timeout.
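The second proposed change can be sketched as a deadline-based loop. This is a minimal illustration under assumptions, not the proposed patch: BudgetedClose, TimedCloseable, and closeAll are hypothetical names, TimedCloseable stands in for any internal client that accepts a close timeout, and the clock is injected only to make the behavior easy to check.

```java
import java.time.Duration;
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical sketch of "close within the remaining timeout, then fall back".
class BudgetedClose {

    /** Hypothetical stand-in for an internal client with a bounded close. */
    public interface TimedCloseable {
        void close(Duration timeout);
    }

    /**
     * Closes each resource with whatever is left of the caller's budget.
     * Once the budget is exhausted, the remaining resources are closed with
     * defaultTimeout instead. Returns true only if everything was closed
     * before the deadline. Assumes budget is small enough for toNanos() not
     * to overflow.
     */
    public static boolean closeAll(List<TimedCloseable> resources,
                                   Duration budget,
                                   Duration defaultTimeout,
                                   LongSupplier nanoClock) {
        long deadlineNanos = nanoClock.getAsLong() + budget.toNanos();
        boolean withinBudget = true;
        for (TimedCloseable resource : resources) {
            long remainingNanos = deadlineNanos - nanoClock.getAsLong();
            if (remainingNanos > 0) {
                resource.close(Duration.ofNanos(remainingNanos));
            } else {
                withinBudget = false;
                resource.close(defaultTimeout); // budget used up: fall back
            }
        }
        return withinBudget && nanoClock.getAsLong() <= deadlineNanos;
    }
}
```

In real use the clock argument would simply be System::nanoTime. The fallback branch is only bounded if the default close timeouts themselves are finite, which is why the first proposed change matters.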

With this approach, the internal client hang problem can be resolved. Moreover, KafkaStreams#close now respects the user's intention without changing the semantics (i.e., it at least *tries* to close the application within the given timeout).

Compatibility, Deprecation, and Migration Plan

None.

Test Plan

The change is verified by passing the existing unit tests.

Rejected Alternatives


Provide configuration options to close internal clients

These options offer very limited functionality, so this alternative was rejected.

Add a default timeout to Producer and AdminClient only (i.e., the timeout parameter is not taken into account in the KafkaStreams#close operation)

This approach is the simplest one; however, from the user's perspective, this approach totally ignores the user's intention.
