...
Status
Current state: Superseded by KIP-699 (previously Under Discussion)
Discussion thread: here
...
For example, the builder in FindCoordinatorRequest.java will have a new buildToList() method (a better name is still to be chosen), like this:
Code Block |
---|
public List<FindCoordinatorRequest> buildToList(short version) {
    if (version < minVersion)
        throw new UnsupportedVersionException("Cannot create a v" + version + " FindCoordinator request " +
                "because we require features supported only in " + minVersion + " or later.");
    // Emit one single-key request per entry of groupIds.
    return groupIds.entrySet()
            .stream()
            .map(x -> new FindCoordinatorRequest(x.getValue(), x.getKey(), version))
            .collect(Collectors.toList());
} |
Then, in doSend() in NetworkClient.java, check whether batching has been used (for example with instanceof on the builder or, as in the code here, by comparing the builder's API key).
Code Block |
---|
private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now) {
    ...
    try {
        NodeApiVersions versionInfo = apiVersions.get(nodeId);
        short version;
        // Note: if versionInfo is null, we have no server version information. This would be
        // the case when sending the initial ApiVersionRequest which fetches the version
        // information itself. It is also the case when discoverBrokerVersions is set to false.
        if (versionInfo == null) {
            version = builder.latestAllowedVersion();
            if (discoverBrokerVersions && log.isTraceEnabled())
                log.trace("No version information found when sending {} with correlation id {} to node {}. " +
                        "Assuming version {}.", clientRequest.apiKey(), clientRequest.correlationId(), nodeId, version);
        } else {
            version = versionInfo.latestUsableVersion(clientRequest.apiKey(), builder.oldestAllowedVersion(),
                    builder.latestAllowedVersion());
        }
        // If the request is batch-optimized, split it and send the resulting requests one by one.
        // An enum could be used to list all the batch-optimized APIs here.
        if (builder.apiKey() == ApiKeys.FIND_COORDINATOR) {
            for (AbstractRequest request : ((FindCoordinatorRequest.Builder) builder).buildToList(version)) {
                doSend(clientRequest, isInternalRequest, now, request);
            }
        } else {
            doSend(clientRequest, isInternalRequest, now, builder.build(version));
        }
        ...
} |
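The two snippets above can be tried out together in a small, self-contained sketch. It does not use the real Kafka classes: `BatchSplitSketch`, `Request`, `Builder`, and `Sender` are hypothetical stand-ins for FindCoordinatorRequest, its builder, and the network client, and `UnsupportedOperationException` stands in for Kafka's UnsupportedVersionException. It only illustrates the idea of splitting one batched builder into one request per key before sending.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class BatchSplitSketch {

    // Hypothetical stand-in for a single-key FindCoordinatorRequest.
    static final class Request {
        final String coordinatorType;
        final String coordinatorKey;
        final short version;

        Request(String coordinatorType, String coordinatorKey, short version) {
            this.coordinatorType = coordinatorType;
            this.coordinatorKey = coordinatorKey;
            this.version = version;
        }
    }

    // Hypothetical stand-in for the builder: it holds several keys at once.
    static final class Builder {
        private final short minVersion;
        private final Map<String, String> groupIds; // coordinator key -> coordinator type

        Builder(short minVersion, Map<String, String> groupIds) {
            this.minVersion = minVersion;
            this.groupIds = new LinkedHashMap<>(groupIds); // keep insertion order
        }

        // Split the batch into one request per key, as buildToList() does in the proposal.
        List<Request> buildToList(short version) {
            if (version < minVersion)
                throw new UnsupportedOperationException("Cannot create a v" + version
                        + " request; v" + minVersion + " or later is required.");
            return groupIds.entrySet()
                    .stream()
                    .map(e -> new Request(e.getValue(), e.getKey(), version))
                    .collect(Collectors.toList());
        }
    }

    // Hypothetical stand-in for the network layer: records what would be sent.
    static final class Sender {
        final List<Request> sent = new ArrayList<>();

        void doSend(Builder builder, short version) {
            for (Request request : builder.buildToList(version))
                sent.add(request);
        }
    }

    public static void main(String[] args) {
        Map<String, String> keys = new LinkedHashMap<>();
        keys.put("group-a", "GROUP");
        keys.put("group-b", "GROUP");

        Sender sender = new Sender();
        sender.doSend(new Builder((short) 1, keys), (short) 2);
        for (Request r : sender.sent)
            System.out.println(r.coordinatorType + " " + r.coordinatorKey + " v" + r.version);
    }
}
```

In this sketch the caller still hands over a single builder; the fan-out into individual requests happens only at send time, which is the property the proposal relies on.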
...
- Update the batching logic in KafkaAdmin.java directly instead of modifying the structure of requests and responses.
- Change the return type of builder.build() to a Collection.
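To make the second option concrete, here is a rough sketch of what a Collection-returning build() could look like. All names (`CollectionBuildSketch`, `RequestBuilder`, `SingleBuilder`, `BatchedBuilder`, `sendAll`) are hypothetical and requests are modelled as plain strings; this is not the Kafka API, only an illustration of the shape of the change.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

final class CollectionBuildSketch {

    // Hypothetical builder contract: build() always returns a collection of requests.
    interface RequestBuilder<T> {
        Collection<T> build(short version);
    }

    // A non-batched builder wraps its single request in a one-element collection.
    static final class SingleBuilder implements RequestBuilder<String> {
        private final String payload;

        SingleBuilder(String payload) {
            this.payload = payload;
        }

        @Override
        public Collection<String> build(short version) {
            return Collections.singletonList(payload + "@v" + version);
        }
    }

    // A batched builder returns one request per group id.
    static final class BatchedBuilder implements RequestBuilder<String> {
        private final List<String> groupIds;

        BatchedBuilder(List<String> groupIds) {
            this.groupIds = new ArrayList<>(groupIds);
        }

        @Override
        public Collection<String> build(short version) {
            return groupIds.stream()
                    .map(g -> g + "@v" + version)
                    .collect(Collectors.toList());
        }
    }

    // Every caller iterates, whether or not the builder is batched.
    static <T> List<T> sendAll(RequestBuilder<T> builder, short version) {
        List<T> sent = new ArrayList<>();
        for (T request : builder.build(version))
            sent.add(request);
        return sent;
    }
}
```

With this shape the send path needs no special case per API, but every existing builder and every caller of build() would have to change to the collection-based signature.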