...
Current state: Under Discussion
Discussion thread: here
JIRA: KAFKA-7300
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
The KafkaConsumer is a complex client that requires many different components to function properly. When a consumer failsis not operating properly, it can be difficult to identify the root cause and which component failed is causing issues (ConsumerCoordinator, Fetcher, ConsumerNetworkClient, etc).
...
Code Block |
---|
/**
* Set-up a fetch request for any node that we have assigned partitions for which doesn't already have
* an in-flight fetch or pending fetch data.
* @return number of fetches sent
*/
public int sendFetches() {
...
client.send(fetchTarget, request)
.addListener(new RequestFutureListener<ClientResponse>() {
// (1) onSuccess handler of sendFetches when response is invalid
@Override
public void onSuccess(ClientResponse resp) {
FetchResponse<Records> response = (FetchResponse<Records>) resp.responseBody();
FetchSessionHandler handler = sessionHandlers.get(fetchTarget.id());
if (handler == null) {
log.error("Unable to find FetchSessionHandler for node {}. Ignoring fetch response.",
fetchTarget.id());
return;
}
if (!handler.handleResponse(response)) {
sensors.recordFetchError(); // ***Record error
return;
}
...
}
// (2) onFailure handler of sendFetches
@Override
public void onFailure(RuntimeException e) {
sensors.recordFetchError(); // ***Record error
FetchSessionHandler handler = sessionHandlers.get(fetchTarget.id());
if (handler != null) {
handler.handleError(e);
}
}
...
} |
...