Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
@Override
public List<FindCoordinatorRequest> build(short version) {
    if (version < minVersion)
        throw new UnsupportedVersionException("Cannot create a v" + version + " FindCoordinator request " +
                "because we require features supported only in " + minVersion + " or later.");
    return groupIds.entrySet()
            .stream()
            .map(x-> new FindCoordinatorRequest(x.value(), x.key(), version))
            .collect(Collectors.toList());
}


ThenThen in doSend(),  use instanceof to check if batching has been useused

Code Block
private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now) {
...
try {
    NodeApiVersions versionInfo = apiVersions.get(nodeId);
    short version;
    // Note: if versionInfo is null, we have no server version information. This would be
    // the case when sending the initial ApiVersionRequest which fetches the version
    // information itself.  It is also the case when discoverBrokerVersions is set to false.
    if (versionInfo == null) {
        version = builder.latestAllowedVersion();
        if (discoverBrokerVersions && log.isTraceEnabled())
            log.trace("No version information found when sending {} with correlation id {} to node {}. " +
                    "Assuming version {}.", clientRequest.apiKey(), clientRequest.correlationId(), nodeId, version);
    } else {
        version = versionInfo.latestUsableVersion(clientRequest.apiKey(), builder.oldestAllowedVersion(),
                builder.latestAllowedVersion());
    }
/**
 * Check if there is batching optimization, if there is, break them down and send them out one by one:
 if (builder.build(version) instance AbstractRequest){
    doSend(clientRequest, isInternalRequest, now, builder.build(version));
 } else if (builder.build(version) instance List<T>){
   for (AbstractRequest request: builder.buildFromBatch(version)){
      doSend(clientRequest, isInternalRequest, now, builder.build(version))
    }
 } else {
	log.error(..)
 }
 ...
}

...