...

Current state: Under Discussion

Discussion thread: here

JIRA: KAFKA-7300

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

The KafkaConsumer is a complex client that requires many different components to function properly. When a consumer is not operating properly, it can be difficult to identify the root cause and which component is causing issues (ConsumerCoordinator, Fetcher, ConsumerNetworkClient, etc.).

...

Fetch errors will be recorded in two places:

Code Block
/**
 * Set up a fetch request for any node that we have assigned partitions for which doesn't already have
 * an in-flight fetch or pending fetch data.
 * @return number of fetches sent
 */
public int sendFetches() {
    ...
    client.send(fetchTarget, request)
        .addListener(new RequestFutureListener<ClientResponse>() {

            // (1) onSuccess handler of sendFetches when response is invalid
            @Override
            public void onSuccess(ClientResponse resp) {
                FetchResponse<Records> response = (FetchResponse<Records>) resp.responseBody();
                FetchSessionHandler handler = sessionHandlers.get(fetchTarget.id());
                if (handler == null) {
                    log.error("Unable to find FetchSessionHandler for node {}. Ignoring fetch response.",
                        fetchTarget.id());
                    return;
                }
                if (!handler.handleResponse(response)) {
                    sensors.recordFetchError();     // ***Record error
                    return;
                }
                ...
            }

            // (2) onFailure handler of sendFetches
            @Override
            public void onFailure(RuntimeException e) {
                sensors.recordFetchError();        // ***Record error
                FetchSessionHandler handler = sessionHandlers.get(fetchTarget.id());
                if (handler != null) {
                    handler.handleError(e);
                }
            }
        });
    ...
}
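
The two recording sites can be exercised in isolation with a minimal, self-contained sketch. It does not use Kafka's real classes: FetchErrorSensorSketch and FetchListenerSketch are hypothetical stand-ins for the Fetcher's sensors and the request listener, and the boolean responseValid parameter is an assumption that stands in for the result of handler.handleResponse(response). The counter here is a plain running total; how the real sensor aggregates errors is not shown.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for the Fetcher's metrics sensors (not a Kafka class). */
class FetchErrorSensorSketch {
    private final AtomicLong fetchErrorTotal = new AtomicLong();

    /** Counts one fetch error. */
    void recordFetchError() {
        fetchErrorTotal.incrementAndGet();
    }

    /** Returns the number of fetch errors recorded so far. */
    long fetchErrorTotal() {
        return fetchErrorTotal.get();
    }
}

/** Hypothetical listener mirroring the two places where a fetch error is recorded. */
class FetchListenerSketch {
    private final FetchErrorSensorSketch sensors;

    FetchListenerSketch(FetchErrorSensorSketch sensors) {
        this.sensors = sensors;
    }

    // (1) A response arrived, but it failed validation.
    // responseValid is an assumed stand-in for handler.handleResponse(response).
    void onSuccess(boolean responseValid) {
        if (!responseValid) {
            sensors.recordFetchError();
            return;
        }
        // A valid response would be processed here.
    }

    // (2) The request itself failed (for example, the node disconnected).
    void onFailure(RuntimeException e) {
        sensors.recordFetchError();
    }
}
```

In this sketch a valid response leaves the total unchanged, while an invalid response and a failed request each add one, so the total reflects both kinds of fetch error.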


Compatibility, Deprecation, and Migration Plan

...