...

Current state: Under Discussion

Discussion thread: here

JIRA: KAFKA-4426

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Consumer close with timeout (Java):
/**
 * Tries to close the consumer cleanly within the specified timeout. If the close does not complete within the
 * timeout, the consumer is force closed.
 */
public void close(long timeout, TimeUnit unit);

 

 

Proposed Changes

Both close methods will attempt to commit offsets if necessary and leave the group before the coordinator connection is closed. The maximum time allowed to complete each request initiated during close will be the request timeout. This ensures that close without a timeout doesn't wait forever if requests never complete, for example when brokers are shut down. Note that this KIP only adds timeouts to the close code path. Unbounded waits in the consumer during normal operation are not addressed by this KIP.

The new close(long timeout, TimeUnit unit) method will use the specified timeout as the maximum time for attempting to close gracefully. If the commit or leave group request does not complete within the timeout, the connection is closed and the requests may not be sent to the broker.
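The bounded graceful close described above can be modelled with a small, self-contained sketch. It is not the KafkaConsumer implementation: the class GracefulCloseModel, the method completedSteps, and the step names and simulated durations are all hypothetical, chosen only to show how a single close timeout is consumed by successive requests.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/** Illustrative model only (hypothetical names); not the KafkaConsumer implementation. */
class GracefulCloseModel {

    /**
     * Returns the names of the close steps that finish within the close timeout.
     * Each map entry pairs a hypothetical step name with a simulated duration in
     * milliseconds; iteration order is the order in which the steps are attempted.
     */
    static List<String> completedSteps(long timeout, TimeUnit unit, Map<String, Long> stepDurationsMs) {
        if (timeout < 0) {
            throw new IllegalArgumentException("timeout must not be negative");
        }
        long remainingMs = unit.toMillis(timeout);
        List<String> completed = new ArrayList<>();
        for (Map.Entry<String, Long> step : stepDurationsMs.entrySet()) {
            if (step.getValue() > remainingMs) {
                // Budget exhausted: in the model the connection is closed here,
                // so this step and any later ones are not completed.
                break;
            }
            remainingMs -= step.getValue();
            completed.add(step.getKey());
        }
        return completed;
    }
}
```

With simulated durations of 400 ms for the commit and 300 ms for the leave group request, a one-second timeout lets both steps finish, while a 500 ms timeout leaves only the commit completed before the connection is closed.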

The existing close() method without a timeout will attempt to close the consumer gracefully with a timeout of Long.MAX_VALUE instead of the hard-coded 5-second timeout. Since commit and leave group requests each time out after the request timeout, the upper bound will be approximately 2*request.timeout.ms (around 10 minutes by default).
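The upper bound can be checked with a short calculation. The sketch below assumes a consumer request.timeout.ms of 305000 ms, which is believed to be the default at the time of writing and is not stated in this KIP; the class and method names are hypothetical.

```java
import java.util.concurrent.TimeUnit;

/** Illustrative arithmetic only (hypothetical names). */
class CloseUpperBound {

    /**
     * Approximate upper bound for close() without a timeout: one request
     * timeout for the offset commit plus one for the leave group request.
     */
    static long upperBoundMs(long requestTimeoutMs) {
        return Math.multiplyExact(2L, requestTimeoutMs);
    }

    /** The same bound expressed in whole minutes (rounded down). */
    static long upperBoundMinutes(long requestTimeoutMs) {
        return TimeUnit.MILLISECONDS.toMinutes(upperBoundMs(requestTimeoutMs));
    }
}
```

Under that assumption, upperBoundMs(305000) is 610000 ms, which is a little over 10 minutes and matches the estimate above.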

The hard-coded 5-second timeout for sending requests to the broker before closing the coordinator connection will be removed. It will be replaced with the lower of the request timeout and the time remaining from the specified close timeout. The current close logic, which attempts to commit offsets if the coordinator is known and then leave the group, will remain the same but will be executed with this timeout instead of the hard-coded 5-second timeout.
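The replacement rule, the lower of the request timeout and the time remaining from the close timeout, can be written as a small pure function. This is a minimal sketch under stated assumptions: the class CloseRequestTimeout, the method effectiveTimeoutMs, and its parameter names are hypothetical and do not come from the Kafka code base.

```java
/** Illustrative helper only (hypothetical names); not part of the Kafka client API. */
class CloseRequestTimeout {

    /**
     * Time budget in milliseconds for the next request issued during close.
     *
     * @param requestTimeoutMs the configured request timeout
     * @param closeTimeoutMs   the total close timeout (Long.MAX_VALUE for close() without a timeout)
     * @param elapsedMs        time already spent in close, must not be negative
     */
    static long effectiveTimeoutMs(long requestTimeoutMs, long closeTimeoutMs, long elapsedMs) {
        if (elapsedMs < 0) {
            throw new IllegalArgumentException("elapsedMs must not be negative");
        }
        // Remaining close budget, clamped at zero once the close timeout has passed.
        long remainingMs = Math.max(0L, closeTimeoutMs - elapsedMs);
        return Math.min(requestTimeoutMs, remainingMs);
    }
}
```

For example, with a 30-second close timeout and a much larger request timeout, the first request gets the full 30 seconds; with a close timeout of Long.MAX_VALUE, each request is bounded by the request timeout alone; once the close timeout has elapsed, the budget is zero and no further waiting takes place.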

Compatibility, Deprecation, and Migration Plan

...