Comment: Added explanation of the new and old semantics for poll, which we previously discussed in the mailing list.

...

the new KafkaConsumer#commitSync will accept a user-specified timeout.

Consumer#poll

The pre-existing variant poll(long timeout) currently has two use cases: to block on initial assignment metadata (and not poll for records), and to poll for records. We'll discard the first (unintentional) use case and truly enforce the timeout in poll for both metadata and data.

The pre-existing variant poll(long timeout) would block indefinitely for metadata updates if they were needed, then it would issue a fetch and poll for timeout ms for new records. The initial indefinite metadata block caused applications to become stuck when the brokers became unavailable. The existence of the timeout parameter made the indefinite block especially unintuitive.

We will add a new method poll(Duration timeout) with the semantics:

  1. iff a metadata update is needed:
    1. send (asynchronous) metadata requests
    2. poll for metadata responses (counts against timeout)
      • if no response within timeout, return an empty collection immediately
  2. if there is fetch data available, return it immediately
  3. if there is no fetch request in flight, send fetch requests
  4. poll for fetch responses (counts against timeout)
    • if no response within timeout, return an empty collection (leaving async fetch request for the next poll)
    • if we get a response, return the response
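The steps above can be sketched as a toy, single-threaded model. This is only an illustration under stated assumptions, not the KafkaConsumer implementation: the class BoundedPollSketch, its simulated latencies, and the record names are all hypothetical. The sketch also assumes that a metadata request stays in flight across calls (the text only states this for fetch requests), and that each fetch response carries two records handed out one per call, so that step 2 is exercised.

```java
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.List;

/** Toy model of the proposed poll(Duration) steps. Not the real KafkaConsumer. */
class BoundedPollSketch {
    private final long metadataLatencyMs; // hypothetical: simulated metadata round trip
    private final long fetchLatencyMs;    // hypothetical: simulated fetch round trip
    private boolean metadataNeeded = true;
    private long metadataWaitMs = -1;     // time until the in-flight response arrives; -1 = none in flight
    private long fetchWaitMs = -1;
    private int nextOffset = 0;
    private final Deque<String> buffered = new ArrayDeque<>();

    BoundedPollSketch(long metadataLatencyMs, long fetchLatencyMs) {
        this.metadataLatencyMs = metadataLatencyMs;
        this.fetchLatencyMs = fetchLatencyMs;
    }

    List<String> poll(Duration timeout) {
        long remaining = timeout.toMillis();

        // Step 1: a needed metadata update counts against the timeout.
        if (metadataNeeded) {
            if (metadataWaitMs < 0) {
                metadataWaitMs = metadataLatencyMs; // "send" the async request
            }
            long waited = Math.min(remaining, metadataWaitMs);
            metadataWaitMs -= waited;
            remaining -= waited;
            if (metadataWaitMs > 0) {
                return Collections.emptyList(); // no response within timeout
            }
            metadataNeeded = false;
            metadataWaitMs = -1;
        }

        // Step 2: fetch data that is already available is returned immediately.
        if (!buffered.isEmpty()) {
            return Collections.singletonList(buffered.poll());
        }

        // Step 3: send a fetch request only if none is in flight.
        if (fetchWaitMs < 0) {
            fetchWaitMs = fetchLatencyMs;
        }

        // Step 4: wait for the fetch response with whatever budget is left.
        long waited = Math.min(remaining, fetchWaitMs);
        fetchWaitMs -= waited;
        if (fetchWaitMs > 0) {
            return Collections.emptyList(); // request stays in flight for the next poll
        }
        fetchWaitMs = -1;
        buffered.add("record-" + nextOffset++); // assumption: two records per response
        buffered.add("record-" + nextOffset++);
        return Collections.singletonList(buffered.poll());
    }
}
```

With a simulated metadata latency of 250 ms, a fetch latency of 100 ms, and a 100 ms timeout, the first three calls return empty collections and the fourth returns the first record. No single call waits longer than its timeout.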

We will deprecate the original method, poll(long timeout), and we will not change its semantics, so it remains:

  1. iff a metadata update is needed:
    1. send (asynchronous) metadata requests
    2. poll for metadata responses indefinitely until we get it
  2. if there is fetch data available, return it immediately
  3. if there is no fetch request in flight, send fetch requests
  4. poll for fetch responses (counts against timeout)
    • if no response within timeout, return an empty collection (leaving async fetch request for the next poll)
    • if we get a response, return the response
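To make the difference between the two variants concrete, the following sketch computes how long a single call may block under each set of semantics. It is a simplified model, not Kafka code: the class PollBlockingSketch, its method names, and the NEVER sentinel (standing in for a response that never arrives, for example when brokers are unavailable) are hypothetical, and a metadata latency of 0 stands for "no metadata update needed".

```java
/** Simplified model of per-call blocking time under the old and new poll semantics. */
class PollBlockingSketch {
    /** Hypothetical sentinel: the response never arrives. */
    static final long NEVER = Long.MAX_VALUE;

    /** Deprecated poll(long): the metadata wait is unbounded; only the fetch wait is capped. */
    static long legacyBlockMs(long timeoutMs, long metadataMs, long fetchMs) {
        if (metadataMs == NEVER) {
            return NEVER; // blocks indefinitely, regardless of the timeout
        }
        return metadataMs + Math.min(timeoutMs, fetchMs);
    }

    /** Proposed poll(Duration): metadata and fetch waits share a single budget. */
    static long boundedBlockMs(long timeoutMs, long metadataMs, long fetchMs) {
        if (metadataMs >= timeoutMs) {
            return timeoutMs; // timeout is spent waiting for metadata
        }
        return metadataMs + Math.min(timeoutMs - metadataMs, fetchMs);
    }
}
```

For a 100 ms timeout and a 5000 ms metadata round trip, the legacy model blocks for the full metadata wait plus the fetch wait, while the bounded model returns after 100 ms. When both waits fit within the timeout, the two models agree.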

One notable usage is prohibited by the new poll: previously, you could call poll(0) to block for metadata updates, for example to initialize the client, supposedly without fetching records. Note, though, that this behavior was never guaranteed by any contract, and there is no guarantee that poll(0) won't return records the first time it's called. Therefore, it has always been unsafe to ignore the response.
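One possible way to initialize a client without relying on poll(0), sketched under assumptions: poll with a bounded timeout until an assignment appears, and keep every record that is returned along the way. This is not a pattern prescribed by this proposal. PollableConsumer is a hypothetical stand-in that mirrors KafkaConsumer's poll(Duration) and assignment() methods with simplified String types, and pollUntilAssigned and maxPolls are illustrative names.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Hypothetical stand-in mirroring two KafkaConsumer methods, with simplified types. */
interface PollableConsumer {
    List<String> poll(Duration timeout);
    Set<String> assignment();
}

class AwaitAssignmentSketch {
    /**
     * Polls until the consumer reports a non-empty assignment or maxPolls is reached.
     * Records returned while waiting are collected rather than ignored.
     */
    static List<String> pollUntilAssigned(PollableConsumer consumer, Duration perPoll, int maxPolls) {
        List<String> received = new ArrayList<>();
        for (int i = 0; i < maxPolls && consumer.assignment().isEmpty(); i++) {
            received.addAll(consumer.poll(perPoll)); // never drop what poll returns
        }
        return received;
    }
}
```

The caller decides what to do with the collected records (process them, or seek back to their offsets). Bounding the loop with maxPolls keeps the application from getting stuck when brokers are unavailable.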


Note that poll() doesn't throw a TimeoutException because its async semantics are well defined. I.e., it is well defined to return an empty response when there's no data available, and it's designed to be called repeatedly to check for data (hence the name).

...