...

  • New states will be introduced (see the Rebalance States section above). The main purpose is to have the background thread drive the poll and to let the polling thread invoke the callbacks.
  • The HeartbeatThread will be removed, so the heartbeat thread will no longer be started.

    • Heartbeat polling will still require a fetch event, because only the polling thread issues fetch events and we want to respect the existing implementation.
  • Timeouts and timers will be reevaluated and possibly removed.
  • The while loops will be reevaluated and possibly removed. In the new implementation, the coordinator will be non-blocking, and its state will be managed by the background thread loop.
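A minimal sketch of the intended hand-off, assuming a simple thread-safe queue between the two threads. The background thread only enqueues callbacks, and the application thread runs them when it polls. CallbackHandoff, enqueueFromBackground, and runPendingCallbacks are hypothetical names, not part of the Kafka client.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: the background thread never runs user callbacks itself.
// It enqueues them, and the application thread runs them the next time it polls.
class CallbackHandoff {
    private final BlockingQueue<Runnable> pendingCallbacks = new LinkedBlockingQueue<>();

    // Called from the background thread when a state transition (for example,
    // a rebalance step) requires a user callback to be invoked.
    void enqueueFromBackground(Runnable callback) {
        pendingCallbacks.add(callback);
    }

    // Called from the application thread inside its poll(): drains every queued
    // callback, runs each one on the calling thread, and returns how many ran.
    int runPendingCallbacks() {
        List<Runnable> batch = new ArrayList<>();
        pendingCallbacks.drainTo(batch);
        for (Runnable callback : batch) {
            callback.run();
        }
        return batch.size();
    }
}
```

One apparent benefit of this shape is that user code never executes on the background thread, so a slow callback cannot directly stall the background loop.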

Timeout Policy

Consumer.poll(): user-provided timeout

Coordinator rediscovery backoff: retry.backoff.ms

Coordinator discovery timeout: currently uses the user-provided timeout passed to Consumer.poll(). We may want to use request.timeout.ms instead and re-attempt in the next loop iteration if discovery fails.

CommitOffsetSync: user-provided timeout

Rebalance state timeout: possibly the request timeout (request.timeout.ms)

Is there a better way to configure the session timeout (session.timeout.ms) and the heartbeat interval (heartbeat.interval.ms)?
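A sketch of the coordinator discovery policy floated above, written as a non-blocking state that the background loop consults once per iteration. It assumes each discovery attempt is bounded by request.timeout.ms and that a failed or timed-out attempt is retried after retry.backoff.ms. The class and method names are hypothetical.

```java
// Hypothetical sketch of the proposed policy: discovery attempts are bounded by
// request.timeout.ms rather than the poll() timeout, and a failed attempt is
// retried on a later background-loop iteration after retry.backoff.ms.
// No method here blocks.
class CoordinatorDiscoveryState {
    private final long requestTimeoutMs;   // stands in for request.timeout.ms
    private final long retryBackoffMs;     // stands in for retry.backoff.ms
    private long inFlightDeadlineMs = -1;  // -1 means no request is in flight
    private long nextAttemptMs = 0;        // earliest time a new attempt may start
    private boolean coordinatorKnown = false;

    CoordinatorDiscoveryState(long requestTimeoutMs, long retryBackoffMs) {
        this.requestTimeoutMs = requestTimeoutMs;
        this.retryBackoffMs = retryBackoffMs;
    }

    // Called once per background-loop iteration. Returns true when the caller
    // should send a new discovery request right now.
    boolean shouldSendRequest(long nowMs) {
        if (coordinatorKnown) {
            return false;
        }
        if (inFlightDeadlineMs >= 0) {
            if (nowMs < inFlightDeadlineMs) {
                return false; // still waiting for the current attempt
            }
            onFailure(nowMs); // the attempt exceeded request.timeout.ms
        }
        if (nowMs < nextAttemptMs) {
            return false; // backing off; try again on a later iteration
        }
        inFlightDeadlineMs = nowMs + requestTimeoutMs;
        return true;
    }

    void onSuccess() {
        inFlightDeadlineMs = -1;
        coordinatorKnown = true;
    }

    void onFailure(long nowMs) {
        inFlightDeadlineMs = -1;
        nextAttemptMs = nowMs + retryBackoffMs;
    }

    boolean isCoordinatorKnown() {
        return coordinatorKnown;
    }
}
```

With request.timeout.ms at 1000 and retry.backoff.ms at 100, an attempt sent at t=0 is abandoned at t=1000, and the next attempt may start at t=1100.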

API Timeouts

Many of the Consumer APIs let users specify that an operation should adhere to a timeout, passed as a Duration object in one of the method parameters. Not all APIs accept a timeout; among those that do, the timeout is either required or optional.

Note: Timeouts are open to interpretation

Unfortunately, the timeout policy and behavior are not precisely defined. The main clues to how they work come from the API-level documentation and the source code itself. The documentation can be vague, and the source code is not consistent across the different APIs. Also, Kafka does not provide any real-time guarantees, so timeouts can only be described roughly. As a result, there may be more than one way to interpret how a timeout is implemented.

The following Consumer APIs require a timeout:

  • clientInstanceId
  • poll

The following Consumer APIs provide overloaded versions that allow the user to pass in an optional timeout:

  • beginningOffsets
  • close
  • commitSync
  • committed
  • endOffsets
  • listTopics
  • offsetsForTimes
  • partitionsFor
  • position 
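A sketch of the required versus optional timeout pattern. In the Kafka Java client, many timeout-less overloads (for example, position(TopicPartition)) fall back to the default.api.timeout.ms configuration. The interface below is a hypothetical illustration of that delegation; its names are made up and it is not Kafka's API.

```java
import java.time.Duration;

// Hypothetical illustration of an optional-timeout overload. OffsetLookup,
// position(String, ...) and defaultApiTimeout() are made-up names.
interface OffsetLookup {
    // Explicit-timeout overload: the caller passes a Duration.
    long position(String partition, Duration timeout);

    // Timeout-less overload: delegates using a configured default,
    // in the spirit of default.api.timeout.ms.
    default long position(String partition) {
        return position(partition, defaultApiTimeout());
    }

    // Where the configured default comes from is left to the implementation.
    Duration defaultApiTimeout();
}
```

Keeping the timeout-less overload as a thin delegate means there is only one code path that actually enforces the timeout.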

Network I/O Timeouts

When a timeout-based Consumer API is invoked, the timeout value is an upper bound on the total time of all operations required by that API call. That is, the combined duration of all the constituent operations of that call must be less than or equal to the user-provided timeout. In practice, timeouts are largely used to time-bound network I/O, because communication between the client and the brokers accounts for the majority of the time spent in many operations. Letting the user set an upper bound on the total time of these operations provides some protection against network issues.
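The aggregate bound can be written as a simple inequality. Here T is the user-provided timeout and d_1, ..., d_n are the durations of the n constituent operations of one API call; these symbols are introduced only for illustration, and the numbers in the worked example are made up.

```latex
\sum_{i=1}^{n} d_i \le T
\qquad \text{e.g. } T = 5000\,\text{ms},\; (d_1, d_2, d_3) = (1200, 2000, 1500)\,\text{ms}
\;\Rightarrow\; \sum_{i=1}^{3} d_i = 4700\,\text{ms} \le 5000\,\text{ms}
```

With the same T, had the third operation taken 2000 ms, the total would be 5200 ms, which exceeds T, so the call would time out before the third operation completed.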

Retries

In the following diagram, we see that the user has invoked a Consumer API call with a timeout:

...

For API calls that require network I/O, the Consumer issues network requests to the Kafka cluster. Each of those network requests has its own timeout, which is the smaller of the request.timeout.ms configuration value and the remaining API timeout.
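A sketch of the per-request rule, assuming a hypothetical RetryingCaller helper with an injectable millisecond clock. Each attempt is granted min(request.timeout.ms, remaining API timeout), and no attempt starts after the API timeout has expired. Backoff between attempts is omitted for brevity.

```java
import java.util.concurrent.TimeoutException;
import java.util.function.LongSupplier;

// Hypothetical sketch, not the Kafka implementation: every attempt is given
// min(request.timeout.ms, remaining API timeout), and no new attempt starts
// once the overall API timeout has been used up.
class RetryingCaller {
    // Hypothetical stand-in for one network request. It receives the timeout it
    // may use and returns true on success. Real requests take time, so each
    // failed attempt is assumed to advance the clock.
    interface Attempt {
        boolean send(long perRequestTimeoutMs);
    }

    static long perRequestTimeoutMs(long requestTimeoutMs, long remainingMs) {
        return Math.min(requestTimeoutMs, remainingMs);
    }

    // Returns the number of attempts made, or throws once the API timeout expires.
    static int callWithRetries(LongSupplier clockMs, long apiTimeoutMs,
                               long requestTimeoutMs, Attempt attempt)
            throws TimeoutException {
        long deadlineMs = clockMs.getAsLong() + apiTimeoutMs;
        int attempts = 0;
        while (true) {
            long remainingMs = deadlineMs - clockMs.getAsLong();
            if (remainingMs <= 0) {
                throw new TimeoutException("API timeout of " + apiTimeoutMs
                        + " ms expired after " + attempts + " attempt(s)");
            }
            attempts++;
            if (attempt.send(perRequestTimeoutMs(requestTimeoutMs, remainingMs))) {
                return attempts;
            }
            // A real client would also wait retry.backoff.ms here (omitted).
        }
    }
}
```

For example, with a 5000 ms API timeout and request.timeout.ms set to 2000 ms, three failing attempts that each use their full allowance are granted 2000 ms, 2000 ms, and 1000 ms before the call times out.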

Timer

When a user provides a timeout value to a Consumer API, a Timer object is immediately created to track the elapsed and remaining time for processing. While the Duration object holds the fixed overall timeout, the Timer tracks how much of that timeout remains since the Timer was created. At certain points during processing, Timer.update() is invoked to refresh the Timer's notion of the current time, from which the elapsed and remaining time are computed.

The Timer class does not enforce any timeouts by itself. The code that uses a Timer must interact with it explicitly: it updates it (update()) and queries it (remainingMs(), isExpired(), and notExpired()) to determine how much of the timeout remains.
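A simplified, hypothetical timer that mirrors the method names mentioned above (update(), remainingMs(), isExpired(), notExpired()); it is not Kafka's Timer class. It illustrates that the remaining time only changes when update() is called.

```java
import java.util.function.LongSupplier;

// Hypothetical, simplified stand-in that mirrors the method names above.
// It is NOT Kafka's Timer; the clock is injected so time can be simulated.
class SketchTimer {
    private final LongSupplier clockMs;
    private final long deadlineMs;
    private long currentTimeMs;

    SketchTimer(LongSupplier clockMs, long timeoutMs) {
        this.clockMs = clockMs;
        this.currentTimeMs = clockMs.getAsLong();
        // Saturate instead of overflowing for very large timeouts.
        this.deadlineMs = timeoutMs > Long.MAX_VALUE - currentTimeMs
                ? Long.MAX_VALUE
                : currentTimeMs + timeoutMs;
    }

    // Refreshes the cached current time; the queries below use this cached value.
    void update() {
        currentTimeMs = clockMs.getAsLong();
    }

    long remainingMs() {
        return Math.max(0, deadlineMs - currentTimeMs);
    }

    boolean isExpired() {
        return currentTimeMs >= deadlineMs;
    }

    boolean notExpired() {
        return !isExpired();
    }
}
```

For example, if the clock advances by 300 ms on a 500 ms timer, remainingMs() still reports 500 until update() is called, after which it reports 200.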

The Timer class is not designed to be thread-safe. Although it might be useful to share the same Timer object between the application thread and the network I/O thread, doing so is currently ill-advised because of that lack of thread safety. This will likely force us to keep two separate Timer instances (one per thread), which is less than ideal. Please see Java client Consumer timeouts for more detail on timeouts.
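One possible way to live with two Timer instances, sketched under the assumption that the application thread converts the user's timeout into an absolute deadline and passes it to the background thread inside an immutable object. Each thread then computes its remaining time from that deadline with its own clock reads, so no mutable timer is shared. TimedRequest and its members are hypothetical names.

```java
// Hypothetical sketch: only an immutable absolute deadline crosses threads.
// Each thread derives its own remaining time from it, so no mutable timer
// object is shared between the application and background threads.
final class TimedRequest {
    final String apiName;  // for example "commitSync"; illustrative only
    final long deadlineMs; // absolute deadline; final, so safe to share once published

    private TimedRequest(String apiName, long deadlineMs) {
        this.apiName = apiName;
        this.deadlineMs = deadlineMs;
    }

    // Created on the application thread when the API call starts.
    static TimedRequest fromTimeout(String apiName, long nowMs, long timeoutMs) {
        long deadline = timeoutMs > Long.MAX_VALUE - nowMs ? Long.MAX_VALUE : nowMs + timeoutMs;
        return new TimedRequest(apiName, deadline);
    }

    // Either thread may call this with its own reading of the current time.
    long remainingMs(long nowMs) {
        return Math.max(0, deadlineMs - nowMs);
    }
}
```

Handing the object over through a thread-safe queue is assumed to provide the safe publication; the background thread can then build its own timer from deadlineMs if it needs one.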

Compatibility

The new consumer should be backward compatible.

...