...

Status

Current state: Superseded by KIP-699

Discussion thread: here

...

For example, in FindCoordinatorRequest.java, the builder will have a new buildToList() method (the name is a placeholder; a better one should be chosen), like this:

Code Block
public List<FindCoordinatorRequest> buildToList(short version) {
    if (version < minVersion)
        throw new UnsupportedVersionException("Cannot create a v" + version + " FindCoordinator request " +
                "because we require features supported only in " + minVersion + " or later.");
    // Split the batch into one single-key request per map entry.
    return groupIds.entrySet()
            .stream()
            .map(x -> new FindCoordinatorRequest(x.getValue(), x.getKey(), version))
            .collect(Collectors.toList());
}
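
To make the splitting step concrete, here is a minimal, self-contained sketch that runs without Kafka on the classpath. Everything in it is an assumption for illustration: SingleRequest is a hypothetical stand-in for a single-key FindCoordinatorRequest, the map is assumed to go from coordinator key to coordinator type, and IllegalArgumentException replaces UnsupportedVersionException so that the example has no dependencies.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class BuildToListSketch {

    /** Hypothetical stand-in for a FindCoordinatorRequest that carries a single key. */
    public static final class SingleRequest {
        public final String key;
        public final byte keyType;
        public final short version;

        public SingleRequest(String key, byte keyType, short version) {
            this.key = key;
            this.keyType = keyType;
            this.version = version;
        }
    }

    /**
     * Splits a batch of coordinator keys into one request per key.
     * The map layout (key -> type) is an assumption of this sketch.
     */
    public static List<SingleRequest> buildToList(Map<String, Byte> keys,
                                                  short version,
                                                  short minVersion) {
        if (version < minVersion)
            throw new IllegalArgumentException("Cannot create a v" + version
                    + " request; v" + minVersion + " or later is required.");
        return keys.entrySet()
                .stream()
                .map(e -> new SingleRequest(e.getKey(), e.getValue(), version))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps insertion order, so the output order is stable.
        Map<String, Byte> keys = new LinkedHashMap<>();
        keys.put("group-a", (byte) 0);
        keys.put("group-b", (byte) 0);
        for (SingleRequest r : buildToList(keys, (short) 1, (short) 0))
            System.out.println(r.key + " type=" + r.keyType + " v" + r.version);
    }
}
```

Each entry of the batch becomes its own request at the negotiated version, which is what lets the client fall back to one request per key.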


Then, in doSend() in NetworkClient.java, check whether the request uses batching (the snippet below checks the builder's API key; an instanceof check on the builder would also work):

Code Block
private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now) {
...
try {
    NodeApiVersions versionInfo = apiVersions.get(nodeId);
    short version;
    // Note: if versionInfo is null, we have no server version information. This would be
    // the case when sending the initial ApiVersionRequest which fetches the version
    // information itself.  It is also the case when discoverBrokerVersions is set to false.
    if (versionInfo == null) {
        version = builder.latestAllowedVersion();
        if (discoverBrokerVersions && log.isTraceEnabled())
            log.trace("No version information found when sending {} with correlation id {} to node {}. " +
                    "Assuming version {}.", clientRequest.apiKey(), clientRequest.correlationId(), nodeId, version);
    } else {
        version = versionInfo.latestUsableVersion(clientRequest.apiKey(), builder.oldestAllowedVersion(),
                builder.latestAllowedVersion());
    }
    // If this API is batch-optimized, split the batched request and send the parts one by one.
    // An enum could be used to list all batch-optimized APIs here.
    if (builder.apiKey() == FIND_COORDINATOR) {
        for (AbstractRequest request : ((FindCoordinatorRequest.Builder) builder).buildToList(version)) {
            doSend(clientRequest, isInternalRequest, now, request);
        }
    } else {
        doSend(clientRequest, isInternalRequest, now, builder.build(version));
    }
 ...
}
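
The comment in the snippet above suggests listing the batch-optimized APIs in an enum. A minimal sketch of that idea follows, assuming a simplified ApiKey enum and a dispatch() helper that records what would be sent; both are hypothetical and are not part of NetworkClient.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

public class BatchDispatchSketch {

    /** Hypothetical, simplified stand-in for Kafka's API key enum. */
    public enum ApiKey { FIND_COORDINATOR, METADATA }

    /** APIs whose batched requests are split up before sending (assumed set). */
    public static final Set<ApiKey> BATCH_OPTIMIZED = EnumSet.of(ApiKey.FIND_COORDINATOR);

    /**
     * Returns a description of every request that would be sent:
     * one per key for batch-optimized APIs, otherwise a single combined request.
     */
    public static List<String> dispatch(ApiKey apiKey, List<String> keys) {
        List<String> sent = new ArrayList<>();
        if (BATCH_OPTIMIZED.contains(apiKey)) {
            for (String key : keys)
                sent.add(apiKey + ":" + key);
        } else {
            sent.add(apiKey + ":" + String.join(",", keys));
        }
        return sent;
    }

    public static void main(String[] args) {
        System.out.println(dispatch(ApiKey.FIND_COORDINATOR, Arrays.asList("group-a", "group-b")));
        System.out.println(dispatch(ApiKey.METADATA, Arrays.asList("topic-a", "topic-b")));
    }
}
```

Keeping the set in one place means that adding another batch-optimized API changes only the set, not the branching in the send path.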

...

  • Update the batching logic in KafkaAdmin.java directly instead of modifying the structure of requests and responses.
  • Change the return type of builder.build() to a Collection.