Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  • New states will be introduced (see Rebalance States section above).  The main purpose is to make the background thread drive the poll, and letting the polling thread to invoke the callbacks.
  • Remove HeartbeatThread. Therefore, we won't be starting the heartbeat thread.

    • It will still require a fetch event to poll heartbeat.  As only polling thread issues fetch events, and we want to respect the existing implementation.
  • Timeouts and timers will be reevaluated and possibly removed.
  • while loops will be reevaluated and possibly thrown out.  In the new implementation the coordinator will be non-blocking, and its states are managed by the background thread loop.

Timeout Policy

Consumer.poll() - user provide timeout

Coordinator rediscovery backoff: retry.backoff.ms

Coordinator discovery timeout: Currently uses the user-provided timeout in the consumer.poll(). Maybe we should use request.timeout.ms. And re-attempt in the next loop if failed

CommitOffsetSync: user provided

Rebalance State Timeout: maybe using the request timeout

Is there a better way to configure session interval and heartbeat interval?

User-provided Timeouts

The Consumer API provides a means for users to express that a given operation should be time-bound via optional timeout values.

Note
titleTimeouts are open to interpretation

A precise definition of the timeout policy and behavior is, unfortunately, undefined. The main clues as to their interworking are based on the API-level documentation as well as the source code itself. The documentation can be a little vague and the source code is not consistent throughout the different APIs. Also, Kafka does not provide any real time guarantees, of course. So the level of precision in describing the timeouts is rough. This leaves us in the situation in which there may more than one way to interpret how a timeout is implemented.

The approach taken is for users to provide a Duration object as one of the API method parameters. Not all of the APIs allow users to provide a timeout, but of those that do, they are either required or optional. For example, the following Consumer APIs require a timeout:

  • clientInstanceId
  • poll

On the other hand, the following APIs provide overloaded versions that allow the user to pass in an optional timeout:

...

Please see Java client Consumer timeouts for more detail on timeouts.

Compatibility

The new consumer should be backward compatible.

...