...
Code Block |
---|
@Override
public List<FindCoordinatorRequest> build(short version) {
    if (version < minVersion)
        throw new UnsupportedVersionException("Cannot create a v" + version + " FindCoordinator request " +
                "because we require features supported only in " + minVersion + " or later.");
    // Build one FindCoordinatorRequest per entry in groupIds.
    return groupIds.entrySet()
            .stream()
            .map(x -> new FindCoordinatorRequest(x.getValue(), x.getKey(), version))
            .collect(Collectors.toList());
} |
Then, in doSend(), use instanceof to check whether batching has been used.
Code Block |
---|
private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now) {
    ...
    try {
        NodeApiVersions versionInfo = apiVersions.get(nodeId);
        short version;
        // Note: if versionInfo is null, we have no server version information. This would be
        // the case when sending the initial ApiVersionRequest which fetches the version
        // information itself. It is also the case when discoverBrokerVersions is set to false.
        if (versionInfo == null) {
            version = builder.latestAllowedVersion();
            if (discoverBrokerVersions && log.isTraceEnabled())
                log.trace("No version information found when sending {} with correlation id {} to node {}. " +
                        "Assuming version {}.", clientRequest.apiKey(), clientRequest.correlationId(), nodeId, version);
        } else {
            version = versionInfo.latestUsableVersion(clientRequest.apiKey(), builder.oldestAllowedVersion(),
                    builder.latestAllowedVersion());
        }
        // Check whether the builder uses the batching optimization. If it does,
        // break the batch down and send the requests out one by one.
        if (builder.build(version) instanceof AbstractRequest) {
            doSend(clientRequest, isInternalRequest, now, builder.build(version));
        } else if (builder.build(version) instanceof List<?>) {
            for (AbstractRequest request : builder.buildFromBatch(version)) {
                // Send the individual request taken from the batch, not the whole batch.
                doSend(clientRequest, isInternalRequest, now, request);
            }
        } else {
            log.error(..)
        }
    ...
} |
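The dispatch described above can be tried in isolation. The following is a minimal, self-contained sketch and not the Kafka client code: `BatchDispatchSketch`, `SimpleRequest`, `buildBatch` and `dispatch` are hypothetical names introduced only for illustration, and the map of keys to key types is an assumption about what `groupIds` might hold. It shows a builder producing one request per map entry, and a sender that uses `instanceof` to tell a single request from a batch.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class BatchDispatchSketch {

    /** Hypothetical stand-in for a single request; not Kafka's AbstractRequest. */
    static final class SimpleRequest {
        final String key;
        final byte keyType;
        final short version;

        SimpleRequest(String key, byte keyType, short version) {
            this.key = key;
            this.keyType = keyType;
            this.version = version;
        }
    }

    /** Builds one request per map entry, similar in shape to the batched build() above. */
    static List<SimpleRequest> buildBatch(Map<String, Byte> keys, short version) {
        return keys.entrySet()
                .stream()
                .map(e -> new SimpleRequest(e.getKey(), e.getValue(), version))
                .collect(Collectors.toList());
    }

    /**
     * Accepts either a single request or a list of requests and "sends" each one.
     * Returns the keys in the order they were sent, so the behavior can be checked.
     */
    static List<String> dispatch(Object built) {
        List<String> sent = new ArrayList<>();
        if (built instanceof SimpleRequest) {
            // No batching: send the single request as-is.
            sent.add(((SimpleRequest) built).key);
        } else if (built instanceof List<?>) {
            // Batching: generics are erased at runtime, so only List<?> can be tested;
            // each element is checked individually before it is sent.
            for (Object element : (List<?>) built) {
                if (!(element instanceof SimpleRequest))
                    throw new IllegalArgumentException("Unexpected batch element: " + element);
                sent.add(((SimpleRequest) element).key);
            }
        } else {
            throw new IllegalArgumentException("Unexpected build result: " + built);
        }
        return sent;
    }

    public static void main(String[] args) {
        Map<String, Byte> keys = new LinkedHashMap<>();
        keys.put("group-a", (byte) 0);
        keys.put("group-b", (byte) 0);
        System.out.println(dispatch(buildBatch(keys, (short) 3)));
        System.out.println(dispatch(new SimpleRequest("group-c", (byte) 0, (short) 3)));
    }
}
```

One design point the sketch makes visible: because `instanceof` cannot test a parameterized type such as `List<AbstractRequest>`, the batch branch has to check `List<?>` and validate the elements itself. It also builds the result once and passes it in, rather than calling the builder again in each branch.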
...