...

Status

Current state: Superseded by KIP-699

Discussion thread: here

...

Code Block
public class FindCoordinatorRequest extends AbstractRequest {
    private static final String COORDINATOR_KEY_KEY_NAME = "coordinator_key";
    private static final String COORDINATOR_TYPE_KEY_NAME = "coordinator_type";
    private static final String COORDINATOR_GROUPIDS_KEY_NAME = "coordinator_groupIds";


	private static final Schema FIND_COORDINATOR_REQUEST_V0 = new Schema(GROUP_ID);

	private static final Schema FIND_COORDINATOR_REQUEST_V1 = new Schema(
        new Field("coordinator_key", STRING, "Id to use for finding the coordinator (for groups, this is the groupId, " +
                        "for transactional producers, this is the transactional id)"),
        new Field("coordinator_type", INT8, "The type of coordinator to find (0 = group, 1 = transaction)"));


	/**
 	* The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
 	*/
	private static final Schema FIND_COORDINATOR_REQUEST_V2 = FIND_COORDINATOR_REQUEST_V1;


	private static final Schema FIND_COORDINATOR_REQUEST_V3 = new Schema(
	new Field(COORDINATOR_GROUPIDS_KEY_NAME, new ArrayOf(STRING), "All the coordinator ids " +
        "for this request"));


	public static Schema[] schemaVersions() {
    	return new Schema[] {FIND_COORDINATOR_REQUEST_V0, FIND_COORDINATOR_REQUEST_V1, FIND_COORDINATOR_REQUEST_V2, FIND_COORDINATOR_REQUEST_V3};
	}

	...

    public static class Builder extends AbstractRequest.Builder<FindCoordinatorRequest> {
       // Stores (coordinator_key, coordinator_type) pairs.
       private final Map<String, CoordinatorType> groupIds = new HashMap<>();
       private final short minVersion;

       public Builder(CoordinatorType coordinatorType, String coordinatorKey) {
          super(ApiKeys.FIND_COORDINATOR);
          groupIds.put(coordinatorKey, coordinatorType);
          this.minVersion = coordinatorType == CoordinatorType.TRANSACTION ? (short) 1 : (short) 0;
       }

       @Override
       public FindCoordinatorRequest build(short version) {
          if (version < minVersion)
             throw new UnsupportedVersionException("Cannot create a v" + version + " FindCoordinator request " +
                     "because we require features supported only in " + minVersion + " or later.");
          // Single-request path: use the first (here, the only) entry in the map.
          Map.Entry<String, CoordinatorType> first = groupIds.entrySet().iterator().next();
          return new FindCoordinatorRequest(first.getValue(), first.getKey(), version);
       }

       public List<FindCoordinatorRequest> buildToList(short version) {
          // build() performs the minVersion check.
          return Collections.singletonList(build(version));
       }
    }
    public String coordinatorKey() {
        return groupIds.keySet().stream().findFirst().get();
    }

    public CoordinatorType coordinatorType() {
        return groupIds.get(coordinatorKey());
    }


    ...
    public Map<String, CoordinatorType> groupIds() {
        return groupIds;
    }
    }


    ...
    // Stores (coordinator_key, coordinator_type) pairs.
    private final Map<String, CoordinatorType> groupIds;


	...

...

a) One solution is to let the request's builder.build() return either ONE request or a LIST of requests. This is backward compatible, because a single request can be represented as a list with one element.

For example, in FindCoordinatorRequest.java, the builder will have a new buildToList() method (we should come up with a better name) like this:

Code Block
public List<FindCoordinatorRequest> buildToList(short version) {
    if (version < minVersion)
        throw new UnsupportedVersionException("Cannot create a v" + version + " FindCoordinator request " +
                "because we require features supported only in " + minVersion + " or later.");
    // One request per (coordinator_key, coordinator_type) entry.
    return groupIds.entrySet()
            .stream()
            .map(x -> new FindCoordinatorRequest(x.getValue(), x.getKey(), version))
            .collect(Collectors.toList());
}
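
As a self-contained illustration of the one-request-per-entry idea, here is a minimal sketch that runs without Kafka on the classpath. Everything in it is a hypothetical stand-in: `BuildToListSketch`, its nested `Request` and `Builder` types, and the `add()` helper are assumptions made for the example (the KIP's builder constructor takes only a single key), and `IllegalArgumentException` replaces `UnsupportedVersionException`.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical stand-ins for the Kafka classes; none of this is the real Kafka API.
class BuildToListSketch {
    enum CoordinatorType { GROUP, TRANSACTION }

    static final class Request {
        final CoordinatorType type;
        final String key;
        final short version;

        Request(CoordinatorType type, String key, short version) {
            this.type = type;
            this.key = key;
            this.version = version;
        }
    }

    static final class Builder {
        // (coordinator_key, coordinator_type) pairs, kept in insertion order.
        private final Map<String, CoordinatorType> groupIds = new LinkedHashMap<>();
        private short minVersion = 0;

        // Assumed helper for registering several keys; not part of the KIP.
        Builder add(CoordinatorType type, String key) {
            groupIds.put(key, type);
            if (type == CoordinatorType.TRANSACTION)
                minVersion = 1; // transactional coordinators need v1 or later
            return this;
        }

        // Builds one request per stored (key, type) entry.
        List<Request> buildToList(short version) {
            if (version < minVersion)
                throw new IllegalArgumentException("Cannot create a v" + version
                        + " request; v" + minVersion + " or later is required.");
            return groupIds.entrySet().stream()
                    .map(e -> new Request(e.getValue(), e.getKey(), version))
                    .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) {
        List<Request> requests = new Builder()
                .add(CoordinatorType.GROUP, "group-a")
                .add(CoordinatorType.GROUP, "group-b")
                .buildToList((short) 1);
        for (Request r : requests)
            System.out.println(r.type + " " + r.key + " v" + r.version);
    }
}
```

A builder holding a single entry yields a one-element list, which is the backward-compatible case described in a).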


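Then, in doSend() in NetworkClient.java, check whether the request uses the batching optimization. The snippet below checks the builder's API key; an instanceof check on the builder would work as well.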

Code Block
private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now) {
...
try {
    NodeApiVersions versionInfo = apiVersions.get(nodeId);
    short version;
    // Note: if versionInfo is null, we have no server version information. This would be
    // the case when sending the initial ApiVersionRequest which fetches the version
    // information itself.  It is also the case when discoverBrokerVersions is set to false.
    if (versionInfo == null) {
        version = builder.latestAllowedVersion();
        if (discoverBrokerVersions && log.isTraceEnabled())
            log.trace("No version information found when sending {} with correlation id {} to node {}. " +
                    "Assuming version {}.", clientRequest.apiKey(), clientRequest.correlationId(), nodeId, version);
    } else {
        version = versionInfo.latestUsableVersion(clientRequest.apiKey(), builder.oldestAllowedVersion(),
                builder.latestAllowedVersion());
    }
    // Check whether this API uses the batching optimization. If it does, break the batch
    // down and send the requests one by one.
    // We can use an enum to list all the batch-optimized APIs here.
    if (builder.apiKey() == ApiKeys.FIND_COORDINATOR) {
        for (AbstractRequest request : ((FindCoordinatorRequest.Builder) builder).buildToList(version)) {
            doSend(clientRequest, isInternalRequest, now, request);
        }
    } else {
        doSend(clientRequest, isInternalRequest, now, builder.build(version));
    }
 ...
}
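
The fan-out in doSend() can also be tried in isolation. The sketch below is only an illustration under stated assumptions: `FanOutSendSketch`, `RequestBuilder`, `Client`, and the `BATCH_OPTIMIZED` set are hypothetical names, requests are modeled as plain strings, and nothing here is NetworkClient's real API. It follows the comment in the snippet above by keeping the batch-optimized APIs in an enum set.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of the dispatch logic; not the real NetworkClient.
class FanOutSendSketch {
    enum ApiKey { FIND_COORDINATOR, METADATA }

    // APIs whose builders may expand into several requests.
    static final Set<ApiKey> BATCH_OPTIMIZED = EnumSet.of(ApiKey.FIND_COORDINATOR);

    interface RequestBuilder {
        ApiKey apiKey();
        String build(short version);
        // Default: a single request wrapped in a one-element list.
        default List<String> buildToList(short version) {
            return Collections.singletonList(build(version));
        }
    }

    static RequestBuilder findCoordinator(String... groupIds) {
        return new RequestBuilder() {
            @Override public ApiKey apiKey() { return ApiKey.FIND_COORDINATOR; }
            @Override public String build(short version) {
                return "FindCoordinator(" + groupIds[0] + ", v" + version + ")";
            }
            @Override public List<String> buildToList(short version) {
                List<String> out = new ArrayList<>();
                for (String id : groupIds)
                    out.add("FindCoordinator(" + id + ", v" + version + ")");
                return out;
            }
        };
    }

    static RequestBuilder metadata() {
        return new RequestBuilder() {
            @Override public ApiKey apiKey() { return ApiKey.METADATA; }
            @Override public String build(short version) { return "Metadata(v" + version + ")"; }
        };
    }

    static final class Client {
        final List<String> sent = new ArrayList<>(); // stands in for the network

        void doSend(RequestBuilder builder, short version) {
            if (BATCH_OPTIMIZED.contains(builder.apiKey())) {
                // Break the batch down and send the requests one by one.
                for (String request : builder.buildToList(version))
                    sent.add(request);
            } else {
                sent.add(builder.build(version));
            }
        }
    }

    public static void main(String[] args) {
        Client client = new Client();
        client.doSend(findCoordinator("g1", "g2"), (short) 3);
        client.doSend(metadata(), (short) 1);
        client.sent.forEach(System.out::println);
    }
}
```

Keeping the batch-optimized APIs in one set means doSend() does not need a new branch each time another API adopts the optimization.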

...


Rejected Alternatives

  • Update the batching logic in KafkaAdmin.java directly instead of modifying the structure of requests and responses.
  • Change the return type of builder.build() to a Collection.