Status

Current state: Accepted

Discussion thread: here

JIRA: KAFKA-4426

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

A new close method with a timeout will be added to the Consumer interface. The existing close() method will attempt to close gracefully with a default timeout of 30 seconds.

Consumer close with timeout:
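/**
 * Tries to close the consumer cleanly within the specified timeout. If the close does not complete within the
 * timeout, force close the consumer.
 *
 * @param timeout the maximum time to wait for the consumer to close gracefully
 * @param unit the time unit of the {@code timeout} argument
 */
public void close(long timeout, TimeUnit unit);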
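To illustrate how the no-argument close() relates to the new method, here is a minimal sketch assuming a hypothetical `TimedCloseable` interface (not part of the Kafka API) whose default close() delegates to close(long, TimeUnit) with the 30-second default described in this KIP. The real KafkaConsumer may structure this differently; the `RecordingClient` class is likewise hypothetical and only records the timeout it receives.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical interface used only to illustrate the delegation; not Kafka's Consumer interface.
interface TimedCloseable {
    // Default timeout used by the no-argument close(), per this KIP: 30 seconds.
    long DEFAULT_CLOSE_TIMEOUT_MS = 30_000L;

    // Tries to close cleanly within the timeout, then forces the close.
    void close(long timeout, TimeUnit unit);

    // Graceful close bounded by the default timeout.
    default void close() {
        close(DEFAULT_CLOSE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
    }
}

public class CloseDelegationDemo {
    // Hypothetical client that only records the timeout (in ms) it was asked to use.
    static class RecordingClient implements TimedCloseable {
        long lastTimeoutMs = -1;

        @Override
        public void close(long timeout, TimeUnit unit) {
            lastTimeoutMs = unit.toMillis(timeout);
        }
    }

    public static void main(String[] args) {
        RecordingClient client = new RecordingClient();
        client.close();
        System.out.println(client.lastTimeoutMs); // 30000
        client.close(5, TimeUnit.SECONDS);
        System.out.println(client.lastTimeoutMs); // 5000
    }
}
```

Putting the default in one place means close() and close(long, TimeUnit) share a single close code path.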

 

 

Proposed Changes

Both close methods will attempt to commit offsets if necessary and leave the group before the coordinator connection is closed. The maximum time allowed to complete each request initiated during close will be the request timeout (request.timeout.ms). This ensures that a close with a large timeout does not wait forever if requests never complete, for example when brokers are shut down. Note that this KIP only adds timeouts to the close code path; unbounded waits in the consumer during normal operation are not addressed by this KIP.

The new close(long timeout, TimeUnit unit) method will use the specified timeout as the maximum time for attempting to close gracefully. If the commit or leave-group request does not complete within the timeout, the connection is closed and the requests may not be sent to the broker.

The existing close() method without a timeout will attempt to close the consumer gracefully with a default timeout of 30 seconds. This differs from the producer default of Long.MAX_VALUE, since consumers don't have large requests to send. Applications can specify a timeout of Long.MAX_VALUE to attempt a graceful close without a timeout. Since commit and leave-group requests each time out after the request timeout, the upper bound will then be approximately 2 * request.timeout.ms (around 10 minutes by default).
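The upper-bound arithmetic above can be sketched as follows. This is a minimal illustration, not Kafka's implementation: it assumes at most two requests (offset commit and leave group) are sent during close and uses the roughly 5-minute request timeout mentioned in this KIP. The hypothetical `deadlineMs` helper also shows one way to turn a Long.MAX_VALUE timeout into a deadline without long overflow.

```java
import java.util.concurrent.TimeUnit;

public class CloseBoundDemo {
    // Assumption: at most two requests are sent during close (offset commit, leave group).
    static final int CLOSE_REQUESTS = 2;

    // Hypothetical helper: adds a timeout to a start time, saturating at Long.MAX_VALUE
    // instead of overflowing, so close(Long.MAX_VALUE, ...) gives an effectively infinite deadline.
    // Assumes nowMs and timeoutMs are non-negative.
    static long deadlineMs(long nowMs, long timeoutMs) {
        if (timeoutMs > Long.MAX_VALUE - nowMs) {
            return Long.MAX_VALUE;
        }
        return nowMs + timeoutMs;
    }

    // Approximate worst-case close duration: the close timeout, unless the bound from
    // abandoning each request after request.timeout.ms is smaller.
    static long worstCaseCloseMs(long closeTimeoutMs, long requestTimeoutMs) {
        return Math.min(closeTimeoutMs, CLOSE_REQUESTS * requestTimeoutMs);
    }

    public static void main(String[] args) {
        long requestTimeoutMs = TimeUnit.MINUTES.toMillis(5); // ~5 minutes, per this KIP
        long unboundedClose = worstCaseCloseMs(Long.MAX_VALUE, requestTimeoutMs);
        System.out.println(TimeUnit.MILLISECONDS.toMinutes(unboundedClose)); // 10
        long defaultClose = worstCaseCloseMs(TimeUnit.SECONDS.toMillis(30), requestTimeoutMs);
        System.out.println(defaultClose); // 30000
    }
}
```

With the 30-second default the close timeout dominates; only with Long.MAX_VALUE does the 2 * request.timeout.ms bound apply.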

The hard-coded 5-second timeout for sending requests to the broker before closing the coordinator connection will be removed. It will be replaced with the lower of the request timeout and the time remaining from the specified close timeout.
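The replacement rule above can be sketched on a simulated clock. This is a minimal sketch under stated assumptions, not the consumer's actual code: `perRequestTimeoutMs` and `simulateClose` are hypothetical helpers, request names are just labels, and each request's broker response time is given up front (Long.MAX_VALUE meaning the broker never responds). Each request waits for the lower of request.timeout.ms and the remaining close time; once the close timeout is exhausted, remaining requests are not sent.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CloseTimeoutSketch {
    // Per-request wait: the lower of request.timeout.ms and the time left in the close timeout.
    static long perRequestTimeoutMs(long requestTimeoutMs, long remainingCloseMs) {
        return Math.max(0L, Math.min(requestTimeoutMs, remainingCloseMs));
    }

    // Simulates close on a virtual clock. 'requests' maps a request label to how long the
    // broker would take to respond (Long.MAX_VALUE = never responds), in send order.
    // Returns one outcome per request: "completed", "timed out" or "not sent".
    static List<String> simulateClose(Map<String, Long> requests, long closeTimeoutMs, long requestTimeoutMs) {
        List<String> outcomes = new ArrayList<>();
        long elapsedMs = 0;
        for (Map.Entry<String, Long> request : requests.entrySet()) {
            long waitMs = perRequestTimeoutMs(requestTimeoutMs, closeTimeoutMs - elapsedMs);
            if (waitMs == 0) {
                outcomes.add(request.getKey() + ": not sent"); // close timeout exhausted
                continue;
            }
            long responseMs = request.getValue();
            if (responseMs <= waitMs) {
                elapsedMs += responseMs;
                outcomes.add(request.getKey() + ": completed");
            } else {
                elapsedMs += waitMs; // gave up on this request after waitMs
                outcomes.add(request.getKey() + ": timed out");
            }
        }
        // The coordinator connection would be closed here regardless of the outcomes.
        return outcomes;
    }

    public static void main(String[] args) {
        Map<String, Long> requests = new LinkedHashMap<>();
        requests.put("OffsetCommit", 40_000L); // slow broker response
        requests.put("LeaveGroup", 1_000L);
        // 30 s close timeout, 5 min request timeout: the commit uses up the whole close budget.
        System.out.println(simulateClose(requests, 30_000L, 300_000L));
        // prints [OffsetCommit: timed out, LeaveGroup: not sent]
    }
}
```

With a close timeout of Long.MAX_VALUE and an unresponsive broker, each request would instead be abandoned after the full request timeout, which gives the 2 * request.timeout.ms bound discussed earlier.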

Compatibility, Deprecation, and Migration Plan

...

Versions 0.10.1.0 and earlier closed consumers without waiting for commit or leave-group requests to be sent. With this change, the consumer.close() method waits up to 30 seconds for all pending requests to be sent, and hence close() could take longer to complete.

...

Unit tests added in KAFKA-3703 to test the hard-coded timeout will be altered to work with the new close timeout. Existing integration and system tests already test the close logic with the default timeout.

Rejected Alternatives

...

A single close method with a small hard-coded timeout could be used to attempt a graceful close when possible. However, this is inconsistent with the Producer, where applications can choose to wait longer or not wait at all for a graceful close.

Use Long.MAX_VALUE as the default timeout

Producer.close() uses Long.MAX_VALUE as its default timeout, and consumers could use the same default. However, with the default request timeout of 5 minutes, this could result in waits of up to 10 minutes for the default close() method to complete. Since consumers only attempt to commit offsets and leave the group during close() on a best-effort basis, a smaller default value is more suitable.