Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Fetch errors will be recorded in two places:

Code Block
/**
 * Set up a fetch request for any node that we have assigned partitions for which doesn't already have
 * an in-flight fetch or pending fetch data.
 *
 * <p>The request is sent with {@code client.send(fetchTarget, request)} and a listener is attached to the
 * result. That listener calls {@code sensors.recordFetchError()} in two places: in {@code onSuccess} when
 * {@code handler.handleResponse(response)} returns false, and in {@code onFailure}.
 *
 * @return number of fetches sent
 */
public int sendFetches() {
...
client.send(fetchTarget, request)
    .addListener(new RequestFutureListener<ClientResponse>() {


		// (1) onSuccess handler of sendFetches: a response was received, but the fetch session
		// handler for the target node does not accept it (handleResponse returns false).

		@Override
		public void onSuccess(ClientResponse resp) {
    		FetchResponse<Records> response = (FetchResponse<Records>) resp.responseBody();
    		FetchSessionHandler handler = sessionHandlers.get(fetchTarget.id());
    		// No session handler is registered for this node: log and drop the response.
    		// Note that this path returns WITHOUT recording a fetch error.
    		if (handler == null) {
        		log.error("Unable to find FetchSessionHandler for node {}. Ignoring fetch response.",
            		fetchTarget.id());
        		return;
    		}
    		// The handler rejected the response: count it as a fetch error and stop processing it.
    		if (!handler.handleResponse(response)) {
        		sensors.recordFetchError();     // ***Record error
        		return;
    		}
    		...
		}		

		// (2) onFailure handler of sendFetches: the request itself failed with an exception.
		@Override
		public void onFailure(RuntimeException e) {
    		// The error is recorded first and unconditionally, i.e. even when no session handler
    		// is found for the node below.
    		sensors.recordFetchError();        // ***Record error
    		FetchSessionHandler handler = sessionHandlers.get(fetchTarget.id());
    		// Pass the failure on to the node's session handler, if one is registered.
    		if (handler != null) {
        		handler.handleError(e);
    		}
		}
		...
}


Compatibility, Deprecation, and Migration Plan

...