...
Table of Contents |
---|
Status
Current state: Under DiscussionSuperseeded by KIP-699
Discussion thread: here [Change the link from the KIP proposal email archive to your own email thread]
...
Code Block | ||
---|---|---|
| ||
public class FindCoordinatorRequest extends AbstractRequest { private static final String COORDINATOR_KEY_KEY_NAME = "coordinator_key"; private static final String COORDINATOR_TYPE_KEY_NAME = "coordinator_type"; private static final String COORDINATOR_GROUPIDS_KEY_NAME = "coordinator_groupIds" private static final Schema FIND_COORDINATOR_REQUEST_V0 = new Schema(GROUP_ID); private static final Schema FIND_COORDINATOR_REQUEST_V1 = new Schema( new Field("coordinator_key", STRING, "Id to use for finding the coordinator (for groups, this is the groupId, " + "for transactional producers, this is the transactional id)"), new Field("coordinator_type", INT8, "The type of coordinator to find (0 = group, 1 = transaction)")); /** * The version number is bumped to indicate that on quota violation brokers send out responses before throttling. */ private static final Schema FIND_COORDINATOR_REQUEST_V2 = FIND_COORDINATOR_REQUEST_V1; private static final Schema FIND_COORDINATOR_REQUEST_V3 = new Schema( new Field(COORDINATOR_GROUPIDS_KEY_NAME, new ArrayOf(STRING), "All the coordinator ids " + "for this request")); public static Schema[] schemaVersions() { return new Schema[] {FIND_COORDINATOR_REQUEST_V0, FIND_COORDINATOR_REQUEST_V1, FIND_COORDINATOR_REQUEST_V2, FIND_COORDINATOR_REQUEST_V3}; } ... public static class Builder extends AbstractRequest.Builder<FindCoordinatorRequest> { private final Map<String, String> groupIds; // Use to store (coordinator_key, coordinator_type) pair ... private final short minVersion; public Builder(CoordinatorType coordinatorType, String coordinatorKey) { super(ApiKeys.FIND_COORDINATOR); groupIds.put(coordiateType, coordinatorKey); this.minVersion = coordinatorType == CoordinatorType.TRANSACTION ? (short) 1 : (short) 0; } @Override public FindCoordinatorRequest build(short version) { if (version < minVersion) throw new UnsupportedVersionException("Cannot create a v" + version + " FindCoordinator request " + "because we require features supported only in " + minVersion + " or later."); return new FindCoordinatorRequest(coordinatorType(), coordinatorKey(), version); } public List<FindCoordinatorRequest> buildToList(short version){ if (version < minVersion) throw new UnsupportedVersionException("Cannot create a v" + version + " FindCoordinator request " + "because we require features supported only in " + minVersion + " or later."); return Collections.singletonList(new FindCoordinatorRequest(coordinatorType, coordinatorKey, version)); } public String coordinatorKey() { return groupIds.keySet().stream().findFirst().get(); } public CoordinatorType coordinatorType() { return groupIds.get(coordinatorKey()); } public Map groupIds(){ return groupIds; } } ... private final Map<String, String> groupIds; // Use to store (coordinator_key, coordinator_type) pair ... |
FindCoordinatorResponse.java
...
One tricky question is, how do we know if a higher version API has a batching optimization. Discussion is Discussions are still required and suggestions are highly encouraged.
a) One solution is to let the request's builder.build() return either ONE request or a LIST of requestrequests. This is backward compatible. We can have a list of one single element.
For example, in FindCoordinatorRequest.java, buillder.build will become something will have a new buildToList() (Should come up with a better name) method like this:
Code Block |
---|
@Override public List<FindCoordinatorRequest> buildbuildToList(short version) { if (version < minVersion) throw new UnsupportedVersionException("Cannot create a v" + version + " FindCoordinator request " + "because we require features supported only in " + minVersion + " or later."); return groupIds.entrySet() .stream() .map(x-> new FindCoordinatorRequest(x.value(), x.key(), version)) .collect(Collectors.toList()); } |
ThenThen in NetworkClient.java/doSend(), use instanceof to check if batching has been useused.
Code Block |
---|
private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now) { ... try { NodeApiVersions versionInfo = apiVersions.get(nodeId); short version; // Note: if versionInfo is null, we have no server version information. This would be // the case when sending the initial ApiVersionRequest which fetches the version // information itself. It is also the case when discoverBrokerVersions is set to false. if (versionInfo == null) { version = builder.latestAllowedVersion(); if (discoverBrokerVersions && log.isTraceEnabled()) log.trace("No version information found when sending {} with correlation id {} to node {}. " + "Assuming version {}.", clientRequest.apiKey(), clientRequest.correlationId(), nodeId, version); } else { version = versionInfo.latestUsableVersion(clientRequest.apiKey(), builder.oldestAllowedVersion(), builder.latestAllowedVersion()); } /** */ Check if there is batching optimization, if there is, break them down and send them out one by one: if (builder.build(version) instance AbstractRequest){ doSend(clientRequest, isInternalRequest, now, builder.build(version)); } else // We can use an enum to list all the batch-optimized API here. if (builder.buildapiKey(version) instance List<T>== FIND_COORDINATOR){ for (AbstractRequest request: ((FindCoordinatorRequest.Builder) builder).buildFromBatchbuildToList(version)){ doSend(clientRequest, isInternalRequest, now, builder.build(version))request); } } else { log.error(..) doSend(clientRequest, isInternalRequest, now, builder.build(version)); } ... } |
...
This way will decouple the two different build functions.
Rejected Alternatives
- Update the batching logic in KafkaAdmin.java directly instead of modifying the structure of requests and response.
- Change the builder.build()'s return type to a Collection;