THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Fetch errors will be recorded in two places:
Code Block |
---|
/** * Set-up a fetch request for any node that we have assigned partitions for which doesn't already have * an in-flight fetch or pending fetch data. * @return number of fetches sent */ public int sendFetches() { ... client.send(fetchTarget, request) .addListener(new RequestFutureListener<ClientResponse>() { // (1) onSuccess handler of sendFetches when response is invalid @Override public void onSuccess(ClientResponse resp) { FetchResponse<Records> response = (FetchResponse<Records>) resp.responseBody(); FetchSessionHandler handler = sessionHandlers.get(fetchTarget.id()); if (handler == null) { log.error("Unable to find FetchSessionHandler for node {}. Ignoring fetch response.", fetchTarget.id()); return; } if (!handler.handleResponse(response)) { sensors.recordFetchError(); // ***Record error return; } ... } // (2) onFailure handler of sendFetches @Override public void onFailure(RuntimeException e) { sensors.recordFetchError(); // ***Record error FetchSessionHandler handler = sessionHandlers.get(fetchTarget.id()); if (handler != null) { handler.handleError(e); } } ... } |
Compatibility, Deprecation, and Migration Plan
...