...
Current state: Under Discussion
Discussion thread: here
JIRA: KAFKA-7300
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
The KafkaConsumer is a complex client that requires many different components to function properly. When a consumer is not operating properly, it can be difficult to identify the root cause and which component is causing issues (ConsumerCoordinator, Fetcher, ConsumerNetworkClient, etc.).
...
Fetch errors will be recorded in two places:
```java
/**
 * Set-up a fetch request for any node that we have assigned partitions for which doesn't already have
 * an in-flight fetch or pending fetch data.
 * @return number of fetches sent
 */
public int sendFetches() {
    ...
    client.send(fetchTarget, request)
            .addListener(new RequestFutureListener<ClientResponse>() {
                // (1) onSuccess handler of sendFetches when response is invalid
                @Override
                public void onSuccess(ClientResponse resp) {
                    FetchResponse<Records> response = (FetchResponse<Records>) resp.responseBody();
                    FetchSessionHandler handler = sessionHandlers.get(fetchTarget.id());
                    if (handler == null) {
                        log.error("Unable to find FetchSessionHandler for node {}. Ignoring fetch response.", fetchTarget.id());
                        return;
                    }
                    if (!handler.handleResponse(response)) {
                        sensors.recordFetchError(); // ***Record error
                        return;
                    }
                    ...
                }

                // (2) onFailure handler of sendFetches
                @Override
                public void onFailure(RuntimeException e) {
                    sensors.recordFetchError(); // ***Record error
                    FetchSessionHandler handler = sessionHandlers.get(fetchTarget.id());
                    if (handler != null) {
                        handler.handleError(e);
                    }
                }
    ...
}
```
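The two recording sites can be illustrated in isolation with a minimal, self-contained sketch. It does not use Kafka's metrics classes: `FetchErrorSensor` and `CountingFetchListener` are hypothetical stand-ins invented for this example, and the `responseValid` flag is an assumed simplification of the `handler.handleResponse(response)` check. The only point is that both an invalid response and a failed request increment the same counter.

```java
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical stand-in for the fetcher's sensors; not Kafka's actual metrics class. */
final class FetchErrorSensor {
    private final LongAdder errors = new LongAdder();

    /** Counts one fetch error. */
    void recordFetchError() {
        errors.increment();
    }

    /** Returns the number of fetch errors recorded so far. */
    long fetchErrorTotal() {
        return errors.sum();
    }
}

/** Hypothetical listener mirroring the two recording sites in sendFetches(). */
final class CountingFetchListener {
    private final FetchErrorSensor sensors;

    CountingFetchListener(FetchErrorSensor sensors) {
        this.sensors = sensors;
    }

    // (1) A response arrived, but the session handler rejected it.
    // responseValid is an assumed simplification of handler.handleResponse(response).
    void onSuccess(boolean responseValid) {
        if (!responseValid) {
            sensors.recordFetchError();
            return;
        }
        // Normal response processing would continue here.
    }

    // (2) The request itself failed, for example because of a disconnect.
    void onFailure(RuntimeException e) {
        sensors.recordFetchError();
    }
}
```

In this sketch, calling `onSuccess(false)` once and `onFailure(...)` once leaves `fetchErrorTotal()` at 2, while `onSuccess(true)` leaves the counter unchanged.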
Compatibility, Deprecation, and Migration Plan
...