...
Today each FindCoordinatorRequest contains only a single consumer group id. If a client needs to find the coordinators for N group ids, it has to send N separate requests, which is inefficient in some scenarios. We therefore want to extend FindCoordinatorRequest to carry multiple group ids.
Proposed Changes
Send a single FindCoordinatorRequest to a broker asking for the coordinators of all consumer groups. These are the changes that need to be made:
- Update the schema in FindCoordinatorRequest to store an array of group ids (with a new FIND_COORDINATOR_REQUEST_V3), and modify the class to take more than one coordinator key (List<String> groupIds).
- Update the FindCoordinatorResponse class to hold a mapping. Today it only has a single Node, which supports one result; to support multiple results we need something like <GroupId, CoordinatorNodeMetadata>.
- Update handleFindCoordinatorRequest to correctly parse the new FindCoordinatorRequest and map the result to the new FindCoordinatorResponse.
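The three changes above can be pictured with a small, self-contained sketch. It is not the proposed Kafka code: `BatchedFindCoordinatorSketch`, `CoordinatorNode`, `handleFindCoordinators` and the `lookup` function are hypothetical names introduced for illustration only, and the hash-based lookup in `main` is a toy stand-in for however the broker actually resolves a group's coordinator.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class BatchedFindCoordinatorSketch {

    // Hypothetical stand-in for the coordinator metadata returned per group.
    static final class CoordinatorNode {
        final int nodeId;
        final String host;
        final int port;

        CoordinatorNode(int nodeId, String host, int port) {
            this.nodeId = nodeId;
            this.host = host;
            this.port = port;
        }
    }

    // Broker-side sketch: resolve every requested group id and collect the
    // results keyed by group id, i.e. the <GroupId, CoordinatorNodeMetadata>
    // mapping that the new response would carry.
    static Map<String, CoordinatorNode> handleFindCoordinators(
            List<String> groupIds, Function<String, CoordinatorNode> lookup) {
        Map<String, CoordinatorNode> coordinators = new LinkedHashMap<>();
        for (String groupId : groupIds) {
            coordinators.put(groupId, lookup.apply(groupId));
        }
        return coordinators;
    }

    public static void main(String[] args) {
        List<CoordinatorNode> brokers = Arrays.asList(
                new CoordinatorNode(0, "broker-0", 9092),
                new CoordinatorNode(1, "broker-1", 9092));

        // Toy lookup for the demo only; NOT the broker's real assignment logic.
        Function<String, CoordinatorNode> lookup =
                groupId -> brokers.get(Math.floorMod(groupId.hashCode(), brokers.size()));

        // One batched call replaces three single-group requests.
        Map<String, CoordinatorNode> result =
                handleFindCoordinators(Arrays.asList("group-a", "group-b", "group-c"), lookup);

        result.forEach((groupId, node) ->
                System.out.println(groupId + " -> " + node.host + ":" + node.port));
    }
}
```

Keying the result by group id lets the client match each coordinator to the group it asked about without relying on the order of entries in the response.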
Public Interfaces
API Changes
I suggest that we modify FindCoordinatorRequest.java to support multiple coordinator keys by adding a new FIND_COORDINATOR_REQUEST_V3:
```java
public class FindCoordinatorRequest extends AbstractRequest {
    private static final String COORDINATOR_KEY_KEY_NAME = "coordinator_key";
    private static final String COORDINATOR_TYPE_KEY_NAME = "coordinator_type";
    private static final String COORDINATOR_GROUPIDS_KEY_NAME = "coordinator_groupIds";

    private static final Schema FIND_COORDINATOR_REQUEST_V0 = new Schema(GROUP_ID);

    private static final Schema FIND_COORDINATOR_REQUEST_V1 = new Schema(
            new Field("coordinator_key", STRING, "Id to use for finding the coordinator (for groups, this is the groupId, " +
                    "for transactional producers, this is the transactional id)"),
            new Field("coordinator_type", INT8, "The type of coordinator to find (0 = group, 1 = transaction)"));

    /**
     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
     */
    private static final Schema FIND_COORDINATOR_REQUEST_V2 = FIND_COORDINATOR_REQUEST_V1;

    // New in V3: carries all coordinator ids for the request in a single array field.
    private static final Schema FIND_COORDINATOR_REQUEST_V3 = new Schema(
            new Field(COORDINATOR_GROUPIDS_KEY_NAME, new ArrayOf(DESCRIBE_GROUPS_RESPONSE_MEMBER_V0), "All the coordinator ids " +
                    "for this request"));
    ...
    private final Map<String, String> groupIds; // Used to store <coordinator_key, coordinator_type> pairs
```
Compatibility, Deprecation, and Migration Plan
...