
JIRA: KAFKA-7206

Motivation

Today each FindCoordinatorRequest carries only a single group id (coordinator key). That means if N group ids need to find their coordinators, the client has to send N requests. This can be optimized by batching the group ids so that fewer requests are sent. Therefore we want to extend FindCoordinatorRequest to carry multiple group ids. Furthermore, this is the foundation for some later optimizations (enabling batching in describeConsumerGroups and in deleteConsumerGroups).

Proposed Changes

Send a single FindCoordinatorRequest to a broker asking for the coordinators of all consumer groups. These are the changes that need to be made:

  1. Update the schema in FindCoordinatorRequest to store an array of group IDs (with a new FIND_COORDINATOR_REQUEST_V3), and modify the class structure to take more than one coordinator key (List<String> groupIds).
  2. Update the FindCoordinatorResponse class to hold a mapping structure. Today it only has a single Node, which supports one result; to support multiple results we need something like Map<GroupId, CoordinatorNodeMetadata>.
  3. Update handleFindCoordinatorRequest to correctly parse the new FindCoordinatorRequest and map the results into the new FindCoordinatorResponse.
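To make the three changes concrete, here is a minimal Java sketch of how a client could use a batched lookup. FindCoordinatorV3Request and CoordinatorNodeMetadata are hypothetical, simplified stand-ins for the proposed request and the per-group response entry (they are not existing Kafka classes), and the broker's answer is hard-coded. It also illustrates why the mapping structure helps the later describeConsumerGroups/deleteConsumerGroups batching: the client can group the results by coordinator node. Records require Java 16 or later.

```java
import java.util.*;

// Hypothetical, simplified stand-ins for the proposed v3 request/response; not Kafka classes.
public class BatchedFindCoordinatorSketch {

    // Change 1: a single request carries every group id.
    record FindCoordinatorV3Request(List<String> groupIds) {}

    // Change 2: one result per group, mirroring the (node_id, host, port) coordinator struct.
    record CoordinatorNodeMetadata(int nodeId, String host, int port) {}

    // Groups the batched result by coordinator node, so follow-up calls such as
    // DescribeGroups could be sent once per coordinator instead of once per group.
    static Map<Integer, List<String>> groupsByCoordinator(Map<String, CoordinatorNodeMetadata> coordinators) {
        Map<Integer, List<String>> byNode = new TreeMap<>();
        for (Map.Entry<String, CoordinatorNodeMetadata> e : coordinators.entrySet()) {
            byNode.computeIfAbsent(e.getValue().nodeId(), id -> new ArrayList<>()).add(e.getKey());
        }
        return byNode;
    }

    public static void main(String[] args) {
        FindCoordinatorV3Request request = new FindCoordinatorV3Request(List.of("g1", "g2", "g3"));

        // Pretend the broker answered the single request with this mapping (illustrative values only).
        Map<String, CoordinatorNodeMetadata> response = new LinkedHashMap<>();
        response.put("g1", new CoordinatorNodeMetadata(1, "broker1", 9092));
        response.put("g2", new CoordinatorNodeMetadata(2, "broker2", 9092));
        response.put("g3", new CoordinatorNodeMetadata(1, "broker1", 9092));

        System.out.println(request.groupIds().size() + " groups, 1 request"); // 3 groups, 1 request
        System.out.println(groupsByCoordinator(response));                    // {1=[g1, g3], 2=[g2]}
    }
}
```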

Public Interfaces

API Changes


I suggest that we modify FindCoordinatorRequest.java to support multiple coordinator keys by adding a new FIND_COORDINATOR_REQUEST_V3:

Code Block
languagejava
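import static org.apache.kafka.common.protocol.CommonFields.GROUP_ID;
import static org.apache.kafka.common.protocol.types.Type.INT8;
import static org.apache.kafka.common.protocol.types.Type.STRING;

import java.util.List;

import org.apache.kafka.common.protocol.types.ArrayOf;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;

public class FindCoordinatorRequest extends AbstractRequest {
    private static final String COORDINATOR_KEY_KEY_NAME = "coordinator_key";
    private static final String COORDINATOR_TYPE_KEY_NAME = "coordinator_type";
    // New in v3: the array of group ids to look up in a single request
    private static final String COORDINATOR_GROUPIDS_KEY_NAME = "coordinator_groupIds";

    private static final Schema FIND_COORDINATOR_REQUEST_V0 = new Schema(GROUP_ID);

    private static final Schema FIND_COORDINATOR_REQUEST_V1 = new Schema(
            new Field(COORDINATOR_KEY_KEY_NAME, STRING, "Id to use for finding the coordinator (for groups, this is the groupId, " +
                    "for transactional producers, this is the transactional id)"),
            new Field(COORDINATOR_TYPE_KEY_NAME, INT8, "The type of coordinator to find (0 = group, 1 = transaction)"));

    /**
     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
     */
    private static final Schema FIND_COORDINATOR_REQUEST_V2 = FIND_COORDINATOR_REQUEST_V1;

    /**
     * The version number is bumped to carry an array of group ids instead of a single coordinator key.
     */
    private static final Schema FIND_COORDINATOR_REQUEST_V3 = new Schema(
            new Field(COORDINATOR_GROUPIDS_KEY_NAME, new ArrayOf(STRING), "The group ids whose coordinators " +
                    "should be found in this request"));

    public static Schema[] schemaVersions() {
        return new Schema[] {FIND_COORDINATOR_REQUEST_V0, FIND_COORDINATOR_REQUEST_V1, FIND_COORDINATOR_REQUEST_V2,
            FIND_COORDINATOR_REQUEST_V3};
    }

    ...

    private final List<String> groupIds; // v3+: the group ids to find coordinators for

    ...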
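For a sense of what the new ArrayOf(STRING) field looks like on the wire, the following sketch hand-encodes the body of a v3 request with java.nio.ByteBuffer. It assumes the classic Kafka protocol encoding (an ARRAY is an INT32 element count followed by the elements; a STRING is an INT16 byte length followed by UTF-8 bytes) and leaves out the request header. The class name is hypothetical; in practice the Schema/Struct machinery above would do this serialization.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the v3 request body: ARRAY(STRING) of group ids, header excluded.
public class FindCoordinatorV3Encoding {

    static ByteBuffer encode(List<String> groupIds) {
        int size = 4; // INT32 array element count
        List<byte[]> encoded = new ArrayList<>();
        for (String groupId : groupIds) {
            byte[] bytes = groupId.getBytes(StandardCharsets.UTF_8);
            if (bytes.length > Short.MAX_VALUE)
                throw new IllegalArgumentException("group id too long: " + groupId);
            encoded.add(bytes);
            size += 2 + bytes.length; // INT16 length prefix + UTF-8 bytes
        }
        ByteBuffer buffer = ByteBuffer.allocate(size); // big-endian by default, like Kafka
        buffer.putInt(encoded.size());
        for (byte[] bytes : encoded) {
            buffer.putShort((short) bytes.length);
            buffer.put(bytes);
        }
        buffer.flip(); // prepare for reading
        return buffer;
    }

    static List<String> decode(ByteBuffer buffer) {
        int count = buffer.getInt();
        List<String> groupIds = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            byte[] bytes = new byte[buffer.getShort()];
            buffer.get(bytes);
            groupIds.add(new String(bytes, StandardCharsets.UTF_8));
        }
        return groupIds;
    }

    public static void main(String[] args) {
        ByteBuffer body = encode(List.of("g1", "group-2"));
        System.out.println(body.remaining() + " bytes"); // 17 bytes
        System.out.println(decode(body));                // [g1, group-2]
    }
}
```

The body grows by only 2 bytes plus the id length per additional group, whereas each extra single-group request would also repeat the full request header.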

FindCoordinatorResponse.java

We also want to modify FindCoordinatorResponse.java to support multiple coordinator keys by adding a new FIND_COORDINATOR_RESPONSE_V3:

Code Block
languagejava
public class FindCoordinatorResponse extends AbstractResponse {
    private static final String COORDINATOR_KEY_NAME = "coordinator";

    // coordinator level field names
    private static final String NODE_ID_KEY_NAME = "node_id";
    private static final String HOST_KEY_NAME = "host";
    private static final String PORT_KEY_NAME = "port";

    private static final Schema FIND_COORDINATOR_BROKER_V0 = new Schema(
            new Field(NODE_ID_KEY_NAME, INT32, "The broker id."),
            new Field(HOST_KEY_NAME, STRING, "The hostname of the broker."),
            new Field(PORT_KEY_NAME, INT32, "The port on which the broker accepts requests."));

    private static final Schema FIND_COORDINATOR_RESPONSE_V0 = new Schema(
            ERROR_CODE,
            new Field(COORDINATOR_KEY_NAME, FIND_COORDINATOR_BROKER_V0, "Host and port information for the coordinator " +
                    "for a consumer group."));

    private static final Schema FIND_COORDINATOR_RESPONSE_V1 = new Schema(
            THROTTLE_TIME_MS,
            ERROR_CODE,
            ERROR_MESSAGE,
            new Field(COORDINATOR_KEY_NAME, FIND_COORDINATOR_BROKER_V0, "Host and port information for the coordinator"));


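    /**
     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
     */
    private static final Schema FIND_COORDINATOR_RESPONSE_V2 = FIND_COORDINATOR_RESPONSE_V1;

    // Name of the v3 array field (hypothetical; any name not clashing with "coordinator" works)
    private static final String COORDINATORS_KEY_NAME = "coordinators";

    // One entry per requested group id: the group id, its own error, and its coordinator
    private static final Schema FIND_COORDINATOR_RESPONSE_V3_COORDINATOR = new Schema(
            GROUP_ID,
            ERROR_CODE,
            ERROR_MESSAGE,
            new Field(COORDINATOR_KEY_NAME, FIND_COORDINATOR_BROKER_V0, "Host and port information for the coordinator"));

    private static final Schema FIND_COORDINATOR_RESPONSE_V3 = new Schema(
            THROTTLE_TIME_MS,
            new Field(COORDINATORS_KEY_NAME, new ArrayOf(FIND_COORDINATOR_RESPONSE_V3_COORDINATOR),
                    "The coordinator (or error) for each group id in the request"));

    public static Schema[] schemaVersions() {
        return new Schema[] {FIND_COORDINATOR_RESPONSE_V0, FIND_COORDINATOR_RESPONSE_V1, FIND_COORDINATOR_RESPONSE_V2,
            FIND_COORDINATOR_RESPONSE_V3};
    }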
    }


	...


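    private final int throttleTimeMs;
    private final String errorMessage;             // v0-v2: error for the single coordinator key
    private final Errors error;                    // v0-v2
    private final Map<String, Node> nodes;         // v3+: (groupId, coordinator Node)
    private final Map<String, Errors> groupErrors; // v3+: (groupId, per-group error)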


	...
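The following sketch shows how the per-group entries of a v3 response could be folded into the (groupId, Node) map described above, while keeping per-group errors separate so that one unavailable coordinator does not fail the whole batch. The record types are hypothetical stand-ins for the response struct and org.apache.kafka.common.Node; in Kafka's error code table, 0 is NONE and 15 is COORDINATOR_NOT_AVAILABLE.

```java
import java.util.*;

// Hypothetical stand-ins for one element of the v3 response array:
// (group_id, error_code, error_message, coordinator).
public class FindCoordinatorV3ResponseSketch {

    record Node(int id, String host, int port) {}

    record CoordinatorEntry(String groupId, short errorCode, String errorMessage, Node coordinator) {}

    // Returns the resolved (groupId, Node) pairs; failed groups go into errorsOut instead.
    static Map<String, Node> resolved(List<CoordinatorEntry> entries, Map<String, Short> errorsOut) {
        Map<String, Node> nodes = new LinkedHashMap<>();
        for (CoordinatorEntry entry : entries) {
            if (entry.errorCode() == 0) { // 0 = NONE
                nodes.put(entry.groupId(), entry.coordinator());
            } else {
                errorsOut.put(entry.groupId(), entry.errorCode());
            }
        }
        return nodes;
    }

    public static void main(String[] args) {
        List<CoordinatorEntry> entries = List.of(
                new CoordinatorEntry("g1", (short) 0, null, new Node(1, "broker1", 9092)),
                new CoordinatorEntry("g2", (short) 15, "The coordinator is not available.", new Node(-1, "", -1)));
        Map<String, Short> errors = new LinkedHashMap<>();
        Map<String, Node> nodes = resolved(entries, errors);
        System.out.println(nodes.keySet()); // [g1]
        System.out.println(errors);         // {g2=15}
    }
}
```

A client would then retry only the groups left in errorsOut, rather than the whole batch.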


KafkaApis.scala/handleFindCoordinatorRequest


Code Block
languagescala
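def handleFindCoordinatorRequest(request: RequestChannel.Request) {
  val findCoordinatorRequest = request.body[FindCoordinatorRequest]

  // groupIds is the new v3 accessor proposed above
  val groups = findCoordinatorRequest.groupIds.asScala.map { groupId =>
    ...
    // Find the topic metadata for each group id: the partition of the group metadata
    // topic that owns the group, and that partition's leader (the coordinator)
  }
  ...

  def createResponse(requestThrottleMs: Int): AbstractResponse = {
    // Map each group id to its coordinator node (or to a per-group error)
    ...
  }
  ...
  sendResponseMaybeThrottle(request, createResponse)
}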
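On the broker, the batched handler can repeat the existing single-group lookup for every id in the request. The sketch below models that in plain Java rather than Scala, under stated assumptions: a group's partition of the group metadata topic is chosen as abs(hashCode) modulo the partition count (mirroring GroupMetadataManager.partitionFor, with Integer.MIN_VALUE mapped to 0 as Utils.abs does), and that partition's leader is the coordinator. The leaders map stands in for the topic metadata KafkaApis looks up; all class and method names here are hypothetical.

```java
import java.util.*;

// Hypothetical plain-Java model of the batched coordinator lookup on the broker.
public class BatchedCoordinatorLookup {

    record Node(int id, String host, int port) {}

    // Result for one group: an error code plus the coordinator (NO_NODE on error).
    record Result(short errorCode, Node coordinator) {}

    static final short NONE = 0;
    static final short COORDINATOR_NOT_AVAILABLE = 15;
    static final Node NO_NODE = new Node(-1, "", -1);

    // abs(hashCode) % partitionCount, with Integer.MIN_VALUE mapped to 0 so the result is never negative.
    static int partitionFor(String groupId, int partitionCount) {
        int h = groupId.hashCode();
        int abs = (h == Integer.MIN_VALUE) ? 0 : Math.abs(h);
        return abs % partitionCount;
    }

    // One pass over all requested group ids; a missing leader only fails that group.
    static Map<String, Result> findCoordinators(List<String> groupIds, int partitionCount,
                                                Map<Integer, Node> leaders) {
        Map<String, Result> results = new LinkedHashMap<>();
        for (String groupId : groupIds) {
            Node leader = leaders.get(partitionFor(groupId, partitionCount));
            results.put(groupId, leader == null
                    ? new Result(COORDINATOR_NOT_AVAILABLE, NO_NODE)
                    : new Result(NONE, leader));
        }
        return results;
    }

    public static void main(String[] args) {
        int partitions = 50; // default offsets.topic.num.partitions
        Map<Integer, Node> leaders = new HashMap<>();
        for (int p = 0; p < partitions; p++) {
            leaders.put(p, new Node(p % 3, "broker" + (p % 3), 9092));
        }
        leaders.remove(partitionFor("orphan", partitions)); // simulate a partition without a leader

        findCoordinators(List.of("g1", "g2", "orphan"), partitions, leaders)
                .forEach((group, result) -> System.out.println(group + " -> " + result));
    }
}
```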



Compatibility, Deprecation, and Migration Plan


  • FindCoordinatorRequest and FindCoordinatorResponse need to be bumped to V3.
  • Requests that contain a single groupID can still be supported.
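One way a client could keep single-group lookups working against older brokers is to pick the request version from the broker's advertised range (which clients learn via ApiVersionsRequest): send one batched v3 request when the broker supports it, and otherwise fall back to one v0-v2 request per group. This is a sketch of that assumption only; PlannedRequest and plan are hypothetical names.

```java
import java.util.*;

// Hypothetical planner: choose between one batched v3 request and per-group legacy requests.
public class FindCoordinatorVersionFallback {

    // One request to send: its FindCoordinator version and the group ids it carries.
    record PlannedRequest(short version, List<String> groupIds) {}

    static final short BATCHED_VERSION = 3;

    static List<PlannedRequest> plan(List<String> groupIds, short brokerMaxVersion) {
        List<PlannedRequest> requests = new ArrayList<>();
        if (groupIds.isEmpty()) return requests;
        if (brokerMaxVersion >= BATCHED_VERSION) {
            // Broker understands v3: every group id goes into a single request.
            requests.add(new PlannedRequest(BATCHED_VERSION, List.copyOf(groupIds)));
        } else {
            // Older broker: one single-key request per group, as today.
            for (String groupId : groupIds) {
                requests.add(new PlannedRequest(brokerMaxVersion, List.of(groupId)));
            }
        }
        return requests;
    }

    public static void main(String[] args) {
        List<String> groups = List.of("g1", "g2", "g3");
        System.out.println(plan(groups, (short) 3).size()); // 1
        System.out.println(plan(groups, (short) 2).size()); // 3
    }
}
```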

Rejected Alternatives

  • Update the batching logic in KafkaAdminClient.java directly instead of modifying the structure of the request and response.