Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Current state: Under Discussion

Discussion thread: here

JIRA: KAFKA-7300

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

The KafkaConsumer is a complex client that requires many different components to function properly. When a consumer failsis not operating properly, it can be difficult to identify the root cause and which component failed is causing issues (ConsumerCoordinator, Fetcher, ConsumerNetworkClient, etc).

...

Code Block
/**
 * Set-up a fetch request for any node that we have assigned partitions for which doesn't already have
 * an in-flight fetch or pending fetch data.
 * @return number of fetches sent
 */
public int sendFetches() {
...
client.send(fetchTarget, request)
    .addListener(new RequestFutureListener<ClientResponse>() {


		// (1) onSuccess handler of sendFetches when response is invalid
		@Override
		public void onSuccess(ClientResponse resp) {
    		FetchResponse<Records> response = (FetchResponse<Records>) resp.responseBody();
    		FetchSessionHandler handler = sessionHandlers.get(fetchTarget.id());
    		if (handler == null) {
        		log.error("Unable to find FetchSessionHandler for node {}. Ignoring fetch response.",
            		fetchTarget.id());
        		return;
    		}
    		if (!handler.handleResponse(response)) {
        		sensors.recordFetchError();     // ***Record error
        		return;
    		}
    		...
		}		

		// (2) onFailure handler of sendFetches
		@Override
		public void onFailure(RuntimeException e) {
    		sensors.recordFetchError();        // ***Record error
    		FetchSessionHandler handler = sessionHandlers.get(fetchTarget.id());
    		if (handler != null) {
        		handler.handleError(e);
    		}
		}
		...
}

...