...

However, this KIP changes this pattern, because it introduces a batching optimization that combines MANY requests into ONE request. Along with the implementation of this KIP, we therefore need to support a backward-compatible one-to-many conversion for requests.

This can be implemented in doSend() in NetworkClient.java. Currently, doSend() calls builder.build() to build ONE request and sends it out. What we need to do is modify this logic so that it can handle more than one request.

The trickiest question is how we know whether a higher-version API has a batching optimization. Discussion is still required and suggestions are highly encouraged. Some of the possible solutions involve adding extra fields to AbstractRequest.java.

a) One solution is to let the request's builder.build() return either ONE request or a LIST of requests. This is backward compatible, because a single request can be represented as a list with one element.

For example, in FindCoordinatorRequest.java, builder.build() would become something like this:

Code Block
@Override
public List<FindCoordinatorRequest> build(short version) {
    if (version < minVersion)
        throw new UnsupportedVersionException("Cannot create a v" + version + " FindCoordinator request " +
                "because we require features supported only in " + minVersion + " or later.");
    return groupIds.entrySet()
            .stream()
            .map(x -> new FindCoordinatorRequest(x.getValue(), x.getKey(), version))
            .collect(Collectors.toList());
}
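The backward-compatible one-to-many conversion can also be made version dependent. The following is a minimal, self-contained sketch and not Kafka code: ToyRequest, ToyBatchBuilder and FIRST_BATCHED_VERSION are hypothetical names, and the version cutoff is an assumption chosen only for illustration. It shows a builder that returns a list with a single batched request for newer versions and one request per key for older versions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Small demo entry point for the sketch below.
final class BatchSplitDemo {
    public static void main(String[] args) {
        ToyBatchBuilder builder = new ToyBatchBuilder(Arrays.asList("g1", "g2", "g3"));
        System.out.println("batched version: " + builder.build((short) 3).size() + " request(s)");
        System.out.println("older version: " + builder.build((short) 2).size() + " request(s)");
    }
}

// Hypothetical stand-in for a request; this is not Kafka's FindCoordinatorRequest.
final class ToyRequest {
    final List<String> keys;
    final short version;

    ToyRequest(List<String> keys, short version) {
        this.keys = keys;
        this.version = version;
    }
}

// Hypothetical builder whose build() always returns a list of requests.
final class ToyBatchBuilder {
    // Assumption: the first version whose format can carry several keys at once.
    static final short FIRST_BATCHED_VERSION = 3;

    private final List<String> keys;

    ToyBatchBuilder(List<String> keys) {
        this.keys = new ArrayList<>(keys);
    }

    List<ToyRequest> build(short version) {
        if (version >= FIRST_BATCHED_VERSION) {
            // Newer peer: a list with one element that carries all keys.
            return Collections.singletonList(new ToyRequest(keys, version));
        }
        // Older peer: fall back to one request per key.
        List<ToyRequest> requests = new ArrayList<>();
        for (String key : keys) {
            requests.add(new ToyRequest(Collections.singletonList(key), version));
        }
        return requests;
    }
}
```

Because the return type is always a list, the caller can loop over the result in both cases and does not need a type check.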


Then, in doSend(), use instanceof to check whether batching has been used.

Code Block
private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now) {
...
try {
    NodeApiVersions versionInfo = apiVersions.get(nodeId);
    short version;
    // Note: if versionInfo is null, we have no server version information. This would be
    // the case when sending the initial ApiVersionRequest which fetches the version
    // information itself.  It is also the case when discoverBrokerVersions is set to false.
    if (versionInfo == null) {
        version = builder.latestAllowedVersion();
        if (discoverBrokerVersions && log.isTraceEnabled())
            log.trace("No version information found when sending {} with correlation id {} to node {}. " +
                    "Assuming version {}.", clientRequest.apiKey(), clientRequest.correlationId(), nodeId, version);
    } else {
        version = versionInfo.latestUsableVersion(clientRequest.apiKey(), builder.oldestAllowedVersion(),
                builder.latestAllowedVersion());
    }
    // Check whether there is a batching optimization. If there is, break the batch
    // down and send the requests out one by one.
    Object built = builder.build(version);
    if (built instanceof AbstractRequest) {
        doSend(clientRequest, isInternalRequest, now, (AbstractRequest) built);
    } else if (built instanceof List<?>) {
        // Generic type arguments are erased at runtime, so test against List<?>.
        for (Object request : (List<?>) built) {
            doSend(clientRequest, isInternalRequest, now, (AbstractRequest) request);
        }
    } else {
        log.error(..)
    }
 ...
}


b) An alternative solution is to add extra members to AbstractRequest.java, including boolean isBatchingEnable() and List<AbstractRequest> buildFromBatch().

This approach decouples the two different build functions.
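A minimal sketch of how option b) could look, under stated assumptions: SketchRequest, SketchBuilder, GroupLookupBuilder and SketchSender are hypothetical stand-ins rather than Kafka classes, and the two members are placed on the builder because the doSend() pseudocode above calls builder.buildFromBatch(). The sketch always splits a batch; deciding when to split (for example, only for older broker versions) is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical sender: picks the build path from isBatchingEnable().
final class SketchSender {
    static List<String> send(SketchBuilder builder, short version) {
        List<String> sent = new ArrayList<>();
        if (builder.isBatchingEnable()) {
            // Batch present: send the requests one by one.
            for (SketchRequest request : builder.buildFromBatch(version)) {
                sent.add(request.payload);
            }
        } else {
            sent.add(builder.build(version).payload);
        }
        return sent;
    }

    public static void main(String[] args) {
        System.out.println(send(new GroupLookupBuilder(Arrays.asList("a", "b")), (short) 1));
    }
}

// Hypothetical stand-in for AbstractRequest.
final class SketchRequest {
    final String payload;

    SketchRequest(String payload) {
        this.payload = payload;
    }
}

// Hypothetical stand-in for a request builder with the two proposed members.
abstract class SketchBuilder {
    abstract SketchRequest build(short version);

    // Default: builders that do not batch need no changes.
    boolean isBatchingEnable() {
        return false;
    }

    List<SketchRequest> buildFromBatch(short version) {
        return Collections.singletonList(build(version));
    }
}

// Hypothetical builder that holds several group IDs.
final class GroupLookupBuilder extends SketchBuilder {
    private final List<String> groupIds;

    GroupLookupBuilder(List<String> groupIds) {
        this.groupIds = new ArrayList<>(groupIds);
    }

    @Override
    SketchRequest build(short version) {
        return new SketchRequest(String.join(",", groupIds));
    }

    @Override
    boolean isBatchingEnable() {
        return groupIds.size() > 1;
    }

    @Override
    List<SketchRequest> buildFromBatch(short version) {
        List<SketchRequest> requests = new ArrayList<>();
        for (String groupId : groupIds) {
            requests.add(new SketchRequest(groupId));
        }
        return requests;
    }
}
```

Compared with option a), build() keeps its single-request return type, so existing builders and callers stay unchanged unless they opt in to batching.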


Rejected Alternatives

  • Update the batching logic in KafkaAdmin.java directly instead of modifying the structure of requests and responses.

...