THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||
---|---|---|
| ||
public class FindCoordinatorRequest extends AbstractRequest { private static final String COORDINATOR_KEY_KEY_NAME = "coordinator_key"; private static final String COORDINATOR_TYPE_KEY_NAME = "coordinator_type"; private static final String COORDINATOR_GROUPIDS_KEY_NAME = "coordinator_groupIds" private static final Schema FIND_COORDINATOR_REQUEST_V0 = new Schema(GROUP_ID); private static final Schema FIND_COORDINATOR_REQUEST_V1 = new Schema( new Field("coordinator_key", STRING, "Id to use for finding the coordinator (for groups, this is the groupId, " + "for transactional producers, this is the transactional id)"), new Field("coordinator_type", INT8, "The type of coordinator to find (0 = group, 1 = transaction)")); /** * The version number is bumped to indicate that on quota violation brokers send out responses before throttling. */ private static final Schema FIND_COORDINATOR_REQUEST_V2 = FIND_COORDINATOR_REQUEST_V1; private static final Schema FIND_COORDINATOR_REQUEST_V3 = new Schema( new Field(COORDINATOR_GROUPIDS_KEY_NAME, new ArrayOf(STRING), "All the coordinator ids " + "for this request")); public static Schema[] schemaVersions() { return new Schema[] {FIND_COORDINATOR_REQUEST_V0, FIND_COORDINATOR_REQUEST_V1, FIND_COORDINATOR_REQUEST_V2, FIND_COORDINATOR_REQUEST_V3}; } ... public static class Builder extends AbstractRequest.Builder<FindCoordinatorRequest> { private final Map<String, String> groupIds; // Use to store (coordinator_key, coordinator_type) pair private final short minVersion; public Builder(CoordinatorType coordinatorType, String coordinatorKey) { super(ApiKeys.FIND_COORDINATOR); this.coordinatorType = coordinatorType; this.coordinatorKey = coordinatorKey;groupIds.put(coordiateType, coordinatorKey) this.minVersion = coordinatorType == CoordinatorType.TRANSACTION ? (short) 1 : (short) 0; } @Override public FindCoordinatorRequest build(short version) { if (version < minVersion) throw new UnsupportedVersionException("Cannot create a v" + version + " FindCoordinator request " + "because we require features supported only in " + minVersion + " or later."); return newCoordinatorType FindCoordinatorRequest(coordinatorType, coordinatorKey, version= groupIds.keySet().stream().findFirst().get(); } String coordinatorKey = gruopIds.get(coordinatorType); return new FindCoordinatorRequest(coordinatorType, coordinatorKey, version); } public List<FindCoordinatorRequest> buildToList(short version){ if (version < minVersion) throw new UnsupportedVersionException("Cannot create a v" + version + " FindCoordinator request " + "because we require features supported only in " + minVersion + " or later."); because we require features supported only in " + minVersion + " or later."); return Collections.singletonList(new FindCoordinatorRequest(coordinatorType, coordinatorKey, version)); } } public String coordinatorKey() { return groupIds.keySet().stream().findFirst().get(); } public CoordinatorType coordinatorType() { return CollectionsgroupIds.singletonListget(new FindCoordinatorRequest(coordinatorType, coordinatorKey, versioncoordinatorKey()); } } ... private final Map<String, String> groupIds; // Use to store (coordinator_key, coordinator_type) pair ... |
...
For example, in FindCoordinatorRequest.java, buillder.build will become something like this:
Code Block |
---|
@Override public List<FindCoordinatorRequest> buildbuildToList(short version) { if (version < minVersion) throw new UnsupportedVersionException("Cannot create a v" + version + " FindCoordinator request " + "because we require features supported only in " + minVersion + " or later."); return groupIds.entrySet() .stream() .map(x-> new FindCoordinatorRequest(x.value(), x.key(), version)) .collect(Collectors.toList()); } |
...